C# Client Library
The Virtufin WorkManager Client library (Virtufin.WorkManager.Client) provides a high-level C# interface for interacting with the WorkManager service.
Installation
Reference the Virtufin.WorkManager.Client project or NuGet package in your C# application.
<ProjectReference Include="path/to/Virtufin.WorkManager.Client/Virtufin.WorkManager.Client.csproj" />
Overview
The client library provides the WorkManagerClient class as the main entry point for all worker operations.
Quick Start
using Virtufin.WorkManager.Client;
using var client = new WorkManagerClient("localhost", 5002);
// Create a worker
var workerId = await client.CreateWorkerAsync(
codeSource: new CodeSource { Content = Encoding.UTF8.GetBytes("def Process(e): return e") },
mimeType: "text/x-python",
topic: "my-topic"
);
// List workers
var workers = await client.ListWorkersAsync();
foreach (var w in workers)
Console.WriteLine($"{w.Id}: {w.Status}");
// Delete worker
await client.DeleteWorkerAsync(workerId);
WorkManagerClient
The main client class for interacting with the WorkManager service.
Constructor
public WorkManagerClient(string host = "localhost", int port = 5002)
Parameters:
- host - The gRPC host address (default: "localhost")
- port - The gRPC port (default: 5002)
Properties
// Direct access to the underlying gRPC client
public WorkManager.WorkManagerClient GrpcClient { get; }
Methods
Worker Lifecycle
// Create a new worker
Task<string> CreateWorkerAsync(
CodeSource codeSource,
string mimeType,
string topic,
string group = "")
// Update worker code from content
Task LoadCodeFromContentAsync(string id, byte[] content)
// Update worker code from URL
Task LoadCodeFromUrlAsync(string id, string url)
// Delete a worker
Task DeleteWorkerAsync(string id)
Example:
// Create with inline code
var workerId = await client.CreateWorkerAsync(
new CodeSource { Content = Encoding.UTF8.GetBytes("def Process(e): return e") },
mimeType: "text/x-python",
topic: "events"
);
// Create from URL
var workerId = await client.CreateWorkerAsync(
new CodeSource { Url = "https://example.com/worker.py" },
mimeType: "text/x-python",
topic: "events"
);
// Update code
await client.LoadCodeFromContentAsync(workerId, newCodeBytes);
// Delete
await client.DeleteWorkerAsync(workerId);
Worker Control
// Start a stopped worker
Task StartWorkerAsync(string id)
// Stop a running worker
Task StopWorkerAsync(string id)
Example:
await client.StartWorkerAsync(workerId);
await client.StopWorkerAsync(workerId);
Worker Queries
// List all workers
Task<List<WorkerInfo>> ListWorkersAsync()
// Get worker code history
Task<List<HistoryEntry>> GetWorkerHistoryAsync(string id)
// Recover workers from persistent state
Task<int> RecoverWorkersAsync()
Example:
var workers = await client.ListWorkersAsync();
foreach (var w in workers)
Console.WriteLine($"{w.Id} - {w.Status} - {w.Topic}");
var history = await client.GetWorkerHistoryAsync(workerId);
foreach (var entry in history)
Console.WriteLine($"At {entry.CreatedAt}");
var recovered = await client.RecoverWorkersAsync();
Console.WriteLine($"Recovered {recovered} workers");
Connection Management
void Close()
Task CloseAsync()
void Dispose()
ValueTask DisposeAsync()
Example:
using (var client = new WorkManagerClient())
{
var workers = await client.ListWorkersAsync();
}
// Automatically closed
Data Models
CodeSource
message CodeSource {
oneof Source {
string Url = 1;
bytes Content = 2;
}
}
WorkerInfo
message WorkerInfo {
string Id = 1;
CodeSource CodeSource = 2;
string MimeType = 3;
string Language = 4;
string Topic = 5;
string Group = 6;
string CreatedAt = 7;
WorkerStatus Status = 8;
}
WorkerStatus
enum WorkerStatus {
WORKER_STATUS_UNSPECIFIED = 0;
WORKER_STATUS_STOPPED = 1;
WORKER_STATUS_RUNNING = 2;
}
HistoryEntry
message HistoryEntry {
CodeSource CodeSource = 1;
string CreatedAt = 2;
}
Error Handling
Worker Not Found
try {
await client.DeleteWorkerAsync("nonexistent");
}
catch (Grpc.Core.RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
{
Console.WriteLine("Worker not found");
}
Invalid Arguments
try {
await client.CreateWorkerAsync(
new CodeSource(),
mimeType: "",
topic: "topic");
}
catch (Grpc.Core.RpcException ex) when (ex.StatusCode == StatusCode.InvalidArgument)
{
Console.WriteLine("Invalid worker configuration");
}
Connection Errors
try {
using var client = new WorkManagerClient("unreachable-host", 5002);
await client.ListWorkersAsync();
}
catch (Grpc.Core.RpcException ex)
{
Console.WriteLine($"Connection failed: {ex.Status}");
}
Complete Example
using Grpc.Core;
using Virtufin.WorkManager.Client;
using Virtufin.WorkManager.Protos;
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("WorkManager Client Demo");
Console.WriteLine("=======================\n");
using var client = new WorkManagerClient("localhost", 5002);
// Create a worker
Console.WriteLine("Creating worker...");
var workerId = await client.CreateWorkerAsync(
new CodeSource { Content = Encoding.UTF8.GetBytes(@"
def Process(event):
return {
""type"": ""com.example.output"",
""source"": ""csharp-demo"",
""data"": event.Get(""data"", {})
}
") },
mimeType: "text/x-python",
topic: "demo-topic"
);
Console.WriteLine($"Created: {workerId}");
// List workers
Console.WriteLine("\nListing workers...");
var workers = await client.ListWorkersAsync();
foreach (var w in workers)
Console.WriteLine($" {w.Id} - {w.Status} - topic={w.Topic}");
// Start worker
Console.WriteLine("\nStarting worker...");
await client.StartWorkerAsync(workerId);
var updated = await client.ListWorkersAsync();
foreach (var w in updated)
if (w.Id == workerId)
Console.WriteLine($" Status after start: {w.Status}");
// Stop worker
Console.WriteLine("\nStopping worker...");
await client.StopWorkerAsync(workerId);
// Update code
Console.WriteLine("\nUpdating worker code...");
var newCode = Encoding.UTF8.GetBytes(@"
def Process(event):
return {
""type"": ""com.example.output-v2"",
""source"": ""csharp-demo-v2"",
""data"": {""updated"": True}
}
");
await client.LoadCodeFromContentAsync(workerId, newCode);
// Get history
Console.WriteLine("\nWorker history:");
var history = await client.GetWorkerHistoryAsync(workerId);
Console.WriteLine($" {history.Count} version(s)");
// Delete
Console.WriteLine("\nDeleting worker...");
await client.DeleteWorkerAsync(workerId);
Console.WriteLine("Deleted");
}
}
Protobuf Dependencies
The client library requires the generated protobuf classes:
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Net.Client" />
<PackageReference Include="Grpc.Tools" />
The proto file (workmanager.proto) is compiled into:
- Virtufin.WorkManager.Protos.WorkManager
- Virtufin.WorkManager.Protos.WorkManagerClient
- Request/Response message classes