Architecture
The WorkManager is an event-driven worker system built on Dapr that enables you to deploy and manage workers written in different programming languages. Workers subscribe to pub/sub topics, process incoming CloudEvents, and optionally publish responses.
System Overview
```
┌─────────────────────────────────────────────────────────────────────────────┐
│                             WorkManager Service                             │
│                                                                             │
│  ┌──────────────┐    ┌────────────────┐    ┌───────────────────────────┐    │
│  │ WorkManager  │    │ EngineRegistry │    │ WorkerRegistry            │    │
│  │ Component    │    │                │    │                           │    │
│  │              │    │ text/x-python  │    │  ┌─────┐  ┌─────┐         │    │
│  │ - Create     │    │ text/x-csharp  │    │  │ W1  │  │ W2  │  ...    │    │
│  │ - Start      │◄──►│ ...            │◄──►│  └─────┘  └─────┘         │    │
│  │ - Stop       │    │                │    │                           │    │
│  │ - Delete     │    └────────────────┘    └───────────────────────────┘    │
│  │ - Recover    │                                                           │
│  └──────┬───────┘                                                           │
│         │                                                                   │
│  ┌──────▼───────┐    ┌─────────────────────────────────────────────────┐    │
│  │ Dapr Client  │    │ Dapr Subscriptions                              │    │
│  │              │◄──►│                                                 │    │
│  │ - State      │    │ topic: "time-worker-topic"                      │    │
│  │ - Pub/Sub    │    │   └──► Worker #1 ──► PythonEngine               │    │
│  │              │    │   └──► Worker #2 ──► CSharpSourceEngine         │    │
│  └──────────────┘    └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────────┘
           │
           ▼
  ┌─────────────────┐     ┌─────────────────┐
  │ Dapr Runtime    │────►│ State Store     │
  │                 │     │ (statestore)    │
  │                 │     └─────────────────┘
  │                 │
  │                 │     ┌─────────────────┐
  │                 │────►│ Pub/Sub         │
  │                 │     │ (pubsub)        │
  └─────────────────┘     └─────────────────┘
```
Core Components
WorkManager (Components/WorkManager.cs)
The central orchestrator that manages the entire worker lifecycle. It:
- Creates, starts, stops, and deletes workers
- Manages Dapr subscriptions to pub/sub topics
- Persists worker configurations to the state store for recovery
- Handles message processing and group-based locking for coordinated execution
- Publishes worker output back to pub/sub topics
Worker (Runtime/Worker.cs)
Represents a single worker instance with:
- Id: Unique identifier (GUID)
- CodeSource: Where the code comes from (URL or inline content)
- MimeType: The content type identifying the language/engine
- Topic: The pub/sub topic to subscribe to
- Group: Optional coordination group for mutually exclusive execution
- Status: Either `Stopped` or `Running`
- History: A record of all code changes over time
EngineRegistry (Runtime/EngineRegistry.cs)
Maps MIME types to engine implementations. Each engine factory creates new engine instances per worker.
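The MIME-type-to-factory mapping can be modeled in a few lines. This is a Python illustration, not the C# `EngineRegistry`: MIME types are plain strings rather than `ContentType` objects, and `Engine`, `EchoEngine`, and the `text/x-echo` type are hypothetical names used only for the example.

```python
from typing import Callable, Dict, Optional, Protocol, Tuple


class Engine(Protocol):
    """Python mirror of the C# IEngine interface (illustrative only)."""

    def load_code(self, code: bytes) -> None: ...

    def process(self, event: dict) -> Optional[dict]: ...


EngineFactory = Callable[[], Engine]


class EngineRegistry:
    """Maps a MIME type to a (display name, factory) pair."""

    def __init__(self) -> None:
        self._engines: Dict[str, Tuple[str, EngineFactory]] = {}

    def register(self, mime_type: str, name: str, factory: EngineFactory) -> None:
        self._engines[mime_type] = (name, factory)

    def is_supported(self, mime_type: str) -> bool:
        return mime_type in self._engines

    def create(self, mime_type: str) -> Engine:
        # Unknown MIME types are rejected, like the validation done before
        # a worker is created.
        if mime_type not in self._engines:
            raise KeyError(f"no engine registered for {mime_type!r}")
        _, factory = self._engines[mime_type]
        # Call the factory on every request so each worker gets its own engine.
        return factory()


class EchoEngine:
    """Hypothetical toy engine used only to exercise the registry."""

    def __init__(self) -> None:
        self.code = b""

    def load_code(self, code: bytes) -> None:
        self.code = code

    def process(self, event: dict) -> Optional[dict]:
        return {"type": "com.example.echo", "data": event.get("data")}


registry = EngineRegistry()
registry.register("text/x-echo", "Echo", EchoEngine)
first = registry.create("text/x-echo")
second = registry.create("text/x-echo")
print(first is second)  # False: one engine instance per worker
```

Storing a factory rather than a shared instance is what gives each worker its own engine, so code loaded into one worker's engine is not shared with another worker.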
Engine Implementations (Engines/)
All engines implement the IEngine interface:
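```csharp
public interface IEngine
{
    // Loads the worker's code into this engine instance.
    void LoadCode(byte[] code);

    // Handles one input event; returns a response event, or null if there is nothing to publish.
    CloudEvent? Process(CloudEvent input);
}
```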
Available Engines:
| Engine | MIME Type | Description |
|---|---|---|
| PythonEngine | text/x-python | Executes Python code via a `python3` subprocess |
| CSharpSourceEngine | text/x-csharp | Compiles and executes C# source code at runtime |
| CSharpDllEngine | text/x-dll | Loads pre-compiled .NET assemblies |
WorkerRegistry (Runtime/WorkerRegistry.cs)
In-memory storage for active worker instances. Workers are recovered from persistent state on restart.
Data Flow
Worker Creation Flow
- Client calls `CreateWorkerAsync()` via gRPC or by calling the injected WorkManager directly
- WorkManager validates that the MIME type has a registered engine
- Code is fetched from the URL or decoded from the Base64 inline content
- An engine instance is created and the code is loaded into it
- The worker configuration is persisted to the Dapr state store
- A Dapr subscription is created for the worker's topic
- The worker is added to the in-memory registry
- The worker status is set to `Running`
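The creation steps above can be traced in a small in-memory model. This is a hypothetical Python sketch: `FakeWorkManager` and its dictionaries (standing in for the engine registry, the Dapr state store, the subscriptions, and the `WorkerRegistry`) are assumptions, as are the stored field names `mimeType`, `topic`, `group`, and `source`. Only the inline Base64 code source is handled.

```python
import base64
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Worker:
    id: str
    mime_type: str
    topic: str
    group: Optional[str] = None
    status: str = "Stopped"
    engine: Any = None


class EchoEngine:
    """Hypothetical engine that just keeps the code it was given."""

    def __init__(self) -> None:
        self.code = b""

    def load_code(self, code: bytes) -> None:
        self.code = code


@dataclass
class FakeWorkManager:
    # Stand-ins (assumptions) for the engine registry, Dapr state store,
    # Dapr subscriptions, and the in-memory WorkerRegistry.
    engines: Dict[str, Callable[[], Any]]
    state: Dict[str, Any] = field(default_factory=dict)
    subscriptions: Dict[str, List[str]] = field(default_factory=dict)
    workers: Dict[str, Worker] = field(default_factory=dict)

    def create_worker(self, source: Dict[str, str], mime_type: str,
                      topic: str, group: Optional[str] = None) -> Worker:
        # 1. Reject MIME types without a registered engine.
        if mime_type not in self.engines:
            raise ValueError(f"no engine registered for {mime_type!r}")
        # 2. Obtain the code (only inline Base64 content is modeled here).
        code = base64.b64decode(source["content"])
        # 3. Create a fresh engine and load the code into it.
        engine = self.engines[mime_type]()
        engine.load_code(code)
        worker = Worker(str(uuid.uuid4()), mime_type, topic, group, engine=engine)
        # 4. Persist the configuration and append the ID to the index.
        self.state[f"workmanager/workers/{worker.id}"] = {
            "id": worker.id, "mimeType": mime_type, "topic": topic,
            "group": group, "source": source,
        }
        self.state.setdefault("workmanager/workers/index", []).append(worker.id)
        # 5. Subscribe the worker to its topic.
        self.subscriptions.setdefault(topic, []).append(worker.id)
        # 6. Add it to the in-memory registry and mark it Running.
        self.workers[worker.id] = worker
        worker.status = "Running"
        return worker


wm = FakeWorkManager(engines={"text/x-echo": EchoEngine})
w = wm.create_worker({"content": "aW1wb3J0IGpzb24="}, "text/x-echo", "time-worker-topic")
print(w.status)  # Running
```

One consequence of persisting before subscribing: if the process stops between those two steps, a stored configuration exists for recovery to pick up, rather than a live subscription with no stored record.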
Message Processing Flow
- Dapr delivers a CloudEvent to the subscription handler
- For workers in a group: the handler tries to acquire the group's distributed lock
- The worker's engine processes the input CloudEvent
- If the engine returns a response CloudEvent:
  - If no group: publish it directly to the output topic
  - If group: publish it while the group lock is still held
- The lock is released (group workers only)
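The processing steps can be summarized as one handler function. In this Python sketch, `process`, `publish`, `try_acquire`, and `release` are hypothetical callbacks standing in for the engine, the Dapr publish call, and the group lock. What the real handler does when another worker already holds the lock is not specified above; the sketch assumes it simply skips that delivery.

```python
from typing import Callable, Optional


def handle_message(event: dict,
                   process: Callable[[dict], Optional[dict]],
                   publish: Callable[[dict], None],
                   group: Optional[str] = None,
                   try_acquire: Callable[[str], bool] = lambda g: True,
                   release: Callable[[str], None] = lambda g: None) -> bool:
    """Return True if the event was processed (sketch, not the real handler)."""
    if group is not None and not try_acquire(group):
        # Assumption: when the group lock is held elsewhere, skip this delivery.
        return False
    try:
        response = process(event)
        if response is not None:
            # Grouped workers still hold the lock here, so publishing
            # happens inside the critical section.
            publish(response)
        return True
    finally:
        # Runs only when processing was attempted, i.e. after a successful
        # acquire for grouped workers.
        if group is not None:
            release(group)
```

The `finally` block releases the lock even when the engine raises, so a failing worker does not block its group until the 30-second lock expiry.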
Worker Recovery Flow
On service restart:
- `RecoverWorkersAsync()` is called (typically via gRPC)
- All worker configurations are loaded from the state store
- For each worker:
  - The engine is created and the code is reloaded
  - The Dapr subscription is re-established
  - The worker status is set to `Running`
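Recovery mirrors creation but reads from the state store instead of a request. In this Python sketch, `create_engine`, `load_code`, and `subscribe` are hypothetical callbacks, the `mimeType` and `topic` field names are assumptions about the stored configuration, and skipping index entries that have no configuration is an assumed policy.

```python
from typing import Any, Callable, Dict, List


def recover_workers(state: Dict[str, Any],
                    create_engine: Callable[[str], Any],
                    load_code: Callable[[Dict[str, Any]], bytes],
                    subscribe: Callable[[str, str], None]) -> Dict[str, Dict[str, Any]]:
    """Rebuild the in-memory worker registry from persisted state (sketch)."""
    registry: Dict[str, Dict[str, Any]] = {}
    worker_ids: List[str] = state.get("workmanager/workers/index", [])
    for worker_id in worker_ids:
        config = state.get(f"workmanager/workers/{worker_id}")
        if config is None:
            continue  # assumed policy: ignore dangling index entries
        engine = create_engine(config["mimeType"])  # fresh engine per worker
        engine.load_code(load_code(config))          # reload the code
        subscribe(config["topic"], worker_id)        # re-establish the subscription
        registry[worker_id] = {**config, "engine": engine, "status": "Running"}
    return registry
```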
Persistence
Worker configurations are stored in the Dapr state store under these keys:
- `workmanager/workers/index`: list of all worker IDs
- `workmanager/workers/{workerId}`: individual worker configuration

Group locks use:
- `workmanager/groups/{groupId}/lock`: lock data (owner instance ID and acquisition time)
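Keeping the key layout in one place helps the create, delete, and recovery paths agree on it. The helper names below are hypothetical Python; only the key formats come from the list above. Because the index is a single value, adding or removing a worker means reading the list, updating it, and saving it again, which is what the two index helpers model.

```python
from typing import List

PREFIX = "workmanager"


def worker_index_key() -> str:
    return f"{PREFIX}/workers/index"


def worker_key(worker_id: str) -> str:
    return f"{PREFIX}/workers/{worker_id}"


def group_lock_key(group_id: str) -> str:
    return f"{PREFIX}/groups/{group_id}/lock"


def with_worker(index: List[str], worker_id: str) -> List[str]:
    """Return an updated index that lists worker_id exactly once."""
    return index if worker_id in index else [*index, worker_id]


def without_worker(index: List[str], worker_id: str) -> List[str]:
    """Return an updated index with worker_id removed."""
    return [w for w in index if w != worker_id]
```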
Group Coordination
Workers can be assigned to a group for mutually exclusive execution. When multiple workers share a group:
- Only one worker in the group processes a message at a time
- A distributed lock in the state store prevents concurrent processing
- Locks have a 30-second maximum age; a lock held longer than that is treated as stale and can be "stolen"
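The stale-lock rule can be sketched as follows. This is a single-process Python model: `GroupLocks`, `LockData`, and the injectable clock are hypothetical, a plain dict stands in for the state store, and "only the holder may release" is an assumption. The lock value holds the owner instance ID and acquisition time, as described under Persistence. A real multi-instance version needs the read-check-write to be atomic, for example by saving with the ETag that was read (Dapr state stores support ETag-based optimistic concurrency); this sketch does not do that.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict

MAX_LOCK_AGE_SECONDS = 30.0


@dataclass
class LockData:
    owner: str          # instance ID of the current holder
    acquired_at: float  # acquisition time in seconds


class GroupLocks:
    """Group lock with stale-lock stealing; NOT atomic across processes."""

    def __init__(self, instance_id: str, store: Dict[str, LockData],
                 clock: Callable[[], float] = time.time) -> None:
        self.instance_id = instance_id
        self.store = store
        self.clock = clock

    @staticmethod
    def _key(group_id: str) -> str:
        return f"workmanager/groups/{group_id}/lock"

    def try_acquire(self, group_id: str) -> bool:
        key = self._key(group_id)
        now = self.clock()
        current = self.store.get(key)
        # A missing lock is free; a lock older than the max age is stale.
        if current is None or now - current.acquired_at > MAX_LOCK_AGE_SECONDS:
            self.store[key] = LockData(self.instance_id, now)
            return True
        return False

    def release(self, group_id: str) -> None:
        key = self._key(group_id)
        current = self.store.get(key)
        # Only the holder deletes the lock; a lock stolen by someone else stays.
        if current is not None and current.owner == self.instance_id:
            del self.store[key]


shared: Dict[str, LockData] = {}
now = [0.0]
a = GroupLocks("instance-a", shared, clock=lambda: now[0])
b = GroupLocks("instance-b", shared, clock=lambda: now[0])
print(a.try_acquire("g1"), b.try_acquire("g1"))  # True False
now[0] = 31.0
print(b.try_acquire("g1"))  # True: a's lock is older than 30 s
```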
Code Source Types
Workers support two code source types:

CodeSourceUrl: Code fetched from an HTTP(S) URL at worker creation time
```json
{ "url": "https://example.com/worker.py" }
```

CodeSourceContent: Base64-encoded inline code
```json
{ "content": "aW1wb3J0IGpzb24..." }
```
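Turning either source type into the bytes handed to `LoadCode` might look like this. `resolve_code` and `to_content_source` are hypothetical Python helpers, and the scheme check and timeout are assumptions rather than documented WorkManager behavior. The second helper shows how source text becomes a `content` value.

```python
import base64
import urllib.request


def resolve_code(source: dict, timeout: float = 10.0) -> bytes:
    """Return the raw code bytes for a CodeSourceUrl or CodeSourceContent."""
    if "url" in source:
        url = source["url"]
        if not url.startswith(("http://", "https://")):
            raise ValueError(f"unsupported URL scheme: {url!r}")
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.read()
    if "content" in source:
        # validate=True rejects characters outside the Base64 alphabet.
        return base64.b64decode(source["content"], validate=True)
    raise ValueError("source needs either 'url' or 'content'")


def to_content_source(code: str) -> dict:
    """Build a CodeSourceContent payload from source text."""
    return {"content": base64.b64encode(code.encode("utf-8")).decode("ascii")}


print(resolve_code({"content": "aW1wb3J0IGpzb24="}))  # b'import json'
```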
Extension Points
Adding a New Engine
- Create a class implementing `IEngine`
- Register it in `Program.cs`:

```csharp
engineRegistry.Register(
    new ContentType("text/x-my-language"),
    "MyLanguage",
    () => new MyLanguageEngine()
);
```
Custom Worker Logic
Workers must implement a Process(CloudEvent input) function:
Python:
```python
def Process(event):
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": { "result": event["data"] }
    }
```
C#:
```csharp
public static CloudEvent Process(CloudEvent input)
{
    // Build a new event that carries the input payload unchanged.
    return new CloudEvent
    {
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data
    };
}
```
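Because the Python worker's `Process` is a plain function over a dictionary, it can be checked locally before deployment. The sample event below is made up for illustration, and how `PythonEngine` actually converts CloudEvents to and from dictionaries is not described here.

```python
def Process(event):
    # Same worker as the Python example above.
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"result": event["data"]},
    }


# Hypothetical sample event; the values are made up for a local check.
sample = {
    "specversion": "1.0",
    "id": "1",
    "type": "com.example.input",
    "source": "local-test",
    "data": {"time": "12:00"},
}

out = Process(sample)
print(out["data"])  # {'result': {'time': '12:00'}}
```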