Architecture

The WorkManager is an event-driven worker system built on Dapr that enables you to deploy and manage workers written in different programming languages. Workers subscribe to pub/sub topics, process incoming CloudEvents, and optionally publish responses.

System Overview

┌─────────────────────────────────────────────────────────────────────────┐
│                           WorkManager Service                           │
│                                                                         │
│  ┌──────────────┐    ┌────────────────┐    ┌─────────────────────────┐  │
│  │  WorkManager │    │ EngineRegistry │    │     WorkerRegistry      │  │
│  │  Component   │    │                │    │                         │  │
│  │              │    │  text/x-python │    │  ┌─────┐ ┌─────┐        │  │
│  │  - Create    │    │  text/x-csharp │    │  │ W1  │ │ W2  │  ...   │  │
│  │  - Start     │◄──►│  ...           │◄──►│  └─────┘ └─────┘        │  │
│  │  - Stop      │    │                │    │                         │  │
│  │  - Delete    │    └────────────────┘    └─────────────────────────┘  │
│  │  - Recover   │                                                       │
│  └──────┬───────┘                                                       │
│         │                                                               │
│  ┌──────▼───────┐    ┌───────────────────────────────────────────────┐  │
│  │ Dapr Client  │    │              Dapr Subscriptions               │  │
│  │              │◄──►│                                               │  │
│  │  - State     │    │  topic: "time-worker-topic"                   │  │
│  │  - Pub/Sub   │    │    └──► Worker #1 ──► PythonEngine            │  │
│  │              │    │    └──► Worker #2 ──► CSharpSourceEngine      │  │
│  └──────────────┘    └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘
         │
         ▼
┌─────────────────┐     ┌─────────────────┐
│   Dapr Runtime  │────►│  State Store    │
│                 │     │  (statestore)   │
│                 │     └─────────────────┘
│                 │
│                 │     ┌─────────────────┐
│                 │────►│   Pub/Sub       │
│                 │     │   (pubsub)      │
└─────────────────┘     └─────────────────┘

Core Components

WorkManager (Components/WorkManager.cs)

The central orchestrator that manages the entire worker lifecycle. It:

  • Creates, starts, stops, and deletes workers
  • Manages Dapr subscriptions to pub/sub topics
  • Persists worker configurations to the state store for recovery
  • Handles message processing and group-based locking for coordinated execution
  • Publishes worker output back to pub/sub topics

Worker (Runtime/Worker.cs)

Represents a single worker instance with:

  • Id: Unique identifier (GUID)
  • CodeSource: Where the code comes from (URL or inline content)
  • MimeType: The content type identifying the language/engine
  • Topic: The pub/sub topic to subscribe to
  • Group: Optional coordination group for mutually exclusive execution
  • Status: Either Stopped or Running
  • History: Record of all code changes over time
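
As a rough illustration of the shape described above, a worker record could be modeled like this in Python. The field names mirror the list; the class names, the HistoryEntry type, and the replace_code helper are hypothetical and are not taken from Runtime/Worker.cs.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional


class WorkerStatus(Enum):
    STOPPED = "Stopped"
    RUNNING = "Running"


@dataclass
class HistoryEntry:
    # Hypothetical record of one code change (not from the real source).
    code_source: str
    changed_at: datetime


@dataclass
class Worker:
    code_source: str              # URL or inline content
    mime_type: str                # selects the engine, e.g. "text/x-python"
    topic: str                    # pub/sub topic to subscribe to
    group: Optional[str] = None   # optional coordination group
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: WorkerStatus = WorkerStatus.STOPPED
    history: List[HistoryEntry] = field(default_factory=list)

    def replace_code(self, new_source: str) -> None:
        # Record the outgoing source in the history before swapping it out.
        self.history.append(
            HistoryEntry(self.code_source, datetime.now(timezone.utc))
        )
        self.code_source = new_source
```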

EngineRegistry (Runtime/EngineRegistry.cs)

Maps MIME types to engine implementations. Each registered factory creates a new engine instance for every worker.
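
A minimal sketch of the idea in Python, assuming the registry is essentially a dictionary from MIME type to a factory function. The method names and the case-insensitive lookup are assumptions made for illustration, not the behavior of Runtime/EngineRegistry.cs.

```python
from typing import Callable, Dict, Tuple


class EngineRegistry:
    """Maps a MIME type to a (display name, factory) pair."""

    def __init__(self) -> None:
        self._factories: Dict[str, Tuple[str, Callable[[], object]]] = {}

    def register(self, mime_type: str, name: str,
                 factory: Callable[[], object]) -> None:
        # Assumption: MIME types are compared case-insensitively.
        self._factories[mime_type.lower()] = (name, factory)

    def is_registered(self, mime_type: str) -> bool:
        return mime_type.lower() in self._factories

    def create(self, mime_type: str) -> object:
        # Every call invokes the factory, so each worker gets its own engine.
        try:
            _, factory = self._factories[mime_type.lower()]
        except KeyError:
            raise ValueError(
                f"No engine registered for MIME type {mime_type!r}"
            ) from None
        return factory()
```

Calling create twice for the same MIME type returns two distinct engine objects, which is the per-worker isolation the factory approach is meant to provide.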

Engine Implementations (Engines/)

All engines implement the IEngine interface:

public interface IEngine
{
    void LoadCode(byte[] code);
    CloudEvent? Process(CloudEvent input);
}

Available Engines:

Engine              MIME Type      Description
PythonEngine        text/x-python  Executes Python code via a python3 subprocess
CSharpSourceEngine  text/x-csharp  Compiles and executes C# source code at runtime
CSharpDllEngine     text/x-dll     Loads pre-compiled .NET assemblies
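
To make the subprocess approach concrete, here is a hedged sketch of an engine that runs worker code in a separate Python interpreter. It is not the real PythonEngine: the class name, the driver script, the JSON-over-stdin protocol, and the 30-second timeout are all assumptions, and it uses sys.executable rather than a python3 binary on the PATH.

```python
import json
import subprocess
import sys
from typing import Optional

# Driver executed in the child process: it loads the worker code, reads one
# event as JSON from stdin, and writes the result of Process() to stdout.
_DRIVER = """
import json, sys
namespace = {}
exec(sys.argv[1], namespace)
event = json.load(sys.stdin)
json.dump(namespace["Process"](event), sys.stdout)
"""


class SubprocessPythonEngine:
    """Hypothetical stand-in that mirrors the LoadCode/Process contract."""

    def __init__(self) -> None:
        self._code = ""

    def load_code(self, code: bytes) -> None:
        self._code = code.decode("utf-8")

    def process(self, event: dict) -> Optional[dict]:
        completed = subprocess.run(
            [sys.executable, "-c", _DRIVER, self._code],
            input=json.dumps(event),
            capture_output=True,
            text=True,
            check=True,    # raise if the worker code fails
            timeout=30,    # assumed limit, not from the source
        )
        # A worker that returns None produces "null", i.e. no response event.
        return json.loads(completed.stdout)
```

Starting a new interpreter per message keeps the worker isolated from the host process at the cost of startup latency. Whether the real engine reuses a process between messages is not stated here.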

WorkerRegistry (Runtime/WorkerRegistry.cs)

In-memory storage for active worker instances. Workers are recovered from persistent state on restart.

Data Flow

Worker Creation Flow

  1. Client calls CreateWorkerAsync() via gRPC or direct injection
  2. WorkManager validates the MIME type has a registered engine
  3. Code is fetched (from URL or extracted from content)
  4. Engine instance is created and code is loaded
  5. Worker configuration is persisted to Dapr state store
  6. Dapr subscription is created for the worker's topic
  7. Worker is added to the in-memory registry
  8. Worker status is set to Running
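
The steps above can be sketched with in-memory stand-ins for the state store, the subscriptions, and the registry. Everything here is illustrative: the function signature, the dictionary shapes, and the choice to persist the code bytes alongside the configuration are assumptions, and step 3 (fetching or decoding the code) is assumed to have happened before the call.

```python
import uuid
from typing import Callable, Dict, List


def create_worker(
    mime_type: str,
    code: bytes,
    topic: str,
    *,
    engines: Dict[str, Callable[[], object]],
    state: dict,
    subscriptions: Dict[str, List[str]],
    registry: dict,
) -> str:
    # Step 2: reject MIME types that have no registered engine.
    if mime_type not in engines:
        raise ValueError(f"No engine registered for {mime_type!r}")

    # Step 4: create a fresh engine and load the code into it.
    engine = engines[mime_type]()
    engine.load_code(code)

    worker_id = str(uuid.uuid4())

    # Step 5: persist the configuration and add the ID to the index.
    state[f"workmanager/workers/{worker_id}"] = {
        "mime_type": mime_type,
        "topic": topic,
        "code": code,
    }
    state.setdefault("workmanager/workers/index", []).append(worker_id)

    # Step 6: subscribe the worker to its topic.
    subscriptions.setdefault(topic, []).append(worker_id)

    # Steps 7 and 8: register in memory and mark as running.
    registry[worker_id] = {"engine": engine, "topic": topic, "status": "Running"}
    return worker_id
```

Validating the MIME type first means a bad request fails before anything is written to the state store.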

Message Processing Flow

  1. Dapr delivers a CloudEvent to the subscription handler
  2. For workers in a group: Try to acquire a distributed lock
  3. Worker engine processes the input CloudEvent
  4. If the engine returns a response CloudEvent:
       • If the worker has no group: publish directly to the output topic
       • If the worker has a group: publish after the lock has been acquired
  5. The lock is released (if group-based)
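
One possible shape for this handler, as a sketch only. The callables try_acquire, release, and publish are hypothetical placeholders for the lock and pub/sub operations, and the decision to return False when the lock is unavailable is an assumption. What the service actually does with a message it cannot lock for is not specified above.

```python
from typing import Callable, Optional


def handle_message(
    worker: dict,
    event: dict,
    *,
    try_acquire: Callable[[str], bool],
    release: Callable[[str], None],
    publish: Callable[[str, dict], None],
) -> bool:
    """Process one event; returns False if the group lock was unavailable."""
    group: Optional[str] = worker.get("group")

    # Grouped workers must hold the group lock before processing.
    if group is not None and not try_acquire(group):
        return False

    try:
        response = worker["engine"].process(event)
        # Publishing is optional: engines may return no response event.
        if response is not None:
            publish(worker["output_topic"], response)
    finally:
        # Release even if the engine raised, so the group is not blocked.
        if group is not None:
            release(group)
    return True
```

Releasing in a finally block avoids leaving a group blocked until the lock goes stale when worker code throws.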

Worker Recovery Flow

On service restart:

  1. RecoverWorkersAsync() is called (typically via gRPC)
  2. All worker configurations are loaded from state store
  3. For each worker:
       • Engine is created and code is re-loaded
       • Dapr subscription is re-established
       • Worker status is set to Running
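
Recovery can be pictured as replaying the persisted configurations. The sketch below uses plain dictionaries in place of the Dapr state store and subscriptions. The configuration fields and the choice to skip index entries that have no stored configuration are assumptions.

```python
from typing import Callable, Dict, List


def recover_workers(
    state: dict,
    engines: Dict[str, Callable[[], object]],
    subscriptions: Dict[str, List[str]],
    registry: dict,
) -> List[str]:
    """Rebuild in-memory workers from persisted configuration."""
    recovered = []
    for worker_id in state.get("workmanager/workers/index", []):
        config = state.get(f"workmanager/workers/{worker_id}")
        if config is None:
            continue  # assumption: skip dangling index entries

        # Re-create the engine and load the stored code again.
        engine = engines[config["mime_type"]]()
        engine.load_code(config["code"])

        # Re-establish the subscription and mark the worker as running.
        subscriptions.setdefault(config["topic"], []).append(worker_id)
        registry[worker_id] = {**config, "engine": engine, "status": "Running"}
        recovered.append(worker_id)
    return recovered
```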

Persistence

Worker configurations are stored in the Dapr state store with these keys:

  • workmanager/workers/index - List of all worker IDs
  • workmanager/workers/{workerId} - Individual worker configuration

Group locks use:

  • workmanager/groups/{groupId}/lock - Lock data (owner instance ID and acquisition time)
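
The key layout lends itself to a few small helpers. The function names below are hypothetical. The payload builder produces an array of key/value pairs, which is the shape Dapr's state save endpoint accepts, but whether WorkManager writes the index and the worker record in a single request is an assumption.

```python
import json
from typing import List

STATE_STORE_NAME = "statestore"  # component name shown in the overview diagram
INDEX_KEY = "workmanager/workers/index"


def worker_key(worker_id: str) -> str:
    return f"workmanager/workers/{worker_id}"


def group_lock_key(group_id: str) -> str:
    return f"workmanager/groups/{group_id}/lock"


def build_save_request(worker_id: str, config: dict,
                       known_ids: List[str]) -> str:
    """JSON body storing one worker and the (possibly extended) index."""
    ids = known_ids if worker_id in known_ids else known_ids + [worker_id]
    return json.dumps([
        {"key": worker_key(worker_id), "value": config},
        {"key": INDEX_KEY, "value": ids},
    ])
```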

Group Coordination

Workers can be assigned to a group for mutually exclusive execution. When multiple workers share a group:

  • Only one worker processes a message at a time
  • A distributed lock prevents concurrent processing
  • Locks have a 30-second max age; a lock older than that is considered stale and can be "stolen" by another instance
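
The stale-lock rule can be sketched as follows, with a dictionary standing in for the state store. The field names owner and acquired_at, the re-entrant behavior for the current owner, and treating a lock of exactly 30 seconds as stale are assumptions. A real implementation would also need an atomic compare-and-set (for example via state store ETags), which this sketch omits.

```python
import time
from typing import Optional

LOCK_MAX_AGE_SECONDS = 30


def try_acquire(store: dict, group_id: str, instance_id: str,
                now: Optional[float] = None) -> bool:
    """Take the group lock if it is free, already ours, or stale."""
    now = time.time() if now is None else now
    key = f"workmanager/groups/{group_id}/lock"
    lock = store.get(key)

    held_by_other = lock is not None and lock["owner"] != instance_id
    if held_by_other and now - lock["acquired_at"] < LOCK_MAX_AGE_SECONDS:
        return False  # a fresh lock owned by another instance

    # Free, ours, or stale: (re)write the lock with a new timestamp.
    store[key] = {"owner": instance_id, "acquired_at": now}
    return True


def release(store: dict, group_id: str, instance_id: str) -> None:
    """Remove the lock, but only if this instance still owns it."""
    key = f"workmanager/groups/{group_id}/lock"
    lock = store.get(key)
    if lock is not None and lock["owner"] == instance_id:
        del store[key]
```

Checking ownership on release matters once stealing is allowed: an instance whose lock was stolen must not delete the new owner's lock.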

Code Source Types

Workers support two code source types:

CodeSourceUrl: Code fetched from an HTTP(S) URL at worker creation time

{ "url": "https://example.com/worker.py" }

CodeSourceContent: Base64-encoded inline code

{ "content": "aW1wb3J0IGpzb24..." }
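
Resolving either source type to raw bytes might look like the sketch below. The resolve_code function, the optional fetch parameter (useful for tests), and the 10-second timeout are hypothetical. Only the "url" and "content" field names come from the examples above.

```python
import base64
from typing import Callable, Optional
from urllib.request import urlopen


def resolve_code(source: dict,
                 fetch: Optional[Callable[[str], bytes]] = None) -> bytes:
    """Return the worker code as bytes from an inline or URL source."""
    if "content" in source:
        # Inline code is Base64-encoded; reject malformed input.
        return base64.b64decode(source["content"], validate=True)

    if "url" in source:
        url = source["url"]
        if not url.lower().startswith(("http://", "https://")):
            raise ValueError(f"Unsupported URL scheme: {url!r}")
        if fetch is not None:
            return fetch(url)
        with urlopen(url, timeout=10) as response:  # assumed timeout
            return response.read()

    raise ValueError("Code source needs either 'url' or 'content'")
```

Because URL sources are fetched at creation time, a later change to the remote file does not affect an existing worker.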

Extension Points

Adding a New Engine

  1. Create a class implementing IEngine
  2. Register it in Program.cs:
engineRegistry.Register(
    new ContentType("text/x-my-language"),
    "MyLanguage",
    () => new MyLanguageEngine()
);

Custom Worker Logic

Workers must implement a Process(CloudEvent input) function:

Python:

def Process(event):
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": { "result": event["data"] }
    }

C#:

public static CloudEvent Process(CloudEvent input)
{
    return new CloudEvent
    {
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data
    };
}