Documentation Index Fetch the complete documentation index at: https://docs.baytos.ai/llms.txt
Use this file to discover all available pages before exploring further.
Advanced Features
Learn advanced techniques for optimizing performance, implementing caching, and building production-ready applications with the Claro Python SDK.
Rate Limiting and Backoff
Built-in Rate Limit Handling
The SDK automatically handles rate limits with exponential backoff:
from baytos.claro import BaytClient
# Default: 3 retries with exponential backoff
client = BaytClient( api_key = "your_api_key" , max_retries = 3 )
# Backoff strategy: 2^(attempt-1) seconds
# - 1st retry: 1 second
# - 2nd retry: 2 seconds
# - 3rd retry: 4 seconds
The SDK respects the Retry-After header from the API:
# If API returns Retry-After: 60
# SDK will wait 60 seconds before retrying
# Otherwise uses exponential backoff
Custom Retry Configuration
Adjust retry behavior based on your needs:
# Aggressive retries for batch jobs
client = BaytClient(
api_key = "..." ,
max_retries = 5 # Up to 5 retries
)
# Fast-fail for interactive apps
client = BaytClient(
api_key = "..." ,
max_retries = 1 # Single retry only
)
# No retries (handle manually)
client = BaytClient(
api_key = "..." ,
max_retries = 0
)
Connection Pooling
How It Works
The SDK uses requests.Session for automatic connection pooling:
from baytos.claro import BaytClient
client = BaytClient( api_key = "..." )
# First request - establishes connection
prompt1 = client.get_prompt( "@workspace/p1:v1" )
# Subsequent requests - reuse connection (faster)
prompt2 = client.get_prompt( "@workspace/p2:v1" )
prompt3 = client.get_prompt( "@workspace/p3:v1" )
Benefits
Faster requests - No TCP handshake overhead
Lower latency - Connection already established
Reduced server load - Fewer concurrent connections
Automatic keep-alive - Connection reused efficiently
Best Practice: Reuse Client
# ✅ Good: Single client instance
client = BaytClient( api_key = "..." )
for package in packages:
prompt = client.get_prompt(package)
# ❌ Bad: New client each time
for package in packages:
client = BaytClient( api_key = "..." ) # Creates new session!
prompt = client.get_prompt(package)
Caching Strategies
Simple In-Memory Cache
Cache prompts to reduce API calls:
from functools import lru_cache
from baytos.claro import BaytClient
client = BaytClient( api_key = "..." )
@lru_cache(maxsize=100)
def get_cached_prompt(package_name: str):
    """Fetch a prompt, memoizing up to 100 results in process memory.

    Repeated calls with the same package name return the cached result
    without another API round-trip.
    """
    return client.get_prompt(package_name)
# First call - fetches from API
prompt = get_cached_prompt( "@workspace/support:v1" )
# Subsequent calls - returns cached version
prompt = get_cached_prompt( "@workspace/support:v1" ) # Instant!
Cached prompts become stale when updated. Use version pinning or implement cache invalidation.
Time-Based Cache
Implement TTL (time-to-live) caching:
import time
from typing import Dict, Optional, Tuple
from baytos.claro import BaytClient, Prompt
class PromptCache:
    """In-memory prompt cache with a time-to-live per entry."""

    def __init__(self, client: BaytClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl = ttl_seconds
        # Maps package_name -> (prompt, timestamp when it was stored)
        self.cache: Dict[str, Tuple[Prompt, float]] = {}

    def get(self, package_name: str) -> Prompt:
        """Return a cached prompt if still fresh, otherwise fetch and cache it."""
        now = time.time()
        entry = self.cache.get(package_name)
        if entry is not None:
            prompt, stored_at = entry
            # Serve from cache only while the entry is younger than the TTL
            if now - stored_at < self.ttl:
                return prompt
        prompt = self.client.get_prompt(package_name)
        self.cache[package_name] = (prompt, now)
        return prompt

    def invalidate(self, package_name: str = None):
        """Drop one cached entry, or every entry when no name is given."""
        if package_name:
            self.cache.pop(package_name, None)
        else:
            self.cache.clear()
# Usage
client = BaytClient( api_key = "..." )
cache = PromptCache(client, ttl_seconds = 300 ) # 5 minutes
prompt = cache.get( "@workspace/support:v1" ) # Fetches from API
prompt = cache.get( "@workspace/support:v1" ) # Returns cached (< 5 min)
# Invalidate when needed
cache.invalidate( "@workspace/support:v1" )
Redis-Based Caching
For distributed systems, use Redis:
import json
import redis
from baytos.claro import BaytClient, Prompt
class RedisPromptCache:
    """Prompt cache backed by Redis, suitable for distributed deployments."""

    def __init__(self, client: BaytClient, redis_client, ttl=300):
        self.client = client
        self.redis = redis_client
        self.ttl = ttl

    def get(self, package_name: str) -> Prompt:
        """Return the prompt from Redis when present, else fetch and store it."""
        cache_key = f"prompt:{package_name}"
        cached = self.redis.get(cache_key)
        if cached:
            # Rehydrate the Prompt from its JSON representation
            return Prompt(json.loads(cached))
        prompt = self.client.get_prompt(package_name)
        # setex stores the value with an expiry so stale entries age out on their own
        self.redis.setex(cache_key, self.ttl, json.dumps(prompt.to_dict()))
        return prompt
# Usage
client = BaytClient( api_key = "..." )
redis_client = redis.Redis( host = 'localhost' , port = 6379 , db = 0 )
cache = RedisPromptCache(client, redis_client, ttl = 300 )
prompt = cache.get( "@workspace/support:v1" )
Concurrent Requests
Thread-Based Concurrency
Fetch multiple prompts in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient
client = BaytClient( api_key = "..." )
package_names = [
"@workspace/support:v1" ,
"@workspace/sales:v1" ,
"@workspace/marketing:v1"
]
def fetch_prompt(package_name):
    """Retrieve one prompt via the shared module-level client."""
    return client.get_prompt(package_name)
# Fetch concurrently
with ThreadPoolExecutor( max_workers = 5 ) as executor:
# Submit all tasks
futures = {
executor.submit(fetch_prompt, name): name
for name in package_names
}
# Collect results
prompts = {}
for future in as_completed(futures):
name = futures[future]
try :
prompt = future.result()
prompts[name] = prompt
print ( f "Fetched: { prompt.title } " )
except Exception as e:
print ( f "Failed to fetch { name } : { e } " )
print ( f " \n Fetched { len (prompts) } prompts" )
Async/Await Pattern
For async applications, wrap SDK calls:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from baytos.claro import BaytClient
client = BaytClient( api_key = "..." )
executor = ThreadPoolExecutor( max_workers = 10 )
async def get_prompt_async(package_name: str):
    """Run the blocking get_prompt call on the shared thread pool.

    Lets the synchronous SDK participate in an asyncio application
    without stalling the event loop.
    """
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        executor, client.get_prompt, package_name
    )
async def fetch_multiple_prompts(package_names):
    """Fetch every named prompt concurrently; results keep input order."""
    return await asyncio.gather(
        *(get_prompt_async(name) for name in package_names)
    )
# Usage
async def main():
    """Demonstrate concurrent fetching of a small set of prompt packages."""
    packages = [
        "@workspace/p1:v1",
        "@workspace/p2:v1",
        "@workspace/p3:v1",
    ]
    prompts = await fetch_multiple_prompts(packages)
    for prompt in prompts:
        print(f"Fetched: {prompt.title}")

asyncio.run(main())
Batch Operations
Optimize bulk operations:
from baytos.claro import BaytClient
def fetch_prompts_efficiently(package_names, batch_size=10):
    """Fetch prompts in fixed-size batches with per-batch progress output.

    Individual failures are reported and skipped so one bad package
    does not abort the whole run.
    """
    client = BaytClient(api_key="...")
    prompts = []
    for start in range(0, len(package_names), batch_size):
        print(f"Fetching batch {start // batch_size + 1}...")
        for name in package_names[start:start + batch_size]:
            try:
                prompts.append(client.get_prompt(name))
            except Exception as e:
                print(f"Failed to fetch {name}: {e}")
    return prompts
# Usage
packages = [ "@workspace/p1:v1" , "@workspace/p2:v1" , ... ]
prompts = fetch_prompts_efficiently(packages)
Minimize Data Transfer
Only fetch what you need:
# ✅ Good: Fetch specific prompt
prompt = client.get_prompt( "@workspace/support:v1" )
content = prompt.generator
# ❌ Wasteful: List all, then filter
result = client.list_prompts( limit = 100 )
prompt = [p for p in result[ 'prompts' ] if p.title == "Support" ][ 0 ]
Lazy Loading
Load context files only when needed:
from baytos.claro import BaytClient
client = BaytClient( api_key = "..." )
prompt = client.get_prompt( "@workspace/research:v1" )
# ✅ Good: Check before downloading
if prompt.has_context():
files = prompt.get_file_contexts()
# Only download if needed
if any (f.mime_type == 'application/pdf' for f in files):
for file in files:
if file .mime_type == 'application/pdf' :
content = client.download_context_file( file .id)
# Process PDF...
Production Best Practices
Environment-Based Configuration
Configure differently per environment:
import os
from baytos.claro import BaytClient
def create_client():
    """Build a BaytClient configured for the current deployment environment.

    Reads ENVIRONMENT (defaulting to "production") and selects the base
    URL and retry policy accordingly.
    """
    env = os.getenv("ENVIRONMENT", "production")
    if env == "development":
        # Fast-fail locally so errors surface immediately
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            base_url="http://localhost:8000",
            max_retries=0,
        )
    if env == "staging":
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            base_url="https://staging-api.baytos.ai",
            max_retries=2,
        )
    # Production: default endpoint, full retry budget
    return BaytClient(
        api_key=os.getenv("BAYT_API_KEY"),
        max_retries=3,
    )
client = create_client()
Health Checks
Implement health checks for monitoring:
from baytos.claro import BaytClient, BaytAPIError
def health_check():
    """Probe the API with a minimal request and report reachability.

    Returns a dict with 'status' and 'api_accessible'; on failure the
    error message is included under 'error'.
    """
    try:
        client = BaytClient(api_key="...")
        # list_prompts with limit=1 is the cheapest available round-trip
        client.list_prompts(limit=1)
        return {'status': 'healthy', 'api_accessible': True}
    except BaytAPIError as e:
        return {
            'status': 'unhealthy',
            'api_accessible': False,
            'error': str(e),
        }
# Usage in health endpoint
# GET /health
status = health_check()
Monitoring and Metrics
Track API usage and performance:
import time
from baytos.claro import BaytClient, BaytAPIError
class MonitoredClient:
    """Wrapper around BaytClient that records request counts and latency."""

    def __init__(self, api_key):
        self.client = BaytClient(api_key=api_key)
        # Running totals: request count, error count, cumulative seconds
        self.stats = {'requests': 0, 'errors': 0, 'total_time': 0}

    def get_prompt(self, package_name):
        """Fetch a prompt while updating request/error/timing statistics."""
        started = time.time()
        self.stats['requests'] += 1
        try:
            return self.client.get_prompt(package_name)
        except BaytAPIError:
            self.stats['errors'] += 1
            raise
        finally:
            # Timing is recorded on both success and failure paths
            self.stats['total_time'] += time.time() - started

    def get_stats(self):
        """Return aggregate totals plus derived error rate and mean latency."""
        requests = self.stats['requests']
        return {
            'total_requests': requests,
            'total_errors': self.stats['errors'],
            'error_rate': (
                self.stats['errors'] / requests if requests > 0 else 0
            ),
            'avg_response_time': (
                self.stats['total_time'] / requests if requests > 0 else 0
            ),
        }
# Usage
client = MonitoredClient( api_key = "..." )
prompt = client.get_prompt( "@workspace/test:v1" )
stats = client.get_stats()
print ( f "Average response time: { stats[ 'avg_response_time' ] :.2f} s" )
Circuit Breaker Pattern
Prevent cascading failures:
import time
from baytos.claro import BaytClient, BaytAPIError
class CircuitBreaker:
    """Stops calling a failing dependency until a cooldown period passes.

    States: 'closed' (normal operation), 'open' (rejecting calls), and
    'half-open' (allowing a single probe call after the timeout elapses).
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke func through the breaker, updating failure state."""
        if self.state == 'open':
            # Once the cooldown has elapsed, let one probe call through
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'half-open'
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
        except BaytAPIError:
            self.failures += 1
            self.last_failure_time = time.time()
            # Trip the breaker once enough consecutive failures accumulate
            if self.failures >= self.failure_threshold:
                self.state = 'open'
            raise
        # A successful probe closes the circuit again
        if self.state == 'half-open':
            self.failures = 0
            self.state = 'closed'
        return result
# Usage
client = BaytClient( api_key = "..." )
breaker = CircuitBreaker( failure_threshold = 5 , timeout = 60 )
def fetch_with_breaker(package_name):
    """Fetch a prompt, routed through the module-level circuit breaker."""
    return breaker.call(client.get_prompt, package_name)
try :
prompt = fetch_with_breaker( "@workspace/test:v1" )
except Exception as e:
print ( f "Circuit breaker prevented call: { e } " )
Best Practices Summary
Reuse client instances to benefit from connection pooling: # ✅ Good: Singleton pattern
_client = None


def get_client():
    """Return the process-wide BaytClient, creating it on first use.

    Keeping one instance alive preserves the underlying connection pool.
    """
    global _client
    if _client is None:
        _client = BaytClient(api_key="...")
    return _client
Cache prompts to reduce API calls: # Use version pinning with caching
@lru_cache(maxsize=100)
def get_cached_prompt(package_name: str):
    """Memoized prompt fetch; pin versions so cached entries never go stale."""
    return client.get_prompt(package_name)
# Always use specific versions when caching
prompt = get_cached_prompt( "@workspace/support:v1" ) # ✅
prompt = get_cached_prompt( "@workspace/support:latest" ) # ❌
Handle Rate Limits Gracefully
Respect rate limits and implement backoff: # Configure appropriate retries
client = BaytClient(
api_key = "..." ,
max_retries = 3 # Let SDK handle retries
)
# Add application-level rate limiting
import time
time.sleep( 0.1 ) # 100ms between requests
Next Steps
Error Handling Implement robust error handling
Client Configuration Configure timeouts and retries
API Reference Complete API documentation
Quickstart Review basic usage patterns