# nu_plugin_orchestrator - Implementation Plan

**Created**: 2025-10-08
**Status**: Base structure complete, ready for implementation
**Pattern**: Follows nu_plugin_tera exactly

---

## Phase 1: Base Structure ✅ COMPLETE

### Files Created
- [x] `Cargo.toml` - Package configuration with correct dependencies
- [x] `src/main.rs` - Plugin entry point with 3 commands
- [x] `src/helpers.rs` - Helper functions (placeholders)
- [x] `src/tests.rs` - Test infrastructure
- [x] `README.md` - Comprehensive documentation
- [x] `VERIFICATION.md` - Structure verification

### Commands Implemented (Placeholders)
- [x] `orch status [--data-dir <path>]` - Orchestrator status from local files
- [x] `orch validate <workflow.k> [--strict]` - Workflow validation locally
- [x] `orch tasks [--status <status>] [--limit <n>]` - Task listing from queue

### Verification
```bash
cd provisioning/core/plugins/nushell-plugins/nu_plugin_orchestrator
cargo check  # ✅ SUCCESS - 264 packages locked, compiling
```

---

## Phase 2: Core Implementation (Next Steps)

### 2.1 File Reading - `src/helpers.rs`

#### `read_local_status()` Implementation
```rust
/// Reads and parses `status.json` from the orchestrator data directory.
pub fn read_local_status(data_dir: &PathBuf) -> Result<OrchStatus, String> {
    let status_file = data_dir.join("status.json");

    // Report a missing file explicitly instead of surfacing a raw I/O error.
    if !status_file.exists() {
        return Err(format!("Status file not found: {}", status_file.display()));
    }

    let content = std::fs::read_to_string(&status_file)
        .map_err(|e| format!("Failed to read status file: {}", e))?;

    serde_json::from_str(&content)
        .map_err(|e| format!("Failed to parse status JSON: {}", e))
}
```

**Test Data**: `provisioning/platform/orchestrator/data/status.json`
```json
{
  "running": true,
  "tasks_pending": 5,
  "tasks_running": 2,
  "last_check": "2025-10-08T12:00:00Z"
}
```

#### `read_task_queue()` Implementation
```rust
use walkdir::WalkDir;

/// Reads every `*.json` file directly under `<data_dir>/tasks`, optionally
/// keeping only the tasks whose status equals `status_filter`.
pub fn read_task_queue(
    data_dir: &PathBuf,
    status_filter: Option<String>,
) -> Result<Vec<TaskInfo>, String> {
    let tasks_dir = data_dir.join("tasks");

    // A missing tasks directory is treated as an empty queue, not an error.
    if !tasks_dir.exists() {
        return Ok(vec![]);
    }

    let mut tasks = Vec::new();

    for entry in WalkDir::new(&tasks_dir)
        .max_depth(1)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().extension().map_or(false, |ext| ext == "json"))
    {
        // Note: a single unreadable or malformed file aborts the whole listing.
        let content = std::fs::read_to_string(entry.path())
            .map_err(|e| format!("Failed to read task file: {}", e))?;

        let task: TaskInfo = serde_json::from_str(&content)
            .map_err(|e| format!("Failed to parse task JSON: {}", e))?;

        // Filter by status if provided
        if let Some(ref filter) = status_filter {
            if task.status != *filter {
                continue;
            }
        }

        tasks.push(task);
    }

    // Sort by priority (higher first), then by created_at (older first)
    tasks.sort_by(|a, b| {
        b.priority.cmp(&a.priority)
            .then_with(|| a.created_at.cmp(&b.created_at))
    });

    Ok(tasks)
}
```

**Test Data**: `provisioning/platform/orchestrator/data/tasks/task-001.json`
```json
{
  "id": "task-001",
  "status": "pending",
  "created_at": "2025-10-08T12:00:00Z",
  "priority": 5
}
```
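
The queue ordering used by `read_task_queue()` (higher priority first, then older `created_at` first) can be checked in isolation. The sketch below is a minimal, std-only illustration: the `Task` struct is a hypothetical stand-in for `TaskInfo`, and its field types are assumptions. Comparing `created_at` as strings only gives chronological order when every timestamp uses the same RFC 3339 layout and timezone, as in the test data above.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for `TaskInfo`; the field types are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Task {
    pub id: String,
    /// RFC 3339 timestamp in UTC, e.g. "2025-10-08T12:00:00Z".
    pub created_at: String,
    pub priority: u8,
}

/// Small constructor to keep examples short.
pub fn task(id: &str, created_at: &str, priority: u8) -> Task {
    Task {
        id: id.to_string(),
        created_at: created_at.to_string(),
        priority,
    }
}

/// Higher priority sorts first; ties are broken by the older timestamp.
pub fn queue_order(a: &Task, b: &Task) -> Ordering {
    b.priority
        .cmp(&a.priority)
        .then_with(|| a.created_at.cmp(&b.created_at))
}

/// Sorts the tasks in queue order and returns their ids.
pub fn sorted_ids(mut tasks: Vec<Task>) -> Vec<String> {
    tasks.sort_by(queue_order);
    tasks.into_iter().map(|t| t.id).collect()
}
```

Keeping the comparator as a named function makes it easy to unit test without touching the file system.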

### 2.2 Command Integration - `src/main.rs`

#### `OrchStatus::run()` Update
```rust
fn run(
    &self,
    _plugin: &OrchestratorPlugin,
    _engine: &EngineInterface,
    call: &EvaluatedCall,
    _input: &Value,
) -> Result<Value, LabeledError> {
    let data_dir = if let Some(dir_str) = call.get_flag::<String>("data-dir")? {
        PathBuf::from(dir_str)
    } else {
        helpers::get_orchestrator_data_dir()
    };

    let status = helpers::read_local_status(&data_dir)
        .map_err(|e| LabeledError::new(format!("Failed to read status: {}", e)))?;

    Ok(Value::record(
        record! {
            "running" => Value::bool(status.running, call.head),
            "tasks_pending" => Value::int(status.tasks_pending as i64, call.head),
            "tasks_running" => Value::int(status.tasks_running as i64, call.head),
            "last_check" => Value::string(status.last_check, call.head),
        },
        call.head,
    ))
}
```

#### `OrchTasks::run()` Update
```rust
fn run(
    &self,
    _plugin: &OrchestratorPlugin,
    _engine: &EngineInterface,
    call: &EvaluatedCall,
    _input: &Value,
) -> Result<Value, LabeledError> {
    let data_dir = helpers::get_orchestrator_data_dir();
    let status_filter = call.get_flag::<String>("status")?;
    let limit = call.get_flag::<i64>("limit")?;

    let mut tasks = helpers::read_task_queue(&data_dir, status_filter)
        .map_err(|e| LabeledError::new(format!("Failed to read tasks: {}", e)))?;

    // Apply limit if provided. Clamp at zero first: a negative i64 cast
    // straight to usize wraps to a huge value and would disable the limit.
    if let Some(n) = limit {
        tasks.truncate(n.max(0) as usize);
    }

    let task_values: Vec<Value> = tasks.into_iter().map(|task| {
        Value::record(
            record! {
                "id" => Value::string(task.id, call.head),
                "status" => Value::string(task.status, call.head),
                "created_at" => Value::string(task.created_at, call.head),
                "priority" => Value::int(task.priority as i64, call.head),
            },
            call.head,
        )
    }).collect();

    Ok(Value::list(task_values, call.head))
}
```
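
Because `--limit` arrives as an `i64`, a negative value is possible. One option, sketched here as an assumption rather than the plugin's settled behavior, is to reject it with an error instead of converting it silently. `apply_limit` is a hypothetical helper name; it is generic so it can be tested without `TaskInfo`.

```rust
/// Applies an optional `--limit` value to a list of items.
///
/// Hypothetical helper: a negative limit is reported as an error rather
/// than being cast to `usize`.
pub fn apply_limit<T>(mut items: Vec<T>, limit: Option<i64>) -> Result<Vec<T>, String> {
    match limit {
        // No flag given: return everything.
        None => Ok(items),
        Some(n) if n < 0 => Err(format!("--limit must be non-negative, got {}", n)),
        Some(n) => {
            // On targets where usize is narrower than i64, saturate instead of wrapping.
            let n = usize::try_from(n).unwrap_or(usize::MAX);
            items.truncate(n);
            Ok(items)
        }
    }
}
```

In `run()`, the `Err` string could be mapped to a `LabeledError` in the same way as the other helper errors.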

### 2.3 Testing - `src/tests.rs`

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    #[test]
    fn test_data_dir_path() {
        let dir = helpers::get_orchestrator_data_dir();
        assert!(dir.to_string_lossy().contains("orchestrator/data"));
    }

    #[test]
    fn test_read_status_file_not_found() {
        // An empty temp directory has no status.json.
        let dir = tempdir().unwrap();
        let result = helpers::read_local_status(&dir.path().to_path_buf());
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Status file not found"));
    }

    #[test]
    fn test_read_status_valid() {
        let dir = tempdir().unwrap();
        let status_file = dir.path().join("status.json");
        fs::write(&status_file, r#"{
            "running": true,
            "tasks_pending": 5,
            "tasks_running": 2,
            "last_check": "2025-10-08T12:00:00Z"
        }"#).unwrap();

        let status = helpers::read_local_status(&dir.path().to_path_buf()).unwrap();
        assert!(status.running);
        assert_eq!(status.tasks_pending, 5);
        assert_eq!(status.tasks_running, 2);
    }

    #[test]
    fn test_read_task_queue_empty() {
        // No tasks/ directory exists, so the queue should be empty.
        let dir = tempdir().unwrap();
        let tasks = helpers::read_task_queue(&dir.path().to_path_buf(), None).unwrap();
        assert_eq!(tasks.len(), 0);
    }

    #[test]
    fn test_read_task_queue_with_filter() {
        let dir = tempdir().unwrap();
        let tasks_dir = dir.path().join("tasks");
        fs::create_dir(&tasks_dir).unwrap();

        // Create test tasks
        fs::write(tasks_dir.join("task-001.json"), r#"{
            "id": "task-001",
            "status": "pending",
            "created_at": "2025-10-08T12:00:00Z",
            "priority": 5
        }"#).unwrap();

        fs::write(tasks_dir.join("task-002.json"), r#"{
            "id": "task-002",
            "status": "running",
            "created_at": "2025-10-08T12:01:00Z",
            "priority": 3
        }"#).unwrap();

        // Filter by pending
        let tasks = helpers::read_task_queue(
            &dir.path().to_path_buf(),
            Some("pending".to_string())
        ).unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].id, "task-001");
    }
}
```

---

## Phase 3: KCL Validation (Advanced)

### 3.1 Research KCL Validation
- [ ] Investigate `kcl-lang/kcl` Rust bindings
- [ ] Determine if we can call KCL parser directly
- [ ] Alternative: Shell out to `kcl` binary

### 3.2 Implement `validate_kcl_workflow()`
```rust
pub fn validate_kcl_workflow(workflow_path: &str, strict: bool) -> Result<ValidationResult, String> {
    // `strict` is accepted but not used yet; strict-mode checks are still to be defined.
    let _ = strict;

    // Option 1: Shell out to kcl binary.
    // NOTE: confirm the subcommand during the 3.1 research; `kcl vet` is
    // documented for validating data files against a KCL schema.
    let output = std::process::Command::new("kcl")
        .arg("vet")
        .arg(workflow_path)
        .output()
        .map_err(|e| format!("Failed to run kcl: {}", e))?;

    if output.status.success() {
        Ok(ValidationResult {
            valid: true,
            errors: vec![],
            warnings: vec![],
        })
    } else {
        // A failed validation is a normal result, not an Err: report stderr as the error.
        let stderr = String::from_utf8_lossy(&output.stderr);
        Ok(ValidationResult {
            valid: false,
            errors: vec![stderr.to_string()],
            warnings: vec![],
        })
    }
}
```

### 3.3 Update `OrchValidate::run()`
```rust
fn run(
    &self,
    _plugin: &OrchestratorPlugin,
    _engine: &EngineInterface,
    call: &EvaluatedCall,
    _input: &Value,
) -> Result<Value, LabeledError> {
    let workflow: String = call.req(0)?;
    let strict = call.has_flag("strict")?;

    let result = helpers::validate_kcl_workflow(&workflow, strict)
        .map_err(|e| LabeledError::new(format!("Validation failed: {}", e)))?;

    Ok(Value::record(
        record! {
            "valid" => Value::bool(result.valid, call.head),
            "errors" => Value::list(
                result.errors.into_iter()
                    .map(|e| Value::string(e, call.head))
                    .collect(),
                call.head
            ),
            "warnings" => Value::list(
                result.warnings.into_iter()
                    .map(|w| Value::string(w, call.head))
                    .collect(),
                call.head
            ),
        },
        call.head,
    ))
}
```

---

## Phase 4: Enhancements (Optional)

### 4.1 Caching
- [ ] Add in-memory cache for status.json (TTL: 1 second)
- [ ] Add cache invalidation on file modification
- [ ] Add `--no-cache` flag
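
The two caching rules above (a short TTL plus invalidation when the file changes) can be combined in one small structure. This is a minimal std-only sketch, not a committed design: `StatusCache` is a hypothetical name, the caller is assumed to pass in the current time and the file's modification time, and an in-memory cache only helps for as long as the plugin process stays alive.

```rust
use std::time::{Duration, Instant, SystemTime};

/// A cached value plus the data needed to decide whether it is still fresh.
struct Entry<T> {
    value: T,
    loaded_at: Instant,
    file_mtime: SystemTime,
}

/// Hypothetical single-slot cache for a parsed status file.
pub struct StatusCache<T> {
    ttl: Duration,
    entry: Option<Entry<T>>,
}

impl<T: Clone> StatusCache<T> {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entry: None }
    }

    /// Returns the cached value only if it is younger than the TTL and the
    /// file's modification time has not changed since it was stored.
    pub fn get(&self, now: Instant, current_mtime: SystemTime) -> Option<T> {
        let entry = self.entry.as_ref()?;
        let fresh = now.saturating_duration_since(entry.loaded_at) < self.ttl;
        let unchanged = entry.file_mtime == current_mtime;
        if fresh && unchanged {
            Some(entry.value.clone())
        } else {
            None
        }
    }

    /// Stores a freshly loaded value together with the file's mtime.
    pub fn put(&mut self, value: T, now: Instant, file_mtime: SystemTime) {
        self.entry = Some(Entry { value, loaded_at: now, file_mtime });
    }
}
```

A `--no-cache` flag would then simply skip the `get` call and always reload from disk. Passing `now` explicitly keeps the expiry logic testable without sleeping.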

### 4.2 Advanced Filtering
- [ ] Add date range filtering for tasks (`--from`, `--to`)
- [ ] Add priority range filtering (`--min-priority`, `--max-priority`)
- [ ] Add workflow path filtering (`--workflow`)
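
The date and priority filters above could be expressed as a single predicate. The sketch below is illustrative only: `Task` and `TaskFilter` are hypothetical types, the bounds are assumed to be inclusive, and timestamps are compared as strings, which is only valid when all values share one RFC 3339 UTC layout. The `--workflow` filter is left out because the task JSON in this plan has no workflow field.

```rust
/// Hypothetical stand-in for `TaskInfo`; the field types are assumptions.
#[derive(Debug, Clone)]
pub struct Task {
    pub id: String,
    pub created_at: String,
    pub priority: u8,
}

/// Optional, inclusive bounds; `None` means "no constraint".
#[derive(Debug, Default)]
pub struct TaskFilter {
    pub from: Option<String>,
    pub to: Option<String>,
    pub min_priority: Option<u8>,
    pub max_priority: Option<u8>,
}

impl TaskFilter {
    /// Returns true when the task satisfies every bound that is set.
    pub fn matches(&self, task: &Task) -> bool {
        let created = task.created_at.as_str();
        self.from.as_deref().map_or(true, |from| created >= from)
            && self.to.as_deref().map_or(true, |to| created <= to)
            && self.min_priority.map_or(true, |min| task.priority >= min)
            && self.max_priority.map_or(true, |max| task.priority <= max)
    }
}
```

With this shape, `read_task_queue()` could apply one `filter.matches(&task)` check in place of a growing list of per-flag conditions.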

### 4.3 Statistics
- [ ] Add `orch stats` command for task statistics
- [ ] Group by status, priority, date
- [ ] Show average execution time
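
Grouping and averaging for a future `orch stats` command need little more than a map and a fold. The helpers below are a hedged sketch with hypothetical names. They take plain status strings and durations in seconds, since the task JSON in this plan does not yet record execution times and that field would have to be added first.

```rust
use std::collections::BTreeMap;

/// Counts how many tasks are in each status. A BTreeMap keeps the output
/// in a stable, alphabetical order.
pub fn count_by_status<'a, I>(statuses: I) -> BTreeMap<String, usize>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut counts = BTreeMap::new();
    for status in statuses {
        *counts.entry(status.to_string()).or_insert(0) += 1;
    }
    counts
}

/// Mean of the given durations in seconds, or `None` for an empty slice.
pub fn average_secs(durations: &[f64]) -> Option<f64> {
    if durations.is_empty() {
        None
    } else {
        Some(durations.iter().sum::<f64>() / durations.len() as f64)
    }
}
```

The same counting pattern would apply to priority or date buckets by changing the key.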

### 4.4 Monitoring
- [ ] Add `orch watch` command for real-time updates
- [ ] Use file system notifications (notify crate)
- [ ] Stream changes to stdout

---

## Testing Strategy

### Unit Tests (`src/tests.rs`)
- [x] Test data directory path construction
- [ ] Test file reading with mock data
- [ ] Test error handling (missing files, invalid JSON)
- [ ] Test filtering and sorting logic
- [ ] Test KCL validation (if implemented)

### Integration Tests (`tests/integration_tests.rs`)
- [ ] Test with real orchestrator data structure
- [ ] Test concurrent access to files
- [ ] Test performance with large datasets
- [ ] Test error messages are user-friendly

### Plugin Tests (using nu-plugin-test-support)
- [ ] Test command signatures are correct
- [ ] Test command outputs match expected types
- [ ] Test error cases return proper LabeledError

### Benchmarks (`benches/benchmark.rs`)
- [ ] Benchmark file reading performance
- [ ] Compare with REST API calls
- [ ] Measure memory usage
- [ ] Test with varying dataset sizes

---

## Performance Targets

| Operation | Target | Baseline (REST API) | Improvement |
|-----------|--------|---------------------|-------------|
| `orch status` | <5ms | ~50ms | 10x |
| `orch tasks` | <10ms | ~30ms | 3x |
| `orch validate` | <20ms | ~100ms | 5x |
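
Before a full benchmark suite exists, the targets in the table can be sanity-checked with a small timing helper. This is a rough std-only sketch with hypothetical function names. A single wall-clock measurement is noisy, so it complements a proper benchmark harness rather than replacing one.

```rust
use std::time::{Duration, Instant};

/// Runs `f` once and returns its result together with the elapsed wall-clock time.
pub fn time_it<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed())
}

/// Improvement factor as used in the table: baseline latency divided by measured latency.
pub fn speedup(baseline: Duration, measured: Duration) -> f64 {
    baseline.as_secs_f64() / measured.as_secs_f64()
}

/// True when the measured latency is strictly under the target.
pub fn meets_target(measured: Duration, target: Duration) -> bool {
    measured < target
}
```

For example, a 5 ms measurement against the ~50 ms REST baseline gives a factor of about 10, matching the `orch status` row.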

---

## Installation & Usage

### Build
```bash
cd provisioning/core/plugins/nushell-plugins
cargo build -p nu_plugin_orchestrator --release
```

### Install
```nushell
plugin add target/release/nu_plugin_orchestrator
plugin use orchestrator
```

### Verify
```nushell
# List commands
help commands | where name =~ "orch"

# Test status
orch status

# Test with custom data dir
orch status --data-dir ./test-data

# Test tasks
orch tasks
orch tasks --status pending
orch tasks --status pending --limit 10

# Test validation (once implemented)
orch validate workflows/example.k
orch validate workflows/example.k --strict
```

---

## Success Criteria

### Phase 1 (Complete) ✅
- [x] Structure follows nu_plugin_tera pattern
- [x] Cargo check passes
- [x] 3 commands implemented (placeholders)
- [x] README documentation complete
- [x] Verification document created

### Phase 2 (Core Implementation)
- [ ] `read_local_status()` reads real files
- [ ] `read_task_queue()` reads task directory
- [ ] Commands return real data (not placeholders)
- [ ] Error handling is comprehensive
- [ ] Unit tests pass (>80% coverage)

### Phase 3 (KCL Validation)
- [ ] `validate_kcl_workflow()` validates KCL syntax
- [ ] Validation errors are user-friendly
- [ ] Strict mode performs additional checks
- [ ] Integration with KCL tooling

### Phase 4 (Production Ready)
- [ ] Performance targets met
- [ ] Integration tests pass
- [ ] Benchmarks show the 3-10x improvement listed under Performance Targets
- [ ] Documentation updated with real examples
- [ ] Plugin registered in project

---

## Dependencies

### Required
- `nu-plugin` (0.107.1) - Plugin SDK
- `nu-protocol` (0.107.1) - Nushell types
- `serde` (1.0) - Serialization
- `serde_json` (1.0) - JSON parsing
- `chrono` (0.4) - Timestamps
- `walkdir` (2.5) - Directory traversal

### Optional (Phases 3-4)
- `notify` - File system notifications
- `kcl-lang` - KCL validation (if Rust bindings available)
- `criterion` - Benchmarking
- `cache` - In-memory caching

---

## Risk Mitigation

### Risk 1: Orchestrator Data Format Changes
**Mitigation**: Version check in status.json, graceful degradation
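
A version check with graceful degradation might look like the following. This is a sketch under stated assumptions: the `status.json` test data in this plan has no version field, so the optional `version` value, the `SUPPORTED_VERSION` constant, and the `Compat` enum are all hypothetical.

```rust
/// Highest data-format version this plugin understands (assumed value).
pub const SUPPORTED_VERSION: u32 = 1;

/// Outcome of comparing the file's version with what the plugin supports.
#[derive(Debug, PartialEq, Eq)]
pub enum Compat {
    /// Version is known and supported.
    Supported,
    /// No version field present; treat the file as the legacy layout.
    Unversioned,
    /// Written by a newer orchestrator; read on a best-effort basis.
    Newer(u32),
}

/// Classifies the (optional) version found in the status file.
pub fn check_version(found: Option<u32>) -> Compat {
    match found {
        None => Compat::Unversioned,
        Some(v) if v <= SUPPORTED_VERSION => Compat::Supported,
        Some(v) => Compat::Newer(v),
    }
}

/// Warning to show the user, if any. Degrading gracefully means warning
/// and continuing rather than failing the command.
pub fn warning_for(compat: &Compat) -> Option<String> {
    match compat {
        Compat::Supported | Compat::Unversioned => None,
        Compat::Newer(v) => Some(format!(
            "status.json uses format version {}, newer than supported version {}; some fields may be ignored",
            v, SUPPORTED_VERSION
        )),
    }
}
```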

### Risk 2: File Access Permissions
**Mitigation**: Clear error messages, suggest fixing permissions
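
One way to produce clearer messages is to branch on `std::io::ErrorKind` before formatting the error. The helper below is a hypothetical sketch, and the wording of the messages is only a suggestion.

```rust
use std::io;
use std::path::Path;

/// Turns an I/O error into a message that tells the user what to check.
pub fn describe_io_error(path: &Path, err: &io::Error) -> String {
    match err.kind() {
        io::ErrorKind::PermissionDenied => format!(
            "Permission denied reading {}: check that the current user can read the orchestrator data directory",
            path.display()
        ),
        io::ErrorKind::NotFound => format!("File not found: {}", path.display()),
        // Fall back to the underlying error text for everything else.
        _ => format!("Failed to read {}: {}", path.display(), err),
    }
}
```

A helper like this could be passed to `map_err` wherever the file-reading helpers call `std::fs::read_to_string`.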

### Risk 3: Large Task Queues
**Mitigation**: Streaming, pagination, default limits

### Risk 4: KCL Integration Complexity
**Mitigation**: Shell out to `kcl` binary as fallback

---

## Timeline Estimate

- **Phase 1** (Base Structure): ✅ Complete (1 hour)
- **Phase 2** (Core Implementation): ~4 hours
- **Phase 3** (KCL Validation): ~2 hours
- **Phase 4** (Enhancements): ~4 hours

**Total**: ~11 hours (1 day of focused work)

---

**Next Steps**: Implement Phase 2 (Core Implementation) starting with `read_local_status()` in `src/helpers.rs`.