TypeScript Client Library
The Virtufin WorkManager TypeScript client library (@virtufin/workmanager) provides a TypeScript interface for interacting with the WorkManager service using gRPC-web and Connect-RPC.
Installation
npm install @virtufin/workmanager
Or reference the source directly:
{
  "dependencies": {
    "@virtufin/workmanager": "file:path/to/src/typescript"
  }
}
Overview
The client library provides the WorkManagerClient class as the main entry point for all worker operations.
Quick Start
import { WorkManagerClient } from "@virtufin/workmanager";
const client = new WorkManagerClient({ url: "http://localhost:5002" });
// Create a worker
const { id: workerId } = await client.createWorker({
  codeSource: { content: new TextEncoder().encode("def Process(e): return e") },
  mimeType: "text/x-python",
  topic: "my-topic"
});
// List workers
const { workers } = await client.listWorkers();
console.log(`Workers: ${workers.length}`);
// Delete worker
await client.deleteWorker({ id: workerId });
WorkManagerClient
Constructor
const client = new WorkManagerClient({ url: string, timeout?: number });
Parameters:
- url - The gRPC-web base URL (e.g., "http://localhost:5002")
- timeout - Request timeout in milliseconds (default: 30000)
Methods
Worker Lifecycle
// Create a new worker
createWorker(request: CreateWorkerRequest): Promise<CreateWorkerResponse>
// Update worker code from content
loadCodeFromContent(request: LoadCodeFromContentRequest): Promise<LoadCodeFromContentResponse>
// Update worker code from URL
loadCodeFromUrl(request: LoadCodeFromUrlRequest): Promise<LoadCodeFromUrlResponse>
// Delete a worker
deleteWorker(request: DeleteWorkerRequest): Promise<DeleteWorkerResponse>
Example:
// Create with inline code
const { id: workerId } = await client.createWorker({
  codeSource: {
    content: new TextEncoder().encode("def Process(e): return {\"type\": \"output\", \"source\": \"ts\", \"data\": e[\"data\"]}")
  },
  mimeType: "text/x-python",
  topic: "events"
});
// Create from URL
const { id: urlWorkerId } = await client.createWorker({
  codeSource: { url: "https://example.com/worker.py" },
  mimeType: "text/x-python",
  topic: "events"
});
// Update code
await client.loadCodeFromContent({
  id: workerId,
  content: new TextEncoder().encode("def Process(e): return {\"updated\": True}")
});
// Delete both workers
await client.deleteWorker({ id: workerId });
await client.deleteWorker({ id: urlWorkerId });
Worker Control
// Start a stopped worker
startWorker(request: StartWorkerRequest): Promise<StartWorkerResponse>
// Stop a running worker
stopWorker(request: StopWorkerRequest): Promise<StopWorkerResponse>
Example:
await client.startWorker({ id: workerId });
await client.stopWorker({ id: workerId });
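The two control calls compose naturally into a restart. The sketch below is a minimal illustration, not part of the library: `restartWorker` and the `WorkerControl` interface are hypothetical names, and the request and response shapes are simplified stand-ins for the generated types. It also assumes that stopping a worker and then starting it is the intended way to restart one.

```typescript
// Structural stand-in for the two control methods documented above.
// The real request/response types come from the library; these are simplified assumptions.
interface WorkerControl {
  startWorker(request: { id: string }): Promise<unknown>;
  stopWorker(request: { id: string }): Promise<unknown>;
}

// Hypothetical helper: stop a worker, then start it again.
async function restartWorker(client: WorkerControl, id: string): Promise<void> {
  await client.stopWorker({ id });
  await client.startWorker({ id });
}
```

Because the helper only depends on the two method signatures, a WorkManagerClient instance should be usable wherever a `WorkerControl` is expected, as long as its methods accept a plain `{ id }` object.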
Worker Queries
// List all workers
listWorkers(request: ListWorkersRequest = {}): Promise<ListWorkersResponse>
// Get worker code history
getWorkerHistory(request: GetWorkerHistoryRequest): Promise<GetWorkerHistoryResponse>
// Recover workers from persistent state
recoverWorkers(request: RecoverWorkersRequest = {}): Promise<RecoverWorkersResponse>
Example:
const { workers } = await client.listWorkers();
for (const w of workers) {
  console.log(`${w.id} - ${w.status} - topic=${w.topic}`);
}
const { entries } = await client.getWorkerHistory({ id: workerId });
console.log(`${entries.length} version(s) in history`);
const { recoveredCount } = await client.recoverWorkers();
console.log(`Recovered ${recoveredCount} workers`);
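The list returned by listWorkers is a plain array, so it can be post-processed locally. As a small sketch, the hypothetical helper below groups worker IDs by topic. `WorkerSummary` is a trimmed local stand-in for WorkerInfo that keeps only the two fields used here.

```typescript
// Trimmed local stand-in for WorkerInfo (only the fields this helper reads).
interface WorkerSummary {
  id: string;
  topic: string;
}

// Hypothetical helper: map each topic to the IDs of the workers subscribed to it.
function groupIdsByTopic(workers: WorkerSummary[]): Map<string, string[]> {
  const byTopic = new Map<string, string[]>();
  for (const w of workers) {
    const ids = byTopic.get(w.topic) ?? [];
    ids.push(w.id);
    byTopic.set(w.topic, ids);
  }
  return byTopic;
}
```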
Data Models
CodeSource
interface CodeSource {
  url?: string;
  content?: Uint8Array;
}
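Since content is a Uint8Array, inline code has to be encoded before it is sent. The following sketch wraps the two ways of filling a CodeSource in small helpers. `inlineSource` and `urlSource` are hypothetical names, and the interface is repeated locally so the snippet stands alone.

```typescript
// Local copy of the CodeSource shape documented above.
interface CodeSource {
  url?: string;
  content?: Uint8Array;
}

// Hypothetical helper: encode source text as UTF-8 bytes for the content field.
function inlineSource(code: string): CodeSource {
  return { content: new TextEncoder().encode(code) };
}

// Hypothetical helper: reference code that the service fetches from a URL.
function urlSource(url: string): CodeSource {
  return { url };
}
```

A call such as `inlineSource("def Process(e): return e")` produces the same value as the inline `new TextEncoder().encode(...)` expression used in the examples.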
CreateWorkerRequest
interface CreateWorkerRequest {
  codeSource: CodeSource;
  mimeType: string;
  topic: string;
  group?: string;
}
WorkerInfo
interface WorkerInfo {
  id: string;
  codeSource: CodeSource;
  mimeType: string;
  language: string;
  topic: string;
  group: string;
  createdAt: string;
  status: WorkerStatus;
}
WorkerStatus
enum WorkerStatus {
  WORKER_STATUS_UNSPECIFIED = 0,
  WORKER_STATUS_STOPPED = 1,
  WORKER_STATUS_RUNNING = 2,
}
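Because WorkerStatus is a numeric enum, interpolating `w.status` into a string prints a number such as 2 rather than a name. The sketch below shows one way to turn the value into a readable label using the reverse mapping that TypeScript generates for numeric enums. `statusLabel` is a hypothetical helper, and the enum is copied locally so the snippet is self-contained.

```typescript
// Local copy of the WorkerStatus enum documented above.
enum WorkerStatus {
  WORKER_STATUS_UNSPECIFIED = 0,
  WORKER_STATUS_STOPPED = 1,
  WORKER_STATUS_RUNNING = 2,
}

// Hypothetical helper: convert a numeric status into a short label.
// Values not present in the enum fall back to "UNKNOWN(<n>)".
function statusLabel(status: number): string {
  const name: string | undefined = WorkerStatus[status];
  return name ? name.replace("WORKER_STATUS_", "") : `UNKNOWN(${status})`;
}
```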
HistoryEntry
interface HistoryEntry {
  codeSource: CodeSource;
  createdAt: string;
}
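History entries carry a createdAt string, which makes it possible to pick the most recent version on the client side. The sketch below assumes that createdAt is an ISO 8601 timestamp that `Date.parse` understands. The format is not stated here, so treat that as an assumption to verify. `latestEntry` and `Timestamped` are hypothetical names.

```typescript
// Minimal shape needed by the helper; HistoryEntry satisfies it structurally.
interface Timestamped {
  createdAt: string;
}

// Hypothetical helper: return the entry with the newest createdAt, or undefined
// for an empty list. Assumes createdAt is parseable by Date.parse (e.g. ISO 8601).
function latestEntry<T extends Timestamped>(entries: T[]): T | undefined {
  let latest: T | undefined;
  for (const entry of entries) {
    if (!latest || Date.parse(entry.createdAt) > Date.parse(latest.createdAt)) {
      latest = entry;
    }
  }
  return latest;
}
```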
Error Handling
Worker Not Found
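// ConnectError is exported by @connectrpc/connect (assumed to be the Connect runtime this client builds on)
import { ConnectError } from "@connectrpc/connect";

try {
  await client.deleteWorker({ id: "nonexistent" });
} catch (error) {
  if (error instanceof ConnectError) {
    console.log(`Not found: ${error.message}`);
  }
}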
Invalid Arguments
try {
  await client.createWorker({
    codeSource: {},
    mimeType: "",
    topic: "topic"
  });
} catch (error) {
  if (error instanceof ConnectError) {
    console.log(`Invalid: ${error.message}`);
  }
}
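The failing request above has an empty codeSource and an empty mimeType. Catching those two cases locally avoids a round trip to the service. The following is a minimal sketch under that assumption: `validateCreateRequest` is a hypothetical helper, the interfaces are local copies of the documented shapes, and the server may enforce further rules that are not checked here.

```typescript
// Local copies of the documented request shapes.
interface CodeSource {
  url?: string;
  content?: Uint8Array;
}
interface CreateWorkerRequest {
  codeSource: CodeSource;
  mimeType: string;
  topic: string;
  group?: string;
}

// Hypothetical helper: return a list of problems found before sending the request.
// An empty array means none of the checked conditions failed.
function validateCreateRequest(request: CreateWorkerRequest): string[] {
  const problems: string[] = [];
  const hasUrl = (request.codeSource.url ?? "").length > 0;
  const hasContent = (request.codeSource.content?.length ?? 0) > 0;
  if (!hasUrl && !hasContent) {
    problems.push("codeSource must set url or content");
  }
  if (request.mimeType.trim() === "") {
    problems.push("mimeType must not be empty");
  }
  return problems;
}
```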
Connection Errors
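try {
  const client = new WorkManagerClient({ url: "http://invalid-host:5002" });
  await client.listWorkers({});
} catch (error) {
  // A caught value is typed unknown, so narrow it before reading .message
  const message = error instanceof Error ? error.message : String(error);
  console.log(`Connection failed: ${message}`);
}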
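Connection failures are often transient, so callers may want to retry. The sketch below is a generic retry with exponential backoff and is not something the library is known to provide. `withRetry` and its parameters are hypothetical. It retries on every error, whereas production code would usually retry only errors it knows to be transient.

```typescript
// Hypothetical helper: run an async operation up to `attempts` times,
// waiting baseDelayMs, then 2x, then 4x, ... between failed tries.
async function withRetry<T>(
  operation: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 200,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // Sleep only if another attempt will follow.
      if (attempt < attempts - 1) {
        const delay = baseDelayMs * 2 ** attempt;
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError;
}
```

With a client in scope, a call might look like `await withRetry(() => client.listWorkers({}))`.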
Complete Example
import { WorkManagerClient } from "@virtufin/workmanager";
async function main() {
  console.log("WorkManager Client Demo");
  console.log("=======================\n");
  const client = new WorkManagerClient({ url: "http://localhost:5002" });
  // Create a worker
  console.log("Creating worker...");
  const { id: workerId } = await client.createWorker({
    codeSource: {
      // The Python body keeps its own indentation inside the template string
      content: new TextEncoder().encode(`def Process(event):
    return {
        "type": "com.example.output",
        "source": "typescript-demo",
        "data": event.get("data", {})
    }`)
    },
    mimeType: "text/x-python",
    topic: "demo-topic"
  });
  console.log(`Created: ${workerId}`);
  // List workers
  console.log("\nListing workers...");
  const { workers } = await client.listWorkers({});
  for (const w of workers) {
    console.log(` ${w.id} - ${w.status} - topic=${w.topic}`);
  }
  // Start worker
  console.log("\nStarting worker...");
  await client.startWorker({ id: workerId });
  const { workers: updated } = await client.listWorkers({});
  for (const w of updated) {
    if (w.id === workerId) {
      console.log(` Status after start: ${w.status}`);
    }
  }
  // Stop worker
  console.log("\nStopping worker...");
  await client.stopWorker({ id: workerId });
  // Update code
  console.log("\nUpdating worker code...");
  await client.loadCodeFromContent({
    id: workerId,
    content: new TextEncoder().encode(`def Process(event):
    return {
        "type": "com.example.output-v2",
        "source": "typescript-demo-v2",
        "data": { "updated": True }
    }`)
  });
  // Get history
  console.log("\nWorker history:");
  const { entries } = await client.getWorkerHistory({ id: workerId });
  console.log(` ${entries.length} version(s)`);
  // Delete
  console.log("\nDeleting worker...");
  await client.deleteWorker({ id: workerId });
  console.log("Deleted");
}
// Report failures instead of leaving the promise rejection unhandled
main().catch((error) => {
  console.error(error);
});
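In the demo above, a failure between creation and deletion would leave the worker behind. One way to guard against that is a try/finally wrapper, sketched below. `withWorker` and `WorkerLifecycle` are hypothetical names, and the interface is a simplified structural stand-in for the createWorker and deleteWorker methods rather than the library's actual types.

```typescript
// Simplified structural stand-in for the lifecycle methods used here.
interface WorkerLifecycle<Req> {
  createWorker(request: Req): Promise<{ id: string }>;
  deleteWorker(request: { id: string }): Promise<unknown>;
}

// Hypothetical helper: create a worker, run `use` with its ID, and delete the
// worker afterwards even if `use` throws.
async function withWorker<Req, T>(
  client: WorkerLifecycle<Req>,
  request: Req,
  use: (workerId: string) => Promise<T>,
): Promise<T> {
  const { id } = await client.createWorker(request);
  try {
    return await use(id);
  } finally {
    await client.deleteWorker({ id });
  }
}
```

The error raised inside `use` still propagates to the caller. The finally block only makes sure the delete call is attempted first.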
Protobuf Dependencies
The client library requires the peer dependency:
{
  "peerDependencies": {
    "@bufbuild/protobuf": "^1.10.0"
  }
}
Ensure your project installs the peer dependency:
npm install @bufbuild/protobuf
The proto files are pre-compiled into the src/typescript/src/generated directory.
Generated Protos
The client includes generated protobuf classes from workmanager.proto:
| Type | Description |
|---|---|
| CreateWorkerRequest | Request to create a worker |
| CreateWorkerResponse | Response with new worker ID |
| LoadCodeFromContentRequest | Request to update code from content |
| LoadCodeFromUrlRequest | Request to update code from URL |
| DeleteWorkerRequest | Request to delete a worker |
| ListWorkersRequest | Request to list workers |
| ListWorkersResponse | Response with worker list |
| WorkerInfo | Worker details |
| WorkerStatus | Worker state enum |
| StartWorkerRequest | Request to start a worker |
| StopWorkerRequest | Request to stop a worker |
| GetWorkerHistoryRequest | Request for worker history |
| HistoryEntry | Single history entry |