|
8 | 8 | //! - Platform-specific actions (Slack, Discord, etc.) |
9 | 9 |
|
10 | 10 | use std::collections::HashMap; |
| 11 | +use std::path::PathBuf; |
11 | 12 | use std::sync::Arc; |
12 | 13 |
|
13 | 14 | use chrono::Utc; |
14 | | -use tokio::sync::mpsc; |
| 15 | +use tokio::sync::{mpsc, RwLock}; |
15 | 16 | use tracing::{debug, error, info, warn}; |
16 | 17 | use uuid::Uuid; |
17 | 18 |
|
18 | 19 | use aof_core::{ |
19 | | - AgentFlow, AgentFlowState, AofError, AofResult, FlowError, FlowExecutionStatus, FlowNode, |
20 | | - NodeExecutionStatus, NodeResult, NodeType, |
| 20 | + AgentConfig, AgentFlow, AgentFlowState, AofError, AofResult, FlowError, FlowExecutionStatus, |
| 21 | + FlowNode, NodeExecutionStatus, NodeResult, NodeType, |
21 | 22 | }; |
22 | 23 |
|
23 | 24 | use super::Runtime; |
@@ -63,23 +64,36 @@ pub enum AgentFlowEvent { |
63 | 64 | /// AgentFlow executor |
64 | 65 | pub struct AgentFlowExecutor { |
65 | 66 | flow: AgentFlow, |
66 | | - #[allow(dead_code)] |
67 | | - runtime: Arc<Runtime>, |
| 67 | + runtime: Arc<RwLock<Runtime>>, |
68 | 68 | event_tx: Option<mpsc::Sender<AgentFlowEvent>>, |
| 69 | + /// Directory to search for agent YAML files |
| 70 | + agents_dir: Option<PathBuf>, |
69 | 71 | } |
70 | 72 |
|
71 | 73 | impl AgentFlowExecutor { |
72 | 74 | /// Create a new AgentFlow executor |
73 | | - pub fn new(flow: AgentFlow, runtime: Arc<Runtime>) -> Self { |
| 75 | + pub fn new(flow: AgentFlow, runtime: Arc<RwLock<Runtime>>) -> Self { |
74 | 76 | Self { |
75 | 77 | flow, |
76 | 78 | runtime, |
77 | 79 | event_tx: None, |
| 80 | + agents_dir: None, |
78 | 81 | } |
79 | 82 | } |
80 | 83 |
|
| 84 | + /// Create with a non-locked runtime (convenience constructor) |
| 85 | + pub fn with_runtime(flow: AgentFlow, runtime: Runtime) -> Self { |
| 86 | + Self::new(flow, Arc::new(RwLock::new(runtime))) |
| 87 | + } |
| 88 | + |
| 89 | + /// Set the agents directory for loading agent configs |
| 90 | + pub fn with_agents_dir(mut self, dir: impl Into<PathBuf>) -> Self { |
| 91 | + self.agents_dir = Some(dir.into()); |
| 92 | + self |
| 93 | + } |
| 94 | + |
81 | 95 | /// Load AgentFlow from file |
82 | | - pub async fn from_file(path: &str, runtime: Arc<Runtime>) -> AofResult<Self> { |
| 96 | + pub async fn from_file(path: &str, runtime: Arc<RwLock<Runtime>>) -> AofResult<Self> { |
83 | 97 | let content = std::fs::read_to_string(path).map_err(|e| { |
84 | 98 | AofError::Config(format!("Failed to read AgentFlow config {}: {}", path, e)) |
85 | 99 | })?; |
@@ -388,31 +402,146 @@ impl AgentFlowExecutor { |
388 | 402 | Ok(agent_result) |
389 | 403 | } |
390 | 404 |
|
391 | | - /// Run an agent (placeholder - needs integration with AgentExecutor) |
| 405 | + /// Run an agent using the runtime |
| 406 | + /// |
| 407 | + /// This method: |
| 408 | + /// 1. Checks if the agent is already loaded in the runtime |
| 409 | + /// 2. If not, tries to load it from the agents directory |
| 410 | + /// 3. Applies flow context (kubeconfig, env vars) to the execution |
| 411 | + /// 4. Executes the agent and returns the result |
392 | 412 | async fn run_agent( |
393 | 413 | &self, |
394 | 414 | agent_name: &str, |
395 | 415 | input: &str, |
396 | | - _state: &AgentFlowState, |
| 416 | + state: &AgentFlowState, |
397 | 417 | ) -> AofResult<serde_json::Value> { |
398 | | - // This is a simplified implementation |
399 | | - // In production, this would: |
400 | | - // 1. Load agent config from registry or file |
401 | | - // 2. Create AgentExecutor |
402 | | - // 3. Run the agent with input |
403 | | - // 4. Return the response |
404 | | - |
405 | | - // For now, try to use the runtime to check if agent exists |
406 | | - info!("Agent {} called with input: {}", agent_name, input); |
407 | | - |
408 | | - // Return a placeholder response |
409 | | - // The real implementation would integrate with aof_runtime::AgentExecutor |
410 | | - Ok(serde_json::json!({ |
411 | | - "agent": agent_name, |
412 | | - "input": input, |
413 | | - "output": format!("Agent {} processed: {}", agent_name, input), |
414 | | - "requires_approval": false |
415 | | - })) |
| 418 | + info!("Executing agent '{}' with input: {}", agent_name, input); |
| 419 | + |
| 420 | + // First, try to execute with the agent already loaded |
| 421 | + { |
| 422 | + let runtime = self.runtime.read().await; |
| 423 | + if runtime.has_agent(agent_name) { |
| 424 | + // Apply flow context if specified |
| 425 | + self.apply_flow_context().await?; |
| 426 | + |
| 427 | + let result = runtime.execute(agent_name, input).await?; |
| 428 | + return Ok(serde_json::json!({ |
| 429 | + "agent": agent_name, |
| 430 | + "input": input, |
| 431 | + "output": result, |
| 432 | + "requires_approval": false |
| 433 | + })); |
| 434 | + } |
| 435 | + } |
| 436 | + |
| 437 | + // Agent not loaded - try to load from agents directory |
| 438 | + if let Some(ref agents_dir) = self.agents_dir { |
| 439 | + // Try common naming patterns |
| 440 | + let possible_paths = vec![ |
| 441 | + agents_dir.join(format!("{}.yaml", agent_name)), |
| 442 | + agents_dir.join(format!("{}.yml", agent_name)), |
| 443 | + agents_dir.join(format!("{}-agent.yaml", agent_name)), |
| 444 | + agents_dir.join(format!("{}-agent.yml", agent_name)), |
| 445 | + ]; |
| 446 | + |
| 447 | + for path in possible_paths { |
| 448 | + if path.exists() { |
| 449 | + info!("Loading agent '{}' from {}", agent_name, path.display()); |
| 450 | + |
| 451 | + let mut runtime = self.runtime.write().await; |
| 452 | + runtime.load_agent_from_file(path.to_string_lossy().as_ref()).await?; |
| 453 | + |
| 454 | + // Apply flow context |
| 455 | + drop(runtime); // Release write lock |
| 456 | + self.apply_flow_context().await?; |
| 457 | + |
| 458 | + let runtime = self.runtime.read().await; |
| 459 | + let result = runtime.execute(agent_name, input).await?; |
| 460 | + |
| 461 | + return Ok(serde_json::json!({ |
| 462 | + "agent": agent_name, |
| 463 | + "input": input, |
| 464 | + "output": result, |
| 465 | + "requires_approval": false |
| 466 | + })); |
| 467 | + } |
| 468 | + } |
| 469 | + } |
| 470 | + |
| 471 | + // Agent config might also be embedded in the flow |
| 472 | + // Check if there's a node config with agent config |
| 473 | + if let Some(node) = self.flow.spec.nodes.iter().find(|n| { |
| 474 | + n.config.agent.as_ref() == Some(&agent_name.to_string()) |
| 475 | + }) { |
| 476 | + if let Some(ref config_yaml) = node.config.agent_config { |
| 477 | + info!("Loading agent '{}' from inline config", agent_name); |
| 478 | + |
| 479 | + let agent_config: AgentConfig = serde_yaml::from_str(config_yaml).map_err(|e| { |
| 480 | + AofError::Config(format!("Failed to parse inline agent config: {}", e)) |
| 481 | + })?; |
| 482 | + |
| 483 | + let mut runtime = self.runtime.write().await; |
| 484 | + runtime.load_agent_from_config(agent_config).await?; |
| 485 | + |
| 486 | + drop(runtime); |
| 487 | + self.apply_flow_context().await?; |
| 488 | + |
| 489 | + let runtime = self.runtime.read().await; |
| 490 | + let result = runtime.execute(agent_name, input).await?; |
| 491 | + |
| 492 | + return Ok(serde_json::json!({ |
| 493 | + "agent": agent_name, |
| 494 | + "input": input, |
| 495 | + "output": result, |
| 496 | + "requires_approval": false |
| 497 | + })); |
| 498 | + } |
| 499 | + } |
| 500 | + |
| 501 | + // Could not find or load the agent |
| 502 | + Err(AofError::Config(format!( |
| 503 | + "Agent '{}' not found. Ensure it's loaded or available in the agents directory.", |
| 504 | + agent_name |
| 505 | + ))) |
| 506 | + } |
| 507 | + |
| 508 | + /// Apply flow context to the environment |
| 509 | + async fn apply_flow_context(&self) -> AofResult<()> { |
| 510 | + if let Some(ref context) = self.flow.spec.context { |
| 511 | + // Set KUBECONFIG if specified |
| 512 | + if let Some(ref kubeconfig) = context.kubeconfig { |
| 513 | + std::env::set_var("KUBECONFIG", kubeconfig); |
| 514 | + info!("Set KUBECONFIG to {}", kubeconfig); |
| 515 | + } |
| 516 | + |
| 517 | + // Set namespace as env var if specified |
| 518 | + if let Some(ref namespace) = context.namespace { |
| 519 | + std::env::set_var("K8S_NAMESPACE", namespace); |
| 520 | + info!("Set K8S_NAMESPACE to {}", namespace); |
| 521 | + } |
| 522 | + |
| 523 | + // Set cluster name if specified |
| 524 | + if let Some(ref cluster) = context.cluster { |
| 525 | + std::env::set_var("K8S_CLUSTER", cluster); |
| 526 | + info!("Set K8S_CLUSTER to {}", cluster); |
| 527 | + } |
| 528 | + |
| 529 | + // Set working directory if specified |
| 530 | + if let Some(ref working_dir) = context.working_dir { |
| 531 | + std::env::set_current_dir(working_dir).map_err(|e| { |
| 532 | + AofError::Config(format!("Failed to change working directory: {}", e)) |
| 533 | + })?; |
| 534 | + info!("Changed working directory to {}", working_dir); |
| 535 | + } |
| 536 | + |
| 537 | + // Set additional environment variables |
| 538 | + for (key, value) in &context.env { |
| 539 | + std::env::set_var(key, value); |
| 540 | + debug!("Set env var {}={}", key, value); |
| 541 | + } |
| 542 | + } |
| 543 | + |
| 544 | + Ok(()) |
416 | 545 | } |
417 | 546 |
|
418 | 547 | /// Execute a Conditional node |
@@ -874,7 +1003,7 @@ mod tests { |
874 | 1003 |
|
875 | 1004 | #[test] |
876 | 1005 | fn test_evaluate_condition() { |
877 | | - let runtime = Arc::new(Runtime::new()); |
| 1006 | + let runtime = Arc::new(RwLock::new(Runtime::new())); |
878 | 1007 | let flow: AgentFlow = serde_yaml::from_str( |
879 | 1008 | r#" |
880 | 1009 | apiVersion: aof.dev/v1 |
|
0 commit comments