Skip to content

TypeScript Client Library

The Virtufin WorkManager TypeScript client library (@virtufin/workmanager) provides a TypeScript interface for interacting with the WorkManager service using gRPC-web and Connect-RPC.

Installation

npm install @virtufin/workmanager

Or reference the source directly:

{
  "dependencies": {
    "@virtufin/workmanager": "file:path/to/src/typescript"
  }
}

Overview

The client library provides the WorkManagerClient class as the main entry point for all worker operations.

Quick Start

import { WorkManagerClient } from "@virtufin/workmanager";

const client = new WorkManagerClient({ url: "http://localhost:5002" });

// Create a worker
const { id: workerId } = await client.createWorker({
    codeSource: { content: new TextEncoder().encode("def Process(e): return e") },
    mimeType: "text/x-python",
    topic: "my-topic"
});

// List workers
const { workers } = await client.listWorkers();
console.log(`Workers: ${workers.length}`);

// Delete worker
await client.deleteWorker({ id: workerId });

WorkManagerClient

Constructor

const client = new WorkManagerClient({ url: string, timeout?: number });

Parameters: - url - The gRPC-web base URL (e.g., "http://localhost:5002") - timeout - Request timeout in milliseconds (default: 30000)

Methods

Worker Lifecycle

// Create a new worker
createWorker(request: CreateWorkerRequest): Promise<CreateWorkerResponse>

// Update worker code from content
loadCodeFromContent(request: LoadCodeFromContentRequest): Promise<LoadCodeFromContentResponse>

// Update worker code from URL
loadCodeFromUrl(request: LoadCodeFromUrlRequest): Promise<LoadCodeFromUrlResponse>

// Delete a worker
deleteWorker(request: DeleteWorkerRequest): Promise<DeleteWorkerResponse>

Example:

// Create with inline code
const { id: workerId } = await client.createWorker({
    codeSource: {
        content: new TextEncoder().encode("def Process(e): return {\"type\": \"output\", \"source\": \"ts\", \"data\": e[\"data\"]}")
    },
    mimeType: "text/x-python",
    topic: "events"
});

// Create from URL
const { id: workerId } = await client.createWorker({
    codeSource: { url: "https://example.com/worker.py" },
    mimeType: "text/x-python",
    topic: "events"
});

// Update code
await client.loadCodeFromContent({
    id: workerId,
    content: new TextEncoder().encode("def Process(e): return {\"updated\": true}")
});

// Delete
await client.deleteWorker({ id: workerId });

Worker Control

// Start a stopped worker
startWorker(request: StartWorkerRequest): Promise<StartWorkerResponse>

// Stop a running worker
stopWorker(request: StopWorkerRequest): Promise<StopWorkerResponse>

Example:

await client.startWorker({ id: workerId });
await client.stopWorker({ id: workerId });

Worker Queries

// List all workers
listWorkers(request: ListWorkersRequest = {}): Promise<ListWorkersResponse>

// Get worker code history
getWorkerHistory(request: GetWorkerHistoryRequest): Promise<GetWorkerHistoryResponse>

// Recover workers from persistent state
recoverWorkers(request: RecoverWorkersRequest = {}): Promise<RecoverWorkersResponse>

Example:

const { workers } = await client.listWorkers();
for (const w of workers) {
    console.log(`${w.id} - ${w.status} - topic=${w.topic}`);
}

const { entries } = await client.getWorkerHistory({ id: workerId });
console.log(`${entries.length} version(s) in history`);

const { recoveredCount } = await client.recoverWorkers();
console.log(`Recovered ${recoveredCount} workers`);


Data Models

CodeSource

interface CodeSource {
    url?: string;
    content?: Uint8Array;
}

CreateWorkerRequest

interface CreateWorkerRequest {
    codeSource: CodeSource;
    mimeType: string;
    topic: string;
    group?: string;
}

WorkerInfo

interface WorkerInfo {
    id: string;
    codeSource: CodeSource;
    mimeType: string;
    language: string;
    topic: string;
    group: string;
    createdAt: string;
    status: WorkerStatus;
}

WorkerStatus

enum WorkerStatus {
    WORKER_STATUS_UNSPECIFIED = 0,
    WORKER_STATUS_STOPPED = 1,
    WORKER_STATUS_RUNNING = 2,
}

HistoryEntry

interface HistoryEntry {
    codeSource: CodeSource;
    createdAt: string;
}

Error Handling

Worker Not Found

try {
    await client.deleteWorker({ id: "nonexistent" });
} catch (error) {
    if (error instanceof ConnectError) {
        console.log(`Not found: ${error.message}`);
    }
}

Invalid Arguments

try {
    await client.createWorker({
        codeSource: {},
        mimeType: "",
        topic: "topic"
    });
} catch (error) {
    if (error instanceof ConnectError) {
        console.log(`Invalid: ${error.message}`);
    }
}

Connection Errors

try {
    const client = new WorkManagerClient({ url: "http://invalid-host:5002" });
    await client.listWorkers({});
} catch (error) {
    console.log(`Connection failed: ${error.message}`);
}

Complete Example

import { WorkManagerClient } from "@virtufin/workmanager";

async function main() {
    console.log("WorkManager Client Demo");
    console.log("=======================\n");

    const client = new WorkManagerClient({ url: "http://localhost:5002" });

    // Create a worker
    console.log("Creating worker...");
    const { id: workerId } = await client.createWorker({
        codeSource: {
            content: new TextEncoder().encode(`def Process(event):
    return {
        "type": "com.example.output",
        "source": "typescript-demo",
        "data": event.get("data", {})
    }`)
        },
        mimeType: "text/x-python",
        topic: "demo-topic"
    });
    console.log(`Created: ${workerId}`);

    // List workers
    console.log("\nListing workers...");
    const { workers } = await client.listWorkers({});
    for (const w of workers) {
        console.log(`  ${w.id} - ${w.status} - topic=${w.topic}`);
    }

    // Start worker
    console.log("\nStarting worker...");
    await client.startWorker({ id: workerId });

    const { workers: updated } = await client.listWorkers({});
    for (const w of updated) {
        if (w.id === workerId) {
            console.log(`  Status after start: ${w.status}`);
        }
    }

    // Stop worker
    console.log("\nStopping worker...");
    await client.stopWorker({ id: workerId });

    // Update code
    console.log("\nUpdating worker code...");
    await client.loadCodeFromContent({
        id: workerId,
        content: new TextEncoder().encode(`def Process(event):
    return {
        "type": "com.example.output-v2",
        "source": "typescript-demo-v2",
        "data": { "updated": true }
    }`)
    });

    // Get history
    console.log("\nWorker history:");
    const { entries } = await client.getWorkerHistory({ id: workerId });
    console.log(`  ${entries.length} version(s)`);

    // Delete
    console.log("\nDeleting worker...");
    await client.deleteWorker({ id: workerId });
    console.log("Deleted");
}

main();

Protobuf Dependencies

The client library requires the peer dependency:

{
  "peerDependencies": {
    "@bufbuild/protobuf": "^1.10.0"
  }
}

Ensure your project installs the peer dependency:

npm install @bufbuild/protobuf

The proto files are pre-compiled into the src/typescript/src/generated directory.


Generated Protos

The client includes generated protobuf classes from workmanager.proto:

Type Description
CreateWorkerRequest Request to create a worker
CreateWorkerResponse Response with new worker ID
LoadCodeFromContentRequest Request to update code from content
LoadCodeFromUrlRequest Request to update code from URL
DeleteWorkerRequest Request to delete a worker
ListWorkersRequest Request to list workers
ListWorkersResponse Response with worker list
WorkerInfo Worker details
WorkerStatus Worker state enum
StartWorkerRequest Request to start a worker
StopWorkerRequest Request to stop a worker
GetWorkerHistoryRequest Request for worker history
HistoryEntry Single history entry