use crate::core::config::AppConfig; use crate::core::message::{Destination, Envelope, Message}; use crate::core::plugin::{Plugin, PluginContext}; use crate::core::plugin_loader::ErrorPolicy; use anyhow::{anyhow, Result}; use std::collections::{HashMap, HashSet}; use std::sync::{mpsc, Arc}; /// 插件运行时状态包装 struct PluginState { plugin: Box, /// 是否为动态加载的插件 is_dynamic: bool, /// 错误处理策略 error_policy: ErrorPolicy, /// 连续错误计数 error_count: u32, /// 最大允许错误数 max_errors: u32, /// 是否启用 enabled: bool, } impl PluginState { fn new_static(plugin: Box) -> Self { Self { plugin, is_dynamic: false, error_policy: ErrorPolicy::DisableAndLog, error_count: 0, max_errors: u32::MAX, // 静态插件不自动禁用 enabled: true, } } fn new_dynamic( plugin: Box, error_policy: ErrorPolicy, max_errors: u32, ) -> Self { Self { plugin, is_dynamic: true, error_policy, error_count: 0, max_errors, enabled: true, } } fn id(&self) -> &str { self.plugin.id() } /// 记录一次错误,返回是否超过阈值 fn record_error(&mut self) -> bool { self.error_count += 1; self.error_count >= self.max_errors } /// 重置错误计数(成功处理消息后调用) fn reset_errors(&mut self) { self.error_count = 0; } } /// 中央调度器:插件注册、生命周期管理、消息路由 pub struct ServiceManager { plugins: Vec, config: Arc, tx: mpsc::Sender, rx: mpsc::Receiver, running: bool, } impl ServiceManager { pub fn new(config: AppConfig) -> Self { let (tx, rx) = mpsc::channel(); Self { plugins: Vec::new(), config: Arc::new(config), tx, rx, running: false, } } /// 注册静态插件(编译时链接的插件) pub fn register(&mut self, plugin: Box) { println!("[ServiceManager] 注册插件: {}", plugin.id()); self.plugins.push(PluginState::new_static(plugin)); } /// 注册动态插件(运行时加载的 .so 插件) pub fn register_dynamic( &mut self, plugin: Box, error_policy: ErrorPolicy, max_errors: u32, ) { println!( "[ServiceManager] 注册动态插件: {} (策略: {:?}, 最大错误: {})", plugin.id(), error_policy, max_errors ); self.plugins .push(PluginState::new_dynamic(plugin, error_policy, max_errors)); } /// 按注册顺序 init() + start() 所有插件 /// 动态插件 init/start 失败时按策略处理,不中断其他插件 pub fn start_all(&mut self) -> Result<()> { self.validate_and_sort_plugins()?; // init for state in &mut self.plugins { let ctx = PluginContext { tx: self.tx.clone(), config: Arc::clone(&self.config), }; println!("[ServiceManager] 初始化插件: {}", state.id()); if let Err(e) = state.plugin.init(ctx) { if state.is_dynamic { eprintln!( "[ServiceManager] 动态插件 '{}' 初始化失败,禁用: {}", state.id(), e ); state.enabled = false; continue; } else { return Err(e); } } } // start for state in &mut self.plugins { if !state.enabled { continue; } println!("[ServiceManager] 启动插件: {}", state.id()); if let Err(e) = state.plugin.start() { if state.is_dynamic { eprintln!( "[ServiceManager] 动态插件 '{}' 启动失败,禁用: {}", state.id(), e ); state.enabled = false; continue; } else { return Err(e); } } } Ok(()) } /// 主消息循环(阻塞) pub fn run(&mut self) -> Result<()> { println!("[ServiceManager] 进入主消息循环"); self.running = true; while self.running { let envelope = match self.rx.recv() { Ok(env) => env, Err(_) => { println!("[ServiceManager] 所有发送端已关闭,退出"); break; } }; match envelope.to { Destination::Plugin(id) => { self.deliver_to_plugin(&id, envelope.message); } Destination::Broadcast => { self.broadcast_message(envelope.message); } Destination::Manager => { self.handle_manager_message(envelope.message)?; } } } self.stop_all() } /// 逆序 stop() 所有插件 pub fn stop_all(&mut self) -> Result<()> { println!("[ServiceManager] 停止所有插件"); for state in self.plugins.iter_mut().rev() { if !state.enabled { continue; } println!("[ServiceManager] 停止插件: {}", state.id()); if let Err(e) = state.plugin.stop() { eprintln!( "[ServiceManager] 停止插件 '{}' 失败: {}", state.id(), e ); } } Ok(()) } /// 启用/禁用指定插件 pub fn set_plugin_enabled(&mut self, plugin_id: &str, enabled: bool) -> Result<()> { let state = self .plugins .iter_mut() .find(|s| s.id() == plugin_id) .ok_or_else(|| anyhow!("plugin '{plugin_id}' not found"))?; if enabled && !state.enabled { // 重新启用:reset 错误计数 state.error_count = 0; state.enabled = true; println!("[ServiceManager] 插件 '{plugin_id}' 已启用"); } else if !enabled && state.enabled { state.enabled = false; println!("[ServiceManager] 插件 '{plugin_id}' 已禁用"); } Ok(()) } /// 查询插件状态信息(供 HTTP API 使用) pub fn plugin_states(&self) -> Vec { self.plugins .iter() .map(|s| PluginStateInfo { id: s.id().to_string(), info: s.plugin.info(), is_dynamic: s.is_dynamic, error_policy: s.error_policy.clone(), error_count: s.error_count, max_errors: s.max_errors, enabled: s.enabled, }) .collect() } /// 热替换动态插件(stop 旧的 → 替换 → init → start 新的) pub fn replace_dynamic_plugin( &mut self, plugin_id: &str, new_plugin: Box, error_policy: ErrorPolicy, max_errors: u32, ) -> Result<()> { let idx = self .plugins .iter() .position(|s| s.id() == plugin_id) .ok_or_else(|| anyhow!("plugin '{plugin_id}' not found for replacement"))?; // Stop old plugin if self.plugins[idx].enabled { let _ = self.plugins[idx].plugin.stop(); } // Replace let mut new_state = PluginState::new_dynamic(new_plugin, error_policy, max_errors); // Init new plugin let ctx = PluginContext { tx: self.tx.clone(), config: Arc::clone(&self.config), }; new_state.plugin.init(ctx)?; new_state.plugin.start()?; self.plugins[idx] = new_state; println!("[ServiceManager] 插件 '{plugin_id}' 热替换成功"); Ok(()) } /// 处理发给管理层自身的消息 fn handle_manager_message(&mut self, msg: Message) -> Result<()> { match msg { Message::Shutdown => { println!("[ServiceManager] 收到 Shutdown 指令"); self.broadcast_message(Message::Shutdown); } Message::WifiResult(payload) => { self.broadcast_message(Message::WifiResult(payload)); } Message::PlayerStatus(status) => { self.broadcast_message(Message::PlayerStatus(status)); } Message::StateChanged { old_state, new_state, } => { self.broadcast_message(Message::StateChanged { old_state, new_state, }); } Message::WifiProvisioned { ssid, ip } => { self.broadcast_message(Message::WifiProvisioned { ssid, ip }); } Message::ConfigReloadRequest => { println!("[ServiceManager] 收到配置重载请求"); match AppConfig::from_file(&self.config.source_path) { Ok(new_config) => { let new_config = Arc::new(new_config); self.config = Arc::clone(&new_config); println!("[ServiceManager] 配置重载成功,广播 ConfigReloaded"); self.broadcast_message(Message::ConfigReloaded(new_config)); } Err(e) => { eprintln!("[ServiceManager] 配置重载失败: {}", e); } } } Message::PluginReady(id) => { println!("[ServiceManager] 插件 '{}' 就绪", id); self.broadcast_message(Message::PluginReady(id)); } _ => {} } Ok(()) } fn validate_and_sort_plugins(&mut self) -> Result<()> { let mut plugin_ids = Vec::with_capacity(self.plugins.len()); let mut plugin_set = HashSet::with_capacity(self.plugins.len()); let mut dependency_map = HashMap::with_capacity(self.plugins.len()); for state in &self.plugins { let id = state.id().to_string(); if !plugin_set.insert(id.clone()) { return Err(anyhow!("duplicate plugin id registered: '{id}'")); } plugin_ids.push(id.clone()); dependency_map.insert(id, state.plugin.dependencies()); } for (plugin_id, dependencies) in &dependency_map { for dependency in dependencies { if dependency == plugin_id { return Err(anyhow!("plugin '{plugin_id}' cannot depend on itself")); } if !plugin_set.contains(dependency.as_str()) { return Err(anyhow!( "plugin '{plugin_id}' depends on missing plugin '{dependency}'" )); } } } let mut resolved = HashSet::with_capacity(plugin_ids.len()); let mut sorted_ids = Vec::with_capacity(plugin_ids.len()); while sorted_ids.len() < plugin_ids.len() { let mut progressed = false; for plugin_id in &plugin_ids { if resolved.contains(plugin_id) { continue; } let dependencies = dependency_map .get(plugin_id) .expect("plugin dependency map must contain all registered ids"); if dependencies .iter() .all(|dependency| resolved.contains(dependency)) { resolved.insert(plugin_id.clone()); sorted_ids.push(plugin_id.clone()); progressed = true; } } if !progressed { let unresolved = plugin_ids .iter() .filter(|plugin_id| !resolved.contains(plugin_id.as_str())) .cloned() .collect::>() .join(", "); return Err(anyhow!( "plugin dependency cycle detected among: {unresolved}" )); } } let mut remaining_plugins = std::mem::take(&mut self.plugins); let mut ordered_plugins = Vec::with_capacity(remaining_plugins.len()); for plugin_id in &sorted_ids { let index = remaining_plugins .iter() .position(|state| state.id() == plugin_id) .ok_or_else(|| anyhow!("plugin '{plugin_id}' disappeared during sorting"))?; ordered_plugins.push(remaining_plugins.remove(index)); } self.plugins = ordered_plugins; Ok(()) } /// 投递消息给指定插件,带错误计数和策略处理 fn deliver_to_plugin(&mut self, id: &str, msg: Message) { let state = match self.plugins.iter_mut().find(|s| s.id() == id) { Some(s) => s, None => { eprintln!("[ServiceManager] 目标插件 '{}' 不存在", id); return; } }; if !state.enabled { return; } match state.plugin.handle_message(msg) { Ok(()) => { state.reset_errors(); } Err(e) => { eprintln!( "[ServiceManager] 插件 '{}' 处理消息失败 ({}/{}): {}", id, state.error_count + 1, state.max_errors, e ); if state.record_error() && state.is_dynamic { self.handle_error_threshold(id); } } } } fn broadcast_message(&mut self, msg: Message) { let should_shutdown = matches!(&msg, Message::Shutdown); for state in &mut self.plugins { if !state.enabled { continue; } match state.plugin.handle_message(msg.clone()) { Ok(()) => { state.reset_errors(); } Err(e) => { eprintln!( "[ServiceManager] 插件 '{}' 处理广播消息失败: {}", state.id(), e ); // 广播消息的错误不触发阈值处理(避免广播期间修改列表) } } } if should_shutdown { println!("[ServiceManager] 收到 Shutdown 广播"); self.running = false; } } /// 插件错误达到阈值时的处理 fn handle_error_threshold(&mut self, plugin_id: &str) { let state = match self.plugins.iter_mut().find(|s| s.id() == plugin_id) { Some(s) => s, None => return, }; match state.error_policy { ErrorPolicy::DisableAndLog => { eprintln!( "[ServiceManager] 插件 '{}' 错误次数达到阈值,已禁用", plugin_id ); state.enabled = false; } ErrorPolicy::AutoRollback => { eprintln!( "[ServiceManager] 插件 '{}' 错误次数达到阈值,需要回退 (由外部 VersionManager 处理)", plugin_id ); // 先禁用,等待外部 (main.rs / HTTP API) 调用 VersionManager 执行回退 state.enabled = false; } } } /// 获取发送通道的克隆(供外部使用) pub fn sender(&self) -> mpsc::Sender { self.tx.clone() } } /// 插件状态信息(用于 API 查询) #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PluginStateInfo { pub id: String, pub info: crate::core::plugin::PluginInfo, pub is_dynamic: bool, pub error_policy: ErrorPolicy, pub error_count: u32, pub max_errors: u32, pub enabled: bool, }