feat: Phase 5.3 - Multi-Agent Learning Infrastructure

Implement intelligent agent learning from Knowledge Graph execution history with per-task-type expertise tracking, recency bias, and learning curves.

## Phase 5.3 Implementation

### Learning Infrastructure (✅ Complete)
- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows

### Agent Scoring Service (✅ Complete)
- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency

### KG Integration (✅ Complete)
- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations

### Agent Assignment Integration (✅ Complete)
- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data available
- Learning profiles stored in coordinator.learning_profiles RwLock

### Profile Adapter Enhancements (✅ Complete)
- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning
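The expertise math above boils down to two small formulas. Below is a minimal sketch, assuming a plain exponential decay with a 7-day half-life as the recency weighting (the actual decay constant and the 3x recent-window weighting live in vapora-knowledge-graph/src/learning.rs) and the min(1.0, executions / 20) confidence cap; `Execution`, `recency_weight`, and `recency_weighted_success_rate` are illustrative names, not the real API:

```rust
// Illustrative only: a compact model of the recency-bias and confidence
// math described above, not the real learning.rs implementation.

const HALF_LIFE_DAYS: f64 = 7.0;    // assumed half-life for recency decay
const CONFIDENCE_DENOM: f64 = 20.0; // confidence saturates after 20 executions

struct Execution {
    success: bool,
    age_days: f64, // days since the execution completed
}

/// Exponential decay: an execution 7 days old counts half as much as one from today.
fn recency_weight(age_days: f64) -> f64 {
    0.5_f64.powf(age_days / HALF_LIFE_DAYS)
}

/// Success rate with recent executions weighted more heavily.
fn recency_weighted_success_rate(history: &[Execution]) -> f64 {
    let mut num = 0.0;
    let mut den = 0.0;
    for e in history {
        let w = recency_weight(e.age_days);
        den += w;
        if e.success {
            num += w;
        }
    }
    if den == 0.0 { 0.0 } else { num / den }
}

/// Confidence grows with sample size: min(1.0, executions / 20).
fn confidence(total_executions: usize) -> f64 {
    (total_executions as f64 / CONFIDENCE_DENOM).min(1.0)
}

fn main() {
    let history = vec![
        Execution { success: true, age_days: 1.0 },
        Execution { success: true, age_days: 3.0 },
        Execution { success: false, age_days: 30.0 },
    ];
    println!(
        "recent_success_rate = {:.2}, confidence = {:.2}",
        recency_weighted_success_rate(&history),
        confidence(history.len()),
    );
}
```

In this toy history, the stale failure from 30 days ago barely moves the weighted rate, while confidence stays at 0.15 because only three executions exist, so the score below leans on the base load instead.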
## Files Modified

### vapora-knowledge-graph/src/persistence.rs (+30 lines)
- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)

### vapora-agents/src/coordinator.rs (+100 lines)
- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService

### Existing Complete Implementation
- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods

## Tests Passing
- learning_profile: 7 tests ✅
- scoring: 5 tests ✅
- profile_adapter: 6 tests ✅
- coordinator: learning-specific tests ✅

## Data Flow
1. Task arrives → AgentCoordinator::assign_task()
2. Extract task_type from description
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to top-scored agent
7. Execution result → KG → Update learning profiles

## Key Design Decisions
✅ Recency bias: 7-day half-life with 3x weight for recent performance
✅ Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
✅ Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
✅ KG query limit: 100 recent executions per task-type for performance
✅ Async loading: load_learning_profile_from_kg supports concurrent loads

## Next: Phase 5.4 - Cost Optimization
Ready to implement budget enforcement and cost-aware provider selection.
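For concreteness, here is a minimal sketch of the hierarchical scoring and ranking described above; `AgentScore`, `combined`, and `rank_agents` are stand-in names and do not reproduce the real signatures in vapora-agents/src/scoring.rs:

```rust
// Illustrative only: the 0.3*base + 0.5*expertise + 0.2*confidence ranking
// described above, with stand-in types rather than the real scoring.rs API.
use std::cmp::Ordering;

#[derive(Debug, Clone)]
struct AgentScore {
    agent_id: String,
    base: f64,       // SwarmCoordinator load/availability score, 0..1
    expertise: f64,  // recency-weighted success rate for the task type, 0..1
    confidence: f64, // min(1.0, executions / 20)
}

impl AgentScore {
    /// Hierarchical score: 30% base load, 50% expertise, 20% confidence.
    fn combined(&self) -> f64 {
        0.3 * self.base + 0.5 * self.expertise + 0.2 * self.confidence
    }
}

/// Sort candidates by combined score, best first.
fn rank_agents(mut candidates: Vec<AgentScore>) -> Vec<AgentScore> {
    candidates.sort_by(|a, b| {
        b.combined()
            .partial_cmp(&a.combined())
            .unwrap_or(Ordering::Equal)
    });
    candidates
}

fn main() {
    let ranked = rank_agents(vec![
        AgentScore { agent_id: "dev-1".into(), base: 0.9, expertise: 0.4, confidence: 0.3 },
        AgentScore { agent_id: "dev-2".into(), base: 0.6, expertise: 0.8, confidence: 0.9 },
    ]);
    // dev-2 wins (0.76 vs 0.53): proven task-type expertise outweighs lighter load.
    for s in &ranked {
        println!("{} -> {:.2}", s.agent_id, s.combined());
    }
}
```

Weighting expertise at 0.5 means a well-proven specialist can win the assignment even when a generalist currently has more spare capacity, while the 0.2 confidence term keeps thinly evidenced profiles from dominating.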
apiVersion: provisioning.vapora.io/v1
kind: Workflow
metadata:
  name: scale-agents
  description: Dynamically scale agent pools based on queue depth and workload
spec:
  version: "0.2.0"
  namespace: vapora-system
  timeout: 600s # 10 minutes max
  schedule: "*/5 * * * *" # Run every 5 minutes
  inputs:
    - name: target_agent_role
      type: string
      required: false
      description: "Specific agent role to scale (architect, developer, reviewer, etc.)"
    - name: queue_depth_threshold
      type: integer
      required: false
      default: 50
      description: "Queue depth threshold before scaling up"
    - name: cpu_threshold
      type: float
      required: false
      default: 0.75
      description: "CPU utilization threshold (0-1)"
    - name: memory_threshold
      type: float
      required: false
      default: 0.80
      description: "Memory utilization threshold (0-1)"
  phases:
    # Phase 1: Collect metrics
    - name: "Collect Metrics"
      description: "Gather queue depth, CPU, and memory metrics"
      retryable: true
      steps:
        - name: "Get queue depth per agent role"
          command: |
            provisioning metrics query --metric "agent_queue_depth" \
              --group-by "agent_role" \
              --output json > /tmp/queue_depth.json
          timeout: 30s
        - name: "Get CPU utilization"
          command: |
            provisioning metrics query --metric "container_cpu_usage_seconds_total" \
              --selector "pod=~vapora-agents.*" \
              --output json > /tmp/cpu_usage.json
          timeout: 30s
        - name: "Get memory utilization"
          command: |
            provisioning metrics query --metric "container_memory_working_set_bytes" \
              --selector "pod=~vapora-agents.*" \
              --output json > /tmp/memory_usage.json
          timeout: 30s
    # Phase 2: Analyze scaling requirements
    - name: "Analyze Scaling Needs"
      description: "Determine which agents need scaling up or down"
      retryable: true
      steps:
        - name: "Calculate scale requirements"
          command: |
            python3 <<'EOF'
            import json
            import os

            with open('/tmp/queue_depth.json', 'r') as f:
                queue_data = json.load(f)
            with open('/tmp/cpu_usage.json', 'r') as f:
                cpu_data = json.load(f)

            scaling_decisions = {}

            # Define queue depth thresholds per role
            role_thresholds = {
                "architect": {"scale_up": 10, "scale_down": 3},
                "developer": {"scale_up": 100, "scale_down": 30},
                "reviewer": {"scale_up": 50, "scale_down": 15},
                "tester": {"scale_up": 50, "scale_down": 15},
                "monitor": {"scale_up": 20, "scale_down": 5},
                "devops": {"scale_up": 30, "scale_down": 10}
            }

            for role, metrics in queue_data.items():
                thresholds = role_thresholds.get(role, {"scale_up": 50, "scale_down": 15})
                current_queue = metrics.get("queue_depth", 0)
                current_replicas = metrics.get("replicas", 1)
                if current_queue > thresholds["scale_up"]:
                    # Scale up
                    desired_replicas = min(int(current_replicas * 1.5) + 1, 20)  # Max 20
                    scaling_decisions[role] = {
                        "action": "scale_up",
                        "current": current_replicas,
                        "desired": desired_replicas,
                        "reason": f"Queue depth {current_queue} > {thresholds['scale_up']}"
                    }
                elif current_queue < thresholds["scale_down"] and current_replicas > 2:
                    # Scale down
                    desired_replicas = max(int(current_replicas * 0.7), 2)  # Min 2
                    scaling_decisions[role] = {
                        "action": "scale_down",
                        "current": current_replicas,
                        "desired": desired_replicas,
                        "reason": f"Queue depth {current_queue} < {thresholds['scale_down']}"
                    }

            with open('/tmp/scaling_decisions.json', 'w') as f:
                json.dump(scaling_decisions, f, indent=2)
            print(json.dumps(scaling_decisions, indent=2))
            EOF
          timeout: 60s
          dependencies: ["Collect Metrics"]
    # Phase 3: Scale agents based on decisions
    - name: "Execute Scaling"
      description: "Apply scaling decisions to agent pools"
      retryable: true
      parallel: true
      steps:
        - name: "Scale developer agents"
          command: |
            # Scale only if the analysis phase produced a decision for this role
            REPLICAS=$(jq -r '.developer.desired // empty' /tmp/scaling_decisions.json)
            if [ -n "$REPLICAS" ]; then
              provisioning taskserv scale vapora-agents --agent developer --replicas "$REPLICAS"
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true
        - name: "Scale reviewer agents"
          command: |
            REPLICAS=$(jq -r '.reviewer.desired // empty' /tmp/scaling_decisions.json)
            if [ -n "$REPLICAS" ]; then
              provisioning taskserv scale vapora-agents --agent reviewer --replicas "$REPLICAS"
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true
        - name: "Scale tester agents"
          command: |
            REPLICAS=$(jq -r '.tester.desired // empty' /tmp/scaling_decisions.json)
            if [ -n "$REPLICAS" ]; then
              provisioning taskserv scale vapora-agents --agent tester --replicas "$REPLICAS"
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true
        - name: "Scale devops agents"
          command: |
            REPLICAS=$(jq -r '.devops.desired // empty' /tmp/scaling_decisions.json)
            if [ -n "$REPLICAS" ]; then
              provisioning taskserv scale vapora-agents --agent devops --replicas "$REPLICAS"
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true
    # Phase 4: Verify scaling
    - name: "Verify Scaling"
      description: "Confirm scaling operations succeeded"
      retryable: false
      steps:
        - name: "Check agent replicas"
          command: |
            provisioning taskserv list --selector "app=vapora-agents" \
              --output "json" | jq '.items[] | {agent: .metadata.labels.role, replicas: .spec.replicas}'
          timeout: 60s
          dependencies: ["Execute Scaling"]
        - name: "Wait for pods to be ready"
          command: |
            kubectl wait --for=condition=Ready pod \
              -l app=vapora-agents \
              -n vapora-system \
              --timeout=300s
          timeout: 320s
          dependencies: ["Execute Scaling"]
        - name: "Verify queue depth improvement"
          command: |
            provisioning metrics query --metric "agent_queue_depth" \
              --group-by "agent_role" \
              --compare-to /tmp/queue_depth.json
          timeout: 30s
          dependencies: ["Wait for pods to be ready"]
  outputs:
    - name: scaling_summary
      value: "cat /tmp/scaling_decisions.json"
    - name: new_replica_counts
      command: "provisioning taskserv list --selector app=vapora-agents -o json | jq '.items[] | {agent: .metadata.labels.role, replicas: .spec.replicas}'"
  # Notifications
  notifications:
    onSuccess:
      - "slack: #ops-automation"
      - "action: record-metrics"
    onFailure:
      - "slack: #ops-automation"
      - "slack: #alerts"
  # Cleanup
  cleanup:
    - "rm -f /tmp/queue_depth.json"
    - "rm -f /tmp/cpu_usage.json"
    - "rm -f /tmp/memory_usage.json"
    - "rm -f /tmp/scaling_decisions.json"