Skip to main content
This guide demonstrates advanced patterns for using the Claro SDK in production environments, including caching, concurrent requests, and performance optimizations.

Prerequisites

pip install baytos-claro
export BAYT_API_KEY="your_api_key_here"

Example 1: Response Caching

Cache prompts locally to reduce API calls:
import os
import time
from functools import lru_cache
from baytos.claro import BaytClient

class CachedClient:
    """Wrap ``BaytClient`` with a per-instance, time-windowed prompt cache.

    Cache entries are keyed on ``(package_name, window)``, where ``window``
    is the current time divided by ``cache_ttl``. When the window index
    advances, lookups use a new key and the prompt is fetched again.
    """

    def __init__(self, api_key, cache_ttl=300):
        """Create the wrapped client and its private cache.

        Args:
            api_key: API key forwarded to ``BaytClient``.
            cache_ttl: Length of one cache window in seconds.

        Raises:
            ValueError: If ``cache_ttl`` is not positive.
        """
        if cache_ttl <= 0:
            # Fail here instead of with ZeroDivisionError inside get_prompt().
            raise ValueError("cache_ttl must be a positive number of seconds")

        self.client = BaytClient(api_key=api_key)
        self.cache_ttl = cache_ttl  # 5 minutes default

        # Build the LRU cache per instance. Decorating the method itself
        # creates a single class-wide cache keyed on ``self``: all instances
        # compete for the same 100 slots and are kept alive by the cache.
        self._cached_fetch = lru_cache(maxsize=100)(self._fetch)

    def _fetch(self, package_name, cache_key):
        """Fetch from the API; ``cache_key`` only separates cache entries."""
        return self.client.get_prompt(package_name)

    def get_prompt_cached(self, package_name, cache_key):
        """Return the prompt cached under ``(package_name, cache_key)``.

        The API is called only when this instance has no entry for the pair.
        """
        return self._cached_fetch(package_name, cache_key)

    def get_prompt(self, package_name):
        """Return the prompt, reusing the cached copy within a time window."""
        # The window index changes every cache_ttl seconds, which makes
        # older entries unreachable without any explicit invalidation.
        cache_key = int(time.time() // self.cache_ttl)
        return self.get_prompt_cached(package_name, cache_key)

# Usage
cached_client = CachedClient(api_key=os.getenv("BAYT_API_KEY"), cache_ttl=300)  # 5 minutes

# Request the same prompt twice: the first request goes to the API, and the
# second is normally served from the cache (both fall in one time window).
for note in ("First call (from API)", "Second call (from cache)"):
    prompt = cached_client.get_prompt("@workspace/my-prompt:v1")
    print(note)

Example 2: Redis Caching

Use Redis for distributed caching:
import os
import json
import redis
from baytos.claro import BaytClient

class RedisPromptCache:
    """Prompt cache backed by Redis, shareable between processes.

    Prompts are stored in Redis as JSON text, so ``get_prompt`` always
    returns the plain ``dict`` form of a prompt (what ``prompt.to_dict()``
    serializes to) on a cache hit and on a cache miss alike.
    """

    def __init__(self, api_key, redis_url='redis://localhost:6379', ttl=300):
        """Connect to the API and to Redis.

        Args:
            api_key: API key forwarded to ``BaytClient``.
            redis_url: Connection URL passed to ``redis.from_url``.
            ttl: Expiry, in seconds, applied to every cached entry.
        """
        self.client = BaytClient(api_key=api_key)
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get_prompt(self, package_name):
        """Return the prompt as a ``dict``, using Redis as a read-through cache.

        Args:
            package_name: Prompt identifier; also used to build the Redis key.

        Returns:
            The JSON-decoded prompt dictionary.
        """
        cache_key = f"prompt:{package_name}"

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            print(f"Cache hit: {package_name}")
            return json.loads(cached)

        # Fetch from API
        print(f"Cache miss: {package_name}")
        prompt = self.client.get_prompt(package_name)
        serialized = json.dumps(prompt.to_dict())

        # Store in cache; Redis drops the entry after self.ttl seconds.
        self.redis.setex(cache_key, self.ttl, serialized)

        # Decode the text that was just stored so a miss returns exactly what
        # a later hit will return. Previously a miss returned the SDK prompt
        # object while a hit returned a dict.
        return json.loads(serialized)

# Usage
cache = RedisPromptCache(
    api_key=os.getenv("BAYT_API_KEY"),
    ttl=300
)

prompt = cache.get_prompt("@workspace/my-prompt:v1")
# assumes to_dict() exposes the title under the "title" key — TODO confirm
print(f"Loaded: {prompt['title']}")

Example 3: Concurrent Requests

Fetch multiple prompts concurrently:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient

def fetch_prompt(client, package_name):
    """Fetch a single prompt and report the outcome as a dictionary.

    Exceptions are converted into a failure result rather than raised, so a
    batch of concurrent fetches can finish even when some of them fail.

    Args:
        client: Object exposing ``get_prompt(package_name)``.
        package_name: Identifier of the prompt to fetch.

    Returns:
        ``{'success': True, 'package': ..., 'prompt': ...}`` on success, or
        ``{'success': False, 'package': ..., 'error': ...}`` where ``error``
        is the exception message.
    """
    try:
        prompt = client.get_prompt(package_name)
    except Exception as e:
        return {'success': False, 'package': package_name, 'error': str(e)}
    # 'package' is included on success too, so both result shapes identify
    # the request they belong to.
    return {'success': True, 'package': package_name, 'prompt': prompt}

def fetch_prompts_concurrent(package_names, max_workers=5):
    """Fetch several prompts in parallel on a thread pool.

    Args:
        package_names: Iterable of prompt identifiers to fetch.
        max_workers: Size of the thread pool.

    Returns:
        A list of ``fetch_prompt`` result dictionaries, in the order the
        fetches finished (not the order of ``package_names``).
    """
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
    collected = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Queue one fetch per package; all of them share the same client.
        pending = [pool.submit(fetch_prompt, client, name) for name in package_names]

        # Handle each outcome as soon as its fetch finishes.
        for finished in as_completed(pending):
            outcome = finished.result()
            collected.append(outcome)

            if not outcome['success']:
                print(f"✗ Failed: {outcome['package']}")
                continue
            print(f"✓ Loaded: {outcome['prompt'].title}")

    return collected

# Usage
packages = [
    "@workspace/prompt1:v1",
    "@workspace/prompt2:v1",
    "@workspace/prompt3:v1",
]

results = fetch_prompts_concurrent(packages, max_workers=3)

# Keep only the results whose fetch succeeded, then report the ratio.
successful = list(filter(lambda r: r['success'], results))
print(f"\nLoaded {len(successful)}/{len(packages)} prompts")
Note: Be mindful of rate limits when making concurrent requests. Start with a conservative max_workers value (3-5) and monitor the rate limit headers in the API responses.

Example 4: Async with asyncio

Asynchronous pattern for high-performance applications. This example calls the HTTP API directly with aiohttp instead of going through BaytClient:
import os
import asyncio
import aiohttp
from baytos.claro import BaytClient

async def fetch_prompt_async(session, package_name, api_key):
    """Fetch one prompt over HTTP and report the outcome as a dictionary.

    Args:
        session: Open ``aiohttp.ClientSession`` used to send the GET request.
        package_name: Prompt identifier appended to the prompts URL.
        api_key: Token sent in the ``Authorization: Bearer`` header.

    Returns:
        ``{'success': True, 'package': ..., 'data': ...}`` with the decoded
        JSON body when the status is 200. Otherwise
        ``{'success': False, 'package': ..., 'error': ...}``, with an extra
        ``'status'`` key when an HTTP response was received.
    """
    # NOTE(review): package_name is inserted verbatim; confirm whether the API
    # expects characters such as "/" and "@" to be percent-encoded.
    url = f"https://api.baytos.ai/v1/prompts/{package_name}"
    headers = {"Authorization": f"Bearer {api_key}"}

    try:
        async with session.get(url, headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                return {'success': True, 'package': package_name, 'data': data}

            error = await response.text()
            return {
                'success': False,
                'package': package_name,
                'status': response.status,
                'error': error,
            }
    except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
        # Report transport failures the same way as HTTP failures, so one bad
        # request does not raise out of an asyncio.gather() over many fetches.
        return {'success': False, 'package': package_name, 'error': str(exc)}

async def fetch_all_prompts(package_names):
    """Fetch every listed prompt concurrently over one HTTP session.

    Args:
        package_names: Iterable of prompt identifiers.

    Returns:
        A list of ``fetch_prompt_async`` results, in the same order as
        ``package_names``.
    """
    token = os.getenv("BAYT_API_KEY")

    async with aiohttp.ClientSession() as http:
        # Create one coroutine per package; they all reuse the same session.
        pending = []
        for name in package_names:
            pending.append(fetch_prompt_async(http, name, token))

        # gather() runs the coroutines concurrently and keeps input order.
        return await asyncio.gather(*pending)

# Usage
packages = [
    "@workspace/prompt1:v1",
    "@workspace/prompt2:v1",
    "@workspace/prompt3:v1",
]

results = asyncio.run(fetch_all_prompts(packages))

# Results come back in request order, so the 1-based index identifies the prompt.
for i, result in enumerate(results, 1):
    print(
        f"✓ Prompt {i} loaded"
        if result['success']
        else f"✗ Prompt {i} failed: {result['error']}"
    )

Example 5: Batch Processing with Queue

Process prompts from a queue:
import os
import queue
import threading
from baytos.claro import BaytClient, BaytAPIError

def worker(task_queue, result_queue, client):
    """Thread target: fetch prompts named on ``task_queue`` until told to stop.

    Each dequeued package name is fetched with ``client.get_prompt`` and the
    outcome is put on ``result_queue`` as a dictionary with the keys
    ``success``, ``package`` and either ``prompt`` or ``error``. A ``None``
    item is the stop signal (poison pill).

    Args:
        task_queue: ``queue.Queue`` of package names, ended by ``None``.
        result_queue: ``queue.Queue`` receiving one result dict per package.
        client: Object exposing ``get_prompt(package_name)``.
    """
    while True:
        try:
            package_name = task_queue.get(timeout=1)
        except queue.Empty:
            # Nothing was dequeued, so there is nothing to mark as done.
            # Calling task_done() here would over-count completions and let
            # task_queue.join() return early (or raise ValueError).
            continue

        try:
            if package_name is None:  # Poison pill
                break

            # Fetch prompt
            try:
                prompt = client.get_prompt(package_name)
            except Exception as e:
                # Catch broadly at the thread boundary: an uncaught error
                # would end this worker and silently drop the task's result.
                result_queue.put({
                    'success': False,
                    'package': package_name,
                    'error': str(e)
                })
            else:
                result_queue.put({
                    'success': True,
                    'package': package_name,
                    'prompt': prompt
                })
        finally:
            # Exactly one task_done() per successful get(), pill included.
            task_queue.task_done()

def process_prompts_batch(package_names, num_workers=3):
    """Fetch prompts with a fixed pool of ``worker`` threads.

    Args:
        package_names: Iterable of prompt identifiers to fetch.
        num_workers: Number of worker threads to start.

    Returns:
        A list with every result dictionary the workers produced.
    """
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
    task_queue, result_queue = queue.Queue(), queue.Queue()

    # Start the workers; they idle on the empty queue until tasks arrive.
    threads = [
        threading.Thread(target=worker, args=(task_queue, result_queue, client))
        for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()

    # Enqueue the work and block until every task is marked done.
    for name in package_names:
        task_queue.put(name)
    task_queue.join()

    # One poison pill per worker, then wait for the threads to exit.
    for _ in threads:
        task_queue.put(None)
    for thread in threads:
        thread.join()

    # All producers have exited, so draining until Empty collects everything.
    results = []
    while True:
        try:
            results.append(result_queue.get_nowait())
        except queue.Empty:
            break

    return results

# Usage
packages = ["@workspace/prompt1:v1", "@workspace/prompt2:v1"]
results = process_prompts_batch(packages, num_workers=2)

# Print one line per result: the title on success, the error text otherwise.
for result in results:
    if not result['success']:
        print(f"✗ {result['package']}: {result['error']}")
        continue
    print(f"✓ {result['package']}: {result['prompt'].title}")

Example 6: Request Throttling

Throttle requests to avoid rate limits:
import os
import time
from baytos.claro import BaytClient

class ThrottledClient:
    """Wrap ``BaytClient`` and space requests at least ``min_interval`` apart.

    No locking is used, so the spacing only holds when a single thread calls
    ``get_prompt``.
    """

    def __init__(self, api_key, requests_per_second=10):
        """Create the wrapped client and derive the minimum request spacing.

        Args:
            api_key: API key forwarded to ``BaytClient``.
            requests_per_second: Maximum request rate; must be positive.

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            # Fail clearly instead of with ZeroDivisionError just below.
            raise ValueError("requests_per_second must be positive")

        self.client = BaytClient(api_key=api_key)
        self.min_interval = 1.0 / requests_per_second
        # Monotonic clock: the interval maths cannot be distorted by wall-clock
        # adjustments. Start one interval in the past so the first request
        # never waits.
        self.last_request = time.monotonic() - self.min_interval

    def get_prompt(self, package_name):
        """Fetch a prompt, sleeping first if the previous call was too recent."""
        # Wait if needed
        remaining = self.min_interval - (time.monotonic() - self.last_request)
        if remaining > 0:
            time.sleep(remaining)

        # Stamp the request before sending it, so the spacing is measured
        # between request starts.
        self.last_request = time.monotonic()
        return self.client.get_prompt(package_name)

# Usage
throttled_client = ThrottledClient(
    api_key=os.getenv("BAYT_API_KEY"),
    requests_per_second=10,  # Max 10 requests per second
)

# The client itself inserts the pauses; the loop needs no sleep calls.
for i in range(20):
    request_number = i + 1
    prompt = throttled_client.get_prompt("@workspace/my-prompt:v1")
    print(f"Request {request_number} completed")

Example 7: Lazy Loading

Lazy load prompt content only when needed:
import os
from baytos.claro import BaytClient

class LazyPrompt:
    """Stand-in for a prompt that is fetched only when first needed."""

    def __init__(self, package_name, client):
        """Remember what to load and which client loads it; no request is made."""
        self.client = client
        self.package_name = package_name
        self._prompt = None  # filled in by the first access to .prompt

    @property
    def prompt(self):
        """Return the underlying prompt, fetching it on first access."""
        loaded = self._prompt
        if loaded is not None:
            return loaded

        print(f"Loading {self.package_name}...")
        loaded = self._prompt = self.client.get_prompt(self.package_name)
        return loaded

    @property
    def title(self):
        """Title of the underlying prompt (loads the prompt if necessary)."""
        return self.prompt.title

    @property
    def generator(self):
        """Generator text of the underlying prompt (loads it if necessary)."""
        return self.prompt.generator

# Usage
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

# Building the wrappers sends no requests.
prompt1, prompt2 = (
    LazyPrompt(name, client)
    for name in ("@workspace/prompt1:v1", "@workspace/prompt2:v1")
)

# The first attribute read triggers the fetch; later reads reuse the result.
print(prompt1.title)  # API call happens here
print(prompt1.generator)  # Uses cached data

Example 8: Export and Analysis

Export prompts for analysis:
import os
import json
import csv
from pathlib import Path
from baytos.claro import BaytClient

def export_prompts_to_json(output_file="prompts.json"):
    """Write a summary of every listed prompt to a JSON file.

    Pages through ``client.list_prompts`` 50 prompts at a time until the
    response's ``hasMore`` flag is false.

    Args:
        output_file: Path of the JSON file to write.

    Returns:
        The list of summary dictionaries that was written.
    """
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    def summarize(prompt):
        """Reduce one prompt to the fields kept in the export."""
        return {
            'package': prompt.package_name,
            'title': prompt.title,
            'description': prompt.description,
            'category': prompt.category,
            'version': prompt.version,
            'content_length': len(prompt.generator),
            'has_system': prompt.has_system_prompt(),
            'has_critique': prompt.has_critique_prompt(),
            'has_context': prompt.has_context(),
            'num_files': len(prompt.get_file_contexts()),
            'num_urls': len(prompt.get_url_contexts())
        }

    all_prompts = []
    cursor = None
    more_pages = True

    while more_pages:
        page = client.list_prompts(limit=50, cursor=cursor)
        all_prompts.extend(summarize(item) for item in page['prompts'])

        # Read the cursor only when another page exists.
        more_pages = page['hasMore']
        if more_pages:
            cursor = page['cursor']

    # Save to JSON
    with open(output_file, 'w') as out:
        json.dump(all_prompts, out, indent=2)

    print(f"Exported {len(all_prompts)} prompts to {output_file}")
    return all_prompts

def export_prompts_to_csv(output_file="prompts.csv"):
    """Write one CSV row per listed prompt.

    Pages through ``client.list_prompts`` 50 prompts at a time until the
    response's ``hasMore`` flag is false, writing rows as pages arrive.

    Args:
        output_file: Path of the CSV file to write (UTF-8 encoded).
    """
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    # Encode explicitly as UTF-8: the platform default encoding may be unable
    # to represent non-ASCII prompt titles. newline='' lets the csv module
    # control line endings itself.
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)

        # Header
        writer.writerow([
            'Package', 'Title', 'Category', 'Version',
            'Content Length', 'Has Context', 'Files', 'URLs'
        ])

        # Data
        cursor = None
        count = 0

        while True:
            result = client.list_prompts(limit=50, cursor=cursor)

            for prompt in result['prompts']:
                writer.writerow([
                    prompt.package_name,
                    prompt.title,
                    prompt.category,
                    prompt.version,
                    len(prompt.generator),
                    prompt.has_context(),
                    len(prompt.get_file_contexts()),
                    len(prompt.get_url_contexts())
                ])
                count += 1

            if not result['hasMore']:
                break

            # Continue from where this page ended.
            cursor = result['cursor']

    print(f"Exported {count} prompts to {output_file}")

# Usage
# Run both exporters with their default output paths, JSON first.
for export in (export_prompts_to_json, export_prompts_to_csv):
    export()

Example 9: Version Comparison

Compare different versions of a prompt:
import os
from difflib import unified_diff
from baytos.claro import BaytClient

def compare_prompt_versions(package_base, version1, version2):
    """Print a unified diff between the generator text of two prompt versions.

    Args:
        package_base: Prompt identifier without the version suffix.
        version1: Version shown as the "from" side of the diff.
        version2: Version shown as the "to" side of the diff.
    """
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    # The full references are used both to fetch and to label the diff.
    old_ref = f"{package_base}:{version1}"
    new_ref = f"{package_base}:{version2}"
    old_prompt = client.get_prompt(old_ref)
    new_prompt = client.get_prompt(new_ref)

    print(f"Comparing {version1} vs {version2}")
    print("=" * 60)

    # The lines are split without their newlines, so lineterm='' keeps the
    # diff's header lines free of trailing newlines as well.
    delta = unified_diff(
        old_prompt.generator.splitlines(),
        new_prompt.generator.splitlines(),
        fromfile=old_ref,
        tofile=new_ref,
        lineterm='',
    )
    for diff_line in delta:
        print(diff_line)

# Usage
# Diff version v1 against v2 of the same prompt.
compare_prompt_versions(
    package_base="@workspace/my-prompt",
    version1="v1",
    version2="v2",
)

Example 10: Monitoring and Metrics

Track SDK usage metrics:
import os
import time
from collections import defaultdict
from baytos.claro import BaytClient, BaytAPIError

class MetricsClient:
    """Wrap ``BaytClient`` and count requests, errors and time spent."""

    def __init__(self, api_key):
        """Create the wrapped client and zeroed counters.

        Args:
            api_key: API key forwarded to ``BaytClient``.
        """
        self.client = BaytClient(api_key=api_key)
        self.metrics = {
            'requests': 0,
            'errors': 0,
            # Nothing in this class updates 'cache_hits'; it stays at 0.
            'cache_hits': 0,
            'total_time': 0,
            'by_prompt': defaultdict(int)
        }

    def get_prompt(self, package_name):
        """Fetch a prompt and record the request, its duration and any error.

        Args:
            package_name: Identifier of the prompt to fetch.

        Returns:
            Whatever ``BaytClient.get_prompt`` returns.

        Raises:
            Exception: Any error from the wrapped client, re-raised unchanged
                after it has been counted.
        """
        # perf_counter is monotonic, so a duration cannot come out negative
        # or inflated when the system clock is adjusted mid-request.
        start = time.perf_counter()
        self.metrics['requests'] += 1
        self.metrics['by_prompt'][package_name] += 1

        try:
            return self.client.get_prompt(package_name)
        except Exception:
            # Count every failed request, not only SDK API errors, so that
            # connection problems and similar also appear in the metrics.
            self.metrics['errors'] += 1
            raise
        finally:
            # Runs on success and on failure alike.
            self.metrics['total_time'] += time.perf_counter() - start

    def print_metrics(self):
        """Print the request count, error count, mean latency and per-prompt totals."""
        print("\nMetrics Summary")
        print("=" * 60)
        print(f"Total requests: {self.metrics['requests']}")
        print(f"Errors: {self.metrics['errors']}")
        # max(..., 1) avoids dividing by zero before any request was made.
        print(f"Average time: {self.metrics['total_time'] / max(self.metrics['requests'], 1):.2f}s")

        print("\nRequests by prompt:")
        for package, count in self.metrics['by_prompt'].items():
            print(f"  {package}: {count}")

# Usage
metrics_client = MetricsClient(api_key=os.getenv("BAYT_API_KEY"))

# Fetch the same prompt five times so the summary has data to show.
request_count = 5
for _ in range(request_count):
    metrics_client.get_prompt("@workspace/my-prompt:v1")

metrics_client.print_metrics()

Example 11: Circuit Breaker Pattern

Implement circuit breaker for resilience:
import os
import time
from enum import Enum
from baytos.claro import BaytClient, BaytAPIError

class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected until the timeout elapses
    HALF_OPEN = "half_open"  # timeout elapsed; the next call is a trial


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    It subclasses ``Exception``, so existing ``except Exception`` handlers
    still catch it, while new code can tell a rejection apart from a failure
    of the wrapped function.
    """


class CircuitBreaker:
    """Stop calling a failing API for a while after repeated failures."""

    def __init__(self, failure_threshold=5, timeout=60):
        """Configure the breaker; it starts closed.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            timeout: Seconds the circuit stays open before a trial call is
                allowed.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None  # time.monotonic() of the latest failure
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` under the breaker.

        Returns:
            The return value of ``func``.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the timeout
                has not elapsed; ``func`` is not called.
            BaytAPIError: Re-raised from ``func`` after being counted as a
                failure. Other exception types propagate without being
                counted.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock: a wall-clock adjustment cannot shorten or
            # extend the open period.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
        except BaytAPIError:
            self.on_failure()
            raise

        self.on_success()
        return result

    def on_success(self):
        """Reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        # The count is not reset while the circuit is open, so a failed trial
        # call in HALF_OPEN reopens the circuit immediately.
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} failures")

# Usage
breaker = CircuitBreaker(failure_threshold=3, timeout=60)
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

# Route the SDK call through the breaker. The handler below catches every
# exception raised inside the try block, including errors from the call itself.
try:
    prompt = breaker.call(client.get_prompt, "@workspace/my-prompt:v1")
    print(f"Loaded: {prompt.title}")
except Exception as exc:
    print(f"Circuit breaker prevented call: {exc}")

Example 12: Production Configuration

Complete production-ready configuration:
import os
import logging
from baytos.claro import BaytClient

# Configure logging
# Send INFO-and-above records both to a file and to the console.
log_handlers = [
    logging.FileHandler('claro.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=log_handlers,
)

class ProductionClient:
    """Claro client configured from the environment, with retries and logging."""

    def __init__(self):
        """Read the API key from ``BAYT_API_KEY`` and build the client.

        Raises:
            ValueError: If ``BAYT_API_KEY`` is unset or empty.
        """
        # Load configuration from environment
        self.api_key = os.getenv("BAYT_API_KEY")
        if not self.api_key:
            raise ValueError("BAYT_API_KEY not set")

        # Initialize client with retries
        self.client = BaytClient(
            api_key=self.api_key,
            max_retries=3
        )

        # Setup logging
        self.logger = logging.getLogger(__name__)

    def get_prompt(self, package_name, use_cache=True):
        """Fetch a prompt, logging the attempt and its outcome.

        Args:
            package_name: Identifier of the prompt to fetch.
            use_cache: Accepted for interface compatibility; this method does
                not currently read it.

        Returns:
            The prompt returned by the wrapped client.

        Raises:
            Exception: Any error from the wrapped client, re-raised after
                being logged with its traceback.
        """
        # Pass values as logging arguments so the message is formatted only
        # when the record is actually emitted.
        self.logger.info("Fetching prompt: %s", package_name)

        try:
            prompt = self.client.get_prompt(package_name)
            self.logger.info("Successfully loaded: %s", prompt.title)
            return prompt

        except Exception as e:
            # exception() logs at ERROR level and attaches the traceback.
            self.logger.exception("Failed to load prompt: %s", e)
            raise

# Usage
# The constructor reads BAYT_API_KEY and raises ValueError when it is missing.
client = ProductionClient()
prompt = client.get_prompt(package_name="@workspace/my-prompt:v1")

Next Steps