# LLM Provider Integration Patterns
**Version**: 1.0
**Status**: Guide (Reference Architecture)
**Last Updated**: 2026-02-10
---
## Overview
Four implementation patterns for integrating LLM providers (Claude, OpenAI, Gemini, Ollama) without requiring active API subscriptions during development.
| Pattern | Dev Cost | Test Cost | When to Use |
|---------|----------|-----------|------------|
| **1. Mocks** | $0 | $0 | Unit/integration tests without real calls |
| **2. SDK Direct** | Varies | $0 (mocked) | Full SDK integration, actual API calls in staging |
| **3. Add Provider** | $0 | $0 | Extending router with new provider |
| **4. End-to-End** | $0-$$ | Varies | Complete flow from request → response |
---
## Pattern 1: Mocks for Development & Testing
**Use case**: Develop without API keys. All tests pass without real provider calls.
### Structure
```rust
// crates/vapora-llm-router/src/mocks.rs
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use tracing::info;

pub struct MockLLMClient {
    name: String,
    responses: Vec<String>,
    call_count: Arc<AtomicUsize>,
}
impl MockLLMClient {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
responses: vec![
"Mock response for architecture".into(),
"Mock response for code review".into(),
"Mock response for documentation".into(),
],
call_count: Arc::new(AtomicUsize::new(0)),
}
}
pub fn with_responses(mut self, responses: Vec<String>) -> Self {
self.responses = responses;
self
}
pub fn call_count(&self) -> usize {
self.call_count.load(std::sync::atomic::Ordering::SeqCst)
}
}
#[async_trait::async_trait]
impl LLMClient for MockLLMClient {
async fn complete(&self, prompt: &str) -> Result<String> {
let idx = self.call_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let response = self.responses.get(idx % self.responses.len())
.cloned()
.unwrap_or_else(|| format!("Mock response #{}", idx));
info!("MockLLMClient '{}' responded with: {}", self.name, response);
Ok(response)
}
async fn stream(&self, _prompt: &str) -> Result<BoxStream<String>> {
let response = "Mock streaming response".to_string();
let stream = futures::stream::once(async move { response });
Ok(Box::pin(stream))
}
fn cost_per_1k_tokens(&self) -> f64 {
0.0 // Free in mock
}
fn latency_ms(&self) -> u32 {
1 // Instant in mock
}
fn available(&self) -> bool {
true
}
}
```
### Usage in Tests
```rust
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_routing_with_mock() {
let mock_claude = MockLLMClient::new("claude");
let mock_openai = MockLLMClient::new("openai");
let mut router = LLMRouter::new();
router.register_provider("claude", Box::new(mock_claude));
router.register_provider("openai", Box::new(mock_openai));
// Route task without API calls
let result = router.route(
TaskContext {
task_type: TaskType::CodeGeneration,
domain: "backend".into(),
complexity: Complexity::High,
quality_requirement: Quality::High,
latency_required_ms: 5000,
budget_cents: None,
},
None,
).await;
assert!(result.is_ok());
}
    #[tokio::test]
    async fn test_fallback_chain_with_mocks() {
        // First provider is expected to fail, second succeeds.
        // (A failing variant of MockLLMClient -- one that returns Err -- is
        // assumed here; the basic mock above always succeeds.)
        let mock_failed = MockLLMClient::new("failed");
        let mock_success = MockLLMClient::new("success")
            .with_responses(vec!["success".into()]);
        let mut router = LLMRouter::new();
        router.register_provider("failed", Box::new(mock_failed));
        router.register_provider("success", Box::new(mock_success));
        // Use with actual routing logic
        let response = router.route_with_fallback(
            vec!["failed", "success"],
            "test prompt",
        ).await;
        assert!(response.is_ok());
        assert!(response.unwrap().contains("success"));
    }
#[tokio::test]
async fn test_cost_tracking_with_mocks() {
let router = LLMRouter::with_mocks();
// Simulate 100 tasks
for i in 0..100 {
router.route_task(Task {
id: format!("task-{}", i),
task_type: TaskType::CodeGeneration,
..Default::default()
}).await.ok();
}
// Verify cost tracking (should be $0 for mocks)
assert_eq!(router.cost_tracker.total_cost_cents, 0);
}
}
```
### Cost Management with Mocks
```rust
pub struct MockCostTracker {
simulated_cost: AtomicU32, // Cents
}
impl MockCostTracker {
pub fn new() -> Self {
Self {
simulated_cost: AtomicU32::new(0),
}
}
/// Simulate cost without actual API call
pub async fn record_simulated_call(&self, provider: &str, tokens: u32) {
let cost_per_token = match provider {
"claude" => 0.000003, // $3 per 1M tokens
"openai" => 0.000001, // $1 per 1M tokens
"gemini" => 0.0000005, // $0.50 per 1M tokens
"ollama" => 0.0, // Free
_ => 0.0,
};
let total_cost = (tokens as f64 * cost_per_token * 100.0) as u32; // cents
self.simulated_cost.fetch_add(total_cost, Ordering::SeqCst);
}
}
```
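A minimal usage sketch for the simulated tracker (it assumes the test sits in the same module as `MockCostTracker`, so the private `simulated_cost` field is visible; the token counts are illustrative):

```rust
#[tokio::test]
async fn test_simulated_cost_accumulation() {
    let tracker = MockCostTracker::new();

    // 10,000 tokens against Claude at $0.000003/token => $0.03 => 3 cents
    tracker.record_simulated_call("claude", 10_000).await;
    // Ollama is free, so this call adds nothing
    tracker.record_simulated_call("ollama", 50_000).await;

    // Same-module access to the private counter (test-only convenience)
    let cents = tracker.simulated_cost.load(std::sync::atomic::Ordering::SeqCst);
    assert_eq!(cents, 3);
}
```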
### Benefits
✅ Zero API costs
✅ Instant responses (no network delay)
✅ Deterministic output (for testing)
✅ No authentication needed
✅ Full test coverage without subscriptions
### Limitations
❌ No real model behavior
❌ Responses are hardcoded
❌ Can't test actual provider failures
❌ Not suitable for production validation
---
## Pattern 2: SDK Direct Integration
**Use case**: Full integration with official SDKs. Cost tracking and real API calls in staging/production.
### Abstraction Layer
```rust
// crates/vapora-llm-router/src/providers.rs
use anthropic::Anthropic; // Official SDK
use openai_api::OpenAI; // Official SDK
#[async_trait::async_trait]
pub trait LLMClient: Send + Sync {
    async fn complete(&self, prompt: &str) -> Result<String>;
    async fn stream(&self, prompt: &str) -> Result<BoxStream<String>>;
    fn cost_per_1k_tokens(&self) -> f64;
    fn latency_ms(&self) -> u32 {
        0 // Default; mocks and latency-aware providers override this
    }
    fn available(&self) -> bool;
}
/// Claude SDK Implementation
pub struct ClaudeClient {
client: Anthropic,
model: String,
max_tokens: usize,
}
impl ClaudeClient {
pub fn new(api_key: &str, model: &str) -> Self {
Self {
client: Anthropic::new(api_key.into()),
model: model.to_string(),
max_tokens: 4096,
}
}
}
#[async_trait::async_trait]
impl LLMClient for ClaudeClient {
async fn complete(&self, prompt: &str) -> Result<String> {
let message = self.client
.messages()
.create(CreateMessageRequest {
model: self.model.clone(),
max_tokens: self.max_tokens,
messages: vec![
MessageParam::User(
ContentBlockParam::Text(TextBlockParam {
text: prompt.into(),
})
),
],
system: None,
tools: None,
..Default::default()
})
.await
.map_err(|e| anyhow!("Claude API error: {}", e))?;
extract_text_from_response(&message)
}
async fn stream(&self, prompt: &str) -> Result<BoxStream<String>> {
let mut stream = self.client
.messages()
.stream(CreateMessageRequest {
model: self.model.clone(),
max_tokens: self.max_tokens,
messages: vec![
MessageParam::User(ContentBlockParam::Text(
TextBlockParam { text: prompt.into() }
)),
],
..Default::default()
})
.await
.map_err(|e| anyhow!("Claude streaming error: {}", e))?;
let (tx, rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
while let Some(event) = stream.next().await {
match event {
Ok(evt) => {
if let Some(text) = extract_delta(&evt) {
let _ = tx.send(text).await;
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
});
        // ReceiverStream comes from the `tokio-stream` crate
        Ok(Box::pin(ReceiverStream::new(rx)))
}
    fn cost_per_1k_tokens(&self) -> f64 {
        // Claude Opus: $3/1M input, $15/1M output
        0.015 // Conservative estimate: priced at the output rate
    }
fn available(&self) -> bool {
!self.client.api_key().is_empty()
}
}
/// OpenAI SDK Implementation
pub struct OpenAIClient {
client: OpenAI,
model: String,
}
impl OpenAIClient {
pub fn new(api_key: &str, model: &str) -> Self {
Self {
client: OpenAI::new(api_key.into()),
model: model.to_string(),
}
}
}
#[async_trait::async_trait]
impl LLMClient for OpenAIClient {
async fn complete(&self, prompt: &str) -> Result<String> {
let response = self.client
.chat_completions()
.create(CreateChatCompletionRequest {
model: self.model.clone(),
messages: vec![
ChatCompletionRequestMessage::User(
ChatCompletionRequestUserMessage {
content: ChatCompletionContentPart::Text(
ChatCompletionContentPartText {
text: prompt.into(),
}
),
name: None,
}
),
],
temperature: Some(0.7),
max_tokens: Some(2048),
..Default::default()
})
.await
.map_err(|e| anyhow!("OpenAI API error: {}", e))?;
Ok(response.choices[0].message.content.clone())
}
async fn stream(&self, prompt: &str) -> Result<BoxStream<String>> {
// Similar to Claude streaming
todo!("Implement OpenAI streaming")
}
    fn cost_per_1k_tokens(&self) -> f64 {
        // GPT-4: $10/1M input, $30/1M output
        0.030 // Conservative estimate: priced at the output rate
    }
fn available(&self) -> bool {
!self.client.api_key().is_empty()
}
}
```
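The overview also lists Gemini and Ollama. Ollama runs locally and costs nothing, which makes it a useful real-but-free provider during development. A minimal sketch against Ollama's local HTTP API follows; the `/api/generate` endpoint and JSON shape are assumptions, so verify them against the Ollama documentation for the installed version:

```rust
/// Local Ollama sketch (no API key required).
pub struct OllamaClient {
    endpoint: String, // e.g. "http://localhost:11434"
    model: String,    // e.g. "llama3"
}

#[async_trait::async_trait]
impl LLMClient for OllamaClient {
    async fn complete(&self, prompt: &str) -> Result<String> {
        // Non-streaming generation request
        let response = reqwest::Client::new()
            .post(format!("{}/api/generate", self.endpoint))
            .json(&serde_json::json!({
                "model": self.model,
                "prompt": prompt,
                "stream": false,
            }))
            .send()
            .await?
            .json::<serde_json::Value>()
            .await?;
        Ok(response["response"].as_str().unwrap_or_default().to_string())
    }

    async fn stream(&self, _prompt: &str) -> Result<BoxStream<String>> {
        todo!("Parse Ollama's line-delimited JSON stream")
    }

    fn cost_per_1k_tokens(&self) -> f64 {
        0.0 // Local inference is free
    }

    fn available(&self) -> bool {
        true // Assumes the local daemon is running; a health check could go here
    }
}
```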
### Conditional Compilation
```toml
# Cargo.toml
[features]
default = ["mock-providers"]
real-providers = ["anthropic", "openai-api", "google-generativeai"]
development = ["mock-providers"]
production = ["real-providers"]
```

```rust
// src/lib.rs
#[cfg(feature = "mock-providers")]
mod mocks;
#[cfg(feature = "real-providers")]
mod claude_client;
#[cfg(feature = "real-providers")]
mod openai_client;
pub fn create_provider(name: &str) -> Box<dyn LLMClient> {
    #[cfg(feature = "real-providers")]
    {
        return match name {
            "claude" => Box::new(ClaudeClient::new(
                &env::var("ANTHROPIC_API_KEY").unwrap_or_default(),
                "claude-opus-4",
            )),
            "openai" => Box::new(OpenAIClient::new(
                &env::var("OPENAI_API_KEY").unwrap_or_default(),
                "gpt-4",
            )),
            _ => Box::new(MockLLMClient::new(name)),
        };
    }
    #[cfg(not(feature = "real-providers"))]
    {
        Box::new(MockLLMClient::new(name))
    }
}
```
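A quick sanity test for the factory (a sketch; it assumes the default `mock-providers` build, where the zero-cost mock answers every request):

```rust
#[cfg(all(test, not(feature = "real-providers")))]
mod factory_tests {
    use super::*;

    #[tokio::test]
    async fn default_build_uses_mocks() {
        let provider = create_provider("claude");
        assert_eq!(provider.cost_per_1k_tokens(), 0.0); // mocks are free
        assert!(provider.available());
        assert!(provider.complete("ping").await.is_ok());
    }
}
```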
### Cost Management
```rust
pub struct SDKCostTracker {
provider_costs: DashMap<String, CostMetric>,
}
pub struct CostMetric {
total_tokens: u64,
total_cost_cents: u32,
call_count: u32,
last_call: DateTime<Utc>,
}
impl SDKCostTracker {
pub async fn track_call(
&self,
provider: &str,
input_tokens: u32,
output_tokens: u32,
cost: f64,
) {
let cost_cents = (cost * 100.0) as u32;
let total_tokens = (input_tokens + output_tokens) as u64;
        // DashMap's entry API returns a mutable guard; update the metric in place
        let mut metric = self
            .provider_costs
            .entry(provider.to_string())
            .or_insert_with(|| CostMetric {
                total_tokens: 0,
                total_cost_cents: 0,
                call_count: 0,
                last_call: Utc::now(),
            });
        metric.total_tokens += total_tokens;
        metric.total_cost_cents += cost_cents;
        metric.call_count += 1;
        metric.last_call = Utc::now();
}
pub fn cost_summary(&self) -> CostSummary {
let mut total_cost = 0u32;
let mut providers = Vec::new();
for entry in self.provider_costs.iter() {
let metric = entry.value();
total_cost += metric.total_cost_cents;
providers.push((
entry.key().clone(),
metric.total_cost_cents,
metric.call_count,
));
}
CostSummary {
total_cost_cents: total_cost,
total_cost_dollars: total_cost as f64 / 100.0,
providers,
}
}
}
```
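A usage sketch (it assumes same-module access to `provider_costs`, or an equivalent constructor; the token counts and cost are illustrative):

```rust
#[tokio::test]
async fn test_cost_summary_after_one_call() {
    let tracker = SDKCostTracker {
        provider_costs: DashMap::new(),
    };

    // One Claude call: 1,200 input + 800 output tokens, $0.25 total
    tracker.track_call("claude", 1_200, 800, 0.25).await;

    let summary = tracker.cost_summary();
    assert_eq!(summary.total_cost_cents, 25);
    assert_eq!(summary.providers.len(), 1);
}
```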
### Benefits
✅ Real SDK behavior
✅ Actual streaming support
✅ Token counting from real API
✅ Accurate cost calculation
✅ Production-ready
### Limitations
❌ Requires active API key subscriptions
❌ Real API calls consume quota/credits
❌ Network latency in tests
❌ More complex error handling
---
## Pattern 3: Adding a New Provider
**Use case**: Integrate an additional LLM provider (for example, a newly released hosted model or a custom in-house API).
### Step-by-Step
```rust
// Step 1: Define provider struct
pub struct CustomLLMClient {
endpoint: String,
api_key: String,
model: String,
}
impl CustomLLMClient {
pub fn new(endpoint: &str, api_key: &str, model: &str) -> Self {
Self {
endpoint: endpoint.to_string(),
api_key: api_key.to_string(),
model: model.to_string(),
}
}
}
// Step 2: Implement LLMClient trait
#[async_trait::async_trait]
impl LLMClient for CustomLLMClient {
async fn complete(&self, prompt: &str) -> Result<String> {
let client = reqwest::Client::new();
let response = client
.post(&format!("{}/v1/complete", self.endpoint))
.header("Authorization", format!("Bearer {}", self.api_key))
.json(&json!({
"model": self.model,
"prompt": prompt,
}))
.send()
.await?;
if response.status().is_success() {
let data: serde_json::Value = response.json().await?;
Ok(data["result"].as_str().unwrap_or("").to_string())
} else {
Err(anyhow!("API error: {}", response.status()))
}
}
async fn stream(&self, prompt: &str) -> Result<BoxStream<String>> {
// Implement streaming with reqwest::Client stream
todo!("Implement streaming for custom provider")
}
fn cost_per_1k_tokens(&self) -> f64 {
0.01 // Define custom pricing
}
fn available(&self) -> bool {
!self.api_key.is_empty()
}
}
// Step 3: Register in router factory
pub fn create_provider(name: &str) -> Result<Box<dyn LLMClient>> {
match name {
"claude" => Ok(Box::new(ClaudeClient::new(
&env::var("ANTHROPIC_API_KEY")?,
"claude-opus-4",
))),
"openai" => Ok(Box::new(OpenAIClient::new(
&env::var("OPENAI_API_KEY")?,
"gpt-4",
))),
"custom" => Ok(Box::new(CustomLLMClient::new(
&env::var("CUSTOM_ENDPOINT")?,
&env::var("CUSTOM_API_KEY")?,
&env::var("CUSTOM_MODEL")?,
))),
_ => Err(anyhow!("Unknown provider: {}", name)),
}
}
// Step 4: Add configuration
#[derive(Deserialize, Clone)]
pub struct ProviderConfig {
pub name: String,
pub endpoint: Option<String>,
pub api_key_env: String,
pub model: String,
pub cost_per_1k_tokens: f64,
pub timeout_ms: u32,
}
impl ProviderConfig {
pub fn to_client(&self) -> Result<Box<dyn LLMClient>> {
match self.name.as_str() {
"custom" => Ok(Box::new(CustomLLMClient::new(
self.endpoint.as_deref().unwrap_or("http://localhost:8000"),
&env::var(&self.api_key_env)?,
&self.model,
))),
_ => Err(anyhow!("Unsupported provider type")),
}
}
}
// Step 5: Update router
pub struct LLMRouter {
providers: DashMap<String, Box<dyn LLMClient>>,
config: Vec<ProviderConfig>,
}
impl LLMRouter {
pub async fn load_from_config(config: &[ProviderConfig]) -> Result<Self> {
let mut providers = DashMap::new();
for provider_config in config {
let client = provider_config.to_client()?;
providers.insert(provider_config.name.clone(), client);
}
Ok(Self {
providers,
config: config.to_vec(),
})
}
pub fn register_provider(
&self,
name: &str,
client: Box<dyn LLMClient>,
) {
self.providers.insert(name.to_string(), client);
}
}
```
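Wiring it together at startup might look like the sketch below. `LLMRouter::new()` and the environment variables are assumed to exist as in the earlier patterns; the endpoint and model name are placeholders.

```rust
fn bootstrap_router() -> Result<LLMRouter> {
    let router = LLMRouter::new();

    // Built-in providers via the factory from Step 3
    router.register_provider("claude", create_provider("claude")?);
    router.register_provider("openai", create_provider("openai")?);

    // The new custom provider registered directly
    router.register_provider(
        "custom",
        Box::new(CustomLLMClient::new(
            "https://api.custom-provider.com",
            &std::env::var("CUSTOM_API_KEY")?,
            "custom-model-v1",
        )),
    );

    Ok(router)
}
```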
### Configuration Example
```toml
# llm-providers.toml
[[providers]]
name = "claude"
api_key_env = "ANTHROPIC_API_KEY"
model = "claude-opus-4"
cost_per_1k_tokens = 0.015
timeout_ms = 30000
[[providers]]
name = "openai"
api_key_env = "OPENAI_API_KEY"
model = "gpt-4"
cost_per_1k_tokens = 0.030
timeout_ms = 30000
[[providers]]
name = "custom"
endpoint = "https://api.custom-provider.com"
api_key_env = "CUSTOM_API_KEY"
model = "custom-model-v1"
cost_per_1k_tokens = 0.005
timeout_ms = 20000
```
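Loading this file into the router can be sketched as follows (assumes the `toml` and `serde` crates and a top-level `providers` array matching the TOML above):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct ProvidersFile {
    providers: Vec<ProviderConfig>,
}

pub async fn router_from_toml(path: &str) -> Result<LLMRouter> {
    let raw = std::fs::read_to_string(path)?;
    let file: ProvidersFile = toml::from_str(&raw)?;
    LLMRouter::load_from_config(&file.providers).await
}
```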
### Benefits
✅ Extensible design
✅ Easy to add new providers
✅ Configuration-driven
✅ No code duplication
### Limitations
❌ Requires understanding trait implementation
❌ Error handling varies per provider
❌ Testing multiple providers is complex
---
## Pattern 4: End-to-End Flow
**Use case**: Complete request → router → provider → response cycle with cost management and fallback.
### Full Implementation
```rust
// User initiates request
pub struct TaskRequest {
pub task_id: String,
pub task_type: TaskType,
pub prompt: String,
pub quality_requirement: Quality,
pub max_cost_cents: Option<u32>,
}
// Router orchestrates end-to-end
pub struct LLMRouterOrchestrator {
router: LLMRouter,
cost_tracker: SDKCostTracker,
metrics: Metrics,
}
impl LLMRouterOrchestrator {
pub async fn execute(&self, request: TaskRequest) -> Result<TaskResponse> {
info!("Starting task: {}", request.task_id);
// 1. Select provider
let provider_name = self.select_provider(&request).await?;
let provider = self.router.get_provider(&provider_name)?;
info!("Selected provider: {} for task {}", provider_name, request.task_id);
        // 2. Check budget (rough estimate: ~10k tokens per task, converted to cents)
        if let Some(max_cost) = request.max_cost_cents {
            let estimated_cost_cents =
                (provider.cost_per_1k_tokens() * 10.0 * 100.0) as u32;
            if estimated_cost_cents > max_cost {
                warn!(
                    "Budget exceeded. Estimated: {} cents > limit: {} cents",
                    estimated_cost_cents, max_cost
                );
                return Err(anyhow!("Budget exceeded"));
            }
        }
// 3. Execute with timeout
let timeout = Duration::from_secs(30);
let result = tokio::time::timeout(
timeout,
provider.complete(&request.prompt),
)
.await??;
info!("Task {} completed successfully", request.task_id);
// 4. Track cost
let estimated_tokens = (request.prompt.len() / 4) as u32; // Rough estimate
self.cost_tracker.track_call(
&provider_name,
estimated_tokens,
(result.len() / 4) as u32,
provider.cost_per_1k_tokens() * 10.0,
).await;
// 5. Record metrics
self.metrics.record_task_completion(
&request.task_type,
&provider_name,
Duration::from_secs(1),
);
Ok(TaskResponse {
task_id: request.task_id,
result,
provider: provider_name,
            cost_cents: Some((provider.cost_per_1k_tokens() * 10.0 * 100.0) as u32), // cents for ~10k tokens
})
}
async fn select_provider(&self, request: &TaskRequest) -> Result<String> {
let context = TaskContext {
task_type: request.task_type.clone(),
quality_requirement: request.quality_requirement.clone(),
budget_cents: request.max_cost_cents,
..Default::default()
};
let provider_name = self.router.route(context, None).await?;
Ok(provider_name)
}
}
// Fallback chain handling
pub struct FallbackExecutor {
router: LLMRouter,
cost_tracker: SDKCostTracker,
metrics: Metrics,
}
impl FallbackExecutor {
pub async fn execute_with_fallback(
&self,
request: TaskRequest,
fallback_chain: Vec<String>,
) -> Result<TaskResponse> {
let mut last_error = None;
for provider_name in fallback_chain {
match self.try_provider(&request, &provider_name).await {
Ok(response) => {
info!("Success with provider: {}", provider_name);
return Ok(response);
}
Err(e) => {
warn!(
"Provider {} failed: {:?}, trying next",
provider_name, e
);
self.metrics.record_provider_failure(&provider_name);
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or_else(|| anyhow!("All providers failed")))
}
async fn try_provider(
&self,
request: &TaskRequest,
provider_name: &str,
) -> Result<TaskResponse> {
let provider = self.router.get_provider(provider_name)?;
let timeout = Duration::from_secs(30);
let result = tokio::time::timeout(
timeout,
provider.complete(&request.prompt),
)
.await??;
self.cost_tracker.track_call(
provider_name,
(request.prompt.len() / 4) as u32,
(result.len() / 4) as u32,
provider.cost_per_1k_tokens() * 10.0,
).await;
Ok(TaskResponse {
task_id: request.task_id.clone(),
result,
provider: provider_name.to_string(),
            cost_cents: Some((provider.cost_per_1k_tokens() * 10.0 * 100.0) as u32), // cents for ~10k tokens
})
}
}
```
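A caller might drive a request through the fallback chain like this (a sketch; the chain order, task fields, and ID scheme are illustrative assumptions):

```rust
pub async fn handle_request(
    executor: &FallbackExecutor,
    prompt: String,
) -> Result<TaskResponse> {
    let request = TaskRequest {
        task_id: format!("task-{}", chrono::Utc::now().timestamp_millis()),
        task_type: TaskType::CodeGeneration,
        prompt,
        quality_requirement: Quality::High,
        max_cost_cents: Some(50), // hard cap of $0.50 per task
    };

    // Preferred provider first, then cheaper / local fallbacks
    executor
        .execute_with_fallback(
            request,
            vec!["claude".into(), "openai".into(), "ollama".into()],
        )
        .await
}
```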
### Cost Management Integration
```rust
pub struct CostManagementPolicy {
pub daily_limit_cents: u32,
pub monthly_limit_cents: u32,
pub per_task_limit_cents: u32,
pub warn_threshold_percent: f64,
}
impl CostManagementPolicy {
pub fn check_budget(
&self,
current_spend_cents: u32,
new_call_cost_cents: u32,
) -> Result<()> {
let total = current_spend_cents.saturating_add(new_call_cost_cents);
if total > self.daily_limit_cents {
return Err(anyhow!(
"Daily budget exceeded: {} + {} > {}",
current_spend_cents,
new_call_cost_cents,
self.daily_limit_cents
));
}
let percent_used = (total as f64 / self.daily_limit_cents as f64) * 100.0;
if percent_used > self.warn_threshold_percent {
warn!(
"Budget warning: {:.1}% of daily limit used",
percent_used
);
}
Ok(())
}
}
```
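For example, with illustrative limits (a worked check, not production values):

```rust
fn example_budget_check() -> Result<()> {
    let policy = CostManagementPolicy {
        daily_limit_cents: 10_000,    // $100/day
        monthly_limit_cents: 200_000, // $2,000/month
        per_task_limit_cents: 50,     // $0.50/task
        warn_threshold_percent: 80.0,
    };

    // 9,500 cents spent today + a 400-cent call = 9,900 (99% of the limit):
    // accepted, but logs a warning because 99% > 80%
    policy.check_budget(9_500, 400)?;

    // 9,500 + 600 = 10,100 > 10,000: rejected
    assert!(policy.check_budget(9_500, 600).is_err());
    Ok(())
}
```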
### Benefits
✅ Complete request lifecycle
✅ Integrated cost tracking
✅ Fallback chain support
✅ Metrics collection
✅ Budget enforcement
✅ Timeout handling
### Limitations
❌ Complex orchestration logic
❌ Hard to test all edge cases
❌ Requires multiple components
---
## Summary & Recommendations
| Pattern | Dev | Test | Prod | Recommend When |
|---------|-----|------|------|-----------------|
| **Mocks** | ⭐⭐⭐ | ⭐⭐⭐ | ❌ | Building features without costs |
| **SDK Direct** | ⭐⭐ | ⭐⭐ | ⭐⭐⭐ | Full integration, staging/prod |
| **Add Provider** | ⭐ | ⭐ | ⭐⭐ | Supporting new provider types |
| **End-to-End** | ⭐ | ⭐⭐ | ⭐⭐⭐ | Production orchestration |
### Development Workflow
```
Local Dev CI/Tests Staging Production
┌─────────────┐ ┌──────────────┐ ┌────────┐ ┌─────────────┐
│ Mocks │ │ Mocks + SDK │ │ SDK │ │ SDK + Real │
│ Zero cost │ │ (Simulated) │ │ Real │ │ Fallback │
│ No keys │ │ Tests only │ │ Budget │ │ Monitoring │
└─────────────┘ └──────────────┘ └────────┘ └─────────────┘
```
See [llm-provider-implementation-guide.md](llm-provider-implementation.md) for actual VAPORA implementation and code examples.