Performance Optimization
Optimize your Claro integration to minimize latency, reduce costs, and improve user experience. This guide covers caching, connection pooling, batch operations, and monitoring.
Caching Strategies
Why Cache Prompts?
Prompts don’t change frequently, making them ideal for caching:
Reduced latency - Serve from cache instead of API
Lower costs - Fewer API calls
Better reliability - Work offline with cached prompts
Improved UX - Faster response times
Cache prompts aggressively, but ensure you have a strategy to invalidate the cache when prompts are updated.
In-Memory Caching
Simple caching for single-server applications:
import time
from functools import lru_cache
from baytos.claro import BaytClient

class CachedClaroClient:
    """Client with built-in caching"""

    def __init__(self, api_key: str, cache_ttl: int = 300):
        self.client = BaytClient(api_key=api_key)
        self.cache_ttl = cache_ttl  # 5 minutes default

    @lru_cache(maxsize=100)
    def _get_prompt_cached(self, package_name: str, cache_key: int):
        """Internal cached method with time-based key"""
        return self.client.get_prompt(package_name)

    def get_prompt(self, package_name: str):
        """Get prompt with automatic cache invalidation"""
        # Use the current time window as part of the cache key for auto-invalidation
        cache_key = int(time.time() / self.cache_ttl)
        return self._get_prompt_cached(package_name, cache_key)

# Usage
client = CachedClaroClient(
    api_key="your_api_key",
    cache_ttl=300  # Cache for 5 minutes
)

# First call - hits API
prompt = client.get_prompt("@workspace/support:v1")

# Subsequent calls within 5 minutes - from cache
prompt = client.get_prompt("@workspace/support:v1")
Redis Caching
For distributed systems, use Redis:
import json
import redis
from baytos.claro import BaytClient

class RedisCachedClient:
    """Claro client with Redis caching"""

    def __init__(self, api_key: str, redis_url: str = "redis://localhost:6379", ttl: int = 300):
        self.client = BaytClient(api_key=api_key)
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get_prompt(self, package_name: str):
        """Get prompt with Redis caching"""
        cache_key = f"claro:prompt:{package_name}"

        # Check cache (note: cache hits return the stored dict, not a prompt object)
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from API
        prompt = self.client.get_prompt(package_name)

        # Cache the result as JSON
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(prompt.to_dict())
        )
        return prompt

    def invalidate(self, package_name: str):
        """Manually invalidate cache for a prompt"""
        cache_key = f"claro:prompt:{package_name}"
        self.redis.delete(cache_key)

    def invalidate_all(self):
        """Clear all cached prompts"""
        for key in self.redis.scan_iter("claro:prompt:*"):
            self.redis.delete(key)

# Usage
client = RedisCachedClient(
    api_key="your_api_key",
    redis_url="redis://localhost:6379",
    ttl=600  # Cache for 10 minutes
)

prompt = client.get_prompt("@workspace/support:v1")

# When you publish a new version, invalidate the cache
client.invalidate("@workspace/support:v1")
Cache Invalidation Strategies
Time-Based (TTL)
Cache entries expire automatically after a set period.
Pros:
Simple to implement
No manual invalidation needed
Works well for stable prompts
Cons:
May serve stale data
Can’t force updates immediately
# Cache for 10 minutes
cache.setex(key, 600, value)
Version-Based
Include the version in the cache key.
Pros:
New versions automatically bypass cache
No stale data issues
Explicit cache control
Cons:
Requires version pinning
Cache not used for :latest
def get_prompt(self, package_name: str):
    # The version is part of the cache key:
    # @workspace/support:v1 and :v2 have different keys
    cache_key = f"claro:prompt:{package_name}"
Webhook-Based
Invalidate when Claro notifies you of changes.
Pros:
Immediate updates
No stale data
Minimal cache misses
Cons:
Requires webhook setup
More complex implementation
from flask import Flask, request

app = Flask(__name__)

@app.route('/webhook/claro', methods=['POST'])
def handle_webhook():
    """Invalidate cache when a prompt is updated"""
    event = request.json
    if event['type'] == 'prompt.published':
        package_name = event['data']['package_name']
        cache.invalidate(package_name)
    return {'status': 'ok'}
Manual
Invalidate explicitly when needed.
Pros:
Full control
Can trigger on specific events
Cons:
Requires manual intervention
Risk of forgetting to invalidate
# After publishing a new version
client.invalidate("@workspace/support:v1")

# Or clear all cached prompts
client.invalidate_all()
Connection Pooling
HTTP Connection Reuse
The Claro SDK uses connection pooling by default, but you can optimize it:
from baytos.claro import BaytClient

# Configure connection pooling
client = BaytClient(
    api_key="your_api_key",
    max_retries=3,
    timeout=30.0,
    # Connection pool settings (if your SDK version supports them)
    pool_connections=10,  # Number of connection pools to cache
    pool_maxsize=100      # Maximum number of connections to keep per pool
)
Singleton Pattern
Reuse a single client instance across your application:
# config.py
from baytos.claro import BaytClient
import os

_client_instance = None

def get_claro_client() -> BaytClient:
    """Get the singleton Claro client"""
    global _client_instance
    if _client_instance is None:
        _client_instance = BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            max_retries=3
        )
    return _client_instance

# Usage across your app
from config import get_claro_client

client = get_claro_client()
prompt = client.get_prompt("@workspace/support:v1")
FastAPI Dependency Injection
For web applications, use dependency injection:
from fastapi import FastAPI, Depends
from baytos.claro import BaytClient
import os

app = FastAPI()

# Create the client once at startup
claro_client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

def get_client() -> BaytClient:
    """Dependency that returns the shared client"""
    return claro_client

@app.get("/ask")
async def ask_question(
    query: str,
    client: BaytClient = Depends(get_client)
):
    """Endpoint that reuses the same client"""
    prompt = client.get_prompt("@workspace/support:v1")
    # Use prompt with your LLM
    return {"response": "..."}
Batch Operations
Fetching Multiple Prompts
Load multiple prompts in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient

def fetch_prompts_batch(package_names: list[str], max_workers: int = 5):
    """Fetch multiple prompts concurrently"""
    client = BaytClient(api_key="your_api_key")
    results = {}

    def fetch_one(package_name):
        try:
            prompt = client.get_prompt(package_name)
            return (package_name, prompt, None)
        except Exception as e:
            return (package_name, None, str(e))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(fetch_one, pkg): pkg
            for pkg in package_names
        }
        for future in as_completed(futures):
            package_name, prompt, error = future.result()
            if error:
                print(f"Failed to load {package_name}: {error}")
            else:
                results[package_name] = prompt

    return results

# Usage
prompts = fetch_prompts_batch([
    "@workspace/support:v1",
    "@workspace/sales:v1",
    "@workspace/technical:v1"
], max_workers=3)
Be mindful of rate limits when making concurrent requests. Start with a conservative max_workers value (3-5) and monitor for rate limit errors.
Efficient Pagination
When listing prompts, use efficient pagination:
from baytos.claro import BaytClient

def fetch_all_prompts(client: BaytClient, limit: int = 50):
    """Efficiently fetch all prompts with pagination"""
    all_prompts = []
    cursor = None

    while True:
        # Fetch a page
        result = client.list_prompts(limit=limit, cursor=cursor)
        all_prompts.extend(result['prompts'])

        # Check if more pages exist
        if not result.get('hasMore'):
            break
        cursor = result.get('cursor')

    return all_prompts

# Usage
client = BaytClient(api_key="your_api_key")
all_prompts = fetch_all_prompts(client, limit=100)  # Use the maximum page size for fewer requests
Monitoring and Metrics
Request Timing
Track API performance:
import time
import logging
from baytos.claro import BaytClient

logger = logging.getLogger(__name__)

class MonitoredClient:
    """Client with performance monitoring"""

    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)

    def get_prompt(self, package_name: str):
        """Get prompt with timing"""
        start = time.time()
        try:
            prompt = self.client.get_prompt(package_name)
            elapsed = (time.time() - start) * 1000  # Convert to ms
            logger.info(f"Fetched {package_name} in {elapsed:.2f}ms")

            # Alert on slow requests
            if elapsed > 1000:  # More than 1 second
                logger.warning(f"Slow API call: {package_name} took {elapsed:.2f}ms")

            return prompt
        except Exception as e:
            elapsed = (time.time() - start) * 1000
            logger.error(f"Failed to fetch {package_name} after {elapsed:.2f}ms: {e}")
            raise

# Usage
client = MonitoredClient(api_key="your_api_key")
prompt = client.get_prompt("@workspace/support:v1")
Key Performance Indicators
Track key performance indicators:
from dataclasses import dataclass
from collections import defaultdict
import time
from baytos.claro import BaytClient

@dataclass
class PerformanceMetrics:
    """Track API performance metrics"""
    total_requests: int = 0
    total_errors: int = 0
    total_time: float = 0.0
    cache_hits: int = 0
    cache_misses: int = 0

class MetricsClient:
    """Client with comprehensive metrics"""

    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)
        self.metrics = PerformanceMetrics()
        self.by_prompt = defaultdict(PerformanceMetrics)

    def _get_from_cache(self, package_name: str):
        """Hook for your caching layer; returns None when nothing is cached"""
        return None

    def get_prompt(self, package_name: str, use_cache: bool = True):
        """Get prompt with metrics tracking"""
        start = time.time()
        self.metrics.total_requests += 1
        self.by_prompt[package_name].total_requests += 1

        try:
            # Check the cache first (wire _get_from_cache up to your caching layer)
            if use_cache:
                cached = self._get_from_cache(package_name)
                if cached:
                    self.metrics.cache_hits += 1
                    return cached
                self.metrics.cache_misses += 1

            # Fetch from API
            prompt = self.client.get_prompt(package_name)
            elapsed = time.time() - start
            self.metrics.total_time += elapsed
            self.by_prompt[package_name].total_time += elapsed
            return prompt
        except Exception:
            self.metrics.total_errors += 1
            self.by_prompt[package_name].total_errors += 1
            raise
    def get_stats(self):
        """Get performance statistics"""
        avg_time = (
            self.metrics.total_time / self.metrics.total_requests
            if self.metrics.total_requests > 0
            else 0
        )
        cache_rate = (
            self.metrics.cache_hits / self.metrics.total_requests
            if self.metrics.total_requests > 0
            else 0
        )
        return {
            'total_requests': self.metrics.total_requests,
            'total_errors': self.metrics.total_errors,
            'error_rate': self.metrics.total_errors / max(self.metrics.total_requests, 1),
            'avg_time_ms': avg_time * 1000,
            'cache_hit_rate': cache_rate,
            'cache_hits': self.metrics.cache_hits,
            'cache_misses': self.metrics.cache_misses
        }

    def print_stats(self):
        """Print a performance summary"""
        stats = self.get_stats()
        print("\nPerformance Metrics:")
        print(f"  Total Requests: {stats['total_requests']}")
        print(f"  Total Errors: {stats['total_errors']}")
        print(f"  Error Rate: {stats['error_rate']:.2%}")
        print(f"  Avg Response Time: {stats['avg_time_ms']:.2f}ms")
        print(f"  Cache Hit Rate: {stats['cache_hit_rate']:.2%}")

# Usage
client = MetricsClient(api_key="your_api_key")

# Make requests
for _ in range(100):
    prompt = client.get_prompt("@workspace/support:v1")

# Print stats
client.print_stats()
Prometheus
Export metrics to Prometheus:
from prometheus_client import Counter, Histogram, start_http_server
import time
from baytos.claro import BaytClient

# Define metrics
api_requests = Counter(
    'claro_api_requests_total',
    'Total Claro API requests',
    ['package_name', 'status']
)
api_duration = Histogram(
    'claro_api_duration_seconds',
    'Claro API request duration',
    ['package_name']
)

class PrometheusClient:
    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)

    def get_prompt(self, package_name: str):
        start = time.time()
        try:
            prompt = self.client.get_prompt(package_name)
            # Record success
            api_requests.labels(
                package_name=package_name,
                status='success'
            ).inc()
            return prompt
        except Exception:
            # Record failure
            api_requests.labels(
                package_name=package_name,
                status='error'
            ).inc()
            raise
        finally:
            # Record duration
            duration = time.time() - start
            api_duration.labels(package_name=package_name).observe(duration)

# Start the metrics server
start_http_server(8000)  # Metrics at http://localhost:8000/metrics
DataDog
Send metrics to DataDog:
from datadog import initialize, statsd
import time
from baytos.claro import BaytClient

# Initialize DataDog
initialize(
    api_key='your_datadog_api_key',
    app_key='your_datadog_app_key'
)

class DataDogClient:
    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)

    def get_prompt(self, package_name: str):
        start = time.time()
        try:
            prompt = self.client.get_prompt(package_name)
            # Record success
            statsd.increment(
                'claro.api.requests',
                tags=[f'package:{package_name}', 'status:success']
            )
            return prompt
        except Exception:
            # Record error
            statsd.increment(
                'claro.api.requests',
                tags=[f'package:{package_name}', 'status:error']
            )
            raise
        finally:
            # Record timing
            duration = (time.time() - start) * 1000  # ms
            statsd.histogram(
                'claro.api.duration',
                duration,
                tags=[f'package:{package_name}']
            )
CloudWatch
Send metrics to AWS CloudWatch:
import boto3
import time
from datetime import datetime
from baytos.claro import BaytClient

cloudwatch = boto3.client('cloudwatch')

class CloudWatchClient:
    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)

    def get_prompt(self, package_name: str):
        start = time.time()
        try:
            prompt = self.client.get_prompt(package_name)
            # Record success metric
            self._put_metric(
                'ClaroAPIRequests',
                1,
                'Count',
                dimensions=[
                    {'Name': 'Package', 'Value': package_name},
                    {'Name': 'Status', 'Value': 'Success'}
                ]
            )
            return prompt
        except Exception:
            # Record error metric
            self._put_metric(
                'ClaroAPIRequests',
                1,
                'Count',
                dimensions=[
                    {'Name': 'Package', 'Value': package_name},
                    {'Name': 'Status', 'Value': 'Error'}
                ]
            )
            raise
        finally:
            # Record duration
            duration = (time.time() - start) * 1000
            self._put_metric(
                'ClaroAPIDuration',
                duration,
                'Milliseconds',
                dimensions=[{'Name': 'Package', 'Value': package_name}]
            )

    def _put_metric(self, name, value, unit, dimensions):
        cloudwatch.put_metric_data(
            Namespace='Claro',
            MetricData=[{
                'MetricName': name,
                'Value': value,
                'Unit': unit,
                'Timestamp': datetime.utcnow(),
                'Dimensions': dimensions
            }]
        )
Troubleshooting Slow Requests
Problem: First request is slow
Cause: Initial connection setup and DNS resolution
Solution:
# Warm up the client at application startup
from baytos.claro import BaytClient

def warm_up_client(client: BaytClient):
    """Pre-connect and cache the first request"""
    try:
        # Make a lightweight request to warm up the connection
        client.list_prompts(limit=1)
    except Exception:
        pass  # Ignore errors during warmup

# At application startup
client = BaytClient(api_key="your_api_key")
warm_up_client(client)
Problem: Slow response times from the API
Cause: Geographic distance from API servers
Solution:
Use caching aggressively (TTL of 5-10 minutes)
Fetch prompts at application startup
Consider edge caching with CloudFlare or a CDN
Problem: Prompts with large context files are slow
Cause: Transferring large files over the network
Solution:
# Only download files when needed
prompt = client.get_prompt("@workspace/support:v1")

if prompt.has_context():
    # Check file sizes first
    contexts = prompt.get_file_contexts()
    for context in contexts:
        if context.size < 1_000_000:  # Only download files under 1 MB
            content = client.download_file(context.url)
Too Many Sequential Requests
Problem: Loading many prompts takes too long
Cause: Sequential API calls add up
Solution:
Use concurrent fetching (see the Batch Operations section above, or the async sketch below)
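If your application is async (for example the FastAPI setup above), you can get the same effect without managing a thread pool yourself by pushing the blocking SDK calls onto worker threads. This is a minimal sketch using only the synchronous client.get_prompt shown throughout this guide; it does not assume the SDK offers a native async client.
import asyncio
from baytos.claro import BaytClient

async def fetch_prompts_async(package_names: list[str]):
    """Fetch prompts concurrently from async code using the synchronous client."""
    client = BaytClient(api_key="your_api_key")

    async def fetch_one(package_name: str):
        # asyncio.to_thread runs the blocking SDK call in a worker thread
        return await asyncio.to_thread(client.get_prompt, package_name)

    results = await asyncio.gather(
        *(fetch_one(pkg) for pkg in package_names),
        return_exceptions=True  # Keep one failure from cancelling the rest
    )
    return dict(zip(package_names, results))

# Usage
# prompts = asyncio.run(fetch_prompts_async(["@workspace/support:v1", "@workspace/sales:v1"]))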
Problem: Requests throttled due to rate limits
Cause: Exceeding API rate limits
Solution:
import time
from baytos.claro import BaytClient, BaytRateLimitError

def get_prompt_with_backoff(client, package_name, max_retries=3):
    """Retry with exponential backoff on rate limits"""
    for attempt in range(max_retries):
        try:
            return client.get_prompt(package_name)
        except BaytRateLimitError:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited, waiting {wait_time}s...")
            time.sleep(wait_time)
Best Practices
Cache Aggressively
Prompts are relatively static - cache them:
Use a 5-10 minute TTL for frequently accessed prompts
Use version-based cache keys to avoid stale data
Implement cache warming for critical prompts (see the sketch below)
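Cache warming simply means fetching your critical prompts before real traffic arrives, so the first user never pays for the API round trip. Below is a minimal sketch; it reuses the CachedClaroClient from the in-memory caching section above, and the CRITICAL_PROMPTS list is a hypothetical stand-in for your own hot-path prompts.
# CRITICAL_PROMPTS is a hypothetical list of the prompts on your hot paths
CRITICAL_PROMPTS = [
    "@workspace/support:v1",
    "@workspace/sales:v1",
]

def warm_cache(client):
    """Fetch each critical prompt once so later calls are served from cache."""
    for package_name in CRITICAL_PROMPTS:
        try:
            client.get_prompt(package_name)  # Populates the cache as a side effect
        except Exception as e:
            # Warming is best-effort; the application can still start without it
            print(f"Cache warm failed for {package_name}: {e}")

# Run once at application startup
warm_cache(CachedClaroClient(api_key="your_api_key", cache_ttl=600))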
Batch When Possible
Reduce API calls by batching:
Fetch multiple prompts concurrently
Use pagination with maximum page size
Pre-load prompts at startup for critical paths
Optimize for Common Cases
Optimize hot paths:
Cache most frequently used prompts
Pre-fetch prompts for common workflows
Use CDN for static prompt content
Handle Failures Gracefully
Plan for API unavailability:
Implement fallback prompts (see the sketch below)
Cache prompts locally as backup
Return graceful errors to users
Retry with exponential backoff
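For example, you can wrap prompt fetching so that an API outage degrades to a baked-in default instead of an error. This is a minimal sketch; FALLBACK_PROMPTS and its contents are illustrative assumptions, not part of the SDK.
from baytos.claro import BaytClient

# Hypothetical hard-coded fallbacks for prompts your app cannot run without
FALLBACK_PROMPTS = {
    "@workspace/support:v1": "You are a helpful support assistant.",
}

def get_prompt_or_fallback(client: BaytClient, package_name: str):
    """Return the live prompt, or a local fallback string if the API is unavailable."""
    try:
        return client.get_prompt(package_name)
    except Exception as e:
        print(f"Claro API unavailable ({e}); using fallback for {package_name}")
        fallback = FALLBACK_PROMPTS.get(package_name)
        if fallback is None:
            raise  # No fallback configured; let the caller return a graceful error
        # Note: the fallback is a plain string, so downstream code must accept both forms
        return fallback

# Usage
client = BaytClient(api_key="your_api_key")
prompt = get_prompt_or_fallback(client, "@workspace/support:v1")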
Before deploying to production:
Caching enabled, with a clear invalidation strategy
A single, reused client instance per process
Concurrent fetching tuned against your rate limits
Monitoring and metrics wired up, with alerts on slow or failing requests
Retries with exponential backoff for transient errors
Next Steps