diff --git a/11-concurrent-queue-ii/README.md b/11-concurrent-queue-ii/README.md new file mode 100644 index 0000000..bf3485d --- /dev/null +++ b/11-concurrent-queue-ii/README.md @@ -0,0 +1,11 @@ +# Concurrent Queue +Implement a thread-safe queue with the following properties: + +- It has a fixed maximum size set at initialization. +- `Push(val int) error` adds an item to the queue. If the queue is full, it should return `ErrQueueFull`. +- `Pop() int` removes and returns first item from the queue. If the queue is empty, return -1. +- `Peek() int` returns first item from the queue. If the queue is empty, return -1. +- The queue must be safe to use from multiple goroutines simultaneously. + +## Tags +`Concurrency` diff --git a/11-concurrent-queue-ii/solution/solution.go b/11-concurrent-queue-ii/solution/solution.go new file mode 100644 index 0000000..9333691 --- /dev/null +++ b/11-concurrent-queue-ii/solution/solution.go @@ -0,0 +1,56 @@ +package main + +import ( + "errors" + "sync" +) + +var ErrQueueFull = errors.New("queue is full") + +type Queue struct { + data []int + size int + sync.Mutex +} + +func NewQueue(size int) *Queue { + return &Queue{ + data: []int{}, + size: size, + } +} + +func (q *Queue) Push(val int) error { + q.Lock() + defer q.Unlock() + if len(q.data) >= q.size { + return ErrQueueFull + } else { + q.data = append(q.data, val) + return nil + } +} + +func (q *Queue) Pop() int { + q.Lock() + defer q.Unlock() + + if len(q.data) > 0 { + val := q.data[0] + q.data = q.data[1:] + return val + } else { + return -1 + } +} + +func (q *Queue) Peek() int { + q.Lock() + defer q.Unlock() + + if len(q.data) > 0 { + return q.data[0] + } else { + return -1 + } +} diff --git a/11-concurrent-queue-ii/task.go b/11-concurrent-queue-ii/task.go new file mode 100644 index 0000000..498208a --- /dev/null +++ b/11-concurrent-queue-ii/task.go @@ -0,0 +1,25 @@ +package main + +import ( + "errors" +) + +var ErrQueueFull = errors.New("queue is full") + +type Queue struct{} + +func NewQueue(size int) *Queue { + return &Queue{} +} + +func (q *Queue) Push(val int) error { + return nil +} + +func (q *Queue) Pop() int { + return -1 +} + +func (q *Queue) Peek() int { + return -1 +} diff --git a/11-concurrent-queue-ii/task_test.go b/11-concurrent-queue-ii/task_test.go new file mode 100644 index 0000000..75e110f --- /dev/null +++ b/11-concurrent-queue-ii/task_test.go @@ -0,0 +1,147 @@ +package main + +import ( + "context" + "reflect" + "sort" + "sync" + "testing" + "time" +) + +func TestQueue(t *testing.T) { + tests := []struct { + name string + size int + ops []string + args []int + expected []any + }{ + { + name: "basic operations", + size: 3, + ops: []string{"push", "push", "push", "pop", "pop", "pop"}, + args: []int{1, 2, 3, 0, 0, 0}, + expected: []any{nil, nil, nil, 1, 2, 3}, + }, + { + name: "empty queue pop", + size: 1, + ops: []string{"pop", "push", "pop"}, + args: []int{0, 5, 0}, + expected: []any{-1, nil, 5}, + }, + { + name: "queue full error", + size: 2, + ops: []string{"push", "push", "push", "pop", "push", "pop"}, + args: []int{5, 10, 15, 0, 20, 0}, + expected: []any{nil, nil, ErrQueueFull, 5, nil, 10}, + }, + { + name: "interleaved operations", + size: 2, + ops: []string{"push", "pop", "push", "push", "pop", "pop"}, + args: []int{1, 0, 2, 3, 0, 0}, + expected: []any{nil, 1, nil, nil, 2, 3}, + }, + { + name: "peek operations", + size: 3, + ops: []string{"push", "peek", "push", "peek", "pop", "peek", "pop", "peek"}, + args: []int{10, 0, 20, 0, 0, 0, 0, 0}, + expected: []any{nil, 10, nil, 10, 10, 20, 20, -1}, + }, + { + name: "peek empty queue", + size: 2, + ops: []string{"peek", "push", "peek", "pop", "peek"}, + args: []int{0, 42, 0, 0, 0}, + expected: []any{-1, nil, 42, 42, -1}, + }, + { + name: "mixed operations", + size: 5, + ops: []string{"push", "push", "peek", "pop", "peek", "push", "push", "push", "peek"}, + args: []int{5, 10, 0, 0, 0, 15, 20, 25, 0}, + expected: []any{nil, nil, 5, 5, 10, nil, nil, nil, 10}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewQueue(tt.size) + results := make([]any, len(tt.ops)) + + for i, op := range tt.ops { + var result any + + switch op { + case "push": + result = q.Push(tt.args[i]) + case "peek": + result = q.Peek() + case "pop": + result = q.Pop() + } + + results[i] = result + } + + if !reflect.DeepEqual(results, tt.expected) { + t.Errorf("Expected: %v, got: %v", tt.expected, results) + } + }) + } +} + +func TestConcurrentQueue(t *testing.T) { + queue := NewQueue(1000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1000) + for i := range 1000 { + go func() { + defer wg.Done() + queue.Push(i + 1) + }() + } + + ch := make(chan int, 1000) + go func() { + defer close(ch) + for { + select { + case <-ctx.Done(): + return + default: + if val := queue.Pop(); val > -1 { + ch <- val + } + } + } + }() + + go func() { + wg.Wait() + time.Sleep(100 * time.Millisecond) + cancel() + }() + + var result []int + for val := range ch { + result = append(result, val) + } + + expected := make([]int, 1000) + for i := range 1000 { + expected[i] = i + 1 + } + + sort.Ints(result) + if !reflect.DeepEqual(result, expected) { + t.Errorf("Expected: 1..1000, got: %v", result) + } +} diff --git a/README.md b/README.md index 10cb3af..485b5d3 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Here is a list of the problems available in the repository. Problems are organiz * [Request With Failover](08-request-with-failover/) * [Merge Channels](09-merge-channels/) * [Concurrent Queue](10-concurrent-queue/) +* [Concurrent Queue II](11-concurrent-queue-ii/) ## Contributing