Integrating Multiple LLM Providers: Lessons from Production
A practical guide to building production-ready AI applications with multiple LLM providers, including Anthropic Claude, Google Gemini, and OpenAI.
Introduction
Over the past year, I've built several Gen AI applications at Quantiphi, including a medical affairs insights platform and an audit report generation system. One critical lesson: never depend on a single LLM provider.
In this post, I'll share practical patterns for integrating multiple LLM providers, handling failures gracefully, and optimizing for cost and performance.
Why Multiple Providers?
- Reliability: One provider's downtime doesn't break your app
- Cost optimization: Route requests to the most cost-effective provider
- Model specialization: Use the best model for each task
- Rate limits: Distribute load across providers
- Geographic availability: Some providers work better in certain regions
Architecture Pattern
Here's the abstraction layer I use:
import json
import re
from abc import ABC, abstractmethod
from datetime import timezone
from enum import Enum
from typing import Optional, AsyncIterator
class LLMProvider(Enum):
    """Identifiers for the LLM backends the application can route to.

    Each member's value is the provider's lowercase name as a string.
    Iterating the enum yields members in definition order.
    """

    ANTHROPIC = "anthropic"
    GEMINI = "gemini"
    OPENAI = "openai"
class LLMResponse:
    """Provider-agnostic result of a single completion request.

    Attributes:
        content: Text returned by the model.
        model: Name of the model that produced the text.
        provider: The ``LLMProvider`` that served the request.
        tokens_used: Token count reported for the request.
        cost: Estimated cost of the request, as computed by the client.
    """

    def __init__(
        self,
        content: str,
        model: str,
        provider: LLMProvider,
        tokens_used: int,
        cost: float,
    ):
        # Plain attribute storage; no validation or conversion is performed.
        self.content = content
        self.model = model
        self.provider = provider
        self.tokens_used = tokens_used
        self.cost = cost
class LLMProviderError(Exception):
    """Raised by provider clients when a call to the underlying API fails."""


class BaseLLMClient(ABC):
    """Common interface that every provider client implements.

    Subclasses wrap one vendor SDK and expose the same two operations:
    a one-shot ``generate`` and an incremental ``stream``.
    """

    @abstractmethod
    async def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ) -> LLMResponse:
        """Return the complete response for ``prompt``.

        Args:
            prompt: The user message.
            system_prompt: Optional system instructions.
            temperature: Sampling temperature forwarded to the provider.
            max_tokens: Upper bound on generated tokens.

        Returns:
            An ``LLMResponse`` describing the completion.
        """

    @abstractmethod
    async def stream(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ) -> AsyncIterator[str]:
        """Yield the response text for ``prompt`` piece by piece.

        Takes the same arguments as ``generate``. Implementations are async
        generators, consumed with ``async for chunk in client.stream(...)``.
        """
        # An ``async def`` with no ``yield`` is a coroutine function, which
        # would not match the ``AsyncIterator[str]`` annotation or the async
        # generator implementations. The unreachable ``yield`` below makes the
        # abstract method an async generator too.
        return
        yield ""


# Implementing Providers
Anthropic Claude
import anthropic
from anthropic import AsyncAnthropic
class AnthropicClient(BaseLLMClient):
    """Claude client built on the ``anthropic`` async SDK."""

    # Claude pricing: ~$3/1M input, ~$15/1M output tokens
    INPUT_COST_PER_TOKEN = 0.000003
    OUTPUT_COST_PER_TOKEN = 0.000015

    def __init__(self, api_key: str):
        self.client = AsyncAnthropic(api_key=api_key)
        self.model = "claude-3-5-sonnet-20241022"

    def _request_kwargs(
        self,
        prompt: str,
        system_prompt: Optional[str],
        temperature: float,
        max_tokens: int
    ) -> dict:
        """Build the keyword arguments shared by ``generate`` and ``stream``."""
        kwargs = {
            "model": self.model,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}],
        }
        # ``system`` is optional; omit it instead of sending an empty string.
        if system_prompt:
            kwargs["system"] = system_prompt
        return kwargs

    async def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ) -> LLMResponse:
        """Request a full completion from Claude.

        Returns:
            An ``LLMResponse`` whose ``tokens_used`` is input + output tokens
            and whose ``cost`` is estimated from the per-token prices above.

        Raises:
            LLMProviderError: If the SDK raises ``anthropic.APIError``.
        """
        try:
            message = await self.client.messages.create(
                **self._request_kwargs(prompt, system_prompt, temperature, max_tokens)
            )
        except anthropic.APIError as e:
            raise LLMProviderError(f"Anthropic API error: {str(e)}") from e

        # ``content`` is a list of blocks. Join the text blocks rather than
        # indexing [0], which fails on an empty list or a non-text first block.
        content = "".join(
            block.text for block in message.content if block.type == "text"
        )
        usage = message.usage
        tokens = usage.input_tokens + usage.output_tokens
        cost = (usage.input_tokens * self.INPUT_COST_PER_TOKEN +
                usage.output_tokens * self.OUTPUT_COST_PER_TOKEN)

        return LLMResponse(
            content=content,
            model=self.model,
            provider=LLMProvider.ANTHROPIC,
            tokens_used=tokens,
            cost=cost
        )

    async def stream(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ) -> AsyncIterator[str]:
        """Yield Claude's response text incrementally.

        Raises:
            LLMProviderError: If the SDK raises ``anthropic.APIError``, so
                streaming failures surface the same way as ``generate`` ones.
        """
        try:
            async with self.client.messages.stream(
                **self._request_kwargs(prompt, system_prompt, temperature, max_tokens)
            ) as stream:
                async for text in stream.text_stream:
                    yield text
        except anthropic.APIError as e:
            raise LLMProviderError(f"Anthropic API error: {str(e)}") from e


# Google Gemini
import google.generativeai as genai
class GeminiClient(BaseLLMClient):
    """Gemini client built on ``google.generativeai``."""

    MODEL_NAME = "gemini-pro"

    # Gemini pricing: ~$0.50/1M input, ~$1.50/1M output tokens
    INPUT_COST_PER_TOKEN = 0.0000005
    OUTPUT_COST_PER_TOKEN = 0.0000015

    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(self.MODEL_NAME)

    @staticmethod
    def _build_prompt(prompt: str, system_prompt: Optional[str]) -> str:
        """Prepend the system prompt (when given) to the user prompt."""
        return f"{system_prompt}\n\n{prompt}" if system_prompt else prompt

    async def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ) -> LLMResponse:
        """Request a full completion from Gemini.

        Returns:
            An ``LLMResponse`` whose ``tokens_used`` is the total token count
            reported by the API and whose ``cost`` prices input and output
            tokens separately.

        Raises:
            LLMProviderError: On any failure while calling the API or reading
                the response.
        """
        try:
            response = await self.model.generate_content_async(
                self._build_prompt(prompt, system_prompt),
                generation_config=genai.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_tokens
                )
            )
            usage = response.usage_metadata
            # Price input and output separately, matching the rates above,
            # instead of applying one flat rate to the total.
            cost = (usage.prompt_token_count * self.INPUT_COST_PER_TOKEN +
                    usage.candidates_token_count * self.OUTPUT_COST_PER_TOKEN)
            return LLMResponse(
                content=response.text,
                model=self.MODEL_NAME,
                provider=LLMProvider.GEMINI,
                tokens_used=usage.total_token_count,
                cost=cost
            )
        except Exception as e:
            raise LLMProviderError(f"Gemini API error: {str(e)}") from e

    async def stream(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000
    ) -> AsyncIterator[str]:
        """Yield Gemini's response text incrementally.

        ``BaseLLMClient`` declares ``stream`` abstract, so without this method
        the class cannot be instantiated.

        Raises:
            LLMProviderError: On any failure while calling the API or reading
                a chunk.
        """
        try:
            response = await self.model.generate_content_async(
                self._build_prompt(prompt, system_prompt),
                generation_config=genai.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_tokens
                ),
                stream=True
            )
            async for chunk in response:
                # Skip chunks that carry no text.
                if chunk.text:
                    yield chunk.text
        except Exception as e:
            raise LLMProviderError(f"Gemini API error: {str(e)}") from e


# The Router: Smart Provider Selection
The router decides which provider to use based on:
- Provider availability
- Cost constraints
- Task type (some models are better at certain tasks)
- Rate limits
from typing import List
import asyncio
from circuitbreaker import circuit
class LLMRouter:
    """Route generation requests across providers with automatic fallback.

    Tracks per-provider availability. A provider that fails repeatedly is
    taken out of rotation and re-enabled after a cooldown.
    """

    # Consecutive failures after which a provider is marked unavailable.
    FAILURE_THRESHOLD = 3
    # Seconds an unavailable provider waits before being re-enabled.
    RECOVERY_DELAY_SECONDS = 120

    def __init__(self, clients: List[BaseLLMClient]):
        """Register clients positionally: Anthropic, Gemini, then OpenAI.

        Fewer than three clients may be passed; providers without a client
        start out unavailable and are never selected.
        """
        # Enum iteration follows definition order (ANTHROPIC, GEMINI, OPENAI).
        self.clients = dict(zip(LLMProvider, clients))
        self.provider_status = {
            provider: {"available": provider in self.clients, "failures": 0}
            for provider in LLMProvider
        }
        # Strong references to pending recovery tasks: the event loop only
        # keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._recovery_tasks = set()

    def _is_available(self, provider: LLMProvider) -> bool:
        """Return whether ``provider`` is currently in rotation."""
        return self.provider_status[provider]["available"]

    def _select_provider(
        self,
        task_type: str = "general",
        prefer_cost: bool = False
    ) -> LLMProvider:
        """Select best provider based on task and constraints.

        Raises:
            Exception: If no provider is available.
        """
        # Task-specific routing
        if task_type == "code_generation" and self._is_available(LLMProvider.ANTHROPIC):
            # Claude is excellent at code
            return LLMProvider.ANTHROPIC

        if task_type == "long_context" and self._is_available(LLMProvider.GEMINI):
            # Gemini has 1M token context window
            return LLMProvider.GEMINI

        # Cost-based routing: Gemini is typically cheapest
        if prefer_cost and self._is_available(LLMProvider.GEMINI):
            return LLMProvider.GEMINI

        # Default: Use most reliable provider
        for provider in [LLMProvider.ANTHROPIC, LLMProvider.GEMINI, LLMProvider.OPENAI]:
            if self._is_available(provider):
                return provider

        raise Exception("No LLM providers available")

    @circuit(failure_threshold=3, recovery_timeout=60)
    async def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        task_type: str = "general",
        prefer_cost: bool = False,
        fallback: bool = True
    ) -> LLMResponse:
        """Generate with automatic fallback.

        Tries the provider chosen by ``_select_provider`` first. When
        ``fallback`` is true, it then tries every other available provider in
        enum order. Only ``prompt`` and ``system_prompt`` are forwarded to
        the client.

        Raises:
            Exception: If no provider is available, or every attempted
                provider fails (the last error is chained as the cause).
        """
        primary_provider = self._select_provider(task_type, prefer_cost)
        providers_to_try = [primary_provider]

        # Add fallback providers
        if fallback:
            providers_to_try.extend(
                p for p in LLMProvider
                if p != primary_provider and self._is_available(p)
            )

        last_error = None
        for provider in providers_to_try:
            try:
                response = await self.clients[provider].generate(
                    prompt=prompt,
                    system_prompt=system_prompt
                )
            except Exception as e:
                last_error = e
                self._handle_provider_failure(provider, e)
                continue

            # Reset failure count on success
            self.provider_status[provider]["failures"] = 0
            return response

        raise Exception(f"All providers failed. Last error: {last_error}") from last_error

    def _handle_provider_failure(self, provider: LLMProvider, error: Exception):
        """Handle provider failure and update status.

        ``error`` is accepted for callers' convenience and is not inspected.
        """
        status = self.provider_status[provider]
        status["failures"] += 1

        # Act only on the available -> unavailable transition, so repeated
        # failures do not stack up several recovery timers.
        if status["failures"] >= self.FAILURE_THRESHOLD and status["available"]:
            status["available"] = False
            task = asyncio.create_task(
                self._schedule_recovery(provider, self.RECOVERY_DELAY_SECONDS)
            )
            self._recovery_tasks.add(task)
            task.add_done_callback(self._recovery_tasks.discard)

    async def _schedule_recovery(self, provider: LLMProvider, delay: int):
        """Re-enable provider after cooldown period"""
        await asyncio.sleep(delay)
        self.provider_status[provider]["available"] = True
        self.provider_status[provider]["failures"] = 0


# Streaming Responses
For real-time user interfaces (such as ChatGPT-style chat), stream the response to the client as it is generated:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream a Claude completion to the caller as Server-Sent Events.

    Each text chunk is sent as a ``{"content": ...}`` event. If anything
    fails, a single ``{"error": ...}`` event is sent and the stream ends.
    """

    def sse_event(payload: dict) -> str:
        # Server-Sent Events format: a "data:" line followed by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_stream():
        try:
            chunks = router.clients[LLMProvider.ANTHROPIC].stream(
                prompt=request.message,
                system_prompt=request.system_prompt
            )
            async for chunk in chunks:
                yield sse_event({'content': chunk})
        except Exception as e:
            yield sse_event({'error': str(e)})

    return StreamingResponse(event_stream(), media_type="text/event-stream")


# Cost Optimization Strategies
1. Caching
Cache responses for identical prompts:
from redis.asyncio import Redis
import hashlib
class CachedLLMRouter(LLMRouter):
    """``LLMRouter`` that caches responses in Redis.

    Entries are keyed by the prompt and system prompt and expire after
    24 hours.
    """

    CACHE_TTL_SECONDS = 86400  # 24 hours

    def __init__(self, clients: List[BaseLLMClient], redis: Redis):
        super().__init__(clients)
        self.redis = redis

    @staticmethod
    def _cache_key(prompt: str, system_prompt: Optional[str]) -> str:
        """Return the Redis key for a (prompt, system prompt) pair."""
        # JSON-encoding the pair keeps the boundary between the two fields
        # unambiguous: ("ab", "c") and ("a", "bc") hash differently. A missing
        # system prompt and an explicit ``None`` both map to "".
        raw = json.dumps([prompt, system_prompt or ""])
        return f"llm_cache:{hashlib.sha256(raw.encode()).hexdigest()}"

    async def generate(self, prompt: str, **kwargs) -> LLMResponse:
        """Return a cached response when present, otherwise generate and cache.

        ``kwargs`` are forwarded unchanged to ``LLMRouter.generate``; only
        ``system_prompt`` takes part in the cache key.
        """
        cache_key = self._cache_key(prompt, kwargs.get("system_prompt"))

        # Check cache
        cached = await self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            # The provider is stored by value; turn it back into the enum.
            data["provider"] = LLMProvider(data["provider"])
            return LLMResponse(**data)

        # Generate
        response = await super().generate(prompt, **kwargs)

        # Enum members are not JSON-serializable, so store the provider's
        # string value instead of the member itself.
        payload = dict(vars(response))
        payload["provider"] = response.provider.value
        await self.redis.setex(
            cache_key,
            self.CACHE_TTL_SECONDS,
            json.dumps(payload)
        )

        return response


# 2. Prompt Optimization
Reduce token usage:
def optimize_prompt(long_prompt: str, max_tokens: int = 500) -> str:
    """Compress prompts while maintaining meaning.

    Collapses every run of whitespace to a single space, then abbreviates a
    few common phrases. Phrases are matched case-sensitively and only as
    whole words, so text such as "that island" is left intact.

    Args:
        long_prompt: The prompt to compress.
        max_tokens: Accepted for interface compatibility; the current
            implementation does not use it.

    Returns:
        The compressed prompt.
    """
    # Use abbreviations for common terms
    replacements = {
        "for example": "e.g.",
        "that is": "i.e.",
        "and so on": "etc."
    }

    # Remove extra whitespace
    optimized = " ".join(long_prompt.split())

    # \b anchors stop a phrase from matching inside longer words.
    phrase_pattern = re.compile(
        r"\b(?:" + "|".join(re.escape(phrase) for phrase in replacements) + r")\b"
    )
    return phrase_pattern.sub(lambda match: replacements[match.group(0)], optimized)


# Monitoring & Analytics
Track usage and costs:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class LLMMetrics:
    """One usage record for a single LLM call."""

    # When the record was created.
    timestamp: datetime
    # Which backend served the call, and with which model.
    provider: LLMProvider
    model: str
    # Usage figures copied from the response.
    tokens_used: int
    cost: float
    # Request latency (milliseconds, per the field name).
    latency_ms: float
    # Whether the call succeeded.
    success: bool
async def log_metrics(response: LLMResponse, latency: float):
    """Record usage metrics for a successful LLM call.

    Args:
        response: The response whose provider, model, token count and cost
            are recorded.
        latency: Request latency; stored as ``latency_ms`` without conversion,
            so callers are presumably expected to pass milliseconds.
    """
    metrics = LLMMetrics(
        # Timezone-aware UTC. ``datetime.utcnow()`` returns a naive value and
        # is deprecated since Python 3.12.
        timestamp=datetime.now(timezone.utc),
        provider=response.provider,
        model=response.model,
        tokens_used=response.tokens_used,
        cost=response.cost,
        latency_ms=latency,
        success=True
    )

    # Store in BigQuery for analysis
    await bigquery_client.insert_row("llm_metrics", metrics)


# Production Checklist
- Rate limiting: Implement per-provider rate limits
- Timeouts: Set appropriate timeouts (30-60s)
- Retry logic: Exponential backoff for transient failures
- Circuit breakers: Prevent cascading failures
- Cost alerts: Alert when spending exceeds thresholds
- Prompt injection protection: Sanitize user inputs
- Content filtering: Implement safety checks
- Logging: Comprehensive request/response logging
- Monitoring: Track latency, errors, and costs
Lessons Learned
- Start with one provider: Add more as you scale
- Cache aggressively: 30-40% of our requests hit cache
- Monitor costs: LLM costs can spiral quickly
- Use streaming: Better UX and perceived performance
- Provider-specific tuning: Each model has quirks
- Graceful degradation: Always have fallbacks
Conclusion
Integrating multiple LLM providers adds complexity but provides crucial resilience and flexibility for production applications. Start simple, add providers as needed, and invest in good abstractions from day one.
The pattern I've shared here powers multiple production applications serving thousands of users daily with 99.9% uptime.
Resources
Building AI applications? Let's connect on LinkedIn.