Engine System

The WorkManager uses a pluggable engine architecture to support multiple programming languages. Each engine handles code execution for a specific MIME type.

IEngine Interface

All engines implement the IEngine interface:

public interface IEngine
{
    void LoadCode(byte[] code);
    CloudEvent? Process(CloudEvent input);
}

Method    Description
LoadCode  Loads the worker code into the engine. Called once at worker creation or when code is updated.
Process   Processes an incoming CloudEvent and returns a response CloudEvent, or null to publish nothing.

Architecture

┌──────────────────────────────────────────────────────┐
│                    EngineRegistry                    │
│                                                      │
│  "text/x-python" ──► () => new PythonEngine()        │
│  "text/x-csharp" ──► () => new CSharpSourceEngine()  │
│  "text/x-dll"    ──► () => new CSharpDllEngine()     │
└──────────────────────────────────────────────────────┘
                           │
                           ▼
              ┌─────────────────────────┐
              │   Engine Factory        │
              │   (per worker)          │
              └─────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────┐
│                   IEngine Instance                   │
│                                                      │
│  ┌─────────────┐    ┌─────────────────────────────┐  │
│  │ LoadCode    │───►│  Compiled/Loaded Code       │  │
│  └─────────────┘    └─────────────────────────────┘  │
│                                                      │
│  ┌─────────────┐    ┌─────────────────────────────┐  │
│  │ Process     │───►│  Process(CloudEvent)        │  │
│  └─────────────┘    └─────────────────────────────┘  │
└──────────────────────────────────────────────────────┘

Engine Resolution

When a worker is created:

  1. WorkManager looks up the engine factory by MIME type in EngineRegistry
  2. A new engine instance is created via the factory function
  3. Code is loaded into the engine via LoadCode()
  4. The engine instance is associated with the worker

This design ensures:

  • Each worker has its own engine instance (no shared state)
  • Engines can be stateless or maintain internal state
  • New engines can be added without modifying existing code
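
The resolution steps can be modelled in a few lines. The sketch below is written in Python purely for illustration (WorkManager itself is C#). The names EchoEngine, create_worker and the "text/x-echo" MIME type are hypothetical stand-ins, not part of the real API.

```python
from typing import Callable, Dict, Optional


class EchoEngine:
    """Hypothetical engine: stores the code and echoes event data back."""

    def __init__(self) -> None:
        self.code: Optional[bytes] = None

    def load_code(self, code: bytes) -> None:
        self.code = code

    def process(self, event: dict) -> Optional[dict]:
        return {"type": "com.example.echo", "source": "echo", "data": event.get("data")}


class EngineRegistry:
    """Maps a MIME type to a factory that builds a fresh engine."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], EchoEngine]] = {}

    def register(self, mime_type: str, factory: Callable[[], EchoEngine]) -> None:
        self._factories[mime_type] = factory

    def create(self, mime_type: str) -> EchoEngine:
        try:
            factory = self._factories[mime_type]
        except KeyError:
            raise ValueError(f"no engine registered for {mime_type!r}") from None
        return factory()


def create_worker(registry: EngineRegistry, mime_type: str, code: bytes) -> dict:
    engine = registry.create(mime_type)  # steps 1 and 2: look up factory, build engine
    engine.load_code(code)               # step 3: load the worker code
    return {"engine": engine}            # step 4: associate engine with the worker


registry = EngineRegistry()
registry.register("text/x-echo", EchoEngine)
worker_a = create_worker(registry, "text/x-echo", b"a")
worker_b = create_worker(registry, "text/x-echo", b"b")
```

Because the registry stores factories rather than instances, the two workers end up with separate engines even though they share a MIME type.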

Built-in Engines

PythonEngine

MIME Type: text/x-python

Executes Python code in a python3 subprocess. The code is written to a temporary .py file; the input event is passed on stdin and the result is read from stdout.
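
The host side of this mechanism is implemented in C# and is not shown here. As a rough model only, the same pattern (temporary file, JSON on stdin, JSON on stdout, timeout, stderr treated as failure) looks like this in Python. The function run_python_worker and the inline WORKER_SOURCE script are hypothetical; in particular, the script reads stdin itself, whereas real worker code only defines Process(event).

```python
import json
import os
import subprocess
import sys
import tempfile

# Hypothetical worker script: reads one JSON event from stdin, writes one to stdout.
WORKER_SOURCE = """
import json, sys
event = json.load(sys.stdin)
json.dump({"type": "com.example.output", "source": "my-worker", "data": event["data"]}, sys.stdout)
"""


def run_python_worker(source, event, timeout_seconds=30.0):
    """Run `source` in a fresh interpreter, passing `event` as JSON on stdin."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as handle:
        handle.write(source)
        path = handle.name
    try:
        completed = subprocess.run(
            [sys.executable, path],
            input=json.dumps(event),
            capture_output=True,
            text=True,
            timeout=timeout_seconds,  # raises subprocess.TimeoutExpired on overrun
        )
    finally:
        os.unlink(path)  # always clean up the temporary file
    if completed.stderr:
        # Mirrors the documented rule that any stderr output is an error.
        raise RuntimeError(completed.stderr.strip())
    return json.loads(completed.stdout)


reply = run_python_worker(
    WORKER_SOURCE, {"type": "com.example.input", "data": {"key": "value"}}
)
```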

Code Requirements:

def Process(event):
    # event is a dict with CloudEvent fields
    # Return a dict representing the output CloudEvent, or None
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": { "result": "processed" }
    }

CloudEvent Mapping:

Input CloudEvents are serialized to JSON and passed via stdin:

{
    "id": "event-123",
    "type": "com.example.input",
    "source": "urn:source",
    "specversion": "1.0",
    "datacontenttype": "application/json",
    "data": { "key": "value" }
}

The Process function should return a dict, which is serialized to JSON and deserialized back into a CloudEvent, or None to publish nothing.
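
As a worked example of this mapping, the worker below reads the data field of the sample input above and builds an output event from it. The event types and the key_upper field are made up for illustration.

```python
def Process(event):
    # Ignore events this worker does not handle; returning None publishes nothing.
    if event.get("type") != "com.example.input":
        return None
    data = event.get("data") or {}
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"key_upper": str(data.get("key", "")).upper()},
    }


sample = {
    "id": "event-123",
    "type": "com.example.input",
    "source": "urn:source",
    "specversion": "1.0",
    "datacontenttype": "application/json",
    "data": {"key": "value"},
}
result = Process(sample)
ignored = Process({"type": "com.example.other"})
```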

Error Handling:

  • Python stdout is captured and parsed as JSON
  • If the parsed output contains an __error__ key, an exception is thrown
  • Any stderr output results in an exception
  • Execution times out after 30 seconds by default
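
The glue that PythonEngine places around the worker code is not shown in this document. One plausible shape, assuming a small runner calls Process and reports failures in-band through the __error__ key, is sketched below. The run_once function and its error message format are assumptions.

```python
import json


def run_once(process, stdin_text):
    """Hypothetical runner: JSON in, JSON out, errors reported via __error__."""
    try:
        event = json.loads(stdin_text)
        result = process(event)
    except Exception as exc:  # report the failure in-band instead of crashing
        return json.dumps({"__error__": f"{type(exc).__name__}: {exc}"})
    return json.dumps(result)  # a None result becomes JSON null


def good(event):
    return {"type": "com.example.output", "source": "my-worker", "data": event["data"]}


def bad(event):
    raise ValueError("boom")


ok_out = run_once(good, '{"type": "com.example.input", "data": {"key": "value"}}')
err_out = run_once(bad, '{"type": "com.example.input"}')
none_out = run_once(lambda event: None, "{}")
```

Since stdout is parsed as JSON and stderr output counts as a failure, worker code should probably avoid stray print calls and warnings written to stderr.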

CSharpSourceEngine

MIME Type: text/x-csharp

Compiles and executes C# source code at runtime using Roslyn. The code is compiled into an in-memory assembly.

Code Requirements:

using System;
using CloudNative.CloudEvents;

// Must have a Process method that returns CloudEvent or null
public static CloudEvent Process(CloudEvent input)
{
    return new CloudEvent
    {
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data
    };
}

Requirements:

  • Method must be named Process
  • Must accept a single CloudEvent parameter
  • Must return CloudEvent or null
  • Can be a static or instance method
  • Assembly is compiled with references to:
      • System.*
      • netstandard
      • CloudNative.CloudEvents

Error Handling:

  • Compilation errors throw InvalidOperationException with full error messages
  • Runtime exceptions propagate to the caller

CSharpDllEngine

MIME Type: text/x-dll

Loads pre-compiled .NET assemblies from byte content.

Code Requirements: Same as CSharpSourceEngine. The assembly must contain a Process method with the same signature.

Error Handling:

  • If the assembly cannot be loaded, an exception is thrown
  • If the Process method is not found, an exception is thrown
  • Runtime exceptions propagate to the caller

Engine Registration

Engines are registered in Program.cs:

var engines = new (ContentType, string, Func<IEngine>)[]
{
    (new ContentType("text/x-csharp"), "C#", () =>
        new CSharpSourceEngine(logger)),
    (new ContentType("text/x-python"), "Python", () =>
        new PythonEngine(TimeSpan.FromSeconds(30), logger)),
};

// Register each (MIME type, display name, factory) triple.
foreach (var (contentType, name, factory) in engines)
{
    engineRegistry.Register(contentType, name, factory);
}

Adding a Custom Engine

  1. Create a new project referencing Virtufin.WorkManager.Engines

  2. Implement IEngine:

    using CloudNative.CloudEvents;
    using Virtufin.WorkManager.Engines;
    
    public sealed class MyEngine : IEngine
    {
        public void LoadCode(byte[] code)
        {
            // Load and possibly compile the code
        }
    
        public CloudEvent? Process(CloudEvent input)
        {
            // Execute the loaded code with the input.
            // Return a response CloudEvent, or null to publish nothing.
            return null;
        }
    }
    

  3. Register in Program.cs:

    engineRegistry.Register(
        new ContentType("text/x-my-lang"),
        "MyLanguage",
        () => new MyEngine()
    );
    

Engine Lifecycle

Engines are created fresh for each worker via the factory function. This means:

  • Engines should be stateless (or handle their own state initialization)
  • Engine instances are not reused across workers
  • When a worker is recovered, a new engine is created and LoadCode is called again

Thread Safety

Engine instances are specific to a single worker and accessed sequentially (one message at a time). Disposal differs by engine:

  • PythonEngine implements IDisposable; Dispose is called when the engine is replaced
  • CSharpSourceEngine and CSharpDllEngine do not require disposal
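
The lifecycle and disposal rules can be modelled as follows. This is an illustrative Python sketch, not the C# implementation: FakeEngine, Worker and replace_engine are hypothetical names, close stands in for IDisposable.Dispose, and the sketch assumes that replacing an engine always builds a new instance through the factory.

```python
class FakeEngine:
    """Hypothetical engine that records whether it has been closed."""

    def __init__(self):
        self.code = None
        self.closed = False

    def load_code(self, code):
        self.code = code

    def close(self):  # stands in for IDisposable.Dispose
        self.closed = True


class Worker:
    """Owns exactly one engine at a time."""

    def __init__(self, factory, code):
        self._factory = factory
        self.engine = self._build(code)

    def _build(self, code):
        engine = self._factory()  # always a fresh instance
        engine.load_code(code)
        return engine

    def replace_engine(self, code):
        # Build the new engine first so a failure leaves the old one in place.
        old, self.engine = self.engine, self._build(code)
        close = getattr(old, "close", None)
        if callable(close):  # engines without resources simply skip this
            close()
        return old


worker = Worker(FakeEngine, b"v1")
first = worker.engine
retired = worker.replace_engine(b"v2")
```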