From 991b530bad016e58785920bf9ba49b4438b09ca2 Mon Sep 17 00:00:00 2001 From: blindlobstar Date: Sat, 5 Apr 2025 19:03:38 +0200 Subject: [PATCH] distributed task executor problem --- 08-distributed-task-executor/README.md | 12 + .../solution/solution.go | 87 ++++++ 08-distributed-task-executor/task.go | 33 +++ 08-distributed-task-executor/task_test.go | 280 ++++++++++++++++++ README.md | 1 + 5 files changed, 413 insertions(+) create mode 100644 08-distributed-task-executor/README.md create mode 100644 08-distributed-task-executor/solution/solution.go create mode 100644 08-distributed-task-executor/task.go create mode 100644 08-distributed-task-executor/task_test.go diff --git a/08-distributed-task-executor/README.md b/08-distributed-task-executor/README.md new file mode 100644 index 0000000..95565aa --- /dev/null +++ b/08-distributed-task-executor/README.md @@ -0,0 +1,12 @@ +# Distributed Task Executor +You are tasked with implementing a failover mechanism for executing tasks across multiple distributed nodes. Each node has a priority level, and you need to ensure tasks are executed reliably even when some nodes fail. +Your task is to implement the `ExecuteWithFailover` function that: + +* Tries nodes in order of priority (lowest number first) +* If a node fails, immediately tries the next node without waiting +* If a node doesn't respond within 500ms, tries the next node but keeps the original request running +* Returns the first successful result, or all errors if all nodes fail +* Properly handles context cancellation throughout the process + +Tags +`Concurrency` `Context` `Failover` diff --git a/08-distributed-task-executor/solution/solution.go b/08-distributed-task-executor/solution/solution.go new file mode 100644 index 0000000..0c51784 --- /dev/null +++ b/08-distributed-task-executor/solution/solution.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "errors" + "sort" + "time" +) + +var ( + ErrTaskFailed = errors.New("task execution failed") + ErrNoNodesAvailable = errors.New("no nodes available") +) + +// Represents a remote node that can execute tasks +type Node struct { + Address string + Priority int // Lower is higher priority +} + +// Executes a task on a single remote node +var executeOnNode func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + // Already implemented + return nil, nil +} + +// ExecuteWithFailover attempts to execute a task on available nodes with the following requirements: +// 1. First try nodes in order of priority (lowest number first) +// 2. If a node fails, immediately try the next node without waiting +// 3. If a node doesn't respond within 500ms, try the next node but keep the original request running +// 4. Return the first successful result, or all errors if all nodes fail +// 5. Properly handle context cancellation throughout the process +func ExecuteWithFailover(ctx context.Context, nodes []Node, taskID string, payload []byte) ([]byte, error) { + if len(nodes) == 0 { + return nil, ErrNoNodesAvailable + } + sort.Slice(nodes, func(i, j int) bool { return nodes[i].Priority < nodes[j].Priority }) + + nodeCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + + ch := make(chan []byte, 1) + errCh := make(chan error, len(nodes)) + + var cur int + go startNext(ctx, nodes[cur], taskID, payload, ch, errCh) + + var errJoin error + for { + select { + case resp := <-ch: + return resp, nil + case err := <-errCh: + errJoin = errors.Join(errJoin, err) + cur++ + if cur < len(nodes) { + nodeCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + go startNext(ctx, nodes[cur], taskID, payload, ch, errCh) + continue + } + return nil, errJoin + case <-nodeCtx.Done(): + cur++ + if cur < len(nodes) { + nodeCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + go startNext(ctx, nodes[cur], taskID, payload, ch, errCh) + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +func startNext(ctx context.Context, node Node, taskID string, payload []byte, ch chan<- []byte, errCh chan<- error) { + resp, err := executeOnNode(ctx, node, taskID, payload) + if err != nil { + errCh <- err + return + } + select { + case ch <- resp: + default: + return + } +} diff --git a/08-distributed-task-executor/task.go b/08-distributed-task-executor/task.go new file mode 100644 index 0000000..0ca7ccb --- /dev/null +++ b/08-distributed-task-executor/task.go @@ -0,0 +1,33 @@ +package main + +import ( + "context" + "errors" +) + +var ( + ErrTaskFailed = errors.New("task execution failed") + ErrNoNodesAvailable = errors.New("no nodes available") +) + +// Represents a remote node that can execute tasks +type Node struct { + Address string + Priority int // Lower is higher priority +} + +// Executes a task on a single remote node +var executeOnNode func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + // Already implemented + return nil, nil +} + +// ExecuteWithFailover attempts to execute a task on available nodes with the following requirements: +// 1. First try nodes in order of priority (lowest number first) +// 2. If a node fails, immediately try the next node without waiting +// 3. If a node doesn't respond within 500ms, try the next node but keep the original request running +// 4. Return the first successful result, or all errors if all nodes fail +// 5. Properly handle context cancellation throughout the process +func ExecuteWithFailover(ctx context.Context, nodes []Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil +} diff --git a/08-distributed-task-executor/task_test.go b/08-distributed-task-executor/task_test.go new file mode 100644 index 0000000..f414016 --- /dev/null +++ b/08-distributed-task-executor/task_test.go @@ -0,0 +1,280 @@ +package main + +import ( + "context" + "errors" + "sync" + "testing" + "time" +) + +type mockNode struct { + delay time.Duration + shouldFail bool + response []byte +} + +func TestFirstNodeSucceeds(t *testing.T) { + mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + mockNodes := map[string]mockNode{ + "node1": {delay: 10 * time.Millisecond, shouldFail: false, response: []byte("success-1")}, + "node2": {delay: 20 * time.Millisecond, shouldFail: false, response: []byte("success-2")}, + } + + mockNode, exists := mockNodes[node.Address] + if !exists { + return nil, errors.New("unknown node") + } + + select { + case <-time.After(mockNode.delay): + if mockNode.shouldFail { + return nil, ErrTaskFailed + } + return mockNode.response, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + executeOnNode = mockExecuteOnNode + defer func() { + executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil + } + }() + + nodes := []Node{ + {Address: "node1", Priority: 1}, + {Address: "node2", Priority: 2}, + } + + result, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + + if string(result) != "success-1" { + t.Errorf("Expected result 'success-1', got '%s'", string(result)) + } +} + +func TestFirstNodeFailsSecondNodeSucceeds(t *testing.T) { + mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + mockNodes := map[string]mockNode{ + "node1": {delay: 10 * time.Millisecond, shouldFail: true, response: nil}, + "node2": {delay: 20 * time.Millisecond, shouldFail: false, response: []byte("success-2")}, + } + + mockNode, exists := mockNodes[node.Address] + if !exists { + return nil, errors.New("unknown node") + } + + select { + case <-time.After(mockNode.delay): + if mockNode.shouldFail { + return nil, ErrTaskFailed + } + return mockNode.response, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + executeOnNode = mockExecuteOnNode + defer func() { + executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil + } + }() + + nodes := []Node{ + {Address: "node1", Priority: 1}, + {Address: "node2", Priority: 2}, + } + + result, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + + if string(result) != "success-2" { + t.Errorf("Expected result 'success-2', got '%s'", string(result)) + } +} + +func TestRespectsPriorityOrder(t *testing.T) { + var executionOrder []string + var mu sync.Mutex + + mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + mu.Lock() + executionOrder = append(executionOrder, node.Address) + mu.Unlock() + + delay := time.Duration(10*node.Priority) * time.Millisecond + time.Sleep(delay) + return []byte("success-" + node.Address), nil + } + + executeOnNode = mockExecuteOnNode + defer func() { + executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil + } + }() + + nodes := []Node{ + {Address: "node3", Priority: 3}, + {Address: "node1", Priority: 1}, + {Address: "node2", Priority: 2}, + } + + result, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + + if string(result) != "success-node1" { + t.Errorf("Expected result from highest priority node 'success-node1', got '%s'", string(result)) + } + + if len(executionOrder) < 1 || executionOrder[0] != "node1" { + t.Errorf("Expected node1 (highest priority) to be executed first, got execution order: %v", executionOrder) + } +} + +func TestAllNodesFail(t *testing.T) { + mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, ErrTaskFailed + } + + executeOnNode = mockExecuteOnNode + defer func() { + executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil + } + }() + + nodes := []Node{ + {Address: "node1", Priority: 1}, + {Address: "node2", Priority: 2}, + } + + _, err := ExecuteWithFailover(context.Background(), nodes, "task1", []byte("test-payload")) + if err == nil { + t.Fatal("Expected error when all nodes fail, got nil") + } +} + +func TestHandles500msTimeoutCorrectly(t *testing.T) { + var nodeStartTimes sync.Map + var mu sync.Mutex + var executionOrder []string + + mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + nodeStartTimes.Store(node.Address, time.Now()) + + mu.Lock() + executionOrder = append(executionOrder, node.Address) + mu.Unlock() + + if node.Address == "node1" { + select { + case <-time.After(1 * time.Second): + return []byte("success-but-slow"), nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + if node.Address == "node2" { + time.Sleep(100 * time.Millisecond) + return []byte("success-fast"), nil + } + + if node.Address == "node3" { + time.Sleep(50 * time.Millisecond) + return nil, ErrTaskFailed + } + + return nil, errors.New("unknown node") + } + + executeOnNode = mockExecuteOnNode + defer func() { + executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil + } + }() + + nodes := []Node{ + {Address: "node1", Priority: 1}, + {Address: "node2", Priority: 2}, + {Address: "node3", Priority: 3}, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + result, err := ExecuteWithFailover(ctx, nodes, "task1", []byte("test-payload")) + elapsed := time.Since(start) + + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + + if string(result) != "success-fast" { + t.Errorf("Expected fastest successful result 'success-fast', got '%s'", string(result)) + } + + node1Start, _ := nodeStartTimes.Load("node1") + node2Start, _ := nodeStartTimes.Load("node2") + if node2Start.(time.Time).Sub(node1Start.(time.Time)) > 550*time.Millisecond { + t.Errorf("Expected node2 to start within 500ms after node1") + } + + if elapsed > 650*time.Millisecond { + t.Errorf("Expected to get result in under 650ms, took %v", elapsed) + } +} + +func TestRespectsContextCancellation(t *testing.T) { + mockExecuteOnNode := func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + select { + case <-time.After(500 * time.Millisecond): + return []byte("success"), nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + executeOnNode = mockExecuteOnNode + defer func() { + executeOnNode = func(ctx context.Context, node Node, taskID string, payload []byte) ([]byte, error) { + return nil, nil + } + }() + + nodes := []Node{ + {Address: "node1", Priority: 1}, + {Address: "node2", Priority: 2}, + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + _, err := ExecuteWithFailover(ctx, nodes, "task1", []byte("test-payload")) + if err == nil || !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("Expected context deadline exceeded error, got: %v", err) + } +} + +func TestHandlesEmptyNodesList(t *testing.T) { + _, err := ExecuteWithFailover(context.Background(), []Node{}, "task1", []byte("test-payload")) + if err != ErrNoNodesAvailable { + t.Fatalf("Expected ErrNoNodesAvailable, got: %v", err) + } +} diff --git a/README.md b/README.md index a5285fb..cea50b0 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Here is a list of the problems available in the repository. Problems are organiz * [Costly Connection With Unsafe Storage](05-costly-connections-with-unsafe-storage/) * [Rate Limiter](06-rate-limiter/) * [TTL Cache](07-ttl-cache/) +* [Distributed Task Executor](08-distributed-task-executor/) ## Contributing