Advanced Features
Learn advanced techniques for optimizing performance, implementing caching, and building production-ready applications with the Claro Python SDK.
Rate Limiting and Backoff
Built-in Rate Limit Handling
The SDK automatically handles rate limits with exponential backoff:
from baytos.claro import BaytClient
# Default: 3 retries with exponential backoff
client = BaytClient(api_key="your_api_key", max_retries=3)
# Backoff strategy: 2^attempt seconds (attempt numbering starts at 0)
# - 1st retry: 1 second
# - 2nd retry: 2 seconds
# - 3rd retry: 4 seconds
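If you need the same strategy at the application level (for example around calls the SDK does not retry for you), here is a minimal sketch of the formula above; the jitter term is an addition, not something the SDK documents:
import random
import time
def with_backoff(func, max_retries=3):
    """Retry func with exponential backoff (2**attempt seconds) plus a little jitter."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(2 ** attempt + random.uniform(0, 0.1))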
Respecting Retry-After Headers
The SDK respects the Retry-After header from the API:
# If API returns Retry-After: 60
# SDK will wait 60 seconds before retrying
# Otherwise uses exponential backoff
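With max_retries=0 you can honor the header yourself. A minimal sketch, assuming BaytAPIError exposes the HTTP status and response headers (the status_code and headers attribute names here are hypothetical; check the SDK's actual error type):
import time
from baytos.claro import BaytClient, BaytAPIError
client = BaytClient(api_key="...", max_retries=0)
def get_prompt_honoring_retry_after(package_name):
    """Retry once after the server-suggested wait."""
    try:
        return client.get_prompt(package_name)
    except BaytAPIError as e:
        status = getattr(e, "status_code", None)     # hypothetical attribute
        headers = getattr(e, "headers", None) or {}  # hypothetical attribute
        if status == 429 and "Retry-After" in headers:
            time.sleep(int(headers["Retry-After"]))
            return client.get_prompt(package_name)
        raise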
Custom Retry Configuration
Adjust retry behavior based on your needs:
# Aggressive retries for batch jobs
client = BaytClient(
    api_key="...",
    max_retries=5  # Up to 5 retries
)
# Fast-fail for interactive apps
client = BaytClient(
    api_key="...",
    max_retries=1  # Single retry only
)
# No retries (handle manually)
client = BaytClient(
    api_key="...",
    max_retries=0
)
Connection Pooling
How It Works
The SDK uses requests.Session for automatic connection pooling:
from baytos.claro import BaytClient
client = BaytClient(api_key="...")
# First request - establishes connection
prompt1 = client.get_prompt("@workspace/p1:v1")
# Subsequent requests - reuse connection (faster)
prompt2 = client.get_prompt("@workspace/p2:v1")
prompt3 = client.get_prompt("@workspace/p3:v1")
Benefits
- Faster requests - No TCP handshake overhead
- Lower latency - Connection already established
- Reduced server load - Fewer concurrent connections
- Automatic keep-alive - Connection reused efficiently
Best Practice: Reuse Client
# ✅ Good: Single client instance
client = BaytClient(api_key="...")
for package in packages:
    prompt = client.get_prompt(package)
# ❌ Bad: New client each time
for package in packages:
    client = BaytClient(api_key="...")  # Creates new session!
    prompt = client.get_prompt(package)
Caching Strategies
Simple In-Memory Cache
Cache prompts to reduce API calls:
from functools import lru_cache
from baytos.claro import BaytClient
client = BaytClient(api_key="...")
@lru_cache(maxsize=100)
def get_cached_prompt(package_name: str):
"""Cache up to 100 prompts in memory"""
return client.get_prompt(package_name)
# First call - fetches from API
prompt = get_cached_prompt("@workspace/support:v1")
# Subsequent calls - returns cached version
prompt = get_cached_prompt("@workspace/support:v1") # Instant!
Cached prompts become stale when updated. Use version pinning or implement cache invalidation.
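Because lru_cache is a standard-library decorator, invalidation and inspection come built in:
# Drop all cached prompts (e.g. after a prompt is republished)
get_cached_prompt.cache_clear()
# Inspect hit/miss counts to gauge cache effectiveness
print(get_cached_prompt.cache_info())  # CacheInfo(hits=..., misses=..., maxsize=100, currsize=...)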
Time-Based Cache
Implement TTL (time-to-live) caching:
import time
from typing import Dict, Optional, Tuple
from baytos.claro import BaytClient, Prompt
class PromptCache:
    def __init__(self, client: BaytClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl = ttl_seconds
        self.cache: Dict[str, Tuple[Prompt, float]] = {}

    def get(self, package_name: str) -> Prompt:
        """Get prompt with TTL-based caching"""
        now = time.time()
        # Check cache
        if package_name in self.cache:
            prompt, timestamp = self.cache[package_name]
            # Return if not expired
            if now - timestamp < self.ttl:
                return prompt
        # Fetch from API
        prompt = self.client.get_prompt(package_name)
        # Store in cache
        self.cache[package_name] = (prompt, now)
        return prompt

    def invalidate(self, package_name: Optional[str] = None):
        """Invalidate cache for one or all prompts"""
        if package_name:
            self.cache.pop(package_name, None)
        else:
            self.cache.clear()
# Usage
client = BaytClient(api_key="...")
cache = PromptCache(client, ttl_seconds=300) # 5 minutes
prompt = cache.get("@workspace/support:v1") # Fetches from API
prompt = cache.get("@workspace/support:v1") # Returns cached (< 5 min)
# Invalidate when needed
cache.invalidate("@workspace/support:v1")
Redis-Based Caching
For distributed systems, use Redis:
import json
import redis
from baytos.claro import BaytClient, Prompt
class RedisPromptCache:
    def __init__(self, client: BaytClient, redis_client, ttl=300):
        self.client = client
        self.redis = redis_client
        self.ttl = ttl

    def get(self, package_name: str) -> Prompt:
        """Get prompt with Redis caching"""
        cache_key = f"prompt:{package_name}"
        # Check Redis cache
        cached = self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            return Prompt(data)
        # Fetch from API
        prompt = self.client.get_prompt(package_name)
        # Store in Redis
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(prompt.to_dict())
        )
        return prompt
# Usage
client = BaytClient(api_key="...")
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = RedisPromptCache(client, redis_client, ttl=300)
prompt = cache.get("@workspace/support:v1")
Concurrent Requests
Thread-Based Concurrency
Fetch multiple prompts in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient
client = BaytClient(api_key="...")
package_names = [
    "@workspace/support:v1",
    "@workspace/sales:v1",
    "@workspace/marketing:v1"
]

def fetch_prompt(package_name):
    """Fetch a single prompt"""
    return client.get_prompt(package_name)

# Fetch concurrently
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit all tasks
    futures = {
        executor.submit(fetch_prompt, name): name
        for name in package_names
    }
    # Collect results
    prompts = {}
    for future in as_completed(futures):
        name = futures[future]
        try:
            prompt = future.result()
            prompts[name] = prompt
            print(f"Fetched: {prompt.title}")
        except Exception as e:
            print(f"Failed to fetch {name}: {e}")

print(f"\nFetched {len(prompts)} prompts")
Async/Await Pattern
For async applications, wrap SDK calls:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from baytos.claro import BaytClient
client = BaytClient(api_key="...")
executor = ThreadPoolExecutor(max_workers=10)
async def get_prompt_async(package_name: str):
    """Async wrapper for get_prompt"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        client.get_prompt,
        package_name
    )

async def fetch_multiple_prompts(package_names):
    """Fetch multiple prompts concurrently"""
    tasks = [get_prompt_async(name) for name in package_names]
    return await asyncio.gather(*tasks)

# Usage
async def main():
    packages = [
        "@workspace/p1:v1",
        "@workspace/p2:v1",
        "@workspace/p3:v1"
    ]
    prompts = await fetch_multiple_prompts(packages)
    for prompt in prompts:
        print(f"Fetched: {prompt.title}")

asyncio.run(main())
Performance Optimization
Batch Operations
Optimize bulk operations:
from baytos.claro import BaytClient
def fetch_prompts_efficiently(package_names, batch_size=10):
    """Fetch prompts in batches with progress tracking"""
    client = BaytClient(api_key="...")
    prompts = []
    for i in range(0, len(package_names), batch_size):
        batch = package_names[i:i + batch_size]
        print(f"Fetching batch {i//batch_size + 1}...")
        for name in batch:
            try:
                prompt = client.get_prompt(name)
                prompts.append(prompt)
            except Exception as e:
                print(f"Failed to fetch {name}: {e}")
    return prompts
# Usage
packages = ["@workspace/p1:v1", "@workspace/p2:v1", ...]
prompts = fetch_prompts_efficiently(packages)
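For larger workloads, this batching loop combines naturally with the ThreadPoolExecutor pattern from the concurrency section above: submit each batch to the pool and collect results with as_completed.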
Minimize Data Transfer
Only fetch what you need:
# ✅ Good: Fetch specific prompt
prompt = client.get_prompt("@workspace/support:v1")
content = prompt.generator
# ❌ Wasteful: List all, then filter
result = client.list_prompts(limit=100)
prompt = [p for p in result['prompts'] if p.title == "Support"][0]
Lazy Loading
Load context files only when needed:
from baytos.claro import BaytClient
client = BaytClient(api_key="...")
prompt = client.get_prompt("@workspace/research:v1")
# ✅ Good: Check before downloading
if prompt.has_context():
    files = prompt.get_file_contexts()
    # Only download if needed
    if any(f.mime_type == 'application/pdf' for f in files):
        for file in files:
            if file.mime_type == 'application/pdf':
                content = client.download_context_file(file.id)
                # Process PDF...
Production Best Practices
Environment-Based Configuration
Configure differently per environment:
import os
from baytos.claro import BaytClient
def create_client():
    """Create client with environment-specific config"""
    env = os.getenv("ENVIRONMENT", "production")
    if env == "development":
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            base_url="http://localhost:8000",
            max_retries=0  # Fast-fail in dev
        )
    elif env == "staging":
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            base_url="https://staging-api.baytos.ai",
            max_retries=2
        )
    else:  # production
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            max_retries=3
        )
client = create_client()
Health Checks
Implement health checks for monitoring:
from baytos.claro import BaytClient, BaytAPIError
def health_check():
    """Check if API is accessible"""
    try:
        client = BaytClient(api_key="...")
        # Try to list prompts (lightweight operation)
        result = client.list_prompts(limit=1)
        return {
            'status': 'healthy',
            'api_accessible': True
        }
    except BaytAPIError as e:
        return {
            'status': 'unhealthy',
            'api_accessible': False,
            'error': str(e)
        }
# Usage in health endpoint
# GET /health
status = health_check()
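Wired into a web framework, this becomes a liveness probe. A minimal sketch using Flask (the framework choice is illustrative, not part of the SDK):
from flask import Flask, jsonify
app = Flask(__name__)
@app.get("/health")
def health():
    status = health_check()
    code = 200 if status['status'] == 'healthy' else 503
    return jsonify(status), code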
Monitoring and Metrics
Track API usage and performance:
import time
from baytos.claro import BaytClient, BaytAPIError
class MonitoredClient:
    def __init__(self, api_key):
        self.client = BaytClient(api_key=api_key)
        self.stats = {
            'requests': 0,
            'errors': 0,
            'total_time': 0
        }

    def get_prompt(self, package_name):
        """Get prompt with monitoring"""
        start = time.time()
        try:
            self.stats['requests'] += 1
            prompt = self.client.get_prompt(package_name)
            return prompt
        except BaytAPIError:
            self.stats['errors'] += 1
            raise
        finally:
            elapsed = time.time() - start
            self.stats['total_time'] += elapsed

    def get_stats(self):
        """Get performance statistics"""
        avg_time = (
            self.stats['total_time'] / self.stats['requests']
            if self.stats['requests'] > 0
            else 0
        )
        return {
            'total_requests': self.stats['requests'],
            'total_errors': self.stats['errors'],
            'error_rate': (
                self.stats['errors'] / self.stats['requests']
                if self.stats['requests'] > 0
                else 0
            ),
            'avg_response_time': avg_time
        }
# Usage
client = MonitoredClient(api_key="...")
prompt = client.get_prompt("@workspace/test:v1")
stats = client.get_stats()
print(f"Average response time: {stats['avg_response_time']:.2f}s")
Circuit Breaker Pattern
Prevent cascading failures:
import time
from baytos.claro import BaytClient, BaytAPIError
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker"""
        # Check if circuit is open
        if self.state == 'open':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'half-open'
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
            # Success - reset on half-open
            if self.state == 'half-open':
                self.failures = 0
                self.state = 'closed'
            return result
        except BaytAPIError:
            self.failures += 1
            self.last_failure_time = time.time()
            # Open circuit if threshold reached
            if self.failures >= self.failure_threshold:
                self.state = 'open'
            raise
# Usage
client = BaytClient(api_key="...")
breaker = CircuitBreaker(failure_threshold=5, timeout=60)
def fetch_with_breaker(package_name):
    return breaker.call(client.get_prompt, package_name)

try:
    prompt = fetch_with_breaker("@workspace/test:v1")
except Exception as e:
    print(f"Circuit breaker prevented call: {e}")
Best Practices Summary
Use Connection Pooling
Reuse client instances to benefit from connection pooling:
# ✅ Good: Singleton pattern
_client = None
def get_client():
    global _client
    if _client is None:
        _client = BaytClient(api_key="...")
    return _client
Implement Caching
Cache prompts to reduce API calls:
# Use version pinning with caching
@lru_cache(maxsize=100)
def get_cached_prompt(package_name: str):
    return client.get_prompt(package_name)
# Always use specific versions when caching
prompt = get_cached_prompt("@workspace/support:v1") # ✅
prompt = get_cached_prompt("@workspace/support:latest") # ❌
Handle Rate Limits Gracefully
Respect rate limits and implement backoff:
# Configure appropriate retries
client = BaytClient(
    api_key="...",
    max_retries=3  # Let SDK handle retries
)
# Add application-level rate limiting
import time
time.sleep(0.1) # 100ms between requests
Monitor Performance
Track metrics in production:
# Log response times
# Track error rates
# Monitor cache hit rates
# Set up alerts for anomalies
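One lightweight way to act on these points is to log the MonitoredClient stats from the monitoring section on a schedule. A minimal sketch (the logger name and interval are illustrative):
import logging
import threading
import time
logger = logging.getLogger("baytos.metrics")
def start_stats_logger(monitored_client, interval=60):
    """Log MonitoredClient stats every `interval` seconds on a daemon thread."""
    def _loop():
        while True:
            stats = monitored_client.get_stats()
            logger.info(
                "requests=%d errors=%d error_rate=%.2f avg_time=%.3fs",
                stats['total_requests'], stats['total_errors'],
                stats['error_rate'], stats['avg_response_time'],
            )
            time.sleep(interval)
    threading.Thread(target=_loop, daemon=True).start()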