From d7f6343df1e79392e95f13b99956d5ab960a47d0 Mon Sep 17 00:00:00 2001 From: blindlobstar Date: Tue, 8 Apr 2025 02:13:54 +0200 Subject: [PATCH] concurrent queue iii problem --- 12-concurrent-queue-iii/README.md | 11 ++ 12-concurrent-queue-iii/solution/solution.go | 58 +++++++ 12-concurrent-queue-iii/task.go | 25 +++ 12-concurrent-queue-iii/task_test.go | 163 +++++++++++++++++++ README.md | 1 + 5 files changed, 258 insertions(+) create mode 100644 12-concurrent-queue-iii/README.md create mode 100644 12-concurrent-queue-iii/solution/solution.go create mode 100644 12-concurrent-queue-iii/task.go create mode 100644 12-concurrent-queue-iii/task_test.go diff --git a/12-concurrent-queue-iii/README.md b/12-concurrent-queue-iii/README.md new file mode 100644 index 0000000..ea6c91a --- /dev/null +++ b/12-concurrent-queue-iii/README.md @@ -0,0 +1,11 @@ +# Concurrent Queue +Implement a thread-safe **zero allocation** queue with the following properties: + +- It has a fixed maximum size set at initialization. +- `Push(val int) error` adds an item to the queue. If the queue is full, it should return `ErrQueueFull`. +- `Pop() int` removes and returns first item from the queue. If the queue is empty, return -1. +- `Peek() int` returns first item from the queue. If the queue is empty, return -1. +- The queue must be safe to use from multiple goroutines simultaneously. + +## Tags +`Concurrency` diff --git a/12-concurrent-queue-iii/solution/solution.go b/12-concurrent-queue-iii/solution/solution.go new file mode 100644 index 0000000..0ef73b4 --- /dev/null +++ b/12-concurrent-queue-iii/solution/solution.go @@ -0,0 +1,58 @@ +package main + +import ( + "errors" + "sync" +) + +var ErrQueueFull = errors.New("queue is full") + +type Queue struct { + data []int + w int + r int + sync.Mutex +} + +func NewQueue(size int) *Queue { + return &Queue{ + data: make([]int, size), + } +} + +func (q *Queue) Push(val int) error { + q.Lock() + defer q.Unlock() + + if q.w == q.r || q.w%len(q.data) != q.r%len(q.data) { + q.data[q.w%len(q.data)] = val + q.w++ + return nil + } else { + return ErrQueueFull + } +} + +func (q *Queue) Pop() int { + q.Lock() + defer q.Unlock() + + if q.r != q.w { + val := q.data[q.r%len(q.data)] + q.r++ + return val + } else { + return -1 + } +} + +func (q *Queue) Peek() int { + q.Lock() + defer q.Unlock() + + if q.r != q.w { + return q.data[q.r%len(q.data)] + } else { + return -1 + } +} diff --git a/12-concurrent-queue-iii/task.go b/12-concurrent-queue-iii/task.go new file mode 100644 index 0000000..498208a --- /dev/null +++ b/12-concurrent-queue-iii/task.go @@ -0,0 +1,25 @@ +package main + +import ( + "errors" +) + +var ErrQueueFull = errors.New("queue is full") + +type Queue struct{} + +func NewQueue(size int) *Queue { + return &Queue{} +} + +func (q *Queue) Push(val int) error { + return nil +} + +func (q *Queue) Pop() int { + return -1 +} + +func (q *Queue) Peek() int { + return -1 +} diff --git a/12-concurrent-queue-iii/task_test.go b/12-concurrent-queue-iii/task_test.go new file mode 100644 index 0000000..0659dd5 --- /dev/null +++ b/12-concurrent-queue-iii/task_test.go @@ -0,0 +1,163 @@ +package main + +import ( + "context" + "reflect" + "sort" + "sync" + "testing" + "time" +) + +func TestQueue(t *testing.T) { + tests := []struct { + name string + size int + ops []string + args []int + expected []any + }{ + { + name: "basic operations", + size: 3, + ops: []string{"push", "push", "push", "pop", "pop", "pop"}, + args: []int{1, 2, 3, 0, 0, 0}, + expected: []any{nil, nil, nil, 1, 2, 3}, + }, + { + name: "empty queue pop", + size: 1, + ops: []string{"pop", "push", "pop"}, + args: []int{0, 5, 0}, + expected: []any{-1, nil, 5}, + }, + { + name: "queue full error", + size: 2, + ops: []string{"push", "push", "push", "pop", "push", "pop"}, + args: []int{5, 10, 15, 0, 20, 0}, + expected: []any{nil, nil, ErrQueueFull, 5, nil, 10}, + }, + { + name: "interleaved operations", + size: 2, + ops: []string{"push", "pop", "push", "push", "pop", "pop"}, + args: []int{1, 0, 2, 3, 0, 0}, + expected: []any{nil, 1, nil, nil, 2, 3}, + }, + { + name: "peek operations", + size: 3, + ops: []string{"push", "peek", "push", "peek", "pop", "peek", "pop", "peek"}, + args: []int{10, 0, 20, 0, 0, 0, 0, 0}, + expected: []any{nil, 10, nil, 10, 10, 20, 20, -1}, + }, + { + name: "peek empty queue", + size: 2, + ops: []string{"peek", "push", "peek", "pop", "peek"}, + args: []int{0, 42, 0, 0, 0}, + expected: []any{-1, nil, 42, 42, -1}, + }, + { + name: "mixed operations", + size: 5, + ops: []string{"push", "push", "peek", "pop", "peek", "push", "push", "push", "peek"}, + args: []int{5, 10, 0, 0, 0, 15, 20, 25, 0}, + expected: []any{nil, nil, 5, 5, 10, nil, nil, nil, 10}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewQueue(tt.size) + results := make([]any, len(tt.ops)) + + for i, op := range tt.ops { + var result any + + switch op { + case "push": + result = q.Push(tt.args[i]) + case "pop": + result = q.Pop() + case "peek": + result = q.Peek() + } + + results[i] = result + } + + if !reflect.DeepEqual(results, tt.expected) { + t.Errorf("Expected: %v, got: %v", tt.expected, results) + } + }) + } +} + +func TestConcurrentQueue(t *testing.T) { + queue := NewQueue(1000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1000) + for i := range 1000 { + go func() { + defer wg.Done() + queue.Push(i + 1) + }() + } + + ch := make(chan int, 1000) + go func() { + defer close(ch) + for { + select { + case <-ctx.Done(): + return + default: + if val := queue.Pop(); val > -1 { + ch <- val + } + } + } + }() + + go func() { + wg.Wait() + time.Sleep(100 * time.Millisecond) + cancel() + }() + + var result []int + for val := range ch { + result = append(result, val) + } + + expected := make([]int, 1000) + for i := range 1000 { + expected[i] = i + 1 + } + + sort.Ints(result) + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected: 1..1000, got: %v", result) + } +} + +func TestQueueAllocations(t *testing.T) { + res := testing.Benchmark(BenchmarkQueue) + if res.AllocsPerOp() != 0 { + t.Errorf("Expected 0 allocation, got: %d", res.AllocsPerOp()) + } +} + +func BenchmarkQueue(b *testing.B) { + queue := NewQueue(1) + for b.Loop() { + queue.Push(1) + queue.Peek() + queue.Pop() + } +} diff --git a/README.md b/README.md index 485b5d3..0cc7fe5 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ Here is a list of the problems available in the repository. Problems are organiz * [Merge Channels](09-merge-channels/) * [Concurrent Queue](10-concurrent-queue/) * [Concurrent Queue II](11-concurrent-queue-ii/) +* [Concurrent Queue III](12-concurrent-queue-iii/) ## Contributing