Files
ShowenV2/src/core/service_manager.rs
showen d30c111c71 feat: M1.1 完成 + M1.2 启动 — 全量更新
M1.1 收尾:
- 24项 P0/P1/P2 bug 修复 (Rust 107 tests + Flutter 15 tests)
- Flutter App v0.3: cupertino_icons 修复, 单元测试, 调试面板, APK 52.6MB
- 示例插件完善: manifest.json + 请求/响应示范 + 7个测试
- API 文档重写 (以 routes.rs 为唯一权威)
- MILESTONES.md 更新至 100%

M1.2 启动:
- P0: 插件管理 API 闭环 (handle_manager_message Custom 分支 + broadcast_plugin_states)
- ServiceManager 集成测试 8/8 (tests/m1_2_service_manager.rs)
- M1.2 测试计划 (docs/M1.2_TEST_PLAN.md, 18个E2E场景)
- 动态插件系统: auto_rollback + version_manager GC + 路径穿越防护

总计: Rust 115/115 测试, Flutter 15/15 测试, 零 warning

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 18:12:42 +08:00

1153 lines
40 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use crate::core::config::AppConfig;
use crate::core::message::{Destination, Envelope, Message};
use crate::core::plugin::{CapabilityTestResult, Plugin, PluginContext};
use crate::core::plugin_loader::{ErrorPolicy, PluginLoader, PluginRegistryEntry};
use crate::core::plugin_repo::PluginRepository;
use crate::core::version_manager::VersionManager;
use anyhow::{anyhow, Result};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc};
const DEFAULT_PLUGIN_REPO_URL: &str = "https://plugins.example.com";
/// JSON payload of the `plugin_switch` manager command.
///
/// Parsed from the `payload` string of a `Message::Custom` whose `kind` is
/// `"plugin_switch"`; both fields are forwarded to `switch_plugin_version`.
#[derive(Deserialize)]
struct PluginSwitchCommand {
/// Identifier of the plugin whose version should be switched.
id: String,
/// Version to switch the plugin to.
version: String,
}
/// JSON payload of the `plugin_install` manager command.
///
/// Parsed from the `payload` string of a `Message::Custom` whose `kind` is
/// `"plugin_install"` and handed to `install_plugin`.
#[derive(Deserialize)]
struct PluginInstallCommand {
/// Identifier of the plugin to install.
id: String,
/// Version to install. Optional in the JSON (defaults to `None`); when it is
/// absent, `install_plugin` asks the repository which version to use.
#[serde(default)]
version: Option<String>,
}
/// Runtime state wrapper around a registered plugin.
struct PluginState {
/// The plugin instance being managed.
plugin: Box<dyn Plugin>,
/// Whether this plugin was loaded dynamically.
is_dynamic: bool,
/// Error-handling policy.
error_policy: ErrorPolicy,
/// Consecutive error count.
error_count: u32,
/// Maximum number of errors allowed.
max_errors: u32,
/// Whether the plugin is enabled.
enabled: bool,
/// Self-test results captured when the plugin was mounted.
test_results: Vec<CapabilityTestResult>,
/// Declared capability list.
capabilities: Vec<String>,
/// Capabilities that the manifest declares as required to pass.
required_capabilities: Vec<String>,
/// Whether the plugin is self-tested automatically.
auto_test: bool,
/// Whether a rollback still has to be performed later in the lifecycle.
needs_rollback: bool,
}
impl PluginState {
    /// Wraps a statically linked plugin.
    ///
    /// Static plugins use `ErrorPolicy::DisableAndLog`, an error limit of
    /// `u32::MAX` (so they are never auto-disabled) and no automatic self-test.
    fn new_static(plugin: Box<dyn Plugin>) -> Self {
        Self {
            is_dynamic: false,
            auto_test: false,
            ..Self::new_dynamic(plugin, ErrorPolicy::DisableAndLog, u32::MAX)
        }
    }

    /// Wraps a dynamically loaded plugin with the given error policy and limit.
    ///
    /// The new state is enabled, self-tested automatically, and starts with an
    /// empty error count and empty capability / test-result lists.
    fn new_dynamic(plugin: Box<dyn Plugin>, error_policy: ErrorPolicy, max_errors: u32) -> Self {
        Self {
            plugin,
            error_policy,
            max_errors,
            is_dynamic: true,
            enabled: true,
            auto_test: true,
            needs_rollback: false,
            error_count: 0,
            test_results: Vec::new(),
            capabilities: Vec::new(),
            required_capabilities: Vec::new(),
        }
    }

    /// Identifier reported by the wrapped plugin.
    fn id(&self) -> &str {
        self.plugin.id()
    }

    /// Records one error and reports whether the error limit has been reached.
    fn record_error(&mut self) -> bool {
        self.error_count += 1;
        self.max_errors <= self.error_count
    }

    /// Clears the error counter (called after a message was handled successfully).
    fn reset_errors(&mut self) {
        self.error_count = 0;
    }
}
/// Central dispatcher: plugin registration, lifecycle management, and message routing.
pub struct ServiceManager {
/// Registered plugins together with their runtime state.
plugins: Vec<PluginState>,
/// Shared application configuration; handed to plugins through `PluginContext`.
config: Arc<AppConfig>,
/// Sending half of the manager's message channel; cloned into plugin contexts
/// and exposed through `sender()`.
tx: mpsc::Sender<Envelope>,
/// Receiving half of the message channel, drained by `run()`.
rx: mpsc::Receiver<Envelope>,
/// Main-loop flag; cleared when a `Shutdown` message is broadcast.
running: bool,
/// Optional version manager used for rollback, version switching and installs.
version_manager: Option<VersionManager>,
}
impl ServiceManager {
/// Builds a fresh `PluginContext` carrying a clone of the manager's sender and
/// a shared handle to the current configuration.
fn plugin_context(&self) -> PluginContext {
    let tx = self.tx.clone();
    let config = Arc::clone(&self.config);
    PluginContext { tx, config }
}
/// Runs `init` and then `start` on the plugin held by `state`.
///
/// If either step fails, `stop` is called as a cleanup attempt. The original
/// failure is returned as-is when the cleanup succeeds; when the cleanup also
/// fails, both errors are combined into a single message.
fn init_and_start_plugin_with_context(
    state: &mut PluginState,
    ctx: PluginContext,
) -> Result<()> {
    if let Err(init_error) = state.plugin.init(ctx) {
        return Err(match state.plugin.stop() {
            Err(stop_error) => anyhow!(
                "plugin '{}' init failed: {}; cleanup stop failed: {}",
                state.id(),
                init_error,
                stop_error
            ),
            Ok(_) => init_error,
        });
    }

    if let Err(start_error) = state.plugin.start() {
        return Err(match state.plugin.stop() {
            Err(stop_error) => anyhow!(
                "plugin '{}' start failed: {}; cleanup stop failed: {}",
                state.id(),
                start_error,
                stop_error
            ),
            Ok(_) => start_error,
        });
    }

    Ok(())
}
/// Creates a manager with no plugins, a fresh message channel, and no
/// version manager; the main loop is not running yet.
pub fn new(config: AppConfig) -> Self {
    let (sender, receiver) = mpsc::channel();
    Self {
        plugins: Vec::new(),
        config: Arc::new(config),
        tx: sender,
        rx: receiver,
        running: false,
        version_manager: None,
    }
}
/// Installs the version manager, replacing any previously configured one.
pub fn set_version_manager(&mut self, version_manager: VersionManager) {
    let _previous = self.version_manager.replace(version_manager);
}
/// Registers a static plugin (one linked in at compile time).
pub fn register(&mut self, plugin: Box<dyn Plugin>) {
    let state = PluginState::new_static(plugin);
    println!("[ServiceManager] 注册插件: {}", state.id());
    self.plugins.push(state);
}
/// Registers a dynamic plugin (a `.so` plugin loaded at runtime).
///
/// Delegates to `register_dynamic_with_manifest` with no required or declared
/// capabilities and automatic self-testing turned on.
pub fn register_dynamic(
    &mut self,
    plugin: Box<dyn Plugin>,
    error_policy: ErrorPolicy,
    max_errors: u32,
) {
    let required_capabilities = Vec::new();
    let capabilities = Vec::new();
    let auto_test = true;
    self.register_dynamic_with_manifest(
        plugin,
        error_policy,
        max_errors,
        required_capabilities,
        capabilities,
        auto_test,
    );
}
/// 注册动态插件(带 manifest 自测信息)
pub fn register_dynamic_with_manifest(
&mut self,
plugin: Box<dyn Plugin>,
error_policy: ErrorPolicy,
max_errors: u32,
required_capabilities: Vec<String>,
capabilities: Vec<String>,
auto_test: bool,
) {
println!(
"[ServiceManager] 注册动态插件: {} (策略: {:?}, 最大错误: {})",
plugin.id(),
error_policy,
max_errors
);
let mut state = PluginState::new_dynamic(plugin, error_policy, max_errors);
state.required_capabilities = required_capabilities;
state.capabilities = capabilities;
state.auto_test = auto_test;
self.plugins.push(state);
}
/// 按注册顺序 init() → self_test() → start() 所有插件
/// 动态插件 init/start/test 失败时按策略处理,不中断其他插件
pub fn start_all(&mut self) -> Result<()> {
self.validate_and_sort_plugins()?;
// Phase 1: init
for state in &mut self.plugins {
let ctx = PluginContext {
tx: self.tx.clone(),
config: Arc::clone(&self.config),
};
println!("[ServiceManager] 初始化插件: {}", state.id());
if let Err(e) = state.plugin.init(ctx) {
if state.is_dynamic {
eprintln!(
"[ServiceManager] 动态插件 '{}' 初始化失败,禁用: {}",
state.id(),
e
);
state.enabled = false;
continue;
} else {
return Err(e);
}
}
}
// Phase 2: self_test (init 之后, start 之前)
for state in &mut self.plugins {
if !state.enabled || !state.auto_test {
continue;
}
// 获取插件的功能列表并运行自测
let caps = state.plugin.capabilities();
if caps.is_empty() && state.required_capabilities.is_empty() {
// 无功能声明 → 跳过自测
continue;
}
println!(
"[ServiceManager] 自测插件: {} (功能: {:?})",
state.id(),
caps
);
state.capabilities = caps;
let results = state.plugin.self_test();
// 检查 required_capabilities 中的项是否全部通过
let mut has_required_failure = false;
let passed_caps: std::collections::HashSet<&str> = results
.iter()
.filter(|r| r.passed)
.map(|r| r.capability.as_str())
.collect();
// 检查每个 required capability 是否出现在通过列表中
for req in &state.required_capabilities {
if !passed_caps.contains(req.as_str()) {
eprintln!(
"[ServiceManager] ✗ [必须] {}{}",
req,
results
.iter()
.find(|r| r.capability == *req)
.map(|r| r.message.as_str())
.unwrap_or("未在测试结果中出现")
);
has_required_failure = true;
}
}
for result in &results {
if result.passed {
println!(
"[ServiceManager] ✓ {}{}",
result.capability, result.message
);
} else if !state.required_capabilities.contains(&result.capability) {
eprintln!(
"[ServiceManager] ✗ [可选] {}{}",
result.capability, result.message
);
}
}
state.test_results = results;
if has_required_failure {
if state.is_dynamic {
match &state.error_policy {
ErrorPolicy::AutoRollback => {
eprintln!(
"[ServiceManager] 动态插件 '{}' 必须能力自测失败,尝试自动回退到稳定版本",
state.id()
);
state.needs_rollback = true;
}
ErrorPolicy::DisableAndLog => {
eprintln!(
"[ServiceManager] 动态插件 '{}' 必须能力自测失败,禁用",
state.id()
);
state.needs_rollback = false;
}
}
state.enabled = false;
} else {
return Err(anyhow!("静态插件 '{}' 必须能力自测失败", state.id()));
}
}
}
for idx in 0..self.plugins.len() {
if !self.plugins[idx].needs_rollback {
continue;
}
let plugin_id = self.plugins[idx].id().to_string();
self.rollback_dynamic_plugin(idx, &plugin_id);
}
// Phase 3: start
for state in &mut self.plugins {
if !state.enabled {
continue;
}
println!("[ServiceManager] 启动插件: {}", state.id());
if let Err(e) = state.plugin.start() {
if state.is_dynamic {
eprintln!(
"[ServiceManager] 动态插件 '{}' 启动失败,禁用: {}",
state.id(),
e
);
state.enabled = false;
continue;
} else {
return Err(e);
}
}
}
self.broadcast_plugin_states();
Ok(())
}
/// 主消息循环(阻塞)
pub fn run(&mut self) -> Result<()> {
println!("[ServiceManager] 进入主消息循环");
self.running = true;
while self.running {
let envelope = match self.rx.recv() {
Ok(env) => env,
Err(_) => {
println!("[ServiceManager] 所有发送端已关闭,退出");
break;
}
};
match envelope.to {
Destination::Plugin(id) => {
self.deliver_to_plugin(&id, envelope.message);
}
Destination::Broadcast => {
self.broadcast_message(envelope.message);
}
Destination::Manager => {
self.handle_manager_message(envelope.message)?;
}
}
}
self.stop_all()
}
/// Calls `stop()` on every enabled plugin, in reverse order.
///
/// A failing `stop()` is logged and does not prevent the remaining plugins
/// from being stopped; the function itself always returns `Ok(())`.
pub fn stop_all(&mut self) -> Result<()> {
    println!("[ServiceManager] 停止所有插件");
    let active = self.plugins.iter_mut().rev().filter(|state| state.enabled);
    for state in active {
        println!("[ServiceManager] 停止插件: {}", state.id());
        if let Err(e) = state.plugin.stop() {
            eprintln!("[ServiceManager] 停止插件 '{}' 失败: {}", state.id(), e);
        }
    }
    Ok(())
}
/// 启用/禁用指定插件
pub fn set_plugin_enabled(&mut self, plugin_id: &str, enabled: bool) -> Result<()> {
let idx = self
.plugins
.iter()
.position(|s| s.id() == plugin_id)
.ok_or_else(|| anyhow!("plugin '{plugin_id}' not found"))?;
if enabled && !self.plugins[idx].enabled {
let ctx = self.plugin_context();
let state = &mut self.plugins[idx];
state.error_count = 0;
match Self::init_and_start_plugin_with_context(state, ctx) {
Ok(()) => {
state.enabled = true;
println!("[ServiceManager] 插件 '{plugin_id}' 已启用");
}
Err(error) => {
state.enabled = false;
return Err(anyhow!("failed to enable plugin '{plugin_id}': {error}"));
}
}
} else if !enabled && self.plugins[idx].enabled {
let state = &mut self.plugins[idx];
state.plugin.stop()?;
state.enabled = false;
println!("[ServiceManager] 插件 '{plugin_id}' 已禁用");
}
Ok(())
}
/// Requests a rollback of the given dynamic plugin.
///
/// # Errors
/// Fails only when the plugin is unknown or is not dynamic. Failures of the
/// rollback itself are logged by `rollback_dynamic_plugin` and are not
/// returned from here.
pub fn rollback_plugin(&mut self, plugin_id: &str) -> Result<()> {
    let target = self
        .plugins
        .iter()
        .enumerate()
        .find(|(_, state)| state.id() == plugin_id)
        .map(|(idx, state)| (idx, state.is_dynamic));

    match target {
        None => Err(anyhow!("plugin '{plugin_id}' not found")),
        Some((_, false)) => Err(anyhow!(
            "plugin '{plugin_id}' is not dynamic and cannot be rolled back"
        )),
        Some((idx, true)) => {
            self.rollback_dynamic_plugin(idx, plugin_id);
            Ok(())
        }
    }
}
/// Returns a snapshot of every plugin's state (used by the HTTP API).
pub fn plugin_states(&self) -> Vec<PluginStateInfo> {
    let mut snapshot = Vec::with_capacity(self.plugins.len());
    for state in &self.plugins {
        snapshot.push(PluginStateInfo {
            id: state.id().to_string(),
            info: state.plugin.info(),
            is_dynamic: state.is_dynamic,
            error_policy: state.error_policy.clone(),
            error_count: state.error_count,
            max_errors: state.max_errors,
            enabled: state.enabled,
            test_results: state.test_results.clone(),
            capabilities: state.capabilities.clone(),
            needs_rollback: state.needs_rollback,
        });
    }
    snapshot
}
/// Hot-swaps the dynamic plugin stored at `idx` for `new_plugin`.
///
/// Sequence: stop the old plugin (if it was enabled) → init + start the new
/// one → on success store the new state in slot `idx`. If the new plugin
/// fails to come up and the old one had been enabled, the old plugin is
/// re-initialized and restarted as a best-effort restore.
///
/// The old state stays in `self.plugins` for the whole operation and is only
/// overwritten once the new plugin is running. In particular a failing
/// `stop()` leaves the list untouched, so the plugin is never dropped from the
/// registry and indices held by callers stay valid on every error path.
///
/// # Errors
/// Fails when `idx` is out of range, the plugin is not dynamic, stopping the
/// old plugin fails, or the new plugin fails to init/start (the message says
/// whether the previous plugin could be restored).
fn replace_dynamic_plugin_at_index(
    &mut self,
    idx: usize,
    plugin_id: &str,
    new_plugin: Box<dyn Plugin>,
    error_policy: ErrorPolicy,
    max_errors: u32,
    required_capabilities: Vec<String>,
    capabilities: Vec<String>,
    auto_test: bool,
) -> Result<()> {
    let current = self
        .plugins
        .get(idx)
        .ok_or_else(|| anyhow!("plugin '{plugin_id}' not found for replacement"))?;
    if !current.is_dynamic {
        return Err(anyhow!(
            "plugin '{plugin_id}' is not dynamic and cannot be replaced"
        ));
    }
    let old_was_enabled = current.enabled;

    let mut new_state = PluginState::new_dynamic(new_plugin, error_policy, max_errors);
    new_state.required_capabilities = required_capabilities;
    new_state.capabilities = capabilities;
    new_state.auto_test = auto_test;

    if old_was_enabled {
        // Stop the old plugin first so the old and new instances never hold
        // exclusive resources (ports, file handles, ...) at the same time.
        // It is stopped in place: if this fails, the entry must stay registered.
        self.plugins[idx].plugin.stop()?;
    }

    let ctx = self.plugin_context();
    if let Err(new_error) = Self::init_and_start_plugin_with_context(&mut new_state, ctx) {
        let restore_ctx = self.plugin_context();
        let old_state = &mut self.plugins[idx];

        if !old_was_enabled {
            // Nothing was stopped, so there is nothing to restore.
            old_state.enabled = false;
            return Err(anyhow!(
                "failed to replace plugin '{plugin_id}': {new_error}"
            ));
        }

        return match Self::init_and_start_plugin_with_context(old_state, restore_ctx) {
            Ok(()) => {
                old_state.enabled = true;
                Err(anyhow!(
                    "failed to replace plugin '{plugin_id}': {new_error}; restored previous plugin"
                ))
            }
            Err(restore_error) => {
                old_state.enabled = false;
                Err(anyhow!(
                    "failed to replace plugin '{plugin_id}': {new_error}; failed to restore previous plugin: {restore_error}"
                ))
            }
        };
    }

    new_state.enabled = true;
    // Overwrite the slot; the old state is dropped here.
    self.plugins[idx] = new_state;
    println!("[ServiceManager] 插件 '{plugin_id}' 热替换成功");
    Ok(())
}
/// Hot-swaps a dynamic plugin (stop the old one → replace → init → start the new one).
///
/// The replacement inherits the required capabilities, declared capabilities
/// and auto-test flag of the plugin it replaces.
///
/// # Errors
/// Fails when the plugin is unknown or not dynamic, or when the swap itself fails.
pub fn replace_dynamic_plugin(
    &mut self,
    plugin_id: &str,
    new_plugin: Box<dyn Plugin>,
    error_policy: ErrorPolicy,
    max_errors: u32,
) -> Result<()> {
    let (idx, current) = self
        .plugins
        .iter()
        .enumerate()
        .find(|(_, state)| state.id() == plugin_id)
        .ok_or_else(|| anyhow!("plugin '{plugin_id}' not found for replacement"))?;

    if !current.is_dynamic {
        return Err(anyhow!(
            "plugin '{plugin_id}' is not dynamic and cannot be replaced"
        ));
    }

    // Carry the manifest-derived settings over from the plugin being replaced.
    let inherited_required = current.required_capabilities.clone();
    let inherited_capabilities = current.capabilities.clone();
    let inherited_auto_test = current.auto_test;

    self.replace_dynamic_plugin_at_index(
        idx,
        plugin_id,
        new_plugin,
        error_policy,
        max_errors,
        inherited_required,
        inherited_capabilities,
        inherited_auto_test,
    )
}
/// Creates a `PluginLoader` rooted at the version manager's store path.
///
/// # Errors
/// Fails when no version manager has been configured.
fn plugin_loader(&self) -> Result<PluginLoader> {
    match self.version_manager.as_ref() {
        Some(version_manager) => Ok(PluginLoader::new(version_manager.loader().store_path())),
        None => Err(anyhow!("plugin version manager is not configured")),
    }
}
/// Creates a `PluginRepository` client.
///
/// The repository URL comes from the `SHOWEN_PLUGIN_REPO_URL` environment
/// variable; when it is unset, unreadable or blank, `DEFAULT_PLUGIN_REPO_URL`
/// is used instead.
///
/// # Errors
/// Fails when the plugin loader cannot be created.
fn plugin_repository(&self) -> Result<PluginRepository> {
    let repo_url = match std::env::var("SHOWEN_PLUGIN_REPO_URL") {
        Ok(value) if !value.trim().is_empty() => value,
        _ => DEFAULT_PLUGIN_REPO_URL.to_string(),
    };
    let loader = self.plugin_loader()?;
    Ok(PluginRepository::new(&repo_url, loader))
}
fn register_dynamic_plugin_runtime(
&mut self,
plugin_id: &str,
plugin: Box<dyn Plugin>,
error_policy: ErrorPolicy,
max_errors: u32,
required_capabilities: Vec<String>,
capabilities: Vec<String>,
auto_test: bool,
) -> Result<()> {
let mut state = PluginState::new_dynamic(plugin, error_policy, max_errors);
state.required_capabilities = required_capabilities;
state.capabilities = capabilities;
state.auto_test = auto_test;
let ctx = self.plugin_context();
Self::init_and_start_plugin_with_context(&mut state, ctx)
.map_err(|error| anyhow!("failed to start installed plugin '{plugin_id}': {error}"))?;
state.enabled = true;
self.plugins.push(state);
println!("[ServiceManager] 插件 '{plugin_id}' 已安装并启动");
Ok(())
}
fn switch_plugin_version(&mut self, plugin_id: &str, version: &str) -> Result<()> {
let version_manager = self
.version_manager
.as_ref()
.ok_or_else(|| anyhow!("plugin version manager is not configured"))?;
version_manager.switch_version(plugin_id, version)?;
let (plugin, manifest) = version_manager
.loader()
.load_plugin(plugin_id, Some(version))?;
let loader = PluginLoader::new(version_manager.loader().store_path());
let registry = loader.load_registry()?;
let entry = registry
.plugins
.get(plugin_id)
.ok_or_else(|| anyhow!("plugin '{plugin_id}' not in registry"))?;
if let Some(idx) = self
.plugins
.iter()
.position(|state| state.id() == plugin_id)
{
self.replace_dynamic_plugin_at_index(
idx,
plugin_id,
Box::new(plugin),
manifest.error_policy,
entry.max_errors,
manifest.required_capabilities,
manifest.capabilities,
manifest.auto_test,
)?;
} else {
self.register_dynamic_plugin_runtime(
plugin_id,
Box::new(plugin),
manifest.error_policy,
entry.max_errors,
manifest.required_capabilities,
manifest.capabilities,
manifest.auto_test,
)?;
}
println!("[ServiceManager] 插件 '{plugin_id}' 已切换到版本 {version}");
Ok(())
}
fn install_plugin(&mut self, request: PluginInstallCommand) -> Result<()> {
let plugin_id = request.id.clone();
let repo = self.plugin_repository()?;
let version = match request.version.clone() {
Some(version) => version,
None => repo.check_update(&plugin_id, "0.0.0")?.ok_or_else(|| {
anyhow!(
"repo did not report an installable version for '{}'",
plugin_id
)
})?,
};
repo.download_and_install(&plugin_id, &version)?;
let loader = self.plugin_loader()?;
let (plugin, manifest) = loader.load_plugin(&plugin_id, Some(&version))?;
let mut registry = loader.load_registry()?;
let existing = registry.plugins.get(&plugin_id).cloned();
let entry = PluginRegistryEntry {
active_version: version.clone(),
last_stable_version: existing
.as_ref()
.and_then(|entry| entry.last_stable_version.clone()),
enabled: true,
error_policy: existing
.as_ref()
.map(|entry| entry.error_policy.clone())
.unwrap_or_else(|| manifest.error_policy.clone()),
max_errors: existing.as_ref().map(|entry| entry.max_errors).unwrap_or(5),
};
registry.plugins.insert(plugin_id.clone(), entry.clone());
loader.save_registry(&registry)?;
if let Some(idx) = self
.plugins
.iter()
.position(|state| state.id() == plugin_id)
{
self.replace_dynamic_plugin_at_index(
idx,
&plugin_id,
Box::new(plugin),
manifest.error_policy,
entry.max_errors,
manifest.required_capabilities,
manifest.capabilities,
manifest.auto_test,
)?;
} else {
self.register_dynamic_plugin_runtime(
&plugin_id,
Box::new(plugin),
manifest.error_policy,
entry.max_errors,
manifest.required_capabilities,
manifest.capabilities,
manifest.auto_test,
)?;
}
Ok(())
}
/// Asks the repository, for every plugin in the registry, whether a newer
/// version than the active one exists, and prints the outcome.
///
/// # Errors
/// Fails when the repository or registry cannot be loaded, or on the first
/// failing update check (remaining plugins are then not checked).
fn check_plugin_updates(&self) -> Result<()> {
    let repo = self.plugin_repository()?;
    let registry = self.plugin_loader()?.load_registry()?;

    for (plugin_id, entry) in &registry.plugins {
        let available = repo.check_update(plugin_id, &entry.active_version)?;
        if let Some(version) = available {
            println!(
                "[ServiceManager] 插件 '{plugin_id}' 发现可用更新: {} -> {version}",
                entry.active_version
            );
        } else {
            println!(
                "[ServiceManager] 插件 '{plugin_id}' 已是最新版本 {}",
                entry.active_version
            );
        }
    }
    Ok(())
}
/// Serializes the current plugin states to JSON and broadcasts them to all
/// enabled plugins as a `Message::Custom` of kind `"plugin_states"`.
///
/// A serialization failure is logged and nothing is broadcast.
fn broadcast_plugin_states(&mut self) {
    let snapshot = self.plugin_states();
    let payload = match serde_json::to_string(&snapshot) {
        Ok(payload) => payload,
        Err(error) => {
            eprintln!("[ServiceManager] 序列化 plugin_states 失败: {error}");
            return;
        }
    };
    self.broadcast_message(Message::Custom {
        kind: String::from("plugin_states"),
        payload,
    });
}
/// Handles a message addressed to the manager itself.
///
/// * `Shutdown` is logged and broadcast (the broadcast clears the running flag).
/// * Status-style messages (`WifiResult`, `PlayerStatus`, `StateChanged`,
///   `WifiProvisioned`, `PluginReady`) are re-broadcast unchanged.
/// * `ConfigReloadRequest` re-reads the configuration from its source path
///   and, on success, swaps it in and broadcasts `ConfigReloaded`.
/// * `Custom` messages whose kind is a known plugin-management command run
///   that command (failures are only logged) and are always followed by a
///   `plugin_states` broadcast; unknown kinds are ignored.
/// * Every other message is ignored.
///
/// As written, this function always returns `Ok(())`.
fn handle_manager_message(&mut self, msg: Message) -> Result<()> {
    match msg {
        Message::Shutdown => {
            println!("[ServiceManager] 收到 Shutdown 指令");
            self.broadcast_message(Message::Shutdown);
        }
        // Pure relays: forward the message to every plugin untouched.
        relayed @ (Message::WifiResult(_)
        | Message::PlayerStatus(_)
        | Message::StateChanged { .. }
        | Message::WifiProvisioned { .. }) => {
            self.broadcast_message(relayed);
        }
        Message::PluginReady(id) => {
            println!("[ServiceManager] 插件 '{}' 就绪", id);
            self.broadcast_message(Message::PluginReady(id));
        }
        Message::ConfigReloadRequest => {
            println!("[ServiceManager] 收到配置重载请求");
            match AppConfig::from_file(&self.config.source_path) {
                Err(e) => {
                    eprintln!("[ServiceManager] 配置重载失败: {}", e);
                }
                Ok(new_config) => {
                    self.config = Arc::new(new_config.clone());
                    println!("[ServiceManager] 配置重载成功,广播 ConfigReloaded");
                    self.broadcast_message(Message::ConfigReloaded(new_config));
                }
            }
        }
        Message::Custom { kind, payload } => {
            match kind.as_str() {
                "plugin_enable" => {
                    if let Err(error) = self.set_plugin_enabled(&payload, true) {
                        eprintln!(
                            "[ServiceManager] plugin_enable('{}') 失败: {error}",
                            payload
                        );
                    }
                }
                "plugin_disable" => {
                    if let Err(error) = self.set_plugin_enabled(&payload, false) {
                        eprintln!(
                            "[ServiceManager] plugin_disable('{}') 失败: {error}",
                            payload
                        );
                    }
                }
                "plugin_rollback" => {
                    if let Err(error) = self.rollback_plugin(&payload) {
                        eprintln!(
                            "[ServiceManager] plugin_rollback('{}') 失败: {error}",
                            payload
                        );
                    }
                }
                "plugin_switch" => {
                    let parsed = serde_json::from_str::<PluginSwitchCommand>(&payload);
                    match parsed {
                        Err(error) => {
                            eprintln!("[ServiceManager] plugin_switch payload 非法: {error}");
                        }
                        Ok(command) => {
                            if let Err(error) =
                                self.switch_plugin_version(&command.id, &command.version)
                            {
                                eprintln!(
                                    "[ServiceManager] plugin_switch('{}', '{}') 失败: {error}",
                                    command.id, command.version
                                );
                            }
                        }
                    }
                }
                "plugin_install" => {
                    let parsed = serde_json::from_str::<PluginInstallCommand>(&payload);
                    match parsed {
                        Err(error) => {
                            eprintln!("[ServiceManager] plugin_install payload 非法: {error}");
                        }
                        Ok(command) => {
                            if let Err(error) = self.install_plugin(command) {
                                eprintln!("[ServiceManager] plugin_install 失败: {error}");
                            }
                        }
                    }
                }
                "plugin_check_updates" => {
                    if let Err(error) = self.check_plugin_updates() {
                        eprintln!("[ServiceManager] plugin_check_updates 失败: {error}");
                    }
                }
                // Not a management command: nothing to do and nothing to publish.
                _ => return Ok(()),
            }
            // Every recognized command publishes the resulting plugin states,
            // whether or not the command itself succeeded.
            self.broadcast_plugin_states();
        }
        _ => {}
    }
    Ok(())
}
fn validate_and_sort_plugins(&mut self) -> Result<()> {
let mut plugin_ids = Vec::with_capacity(self.plugins.len());
let mut plugin_set = HashSet::with_capacity(self.plugins.len());
let mut dependency_map = HashMap::with_capacity(self.plugins.len());
for state in &self.plugins {
let id = state.id().to_string();
if !plugin_set.insert(id.clone()) {
return Err(anyhow!("duplicate plugin id registered: '{id}'"));
}
plugin_ids.push(id.clone());
dependency_map.insert(id, state.plugin.dependencies());
}
for (plugin_id, dependencies) in &dependency_map {
for dependency in dependencies {
if dependency == plugin_id {
return Err(anyhow!("plugin '{plugin_id}' cannot depend on itself"));
}
if !plugin_set.contains(dependency.as_str()) {
return Err(anyhow!(
"plugin '{plugin_id}' depends on missing plugin '{dependency}'"
));
}
}
}
let mut resolved = HashSet::with_capacity(plugin_ids.len());
let mut sorted_ids = Vec::with_capacity(plugin_ids.len());
while sorted_ids.len() < plugin_ids.len() {
let mut progressed = false;
for plugin_id in &plugin_ids {
if resolved.contains(plugin_id) {
continue;
}
let dependencies = dependency_map
.get(plugin_id)
.expect("plugin dependency map must contain all registered ids");
if dependencies
.iter()
.all(|dependency| resolved.contains(dependency))
{
resolved.insert(plugin_id.clone());
sorted_ids.push(plugin_id.clone());
progressed = true;
}
}
if !progressed {
let unresolved = plugin_ids
.iter()
.filter(|plugin_id| !resolved.contains(plugin_id.as_str()))
.cloned()
.collect::<Vec<_>>()
.join(", ");
return Err(anyhow!(
"plugin dependency cycle detected among: {unresolved}"
));
}
}
let mut remaining_plugins = std::mem::take(&mut self.plugins);
let mut ordered_plugins = Vec::with_capacity(remaining_plugins.len());
for plugin_id in &sorted_ids {
let index = remaining_plugins
.iter()
.position(|state| state.id() == plugin_id)
.ok_or_else(|| anyhow!("plugin '{plugin_id}' disappeared during sorting"))?;
ordered_plugins.push(remaining_plugins.remove(index));
}
self.plugins = ordered_plugins;
Ok(())
}
/// Delivers a message to one plugin, with error counting and policy handling.
///
/// Unknown targets are logged; disabled plugins silently drop the message.
/// A successful delivery resets the plugin's error counter. A failure is
/// logged and counted, and once a dynamic plugin reaches its limit the
/// threshold handler is invoked.
fn deliver_to_plugin(&mut self, id: &str, msg: Message) {
    let Some(state) = self.plugins.iter_mut().find(|s| s.id() == id) else {
        eprintln!("[ServiceManager] 目标插件 '{}' 不存在", id);
        return;
    };
    if !state.enabled {
        return;
    }

    let Err(e) = state.plugin.handle_message(msg) else {
        state.reset_errors();
        return;
    };

    eprintln!(
        "[ServiceManager] 插件 '{}' 处理消息失败 ({}/{}): {}",
        id,
        state.error_count + 1,
        state.max_errors,
        e
    );
    let limit_reached = state.record_error();
    if limit_reached && state.is_dynamic {
        self.handle_error_threshold(id);
    }
}
/// Sends a copy of `msg` to every enabled plugin.
///
/// A successful delivery resets that plugin's error counter. Failures are
/// only logged: they do not count towards the error limit, which avoids
/// modifying the plugin list while it is being iterated. Broadcasting
/// `Message::Shutdown` also clears the running flag, ending the main loop.
fn broadcast_message(&mut self, msg: Message) {
    let is_shutdown = matches!(&msg, Message::Shutdown);

    for state in self.plugins.iter_mut().filter(|state| state.enabled) {
        if let Err(e) = state.plugin.handle_message(msg.clone()) {
            eprintln!(
                "[ServiceManager] 插件 '{}' 处理广播消息失败: {}",
                state.id(),
                e
            );
        } else {
            state.reset_errors();
        }
    }

    if is_shutdown {
        println!("[ServiceManager] 收到 Shutdown 广播");
        self.running = false;
    }
}
/// 插件错误达到阈值时的处理
fn handle_error_threshold(&mut self, plugin_id: &str) {
let idx = match self.plugins.iter().position(|s| s.id() == plugin_id) {
Some(idx) => idx,
None => return,
};
match self.plugins[idx].error_policy.clone() {
ErrorPolicy::DisableAndLog => {
eprintln!(
"[ServiceManager] 插件 '{}' 错误次数达到阈值,已禁用",
plugin_id
);
let state = &mut self.plugins[idx];
let _ = state.plugin.stop();
state.enabled = false;
state.needs_rollback = false;
}
ErrorPolicy::AutoRollback => {
{
let state = &mut self.plugins[idx];
let _ = state.plugin.stop();
state.enabled = false;
state.needs_rollback = false;
}
eprintln!(
"[ServiceManager] 插件 '{}' 错误次数达到阈值,尝试自动回退到稳定版本",
plugin_id
);
self.rollback_dynamic_plugin(idx, plugin_id);
}
}
}
/// Rolls the dynamic plugin at `idx` back via the version manager and
/// hot-swaps the rolled-back version in.
///
/// `idx` must be the current position of `plugin_id` in `self.plugins`.
/// Nothing is returned: every failure is logged and leaves the plugin flagged
/// with `needs_rollback = true`; a successful swap clears the flag.
///
/// After a failed hot swap the plugin is looked up by id rather than by
/// `idx`, because the swap is not guaranteed to leave the entry in the same
/// slot (or in the list at all). Indexing with the stale `idx` could panic or
/// flag an unrelated plugin.
fn rollback_dynamic_plugin(&mut self, idx: usize, plugin_id: &str) {
    let rollback_result = {
        let Some(version_manager) = self.version_manager.as_ref() else {
            eprintln!(
                "[ServiceManager] 插件 '{}' 未配置 VersionManager标记为待回退",
                plugin_id
            );
            self.plugins[idx].needs_rollback = true;
            return;
        };
        // The error side carries the rolled-back version when the version
        // switch itself succeeded and only loading failed.
        match version_manager.rollback(plugin_id) {
            Ok(version) => match version_manager
                .loader()
                .load_plugin(plugin_id, Some(&version))
            {
                Ok((plugin, manifest)) => {
                    Ok((version, Box::new(plugin) as Box<dyn Plugin>, manifest))
                }
                Err(e) => Err((Some(version), e)),
            },
            Err(e) => Err((None, e)),
        }
    };

    match rollback_result {
        Ok((version, plugin, manifest)) => {
            // Keep the error limit of the instance being replaced.
            let max_errors = self.plugins[idx].max_errors;
            match self.replace_dynamic_plugin_at_index(
                idx,
                plugin_id,
                plugin,
                manifest.error_policy,
                max_errors,
                manifest.required_capabilities,
                manifest.capabilities,
                manifest.auto_test,
            ) {
                Ok(()) => {
                    // A successful swap leaves the new state in slot `idx`.
                    self.plugins[idx].needs_rollback = false;
                    println!(
                        "[ServiceManager] 插件 '{}' 已回退并重新加载稳定版本 {}",
                        plugin_id, version
                    );
                }
                Err(e) => {
                    eprintln!(
                        "[ServiceManager] 插件 '{}' 已切换到稳定版本 {},但热替换失败: {}",
                        plugin_id, version, e
                    );
                    // Do not trust `idx` after a failed swap; resolve by id.
                    if let Some(state) = self
                        .plugins
                        .iter_mut()
                        .find(|state| state.id() == plugin_id)
                    {
                        state.needs_rollback = true;
                    }
                }
            }
        }
        Err((Some(version), e)) => {
            eprintln!(
                "[ServiceManager] 插件 '{}' 已切换到稳定版本 {},但加载回退版本失败: {}",
                plugin_id, version, e
            );
            self.plugins[idx].needs_rollback = true;
        }
        Err((None, e)) => {
            eprintln!(
                "[ServiceManager] 插件 '{}' 自动回退失败,标记为待回退: {}",
                plugin_id, e
            );
            self.plugins[idx].needs_rollback = true;
        }
    }
}
/// Returns a clone of the manager's sending channel (for external use).
pub fn sender(&self) -> mpsc::Sender<Envelope> {
    mpsc::Sender::clone(&self.tx)
}
}
/// Plugin state information (used for API queries).
///
/// A serializable snapshot of one plugin's runtime state, produced by
/// `ServiceManager::plugin_states`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PluginStateInfo {
/// Plugin identifier.
pub id: String,
/// Descriptive information reported by the plugin's `info()`.
pub info: crate::core::plugin::PluginInfo,
/// Whether the plugin was loaded dynamically.
pub is_dynamic: bool,
/// Error-handling policy of the plugin.
pub error_policy: ErrorPolicy,
/// Current error count.
pub error_count: u32,
/// Maximum number of errors allowed.
pub max_errors: u32,
/// Whether the plugin is enabled.
pub enabled: bool,
/// Stored self-test results.
pub test_results: Vec<CapabilityTestResult>,
/// Capability list recorded for the plugin.
pub capabilities: Vec<String>,
/// Whether a rollback is still pending for this plugin.
pub needs_rollback: bool,
}