autobahn-client
v0.1.0
Published
A tiny TS library
Readme
Autobahn Client
A modern, production-ready Python client for Autobahn WebSocket communication with built-in RPC support, automatic reconnection, and comprehensive error handling.
Features
- 🚀 WebSocket Communication: Reliable WebSocket client with automatic reconnection
- 🔧 RPC System: Powerful decorator-based RPC with type safety and validation
- ⚡ Async/Await: Fully async/await compatible
- 🛡️ Error Handling: Comprehensive error handling with detailed logging
- 🔄 Auto-Reconnection: Automatic reconnection with configurable intervals
- 📝 Type Safety: Full type annotations and validation
- 🏗️ Protobuf: Protocol Buffers for efficient message serialization
Installation
pip install autobahn_clientQuick Start
import asyncio
from autobahn_client import Autobahn
from autobahn_client.util import Address
async def main():
# Create client
client = Autobahn(Address("localhost", 8080))
# Connect
await client.begin()
# Subscribe to a topic
async def message_handler(payload: bytes):
print(f"Received: {payload}")
await client.subscribe("my-topic", message_handler)
# Publish a message
await client.publish("my-topic", b"Hello, World!")
# Keep running
await asyncio.sleep(1)
asyncio.run(main())Configuration
Basic Configuration
from autobahn_client import Autobahn
from autobahn_client.util import Address
# Basic configuration
client = Autobahn(
address=Address("localhost", 8080),
reconnect=True, # Enable auto-reconnection
reconnect_interval_seconds=1.0 # Reconnect every 1 second
)Connection Management
# Start the client
await client.begin()
# Check connection status
await client.ping()
# Manual publish (will auto-reconnect if needed)
await client.publish("topic", b"data")
# Subscribe with error handling
async def safe_handler(payload: bytes):
try:
# Process message
print(f"Got: {payload}")
except Exception as e:
print(f"Handler error: {e}")
await client.subscribe("events", safe_handler)
# Unsubscribe
await client.unsubscribe("events")Pub/Sub System Architecture
The following diagram shows how the publish/subscribe system works:
graph TD
A["Publisher Client"] --> B["WebSocket Connection"]
C["Subscriber Client 1"] --> D["WebSocket Connection"]
E["Subscriber Client 2"] --> F["WebSocket Connection"]
B --> G["Autobahn Server"]
D --> G
F --> G
G --> H["Topic: 'chat'"]
G --> I["Topic: 'events'"]
G --> J["Topic: 'notifications'"]
A --> K["1. Publish Message<br/>topic='chat'<br/>payload='Hello World!'"]
K --> B
B --> G
G --> L["2. Forward to Subscribers"]
L --> M["3. Deliver to Client 1<br/>topic='chat'<br/>payload='Hello World!'"]
L --> N["3. Deliver to Client 2<br/>topic='chat'<br/>payload='Hello World!'"]
M --> D
N --> F
D --> O["4. Callback Executed<br/>message_handler(payload)"]
F --> P["4. Callback Executed<br/>message_handler(payload)"]
Q["Subscribe Process"] --> R["client.subscribe('chat', handler)"]
R --> S["SUBSCRIBE message sent"]
S --> G
G --> T["Client added to topic subscribers"]
style A fill:#e3f2fd
style C fill:#e8f5e8
style E fill:#e8f5e8
style G fill:#fff3e0
style H fill:#f3e5f5
style I fill:#f3e5f5
style J fill:#f3e5f5
style Q fill:#fce4ecHow it works:
- Subscription: Clients subscribe to topics they're interested in
- Publication: Any client can publish messages to any topic
- Distribution: The server forwards messages to all subscribers of that topic
- Delivery: Each subscriber's callback function is executed with the message
RPC System
The Autobahn client includes a powerful RPC system with decorators for both client calls and server functions. RPC functions are automatically discovered and registered when you call begin() - no manual registration required!
RPC Server (Service Provider)
Create RPC services using the @rpc_function decorator:
from autobahn_client import Autobahn
from autobahn_client.proto.message_pb2 import PublishMessage
from google.protobuf.message import Message
import asyncio
# Define your RPC functions - they will be auto-registered!
@Autobahn.rpc_function()
async def get_user(request: PublishMessage) -> PublishMessage:
"""Get user information."""
# Process the request
user_data = f"User: {request.topic}"
# Return response
return PublishMessage(
topic="user-response",
payload=user_data.encode()
)
@Autobahn.rpc_function()
async def process_data(request: PublishMessage) -> None:
"""Process data without returning anything."""
print(f"Processing: {request.payload.decode()}")
# No return value
async def main():
client = Autobahn(Address("localhost", 8080))
# Just call begin() - RPC functions are automatically registered!
await client.begin()
print("RPC server running with auto-registered functions...")
# Keep the server running
while True:
await asyncio.sleep(1)
asyncio.run(main())RPC Client (Service Consumer)
Create RPC client calls using the @rpc_callable decorator:
from autobahn_client import Autobahn
from autobahn_client.proto.message_pb2 import PublishMessage
import asyncio
# Define RPC client methods
@Autobahn.rpc_callable(timeout_ms=5000)
async def call_get_user(request: PublishMessage) -> PublishMessage:
"""Client method to call get_user RPC."""
pass # Implementation handled by decorator
@Autobahn.rpc_callable(timeout_ms=3000)
async def call_process_data(request: PublishMessage) -> None:
"""Client method to call process_data RPC."""
pass # Implementation handled by decorator
async def main():
client = Autobahn(Address("localhost", 8080))
await client.begin()
# Make RPC calls
try:
# Call with return value
request = PublishMessage(topic="john_doe", payload=b"get user info")
response = await call_get_user(client, request)
print(f"User info: {response.payload.decode()}")
# Call without return value
data_request = PublishMessage(topic="data", payload=b"some data")
await call_process_data(client, data_request)
print("Data processed successfully")
except TimeoutError as e:
print(f"RPC timeout: {e}")
except Exception as e:
print(f"RPC error: {e}")
asyncio.run(main())RPC Type Safety
All RPC functions must have proper type annotations:
from typing import Optional
from google.protobuf.message import Message
# ✅ Correct: Proper type annotations
@Autobahn.rpc_function()
async def valid_rpc(request: PublishMessage) -> Optional[PublishMessage]:
if request.payload:
return PublishMessage(topic="response", payload=b"data")
return None
# ❌ Invalid: Missing type annotations
@Autobahn.rpc_function()
async def invalid_rpc(request): # No type annotation
return "error"
# ❌ Invalid: Wrong parameter count
@Autobahn.rpc_function()
async def invalid_rpc2(req1: Message, req2: Message) -> Message: # Too many params
passHow RPC Auto-Registration Works
- Decoration Time: When you use
@Autobahn.rpc_function(), the function is added to a global registry - Connection Time: When you call
client.begin(), all registered RPC functions are automatically discovered and registered with that client instance - Ready to Serve: Your RPC services are immediately available to handle incoming requests
# Step 1: Decorate your functions (happens at import time)
@Autobahn.rpc_function()
async def my_service(request: MyRequest) -> MyResponse:
return MyResponse(data="processed")
# Step 2: Create client and call begin() - auto-registration happens here!
async def main():
client = Autobahn(Address("localhost", 8080))
await client.begin() # 🎉 my_service is now automatically registered!
# Your RPC service is ready to handle calls
await asyncio.sleep(float('inf'))RPC System Architecture
The following diagram shows the complete RPC request/response flow:
graph TD
A["RPC Client"] --> B["@rpc_callable decorated function"]
C["RPC Server"] --> D["@rpc_function decorated function"]
B --> E["1. Generate Call ID<br/>uuid4()"]
E --> F["2. Serialize Request<br/>protobuf → bytes"]
F --> G["3. Subscribe to Response Topic<br/>RPC/FUNCTIONAL_SERVICE/OUTPUT/{call_id}"]
G --> H["4. Publish Request<br/>Topic: RPC/FUNCTIONAL_SERVICE/{func_signature}<br/>Payload: RPCRequestMessage"]
H --> I["WebSocket Transport"]
I --> J["5. Server Receives Request"]
J --> K["6. Deserialize Request<br/>bytes → protobuf"]
K --> L["7. Execute Business Logic<br/>await rpc_function(request)"]
L --> M["8. Serialize Response<br/>protobuf → bytes"]
M --> N["9. Publish Response<br/>Topic: RPC/FUNCTIONAL_SERVICE/OUTPUT/{call_id}<br/>Payload: RPCResponseMessage"]
N --> O["WebSocket Transport"]
O --> P["10. Client Receives Response"]
P --> Q["11. Deserialize Response<br/>bytes → protobuf"]
Q --> R["12. Return Result<br/>or raise Exception"]
S["Auto-Registration Process"] --> T["@rpc_function adds to global registry"]
T --> U["client.begin() called"]
U --> V["Auto-discover registered functions"]
V --> W["Subscribe to RPC topics"]
W --> X["Ready to handle RPC calls"]
Y["Error Handling"] --> Z["Timeout: asyncio.wait_for()"]
Y --> AA["Serialization Errors"]
Y --> BB["Business Logic Exceptions"]
Y --> CC["Connection Failures"]
style A fill:#e3f2fd
style C fill:#e8f5e8
style I fill:#fff3e0
style O fill:#fff3e0
style S fill:#f1f8e9
style Y fill:#ffebeeRPC Flow Explained:
- Client Side: Decorated function generates unique call ID and subscribes to response topic
- Request: Serialized request published to function-specific topic
- Server Side: Auto-registered function receives and processes the request
- Response: Result serialized and published back to the response topic
- Completion: Client receives response, deserializes it, and returns the result
Key Features:
- 🔄 Automatic Registration: Functions decorated with
@rpc_functionare auto-discovered - 🆔 Unique Call IDs: Each RPC call gets a UUID to match requests with responses
- ⏱️ Timeout Handling: Configurable timeouts with proper cleanup
- 🛡️ Error Handling: Comprehensive error handling at every step
- 🔒 Type Safety: Full type validation for requests and responses
Advanced Features
Custom Message Types
from google.protobuf.message import Message
# Define your custom protobuf message types
# (You would typically generate these from .proto files)
@Autobahn.rpc_function()
async def handle_custom_message(request: YourCustomMessage) -> YourResponseMessage:
# Process custom message
response = YourResponseMessage()
response.field = request.input_field
return responseError Handling
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
async def robust_handler(payload: bytes):
try:
# Your message processing logic
data = json.loads(payload.decode())
# Process data...
except json.JSONDecodeError:
logging.error("Invalid JSON received")
except Exception as e:
logging.error(f"Handler error: {e}")
await client.subscribe("topic", robust_handler)Connection Monitoring
async def monitor_connection():
client = Autobahn(Address("localhost", 8080), reconnect=True)
await client.begin()
while True:
try:
await client.ping()
print("Connection healthy")
except ConnectionError:
print("Connection lost - will auto-reconnect")
await asyncio.sleep(5)Error Handling & Logging
The client provides comprehensive error handling and logging:
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# The client will automatically log:
# - Connection events
# - RPC calls and responses
# - Errors and exceptions
# - Reconnection attempts
# Handle specific errors
try:
await client.publish("topic", b"data")
except ConnectionError:
print("Not connected")
except Exception as e:
print(f"Unexpected error: {e}")API Reference
Autobahn Class
Constructor
Autobahn(
address: Address, # WebSocket server address
reconnect: bool = True, # Enable auto-reconnection
reconnect_interval_seconds: float = 1.0 # Reconnection interval
)Methods
async begin()- Start the client, connect to server, and auto-register all RPC functionsasync publish(topic: str, payload: bytes)- Publish message to topicasync subscribe(topic: str, callback: Callable)- Subscribe to topicasync unsubscribe(topic: str)- Unsubscribe from topicasync ping()- Send ping to server
Decorators
@Autobahn.rpc_function()- Mark function as RPC service (auto-registered onbegin())@Autobahn.rpc_callable(timeout_ms: float = 3000)- Create RPC client call
RPC Auto-Registration
When you call begin(), the client automatically:
- Discovers all functions decorated with
@rpc_function - Registers them as RPC services
- Sets up WebSocket subscriptions to handle incoming RPC calls
- Logs the registration process
No manual registration required!
Address Class
Address(host: str, port: int)Simple dataclass for WebSocket server address.
Protocol Buffers
This package uses Protocol Buffers for message serialization. Key message types:
PublishMessage- Standard publish/subscribe messagesTopicMessage- Topic subscription/unsubscriptionRPCRequestMessage- RPC request messagesRPCResponseMessage- RPC response messages
Compiling Proto Files
If you modify the .proto files:
# Install protobuf compiler
# macOS: brew install protobuf
# Ubuntu: apt-get install protobuf-compiler
# Compile proto files
protoc --python_out=. proto/message.protoBest Practices
1. Always Use Type Annotations
# ✅ Good
@Autobahn.rpc_function()
async def process_user(request: UserRequest) -> UserResponse:
pass
# ❌ Bad
@Autobahn.rpc_function()
async def process_user(request):
pass2. Handle Errors Gracefully
async def safe_rpc_call():
try:
result = await my_rpc_call(client, request)
return result
except TimeoutError:
logging.warning("RPC call timed out")
return None
except Exception as e:
logging.error(f"RPC call failed: {e}")
raise3. Use Connection Monitoring
# Enable auto-reconnection
client = Autobahn(address, reconnect=True)
# Monitor connection health
async def health_check():
while True:
try:
await client.ping()
except Exception:
pass # Auto-reconnection will handle it
await asyncio.sleep(30)4. Proper Resource Cleanup
async def main():
client = Autobahn(Address("localhost", 8080))
try:
await client.begin()
# Your application logic
except KeyboardInterrupt:
print("Shutting down...")
finally:
# Cleanup is handled automatically
passExamples
Check out the examples/ directory for complete working examples:
basic_client.py- Basic pub/sub clientrpc_server.py- RPC service providerrpc_client.py- RPC service consumerchat_application.py- Complete chat application
Contributing
- Fork the repository
- Create a feature branch
- Add tests for new features
- Ensure all tests pass
- Submit a pull request
License
MIT License - see LICENSE file for details.
Changelog
v0.1.1 (Latest)
- 🎉 Complete RPC system overhaul
- ✅ Fixed type extraction bug
- ✅ Fixed response handling logic
- ✅ Fixed race conditions in service registration
- ✅ Added instance-specific service registry
- ✅ Comprehensive error handling and logging
- ✅ Added input validation for RPC decorators
- ✅ Improved Optional type handling
- ✅ Added resource cleanup mechanisms
v0.1.0
- Basic WebSocket client
- Simple pub/sub functionality
- Initial RPC implementation
