Skip to content

Python Client

The Python client library provides an async gRPC interface to the WorkManager service.

Installation

The client is located at src/python/virtufin/. Install it in your project:

pip install grpcio grpcio-reflection

Or reference it directly from your project structure.

Quick Start

import asyncio
from virtufin import WorkManagerClient

async def main():
    async with WorkManagerClient("localhost", 5002) as client:
        # Create a worker
        worker_id = await client.create_worker(
            code_source={"content": "def Process(event):\n    return {\"type\": \"com.example.output\", \"source\": \"test\", \"data\": event[\"data\"]}"},
            mime_type="text/x-python",
            topic="my-topic"
        )
        print(f"Created worker: {worker_id}")

        # List workers
        workers = await client.list_workers()
        for w in workers:
            print(f"  {w['id']}: {w['status']}")

        # Delete worker
        await client.delete_worker(worker_id)

asyncio.run(main())

WorkManagerClient

The main client class for interacting with the WorkManager service.

Constructor

def __init__(self, host: str = "localhost", port: int = 5102)
Parameter Type Default Description
host str "localhost" gRPC server hostname
port int 5102 gRPC server port

Context Manager

async with WorkManagerClient() as client:
    # client is connected
    pass
# Automatically closed

connect()

Manually establish connection.

await client.connect()

close()

Close the connection.

await client.close()

Worker Operations

create_worker

Creates a new worker.

worker_id: str = await client.create_worker(
    code_source: Dict[str, Any],
    mime_type: str,
    topic: str,
    group: str = ""
)
Parameter Type Description
code_source Dict Either {"url": "..."} or {"content": "base64_or_raw_string"}
mime_type str MIME type like "text/x-python"
topic str Pub/sub topic to subscribe to
group str Optional group for coordinated execution

Returns: Worker ID string.

Example:

# Create with inline code
worker_id = await client.create_worker(
    code_source={"content": "def Process(e): return {\"type\": \"output\", \"source\": \"w\", \"data\": e[\"data\"]}"},
    mime_type="text/x-python",
    topic="events"
)

# Create from URL
worker_id = await client.create_worker(
    code_source={"url": "https://example.com/worker.py"},
    mime_type="text/x-python",
    topic="events"
)

load_code_from_content

Update worker code with inline content.

await client.load_code_from_content(id: str, content: bytes)
Parameter Type Description
id str Worker ID
content bytes New code content

load_code_from_url

Update worker code from a URL.

await client.load_code_from_url(id: str, url: str)
Parameter Type Description
id str Worker ID
url str URL to fetch new code from

delete_worker

Delete a worker.

await client.delete_worker(id: str)
Parameter Type Description
id str Worker ID to delete

start_worker

Start a stopped worker.

await client.start_worker(id: str)
Parameter Type Description
id str Worker ID to start

stop_worker

Stop a running worker.

await client.stop_worker(id: str)
Parameter Type Description
id str Worker ID to stop

list_workers

List all workers.

workers: List[Dict[str, Any]] = await client.list_workers()

Returns: List of worker info dictionaries with keys: - id - Worker ID - code_source - {"url": ...} or {"content": ...} - mime_type - MIME type string - language - Language name - topic - Subscribed topic - group - Group name (or empty string) - created_at - ISO timestamp - status - "STOPPED" or "RUNNING"

get_worker_history

Get code change history for a worker.

history: List[Dict[str, Any]] = await client.get_worker_history(id: str)
Parameter Type Description
id str Worker ID

Returns: List of history entries with: - code_source - Code source at that point - created_at - ISO timestamp

recover_workers

Recover workers from persistent state.

count: int = await client.recover_workers()

Returns: Number of workers recovered.

create_worker_from_file

Convenience method to create a worker from a source file.

worker_id: Optional[str] = await client.create_worker_from_file(
    file_path: Path,
    mime_type: str = "text/x-python",
    topic: str = "time-worker-topic",
    group: str = None
)
Parameter Type Default Description
file_path Path - Path to worker source file
mime_type str "text/x-python" MIME type
topic str "time-worker-topic" Topic name
group str None Optional group

Complete Example

#!/usr/bin/env python3
"""Example: Complete worker lifecycle with the Python client."""

import asyncio
import base64
from pathlib import Path
from virtufin import WorkManagerClient

# Worker code as a string
WORKER_CODE = '''
from datetime import datetime

def Process(event):
    return {
        "type": "com.example.time-response",
        "source": "python-worker",
        "data": {
            "current_time": datetime.now().isoformat(),
            "received_event_id": event.get("id", "unknown")
        }
    }
'''

async def main():
    async with WorkManagerClient("localhost", 5002) as client:
        print("Creating worker...")
        worker_id = await client.create_worker(
            code_source={"content": WORKER_CODE},
            mime_type="text/x-python",
            topic="time-worker-topic"
        )
        print(f"Created: {worker_id}")

        print("\nListing workers...")
        workers = await client.list_workers()
        for w in workers:
            print(f"  {w['id']} - {w['status']} - topic={w['topic']}")

        print("\nUpdating worker code...")
        new_code = '''
from datetime import datetime

def Process(event):
    return {
        "type": "com.example.time-response-v2",
        "source": "python-worker-v2",
        "data": {
            "current_time": datetime.now().isoformat(),
            "message": "Updated worker!"
        }
    }
'''
        await client.load_code_from_content(worker_id, new_code.encode())
        print("Code updated")

        print("\nGetting history...")
        history = await client.get_worker_history(worker_id)
        for i, entry in enumerate(history):
            print(f"  Version {i}: {entry['created_at']}")

        print("\nStopping worker...")
        await client.stop_worker(worker_id)

        workers = await client.list_workers()
        for w in workers:
            if w['id'] == worker_id:
                print(f"  Status after stop: {w['status']}")

        print("\nStarting worker...")
        await client.start_worker(worker_id)

        print("\nDeleting worker...")
        await client.delete_worker(worker_id)

        workers = await client.list_workers()
        found = any(w['id'] == worker_id for w in workers)
        print(f"  Worker still listed: {found}")

if __name__ == "__main__":
    asyncio.run(main())

Error Handling

All methods raise grpc.aio.AioRpcError on connection or RPC errors:

try:
    await client.create_worker(...)
except grpc.aio.AioRpcError as e:
    print(f"gRPC error: {e.code()} - {e.details()}")

Common gRPC status codes: - StatusCode.NOT_FOUND - Worker doesn't exist - StatusCode.INVALID_ARGUMENT - Invalid parameters - StatusCode.UNAVAILABLE - Server not reachable

Direct gRPC Usage

If you need lower-level access, you can use grpcio-reflection directly. See examples/run_worker_grpc.py for a complete implementation using dynamic method invocation without generated stubs.