Implement intelligent agent learning from Knowledge Graph execution history with per-task-type expertise tracking, recency bias, and learning curves.

## Phase 5.3 Implementation

### Learning Infrastructure (✅ Complete)

- LearningProfileService with per-task-type expertise metrics
- TaskTypeExpertise model tracking success_rate, confidence, learning curves
- Recency bias weighting: recent 7 days weighted 3x higher (exponential decay)
- Confidence scoring prevents overfitting: min(1.0, executions / 20)
- Learning curves computed from daily execution windows

### Agent Scoring Service (✅ Complete)

- Unified AgentScore combining SwarmCoordinator + learning profiles
- Scoring formula: 0.3*base + 0.5*expertise + 0.2*confidence
- Rank agents by combined score for intelligent assignment
- Support for recency-biased scoring (recent_success_rate)
- Methods: rank_agents, select_best, rank_agents_with_recency
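The learning and scoring math above reduces to three formulas. The sketch below is illustrative only: the struct fields, helper names, and the exact shape of the decay curve are assumptions rather than the real vapora-agents API, but the recency weighting, confidence cap, and 0.3/0.5/0.2 combination follow the bullets above.

```rust
// Illustrative sketch only: field names, helpers, and the exact decay curve
// are assumptions, not the real vapora-agents types. The formulas themselves
// follow the Phase 5.3 description.

/// One execution record converted from the Knowledge Graph.
struct ExecutionData {
    success: bool,
    age_days: f64, // days since the execution finished
}

/// Recency weight with a 7-day half-life: ~3x for an execution that just
/// happened, decaying toward 1x for old executions.
fn recency_weight(age_days: f64) -> f64 {
    1.0 + 2.0 * 0.5_f64.powf(age_days / 7.0)
}

/// Recency-biased success rate for one task type.
fn weighted_success_rate(history: &[ExecutionData]) -> f64 {
    let (mut hits, mut total) = (0.0, 0.0);
    for e in history {
        let w = recency_weight(e.age_days);
        total += w;
        if e.success {
            hits += w;
        }
    }
    if total == 0.0 { 0.0 } else { hits / total }
}

/// Confidence grows with sample size; the cap prevents overfitting to a
/// handful of executions.
fn confidence(total_executions: usize) -> f64 {
    (total_executions as f64 / 20.0).min(1.0)
}

/// Combined ranking score: 0.3*base + 0.5*expertise + 0.2*confidence.
fn combined_score(base_load: f64, expertise: f64, confidence: f64) -> f64 {
    0.3 * base_load + 0.5 * expertise + 0.2 * confidence
}

fn main() {
    let history = vec![
        ExecutionData { success: true, age_days: 1.0 },
        ExecutionData { success: true, age_days: 2.0 },
        ExecutionData { success: false, age_days: 30.0 },
        ExecutionData { success: true, age_days: 45.0 },
    ];
    let expertise = weighted_success_rate(&history);
    let conf = confidence(history.len());
    println!("score = {:.2}", combined_score(0.8, expertise, conf));
}
```

Capping confidence at 20 executions keeps a few lucky successes from dominating the expertise term until an agent has accumulated a meaningful history.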
### KG Integration (✅ Complete)

- KGPersistence::get_executions_for_task_type() - query by agent + task type
- KGPersistence::get_agent_executions() - all executions for agent
- Coordinator::load_learning_profile_from_kg() - core KG→Learning integration
- Coordinator::load_all_learning_profiles() - batch load for multiple agents
- Convert PersistedExecution → ExecutionData for learning calculations

### Agent Assignment Integration (✅ Complete)

- AgentCoordinator uses learning profiles for task assignment
- extract_task_type() infers task type from title/description
- assign_task() scores candidates using AgentScoringService
- Fallback to load-based selection if no learning data available
- Learning profiles stored in coordinator.learning_profiles RwLock

### Profile Adapter Enhancements (✅ Complete)

- create_learning_profile() - initialize empty profiles
- add_task_type_expertise() - set task-type expertise
- update_profile_with_learning() - update swarm profiles from learning

## Files Modified

### vapora-knowledge-graph/src/persistence.rs (+30 lines)

- get_executions_for_task_type(agent_id, task_type, limit)
- get_agent_executions(agent_id, limit)

### vapora-agents/src/coordinator.rs (+100 lines)

- load_learning_profile_from_kg() - core KG integration method
- load_all_learning_profiles() - batch loading for agents
- assign_task() already uses learning-based scoring via AgentScoringService

### Existing Complete Implementation

- vapora-knowledge-graph/src/learning.rs - calculation functions
- vapora-agents/src/learning_profile.rs - data structures and expertise
- vapora-agents/src/scoring.rs - unified scoring service
- vapora-agents/src/profile_adapter.rs - adapter methods

## Tests Passing

- learning_profile: 7 tests ✅
- scoring: 5 tests ✅
- profile_adapter: 6 tests ✅
- coordinator: learning-specific tests ✅

## Data Flow

1. Task arrives → AgentCoordinator::assign_task()
2. Extract task_type from description
3. Query KG for task-type executions (load_learning_profile_from_kg)
4. Calculate expertise with recency bias
5. Score candidates (SwarmCoordinator + learning)
6. Assign to top-scored agent
7. Execution result → KG → Update learning profiles

## Key Design Decisions

- ✅ Recency bias: 7-day half-life with 3x weight for recent performance
- ✅ Confidence scoring: min(1.0, total_executions / 20) prevents overfitting
- ✅ Hierarchical scoring: 30% base load, 50% expertise, 20% confidence
- ✅ KG query limit: 100 recent executions per task-type for performance
- ✅ Async loading: load_learning_profile_from_kg supports concurrent loads

## Next: Phase 5.4 - Cost Optimization

Ready to implement budget enforcement and cost-aware provider selection.
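To tie the pieces together, below is a minimal sketch of the assignment path laid out in the Data Flow section (steps 1-6). Types and signatures are simplified assumptions; only the overall shape (task-type extraction, KG-derived learning profiles, the 0.3/0.5/0.2 score, load-based fallback) follows the description above.

```rust
// Illustrative sketch of the assignment path. Signatures are assumptions,
// not the real AgentCoordinator / AgentScoringService API.

use std::collections::HashMap;

struct AgentScore {
    agent_id: String,
    score: f64,
}

/// Per-agent expertise for each task type, derived from KG execution history.
struct LearningProfile {
    expertise: HashMap<String, f64>,  // task_type -> recency-weighted success rate
    confidence: HashMap<String, f64>, // task_type -> min(1.0, executions / 20)
}

/// Rough task-type inference from the task title/description.
fn extract_task_type(title: &str, description: &str) -> String {
    let text = format!("{title} {description}").to_lowercase();
    for t in ["review", "test", "deploy", "design", "implement"] {
        if text.contains(t) {
            return t.to_string();
        }
    }
    "general".to_string()
}

/// Combined score: 0.3*base + 0.5*expertise + 0.2*confidence.
fn score_agent(base: f64, profile: &LearningProfile, task_type: &str) -> f64 {
    let expertise = profile.expertise.get(task_type).copied().unwrap_or(0.0);
    let confidence = profile.confidence.get(task_type).copied().unwrap_or(0.0);
    0.3 * base + 0.5 * expertise + 0.2 * confidence
}

/// Rank candidates and pick the best; fall back to the base (load-based)
/// score when no learning data exists for an agent.
fn assign_task(
    title: &str,
    description: &str,
    candidates: &[(String, f64)],                // (agent_id, base load score)
    profiles: &HashMap<String, LearningProfile>, // loaded from the KG
) -> Option<AgentScore> {
    let task_type = extract_task_type(title, description);
    let mut ranked: Vec<AgentScore> = candidates
        .iter()
        .map(|(id, base)| {
            let score = match profiles.get(id) {
                Some(p) => score_agent(*base, p, &task_type),
                None => *base, // fallback: load-based selection only
            };
            AgentScore { agent_id: id.clone(), score }
        })
        .collect();
    ranked.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
    ranked.into_iter().next()
}

fn main() {
    let mut profiles = HashMap::new();
    profiles.insert(
        "agent-dev-1".to_string(),
        LearningProfile {
            expertise: HashMap::from([("implement".to_string(), 0.9)]),
            confidence: HashMap::from([("implement".to_string(), 0.6)]),
        },
    );
    let candidates = vec![
        ("agent-dev-1".to_string(), 0.5),
        ("agent-dev-2".to_string(), 0.7),
    ];
    let best = assign_task("Implement KG cache", "implement caching layer", &candidates, &profiles);
    println!("{:?}", best.map(|s| (s.agent_id, s.score)));
}
```

In the actual coordinator, profiles are loaded asynchronously from the KG via load_learning_profile_from_kg (capped at 100 recent executions per task type) and held behind the coordinator.learning_profiles RwLock rather than passed in as a plain map.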
```yaml
apiVersion: provisioning.vapora.io/v1
kind: Workflow
metadata:
  name: scale-agents
  description: Dynamically scale agent pools based on queue depth and workload
spec:
  version: "0.2.0"
  namespace: vapora-system
  timeout: 600s            # 10 minutes max
  schedule: "*/5 * * * *"  # Run every 5 minutes

  inputs:
    - name: target_agent_role
      type: string
      required: false
      description: "Specific agent role to scale (architect, developer, reviewer, etc.)"
    - name: queue_depth_threshold
      type: integer
      required: false
      default: 50
      description: "Queue depth threshold before scaling up"
    - name: cpu_threshold
      type: float
      required: false
      default: 0.75
      description: "CPU utilization threshold (0-1)"
    - name: memory_threshold
      type: float
      required: false
      default: 0.80
      description: "Memory utilization threshold (0-1)"

  phases:

    # Phase 1: Collect metrics
    - name: "Collect Metrics"
      description: "Gather queue depth, CPU, and memory metrics"
      retryable: true
      steps:
        - name: "Get queue depth per agent role"
          command: |
            provisioning metrics query --metric "agent_queue_depth" \
              --group-by "agent_role" \
              --output json > /tmp/queue_depth.json
          timeout: 30s

        - name: "Get CPU utilization"
          command: |
            provisioning metrics query --metric "container_cpu_usage_seconds_total" \
              --selector "pod=~vapora-agents.*" \
              --output json > /tmp/cpu_usage.json
          timeout: 30s

        - name: "Get memory utilization"
          command: |
            provisioning metrics query --metric "container_memory_working_set_bytes" \
              --selector "pod=~vapora-agents.*" \
              --output json > /tmp/memory_usage.json
          timeout: 30s

    # Phase 2: Analyze scaling requirements
    - name: "Analyze Scaling Needs"
      description: "Determine which agents need scaling up or down"
      retryable: true
      steps:
        - name: "Calculate scale requirements"
          command: |
            python3 <<'EOF'
            import json
            import os

            with open('/tmp/queue_depth.json', 'r') as f:
                queue_data = json.load(f)

            with open('/tmp/cpu_usage.json', 'r') as f:
                cpu_data = json.load(f)

            scaling_decisions = {}

            # Define queue depth thresholds per role
            role_thresholds = {
                "architect": {"scale_up": 10, "scale_down": 3},
                "developer": {"scale_up": 100, "scale_down": 30},
                "reviewer": {"scale_up": 50, "scale_down": 15},
                "tester": {"scale_up": 50, "scale_down": 15},
                "monitor": {"scale_up": 20, "scale_down": 5},
                "devops": {"scale_up": 30, "scale_down": 10}
            }

            for role, metrics in queue_data.items():
                thresholds = role_thresholds.get(role, {"scale_up": 50, "scale_down": 15})
                current_queue = metrics.get("queue_depth", 0)
                current_replicas = metrics.get("replicas", 1)

                if current_queue > thresholds["scale_up"]:
                    # Scale up
                    desired_replicas = min(int(current_replicas * 1.5) + 1, 20)  # Max 20
                    scaling_decisions[role] = {
                        "action": "scale_up",
                        "current": current_replicas,
                        "desired": desired_replicas,
                        "reason": f"Queue depth {current_queue} > {thresholds['scale_up']}"
                    }
                elif current_queue < thresholds["scale_down"] and current_replicas > 2:
                    # Scale down
                    desired_replicas = max(int(current_replicas * 0.7), 2)  # Min 2
                    scaling_decisions[role] = {
                        "action": "scale_down",
                        "current": current_replicas,
                        "desired": desired_replicas,
                        "reason": f"Queue depth {current_queue} < {thresholds['scale_down']}"
                    }

            with open('/tmp/scaling_decisions.json', 'w') as f:
                json.dump(scaling_decisions, f, indent=2)

            print(json.dumps(scaling_decisions, indent=2))
            EOF
          timeout: 60s
          dependencies: ["Collect Metrics"]

    # Phase 3: Scale agents based on decisions
    - name: "Execute Scaling"
      description: "Apply scaling decisions to agent pools"
      retryable: true
      parallel: true
      steps:
        - name: "Scale developer agents"
          command: |
            DECISION=$(grep -E '"developer":|"desired":' /tmp/scaling_decisions.json | grep -A1 'developer')
            if echo "$DECISION" | grep -q 'scale_up\|scale_down'; then
              REPLICAS=$(echo "$DECISION" | grep '"desired"' | grep -oE '[0-9]+')
              provisioning taskserv scale vapora-agents --agent developer --replicas $REPLICAS
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true

        - name: "Scale reviewer agents"
          command: |
            DECISION=$(grep -E '"reviewer":|"desired":' /tmp/scaling_decisions.json | grep -A1 'reviewer')
            if echo "$DECISION" | grep -q 'scale_up\|scale_down'; then
              REPLICAS=$(echo "$DECISION" | grep '"desired"' | grep -oE '[0-9]+')
              provisioning taskserv scale vapora-agents --agent reviewer --replicas $REPLICAS
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true

        - name: "Scale tester agents"
          command: |
            DECISION=$(grep -E '"tester":|"desired":' /tmp/scaling_decisions.json | grep -A1 'tester')
            if echo "$DECISION" | grep -q 'scale_up\|scale_down'; then
              REPLICAS=$(echo "$DECISION" | grep '"desired"' | grep -oE '[0-9]+')
              provisioning taskserv scale vapora-agents --agent tester --replicas $REPLICAS
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true

        - name: "Scale devops agents"
          command: |
            DECISION=$(grep -E '"devops":|"desired":' /tmp/scaling_decisions.json | grep -A1 'devops')
            if echo "$DECISION" | grep -q 'scale_up\|scale_down'; then
              REPLICAS=$(echo "$DECISION" | grep '"desired"' | grep -oE '[0-9]+')
              provisioning taskserv scale vapora-agents --agent devops --replicas $REPLICAS
            fi
          timeout: 120s
          dependencies: ["Calculate scale requirements"]
          continueOnError: true

    # Phase 4: Verify scaling
    - name: "Verify Scaling"
      description: "Confirm scaling operations succeeded"
      retryable: false
      steps:
        - name: "Check agent replicas"
          command: |
            provisioning taskserv list --selector "app=vapora-agents" \
              --output "json" | jq '.items[] | {agent: .metadata.labels.role, replicas: .spec.replicas}'
          timeout: 60s
          dependencies: ["Execute Scaling"]

        - name: "Wait for pods to be ready"
          command: |
            kubectl wait --for=condition=Ready pod \
              -l app=vapora-agents \
              -n vapora-system \
              --timeout=300s
          timeout: 320s
          dependencies: ["Execute Scaling"]

        - name: "Verify queue depth improvement"
          command: |
            provisioning metrics query --metric "agent_queue_depth" \
              --group-by "agent_role" \
              --compare-to /tmp/queue_depth.json
          timeout: 30s
          dependencies: ["Wait for pods to be ready"]

  outputs:
    - name: scaling_summary
      value: "cat /tmp/scaling_decisions.json"
    - name: new_replica_counts
      command: "provisioning taskserv list --selector app=vapora-agents -o json | jq '.items[] | {agent: .metadata.labels.role, replicas: .spec.replicas}'"

  # Notifications
  notifications:
    onSuccess:
      - "slack: #ops-automation"
      - "action: record-metrics"
    onFailure:
      - "slack: #ops-automation"
      - "slack: #alerts"

  # Cleanup
  cleanup:
    - "rm -f /tmp/queue_depth.json"
    - "rm -f /tmp/cpu_usage.json"
    - "rm -f /tmp/memory_usage.json"
    - "rm -f /tmp/scaling_decisions.json"
```