Skip to content

C# Client Library

The Virtufin WorkManager Client library (Virtufin.WorkManager.Client) provides a high-level C# interface for interacting with the WorkManager service.

Installation

Reference the Virtufin.WorkManager.Client project or NuGet package in your C# application.

<ProjectReference Include="path/to/Virtufin.WorkManager.Client/Virtufin.WorkManager.Client.csproj" />

Overview

The client library provides the WorkManagerClient class as the main entry point for all worker operations.

Quick Start

using Virtufin.WorkManager.Client;

using var client = new WorkManagerClient("localhost", 5002);

// Create a worker
var workerId = await client.CreateWorkerAsync(
    codeSource: new CodeSource { Content = Encoding.UTF8.GetBytes("def Process(e): return e") },
    mimeType: "text/x-python",
    topic: "my-topic"
);

// List workers
var workers = await client.ListWorkersAsync();
foreach (var w in workers)
    Console.WriteLine($"{w.Id}: {w.Status}");

// Delete worker
await client.DeleteWorkerAsync(workerId);

WorkManagerClient

The main client class for interacting with the WorkManager service.

Constructor

public WorkManagerClient(string host = "localhost", int port = 5002)

Parameters: - host - The gRPC host address (default: "localhost") - port - The gRPC port (default: 5002)

Properties

// Direct access to the underlying gRPC client
public WorkManager.WorkManagerClient GrpcClient { get; }

Methods

Worker Lifecycle

// Create a new worker
Task<string> CreateWorkerAsync(
    CodeSource codeSource,
    string mimeType,
    string topic,
    string group = "")

// Update worker code from content
Task LoadCodeFromContentAsync(string id, byte[] content)

// Update worker code from URL
Task LoadCodeFromUrlAsync(string id, string url)

// Delete a worker
Task DeleteWorkerAsync(string id)

Example:

// Create with inline code
var workerId = await client.CreateWorkerAsync(
    new CodeSource { Content = Encoding.UTF8.GetBytes("def Process(e): return e") },
    mimeType: "text/x-python",
    topic: "events"
);

// Create from URL
var workerId = await client.CreateWorkerAsync(
    new CodeSource { Url = "https://example.com/worker.py" },
    mimeType: "text/x-python",
    topic: "events"
);

// Update code
await client.LoadCodeFromContentAsync(workerId, newCodeBytes);

// Delete
await client.DeleteWorkerAsync(workerId);

Worker Control

// Start a stopped worker
Task StartWorkerAsync(string id)

// Stop a running worker
Task StopWorkerAsync(string id)

Example:

await client.StartWorkerAsync(workerId);
await client.StopWorkerAsync(workerId);

Worker Queries

// List all workers
Task<List<WorkerInfo>> ListWorkersAsync()

// Get worker code history
Task<List<HistoryEntry>> GetWorkerHistoryAsync(string id)

// Recover workers from persistent state
Task<int> RecoverWorkersAsync()

Example:

var workers = await client.ListWorkersAsync();
foreach (var w in workers)
    Console.WriteLine($"{w.Id} - {w.Status} - {w.Topic}");

var history = await client.GetWorkerHistoryAsync(workerId);
foreach (var entry in history)
    Console.WriteLine($"At {entry.CreatedAt}");

var recovered = await client.RecoverWorkersAsync();
Console.WriteLine($"Recovered {recovered} workers");

Connection Management

void Close()
Task CloseAsync()
void Dispose()
ValueTask DisposeAsync()

Example:

using (var client = new WorkManagerClient())
{
    var workers = await client.ListWorkersAsync();
}
// Automatically closed


Data Models

CodeSource

message CodeSource {
    oneof Source {
        string Url = 1;
        bytes Content = 2;
    }
}

WorkerInfo

message WorkerInfo {
    string Id = 1;
    CodeSource CodeSource = 2;
    string MimeType = 3;
    string Language = 4;
    string Topic = 5;
    string Group = 6;
    string CreatedAt = 7;
    WorkerStatus Status = 8;
}

WorkerStatus

enum WorkerStatus {
    WORKER_STATUS_UNSPECIFIED = 0;
    WORKER_STATUS_STOPPED = 1;
    WORKER_STATUS_RUNNING = 2;
}

HistoryEntry

message HistoryEntry {
    CodeSource CodeSource = 1;
    string CreatedAt = 2;
}

Error Handling

Worker Not Found

try {
    await client.DeleteWorkerAsync("nonexistent");
}
catch (Grpc.Core.RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
{
    Console.WriteLine("Worker not found");
}

Invalid Arguments

try {
    await client.CreateWorkerAsync(
        new CodeSource(),
        mimeType: "",
        topic: "topic");
}
catch (Grpc.Core.RpcException ex) when (ex.StatusCode == StatusCode.InvalidArgument)
{
    Console.WriteLine("Invalid worker configuration");
}

Connection Errors

try {
    using var client = new WorkManagerClient("unreachable-host", 5002);
    await client.ListWorkersAsync();
}
catch (Grpc.Core.RpcException ex)
{
    Console.WriteLine($"Connection failed: {ex.Status}");
}

Complete Example

using Grpc.Core;
using Virtufin.WorkManager.Client;
using Virtufin.WorkManager.Protos;

class Program
{
    static async Task Main(string[] args)
    {
        Console.WriteLine("WorkManager Client Demo");
        Console.WriteLine("=======================\n");

        using var client = new WorkManagerClient("localhost", 5002);

        // Create a worker
        Console.WriteLine("Creating worker...");
        var workerId = await client.CreateWorkerAsync(
            new CodeSource { Content = Encoding.UTF8.GetBytes(@"
def Process(event):
    return {
        ""type"": ""com.example.output"",
        ""source"": ""csharp-demo"",
        ""data"": event.Get(""data"", {})
    }
") },
            mimeType: "text/x-python",
            topic: "demo-topic"
        );
        Console.WriteLine($"Created: {workerId}");

        // List workers
        Console.WriteLine("\nListing workers...");
        var workers = await client.ListWorkersAsync();
        foreach (var w in workers)
            Console.WriteLine($"  {w.Id} - {w.Status} - topic={w.Topic}");

        // Start worker
        Console.WriteLine("\nStarting worker...");
        await client.StartWorkerAsync(workerId);

        var updated = await client.ListWorkersAsync();
        foreach (var w in updated)
            if (w.Id == workerId)
                Console.WriteLine($"  Status after start: {w.Status}");

        // Stop worker
        Console.WriteLine("\nStopping worker...");
        await client.StopWorkerAsync(workerId);

        // Update code
        Console.WriteLine("\nUpdating worker code...");
        var newCode = Encoding.UTF8.GetBytes(@"
def Process(event):
    return {
        ""type"": ""com.example.output-v2"",
        ""source"": ""csharp-demo-v2"",
        ""data"": {""updated"": True}
    }
");
        await client.LoadCodeFromContentAsync(workerId, newCode);

        // Get history
        Console.WriteLine("\nWorker history:");
        var history = await client.GetWorkerHistoryAsync(workerId);
        Console.WriteLine($"  {history.Count} version(s)");

        // Delete
        Console.WriteLine("\nDeleting worker...");
        await client.DeleteWorkerAsync(workerId);
        Console.WriteLine("Deleted");
    }
}

Protobuf Dependencies

The client library requires the generated protobuf classes:

<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Net.Client" />
<PackageReference Include="Grpc.Tools" />

The proto file (workmanager.proto) is compiled into: - Virtufin.WorkManager.Protos.WorkManager - Virtufin.WorkManager.Protos.WorkManagerClient - Request/Response message classes