Performance Optimization

Optimize your Claro integration to minimize latency, reduce costs, and improve user experience. This guide covers caching, connection pooling, batch operations, and monitoring.

Caching Strategies

Why Cache Prompts?

Prompts don’t change frequently, making them ideal for caching:
  • Reduced latency - Serve from cache instead of API
  • Lower costs - Fewer API calls
  • Better reliability - Work offline with cached prompts
  • Improved UX - Faster response times
Cache prompts aggressively, but ensure you have a strategy to invalidate the cache when prompts are updated.

In-Memory Caching

Simple caching for single-server applications:
import time
from functools import lru_cache
from baytos.claro import BaytClient

class CachedClaroClient:
    """Client with built-in caching"""

    def __init__(self, api_key: str, cache_ttl: int = 300):
        self.client = BaytClient(api_key=api_key)
        self.cache_ttl = cache_ttl  # 5 minutes default

    @lru_cache(maxsize=100)
    def _get_prompt_cached(self, package_name: str, cache_key: int):
        """Internal cached method with time-based key"""
        return self.client.get_prompt(package_name)

    def get_prompt(self, package_name: str):
        """Get prompt with automatic cache invalidation"""
        # Use time window as cache key for auto-invalidation
        cache_key = int(time.time() / self.cache_ttl)
        return self._get_prompt_cached(package_name, cache_key)

# Usage
client = CachedClaroClient(
    api_key="your_api_key",
    cache_ttl=300  # Cache for 5 minutes
)

# First call - hits API
prompt = client.get_prompt("@workspace/support:v1")

# Subsequent calls within 5 minutes - from cache
prompt = client.get_prompt("@workspace/support:v1")

Redis Caching

For distributed systems, use Redis:
import json
import redis
from baytos.claro import BaytClient

class RedisCachedClient:
    """Claro client with Redis caching"""

    def __init__(self, api_key: str, redis_url: str = "redis://localhost:6379", ttl: int = 300):
        self.client = BaytClient(api_key=api_key)
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def get_prompt(self, package_name: str):
        """Get prompt with Redis caching"""
        cache_key = f"claro:prompt:{package_name}"

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Fetch from API
        prompt = self.client.get_prompt(package_name)
        data = prompt.to_dict()

        # Cache the serialized prompt so cache hits and misses return the same shape
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(data)
        )

        return data

    def invalidate(self, package_name: str):
        """Manually invalidate cache for a prompt"""
        cache_key = f"claro:prompt:{package_name}"
        self.redis.delete(cache_key)

    def invalidate_all(self):
        """Clear all cached prompts"""
        for key in self.redis.scan_iter("claro:prompt:*"):
            self.redis.delete(key)

# Usage
client = RedisCachedClient(
    api_key="your_api_key",
    redis_url="redis://localhost:6379",
    ttl=600  # Cache for 10 minutes
)

prompt = client.get_prompt("@workspace/support:v1")

# When you publish a new version, invalidate cache
client.invalidate("@workspace/support:v1")

Cache Invalidation Strategies

Time-based expiration (TTL): automatically expire cache entries after a set period.
Pros:
  • Simple to implement
  • No manual invalidation needed
  • Works well for stable prompts
Cons:
  • May serve stale data
  • Can’t force updates immediately (see the version-key sketch below)
# Cache for 10 minutes
cache.setex(key, 600, value)
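
Because a TTL alone can’t force an update the instant a new version is published, version-based cache keys are a useful complement. A minimal sketch, reusing the Redis pattern from above (the helper name and one-hour TTL are illustrative):
import json

def get_prompt_versioned(redis_client, claro_client, package_name: str, ttl: int = 3600):
    """Cache-or-fetch keyed on the fully qualified, versioned package name"""
    # The version tag (e.g. ":v1") is part of the key, so publishing v2 and
    # updating the reference produces a new key - stale entries are never reused
    cache_key = f"claro:prompt:{package_name}"

    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    prompt = claro_client.get_prompt(package_name)
    redis_client.setex(cache_key, ttl, json.dumps(prompt.to_dict()))
    return prompt.to_dict()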

Connection Pooling

HTTP Connection Reuse

The Claro SDK uses connection pooling by default, but you can optimize it:
from baytos.claro import BaytClient

# Configure connection pooling
client = BaytClient(
    api_key="your_api_key",
    max_retries=3,
    timeout=30.0,
    # Connection pool settings (if your SDK version supports them)
    pool_connections=10,  # Number of connection pools to cache
    pool_maxsize=100      # Maximum number of connections to save
)

Singleton Pattern

Reuse a single client instance across your application:
# config.py
from baytos.claro import BaytClient
import os

_client_instance = None

def get_claro_client() -> BaytClient:
    """Get singleton Claro client"""
    global _client_instance

    if _client_instance is None:
        _client_instance = BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            max_retries=3
        )

    return _client_instance

# Usage across your app
from config import get_claro_client

client = get_claro_client()
prompt = client.get_prompt("@workspace/support:v1")

FastAPI Dependency Injection

For web applications, use dependency injection:
from fastapi import FastAPI, Depends
from baytos.claro import BaytClient
import os

app = FastAPI()

# Create client at startup
claro_client = BaytClient(api_key=os.getenv("BAYT_API_KEY"))

def get_client() -> BaytClient:
    """Dependency that returns the shared client"""
    return claro_client

@app.get("/ask")
async def ask_question(
    query: str,
    client: BaytClient = Depends(get_client)
):
    """Endpoint that reuses the same client"""
    prompt = client.get_prompt("@workspace/support:v1")
    # Use prompt with LLM
    return {"response": "..."}

Batch Operations

Fetching Multiple Prompts

Load multiple prompts in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient

def fetch_prompts_batch(package_names: list[str], max_workers: int = 5):
    """Fetch multiple prompts concurrently"""
    client = BaytClient(api_key="your_api_key")
    results = {}

    def fetch_one(package_name):
        try:
            prompt = client.get_prompt(package_name)
            return (package_name, prompt, None)
        except Exception as e:
            return (package_name, None, str(e))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(fetch_one, pkg): pkg
            for pkg in package_names
        }

        for future in as_completed(futures):
            package_name, prompt, error = future.result()

            if error:
                print(f"Failed to load {package_name}: {error}")
            else:
                results[package_name] = prompt

    return results

# Usage
prompts = fetch_prompts_batch([
    "@workspace/support:v1",
    "@workspace/sales:v1",
    "@workspace/technical:v1"
], max_workers=3)
Be mindful of rate limits when making concurrent requests. Start with a conservative max_workers value (3-5) and monitor for rate limit errors.

Pagination Best Practices

When listing prompts, use efficient pagination:
from baytos.claro import BaytClient

def fetch_all_prompts(client: BaytClient, limit: int = 50):
    """Efficiently fetch all prompts with pagination"""
    all_prompts = []
    cursor = None

    while True:
        # Fetch page
        result = client.list_prompts(limit=limit, cursor=cursor)

        all_prompts.extend(result['prompts'])

        # Check if more pages exist
        if not result.get('hasMore'):
            break

        cursor = result.get('cursor')

    return all_prompts

# Usage
client = BaytClient(api_key="your_api_key")
all_prompts = fetch_all_prompts(client, limit=100)  # Use max limit for fewer requests

Monitoring and Metrics

Request Timing

Track API performance:
import time
import logging
from baytos.claro import BaytClient

logger = logging.getLogger(__name__)

class MonitoredClient:
    """Client with performance monitoring"""

    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)

    def get_prompt(self, package_name: str):
        """Get prompt with timing"""
        start = time.time()

        try:
            prompt = self.client.get_prompt(package_name)
            elapsed = (time.time() - start) * 1000  # Convert to ms

            logger.info(f"Fetched {package_name} in {elapsed:.2f}ms")

            # Alert on slow requests
            if elapsed > 1000:  # More than 1 second
                logger.warning(f"Slow API call: {package_name} took {elapsed:.2f}ms")

            return prompt

        except Exception as e:
            elapsed = (time.time() - start) * 1000
            logger.error(f"Failed to fetch {package_name} after {elapsed:.2f}ms: {e}")
            raise

# Usage
client = MonitoredClient(api_key="your_api_key")
prompt = client.get_prompt("@workspace/support:v1")

Performance Metrics

Track key performance indicators:
from dataclasses import dataclass
from collections import defaultdict
import time

from baytos.claro import BaytClient

@dataclass
class PerformanceMetrics:
    """Track API performance metrics"""
    total_requests: int = 0
    total_errors: int = 0
    total_time: float = 0.0
    cache_hits: int = 0
    cache_misses: int = 0

class MetricsClient:
    """Client with comprehensive metrics"""

    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)
        self.metrics = PerformanceMetrics()
        self.by_prompt = defaultdict(lambda: PerformanceMetrics())

    def _get_from_cache(self, package_name: str):
        """Cache lookup hook - return a cached prompt or None (wire this to your cache)"""
        return None

    def get_prompt(self, package_name: str, use_cache: bool = True):
        """Get prompt with metrics tracking"""
        start = time.time()
        self.metrics.total_requests += 1
        self.by_prompt[package_name].total_requests += 1

        try:
            # Check cache first (if you have caching)
            if use_cache:
                cached = self._get_from_cache(package_name)
                if cached:
                    self.metrics.cache_hits += 1
                    return cached

            self.metrics.cache_misses += 1

            # Fetch from API
            prompt = self.client.get_prompt(package_name)

            elapsed = time.time() - start
            self.metrics.total_time += elapsed
            self.by_prompt[package_name].total_time += elapsed

            return prompt

        except Exception as e:
            self.metrics.total_errors += 1
            self.by_prompt[package_name].total_errors += 1
            raise

    def get_stats(self):
        """Get performance statistics"""
        avg_time = (
            self.metrics.total_time / self.metrics.total_requests
            if self.metrics.total_requests > 0
            else 0
        )

        cache_rate = (
            self.metrics.cache_hits / self.metrics.total_requests
            if self.metrics.total_requests > 0
            else 0
        )

        return {
            'total_requests': self.metrics.total_requests,
            'total_errors': self.metrics.total_errors,
            'error_rate': self.metrics.total_errors / max(self.metrics.total_requests, 1),
            'avg_time_ms': avg_time * 1000,
            'cache_hit_rate': cache_rate,
            'cache_hits': self.metrics.cache_hits,
            'cache_misses': self.metrics.cache_misses
        }

    def print_stats(self):
        """Print performance summary"""
        stats = self.get_stats()

        print("\nPerformance Metrics:")
        print(f"  Total Requests: {stats['total_requests']}")
        print(f"  Total Errors: {stats['total_errors']}")
        print(f"  Error Rate: {stats['error_rate']:.2%}")
        print(f"  Avg Response Time: {stats['avg_time_ms']:.2f}ms")
        print(f"  Cache Hit Rate: {stats['cache_hit_rate']:.2%}")

# Usage
client = MetricsClient(api_key="your_api_key")

# Make requests
for _ in range(100):
    prompt = client.get_prompt("@workspace/support:v1")

# Print stats
client.print_stats()

Integration with Monitoring Tools

Export metrics to Prometheus:
from prometheus_client import Counter, Histogram, start_http_server
from baytos.claro import BaytClient
import time

# Define metrics
api_requests = Counter(
    'claro_api_requests_total',
    'Total Claro API requests',
    ['package_name', 'status']
)

api_duration = Histogram(
    'claro_api_duration_seconds',
    'Claro API request duration',
    ['package_name']
)

class PrometheusClient:
    def __init__(self, api_key: str):
        self.client = BaytClient(api_key=api_key)

    def get_prompt(self, package_name: str):
        start = time.time()

        try:
            prompt = self.client.get_prompt(package_name)

            # Record success
            api_requests.labels(
                package_name=package_name,
                status='success'
            ).inc()

            return prompt

        except Exception as e:
            # Record failure
            api_requests.labels(
                package_name=package_name,
                status='error'
            ).inc()
            raise

        finally:
            # Record duration
            duration = time.time() - start
            api_duration.labels(package_name=package_name).observe(duration)

# Start metrics server
start_http_server(8000)  # Metrics at http://localhost:8000/metrics

Troubleshooting Slow Requests

Common Performance Issues

Problem: First request is slow
Cause: Initial connection setup, DNS resolution
Solution:
# Warm up the client at application startup
def warm_up_client(client: BaytClient):
    """Pre-connect and cache first request"""
    try:
        # Make a lightweight request to warm up connection
        client.list_prompts(limit=1)
    except Exception:
        pass  # Ignore errors during warmup

# At application startup
client = BaytClient(api_key="your_api_key")
warm_up_client(client)
Problem: Slow response times from API
Cause: Geographic distance from API servers
Solution:
  • Use caching aggressively (TTL of 5-10 minutes)
  • Fetch prompts at application startup (see the sketch below)
  • Consider edge caching with CloudFlare or a CDN
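A minimal sketch of pre-fetching prompts at startup, assuming a module-level dictionary as the local store (swap in whatever caching layer you already use):
from baytos.claro import BaytClient

# Prompts loaded once at startup; adjust the list to your critical paths
CRITICAL_PROMPTS = [
    "@workspace/support:v1",
    "@workspace/sales:v1",
]

PRELOADED_PROMPTS = {}

def preload_prompts(client: BaytClient):
    """Fetch critical prompts once at application startup"""
    for package_name in CRITICAL_PROMPTS:
        try:
            PRELOADED_PROMPTS[package_name] = client.get_prompt(package_name)
        except Exception as e:
            # Startup should not fail because one prompt could not be loaded
            print(f"Warning: could not preload {package_name}: {e}")

# At application startup
client = BaytClient(api_key="your_api_key")
preload_prompts(client)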
Problem: Prompts with large context files are slow
Cause: Transferring large files over the network
Solution:
# Only download files when needed
prompt = client.get_prompt("@workspace/support:v1")

if prompt.has_context():
    # Check file sizes first
    contexts = prompt.get_file_contexts()

    for context in contexts:
        if context.size < 1_000_000:  # Only download < 1MB
            content = client.download_file(context.url)
Problem: Loading many prompts takes too long
Cause: Sequential API calls add up
Solution: Use concurrent fetching (see the Batch Operations section above)
Problem: Requests throttled due to rate limits
Cause: Exceeding API rate limits
Solution:
import time
from baytos.claro import BaytClient, BaytRateLimitError

def get_prompt_with_backoff(client, package_name, max_retries=3):
    """Retry with exponential backoff on rate limits"""
    for attempt in range(max_retries):
        try:
            return client.get_prompt(package_name)

        except BaytRateLimitError as e:
            if attempt == max_retries - 1:
                raise

            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Rate limited, waiting {wait_time}s...")
            time.sleep(wait_time)

Best Practices

Prompts are relatively static - cache them:
  • Use 5-10 minute TTL for frequently accessed prompts
  • Use version-based cache keys to avoid stale data
  • Implement cache warming for critical prompts
Reduce API calls by batching:
  • Fetch multiple prompts concurrently
  • Use pagination with maximum page size
  • Pre-load prompts at startup for critical paths
Track metrics to identify issues:
  • Log slow requests (>1 second)
  • Monitor error rates
  • Track cache hit rates
  • Set up alerts for anomalies
Optimize hot paths:
  • Cache most frequently used prompts
  • Pre-fetch prompts for common workflows
  • Use CDN for static prompt content
Plan for API unavailability:
  • Implement fallback prompts (see the sketch below)
  • Cache prompts locally as backup
  • Return graceful errors to users
  • Retry with exponential backoff
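A minimal sketch of a fallback strategy, assuming a locally stored default is acceptable when the API is unreachable (the fallback text below is illustrative, and callers must handle receiving plain text instead of a prompt object):
from baytos.claro import BaytClient

# Illustrative local fallbacks - replace with copies of your real prompts
FALLBACK_PROMPTS = {
    "@workspace/support:v1": "You are a helpful support assistant.",
}

def get_prompt_with_fallback(client: BaytClient, package_name: str):
    """Return the live prompt, or a local fallback if the API is unavailable"""
    try:
        return client.get_prompt(package_name)
    except Exception as e:
        fallback = FALLBACK_PROMPTS.get(package_name)
        if fallback is None:
            raise  # No fallback available - surface the error to the caller
        print(f"Claro API unavailable ({e}), using fallback for {package_name}")
        return fallback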

Performance Checklist

Before deploying to production:
  • Caching implemented with appropriate TTL
  • Connection pooling configured
  • Metrics and monitoring in place
  • Slow request alerts configured
  • Rate limit handling implemented
  • Fallback strategy for API failures
  • Performance tested under load
  • Critical prompts pre-loaded at startup

Next Steps