Prerequisites
Copy
pip install baytos-claro
export BAYT_API_KEY="your_api_key_here"
Example 1: Response Caching
Cache prompts locally to reduce API calls:Copy
import os
import time
from functools import lru_cache
from baytos.claro import BaytClient
class CachedClient:
"""Client with built-in caching"""
def __init__(self, api_key, cache_ttl=300):
self.client = BaytClient(api_key=api_key)
self.cache_ttl = cache_ttl # 5 minutes default
@lru_cache(maxsize=100)
def get_prompt_cached(self, package_name, cache_key):
"""Get prompt with caching based on time window"""
return self.client.get_prompt(package_name)
def get_prompt(self, package_name):
"""Get prompt with automatic cache invalidation"""
# Use time window as cache key to auto-invalidate
cache_key = int(time.time() / self.cache_ttl)
return self.get_prompt_cached(package_name, cache_key)
# Usage
cached_client = CachedClient(
api_key=os.getenv("BAYT_API_KEY"),
cache_ttl=300 # 5 minutes
)
# First call - hits API
prompt = cached_client.get_prompt("@workspace/my-prompt:v1")
print("First call (from API)")
# Second call - from cache
prompt = cached_client.get_prompt("@workspace/my-prompt:v1")
print("Second call (from cache)")
Example 2: Redis Caching
Use Redis for distributed caching:Copy
import os
import json
import redis
from baytos.claro import BaytClient
class RedisPromptCache:
"""Prompt cache using Redis"""
def __init__(self, api_key, redis_url='redis://localhost:6379', ttl=300):
self.client = BaytClient(api_key=api_key)
self.redis = redis.from_url(redis_url)
self.ttl = ttl
def get_prompt(self, package_name):
"""Get prompt with Redis caching"""
cache_key = f"prompt:{package_name}"
# Check cache
cached = self.redis.get(cache_key)
if cached:
print(f"Cache hit: {package_name}")
return json.loads(cached)
# Fetch from API
print(f"Cache miss: {package_name}")
prompt = self.client.get_prompt(package_name)
# Store in cache
self.redis.setex(
cache_key,
self.ttl,
json.dumps(prompt.to_dict())
)
return prompt
# Usage
cache = RedisPromptCache(
api_key=os.getenv("BAYT_API_KEY"),
ttl=300
)
prompt = cache.get_prompt("@workspace/my-prompt:v1")
print(f"Loaded: {prompt.title}")
Example 3: Concurrent Requests
Fetch multiple prompts concurrently:Copy
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient
def fetch_prompt(client, package_name):
"""Fetch a single prompt"""
try:
prompt = client.get_prompt(package_name)
return {'success': True, 'prompt': prompt}
except Exception as e:
return {'success': False, 'package': package_name, 'error': str(e)}
def fetch_prompts_concurrent(package_names, max_workers=5):
"""Fetch multiple prompts concurrently"""
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
futures = {
executor.submit(fetch_prompt, client, pkg): pkg
for pkg in package_names
}
# Collect results as they complete
for future in as_completed(futures):
result = future.result()
results.append(result)
if result['success']:
print(f"✓ Loaded: {result['prompt'].title}")
else:
print(f"✗ Failed: {result['package']}")
return results
# Usage
packages = [
"@workspace/prompt1:v1",
"@workspace/prompt2:v1",
"@workspace/prompt3:v1"
]
results = fetch_prompts_concurrent(packages, max_workers=3)
successful = [r for r in results if r['success']]
print(f"\nLoaded {len(successful)}/{len(packages)} prompts")
Be mindful of rate limits when making concurrent requests. Start with a conservative
max_workers value (3-5) and monitor rate limit headers.Example 4: Async with asyncio
Asynchronous pattern for high-performance applications:Copy
import os
import asyncio
import aiohttp
from baytos.claro import BaytClient
async def fetch_prompt_async(session, package_name, api_key):
"""Async fetch a prompt"""
url = f"https://api.baytos.ai/v1/prompts/{package_name}"
headers = {"Authorization": f"Bearer {api_key}"}
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return {'success': True, 'data': data}
else:
error = await response.text()
return {'success': False, 'error': error}
async def fetch_all_prompts(package_names):
"""Fetch all prompts asynchronously"""
api_key = os.getenv("BAYT_API_KEY")
async with aiohttp.ClientSession() as session:
tasks = [
fetch_prompt_async(session, pkg, api_key)
for pkg in package_names
]
results = await asyncio.gather(*tasks)
return results
# Usage
packages = [
"@workspace/prompt1:v1",
"@workspace/prompt2:v1",
"@workspace/prompt3:v1"
]
results = asyncio.run(fetch_all_prompts(packages))
for i, result in enumerate(results, 1):
if result['success']:
print(f"✓ Prompt {i} loaded")
else:
print(f"✗ Prompt {i} failed: {result['error']}")
Example 5: Batch Processing with Queue
Process prompts from a queue:Copy
import os
import queue
import threading
from baytos.claro import BaytClient, BaytAPIError
def worker(task_queue, result_queue, client):
"""Worker thread to process prompts"""
while True:
try:
package_name = task_queue.get(timeout=1)
if package_name is None: # Poison pill
break
# Fetch prompt
try:
prompt = client.get_prompt(package_name)
result_queue.put({
'success': True,
'package': package_name,
'prompt': prompt
})
except BaytAPIError as e:
result_queue.put({
'success': False,
'package': package_name,
'error': str(e)
})
except queue.Empty:
continue
finally:
task_queue.task_done()
def process_prompts_batch(package_names, num_workers=3):
"""Process prompts using worker queue"""
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
task_queue = queue.Queue()
result_queue = queue.Queue()
# Start workers
workers = []
for _ in range(num_workers):
t = threading.Thread(
target=worker,
args=(task_queue, result_queue, client)
)
t.start()
workers.append(t)
# Add tasks
for package_name in package_names:
task_queue.put(package_name)
# Wait for completion
task_queue.join()
# Stop workers
for _ in range(num_workers):
task_queue.put(None)
for t in workers:
t.join()
# Collect results
results = []
while not result_queue.empty():
results.append(result_queue.get())
return results
# Usage
packages = ["@workspace/prompt1:v1", "@workspace/prompt2:v1"]
results = process_prompts_batch(packages, num_workers=2)
for result in results:
if result['success']:
print(f"✓ {result['package']}: {result['prompt'].title}")
else:
print(f"✗ {result['package']}: {result['error']}")
Example 6: Request Throttling
Throttle requests to avoid rate limits:Copy
import os
import time
from baytos.claro import BaytClient
class ThrottledClient:
"""Client with request throttling"""
def __init__(self, api_key, requests_per_second=10):
self.client = BaytClient(api_key=api_key)
self.min_interval = 1.0 / requests_per_second
self.last_request = 0
def get_prompt(self, package_name):
"""Get prompt with throttling"""
# Wait if needed
elapsed = time.time() - self.last_request
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
# Make request
self.last_request = time.time()
return self.client.get_prompt(package_name)
# Usage
throttled_client = ThrottledClient(
api_key=os.getenv("BAYT_API_KEY"),
requests_per_second=10 # Max 10 requests per second
)
# These will be automatically throttled
for i in range(20):
prompt = throttled_client.get_prompt("@workspace/my-prompt:v1")
print(f"Request {i+1} completed")
Example 7: Lazy Loading
Lazy load prompt content only when needed:Copy
import os
from baytos.claro import BaytClient
class LazyPrompt:
"""Lazy-loaded prompt wrapper"""
def __init__(self, package_name, client):
self.package_name = package_name
self.client = client
self._prompt = None
@property
def prompt(self):
"""Load prompt on first access"""
if self._prompt is None:
print(f"Loading {self.package_name}...")
self._prompt = self.client.get_prompt(self.package_name)
return self._prompt
@property
def title(self):
return self.prompt.title
@property
def generator(self):
return self.prompt.generator
# Usage
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
# Create lazy prompts (no API calls yet)
prompt1 = LazyPrompt("@workspace/prompt1:v1", client)
prompt2 = LazyPrompt("@workspace/prompt2:v1", client)
# Only loads when accessed
print(prompt1.title) # API call happens here
print(prompt1.generator) # Uses cached data
Example 8: Export and Analysis
Export prompts for analysis:Copy
import os
import json
import csv
from pathlib import Path
from baytos.claro import BaytClient
def export_prompts_to_json(output_file="prompts.json"):
"""Export all prompts to JSON"""
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
all_prompts = []
cursor = None
while True:
result = client.list_prompts(limit=50, cursor=cursor)
for prompt in result['prompts']:
all_prompts.append({
'package': prompt.package_name,
'title': prompt.title,
'description': prompt.description,
'category': prompt.category,
'version': prompt.version,
'content_length': len(prompt.generator),
'has_system': prompt.has_system_prompt(),
'has_critique': prompt.has_critique_prompt(),
'has_context': prompt.has_context(),
'num_files': len(prompt.get_file_contexts()),
'num_urls': len(prompt.get_url_contexts())
})
if not result['hasMore']:
break
cursor = result['cursor']
# Save to JSON
with open(output_file, 'w') as f:
json.dump(all_prompts, f, indent=2)
print(f"Exported {len(all_prompts)} prompts to {output_file}")
return all_prompts
def export_prompts_to_csv(output_file="prompts.csv"):
"""Export prompts to CSV"""
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
with open(output_file, 'w', newline='') as f:
writer = csv.writer(f)
# Header
writer.writerow([
'Package', 'Title', 'Category', 'Version',
'Content Length', 'Has Context', 'Files', 'URLs'
])
# Data
cursor = None
count = 0
while True:
result = client.list_prompts(limit=50, cursor=cursor)
for prompt in result['prompts']:
writer.writerow([
prompt.package_name,
prompt.title,
prompt.category,
prompt.version,
len(prompt.generator),
prompt.has_context(),
len(prompt.get_file_contexts()),
len(prompt.get_url_contexts())
])
count += 1
if not result['hasMore']:
break
cursor = result['cursor']
print(f"Exported {count} prompts to {output_file}")
# Usage
export_prompts_to_json()
export_prompts_to_csv()
Example 9: Version Comparison
Compare different versions of a prompt:Copy
import os
from difflib import unified_diff
from baytos.claro import BaytClient
def compare_prompt_versions(package_base, version1, version2):
"""Compare two versions of a prompt"""
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
# Fetch both versions
prompt1 = client.get_prompt(f"{package_base}:{version1}")
prompt2 = client.get_prompt(f"{package_base}:{version2}")
print(f"Comparing {version1} vs {version2}")
print("=" * 60)
# Compare content
diff = unified_diff(
prompt1.generator.splitlines(),
prompt2.generator.splitlines(),
fromfile=f'{package_base}:{version1}',
tofile=f'{package_base}:{version2}',
lineterm=''
)
for line in diff:
print(line)
# Usage
compare_prompt_versions("@workspace/my-prompt", "v1", "v2")
Example 10: Monitoring and Metrics
Track SDK usage metrics:Copy
import os
import time
from collections import defaultdict
from baytos.claro import BaytClient, BaytAPIError
class MetricsClient:
"""Client with built-in metrics"""
def __init__(self, api_key):
self.client = BaytClient(api_key=api_key)
self.metrics = {
'requests': 0,
'errors': 0,
'cache_hits': 0,
'total_time': 0,
'by_prompt': defaultdict(int)
}
def get_prompt(self, package_name):
"""Get prompt with metrics tracking"""
start = time.time()
self.metrics['requests'] += 1
self.metrics['by_prompt'][package_name] += 1
try:
prompt = self.client.get_prompt(package_name)
return prompt
except BaytAPIError as e:
self.metrics['errors'] += 1
raise
finally:
elapsed = time.time() - start
self.metrics['total_time'] += elapsed
def print_metrics(self):
"""Print collected metrics"""
print("\nMetrics Summary")
print("=" * 60)
print(f"Total requests: {self.metrics['requests']}")
print(f"Errors: {self.metrics['errors']}")
print(f"Average time: {self.metrics['total_time'] / max(self.metrics['requests'], 1):.2f}s")
print("\nRequests by prompt:")
for package, count in self.metrics['by_prompt'].items():
print(f" {package}: {count}")
# Usage
metrics_client = MetricsClient(api_key=os.getenv("BAYT_API_KEY"))
# Make some requests
for _ in range(5):
metrics_client.get_prompt("@workspace/my-prompt:v1")
metrics_client.print_metrics()
Example 11: Circuit Breaker Pattern
Implement circuit breaker for resilience:Copy
import os
import time
from enum import Enum
from baytos.claro import BaytClient, BaytAPIError
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker for API calls"""
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker"""
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except BaytAPIError as e:
self.on_failure()
raise
def on_success(self):
"""Reset on success"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def on_failure(self):
"""Handle failure"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit breaker opened after {self.failure_count} failures")
# Usage
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
breaker = CircuitBreaker(failure_threshold=3, timeout=60)
try:
prompt = breaker.call(client.get_prompt, "@workspace/my-prompt:v1")
print(f"Loaded: {prompt.title}")
except Exception as e:
print(f"Circuit breaker prevented call: {e}")
Example 12: Production Configuration
Complete production-ready configuration:Copy
import os
import logging
from baytos.claro import BaytClient
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('claro.log'),
logging.StreamHandler()
]
)
class ProductionClient:
"""Production-ready Claro client"""
def __init__(self):
# Load configuration from environment
self.api_key = os.getenv("BAYT_API_KEY")
if not self.api_key:
raise ValueError("BAYT_API_KEY not set")
# Initialize client with retries
self.client = BaytClient(
api_key=self.api_key,
max_retries=3
)
# Setup logging
self.logger = logging.getLogger(__name__)
def get_prompt(self, package_name, use_cache=True):
"""Get prompt with production features"""
self.logger.info(f"Fetching prompt: {package_name}")
try:
prompt = self.client.get_prompt(package_name)
self.logger.info(f"Successfully loaded: {prompt.title}")
return prompt
except Exception as e:
self.logger.error(f"Failed to load prompt: {e}", exc_info=True)
raise
# Usage
client = ProductionClient()
prompt = client.get_prompt("@workspace/my-prompt:v1")