Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.baytos.ai/llms.txt

Use this file to discover all available pages before exploring further.

This guide demonstrates advanced patterns for using the Claro SDK in production environments, including caching, concurrent requests, and performance optimizations.

Prerequisites

pip install baytos-claro
export BAYT_API_KEY="your_api_key_here"

Example 1: Response Caching

Cache prompts locally to reduce API calls:
import os
import time
from functools import lru_cache
from baytos.claro import BaytClient

class CachedClient:
    """Client with built-in caching"""

    def __init__(self, api_key, cache_ttl=300):
        self.client = BaytClient(api_key=api_key)
        self.cache_ttl = cache_ttl  # 5 minutes default

    @lru_cache(maxsize=100)
    def get_prompt_cached(self, package_name, cache_key):
        """Get prompt with caching based on time window"""
        return self.client.get_prompt(package_name)

    def get_prompt(self, package_name):
        """Get prompt with automatic cache invalidation"""
        # Use time window as cache key to auto-invalidate
        cache_key = int(time.time() / self.cache_ttl)
        return self.get_prompt_cached(package_name, cache_key)

# Usage
cached_client = CachedClient(
    api_key=os.getenv("BAYT_API_KEY"),
    cache_ttl=300  # 5 minutes
)

# First call - hits API
prompt = cached_client.get_prompt("@workspace/my-prompt:v1")
print("First call (from API)")

# Second call - from cache
prompt = cached_client.get_prompt("@workspace/my-prompt:v1")
print("Second call (from cache)")

Example 2: Redis Caching

Use Redis for distributed caching:
import os
import json
import redis
from baytos.claro import BaytClient

class RedisPromptCache:
    """Prompt cache using Redis"""

    def __init__(self, api_key, redis_url='redis://localhost:6379', ttl=300):
        self.client = BaytClient(api_key=api_key)
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get_prompt(self, package_name):
        """Get prompt with Redis caching"""
        cache_key = f"prompt:{package_name}"

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            print(f"Cache hit: {package_name}")
            return json.loads(cached)

        # Fetch from API
        print(f"Cache miss: {package_name}")
        prompt = self.client.get_prompt(package_name)

        # Store in cache
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(prompt.to_dict())
        )

        return prompt

# Usage
cache = RedisPromptCache(
    api_key=os.getenv("BAYT_API_KEY"),
    ttl=300
)

prompt = cache.get_prompt("@workspace/my-prompt:v1")
print(f"Loaded: {prompt.title}")

Example 3: Concurrent Requests

Fetch multiple prompts concurrently:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient

def fetch_prompt(client, package_name):
    """Fetch a single prompt"""
    try:
        prompt = client.get_prompt(package_name)
        return {'success': True, 'prompt': prompt}
    except Exception as e:
        return {'success': False, 'package': package_name, 'error': str(e)}

def fetch_prompts_concurrent(package_names, max_workers=5):
    """Fetch multiple prompts concurrently"""
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        futures = {
            executor.submit(fetch_prompt, client, pkg): pkg
            for pkg in package_names
        }

        # Collect results as they complete
        for future in as_completed(futures):
            result = future.result()
            results.append(result)

            if result['success']:
                print(f"✓ Loaded: {result['prompt'].title}")
            else:
                print(f"✗ Failed: {result['package']}")

    return results

# Usage
packages = [
    "@workspace/prompt1:v1",
    "@workspace/prompt2:v1",
    "@workspace/prompt3:v1"
]

results = fetch_prompts_concurrent(packages, max_workers=3)

successful = [r for r in results if r['success']]
print(f"\nLoaded {len(successful)}/{len(packages)} prompts")
Be mindful of rate limits when making concurrent requests. Start with a conservative max_workers value (3-5) and monitor rate limit headers.

Example 4: Async with asyncio

Asynchronous pattern for high-performance applications:
import os
import asyncio
import aiohttp
from baytos.claro import BaytClient

async def fetch_prompt_async(session, package_name, api_key):
    """Async fetch a prompt"""
    url = f"https://api.baytos.ai/v1/prompts/{package_name}"
    headers = {"Authorization": f"Bearer {api_key}"}

    async with session.get(url, headers=headers) as response:
        if response.status == 200:
            data = await response.json()
            return {'success': True, 'data': data}
        else:
            error = await response.text()
            return {'success': False, 'error': error}

async def fetch_all_prompts(package_names):
    """Fetch all prompts asynchronously"""
    api_key = os.getenv("BAYT_API_KEY")

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_prompt_async(session, pkg, api_key)
            for pkg in package_names
        ]

        results = await asyncio.gather(*tasks)
        return results

# Usage
packages = [
    "@workspace/prompt1:v1",
    "@workspace/prompt2:v1",
    "@workspace/prompt3:v1"
]

results = asyncio.run(fetch_all_prompts(packages))

for i, result in enumerate(results, 1):
    if result['success']:
        print(f"✓ Prompt {i} loaded")
    else:
        print(f"✗ Prompt {i} failed: {result['error']}")

Example 5: Batch Processing with Queue

Process prompts from a queue:
import os
import queue
import threading
from baytos.claro import BaytClient, BaytAPIError

def worker(task_queue, result_queue, client):
    """Worker thread to process prompts"""
    while True:
        try:
            package_name = task_queue.get(timeout=1)

            if package_name is None:  # Poison pill
                break

            # Fetch prompt
            try:
                prompt = client.get_prompt(package_name)
                result_queue.put({
                    'success': True,
                    'package': package_name,
                    'prompt': prompt
                })
            except BaytAPIError as e:
                result_queue.put({
                    'success': False,
                    'package': package_name,
                    'error': str(e)
                })

        except queue.Empty:
            continue
        finally:
            task_queue.task_done()

def process_prompts_batch(package_names, num_workers=3):
    """Process prompts using worker queue"""
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    task_queue = queue.Queue()
    result_queue = queue.Queue()

    # Start workers
    workers = []
    for _ in range(num_workers):
        t = threading.Thread(
            target=worker,
            args=(task_queue, result_queue, client)
        )
        t.start()
        workers.append(t)

    # Add tasks
    for package_name in package_names:
        task_queue.put(package_name)

    # Wait for completion
    task_queue.join()

    # Stop workers
    for _ in range(num_workers):
        task_queue.put(None)

    for t in workers:
        t.join()

    # Collect results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    return results

# Usage
packages = ["@workspace/prompt1:v1", "@workspace/prompt2:v1"]
results = process_prompts_batch(packages, num_workers=2)

for result in results:
    if result['success']:
        print(f"✓ {result['package']}: {result['prompt'].title}")
    else:
        print(f"✗ {result['package']}: {result['error']}")

Example 6: Request Throttling

Throttle requests to avoid rate limits:
import os
import time
from baytos.claro import BaytClient

class ThrottledClient:
    """Client with request throttling"""

    def __init__(self, api_key, requests_per_second=10):
        self.client = BaytClient(api_key=api_key)
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0

    def get_prompt(self, package_name):
        """Get prompt with throttling"""
        # Wait if needed
        elapsed = time.time() - self.last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)

        # Make request
        self.last_request = time.time()
        return self.client.get_prompt(package_name)

# Usage
throttled_client = ThrottledClient(
    api_key=os.getenv("BAYT_API_KEY"),
    requests_per_second=10  # Max 10 requests per second
)

# These will be automatically throttled
for i in range(20):
    prompt = throttled_client.get_prompt("@workspace/my-prompt:v1")
    print(f"Request {i+1} completed")

Example 7: Lazy Loading

Lazy load prompt content only when needed:
import os
from baytos.claro import BaytClient

class LazyPrompt:
    """Lazy-loaded prompt wrapper"""

    def __init__(self, package_name, client):
        self.package_name = package_name
        self.client = client
        self._prompt = None

    @property
    def prompt(self):
        """Load prompt on first access"""
        if self._prompt is None:
            print(f"Loading {self.package_name}...")
            self._prompt = self.client.get_prompt(self.package_name)
        return self._prompt

    @property
    def title(self):
        return self.prompt.title

    @property
    def generator(self):
        return self.prompt.generator

# Usage
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

# Create lazy prompts (no API calls yet)
prompt1 = LazyPrompt("@workspace/prompt1:v1", client)
prompt2 = LazyPrompt("@workspace/prompt2:v1", client)

# Only loads when accessed
print(prompt1.title)  # API call happens here
print(prompt1.generator)  # Uses cached data

Example 8: Export and Analysis

Export prompts for analysis:
import os
import json
import csv
from pathlib import Path
from baytos.claro import BaytClient

def export_prompts_to_json(output_file="prompts.json"):
    """Export all prompts to JSON"""
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    all_prompts = []
    cursor = None

    while True:
        result = client.list_prompts(limit=50, cursor=cursor)

        for prompt in result['prompts']:
            all_prompts.append({
                'package': prompt.package_name,
                'title': prompt.title,
                'description': prompt.description,
                'category': prompt.category,
                'version': prompt.version,
                'content_length': len(prompt.generator),
                'has_system': prompt.has_system_prompt(),
                'has_critique': prompt.has_critique_prompt(),
                'has_context': prompt.has_context(),
                'num_files': len(prompt.get_file_contexts()),
                'num_urls': len(prompt.get_url_contexts())
            })

        if not result['hasMore']:
            break

        cursor = result['cursor']

    # Save to JSON
    with open(output_file, 'w') as f:
        json.dump(all_prompts, f, indent=2)

    print(f"Exported {len(all_prompts)} prompts to {output_file}")
    return all_prompts

def export_prompts_to_csv(output_file="prompts.csv"):
    """Export prompts to CSV"""
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)

        # Header
        writer.writerow([
            'Package', 'Title', 'Category', 'Version',
            'Content Length', 'Has Context', 'Files', 'URLs'
        ])

        # Data
        cursor = None
        count = 0

        while True:
            result = client.list_prompts(limit=50, cursor=cursor)

            for prompt in result['prompts']:
                writer.writerow([
                    prompt.package_name,
                    prompt.title,
                    prompt.category,
                    prompt.version,
                    len(prompt.generator),
                    prompt.has_context(),
                    len(prompt.get_file_contexts()),
                    len(prompt.get_url_contexts())
                ])
                count += 1

            if not result['hasMore']:
                break

            cursor = result['cursor']

    print(f"Exported {count} prompts to {output_file}")

# Usage
export_prompts_to_json()
export_prompts_to_csv()

Example 9: Version Comparison

Compare different versions of a prompt:
import os
from difflib import unified_diff
from baytos.claro import BaytClient

def compare_prompt_versions(package_base, version1, version2):
    """Compare two versions of a prompt"""
    client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

    # Fetch both versions
    prompt1 = client.get_prompt(f"{package_base}:{version1}")
    prompt2 = client.get_prompt(f"{package_base}:{version2}")

    print(f"Comparing {version1} vs {version2}")
    print("=" * 60)

    # Compare content
    diff = unified_diff(
        prompt1.generator.splitlines(),
        prompt2.generator.splitlines(),
        fromfile=f'{package_base}:{version1}',
        tofile=f'{package_base}:{version2}',
        lineterm=''
    )

    for line in diff:
        print(line)

# Usage
compare_prompt_versions("@workspace/my-prompt", "v1", "v2")

Example 10: Monitoring and Metrics

Track SDK usage metrics:
import os
import time
from collections import defaultdict
from baytos.claro import BaytClient, BaytAPIError

class MetricsClient:
    """Client with built-in metrics"""

    def __init__(self, api_key):
        self.client = BaytClient(api_key=api_key)
        self.metrics = {
            'requests': 0,
            'errors': 0,
            'cache_hits': 0,
            'total_time': 0,
            'by_prompt': defaultdict(int)
        }

    def get_prompt(self, package_name):
        """Get prompt with metrics tracking"""
        start = time.time()
        self.metrics['requests'] += 1
        self.metrics['by_prompt'][package_name] += 1

        try:
            prompt = self.client.get_prompt(package_name)
            return prompt

        except BaytAPIError as e:
            self.metrics['errors'] += 1
            raise

        finally:
            elapsed = time.time() - start
            self.metrics['total_time'] += elapsed

    def print_metrics(self):
        """Print collected metrics"""
        print("\nMetrics Summary")
        print("=" * 60)
        print(f"Total requests: {self.metrics['requests']}")
        print(f"Errors: {self.metrics['errors']}")
        print(f"Average time: {self.metrics['total_time'] / max(self.metrics['requests'], 1):.2f}s")

        print("\nRequests by prompt:")
        for package, count in self.metrics['by_prompt'].items():
            print(f"  {package}: {count}")

# Usage
metrics_client = MetricsClient(api_key=os.getenv("BAYT_API_KEY"))

# Make some requests
for _ in range(5):
    metrics_client.get_prompt("@workspace/my-prompt:v1")

metrics_client.print_metrics()

Example 11: Circuit Breaker Pattern

Implement circuit breaker for resilience:
import os
import time
from enum import Enum
from baytos.claro import BaytClient, BaytAPIError

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for API calls"""

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker"""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result

        except BaytAPIError as e:
            self.on_failure()
            raise

    def on_success(self):
        """Reset on success"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Handle failure"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} failures")

# Usage
client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))
breaker = CircuitBreaker(failure_threshold=3, timeout=60)

try:
    prompt = breaker.call(client.get_prompt, "@workspace/my-prompt:v1")
    print(f"Loaded: {prompt.title}")
except Exception as e:
    print(f"Circuit breaker prevented call: {e}")

Example 12: Production Configuration

Complete production-ready configuration:
import os
import logging
from baytos.claro import BaytClient

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('claro.log'),
        logging.StreamHandler()
    ]
)

class ProductionClient:
    """Production-ready Claro client"""

    def __init__(self):
        # Load configuration from environment
        self.api_key = os.getenv("BAYT_API_KEY")
        if not self.api_key:
            raise ValueError("BAYT_API_KEY not set")

        # Initialize client with retries
        self.client = BaytClient(
            api_key=self.api_key,
            max_retries=3
        )

        # Setup logging
        self.logger = logging.getLogger(__name__)

    def get_prompt(self, package_name, use_cache=True):
        """Get prompt with production features"""
        self.logger.info(f"Fetching prompt: {package_name}")

        try:
            prompt = self.client.get_prompt(package_name)
            self.logger.info(f"Successfully loaded: {prompt.title}")
            return prompt

        except Exception as e:
            self.logger.error(f"Failed to load prompt: {e}", exc_info=True)
            raise

# Usage
client = ProductionClient()
prompt = client.get_prompt("@workspace/my-prompt:v1")

Next Steps

Error Handling

Robust error handling patterns

Rate Limits

Understanding rate limits

Performance Guide

Optimize your implementation

Python SDK

Complete SDK documentation