Advanced Features

Learn advanced techniques for optimizing performance, implementing caching, and building production-ready applications with the Claro Python SDK.

Rate Limiting and Backoff

Built-in Rate Limit Handling

The SDK automatically handles rate limits with exponential backoff:
from baytos.claro import BaytClient

# Default: 3 retries with exponential backoff
client = BaytClient(api_key="your_api_key", max_retries=3)

# Backoff delay: 2^(attempt - 1) seconds
# - 1st retry: 1 second
# - 2nd retry: 2 seconds
# - 3rd retry: 4 seconds

Respecting Retry-After Headers

The SDK respects the Retry-After header from the API:
# If API returns Retry-After: 60
# SDK will wait 60 seconds before retrying
# Otherwise uses exponential backoff
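
If you disable the SDK's retries and handle rate limits yourself, the logic looks roughly like the sketch below. The retry_after attribute read via getattr is an assumption for illustration; check what your version of BaytAPIError actually exposes before relying on it:
import time
from baytos.claro import BaytClient, BaytAPIError

client = BaytClient(api_key="...", max_retries=0)  # handle retries manually

def fetch_with_manual_backoff(package_name, attempts=3):
    """Retry using Retry-After when available, else exponential backoff"""
    for attempt in range(1, attempts + 1):
        try:
            return client.get_prompt(package_name)
        except BaytAPIError as e:
            if attempt == attempts:
                raise
            # `retry_after` is hypothetical; fall back to 2^(attempt - 1)
            delay = getattr(e, "retry_after", None) or 2 ** (attempt - 1)
            time.sleep(delay)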

Custom Retry Configuration

Adjust retry behavior based on your needs:
# Aggressive retries for batch jobs
client = BaytClient(
    api_key="...",
    max_retries=5  # Up to 5 retries
)

# Fast-fail for interactive apps
client = BaytClient(
    api_key="...",
    max_retries=1  # Single retry only
)

# No retries (handle manually)
client = BaytClient(
    api_key="...",
    max_retries=0
)

Connection Pooling

How It Works

The SDK uses requests.Session for automatic connection pooling:
from baytos.claro import BaytClient

client = BaytClient(api_key="...")

# First request - establishes connection
prompt1 = client.get_prompt("@workspace/p1:v1")

# Subsequent requests - reuse connection (faster)
prompt2 = client.get_prompt("@workspace/p2:v1")
prompt3 = client.get_prompt("@workspace/p3:v1")

Benefits

  • Faster requests - No TCP handshake overhead
  • Lower latency - Connection already established
  • Reduced server load - Fewer concurrent connections
  • Automatic keep-alive - Connection reused efficiently
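
A quick (unscientific) way to observe connection reuse is to time consecutive requests on the same client; the first call typically pays the TCP/TLS setup cost:
import time
from baytos.claro import BaytClient

client = BaytClient(api_key="...")

for attempt in range(3):
    start = time.perf_counter()
    client.get_prompt("@workspace/p1:v1")
    # First iteration includes connection setup; later ones reuse it
    print(f"Request {attempt + 1}: {time.perf_counter() - start:.3f}s")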

Best Practice: Reuse Client

# ✅ Good: Single client instance
client = BaytClient(api_key="...")

for package in packages:
    prompt = client.get_prompt(package)

# ❌ Bad: New client each time
for package in packages:
    client = BaytClient(api_key="...")  # Creates new session!
    prompt = client.get_prompt(package)

Caching Strategies

Simple In-Memory Cache

Cache prompts to reduce API calls:
from functools import lru_cache
from baytos.claro import BaytClient

client = BaytClient(api_key="...")

@lru_cache(maxsize=100)
def get_cached_prompt(package_name: str):
    """Cache up to 100 prompts in memory"""
    return client.get_prompt(package_name)

# First call - fetches from API
prompt = get_cached_prompt("@workspace/support:v1")

# Subsequent calls - returns cached version
prompt = get_cached_prompt("@workspace/support:v1")  # Instant!

Cached prompts become stale when updated. Use version pinning or implement cache invalidation.
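
functools.lru_cache has no per-key eviction, but it does expose cache_clear() and cache_info() for coarse invalidation and inspection:
# Clear the whole cache, e.g. after publishing a new prompt version
get_cached_prompt.cache_clear()

# Inspect hits/misses to gauge cache effectiveness
print(get_cached_prompt.cache_info())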

Time-Based Cache

Implement TTL (time-to-live) caching:
import time
from typing import Dict, Optional, Tuple
from baytos.claro import BaytClient, Prompt

class PromptCache:
    def __init__(self, client: BaytClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl = ttl_seconds
        self.cache: Dict[str, Tuple[Prompt, float]] = {}

    def get(self, package_name: str) -> Prompt:
        """Get prompt with TTL-based caching"""
        now = time.time()

        # Check cache
        if package_name in self.cache:
            prompt, timestamp = self.cache[package_name]

            # Return if not expired
            if now - timestamp < self.ttl:
                return prompt

        # Fetch from API
        prompt = self.client.get_prompt(package_name)

        # Store in cache
        self.cache[package_name] = (prompt, now)

        return prompt

    def invalidate(self, package_name: Optional[str] = None):
        """Invalidate cache for one or all prompts"""
        if package_name:
            self.cache.pop(package_name, None)
        else:
            self.cache.clear()

# Usage
client = BaytClient(api_key="...")
cache = PromptCache(client, ttl_seconds=300)  # 5 minutes

prompt = cache.get("@workspace/support:v1")  # Fetches from API
prompt = cache.get("@workspace/support:v1")  # Returns cached (< 5 min)

# Invalidate when needed
cache.invalidate("@workspace/support:v1")

Redis-Based Caching

For distributed systems, use Redis:
import json
import redis
from baytos.claro import BaytClient, Prompt

class RedisPromptCache:
    def __init__(self, client: BaytClient, redis_client, ttl=300):
        self.client = client
        self.redis = redis_client
        self.ttl = ttl

    def get(self, package_name: str) -> Prompt:
        """Get prompt with Redis caching"""
        cache_key = f"prompt:{package_name}"

        # Check Redis cache
        cached = self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            return Prompt(data)

        # Fetch from API
        prompt = self.client.get_prompt(package_name)

        # Store in Redis
        self.redis.setex(
            cache_key,
            self.ttl,
            json.dumps(prompt.to_dict())
        )

        return prompt

# Usage
client = BaytClient(api_key="...")
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = RedisPromptCache(client, redis_client, ttl=300)

prompt = cache.get("@workspace/support:v1")

Concurrent Requests

Thread-Based Concurrency

Fetch multiple prompts in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed
from baytos.claro import BaytClient

client = BaytClient(api_key="...")

package_names = [
    "@workspace/support:v1",
    "@workspace/sales:v1",
    "@workspace/marketing:v1"
]

def fetch_prompt(package_name):
    """Fetch a single prompt"""
    return client.get_prompt(package_name)

# Fetch concurrently
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit all tasks
    futures = {
        executor.submit(fetch_prompt, name): name
        for name in package_names
    }

    # Collect results
    prompts = {}
    for future in as_completed(futures):
        name = futures[future]
        try:
            prompt = future.result()
            prompts[name] = prompt
            print(f"Fetched: {prompt.title}")
        except Exception as e:
            print(f"Failed to fetch {name}: {e}")

print(f"\nFetched {len(prompts)} prompts")

Async/Await Pattern

For async applications, wrap SDK calls:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from baytos.claro import BaytClient

client = BaytClient(api_key="...")
executor = ThreadPoolExecutor(max_workers=10)

async def get_prompt_async(package_name: str):
    """Async wrapper for get_prompt"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        client.get_prompt,
        package_name
    )

async def fetch_multiple_prompts(package_names):
    """Fetch multiple prompts concurrently"""
    tasks = [get_prompt_async(name) for name in package_names]
    return await asyncio.gather(*tasks)

# Usage
async def main():
    packages = [
        "@workspace/p1:v1",
        "@workspace/p2:v1",
        "@workspace/p3:v1"
    ]

    prompts = await fetch_multiple_prompts(packages)

    for prompt in prompts:
        print(f"Fetched: {prompt.title}")

asyncio.run(main())
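
On Python 3.9+, asyncio.to_thread is a simpler alternative to managing an executor yourself:
async def get_prompt_async(package_name: str):
    # Runs the blocking SDK call in the default thread pool
    return await asyncio.to_thread(client.get_prompt, package_name)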

Performance Optimization

Batch Operations

Process bulk operations in batches, with progress tracking and per-item error handling:
from baytos.claro import BaytClient

def fetch_prompts_efficiently(package_names, batch_size=10):
    """Fetch prompts in batches with progress tracking"""
    client = BaytClient(api_key="...")
    prompts = []

    for i in range(0, len(package_names), batch_size):
        batch = package_names[i:i + batch_size]

        print(f"Fetching batch {i//batch_size + 1}...")

        for name in batch:
            try:
                prompt = client.get_prompt(name)
                prompts.append(prompt)
            except Exception as e:
                print(f"Failed to fetch {name}: {e}")

    return prompts

# Usage
packages = ["@workspace/p1:v1", "@workspace/p2:v1", ...]
prompts = fetch_prompts_efficiently(packages)

Minimize Data Transfer

Only fetch what you need:
# ✅ Good: Fetch specific prompt
prompt = client.get_prompt("@workspace/support:v1")
content = prompt.generator

# ❌ Wasteful: List all, then filter
result = client.list_prompts(limit=100)
prompt = [p for p in result['prompts'] if p.title == "Support"][0]

Lazy Loading

Load context files only when needed:
from baytos.claro import BaytClient

client = BaytClient(api_key="...")
prompt = client.get_prompt("@workspace/research:v1")

# ✅ Good: Check before downloading
if prompt.has_context():
    files = prompt.get_file_contexts()

    # Only download the files you actually need
    for file in files:
        if file.mime_type == 'application/pdf':
            content = client.download_context_file(file.id)
            # Process PDF...

Production Best Practices

Environment-Based Configuration

Configure differently per environment:
import os
from baytos.claro import BaytClient

def create_client():
    """Create client with environment-specific config"""
    env = os.getenv("ENVIRONMENT", "production")

    if env == "development":
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            base_url="http://localhost:8000",
            max_retries=0  # Fast-fail in dev
        )

    elif env == "staging":
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            base_url="https://staging-api.baytos.ai",
            max_retries=2
        )

    else:  # production
        return BaytClient(
            api_key=os.getenv("BAYT_API_KEY"),
            max_retries=3
        )

client = create_client()

Health Checks

Implement health checks for monitoring:
from baytos.claro import BaytClient, BaytAPIError

client = BaytClient(api_key="...")  # reuse a single client (see above)

def health_check():
    """Check if API is accessible"""
    try:
        # Try to list prompts (lightweight operation)
        result = client.list_prompts(limit=1)

        return {
            'status': 'healthy',
            'api_accessible': True
        }

    except BaytAPIError as e:
        return {
            'status': 'unhealthy',
            'api_accessible': False,
            'error': str(e)
        }

# Usage in health endpoint
# GET /health
status = health_check()
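
A minimal sketch of exposing this through a web framework (Flask is assumed here for illustration; it is not part of the SDK):
from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/health")
def health():
    """Return 200 when the API is reachable, 503 otherwise"""
    result = health_check()
    status_code = 200 if result['api_accessible'] else 503
    return jsonify(result), status_code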

Monitoring and Metrics

Track API usage and performance:
import time
from baytos.claro import BaytClient, BaytAPIError

class MonitoredClient:
    def __init__(self, api_key):
        self.client = BaytClient(api_key=api_key)
        self.stats = {
            'requests': 0,
            'errors': 0,
            'total_time': 0
        }

    def get_prompt(self, package_name):
        """Get prompt with monitoring"""
        start = time.time()

        try:
            self.stats['requests'] += 1
            prompt = self.client.get_prompt(package_name)
            return prompt

        except BaytAPIError:
            self.stats['errors'] += 1
            raise

        finally:
            elapsed = time.time() - start
            self.stats['total_time'] += elapsed

    def get_stats(self):
        """Get performance statistics"""
        avg_time = (
            self.stats['total_time'] / self.stats['requests']
            if self.stats['requests'] > 0
            else 0
        )

        return {
            'total_requests': self.stats['requests'],
            'total_errors': self.stats['errors'],
            'error_rate': (
                self.stats['errors'] / self.stats['requests']
                if self.stats['requests'] > 0
                else 0
            ),
            'avg_response_time': avg_time
        }

# Usage
client = MonitoredClient(api_key="...")

prompt = client.get_prompt("@workspace/test:v1")
stats = client.get_stats()
print(f"Average response time: {stats['avg_response_time']:.2f}s")

Circuit Breaker Pattern

Prevent cascading failures:
import time
from baytos.claro import BaytClient, BaytAPIError

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker"""

        # Check if circuit is open
        if self.state == 'open':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'half-open'
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)

            # Success - reset on half-open
            if self.state == 'half-open':
                self.failures = 0
                self.state = 'closed'

            return result

        except BaytAPIError:
            self.failures += 1
            self.last_failure_time = time.time()

            # Open circuit if threshold reached
            if self.failures >= self.failure_threshold:
                self.state = 'open'

            raise

# Usage
client = BaytClient(api_key="...")
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def fetch_with_breaker(package_name):
    return breaker.call(client.get_prompt, package_name)

try:
    prompt = fetch_with_breaker("@workspace/test:v1")
except Exception as e:
    print(f"Circuit breaker prevented call: {e}")

Best Practices Summary

Reuse client instances to benefit from connection pooling:
# ✅ Good: Singleton pattern
_client = None

def get_client():
    global _client
    if _client is None:
        _client = BaytClient(api_key="...")
    return _client

Cache prompts to reduce API calls:
# Use version pinning with caching
@lru_cache(maxsize=100)
def get_cached_prompt(package_name: str):
    return client.get_prompt(package_name)

# Always use specific versions when caching
prompt = get_cached_prompt("@workspace/support:v1")  # ✅
prompt = get_cached_prompt("@workspace/support:latest")  # ❌

Respect rate limits and implement backoff:
# Configure appropriate retries
client = BaytClient(
    api_key="...",
    max_retries=3  # Let SDK handle retries
)

# Add application-level rate limiting
import time
time.sleep(0.1)  # 100ms between requests
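
For steadier pacing than ad-hoc sleeps, a minimal interval-based limiter (a sketch, not part of the SDK):
import time

class RateLimiter:
    """Enforce a minimum interval between successive calls"""
    def __init__(self, min_interval: float = 0.1):
        self.min_interval = min_interval
        self.last_call = 0.0

    def wait(self):
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call = time.monotonic()

limiter = RateLimiter(min_interval=0.1)
limiter.wait()
prompt = client.get_prompt("@workspace/support:v1")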

Track metrics in production:

  • Log response times
  • Track error rates
  • Monitor cache hit rates
  • Set up alerts for anomalies
