# nu_plugin_orchestrator - Implementation Plan

**Created**: 2025-10-08

**Status**: Base structure complete, ready for implementation

**Pattern**: Follows nu_plugin_tera exactly

---

## Phase 1: Base Structure ✅ COMPLETE

### Files Created

- [x] `Cargo.toml` - Package configuration with correct dependencies
- [x] `src/main.rs` - Plugin entry point with 3 commands
- [x] `src/helpers.rs` - Helper functions (placeholders)
- [x] `src/tests.rs` - Test infrastructure
- [x] `README.md` - Comprehensive documentation
- [x] `VERIFICATION.md` - Structure verification

### Commands Implemented (Placeholders)

- [x] `orch status [--data-dir <dir>]` - Orchestrator status from local files
- [x] `orch validate <workflow> [--strict]` - Workflow validation locally
- [x] `orch tasks [--status <status>] [--limit <n>]` - Task listing from queue

### Verification

```bash
cd provisioning/core/plugins/nushell-plugins/nu_plugin_orchestrator
cargo check
# ✅ SUCCESS - 264 packages locked, compiling
```

---

## Phase 2: Core Implementation (Next Steps)

### 2.1 File Reading - `src/helpers.rs`

#### `read_local_status()` Implementation

```rust
use std::path::PathBuf;

// `StatusInfo` is an assumed name for the deserialized status struct
// (fields: running, tasks_pending, tasks_running, last_check).
pub fn read_local_status(data_dir: &PathBuf) -> Result<StatusInfo, String> {
    let status_file = data_dir.join("status.json");

    // Fail early with a clear message when the orchestrator has not written a status file
    if !status_file.exists() {
        return Err(format!("Status file not found: {}", status_file.display()));
    }

    let content = std::fs::read_to_string(&status_file)
        .map_err(|e| format!("Failed to read status file: {}", e))?;

    serde_json::from_str(&content)
        .map_err(|e| format!("Failed to parse status JSON: {}", e))
}
```

**Test Data**: `provisioning/platform/orchestrator/data/status.json`

```json
{
  "running": true,
  "tasks_pending": 5,
  "tasks_running": 2,
  "last_check": "2025-10-08T12:00:00Z"
}
```

#### `read_task_queue()` Implementation

```rust
use std::path::PathBuf;
use walkdir::WalkDir;

pub fn read_task_queue(
    data_dir: &PathBuf,
    status_filter: Option<String>,
) -> Result<Vec<TaskInfo>, String> {
    let tasks_dir = data_dir.join("tasks");

    // A missing tasks directory means an empty queue, not an error
    if !tasks_dir.exists() {
        return Ok(vec![]);
    }

    let mut tasks = Vec::new();

    // Only look at *.json files directly inside tasks/ (no recursion)
    for entry in WalkDir::new(&tasks_dir)
        .max_depth(1)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension().map_or(false, |ext| ext == "json"))
    {
        let content = std::fs::read_to_string(entry.path())
            .map_err(|e| format!("Failed to read task file: {}", e))?;

        let task: TaskInfo = serde_json::from_str(&content)
            .map_err(|e| format!("Failed to parse task JSON: {}", e))?;

        // Filter by status if provided
        if let Some(ref filter) = status_filter {
            if task.status != *filter {
                continue;
            }
        }

        tasks.push(task);
    }

    // Sort by priority (higher first), then by created_at (older first)
    tasks.sort_by(|a, b| {
        b.priority.cmp(&a.priority)
            .then_with(|| a.created_at.cmp(&b.created_at))
    });

    Ok(tasks)
}
```

**Test Data**: `provisioning/platform/orchestrator/data/tasks/task-001.json`

```json
{
  "id": "task-001",
  "status": "pending",
  "created_at": "2025-10-08T12:00:00Z",
  "priority": 5
}
```

### 2.2 Command Integration - `src/main.rs`

#### `OrchStatus::run()` Update

```rust
// Inside `impl SimplePluginCommand for OrchStatus`
fn run(
    &self,
    _plugin: &OrchestratorPlugin,
    _engine: &EngineInterface,
    call: &EvaluatedCall,
    _input: &Value,
) -> Result<Value, LabeledError> {
    // Use --data-dir when given, otherwise fall back to the default location
    let data_dir = if let Some(dir_str) = call.get_flag::<String>("data-dir")? {
        PathBuf::from(dir_str)
    } else {
        helpers::get_orchestrator_data_dir()
    };

    let status = helpers::read_local_status(&data_dir)
        .map_err(|e| LabeledError::new(format!("Failed to read status: {}", e)))?;

    Ok(Value::record(
        record! {
            "running" => Value::bool(status.running, call.head),
            "tasks_pending" => Value::int(status.tasks_pending as i64, call.head),
            "tasks_running" => Value::int(status.tasks_running as i64, call.head),
            "last_check" => Value::string(status.last_check, call.head),
        },
        call.head,
    ))
}
```

#### `OrchTasks::run()` Update

```rust
// Inside `impl SimplePluginCommand for OrchTasks`
fn run(
    &self,
    _plugin: &OrchestratorPlugin,
    _engine: &EngineInterface,
    call: &EvaluatedCall,
    _input: &Value,
) -> Result<Value, LabeledError> {
    let data_dir = helpers::get_orchestrator_data_dir();
    let status_filter = call.get_flag::<String>("status")?;
    let limit = call.get_flag::<i64>("limit")?;

    let mut tasks = helpers::read_task_queue(&data_dir, status_filter)
        .map_err(|e| LabeledError::new(format!("Failed to read tasks: {}", e)))?;

    // Apply limit if provided; a negative limit is treated as 0
    if let Some(n) = limit {
        tasks.truncate(n.max(0) as usize);
    }

    // Convert each task into a Nushell record
    let task_values: Vec<Value> = tasks.into_iter().map(|task| {
        Value::record(
            record! {
                "id" => Value::string(task.id, call.head),
                "status" => Value::string(task.status, call.head),
                "created_at" => Value::string(task.created_at, call.head),
                "priority" => Value::int(task.priority as i64, call.head),
            },
            call.head,
        )
    }).collect();

    Ok(Value::list(task_values, call.head))
}
```

### 2.3 Testing - `src/tests.rs`

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    #[test]
    fn test_data_dir_path() {
        let dir = helpers::get_orchestrator_data_dir();
        assert!(dir.to_string_lossy().contains("orchestrator/data"));
    }

    #[test]
    fn test_read_status_file_not_found() {
        let dir = tempdir().unwrap();
        let result = helpers::read_local_status(&dir.path().to_path_buf());
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Status file not found"));
    }

    #[test]
    fn test_read_status_valid() {
        let dir = tempdir().unwrap();
        let status_file = dir.path().join("status.json");

        fs::write(&status_file, r#"{
            "running": true,
            "tasks_pending": 5,
            "tasks_running": 2,
            "last_check": "2025-10-08T12:00:00Z"
        }"#).unwrap();

        let status = helpers::read_local_status(&dir.path().to_path_buf()).unwrap();
        assert!(status.running);
        assert_eq!(status.tasks_pending, 5);
        assert_eq!(status.tasks_running, 2);
    }

    #[test]
    fn test_read_task_queue_empty() {
        // No tasks/ directory exists, so the queue must come back empty
        let dir = tempdir().unwrap();
        let tasks = helpers::read_task_queue(&dir.path().to_path_buf(), None).unwrap();
        assert!(tasks.is_empty());
    }

    #[test]
    fn test_read_task_queue_with_filter() {
        let dir = tempdir().unwrap();
        let tasks_dir = dir.path().join("tasks");
        fs::create_dir(&tasks_dir).unwrap();

        // Create test tasks
        fs::write(tasks_dir.join("task-001.json"), r#"{
            "id": "task-001",
            "status": "pending",
            "created_at": "2025-10-08T12:00:00Z",
            "priority": 5
        }"#).unwrap();

        fs::write(tasks_dir.join("task-002.json"), r#"{
            "id": "task-002",
            "status": "running",
            "created_at": "2025-10-08T12:01:00Z",
            "priority": 3
        }"#).unwrap();

        // Filter by pending
        let tasks = helpers::read_task_queue(
            &dir.path().to_path_buf(),
            Some("pending".to_string())
        ).unwrap();

        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].id, "task-001");
    }
}
```

---

## Phase 3: KCL Validation (Advanced)

### 3.1 Research KCL Validation

- [ ] Investigate `kcl-lang/kcl` Rust bindings
- [ ] Determine if we can call KCL parser directly
- [ ] Alternative: Shell out to `kcl` binary

### 3.2 Implement `validate_kcl_workflow()`

```rust
pub fn validate_kcl_workflow(workflow_path: &str, strict: bool) -> Result<ValidationResult, String> {
    // TODO: `strict` is not used yet; strict mode should perform additional checks

    // Option 1: Shell out to kcl binary
    let output = std::process::Command::new("kcl")
        .arg("vet")
        .arg(workflow_path)
        .output()
        .map_err(|e| format!("Failed to run kcl: {}", e))?;

    if output.status.success() {
        Ok(ValidationResult {
            valid: true,
            errors: vec![],
            warnings: vec![],
        })
    } else {
        // Report whatever kcl printed on stderr as a single error entry
        let stderr = String::from_utf8_lossy(&output.stderr);
        Ok(ValidationResult {
            valid: false,
            errors: vec![stderr.to_string()],
            warnings: vec![],
        })
    }
}
```

### 3.3 Update `OrchValidate::run()`

```rust
// Inside `impl SimplePluginCommand for OrchValidate`
fn run(
    &self,
    _plugin: &OrchestratorPlugin,
    _engine: &EngineInterface,
    call: &EvaluatedCall,
    _input: &Value,
) -> Result<Value, LabeledError> {
    // First positional argument is the workflow path
    let workflow: String = call.req(0)?;
    let strict = call.has_flag("strict")?;

    let result = helpers::validate_kcl_workflow(&workflow, strict)
        .map_err(|e| LabeledError::new(format!("Validation failed: {}", e)))?;

    Ok(Value::record(
        record! {
            "valid" => Value::bool(result.valid, call.head),
            "errors" => Value::list(
                result.errors.into_iter()
                    .map(|e| Value::string(e, call.head))
                    .collect(),
                call.head
            ),
            "warnings" => Value::list(
                result.warnings.into_iter()
                    .map(|w| Value::string(w, call.head))
                    .collect(),
                call.head
            ),
        },
        call.head,
    ))
}
```

---

## Phase 4: Enhancements (Optional)

### 4.1 Caching

- [ ] Add in-memory cache for status.json (TTL: 1 second)
- [ ] Add cache invalidation on file modification
- [ ] Add `--no-cache` flag

### 4.2 Advanced Filtering

- [ ] Add date range filtering for tasks (`--from`, `--to`)
- [ ] Add priority range filtering (`--min-priority`, `--max-priority`)
- [ ] Add workflow path filtering (`--workflow`)

### 4.3 Statistics

- [ ] Add `orch stats` command for task statistics
- [ ] Group by status, priority, date
- [ ] Show average execution time

### 4.4 Monitoring

- [ ] Add `orch watch` command for real-time updates
- [ ] Use file system notifications (notify crate)
- [ ] Stream changes to stdout

---

## Testing Strategy

### Unit Tests (`src/tests.rs`)

- [x] Test data directory path construction
- [ ] Test file reading with mock data
- [ ] Test error handling (missing files, invalid JSON)
- [ ] Test filtering and sorting logic
- [ ] Test KCL validation (if implemented)

### Integration Tests (`tests/integration_tests.rs`)

- [ ] Test with real orchestrator data structure
- [ ] Test concurrent access to files
- [ ] Test performance with large datasets
- [ ] Test error messages are user-friendly

### Plugin Tests (using nu-plugin-test-support)

- [ ] Test command signatures are correct
- [ ] Test command outputs match expected types
- [ ] Test error cases return proper LabeledError

### Benchmarks (`benches/benchmark.rs`)

- [ ] Benchmark file reading performance
- [ ] Compare with REST API calls
- [ ] Measure memory usage
- [ ] Test with varying dataset sizes

---

## Performance Targets

| Operation | Target | Baseline (REST API) | Improvement |
|-----------|--------|---------------------|-------------|
| `orch status` | <5ms | ~50ms | 10x |
| `orch tasks` | <10ms | ~30ms | 3x |
| `orch validate` | <20ms | ~100ms | 5x |

---

## Installation & Usage

### Build

```bash
cd provisioning/core/plugins/nushell-plugins
cargo build -p nu_plugin_orchestrator --release
```

### Install

```nushell
plugin add target/release/nu_plugin_orchestrator
plugin use orchestrator
```

### Verify

```nushell
# List commands
help commands | where name =~ "orch"

# Test status
orch status

# Test with custom data dir
orch status --data-dir ./test-data

# Test tasks
orch tasks
orch tasks --status pending
orch tasks --status pending --limit 10

# Test validation (once implemented)
orch validate workflows/example.k
orch validate workflows/example.k --strict
```

---

## Success Criteria

### Phase 1 (Complete) ✅

- [x] Structure follows nu_plugin_tera pattern
- [x] `cargo check` passes
- [x] 3 commands implemented (placeholders)
- [x] README documentation complete
- [x] Verification document created

### Phase 2 (Core Implementation)

- [ ] `read_local_status()` reads real files
- [ ] `read_task_queue()` reads task directory
- [ ] Commands return real data (not placeholders)
- [ ] Error handling is comprehensive
- [ ] Unit tests pass (>80% coverage)

### Phase 3 (KCL Validation)

- [ ] `validate_kcl_workflow()` validates KCL syntax
- [ ] Validation errors are user-friendly
- [ ] Strict mode performs additional checks
- [ ] Integration with KCL tooling

### Phase 4 (Production Ready)

- [ ] Performance targets met
- [ ] Integration tests pass
- [ ] Benchmarks show 3-10x improvement (per the Performance Targets table)
- [ ] Documentation updated with real examples
- [ ] Plugin registered in project

---

## Dependencies

### Required

- `nu-plugin` (0.107.1) - Plugin SDK
- `nu-protocol` (0.107.1) - Nushell types
- `serde` (1.0) - Serialization
- `serde_json` (1.0) - JSON parsing
- `chrono` (0.4) - Timestamps
- `walkdir` (2.5) - Directory traversal

### Optional (Phase 4)

- `notify` - File system notifications
- `kcl-lang` - KCL validation (if Rust bindings available)
- `criterion` - Benchmarking
- `cache` - In-memory caching

---

## Risk Mitigation

### Risk 1: Orchestrator Data Format Changes

**Mitigation**: Version check in status.json, graceful degradation

### Risk 2: File Access Permissions

**Mitigation**: Clear error messages, suggest fixing permissions

### Risk 3: Large Task Queues

**Mitigation**: Streaming, pagination, default limits

### Risk 4: KCL Integration Complexity

**Mitigation**: Shell out to `kcl` binary as fallback

---

## Timeline Estimate

- **Phase 1** (Base Structure): ✅ Complete (1 hour)
- **Phase 2** (Core Implementation): ~4 hours
- **Phase 3** (KCL Validation): ~2 hours
- **Phase 4** (Enhancements): ~4 hours

**Total**: ~11 hours (1 day of focused work)

---

**Next Steps**: Implement Phase 2 (Core Implementation) starting with `read_local_status()` in `src/helpers.rs`.
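
When picking up Phase 2, it may help to pin down the selection rules for `orch tasks` (status filter, priority-then-age ordering, optional limit) separately from file I/O, so they can be unit-tested on their own. The following is a minimal sketch using only the standard library; `Task` and `select_tasks` are hypothetical names introduced here for illustration, not part of the plugin, and `created_at` is assumed to be a string timestamp as in the test data.

```rust
/// Hypothetical stand-in for the plugin's `TaskInfo`, reduced to the
/// four fields that appear in the task JSON test data.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub id: String,
    pub status: String,
    pub created_at: String, // e.g. "2025-10-08T12:00:00Z"
    pub priority: u8,
}

/// Keep tasks matching `status` (if given), order them by priority
/// (higher first) and then by creation time (older first), and cap
/// the result at `limit` entries (if given).
pub fn select_tasks(
    mut tasks: Vec<Task>,
    status: Option<&str>,
    limit: Option<usize>,
) -> Vec<Task> {
    // Drop tasks whose status does not match the requested one
    if let Some(wanted) = status {
        tasks.retain(|t| t.status == wanted);
    }

    // Higher priority first; ties broken by the older timestamp.
    // Comparing timestamps as strings is an assumption that holds only
    // when every value uses the same format and UTC offset.
    tasks.sort_by(|a, b| {
        b.priority
            .cmp(&a.priority)
            .then_with(|| a.created_at.cmp(&b.created_at))
    });

    // Apply the limit last so it counts filtered, ordered tasks
    if let Some(n) = limit {
        tasks.truncate(n);
    }

    tasks
}
```

Applying the limit after filtering and sorting means `--limit 10` would return the ten highest-priority matching tasks rather than ten arbitrary files. If timestamps could arrive with mixed offsets, parsing them (for example with `chrono`, already listed as a dependency) before comparing would be the safer choice.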