Worker Lifecycle
Workers go through several state transitions during their lifetime. This guide covers creation, state management, and cleanup.
Worker States
┌──────────────┐
│   (None)     │  Worker doesn't exist
└──────┬───────┘
       │ CreateWorkerAsync()
       ▼
┌──────────────┐
│   Stopped    │  Created but not yet subscribed
└──────┬───────┘
       │ StartWorkerAsync()
       ▼
┌──────────────┐
│   Running    │  Subscribed and processing
└──────┬───────┘
       │ StopWorkerAsync()
       ▼
┌──────────────┐
│   Stopped    │  Subscription paused
└──────┬───────┘
       │ StartWorkerAsync()
       ▼
┌──────────────┐
│   Running    │  Resumed processing
└──────┬───────┘
       │ DeleteWorkerAsync()
       ▼
┌──────────────┐
│   Deleted    │  Removed from registry and state
└──────────────┘
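The transitions in the diagram can be read as a small lookup table. The following is a minimal sketch, not the library's implementation: `WorkerState`, `WorkerOp`, and `WorkerTransitions` are hypothetical names introduced only for illustration. The delete-from-Stopped entry and the no-op entries for repeated start/stop are assumptions drawn from the notes later in this guide (deletion works regardless of state; start and stop are idempotent).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; they are not part of the documented API.
public enum WorkerState { None, Stopped, Running, Deleted }
public enum WorkerOp { Create, Start, Stop, Delete }

public static class WorkerTransitions
{
    // Maps (current state, operation) to the resulting state.
    private static readonly Dictionary<(WorkerState, WorkerOp), WorkerState> Table = new()
    {
        [(WorkerState.None, WorkerOp.Create)] = WorkerState.Stopped,
        [(WorkerState.Stopped, WorkerOp.Start)] = WorkerState.Running,
        [(WorkerState.Running, WorkerOp.Stop)] = WorkerState.Stopped,
        [(WorkerState.Running, WorkerOp.Delete)] = WorkerState.Deleted,
        // Assumed from the notes in this guide: delete works from either state,
        // and repeated start/stop calls are no-ops.
        [(WorkerState.Stopped, WorkerOp.Delete)] = WorkerState.Deleted,
        [(WorkerState.Running, WorkerOp.Start)] = WorkerState.Running,
        [(WorkerState.Stopped, WorkerOp.Stop)] = WorkerState.Stopped,
    };

    // Returns true and the next state when the table has an entry for this pair.
    public static bool TryApply(WorkerState current, WorkerOp op, out WorkerState next)
        => Table.TryGetValue((current, op), out next);
}
```

For example, `WorkerTransitions.TryApply(WorkerState.Stopped, WorkerOp.Start, out var next)` returns `true` with `next` set to `WorkerState.Running`, while any operation on a `Deleted` worker returns `false`.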
Creation
CreateWorkerAsync
var workerId = await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(pythonCode)),
    MimeType: "text/x-python",
    Topic: "my-topic",
    Group: null // or "my-group" for coordination
));
What happens:
- Engine lookup: Validates that an engine exists for the MIME type
- Code fetch: Fetches code from the URL, or uses the inline content
- Code loading: Calls engine.LoadCode(code)
- Persistence: Saves worker config to Dapr state store
- Subscription: Creates Dapr pub/sub subscription
- Registry: Adds worker to in-memory registry
- Status: Sets worker to Running
CodeSource Types
Inline Content:
CodeSource: new CodeSourceContent(Encoding.UTF8.GetBytes(code))
From URL:
CodeSource: new CodeSourceUrl(new Uri("https://example.com/worker.py"))
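To illustrate how the two source kinds might be turned into bytes during the code fetch step, here is a minimal sketch. The records `CodeSourceSketch`, `InlineSource`, and `UrlSource` are hypothetical stand-ins for the library's own types, and the `fetch` delegate is an assumption so the example does not depend on any particular HTTP client.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for the two code source kinds; not the library's types.
public abstract record CodeSourceSketch;
public sealed record InlineSource(byte[] Content) : CodeSourceSketch;
public sealed record UrlSource(Uri Url) : CodeSourceSketch;

public static class CodeSourceResolver
{
    // Resolves a source to raw bytes. The fetch delegate performs the download
    // for URL sources; inline sources never invoke it.
    public static async Task<byte[]> ResolveAsync(
        CodeSourceSketch source, Func<Uri, Task<byte[]>> fetch)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (fetch is null) throw new ArgumentNullException(nameof(fetch));

        return source switch
        {
            InlineSource inline => inline.Content,
            UrlSource remote => await fetch(remote.Url),
            _ => throw new NotSupportedException(
                $"Unsupported code source: {source.GetType().Name}"),
        };
    }
}
```

In a real service the delegate could wrap `HttpClient.GetByteArrayAsync`, for example `uri => httpClient.GetByteArrayAsync(uri)`. In tests it can return canned bytes.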
State Transitions
StartWorkerAsync
Resumes a stopped worker:
await workManager.StartWorkerAsync(workerId);
What happens:
1. Re-establishes Dapr topic subscription
2. Sets status to Running
Notes:
- Idempotent: If already running, does nothing
- Worker continues processing from where it left off
StopWorkerAsync
Pauses a running worker:
await workManager.StopWorkerAsync(workerId);
What happens:
1. Removes Dapr topic subscription
2. Sets status to Stopped
Notes:
- Idempotent: If already stopped, does nothing
- Worker can be restarted with StartWorkerAsync
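The idempotency described for both calls amounts to a guard on the current status. Here is a minimal sketch under stated assumptions: `WorkerSketch` is a hypothetical class, and the `subscribe`/`unsubscribe` delegates stand in for creating and removing the Dapr topic subscription. It is synchronous and not thread-safe, unlike what a real manager would presumably need.

```csharp
using System;

// Hypothetical class for illustration; not the library's worker type.
public sealed class WorkerSketch
{
    private readonly Action _subscribe;
    private readonly Action _unsubscribe;

    public WorkerSketch(Action subscribe, Action unsubscribe)
    {
        _subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
        _unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
    }

    public bool IsRunning { get; private set; }

    // Subscribes only when currently stopped; a second call is a no-op.
    public void Start()
    {
        if (IsRunning) return;
        _subscribe();
        IsRunning = true;
    }

    // Unsubscribes only when currently running; a second call is a no-op.
    public void Stop()
    {
        if (!IsRunning) return;
        _unsubscribe();
        IsRunning = false;
    }
}
```

Calling `Start()` twice in a row invokes the subscribe delegate once, which is the behavior the notes above describe.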
Code Updates
LoadCodeFromContent
Updates worker code while running:
await workManager.LoadCodeFromContent(workerId, Encoding.UTF8.GetBytes(newCode));
What happens:
1. Calls worker.engine.LoadCode(newCode)
2. Adds history entry with new code source
3. Updates persisted configuration
LoadCodeFromUrl
Updates worker code from a URL:
await workManager.LoadCodeFromUrl(workerId, new Uri("https://example.com/new-worker.py"));
What happens:
1. Fetches code from URL
2. Calls worker.engine.LoadCode(fetchedCode)
3. Adds history entry with new URL
4. Updates persisted configuration
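The update steps listed for both methods share one shape: load the code, record a history entry, persist. The sketch below shows that ordering with hypothetical names (`CodeUpdateSketch`, `HistoryEntrySketch`) and injected delegates in place of the engine and the state store. One consequence of this ordering is that a failed load records nothing. Whether the real implementation behaves the same way is not stated in this guide.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; not the library's history model.
public sealed record HistoryEntrySketch(DateTimeOffset CreatedAt, string SourceDescription);

public sealed class CodeUpdateSketch
{
    private readonly Action<byte[]> _loadCode;                            // stands in for engine.LoadCode
    private readonly Action<IReadOnlyList<HistoryEntrySketch>> _persist;  // stands in for the state store save
    private readonly List<HistoryEntrySketch> _history = new();

    public CodeUpdateSketch(
        Action<byte[]> loadCode,
        Action<IReadOnlyList<HistoryEntrySketch>> persist)
    {
        _loadCode = loadCode ?? throw new ArgumentNullException(nameof(loadCode));
        _persist = persist ?? throw new ArgumentNullException(nameof(persist));
    }

    public IReadOnlyList<HistoryEntrySketch> History => _history;

    // Loads first, so an exception from the engine leaves history and
    // persisted configuration untouched.
    public void Update(byte[] newCode, string sourceDescription, DateTimeOffset now)
    {
        _loadCode(newCode);
        _history.Add(new HistoryEntrySketch(now, sourceDescription));
        _persist(_history);
    }
}
```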
Deletion
DeleteWorkerAsync
Permanently removes a worker:
await workManager.DeleteWorkerAsync(workerId);
What happens:
1. Removes worker from in-memory registry
2. Deletes worker config from state store
3. Removes Dapr topic subscription
4. Does not stop the worker first (subscription is removed regardless)
Recovery
RecoverWorkersAsync
Restores workers after service restart:
await workManager.RecoverWorkersAsync();
What happens:
1. Loads all worker configs from state store
2. For each worker:
   - Creates new engine instance via factory
   - Loads original code into engine
   - Re-establishes Dapr subscription
   - Sets status to Running
Prerequisites:
- All engines must be registered before calling
- Dapr state store must be accessible
Error handling:
- If an engine is missing for a worker's MIME type, throws EngineNotFoundException
- Individual worker load failures are logged but don't stop recovery of other workers
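The "log and continue" behavior for individual failures can be sketched as a loop with a per-worker try/catch. `RecoverySketch` and its delegates are hypothetical names for illustration. The sketch treats every exception type the same way, which is an assumption: this guide does not say whether EngineNotFoundException is also isolated per worker or propagates to the caller.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not the library's recovery code.
public static class RecoverySketch
{
    // Attempts to recover every worker. A failure for one worker is logged and
    // recorded, and the loop moves on to the next one. Returns the failed IDs.
    public static List<string> RecoverAll(
        IEnumerable<string> workerIds,
        Action<string> recoverOne,
        Action<string, Exception> logFailure)
    {
        var failed = new List<string>();
        foreach (var id in workerIds)
        {
            try
            {
                recoverOne(id);
            }
            catch (Exception ex)
            {
                logFailure(id, ex);
                failed.Add(id);
            }
        }
        return failed;
    }
}
```

Returning the failed IDs is a design choice of this sketch. It lets startup code decide whether a partial recovery is acceptable.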
History Tracking
Each worker maintains a history of code changes:
var history = workManager.GetWorkerHistory(workerId);
foreach (var entry in history)
{
    Console.WriteLine($"Code loaded at {entry.CreatedAt}");
    Console.WriteLine($"  Type: {entry.CodeSource.GetType().Name}");
}
What's tracked:
- Initial creation with original code source
- Each LoadCodeFromContent call
- Each LoadCodeFromUrl call
What's not tracked:
- Runtime execution errors (these go to logs)
- Message processing counts
Group Coordination
Workers can belong to a group for mutually exclusive execution:
// Worker 1
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: code1,
    MimeType: "text/x-python",
    Topic: "shared-topic",
    Group: "group-a" // Same group as Worker 2
));

// Worker 2
await workManager.CreateWorkerAsync(new CreateWorkerRequest(
    CodeSource: code2,
    MimeType: "text/x-python",
    Topic: "shared-topic",
    Group: "group-a" // Same group as Worker 1
));
Behavior:
- Only one worker in a group processes a message at a time
- Distributed lock acquired before processing
- Lock released after processing (or on error)
- Lock max age is 30 seconds (stale locks can be stolen)
Lock lifecycle:
Message received
│
▼
TryAcquireGroupLock()
│
├── Lock acquired ──► Process message ──► ReleaseLock()
│
└── Lock held by other ──► Skip message (no retry)
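The acquire/skip/steal behavior can be illustrated with an in-process sketch. This is not the distributed lock itself: `GroupLockSketch` is a hypothetical class that keeps locks in a dictionary, and the caller passes in the current time so the 30-second max age is easy to exercise. A real implementation would presumably store the lock somewhere shared between instances.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a group lock with a maximum age.
public sealed class GroupLockSketch
{
    private static readonly TimeSpan MaxAge = TimeSpan.FromSeconds(30);

    private readonly Dictionary<string, (string Owner, DateTimeOffset AcquiredAt)> _locks = new();
    private readonly object _gate = new();

    // Succeeds when the group is unlocked, already held by this owner,
    // or held by another owner whose lock is at least MaxAge old (stale).
    public bool TryAcquire(string group, string owner, DateTimeOffset now)
    {
        lock (_gate)
        {
            if (_locks.TryGetValue(group, out var held)
                && held.Owner != owner
                && now - held.AcquiredAt < MaxAge)
            {
                return false; // fresh lock held by another worker: skip the message
            }

            _locks[group] = (owner, now);
            return true;
        }
    }

    // Releases the lock only if the caller still owns it, so a worker whose
    // stale lock was stolen cannot release the new owner's lock.
    public void Release(string group, string owner)
    {
        lock (_gate)
        {
            if (_locks.TryGetValue(group, out var held) && held.Owner == owner)
            {
                _locks.Remove(group);
            }
        }
    }
}
```

A handler would call `TryAcquire` before processing and `Release` in a `finally` block, which mirrors "released after processing (or on error)" above.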
Cleanup on Disposal
When WorkManager.DisposeAsync() is called:
- All Dapr subscriptions are disposed
- All topic-specific subscriptions are disposed
- In-memory registry is cleared
Note: State store data is NOT deleted. Workers can be recovered on restart.
Best Practices
- Use groups for related workers: If multiple workers should handle the same topic without duplicate processing, use a group.
- Handle recovery gracefully: Call RecoverWorkersAsync() early in service startup:
  // In Program.cs or startup
  await workManager.RecoverWorkersAsync();
- Track history for debugging: History entries help audit code changes:
  var history = workManager.GetWorkerHistory(id);
- Stop before delete is optional: DeleteWorkerAsync cleans up subscriptions regardless of state, so explicit StopWorkerAsync is not required before deletion.
- Code updates don't restart subscriptions: LoadCodeFromContent and LoadCodeFromUrl update code in-place without disrupting processing.