Engine System
The WorkManager uses a pluggable engine architecture to support multiple programming languages. Each engine handles code execution for a specific MIME type.
IEngine Interface
All engines implement the IEngine interface:
public interface IEngine
{
    void LoadCode(byte[] code);
    CloudEvent? Process(CloudEvent input);
}
| Method | Description |
|---|---|
| LoadCode | Loads the worker code into the engine. Called once at worker creation or when code is updated. |
| Process | Processes an incoming CloudEvent and returns a response CloudEvent, or null to publish nothing. |
Architecture
┌─────────────────────────────────────────────────────┐
│                   EngineRegistry                    │
│                                                     │
│  "text/x-python" ──► () => new PythonEngine()       │
│  "text/x-csharp" ──► () => new CSharpSourceEngine() │
│  "text/x-dll"    ──► () => new CSharpDllEngine()    │
└─────────────────────────────────────────────────────┘
                           │
                           ▼
              ┌─────────────────────────┐
              │     Engine Factory      │
              │      (per worker)       │
              └─────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────┐
│                  IEngine Instance                   │
│                                                     │
│  ┌─────────────┐    ┌─────────────────────────────┐ │
│  │  LoadCode   │───►│  Compiled/Loaded Code       │ │
│  └─────────────┘    └─────────────────────────────┘ │
│                                                     │
│  ┌─────────────┐    ┌─────────────────────────────┐ │
│  │  Process    │───►│  Process(CloudEvent)        │ │
│  └─────────────┘    └─────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Engine Resolution
When a worker is created:
- WorkManager looks up the engine factory by MIME type in EngineRegistry
- A new engine instance is created via the factory function
- Code is loaded into the engine via LoadCode()
- The engine instance is associated with the worker

This design ensures:
- Each worker has its own engine instance (no shared state)
- Engines can be stateless or maintain internal state
- New engines can be added without modifying existing code
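The resolution steps above can be modelled in a few lines. The sketch below is written in Python for brevity and is not the WorkManager implementation (which is C#); the names EchoEngine, register, create, and create_worker_engine are hypothetical and only illustrate the lookup, factory call, and LoadCode sequence.

```python
from typing import Callable, Dict, Optional, Protocol


class Engine(Protocol):
    """Python stand-in for the IEngine interface (illustrative only)."""

    def load_code(self, code: bytes) -> None: ...

    def process(self, event: dict) -> Optional[dict]: ...


class EchoEngine:
    """Hypothetical engine that stores its code and echoes every event."""

    def __init__(self) -> None:
        self.code = b""

    def load_code(self, code: bytes) -> None:
        self.code = code

    def process(self, event: dict) -> Optional[dict]:
        return event


class EngineRegistry:
    """Maps a MIME type to a factory that builds a fresh engine."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], Engine]] = {}

    def register(self, mime_type: str, factory: Callable[[], Engine]) -> None:
        self._factories[mime_type] = factory

    def create(self, mime_type: str) -> Engine:
        try:
            factory = self._factories[mime_type]
        except KeyError:
            raise LookupError(f"no engine registered for {mime_type!r}") from None
        # Calling the factory yields a new instance on every lookup.
        return factory()


def create_worker_engine(registry: EngineRegistry, mime_type: str, code: bytes) -> Engine:
    """Look up the factory, create the engine, then load the worker code."""
    engine = registry.create(mime_type)
    engine.load_code(code)
    return engine
```

Because the registry stores factories rather than instances, two workers registered under the same MIME type never share an engine object, which is what keeps per-worker state isolated.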
Built-in Engines
PythonEngine
MIME Type: text/x-python
Executes Python code in a python3 subprocess. The code is written to a temporary .py file, and the input and output events are exchanged over stdin and stdout.
Code Requirements:
def Process(event):
    # event is a dict with CloudEvent fields
    # Return a dict representing the output CloudEvent, or None
    return {
        "type": "com.example.output",
        "source": "my-worker",
        "data": {"result": "processed"}
    }
CloudEvent Mapping:
Input CloudEvents are serialized to JSON and passed via stdin:
{
  "id": "event-123",
  "type": "com.example.input",
  "source": "urn:source",
  "specversion": "1.0",
  "datacontenttype": "application/json",
  "data": { "key": "value" }
}
The Process function should return a dict, which is serialized to JSON and converted back into a CloudEvent.
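As an illustration of the mapping, a worker can read fields from the input dict and build its reply from them. This is a sketch only: the output field names under "data" (echo, reply_to) are made up for the example, and which CloudEvent attributes the engine requires on the returned dict is an assumption not confirmed here.

```python
def Process(event):
    # Skip events this worker does not handle; returning None publishes nothing.
    if event.get("type") != "com.example.input":
        return None

    # "data" may be absent or null, so fall back to an empty dict.
    data = event.get("data") or {}

    return {
        "type": "com.example.output",
        "source": "my-worker",
        # Hypothetical payload: echo one input value and the input event id.
        "data": {"echo": data.get("key"), "reply_to": event.get("id")},
    }
```

Called with the sample input event shown above, this returns an output event whose data carries the value of "key" and the id "event-123".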
Error Handling:
- Python stdout is captured and parsed as JSON
- If the parsed output contains an __error__ key, an exception is thrown
- Any stderr output results in an exception
- Execution times out after 30 seconds by default
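The rules above can be modelled from the host side roughly as follows. This is a Python sketch under stated assumptions, not the PythonEngine source (which is C#): WorkerError, interpret_output, and run_worker are hypothetical names, and treating empty stdout or JSON null as "publish nothing" is an assumption.

```python
import json
import subprocess
import sys
from typing import Optional


class WorkerError(RuntimeError):
    """Raised when the worker process reports a failure (hypothetical type)."""


def interpret_output(stdout: str, stderr: str) -> Optional[dict]:
    """Apply the documented rules to captured stdout and stderr."""
    # Any stderr output is treated as a failure.
    if stderr:
        raise WorkerError(f"worker wrote to stderr: {stderr.strip()}")
    # Assumption: empty stdout means the worker produced no event.
    if not stdout.strip():
        return None
    result = json.loads(stdout)
    # An __error__ key in the parsed output signals a worker-side error.
    if isinstance(result, dict) and "__error__" in result:
        raise WorkerError(str(result["__error__"]))
    return result


def run_worker(script_path: str, event: dict, timeout: float = 30.0) -> Optional[dict]:
    """Run a worker script, passing the event as JSON on stdin."""
    # subprocess.run raises subprocess.TimeoutExpired when the limit is hit.
    completed = subprocess.run(
        [sys.executable, script_path],
        input=json.dumps(event),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return interpret_output(completed.stdout, completed.stderr)
```

One practical consequence of the stderr rule: worker code that logs to stderr, including warnings emitted by libraries, fails the invocation even if it also wrote a valid result to stdout.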
CSharpSourceEngine
MIME Type: text/x-csharp
Compiles and executes C# source code at runtime using Roslyn. The code is compiled into an in-memory assembly.
Code Requirements:
using System;
using CloudNative.CloudEvents;

// Must have a Process method that returns CloudEvent or null
public static CloudEvent Process(CloudEvent input)
{
    return new CloudEvent
    {
        Type = "com.example.output",
        Source = new Uri("urn:my-worker"),
        Data = input.Data
    };
}
Requirements:
- Method must be named Process
- Must accept a single CloudEvent parameter
- Must return CloudEvent or null
- Can be static or instance method
- Assembly is compiled with references to:
  - System.*
  - netstandard
  - CloudNative.CloudEvents
Error Handling:
- Compilation errors throw InvalidOperationException with full error messages
- Runtime exceptions propagate to the caller
CSharpDllEngine
MIME Type: text/x-dll
Loads pre-compiled .NET assemblies from byte content.
Code Requirements:
Same as CSharpSourceEngine - the assembly must contain a Process method with the same signature.
Error Handling:
- If the assembly cannot be loaded, an exception is thrown
- If no Process method is found, an exception is thrown
- Runtime exceptions propagate to the caller
Engine Registration
Engines are registered in Program.cs:
var engines = new (ContentType, string, Func<IEngine>)[]
{
    (new ContentType("text/x-csharp"), "C#", () =>
        new CSharpSourceEngine(logger)),
    (new ContentType("text/x-python"), "Python", () =>
        new PythonEngine(TimeSpan.FromSeconds(30), logger)),
};
foreach (var e in engines)
{
    engineRegistry.Register(e.Item1, e.Item2, e.Item3);
}
Adding a Custom Engine
- Create a new project referencing Virtufin.WorkManager.Engines
- Implement IEngine:
  using CloudNative.CloudEvents;
  using Virtufin.WorkManager.Engines;

  public sealed class MyEngine : IEngine
  {
      public void LoadCode(byte[] code)
      {
          // Load and possibly compile the code
      }

      public CloudEvent? Process(CloudEvent input)
      {
          // Execute the code with the input
          // Return a response CloudEvent or null
          return null; // placeholder so the skeleton compiles
      }
  }
- Register in Program.cs:
  engineRegistry.Register(
      new ContentType("text/x-my-lang"),
      "MyLanguage",
      () => new MyEngine()
  );
Engine Lifecycle
Engines are created fresh for each worker via the factory function. This means:
- Engines should be stateless (or handle their own state initialization)
- Engine instances are not reused across workers
- When a worker is recovered, a new engine is created and LoadCode is called again
Thread Safety
Engine instances are specific to a single worker and accessed sequentially (one message at a time). However:
- PythonEngine implements IDisposable; Dispose is called when the engine is replaced
- CSharpSourceEngine and CSharpDllEngine do not require disposal