fix: 修复3个P0遗留 — AutoRollback回退/ConfigReloaded序列化/FfiString跨allocator

This commit is contained in:
showen
2026-03-13 05:15:04 +08:00
parent 1264b94e36
commit 6067c3f0a2
10 changed files with 393 additions and 57 deletions

View File

@@ -64,6 +64,7 @@ pub enum Message {
ssid: String, ssid: String,
ip: String, ip: String,
}, },
ConfigReloaded(serde_json::Value),
ConfigReloadRequest, ConfigReloadRequest,
Shutdown, Shutdown,
PluginReady(String), PluginReady(String),
@@ -104,6 +105,20 @@ impl FfiString {
len: 0, len: 0,
} }
} }
/// 复制为 Rust String不释放底层内存
///
/// # Safety
/// ptr 必须指向有效的 null-terminated C 字符串
pub unsafe fn to_string(&self) -> Option<String> {
if self.ptr.is_null() {
return None;
}
unsafe { std::ffi::CStr::from_ptr(self.ptr) }
.to_str()
.ok()
.map(str::to_owned)
}
} }
#[repr(C)] #[repr(C)]
@@ -144,6 +159,7 @@ pub struct PluginVTable {
pub handle_message: pub handle_message:
unsafe extern "C" fn(handle: PluginHandle, message_json: FfiStr) -> FfiResult, unsafe extern "C" fn(handle: PluginHandle, message_json: FfiStr) -> FfiResult,
pub stop: unsafe extern "C" fn(handle: PluginHandle) -> FfiResult, pub stop: unsafe extern "C" fn(handle: PluginHandle) -> FfiResult,
pub free_string: unsafe extern "C" fn(s: FfiString),
pub destroy: unsafe extern "C" fn(handle: PluginHandle), pub destroy: unsafe extern "C" fn(handle: PluginHandle),
pub get_capabilities: unsafe extern "C" fn(handle: PluginHandle) -> FfiString, pub get_capabilities: unsafe extern "C" fn(handle: PluginHandle) -> FfiString,
pub self_test: unsafe extern "C" fn(handle: PluginHandle) -> FfiString, pub self_test: unsafe extern "C" fn(handle: PluginHandle) -> FfiString,
@@ -371,6 +387,12 @@ macro_rules! export_plugin {
} }
} }
unsafe extern "C" fn __showen_free_string(s: $crate::FfiString) {
if !s.ptr.is_null() {
drop(unsafe { std::ffi::CString::from_raw(s.ptr) });
}
}
unsafe extern "C" fn __showen_get_capabilities( unsafe extern "C" fn __showen_get_capabilities(
handle: $crate::PluginHandle, handle: $crate::PluginHandle,
) -> $crate::FfiString { ) -> $crate::FfiString {
@@ -409,6 +431,7 @@ macro_rules! export_plugin {
start: __showen_start, start: __showen_start,
handle_message: __showen_handle_message, handle_message: __showen_handle_message,
stop: __showen_stop, stop: __showen_stop,
free_string: __showen_free_string,
destroy: __showen_destroy, destroy: __showen_destroy,
get_capabilities: __showen_get_capabilities, get_capabilities: __showen_get_capabilities,
self_test: __showen_self_test, self_test: __showen_self_test,

View File

@@ -64,7 +64,7 @@ impl DynamicPlugin {
// 获取插件信息 // 获取插件信息
let info_ffi: FfiString = unsafe { (vtable.get_info)(handle) }; let info_ffi: FfiString = unsafe { (vtable.get_info)(handle) };
let info_json = unsafe { info_ffi.into_string() } let info_json = unsafe { Self::read_plugin_string(vtable, info_ffi) }
.ok_or_else(|| anyhow!("plugin get_info() returned null for {so_path}"))?; .ok_or_else(|| anyhow!("plugin get_info() returned null for {so_path}"))?;
let info: PluginInfo = serde_json::from_str(&info_json) let info: PluginInfo = serde_json::from_str(&info_json)
.with_context(|| format!("invalid plugin info JSON from {so_path}"))?; .with_context(|| format!("invalid plugin info JSON from {so_path}"))?;
@@ -95,8 +95,25 @@ impl DynamicPlugin {
/// 将 FfiResult 转为 anyhow::Result /// 将 FfiResult 转为 anyhow::Result
unsafe fn check_result(&self, result: FfiResult, operation: &str) -> Result<()> { unsafe fn check_result(&self, result: FfiResult, operation: &str) -> Result<()> {
unsafe { result.into_result() } if result.code == 0 {
.map_err(|e| anyhow!("plugin '{}' {} failed: {}", self.id, operation, e)) return Ok(());
}
let error = unsafe { Self::read_plugin_string(self.vtable, result.error) }
.unwrap_or_else(|| "unknown plugin error".to_string());
Err(anyhow!(
"plugin '{}' {} failed: {}",
self.id,
operation,
error
))
}
/// 读取插件返回的字符串,并通过插件提供的 free_string 释放
unsafe fn read_plugin_string(vtable: &PluginVTable, ffi_str: FfiString) -> Option<String> {
let string = unsafe { ffi_str.to_string() };
unsafe { (vtable.free_string)(ffi_str) };
string
} }
} }
@@ -115,7 +132,7 @@ impl Plugin for DynamicPlugin {
fn capabilities(&self) -> Vec<String> { fn capabilities(&self) -> Vec<String> {
let ffi_str: FfiString = unsafe { (self.vtable.get_capabilities)(self.handle) }; let ffi_str: FfiString = unsafe { (self.vtable.get_capabilities)(self.handle) };
let json = match unsafe { ffi_str.into_string() } { let json = match unsafe { Self::read_plugin_string(self.vtable, ffi_str) } {
Some(s) => s, Some(s) => s,
None => return vec![], None => return vec![],
}; };
@@ -124,7 +141,7 @@ impl Plugin for DynamicPlugin {
fn self_test(&mut self) -> Vec<CapabilityTestResult> { fn self_test(&mut self) -> Vec<CapabilityTestResult> {
let ffi_str: FfiString = unsafe { (self.vtable.self_test)(self.handle) }; let ffi_str: FfiString = unsafe { (self.vtable.self_test)(self.handle) };
let json = match unsafe { ffi_str.into_string() } { let json = match unsafe { Self::read_plugin_string(self.vtable, ffi_str) } {
Some(s) => s, Some(s) => s,
None => return vec![], None => return vec![],
}; };

View File

@@ -1,6 +1,5 @@
use crate::core::config::AppConfig; use crate::core::config::AppConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// 消息信封:包含来源、目的地、消息体 /// 消息信封:包含来源、目的地、消息体
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -49,9 +48,8 @@ pub enum Message {
}, },
// ── 配置 ── // ── 配置 ──
/// Arc<AppConfig> 无法跨 FFI 序列化,动态插件通过 init 时传入的 JSON 获取配置 /// 配置重载广播需要经过 JSON/FFI 路径,因此这里保存可序列化的 AppConfig。
#[serde(skip)] ConfigReloaded(AppConfig),
ConfigReloaded(Arc<AppConfig>),
ConfigReloadRequest, ConfigReloadRequest,
// ── 系统 ── // ── 系统 ──

View File

@@ -10,7 +10,7 @@ use std::ptr;
pub type PluginHandle = *mut c_void; pub type PluginHandle = *mut c_void;
/// FFI 安全的字符串:指向 C 字符串 + 长度 /// FFI 安全的字符串:指向 C 字符串 + 长度
/// 调用方负责释放(通过对应的 free 函数 /// 调用方读取内容后,必须通过分配方提供的 free 函数释放
#[repr(C)] #[repr(C)]
pub struct FfiString { pub struct FfiString {
pub ptr: *mut c_char, pub ptr: *mut c_char,
@@ -40,16 +40,18 @@ impl FfiString {
} }
} }
/// 转换回 Rust String消耗 FfiString /// 复制为 Rust String不释放底层内存
/// ///
/// # Safety /// # Safety
/// ptr 必须是由 CString::into_raw 产生的有效指针 /// ptr 必须是由 CString::into_raw 产生的有效指针
pub unsafe fn into_string(self) -> Option<String> { pub unsafe fn to_string(&self) -> Option<String> {
if self.ptr.is_null() { if self.ptr.is_null() {
return None; return None;
} }
let cstr = unsafe { CString::from_raw(self.ptr) }; unsafe { CStr::from_ptr(self.ptr) }
cstr.into_string().ok() .to_str()
.ok()
.map(str::to_owned)
} }
} }
@@ -80,15 +82,15 @@ impl FfiResult {
} }
} }
/// 转换为 Rust Result /// 转换为 Rust Result(不释放 error 底层内存)
/// ///
/// # Safety /// # Safety
/// 如果 error 非 null必须是由 CString::into_raw 产生的有效指针 /// 如果 error 非 null必须是由 CString::into_raw 产生的有效指针
pub unsafe fn into_result(self) -> Result<(), String> { pub unsafe fn to_result(&self) -> Result<(), String> {
if self.code == 0 { if self.code == 0 {
Ok(()) Ok(())
} else { } else {
let msg = unsafe { self.error.into_string() } let msg = unsafe { self.error.to_string() }
.unwrap_or_else(|| "unknown plugin error".to_string()); .unwrap_or_else(|| "unknown plugin error".to_string());
Err(msg) Err(msg)
} }
@@ -129,6 +131,9 @@ pub struct PluginVTable {
/// 停止插件 /// 停止插件
pub stop: unsafe extern "C" fn(handle: PluginHandle) -> FfiResult, pub stop: unsafe extern "C" fn(handle: PluginHandle) -> FfiResult,
/// 释放插件分配的 FfiString
pub free_string: unsafe extern "C" fn(s: FfiString),
/// 销毁插件实例,释放资源 /// 销毁插件实例,释放资源
pub destroy: unsafe extern "C" fn(handle: PluginHandle), pub destroy: unsafe extern "C" fn(handle: PluginHandle),

View File

@@ -2,6 +2,7 @@ use crate::core::config::AppConfig;
use crate::core::message::{Destination, Envelope, Message}; use crate::core::message::{Destination, Envelope, Message};
use crate::core::plugin::{CapabilityTestResult, Plugin, PluginContext}; use crate::core::plugin::{CapabilityTestResult, Plugin, PluginContext};
use crate::core::plugin_loader::ErrorPolicy; use crate::core::plugin_loader::ErrorPolicy;
use crate::core::version_manager::VersionManager;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
@@ -27,6 +28,8 @@ struct PluginState {
required_capabilities: Vec<String>, required_capabilities: Vec<String>,
/// 是否自动测试 /// 是否自动测试
auto_test: bool, auto_test: bool,
/// 是否需要在后续生命周期中执行回退
needs_rollback: bool,
} }
impl PluginState { impl PluginState {
@@ -42,6 +45,7 @@ impl PluginState {
capabilities: vec![], capabilities: vec![],
required_capabilities: vec![], required_capabilities: vec![],
auto_test: false, // 静态插件默认不自测 auto_test: false, // 静态插件默认不自测
needs_rollback: false,
} }
} }
@@ -57,6 +61,7 @@ impl PluginState {
capabilities: vec![], capabilities: vec![],
required_capabilities: vec![], required_capabilities: vec![],
auto_test: true, auto_test: true,
needs_rollback: false,
} }
} }
@@ -83,6 +88,7 @@ pub struct ServiceManager {
tx: mpsc::Sender<Envelope>, tx: mpsc::Sender<Envelope>,
rx: mpsc::Receiver<Envelope>, rx: mpsc::Receiver<Envelope>,
running: bool, running: bool,
version_manager: Option<VersionManager>,
} }
impl ServiceManager { impl ServiceManager {
@@ -94,9 +100,14 @@ impl ServiceManager {
tx, tx,
rx, rx,
running: false, running: false,
version_manager: None,
} }
} }
pub fn set_version_manager(&mut self, version_manager: VersionManager) {
self.version_manager = Some(version_manager);
}
/// 注册静态插件(编译时链接的插件) /// 注册静态插件(编译时链接的插件)
pub fn register(&mut self, plugin: Box<dyn Plugin>) { pub fn register(&mut self, plugin: Box<dyn Plugin>) {
println!("[ServiceManager] 注册插件: {}", plugin.id()); println!("[ServiceManager] 注册插件: {}", plugin.id());
@@ -351,10 +362,49 @@ impl ServiceManager {
enabled: s.enabled, enabled: s.enabled,
test_results: s.test_results.clone(), test_results: s.test_results.clone(),
capabilities: s.capabilities.clone(), capabilities: s.capabilities.clone(),
needs_rollback: s.needs_rollback,
}) })
.collect() .collect()
} }
fn replace_dynamic_plugin_at_index(
&mut self,
idx: usize,
plugin_id: &str,
new_plugin: Box<dyn Plugin>,
error_policy: ErrorPolicy,
max_errors: u32,
required_capabilities: Vec<String>,
capabilities: Vec<String>,
auto_test: bool,
) -> Result<()> {
if !self.plugins[idx].is_dynamic {
return Err(anyhow!(
"plugin '{plugin_id}' is not dynamic and cannot be replaced"
));
}
let mut new_state = PluginState::new_dynamic(new_plugin, error_policy, max_errors);
new_state.required_capabilities = required_capabilities;
new_state.capabilities = capabilities;
new_state.auto_test = auto_test;
let ctx = PluginContext {
tx: self.tx.clone(),
config: Arc::clone(&self.config),
};
new_state.plugin.init(ctx)?;
new_state.plugin.start()?;
if self.plugins[idx].enabled {
let _ = self.plugins[idx].plugin.stop();
}
self.plugins[idx] = new_state;
println!("[ServiceManager] 插件 '{plugin_id}' 热替换成功");
Ok(())
}
/// 热替换动态插件stop 旧的 → 替换 → init → start 新的) /// 热替换动态插件stop 旧的 → 替换 → init → start 新的)
pub fn replace_dynamic_plugin( pub fn replace_dynamic_plugin(
&mut self, &mut self,
@@ -374,22 +424,21 @@ impl ServiceManager {
"plugin '{plugin_id}' is not dynamic and cannot be replaced" "plugin '{plugin_id}' is not dynamic and cannot be replaced"
)); ));
} }
let mut new_state = PluginState::new_dynamic(new_plugin, error_policy, max_errors);
let ctx = PluginContext { let required_capabilities = self.plugins[idx].required_capabilities.clone();
tx: self.tx.clone(), let capabilities = self.plugins[idx].capabilities.clone();
config: Arc::clone(&self.config), let auto_test = self.plugins[idx].auto_test;
};
new_state.plugin.init(ctx)?;
new_state.plugin.start()?;
if self.plugins[idx].enabled { self.replace_dynamic_plugin_at_index(
let _ = self.plugins[idx].plugin.stop(); idx,
} plugin_id,
new_plugin,
self.plugins[idx] = new_state; error_policy,
println!("[ServiceManager] 插件 '{plugin_id}' 热替换成功"); max_errors,
Ok(()) required_capabilities,
capabilities,
auto_test,
)
} }
/// 处理发给管理层自身的消息 /// 处理发给管理层自身的消息
@@ -424,7 +473,7 @@ impl ServiceManager {
let new_config = Arc::new(new_config); let new_config = Arc::new(new_config);
self.config = Arc::clone(&new_config); self.config = Arc::clone(&new_config);
println!("[ServiceManager] 配置重载成功,广播 ConfigReloaded"); println!("[ServiceManager] 配置重载成功,广播 ConfigReloaded");
self.broadcast_message(Message::ConfigReloaded(new_config)); self.broadcast_message(Message::ConfigReloaded((*new_config).clone()));
} }
Err(e) => { Err(e) => {
eprintln!("[ServiceManager] 配置重载失败: {}", e); eprintln!("[ServiceManager] 配置重载失败: {}", e);
@@ -588,28 +637,102 @@ impl ServiceManager {
/// 插件错误达到阈值时的处理 /// 插件错误达到阈值时的处理
fn handle_error_threshold(&mut self, plugin_id: &str) { fn handle_error_threshold(&mut self, plugin_id: &str) {
let state = match self.plugins.iter_mut().find(|s| s.id() == plugin_id) { let idx = match self.plugins.iter().position(|s| s.id() == plugin_id) {
Some(s) => s, Some(idx) => idx,
None => return, None => return,
}; };
match state.error_policy { match self.plugins[idx].error_policy.clone() {
ErrorPolicy::DisableAndLog => { ErrorPolicy::DisableAndLog => {
eprintln!( eprintln!(
"[ServiceManager] 插件 '{}' 错误次数达到阈值,已禁用", "[ServiceManager] 插件 '{}' 错误次数达到阈值,已禁用",
plugin_id plugin_id
); );
let state = &mut self.plugins[idx];
let _ = state.plugin.stop(); let _ = state.plugin.stop();
state.enabled = false; state.enabled = false;
state.needs_rollback = false;
} }
ErrorPolicy::AutoRollback => { ErrorPolicy::AutoRollback => {
{
let state = &mut self.plugins[idx];
let _ = state.plugin.stop();
state.enabled = false;
state.needs_rollback = false;
}
eprintln!( eprintln!(
"[ServiceManager] 插件 '{}' 错误次数达到阈值,需要回退 (由外部 VersionManager 处理)", "[ServiceManager] 插件 '{}' 错误次数达到阈值,尝试自动回退到稳定版本",
plugin_id plugin_id
); );
// 先禁用,等待外部 (main.rs / HTTP API) 调用 VersionManager 执行回退
let _ = state.plugin.stop(); let rollback_result = {
state.enabled = false; let Some(version_manager) = self.version_manager.as_ref() else {
eprintln!(
"[ServiceManager] 插件 '{}' 未配置 VersionManager标记为待回退",
plugin_id
);
self.plugins[idx].needs_rollback = true;
return;
};
match version_manager.rollback(plugin_id) {
Ok(version) => match version_manager
.loader()
.load_plugin(plugin_id, Some(&version))
{
Ok((plugin, manifest)) => {
Ok((version, Box::new(plugin) as Box<dyn Plugin>, manifest))
}
Err(e) => Err((Some(version), e)),
},
Err(e) => Err((None, e)),
}
};
match rollback_result {
Ok((version, plugin, manifest)) => {
let max_errors = self.plugins[idx].max_errors;
match self.replace_dynamic_plugin_at_index(
idx,
plugin_id,
plugin,
manifest.error_policy,
max_errors,
manifest.required_capabilities,
manifest.capabilities,
manifest.auto_test,
) {
Ok(()) => {
println!(
"[ServiceManager] 插件 '{}' 已回退并重新加载稳定版本 {}",
plugin_id, version
);
}
Err(e) => {
eprintln!(
"[ServiceManager] 插件 '{}' 已切换到稳定版本 {},但热替换失败: {}",
plugin_id, version, e
);
self.plugins[idx].needs_rollback = true;
}
}
}
Err((Some(version), e)) => {
eprintln!(
"[ServiceManager] 插件 '{}' 已切换到稳定版本 {},但加载回退版本失败: {}",
plugin_id, version, e
);
self.plugins[idx].needs_rollback = true;
}
Err((None, e)) => {
eprintln!(
"[ServiceManager] 插件 '{}' 自动回退失败,标记为待回退: {}",
plugin_id, e
);
self.plugins[idx].needs_rollback = true;
}
}
} }
} }
} }
@@ -632,4 +755,5 @@ pub struct PluginStateInfo {
pub enabled: bool, pub enabled: bool,
pub test_results: Vec<CapabilityTestResult>, pub test_results: Vec<CapabilityTestResult>,
pub capabilities: Vec<String>, pub capabilities: Vec<String>,
pub needs_rollback: bool,
} }

View File

@@ -1,9 +1,12 @@
use super::config::{parse_str, AppConfig}; use super::config::{parse_str, AppConfig};
use super::message::{Destination, Envelope, Message}; use super::message::{Destination, Envelope, Message};
use super::plugin::{CapabilityTestResult, Platform, Plugin, PluginContext, PluginInfo}; use super::plugin::{CapabilityTestResult, Platform, Plugin, PluginContext, PluginInfo};
use super::plugin_loader::{ErrorPolicy, PluginLoader, PluginRegistry, PluginRegistryEntry};
use super::service_manager::ServiceManager; use super::service_manager::ServiceManager;
use super::plugin_loader::ErrorPolicy; use super::version_manager::VersionManager;
use anyhow::Result; use anyhow::Result;
use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
fn test_config() -> AppConfig { fn test_config() -> AppConfig {
@@ -385,7 +388,11 @@ fn all_plugin_ids_must_be_unique() {
let mut ids = HashSet::new(); let mut ids = HashSet::new();
for plugin in plugins { for plugin in plugins {
let id = plugin.id().to_string(); let id = plugin.id().to_string();
assert!(ids.insert(id.clone()), "duplicate plugin id detected: '{}'", id); assert!(
ids.insert(id.clone()),
"duplicate plugin id detected: '{}'",
id
);
} }
} }
@@ -496,6 +503,81 @@ impl Plugin for TestPluginWithSelfTest {
} }
} }
struct FailingPlugin {
id: String,
events: Arc<Mutex<Vec<String>>>,
}
impl FailingPlugin {
fn new(id: &str, events: Arc<Mutex<Vec<String>>>) -> Self {
Self {
id: id.to_string(),
events,
}
}
fn record(&self, entry: impl Into<String>) {
lock_events(&self.events).push(entry.into());
}
}
impl Plugin for FailingPlugin {
fn id(&self) -> &str {
&self.id
}
fn info(&self) -> PluginInfo {
PluginInfo {
name: self.id.clone(),
version: "test".to_string(),
description: "failing test plugin".to_string(),
platform: Platform::Any,
}
}
fn init(&mut self, _ctx: PluginContext) -> Result<()> {
self.record(format!("init:{}", self.id));
Ok(())
}
fn start(&mut self) -> Result<()> {
self.record(format!("start:{}", self.id));
Ok(())
}
fn handle_message(&mut self, _msg: Message) -> Result<()> {
self.record(format!("error:{}", self.id));
Err(anyhow::anyhow!("simulated failure"))
}
fn stop(&mut self) -> Result<()> {
self.record(format!("stop:{}", self.id));
Ok(())
}
}
fn setup_rollback_store(base: &Path, plugin_id: &str) -> VersionManager {
let _ = fs::remove_dir_all(base);
fs::create_dir_all(base.join(plugin_id).join("1.0.0")).unwrap();
fs::create_dir_all(base.join(plugin_id).join("2.0.0")).unwrap();
let loader = PluginLoader::new(base);
let mut registry = PluginRegistry::default();
registry.plugins.insert(
plugin_id.to_string(),
PluginRegistryEntry {
active_version: "2.0.0".to_string(),
last_stable_version: Some("1.0.0".to_string()),
enabled: true,
error_policy: ErrorPolicy::AutoRollback,
max_errors: 1,
},
);
loader.save_registry(&registry).unwrap();
VersionManager::new(loader)
}
#[test] #[test]
fn self_test_all_pass_allows_normal_start() { fn self_test_all_pass_allows_normal_start() {
let events = Arc::new(Mutex::new(Vec::new())); let events = Arc::new(Mutex::new(Vec::new()));
@@ -521,7 +603,9 @@ fn self_test_all_pass_allows_normal_start() {
true, true,
); );
manager.start_all().expect("start_all should succeed when all tests pass"); manager
.start_all()
.expect("start_all should succeed when all tests pass");
let log = lock_events(&events); let log = lock_events(&events);
assert!(log.contains(&"init:sensor".to_string())); assert!(log.contains(&"init:sensor".to_string()));
@@ -565,7 +649,10 @@ fn self_test_required_capability_fails_disables_dynamic_plugin() {
// Plugin should be disabled // Plugin should be disabled
let states = manager.plugin_states(); let states = manager.plugin_states();
assert!(!states[0].enabled, "plugin should be disabled after required capability failure"); assert!(
!states[0].enabled,
"plugin should be disabled after required capability failure"
);
} }
#[test] #[test]
@@ -605,7 +692,10 @@ fn self_test_optional_capability_fails_still_starts() {
let log = lock_events(&events); let log = lock_events(&events);
assert!(log.contains(&"self_test:sensor".to_string())); assert!(log.contains(&"self_test:sensor".to_string()));
assert!(log.contains(&"start:sensor".to_string()), "plugin should start despite optional failure"); assert!(
log.contains(&"start:sensor".to_string()),
"plugin should start despite optional failure"
);
// Test results should be recorded // Test results should be recorded
let states = manager.plugin_states(); let states = manager.plugin_states();
@@ -613,3 +703,82 @@ fn self_test_optional_capability_fails_still_starts() {
assert!(states[0].test_results[0].passed); assert!(states[0].test_results[0].passed);
assert!(!states[0].test_results[1].passed); assert!(!states[0].test_results[1].passed);
} }
#[test]
fn auto_rollback_updates_registry_and_marks_pending_when_reload_fails() {
let tmp = std::env::temp_dir().join("showen_test_service_manager_autorollback");
let events = Arc::new(Mutex::new(Vec::new()));
let mut manager = ServiceManager::new(test_config());
manager.set_version_manager(setup_rollback_store(&tmp, "sensor"));
manager.register_dynamic(
Box::new(FailingPlugin::new("sensor", events.clone())),
ErrorPolicy::AutoRollback,
1,
);
manager.start_all().expect("start_all should succeed");
let sender = manager.sender();
sender
.send(Envelope {
from: "test".to_string(),
to: Destination::Plugin("sensor".to_string()),
message: Message::Custom {
kind: "tick".to_string(),
payload: "1".to_string(),
},
})
.expect("failing message should send");
sender
.send(Envelope {
from: "test".to_string(),
to: Destination::Manager,
message: Message::Shutdown,
})
.expect("shutdown should send");
manager.run().expect("run should succeed");
let registry = PluginLoader::new(&tmp).load_registry().unwrap();
assert_eq!(registry.plugins["sensor"].active_version, "1.0.0");
let states = manager.plugin_states();
assert!(
!states[0].enabled,
"plugin should be disabled after rollback failure"
);
assert!(
states[0].needs_rollback,
"plugin should be marked for restart-time reload"
);
assert!(has_event(&events, "stop:sensor"));
let _ = fs::remove_dir_all(&tmp);
}
#[test]
fn message_config_reload_request_round_trips_through_json() {
let json = serde_json::to_string(&Message::ConfigReloadRequest)
.expect("ConfigReloadRequest should serialize");
let message: Message =
serde_json::from_str(&json).expect("ConfigReloadRequest should deserialize");
assert!(matches!(message, Message::ConfigReloadRequest));
}
#[test]
fn message_config_reloaded_round_trips_through_json() {
let config = test_config();
let json = serde_json::to_string(&Message::ConfigReloaded(config.clone()))
.expect("ConfigReloaded should serialize");
let message: Message = serde_json::from_str(&json).expect("ConfigReloaded should deserialize");
match message {
Message::ConfigReloaded(decoded) => {
assert_eq!(decoded.display.window_title, config.display.window_title);
assert_eq!(decoded.playlist.len(), config.playlist.len());
assert_eq!(decoded.remote_control.port, config.remote_control.port);
}
other => panic!("unexpected message after round trip: {:?}", other),
}
}

View File

@@ -2,6 +2,8 @@ use anyhow::Result;
use showen_v2::core::config::AppConfig; use showen_v2::core::config::AppConfig;
use showen_v2::core::plugin_loader::PluginLoader; use showen_v2::core::plugin_loader::PluginLoader;
use showen_v2::core::service_manager::ServiceManager; use showen_v2::core::service_manager::ServiceManager;
#[cfg(not(test))]
use showen_v2::core::version_manager::VersionManager;
use showen_v2::plugins::{ use showen_v2::plugins::{
ble::BlePlugin, http::HttpPlugin, screen::ScreenPlugin, video::VideoPlugin, wifi::WifiPlugin, ble::BlePlugin, http::HttpPlugin, screen::ScreenPlugin, video::VideoPlugin, wifi::WifiPlugin,
}; };
@@ -69,6 +71,8 @@ fn main() -> Result<()> {
let plugin_store = std::path::Path::new("plugin_store"); let plugin_store = std::path::Path::new("plugin_store");
if plugin_store.exists() { if plugin_store.exists() {
println!("扫描动态插件..."); println!("扫描动态插件...");
#[cfg(not(test))]
manager.set_version_manager(VersionManager::new(PluginLoader::new(plugin_store)));
let loader = PluginLoader::new(plugin_store); let loader = PluginLoader::new(plugin_store);
match loader.load_registry() { match loader.load_registry() {
Ok(registry) => { Ok(registry) => {
@@ -88,16 +92,10 @@ fn main() -> Result<()> {
manifest.capabilities, manifest.capabilities,
manifest.auto_test, manifest.auto_test,
); );
println!( println!("{} v{} (动态)", plugin_id, entry.active_version);
"{} v{} (动态)",
plugin_id, entry.active_version
);
} }
Err(e) => { Err(e) => {
eprintln!( eprintln!("{} v{} 加载失败: {e}", plugin_id, entry.active_version);
"{} v{} 加载失败: {e}",
plugin_id, entry.active_version
);
} }
} }
} }

View File

@@ -316,8 +316,8 @@ impl Plugin for HttpPlugin {
} }
} }
Message::ConfigReloaded(config) => { Message::ConfigReloaded(config) => {
state.replace_config(Arc::clone(&config)); state.replace_config(Arc::new(config.clone()));
if let Some(payload) = encode_ws_event("config_update", config.as_ref()) { if let Some(payload) = encode_ws_event("config_update", &config) {
state.publish_ws(payload); state.publish_ws(payload);
} }
} }

View File

@@ -194,7 +194,7 @@ impl Plugin for VideoPlugin {
self.publish_status(); self.publish_status();
} }
Message::ConfigReloaded(config) => { Message::ConfigReloaded(config) => {
let processor = Arc::new(Mutex::new(VideoProcessor::new((*config).clone())?)); let processor = Arc::new(Mutex::new(VideoProcessor::new(config)?));
if let Some(old) = self.processor.replace(Arc::clone(&processor)) { if let Some(old) = self.processor.replace(Arc::clone(&processor)) {
if let Ok(mut old) = old.lock() { if let Ok(mut old) = old.lock() {
let _ = old.stop(); let _ = old.stop();

View File

@@ -929,8 +929,10 @@ impl VideoProcessor {
if let Some(resolution) = parts.first() { if let Some(resolution) = parts.first() {
let dims: Vec<&str> = resolution.split('x').collect(); let dims: Vec<&str> = resolution.split('x').collect();
if dims.len() == 2 { if dims.len() == 2 {
let w_str = dims[0].trim_end_matches(|c: char| !c.is_ascii_digit()); let w_str =
let h_str = dims[1].trim_end_matches(|c: char| !c.is_ascii_digit()); dims[0].trim_end_matches(|c: char| !c.is_ascii_digit());
let h_str =
dims[1].trim_end_matches(|c: char| !c.is_ascii_digit());
if let (Ok(w), Ok(h)) = if let (Ok(w), Ok(h)) =
(w_str.parse::<i32>(), h_str.parse::<i32>()) (w_str.parse::<i32>(), h_str.parse::<i32>())
{ {