Python Client
The Python client library provides an async gRPC interface to the WorkManager service.
Installation
The client is located at src/python/virtufin/. Install its gRPC dependencies in your project:
pip install grpcio grpcio-reflection
Or reference it directly from your project structure.
Quick Start
import asyncio
from virtufin import WorkManagerClient
async def main():
async with WorkManagerClient("localhost", 5002) as client:
# Create a worker
worker_id = await client.create_worker(
code_source={"content": "def Process(event):\n return {\"type\": \"com.example.output\", \"source\": \"test\", \"data\": event[\"data\"]}"},
mime_type="text/x-python",
topic="my-topic"
)
print(f"Created worker: {worker_id}")
# List workers
workers = await client.list_workers()
for w in workers:
print(f" {w['id']}: {w['status']}")
# Delete worker
await client.delete_worker(worker_id)
asyncio.run(main())
WorkManagerClient
The main client class for interacting with the WorkManager service.
Constructor
def __init__(self, host: str = "localhost", port: int = 5102)
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | gRPC server hostname |
| port | int | 5102 | gRPC server port |
Context Manager
async with WorkManagerClient() as client:
# client is connected
pass
# Automatically closed
connect()
Manually establish connection.
await client.connect()
close()
Close the connection.
await client.close()
Worker Operations
create_worker
Creates a new worker.
worker_id: str = await client.create_worker(
code_source: Dict[str, Any],
mime_type: str,
topic: str,
group: str = ""
)
| Parameter | Type | Description |
|---|---|---|
| code_source | Dict | Either {"url": "..."} or {"content": "base64_or_raw_string"} |
| mime_type | str | MIME type like "text/x-python" |
| topic | str | Pub/sub topic to subscribe to |
| group | str | Optional group for coordinated execution |
Returns: Worker ID string.
Example:
# Create with inline code
worker_id = await client.create_worker(
code_source={"content": "def Process(e): return {\"type\": \"output\", \"source\": \"w\", \"data\": e[\"data\"]}"},
mime_type="text/x-python",
topic="events"
)
# Create from URL
worker_id = await client.create_worker(
code_source={"url": "https://example.com/worker.py"},
mime_type="text/x-python",
topic="events"
)
load_code_from_content
Update worker code with inline content.
await client.load_code_from_content(id: str, content: bytes)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID |
| content | bytes | New code content |
load_code_from_url
Update worker code from a URL.
await client.load_code_from_url(id: str, url: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID |
| url | str | URL to fetch new code from |
delete_worker
Delete a worker.
await client.delete_worker(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID to delete |
start_worker
Start a stopped worker.
await client.start_worker(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID to start |
stop_worker
Stop a running worker.
await client.stop_worker(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID to stop |
list_workers
List all workers.
workers: List[Dict[str, Any]] = await client.list_workers()
Returns: List of worker info dictionaries with keys:
- id - Worker ID
- code_source - {"url": ...} or {"content": ...}
- mime_type - MIME type string
- language - Language name
- topic - Subscribed topic
- group - Group name (or empty string)
- created_at - ISO timestamp
- status - "STOPPED" or "RUNNING"
get_worker_history
Get code change history for a worker.
history: List[Dict[str, Any]] = await client.get_worker_history(id: str)
| Parameter | Type | Description |
|---|---|---|
| id | str | Worker ID |
Returns: List of history entries with:
- code_source - Code source at that point
- created_at - ISO timestamp
recover_workers
Recover workers from persistent state.
count: int = await client.recover_workers()
Returns: Number of workers recovered.
create_worker_from_file
Convenience method to create a worker from a source file.
worker_id: Optional[str] = await client.create_worker_from_file(
file_path: Path,
mime_type: str = "text/x-python",
topic: str = "time-worker-topic",
group: str = None
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | Path | - | Path to worker source file |
| mime_type | str | "text/x-python" | MIME type |
| topic | str | "time-worker-topic" | Topic name |
| group | str | None | Optional group |
Complete Example
#!/usr/bin/env python3
"""Example: Complete worker lifecycle with the Python client."""
import asyncio
import base64
from pathlib import Path
from virtufin import WorkManagerClient
# Worker code as a string
WORKER_CODE = '''
from datetime import datetime
def Process(event):
return {
"type": "com.example.time-response",
"source": "python-worker",
"data": {
"current_time": datetime.now().isoformat(),
"received_event_id": event.get("id", "unknown")
}
}
'''
async def main():
async with WorkManagerClient("localhost", 5002) as client:
print("Creating worker...")
worker_id = await client.create_worker(
code_source={"content": WORKER_CODE},
mime_type="text/x-python",
topic="time-worker-topic"
)
print(f"Created: {worker_id}")
print("\nListing workers...")
workers = await client.list_workers()
for w in workers:
print(f" {w['id']} - {w['status']} - topic={w['topic']}")
print("\nUpdating worker code...")
new_code = '''
from datetime import datetime
def Process(event):
return {
"type": "com.example.time-response-v2",
"source": "python-worker-v2",
"data": {
"current_time": datetime.now().isoformat(),
"message": "Updated worker!"
}
}
'''
await client.load_code_from_content(worker_id, new_code.encode())
print("Code updated")
print("\nGetting history...")
history = await client.get_worker_history(worker_id)
for i, entry in enumerate(history):
print(f" Version {i}: {entry['created_at']}")
print("\nStopping worker...")
await client.stop_worker(worker_id)
workers = await client.list_workers()
for w in workers:
if w['id'] == worker_id:
print(f" Status after stop: {w['status']}")
print("\nStarting worker...")
await client.start_worker(worker_id)
print("\nDeleting worker...")
await client.delete_worker(worker_id)
workers = await client.list_workers()
found = any(w['id'] == worker_id for w in workers)
print(f" Worker still listed: {found}")
if __name__ == "__main__":
asyncio.run(main())
Error Handling
All methods raise grpc.aio.AioRpcError on connection or RPC errors:
try:
await client.create_worker(...)
except grpc.aio.AioRpcError as e:
print(f"gRPC error: {e.code()} - {e.details()}")
Common gRPC status codes:
- StatusCode.NOT_FOUND - Worker doesn't exist
- StatusCode.INVALID_ARGUMENT - Invalid parameters
- StatusCode.UNAVAILABLE - Server not reachable
Direct gRPC Usage
If you need lower-level access, you can use grpcio-reflection directly. See examples/run_worker_grpc.py for a complete implementation using dynamic method invocation without generated stubs.