diff --git a/08-distributed-task-executor/README.md b/08-distributed-task-executor/README.md deleted file mode 100644 index 95565aa..0000000 --- a/08-distributed-task-executor/README.md +++ /dev/null @@ -1,12 +0,0 @@ -# Distributed Task Executor -You are tasked with implementing a failover mechanism for executing tasks across multiple distributed nodes. Each node has a priority level, and you need to ensure tasks are executed reliably even when some nodes fail. -Your task is to implement the `ExecuteWithFailover` function that: - -* Tries nodes in order of priority (lowest number first) -* If a node fails, immediately tries the next node without waiting -* If a node doesn't respond within 500ms, tries the next node but keeps the original request running -* Returns the first successful result, or all errors if all nodes fail -* Properly handles context cancellation throughout the process - -Tags -`Concurrency` `Context` `Failover` diff --git a/08-distributed-task-executor/solution/solution.go b/08-distributed-task-executor/solution/solution.go deleted file mode 100644 index 0c51784..0000000 --- a/08-distributed-task-executor/solution/solution.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import ( - "context" - "errors" - "sort" - "time" -) - -var ( - ErrTaskFailed = errors.New("task execution failed") - ErrNoNodesAvailable = errors.New("no nodes available") -) - -// Represents a remote node that can execute tasks -type Node struct { - Address string - Priority int // Lower is higher priority -} - -// Executes a task on a single remote node -var executeOnNode func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - // Already implemented - return nil, nil -} - -// ExecuteWithFailover attempts to execute a task on available nodes with the following requirements: -// 1. First try nodes in order of priority (lowest number first) -// 2. If a node fails, immediately try the next node without waiting -// 3. If a node doesn't respond within 500ms, try the next node but keep the original request running -// 4. Return the first successful result, or all errors if all nodes fail -// 5. Properly handle context cancellation throughout the process -func ExecuteWithFailover(ctx context.Context, nodes []Node, taskID string, payload []byte) ([]byte, error) { - if len(nodes) == 0 { - return nil, ErrNoNodesAvailable - } - sort.Slice(nodes, func(i, j int) bool { return nodes[i].Priority < nodes[j].Priority }) - - nodeCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) - defer cancel() - - ch := make(chan []byte, 1) - errCh := make(chan error, len(nodes)) - - var cur int - go startNext(ctx, nodes[cur], taskID, payload, ch, errCh) - - var errJoin error - for { - select { - case resp := <-ch: - return resp, nil - case err := <-errCh: - errJoin = errors.Join(errJoin, err) - cur++ - if cur < len(nodes) { - nodeCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond) - defer cancel() - go startNext(ctx, nodes[cur], taskID, payload, ch, errCh) - continue - } - return nil, errJoin - case <-nodeCtx.Done(): - cur++ - if cur < len(nodes) { - nodeCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond) - defer cancel() - go startNext(ctx, nodes[cur], taskID, payload, ch, errCh) - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -func startNext(ctx context.Context, node Node, taskID string, payload []byte, ch chan<- []byte, errCh chan<- error) { - resp, err := executeOnNode(ctx, node, taskID, payload) - if err != nil { - errCh <- err - return - } - select { - case ch <- resp: - default: - return - } -} diff --git a/08-distributed-task-executor/task.go b/08-distributed-task-executor/task.go deleted file mode 100644 index 0ca7ccb..0000000 --- a/08-distributed-task-executor/task.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - "context" - "errors" -) - -var ( - ErrTaskFailed = errors.New("task execution failed") - ErrNoNodesAvailable = errors.New("no nodes available") -) - -// Represents a remote node that can execute tasks -type Node struct { - Address string - Priority int // Lower is higher priority -} - -// Executes a task on a single remote node -var executeOnNode func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - // Already implemented - return nil, nil -} - -// ExecuteWithFailover attempts to execute a task on available nodes with the following requirements: -// 1. First try nodes in order of priority (lowest number first) -// 2. If a node fails, immediately try the next node without waiting -// 3. If a node doesn't respond within 500ms, try the next node but keep the original request running -// 4. Return the first successful result, or all errors if all nodes fail -// 5. Properly handle context cancellation throughout the process -func ExecuteWithFailover(ctx context.Context, nodes []Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil -} diff --git a/08-distributed-task-executor/task_test.go b/08-distributed-task-executor/task_test.go deleted file mode 100644 index f414016..0000000 --- a/08-distributed-task-executor/task_test.go +++ /dev/null @@ -1,280 +0,0 @@ -package main - -import ( - "context" - "errors" - "sync" - "testing" - "time" -) - -type mockNode struct { - delay time.Duration - shouldFail bool - response []byte -} - -func TestFirstNodeSucceeds(t *testing.T) { - mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - mockNodes := map[string]mockNode{ - "node1": {delay: 10 * time.Millisecond, shouldFail: false, response: []byte("success-1")}, - "node2": {delay: 20 * time.Millisecond, shouldFail: false, response: []byte("success-2")}, - } - - mockNode, exists := mockNodes[node.Address] - if !exists { - return nil, errors.New("unknown node") - } - - select { - case <-time.After(mockNode.delay): - if mockNode.shouldFail { - return nil, ErrTaskFailed - } - return mockNode.response, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - executeOnNode = mockExecuteOnNode - defer func() { - executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil - } - }() - - nodes := []Node{ - {Address: "node1", Priority: 1}, - {Address: "node2", Priority: 2}, - } - - result, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) - if err != nil { - t.Fatalf("Expected success, got error: %v", err) - } - - if string(result) != "success-1" { - t.Errorf("Expected result 'success-1', got '%s'", string(result)) - } -} - -func TestFirstNodeFailsSecondNodeSucceeds(t *testing.T) { - mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - mockNodes := map[string]mockNode{ - "node1": {delay: 10 * time.Millisecond, shouldFail: true, response: nil}, - "node2": {delay: 20 * time.Millisecond, shouldFail: false, response: []byte("success-2")}, - } - - mockNode, exists := mockNodes[node.Address] - if !exists { - return nil, errors.New("unknown node") - } - - select { - case <-time.After(mockNode.delay): - if mockNode.shouldFail { - return nil, ErrTaskFailed - } - return mockNode.response, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - executeOnNode = mockExecuteOnNode - defer func() { - executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil - } - }() - - nodes := []Node{ - {Address: "node1", Priority: 1}, - {Address: "node2", Priority: 2}, - } - - result, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) - if err != nil { - t.Fatalf("Expected success, got error: %v", err) - } - - if string(result) != "success-2" { - t.Errorf("Expected result 'success-2', got '%s'", string(result)) - } -} - -func TestRespectsPriorityOrder(t *testing.T) { - var executionOrder []string - var mu sync.Mutex - - mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - mu.Lock() - executionOrder = append(executionOrder, node.Address) - mu.Unlock() - - delay := time.Duration(10*node.Priority) * time.Millisecond - time.Sleep(delay) - return []byte("success-" + node.Address), nil - } - - executeOnNode = mockExecuteOnNode - defer func() { - executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil - } - }() - - nodes := []Node{ - {Address: "node3", Priority: 3}, - {Address: "node1", Priority: 1}, - {Address: "node2", Priority: 2}, - } - - result, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) - if err != nil { - t.Fatalf("Expected success, got error: %v", err) - } - - if string(result) != "success-node1" { - t.Errorf("Expected result from highest priority node 'success-node1', got '%s'", string(result)) - } - - if len(executionOrder) < 1 || executionOrder[0] != "node1" { - t.Errorf("Expected node1 (highest priority) to be executed first, got execution order: %v", executionOrder) - } -} - -func TestAllNodesFail(t *testing.T) { - mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, ErrTaskFailed - } - - executeOnNode = mockExecuteOnNode - defer func() { - executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil - } - }() - - nodes := []Node{ - {Address: "node1", Priority: 1}, - {Address: "node2", Priority: 2}, - } - - _, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) - if err == nil { - t.Fatal("Expected error when all nodes fail, got nil") - } -} - -func TestHandles500msTimeoutCorrectly(t *testing.T) { - var nodeStartTimes sync.Map - var mu sync.Mutex - var executionOrder []string - - mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - nodeStartTimes.Store(node.Address, time.Now()) - - mu.Lock() - executionOrder = append(executionOrder, node.Address) - mu.Unlock() - - if node.Address == "node1" { - select { - case <-time.After(1 * time.Second): - return []byte("success-but-slow"), nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - if node.Address == "node2" { - time.Sleep(100 * time.Millisecond) - return []byte("success-fast"), nil - } - - if node.Address == "node3" { - time.Sleep(50 * time.Millisecond) - return nil, ErrTaskFailed - } - - return nil, errors.New("unknown node") - } - - executeOnNode = mockExecuteOnNode - defer func() { - executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil - } - }() - - nodes := []Node{ - {Address: "node1", Priority: 1}, - {Address: "node2", Priority: 2}, - {Address: "node3", Priority: 3}, - } - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - start := time.Now() - result, err := ExecuteWithFailover(ctx, nodes, "task1", []byte("test-payload")) - elapsed := time.Since(start) - - if err != nil { - t.Fatalf("Expected success, got error: %v", err) - } - - if string(result) != "success-fast" { - t.Errorf("Expected fastest successful result 'success-fast', got '%s'", string(result)) - } - - node1Start, _ := nodeStartTimes.Load("node1") - node2Start, _ := nodeStartTimes.Load("node2") - if node2Start.(time.Time).Sub(node1Start.(time.Time)) > 550*time.Millisecond { - t.Errorf("Expected node2 to start within 500ms after node1") - } - - if elapsed > 650*time.Millisecond { - t.Errorf("Expected to get result in under 650ms, took %v", elapsed) - } -} - -func TestRespectsContextCancellation(t *testing.T) { - mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - select { - case <-time.After(500 * time.Millisecond): - return []byte("success"), nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - executeOnNode = mockExecuteOnNode - defer func() { - executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { - return nil, nil - } - }() - - nodes := []Node{ - {Address: "node1", Priority: 1}, - {Address: "node2", Priority: 2}, - } - - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() - - _, err := ExecuteWithFailover(ctx, nodes, "task1", []byte("test-payload")) - if err == nil || !errors.Is(err, context.DeadlineExceeded) { - t.Fatalf("Expected context deadline exceeded error, got: %v", err) - } -} - -func TestHandlesEmptyNodesList(t *testing.T) { - _, err := ExecuteWithFailover(context.Background(), []Node{}, "task1", []byte("test-payload")) - if err != ErrNoNodesAvailable { - t.Fatalf("Expected ErrNoNodesAvailable, got: %v", err) - } -} diff --git a/08-request-with-failover/README.md b/08-request-with-failover/README.md new file mode 100644 index 0000000..ec58125 --- /dev/null +++ b/08-request-with-failover/README.md @@ -0,0 +1,12 @@ +# Request With Failover +Attempts to retrieve data from multiple sources with timeout-based failover logic. + +Behavior: +- Launches a request to each address with a 500ms timeout. +- If a request errors, the next address is tried immediately. +- If a request hangs, the next one is attempted in parallel after 500ms, but the previous request is still allowed to complete. +- Returns the first successful result from any address. +- If all attempts fail, returns `ErrRequestsFailed` + +## Tags +`Concurrency` `Context` `Failover` diff --git a/08-request-with-failover/solution/solution.go b/08-request-with-failover/solution/solution.go new file mode 100644 index 0000000..56f4886 --- /dev/null +++ b/08-request-with-failover/solution/solution.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "errors" + "time" +) + +type Getter interface { + Get(ctx context.Context, address string) ([]byte, error) +} + +var ( + ErrRequestsFailed = errors.New("requests failed") + getter Getter +) + +// RequestWithFailover attempts to request a data from available addresses: +// 1. If error, immediately try the address without waiting +// 2. If an address doesn't respond within 500ms, try the next but keep the original request running +// 3. Return the first successful response, or all ErrTaskFailed if all nodes fail +// 4. Properly handle context cancellation throughout the process +func RequestWithFailover(ctx context.Context, addresses []string) ([]byte, error) { + ch := make(chan []byte, 1) + errCh := make(chan error, len(addresses)) + + var errCnt int + for _, address := range addresses { + nodeCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + + go func() { + resp, err := getter.Get(ctx, address) + if err != nil { + errCh <- err + return + } + select { + case ch <- resp: + default: + return + } + }() + + select { + case res := <-ch: + return res, nil + case <-errCh: + errCnt++ + case <-nodeCtx.Done(): + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + } + } + + if errCnt == len(addresses) { + return nil, ErrRequestsFailed + } + + for { + select { + case res := <-ch: + return res, nil + case <-errCh: + errCnt++ + if errCnt == len(addresses) { + return nil, ErrRequestsFailed + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} diff --git a/08-request-with-failover/task.go b/08-request-with-failover/task.go new file mode 100644 index 0000000..3bcb882 --- /dev/null +++ b/08-request-with-failover/task.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + "errors" +) + +type Getter interface { + Get(ctx context.Context, address string) ([]byte, error) +} + +var ( + ErrTaskFailed = errors.New("task execution failed") + getter Getter +) + +// RequestWithFailover attempts to request a data from available addresses: +// 1. If error, immediately try the address without waiting +// 2. If an address doesn't respond within 500ms, try the next but keep the original request running +// 3. Return the first successful response, or all ErrTaskFailed if all nodes fail +// 4. Properly handle context cancellation throughout the process +func RequestWithFailover(ctx context.Context, addresses []string) ([]byte, error) { + return nil, nil +} diff --git a/08-request-with-failover/task_test.go b/08-request-with-failover/task_test.go new file mode 100644 index 0000000..3f01b2b --- /dev/null +++ b/08-request-with-failover/task_test.go @@ -0,0 +1,115 @@ +package main + +import ( + "context" + "errors" + "testing" + "time" +) + +type MockGetter struct { + responses map[string]mockResponse +} + +type mockResponse struct { + data []byte + err error + delay time.Duration +} + +func (m *MockGetter) Get(ctx context.Context, address string) ([]byte, error) { + resp, exists := m.responses[address] + if !exists { + return nil, errors.New("unknown address") + } + + if resp.delay > 0 { + select { + case <-time.After(resp.delay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + return resp.data, resp.err +} + +func TestExecuteWithFailover(t *testing.T) { + originalGetter := getter + defer func() { getter = originalGetter }() + + mockGetter := &MockGetter{ + responses: map[string]mockResponse{ + "success": {[]byte("success"), nil, 0}, + "error": {nil, errors.New("failed"), 0}, + "slow-success": {[]byte("slow success"), nil, 600 * time.Millisecond}, + "timeout": {[]byte("timeout"), nil, 2 * time.Second}, + }, + } + getter = mockGetter + + t.Run("first node succeeds", func(t *testing.T) { + result, err := RequestWithFailover(context.Background(), []string{"success", "error"}) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + if string(result) != "success" { + t.Errorf("Expected 'success', got: %s", string(result)) + } + }) + + t.Run("failover to second node", func(t *testing.T) { + result, err := RequestWithFailover(context.Background(), []string{"error", "success"}) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + if string(result) != "success" { + t.Errorf("Expected 'success', got: %s", string(result)) + } + }) + + t.Run("timeout to second node", func(t *testing.T) { + result, err := RequestWithFailover(context.Background(), []string{"timeout", "success"}) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + if string(result) != "success" { + t.Errorf("Expected 'success', got: %s", string(result)) + } + }) + + t.Run("all nodes fail", func(t *testing.T) { + _, err := RequestWithFailover(context.Background(), []string{"error", "error"}) + + if !errors.Is(err, ErrTaskFailed) { + t.Fatalf("Expected ErrTaskFailed, got: %v", err) + } + }) + + t.Run("context cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + + _, err := RequestWithFailover(ctx, []string{"timeout", "timeout"}) + + if !errors.Is(err, context.Canceled) { + t.Fatalf("Expected context.Canceled, got: %v", err) + } + }) + + t.Run("slow node eventually succeeds", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := RequestWithFailover(ctx, []string{"slow-success"}) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + if string(result) != "slow success" { + t.Errorf("Expected 'slow success', got: %s", string(result)) + } + }) +} diff --git a/README.md b/README.md index cea50b0..58c14ae 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Here is a list of the problems available in the repository. Problems are organiz * [Costly Connection With Unsafe Storage](05-costly-connections-with-unsafe-storage/) * [Rate Limiter](06-rate-limiter/) * [TTL Cache](07-ttl-cache/) -* [Distributed Task Executor](08-distributed-task-executor/) +* [Request With Failover](08-request-with-failover/) ## Contributing