Worker Lifecycle

Workers go through several state transitions during their lifetime. This guide covers creation, state management, and cleanup.

Worker States

┌──────────────┐
│   (None)     │  Worker doesn't exist
└──────┬───────┘
       │ CreateWorkerAsync()
       ▼
┌──────────────┐
│   Running    │  Subscribed and processing
└──────┬───────┘
       │ StopWorkerAsync()
       ▼
┌──────────────┐
│   Stopped    │  Subscription paused
└──────┬───────┘
       │ StartWorkerAsync()
       ▼
┌──────────────┐
│   Running    │  Resumed processing
└──────┬───────┘
       │ DeleteWorkerAsync()
       ▼
┌──────────────┐
│   Deleted    │  Removed from registry and state store
└──────────────┘

Creation

CreateWorkerAsync

// Requires: using System.Text; (for Encoding)
var workerId = await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(pythonCode)),
    MimeType: "text/x-python",
    Topic: "my-topic",
    Group: null  // or "my-group" for coordination
));

What happens:

  1. Engine lookup: Validates that an engine exists for the MIME type
  2. Code fetch: Downloads the code when the source is a CodeSourceUrl, or uses the inline bytes of a CodeSourceContent
  3. Code loading: Calls engine.LoadCode(code)
  4. Persistence: Saves worker config to Dapr state store
  5. Subscription: Creates Dapr pub/sub subscription
  6. Registry: Adds worker to in-memory registry
  7. Status: Sets worker to Running
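The creation steps above can be sketched as an ordered pipeline. This is a hypothetical in-memory model, not the library's implementation: CreationSketch, WorkerConfig, and the delegates standing in for the engine, the Dapr state store, and the Dapr subscription are all illustrative names. Code fetching (step 2) is left out, so the method takes already-resolved bytes.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical persisted worker configuration; field names are illustrative only.
public sealed record WorkerConfig(string Id, string MimeType, string Topic, string? Group, byte[] Code);

public sealed class CreationSketch
{
    // MIME type -> factory that creates an engine and returns its LoadCode callback.
    private readonly IReadOnlyDictionary<string, Func<Action<byte[]>>> _engineFactories;
    // Stand-in for saving the config to the Dapr state store.
    private readonly Func<WorkerConfig, Task> _persist;
    // Stand-in for creating the Dapr pub/sub subscription (topic, workerId).
    private readonly Func<string, string, Task> _subscribe;

    // In-memory registry: worker id -> (config, isRunning).
    public Dictionary<string, (WorkerConfig Config, bool IsRunning)> Registry { get; } = new();

    public CreationSketch(
        IReadOnlyDictionary<string, Func<Action<byte[]>>> engineFactories,
        Func<WorkerConfig, Task> persist,
        Func<string, string, Task> subscribe)
    {
        _engineFactories = engineFactories;
        _persist = persist;
        _subscribe = subscribe;
    }

    // `code` is assumed to be already fetched; step 2 is not modelled here.
    public async Task<string> CreateWorkerAsync(string mimeType, string topic, string? group, byte[] code)
    {
        // 1. Engine lookup: fail before any side effects if the MIME type is unknown.
        if (!_engineFactories.TryGetValue(mimeType, out var createEngine))
            throw new KeyNotFoundException($"No engine registered for '{mimeType}'.");

        // 3. Code loading: a load error also aborts before anything is persisted.
        Action<byte[]> loadCode = createEngine();
        loadCode(code);

        var id = Guid.NewGuid().ToString("N");
        var config = new WorkerConfig(id, mimeType, topic, group, code);

        await _persist(config);          // 4. Persistence
        await _subscribe(topic, id);     // 5. Subscription
        Registry[id] = (config, true);   // 6. Registry, 7. Status = Running
        return id;
    }
}
```

In this sketch the engine lookup and LoadCode run before anything is persisted, so an unknown MIME type or unloadable code leaves no state behind. Whether the real CreateWorkerAsync rolls back on a later failure (for example, if subscribing fails after the config is saved) is not stated in this guide.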

CodeSource Types

Inline Content:

CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(code))

From URL:

CodeSource: new CodeSourceUrl(new Uri("https://example.com/worker.py"))
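Both source types ultimately resolve to raw bytes that are handed to the engine. The sketch below shows one way to model that; the record shapes (the Content and Url properties) are inferred only from the constructor calls above, and CodeSourceResolver is a hypothetical helper, not part of the library. The URL path uses HttpClient.GetByteArrayAsync, which throws on a non-success status code.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Assumed shapes, inferred only from the constructor calls in this guide; the real types may differ.
public abstract record CodeSource;
public sealed record CodeSourceContent(byte[] Content) : CodeSource;
public sealed record CodeSourceUrl(Uri Url) : CodeSource;

public static class CodeSourceResolver
{
    // Resolves either kind of source to raw bytes. The fetcher is injected so callers can avoid the network.
    public static async Task<byte[]> ResolveAsync(CodeSource source, Func<Uri, Task<byte[]>> fetch) =>
        source switch
        {
            CodeSourceContent inline => inline.Content,        // inline: use the bytes as-is
            CodeSourceUrl remote => await fetch(remote.Url),   // URL: download the code
            _ => throw new ArgumentException($"Unsupported code source: {source.GetType().Name}"),
        };

    // Default fetcher: GetByteArrayAsync throws HttpRequestException on a non-success status.
    public static Func<Uri, Task<byte[]>> HttpFetcher(HttpClient client) =>
        uri => client.GetByteArrayAsync(uri);
}
```

Injecting the fetcher keeps the resolver testable without network access; in production you would pass CodeSourceResolver.HttpFetcher(httpClient).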

State Transitions

StartWorkerAsync

Resumes a stopped worker:

await workManager.StartWorkerAsync(workerId);

What happens:

  1. Re-establishes the Dapr topic subscription
  2. Sets status to Running

Notes:

  - Idempotent: if the worker is already running, does nothing
  - The worker resumes processing from where it left off

StopWorkerAsync

Pauses a running worker:

await workManager.StopWorkerAsync(workerId);

What happens:

  1. Removes the Dapr topic subscription
  2. Sets status to Stopped

Notes:

  - Idempotent: if the worker is already stopped, does nothing
  - The worker can be restarted with StartWorkerAsync
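The idempotency rules for StartWorkerAsync and StopWorkerAsync can be modelled as a small state machine. This is a hypothetical in-memory sketch: LifecycleSketch and its ActiveSubscriptions counter are illustrative, with the counter standing in for Dapr subscribe and unsubscribe calls. Following the creation steps (which subscribe and set the status to Running), a newly created worker starts out Running.

```csharp
using System;
using System.Collections.Generic;

public enum WorkerStatus { Running, Stopped }

// Hypothetical lifecycle model; ActiveSubscriptions stands in for live Dapr subscriptions.
public sealed class LifecycleSketch
{
    private readonly Dictionary<string, WorkerStatus> _workers = new();
    public int ActiveSubscriptions { get; private set; }

    public WorkerStatus? StatusOf(string id) =>
        _workers.TryGetValue(id, out var s) ? (WorkerStatus?)s : null;

    // Creation subscribes immediately, so a new worker is Running.
    public void Create(string id)
    {
        _workers.Add(id, WorkerStatus.Running);
        ActiveSubscriptions++;
    }

    public void Start(string id)
    {
        if (Get(id) == WorkerStatus.Running) return;   // idempotent: already running
        ActiveSubscriptions++;                         // re-establish the subscription
        _workers[id] = WorkerStatus.Running;
    }

    public void Stop(string id)
    {
        if (Get(id) == WorkerStatus.Stopped) return;   // idempotent: already stopped
        ActiveSubscriptions--;                         // remove the subscription
        _workers[id] = WorkerStatus.Stopped;
    }

    // Works from either state: a Running worker's subscription is removed without a separate Stop.
    public void Delete(string id)
    {
        if (Get(id) == WorkerStatus.Running) ActiveSubscriptions--;
        _workers.Remove(id);
    }

    private WorkerStatus Get(string id) =>
        _workers.TryGetValue(id, out var s) ? s : throw new KeyNotFoundException($"Unknown worker '{id}'.");
}
```

Because a repeated Start or Stop returns early, the subscription count never drifts, and Delete cleans up a Running worker's subscription directly, in line with how DeleteWorkerAsync is documented in the Deletion section.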

Code Updates

LoadCodeFromContent

Updates worker code while running:

await workManager.LoadCodeFromContent(workerId, Encoding.UTF8.GetBytes(newCode));

What happens:

  1. Calls worker.engine.LoadCode(newCode)
  2. Adds a history entry with the new code source
  3. Updates the persisted configuration

LoadCodeFromUrl

Updates worker code from a URL:

await workManager.LoadCodeFromUrl(workerId, new Uri("https://example.com/new-worker.py"));

What happens:

  1. Fetches the code from the URL
  2. Calls worker.engine.LoadCode(fetchedCode)
  3. Adds a history entry with the new URL
  4. Updates the persisted configuration
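Once the bytes are available, both update paths follow the same load, record, persist sequence (LoadCodeFromUrl simply fetches the bytes first). A minimal sketch, assuming hypothetical names throughout: WorkerCodeSketch, HistoryEntry, and the delegates standing in for engine.LoadCode and the state-store write. The CodeSource records reuse the names from this guide, but their shapes are assumed.

```csharp
using System;
using System.Collections.Generic;

#nullable enable

// Assumed shapes for the code sources; the real types may differ.
public abstract record CodeSource;
public sealed record CodeSourceContent(byte[] Content) : CodeSource;
public sealed record CodeSourceUrl(Uri Url) : CodeSource;

// Hypothetical history entry; CreatedAt and CodeSource mirror the properties used by GetWorkerHistory's entries.
public sealed record HistoryEntry(DateTimeOffset CreatedAt, CodeSource CodeSource);

public sealed class WorkerCodeSketch
{
    private readonly Action<byte[]> _loadCode;      // stand-in for worker.engine.LoadCode
    private readonly Action<CodeSource> _persist;   // stand-in for updating the persisted config
    private readonly Func<DateTimeOffset> _clock;   // injected so timestamps are testable
    private readonly List<HistoryEntry> _history = new();

    public WorkerCodeSketch(Action<byte[]> loadCode, Action<CodeSource> persist, Func<DateTimeOffset> clock)
    {
        _loadCode = loadCode;
        _persist = persist;
        _clock = clock;
    }

    public IReadOnlyList<HistoryEntry> History => _history;

    // Load first; if LoadCode throws, no history entry is added and nothing is persisted.
    public void UpdateCode(CodeSource source, byte[] code)
    {
        _loadCode(code);
        _history.Add(new HistoryEntry(_clock(), source));
        _persist(source);
    }
}
```

With this ordering the history only records code that actually loaded. Whether the real LoadCodeFromContent and LoadCodeFromUrl behave the same way on a load failure is not stated in this guide.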

Deletion

DeleteWorkerAsync

Permanently removes a worker:

await workManager.DeleteWorkerAsync(workerId);

What happens:

  1. Removes the worker from the in-memory registry
  2. Deletes the worker config from the state store
  3. Removes the Dapr topic subscription

DeleteWorkerAsync does not stop the worker first; the subscription is removed regardless of the worker's state.

Recovery

RecoverWorkersAsync

Restores workers after service restart:

await workManager.RecoverWorkersAsync();

What happens:

  1. Loads all worker configs from the state store
  2. For each worker:
     - Creates a new engine instance via its factory
     - Loads the worker's persisted code into the engine
     - Re-establishes the Dapr subscription
     - Sets status to Running

Prerequisites:

  - All engines must be registered before calling
  - The Dapr state store must be accessible

Error handling:

  - If no engine is registered for a worker's MIME type, throws EngineNotFoundException
  - Individual worker load failures are logged and do not stop recovery of the other workers
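The recovery loop and its two failure modes can be sketched as follows. The guide does not say whether EngineNotFoundException aborts the whole recovery or only the affected worker; this sketch assumes it propagates and stops recovery, while code-load failures are caught and logged per worker. StoredWorker, RecoverySketch, and this EngineNotFoundException class are hypothetical stand-ins, and re-subscribing is reduced to recording the recovered id.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical persisted config: only the fields recovery needs.
public sealed record StoredWorker(string Id, string MimeType, byte[] Code);

// Stand-in for the library's EngineNotFoundException.
public sealed class EngineNotFoundException : Exception
{
    public EngineNotFoundException(string mimeType) : base($"No engine registered for '{mimeType}'.") { }
}

public static class RecoverySketch
{
    // engineFactories: MIME type -> factory returning a fresh engine's LoadCode callback.
    // Returns the ids that recovered (real code would re-subscribe and set them Running).
    public static List<string> Recover(
        IEnumerable<StoredWorker> stored,
        IReadOnlyDictionary<string, Func<Action<byte[]>>> engineFactories,
        Action<string> logError)
    {
        var recovered = new List<string>();
        foreach (var worker in stored)
        {
            // Assumption: a missing engine is a configuration error that aborts recovery.
            if (!engineFactories.TryGetValue(worker.MimeType, out var createEngine))
                throw new EngineNotFoundException(worker.MimeType);

            try
            {
                createEngine()(worker.Code);   // new engine instance, then load the persisted code
                recovered.Add(worker.Id);
            }
            catch (Exception ex)
            {
                // Per-worker load failure: log it and continue with the remaining workers.
                logError($"Failed to recover worker {worker.Id}: {ex.Message}");
            }
        }
        return recovered;
    }
}
```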

History Tracking

Each worker maintains a history of code changes:

var history = workManager.GetWorkerHistory(workerId);
foreach (var entry in history)
{
    Console.WriteLine($"Code loaded at {entry.CreatedAt}");
    Console.WriteLine($"  Type: {entry.CodeSource.GetType().Name}");
}

What's tracked:

  - Initial creation, with the original code source
  - Each LoadCodeFromContent call
  - Each LoadCodeFromUrl call

What's not tracked:

  - Runtime execution errors (these go to the logs)
  - Message processing counts

Group Coordination

Workers can belong to a group for mutually exclusive execution:

// Worker 1 (code1 and code2 are CodeSource values, such as new CodeSourceContent(...))
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: code1,
    MimeType: "text/x-python",
    Topic: "shared-topic",
    Group: "group-a"  // Same group as Worker 2
));

// Worker 2
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: code2,
    MimeType: "text/x-python",
    Topic: "shared-topic",
    Group: "group-a"  // Same group as Worker 1
));

Behavior:

  - Only one worker in a group processes a message at a time
  - A distributed lock is acquired before processing
  - The lock is released after processing, or on error
  - The lock's maximum age is 30 seconds; an older lock is considered stale and can be taken over (stolen) by another worker

Lock lifecycle:

Message received
     │
     ▼
TryAcquireGroupLock()
     │
     ├── Lock acquired ──► Process message ──► ReleaseLock()
     │
     └── Lock held by other ──► Skip message (no retry)
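The group-lock rules above (single holder, release on success or error, 30-second maximum age with stealing, skip without retry) can be sketched with an in-memory lock table. This is a hypothetical, single-process stand-in for the distributed lock; GroupLockSketch and its methods are illustrative names, and the clock is injected so staleness can be tested. Restricting Release to the current owner is a design choice of this sketch, not something the guide specifies.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical single-process model of the group lock rules.
public sealed class GroupLockSketch
{
    public static readonly TimeSpan MaxAge = TimeSpan.FromSeconds(30);

    private readonly Func<DateTimeOffset> _now;
    private readonly Dictionary<string, (string Owner, DateTimeOffset AcquiredAt)> _locks = new();
    private readonly object _gate = new();

    public GroupLockSketch(Func<DateTimeOffset> now) => _now = now;

    public bool TryAcquire(string group, string owner)
    {
        lock (_gate)
        {
            if (_locks.TryGetValue(group, out var held) && _now() - held.AcquiredAt < MaxAge)
                return false;                      // held by another worker and not yet stale
            _locks[group] = (owner, _now());       // free, or stale and therefore stolen
            return true;
        }
    }

    public void Release(string group, string owner)
    {
        lock (_gate)
        {
            // Only the current owner releases; a worker whose lock was stolen must not free the new holder's lock.
            if (_locks.TryGetValue(group, out var held) && held.Owner == owner)
                _locks.Remove(group);
        }
    }

    // Lock lifecycle: acquire, process, always release. Returns false when the message is skipped.
    public async Task<bool> HandleAsync(string group, string workerId, Func<Task> process)
    {
        if (!TryAcquire(group, workerId)) return false;   // skip, no retry
        try { await process(); }
        finally { Release(group, workerId); }             // released on success or error
        return true;
    }
}
```

The owner check matters once stealing is allowed: a worker whose processing outlived the 30-second maximum age would otherwise release a lock that now belongs to another worker. A distributed version also needs the acquire-or-steal check to be atomic in the shared store.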

Cleanup on Disposal

When WorkManager.DisposeAsync() is called:

  1. All Dapr subscriptions are disposed
  2. All topic-specific subscriptions are disposed
  3. In-memory registry is cleared

Note: State store data is NOT deleted. Workers can be recovered on restart.
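The disposal steps map naturally onto IAsyncDisposable. A minimal sketch, assuming hypothetical collections for the two kinds of subscriptions and for the registry (DisposalSketch is an illustrative name); nothing in DisposeAsync touches the state store, which is what leaves the persisted configs available to RecoverWorkersAsync on the next start.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical model of WorkManager disposal; the state store is deliberately never touched.
public sealed class DisposalSketch : IAsyncDisposable
{
    public List<IAsyncDisposable> Subscriptions { get; } = new();        // Dapr subscriptions
    public List<IAsyncDisposable> TopicSubscriptions { get; } = new();   // topic-specific subscriptions
    public Dictionary<string, object> Registry { get; } = new();         // in-memory worker registry

    public async ValueTask DisposeAsync()
    {
        // 1. Dispose the Dapr subscriptions.
        foreach (var s in Subscriptions) await s.DisposeAsync();
        // 2. Dispose the topic-specific subscriptions.
        foreach (var s in TopicSubscriptions) await s.DisposeAsync();
        Subscriptions.Clear();
        TopicSubscriptions.Clear();
        // 3. Clear the in-memory registry; persisted worker configs are left in place.
        Registry.Clear();
    }
}
```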

Best Practices

  1. Use groups for related workers: If multiple workers should handle the same topic without duplicate processing, use a group.

  2. Handle recovery gracefully: Call RecoverWorkersAsync() early in service startup, after all engines are registered:

    // In Program.cs or startup
    await workManager.RecoverWorkersAsync();
    

  3. Track history for debugging: History entries help audit code changes:

    var history = workManager.GetWorkerHistory(id);
    

  4. Stop before delete is optional: DeleteWorkerAsync cleans up subscriptions regardless of state, so explicit StopWorkerAsync is not required before deletion.

  5. Code updates don't restart subscriptions: LoadCodeFromContent and LoadCodeFromUrl update code in-place without disrupting processing.