docs: 战略规划和管理架构优化

- 新增 STRATEGY.md: 三年战略规划、技术路线、团队策略
- 新增 MILESTONES.md: 详细里程碑和时间表(M1.1-M1.4)
- 新增 CODE_REVIEW.md: 代码审核标准和流程
- 组建管理班子: 新增 PM 刘建国,优化管理架构
- 丰富团队成员背景: 补充所有成员的教育经历、工作经验、技能树
- 解锁多线程思考能力: 团队成员可使用 kilo 命令并行探索
- 更新工作流程: CEO → PM → 开发团队,两级审核制度
- 修正 kilo 调用方式: 不使用 -f 参数,在消息中指示读取文件
This commit is contained in:
showen
2026-03-12 06:14:52 +08:00
parent 98ba7704dd
commit d443f28f6e
22 changed files with 3572 additions and 100 deletions

View File

@@ -2,6 +2,7 @@ use crate::core::config::AppConfig;
use std::sync::Arc;
/// 消息信封:包含来源、目的地、消息体
#[derive(Debug, Clone)]
pub struct Envelope {
pub from: &'static str,
pub to: Destination,
@@ -9,7 +10,7 @@ pub struct Envelope {
}
/// 消息目的地
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum Destination {
/// 点对点发送给指定插件
Plugin(&'static str),
@@ -20,7 +21,7 @@ pub enum Destination {
}
/// 所有插件间通信的类型安全消息
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum Message {
// ── 播放控制 ──
PlayerCommand(PlayerCommand),
@@ -61,7 +62,7 @@ pub enum Message {
},
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum PlayerCommand {
Play,
Pause,
@@ -81,7 +82,7 @@ pub struct PlayerStatusData {
pub current_video: Option<String>,
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum WifiCommand {
Scan,
Connect { ssid: String, password: String },

View File

@@ -75,27 +75,7 @@ impl ServiceManager {
}
}
Destination::Broadcast => {
let from = envelope.from;
let msg = envelope.message;
for plugin in &mut self.plugins {
if plugin.id() == from {
continue;
}
if let Err(e) = plugin.handle_message(msg.clone()) {
eprintln!(
"[ServiceManager] 插件 '{}' 处理广播消息失败: {}",
plugin.id(),
e
);
}
}
if matches!(msg, Message::Shutdown) {
println!("[ServiceManager] 收到 Shutdown 广播");
self.running = false;
}
self.broadcast_message(envelope.message);
}
Destination::Manager => {
self.handle_manager_message(envelope.message)?;
@@ -123,19 +103,7 @@ impl ServiceManager {
match msg {
Message::Shutdown => {
println!("[ServiceManager] 收到 Shutdown 指令");
let shutdown = Message::Shutdown;
for plugin in &mut self.plugins {
if let Err(e) = plugin.handle_message(shutdown.clone()) {
eprintln!(
"[ServiceManager] 插件 '{}' 处理 Shutdown 失败: {}",
plugin.id(),
e
);
}
}
self.running = false;
self.broadcast_message(Message::Shutdown);
}
Message::ConfigReloadRequest => {
println!("[ServiceManager] 收到配置重载请求");
@@ -149,6 +117,25 @@ impl ServiceManager {
Ok(())
}
fn broadcast_message(&mut self, msg: Message) {
let should_shutdown = matches!(&msg, Message::Shutdown);
for plugin in &mut self.plugins {
if let Err(e) = plugin.handle_message(msg.clone()) {
eprintln!(
"[ServiceManager] 插件 '{}' 处理广播消息失败: {}",
plugin.id(),
e
);
}
}
if should_shutdown {
println!("[ServiceManager] 收到 Shutdown 广播");
self.running = false;
}
}
/// 获取发送通道的克隆(供外部使用)
pub fn sender(&self) -> mpsc::Sender<Envelope> {
self.tx.clone()

554
src/plugins/ble/gatt.rs Normal file
View File

@@ -0,0 +1,554 @@
use crate::core::message::{Destination, Envelope, Message, WifiCommand};
use anyhow::{anyhow, Context, Result};
use dbus::arg::{PropMap, Variant};
use dbus::blocking::stdintf::org_freedesktop_dbus::{ObjectManager, Properties};
use dbus::blocking::Connection;
use dbus::channel::MatchingReceiver;
use dbus::message::MatchRule;
use dbus::Path;
use dbus_crossroads::{Crossroads, IfaceBuilder, IfaceToken, MethodErr};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
const BUS_NAME: &str = "io.showen.BleProvisioning";
const BLUEZ_SERVICE: &str = "org.bluez";
const ADAPTER_IFACE: &str = "org.bluez.Adapter1";
const GATT_MANAGER_IFACE: &str = "org.bluez.GattManager1";
const LE_ADVERTISING_MANAGER_IFACE: &str = "org.bluez.LEAdvertisingManager1";
const APP_PATH: &str = "/org/showen/ble";
const SERVICE_PATH: &str = "/org/showen/ble/service0";
const CHAR_SSID_PATH: &str = "/org/showen/ble/service0/char0";
const CHAR_PASSWORD_PATH: &str = "/org/showen/ble/service0/char1";
const CHAR_COMMAND_PATH: &str = "/org/showen/ble/service0/char2";
const CHAR_STATUS_PATH: &str = "/org/showen/ble/service0/char3";
const ADV_PATH: &str = "/org/showen/ble/advertisement0";
const SERVICE_UUID: &str = "12345678-1234-5678-1234-56789abcdef0";
const CHAR_SSID_UUID: &str = "12345678-1234-5678-1234-56789abcdef1";
const CHAR_PASSWORD_UUID: &str = "12345678-1234-5678-1234-56789abcdef2";
const CHAR_COMMAND_UUID: &str = "12345678-1234-5678-1234-56789abcdef3";
const CHAR_STATUS_UUID: &str = "12345678-1234-5678-1234-56789abcdef4";
const SERVER_TIMEOUT: Duration = Duration::from_millis(250);
const PROXY_TIMEOUT: Duration = Duration::from_secs(10);
type ManagedObjects = HashMap<Path<'static>, HashMap<String, PropMap>>;
#[derive(Clone)]
struct SharedState {
tx: mpsc::Sender<Envelope>,
ssid: Arc<Mutex<String>>,
password: Arc<Mutex<String>>,
status: Arc<Mutex<String>>,
}
impl SharedState {
fn new(tx: mpsc::Sender<Envelope>) -> Self {
Self {
tx,
ssid: Arc::new(Mutex::new(String::new())),
password: Arc::new(Mutex::new(String::new())),
status: Arc::new(Mutex::new(r#"{"ok":true,"action":"idle"}"#.to_string())),
}
}
fn read_status(&self) -> Vec<u8> {
self.status.lock().unwrap().as_bytes().to_vec()
}
fn set_ssid(&self, value: &[u8]) {
*self.ssid.lock().unwrap() = bytes_to_string(value);
}
fn set_password(&self, value: &[u8]) {
*self.password.lock().unwrap() = bytes_to_string(value);
}
fn set_status(&self, status: impl Into<String>) {
*self.status.lock().unwrap() = status.into();
}
fn dispatch_command(&self, raw: &[u8]) -> Result<()> {
let command = bytes_to_string(raw);
let message = match command.as_str() {
"scan" => Message::WifiCommand(WifiCommand::Scan),
"status" => Message::WifiCommand(WifiCommand::Status),
"connect" => {
let ssid = self.ssid.lock().unwrap().clone();
let password = self.password.lock().unwrap().clone();
if ssid.trim().is_empty() {
self.set_status(r#"{"ok":false,"action":"connect","error":"ssid required"}"#);
return Err(anyhow!("ssid required before connect"));
}
Message::WifiCommand(WifiCommand::Connect { ssid, password })
}
"ap_start" => {
let ssid = self.ssid.lock().unwrap().clone();
let password = self.password.lock().unwrap().clone();
if ssid.trim().is_empty() {
self.set_status(r#"{"ok":false,"action":"ap_start","error":"ssid required"}"#);
return Err(anyhow!("ssid required before ap_start"));
}
Message::WifiCommand(WifiCommand::ApStart { ssid, password })
}
"ap_stop" => Message::WifiCommand(WifiCommand::ApStop),
other => {
self.set_status(format!(
r#"{{"ok":false,"action":"{}","error":"unsupported command"}}"#,
other
));
return Err(anyhow!("unsupported BLE command: {}", other));
}
};
self.tx
.send(Envelope {
from: "ble",
to: Destination::Plugin("wifi"),
message,
})
.context("failed to send WiFi command from BLE")?;
self.set_status(format!(
r#"{{"ok":true,"action":"{}","state":"queued"}}"#,
command
));
Ok(())
}
}
struct AppData;
struct GattServiceData {
uuid: String,
primary: bool,
characteristics: Vec<Path<'static>>,
}
enum CharacteristicKind {
Ssid,
Password,
Command,
Status,
}
struct GattCharacteristicData {
uuid: String,
service: Path<'static>,
flags: Vec<String>,
kind: CharacteristicKind,
shared: SharedState,
}
struct AdvertisementData {
advertisement_type: String,
service_uuids: Vec<String>,
local_name: String,
includes: Vec<String>,
}
pub fn run_ble_service(
device_name: String,
tx: mpsc::Sender<Envelope>,
stop: Arc<AtomicBool>,
) -> Result<()> {
let shared = SharedState::new(tx.clone());
let (ready_tx, ready_rx) = mpsc::channel();
let server_stop = Arc::clone(&stop);
let server_shared = shared.clone();
let server_device_name = device_name.clone();
let server_thread = thread::spawn(move || {
run_server_connection(server_shared, server_device_name, ready_tx, server_stop)
});
ready_rx
.recv_timeout(Duration::from_secs(5))
.context("BLE server connection did not become ready in time")??;
let client_result = (|| -> Result<()> {
let conn_client =
Connection::new_system().context("failed to connect to system bus for BLE client")?;
let adapter_path = find_adapter(&conn_client)?;
configure_adapter(&conn_client, &adapter_path, &device_name)?;
register_ble_objects(&conn_client, &adapter_path)?;
tx.send(Envelope {
from: "ble",
to: Destination::Manager,
message: Message::PluginReady("ble"),
})
.context("failed to report BLE plugin readiness")?;
while !stop.load(Ordering::SeqCst) {
thread::sleep(SERVER_TIMEOUT);
}
unregister_ble_objects(&conn_client, &adapter_path)
})();
if client_result.is_err() {
stop.store(true, Ordering::SeqCst);
}
server_thread
.join()
.map_err(|_| anyhow!("BLE server thread panicked"))??;
client_result
}
fn run_server_connection(
shared: SharedState,
device_name: String,
ready_tx: mpsc::Sender<Result<()>>,
stop: Arc<AtomicBool>,
) -> Result<()> {
let conn_server =
Connection::new_system().context("failed to connect to system bus for BLE server")?;
conn_server
.request_name(BUS_NAME, false, true, false)
.context("failed to request BLE D-Bus name")?;
let mut cr = Crossroads::new();
let object_manager = register_object_manager_iface(&mut cr);
let service_iface = register_service_iface(&mut cr);
let characteristic_iface = register_characteristic_iface(&mut cr);
let advertisement_iface = register_advertisement_iface(&mut cr);
cr.insert(APP_PATH, &[object_manager], AppData);
cr.insert(
SERVICE_PATH,
&[service_iface],
GattServiceData {
uuid: SERVICE_UUID.to_string(),
primary: true,
characteristics: vec![
Path::from(CHAR_SSID_PATH.to_string()),
Path::from(CHAR_PASSWORD_PATH.to_string()),
Path::from(CHAR_COMMAND_PATH.to_string()),
Path::from(CHAR_STATUS_PATH.to_string()),
],
},
);
cr.insert(
Path::from(CHAR_SSID_PATH.to_string()),
&[characteristic_iface],
GattCharacteristicData {
uuid: CHAR_SSID_UUID.to_string(),
service: Path::from(SERVICE_PATH.to_string()),
flags: vec!["write".to_string()],
kind: CharacteristicKind::Ssid,
shared: shared.clone(),
},
);
cr.insert(
Path::from(CHAR_PASSWORD_PATH.to_string()),
&[characteristic_iface],
GattCharacteristicData {
uuid: CHAR_PASSWORD_UUID.to_string(),
service: Path::from(SERVICE_PATH.to_string()),
flags: vec!["write".to_string()],
kind: CharacteristicKind::Password,
shared: shared.clone(),
},
);
cr.insert(
Path::from(CHAR_COMMAND_PATH.to_string()),
&[characteristic_iface],
GattCharacteristicData {
uuid: CHAR_COMMAND_UUID.to_string(),
service: Path::from(SERVICE_PATH.to_string()),
flags: vec!["write".to_string()],
kind: CharacteristicKind::Command,
shared: shared.clone(),
},
);
cr.insert(
Path::from(CHAR_STATUS_PATH.to_string()),
&[characteristic_iface],
GattCharacteristicData {
uuid: CHAR_STATUS_UUID.to_string(),
service: Path::from(SERVICE_PATH.to_string()),
flags: vec!["read".to_string(), "notify".to_string()],
kind: CharacteristicKind::Status,
shared,
},
);
cr.insert(
ADV_PATH,
&[advertisement_iface],
AdvertisementData {
advertisement_type: "peripheral".to_string(),
service_uuids: vec![SERVICE_UUID.to_string()],
local_name: device_name,
includes: vec!["tx-power".to_string()],
},
);
let shared_cr = Arc::new(Mutex::new(cr));
let cr_for_handler = Arc::clone(&shared_cr);
conn_server.start_receive(
MatchRule::new_method_call(),
Box::new(move |msg, conn| {
if cr_for_handler
.lock()
.unwrap()
.handle_message(msg, conn)
.is_err()
{
eprintln!("[ble] crossroads dispatch error");
}
true
}),
);
ready_tx
.send(Ok(()))
.map_err(|_| anyhow!("failed to notify BLE server readiness"))?;
while !stop.load(Ordering::SeqCst) {
conn_server
.process(SERVER_TIMEOUT)
.context("BLE server connection process loop failed")?;
}
Ok(())
}
fn register_object_manager_iface(cr: &mut Crossroads) -> IfaceToken<AppData> {
cr.register(
"org.freedesktop.DBus.ObjectManager",
|b: &mut IfaceBuilder<AppData>| {
b.method("GetManagedObjects", (), ("objects",), |_, _, ()| {
Ok((build_managed_objects(),))
});
},
)
}
fn register_service_iface(cr: &mut Crossroads) -> dbus_crossroads::IfaceToken<GattServiceData> {
cr.register(
"org.bluez.GattService1",
|b: &mut IfaceBuilder<GattServiceData>| {
b.property::<String, _>("UUID")
.get(|_, data| Ok(data.uuid.clone()));
b.property::<bool, _>("Primary")
.get(|_, data| Ok(data.primary));
b.property::<Vec<Path<'static>>, _>("Characteristics")
.get(|_, data| Ok(data.characteristics.clone()));
},
)
}
fn register_characteristic_iface(
cr: &mut Crossroads,
) -> dbus_crossroads::IfaceToken<GattCharacteristicData> {
cr.register(
"org.bluez.GattCharacteristic1",
|b: &mut IfaceBuilder<GattCharacteristicData>| {
b.property::<String, _>("UUID")
.get(|_, data| Ok(data.uuid.clone()));
b.property::<Path<'static>, _>("Service")
.get(|_, data| Ok(data.service.clone()));
b.property::<Vec<String>, _>("Flags")
.get(|_, data| Ok(data.flags.clone()));
b.property::<Vec<Path<'static>>, _>("Descriptors")
.get(|_, _| Ok(Vec::new()));
b.method(
"ReadValue",
("options",),
("value",),
|_, data, (_options,): (PropMap,)| {
let value = match data.kind {
CharacteristicKind::Ssid => Vec::new(),
CharacteristicKind::Password => Vec::new(),
CharacteristicKind::Command => Vec::new(),
CharacteristicKind::Status => data.shared.read_status(),
};
Ok((value,))
},
);
b.method(
"WriteValue",
("value", "options"),
(),
|_, data, (value, _options): (Vec<u8>, PropMap)| -> Result<(), MethodErr> {
match data.kind {
CharacteristicKind::Ssid => data.shared.set_ssid(&value),
CharacteristicKind::Password => data.shared.set_password(&value),
CharacteristicKind::Command => data
.shared
.dispatch_command(&value)
.map_err(|e| MethodErr::failed(&e.to_string()))?,
CharacteristicKind::Status => {
return Err(MethodErr::failed("status characteristic is read-only"));
}
}
Ok(())
},
);
b.method("StartNotify", (), (), |_, _, ()| Ok(()));
b.method("StopNotify", (), (), |_, _, ()| Ok(()));
},
)
}
fn register_advertisement_iface(
cr: &mut Crossroads,
) -> dbus_crossroads::IfaceToken<AdvertisementData> {
cr.register(
"org.bluez.LEAdvertisement1",
|b: &mut IfaceBuilder<AdvertisementData>| {
b.property::<String, _>("Type")
.get(|_, data| Ok(data.advertisement_type.clone()));
b.property::<Vec<String>, _>("ServiceUUIDs")
.get(|_, data| Ok(data.service_uuids.clone()));
b.property::<String, _>("LocalName")
.get(|_, data| Ok(data.local_name.clone()));
b.property::<Vec<String>, _>("Includes")
.get(|_, data| Ok(data.includes.clone()));
b.method("Release", (), (), |_, _, ()| Ok(()));
},
)
}
fn build_managed_objects() -> ManagedObjects {
let mut objects = ManagedObjects::new();
let mut service_props = PropMap::new();
service_props.insert("UUID".into(), Variant(Box::new(SERVICE_UUID.to_string())));
service_props.insert("Primary".into(), Variant(Box::new(true)));
service_props.insert(
"Characteristics".into(),
Variant(Box::new(vec![
Path::from(CHAR_SSID_PATH),
Path::from(CHAR_PASSWORD_PATH),
Path::from(CHAR_COMMAND_PATH),
Path::from(CHAR_STATUS_PATH),
])),
);
let mut service_ifaces = HashMap::new();
service_ifaces.insert("org.bluez.GattService1".to_string(), service_props);
objects.insert(Path::from(SERVICE_PATH), service_ifaces);
for (path, uuid, flags) in [
(CHAR_SSID_PATH, CHAR_SSID_UUID, vec!["write".to_string()]),
(
CHAR_PASSWORD_PATH,
CHAR_PASSWORD_UUID,
vec!["write".to_string()],
),
(
CHAR_COMMAND_PATH,
CHAR_COMMAND_UUID,
vec!["write".to_string()],
),
(
CHAR_STATUS_PATH,
CHAR_STATUS_UUID,
vec!["read".to_string(), "notify".to_string()],
),
] {
let mut props = PropMap::new();
props.insert(
"Service".into(),
Variant(Box::new(Path::from(SERVICE_PATH))),
);
props.insert("UUID".into(), Variant(Box::new(uuid.to_string())));
props.insert("Flags".into(), Variant(Box::new(flags)));
props.insert(
"Descriptors".into(),
Variant(Box::new(Vec::<Path<'static>>::new())),
);
let mut ifaces = HashMap::new();
ifaces.insert("org.bluez.GattCharacteristic1".to_string(), props);
objects.insert(Path::from(path), ifaces);
}
objects
}
fn find_adapter(conn: &Connection) -> Result<String> {
let proxy = conn.with_proxy(BLUEZ_SERVICE, "/", PROXY_TIMEOUT);
let objects = proxy
.get_managed_objects()
.context("failed to enumerate BlueZ managed objects")?;
for (path, interfaces) in objects {
if interfaces.contains_key(GATT_MANAGER_IFACE)
&& interfaces.contains_key(LE_ADVERTISING_MANAGER_IFACE)
{
return Ok(path.to_string());
}
}
Err(anyhow!(
"BLE adapter with GATT and advertising support not found"
))
}
fn configure_adapter(conn: &Connection, adapter_path: &str, device_name: &str) -> Result<()> {
let adapter = conn.with_proxy(BLUEZ_SERVICE, adapter_path, PROXY_TIMEOUT);
adapter
.set(ADAPTER_IFACE, "Powered", true)
.context("failed to power on BLE adapter")?;
adapter
.set(ADAPTER_IFACE, "Alias", device_name.to_string())
.context("failed to set BLE adapter alias")?;
Ok(())
}
fn register_ble_objects(conn: &Connection, adapter_path: &str) -> Result<()> {
let gatt_manager = conn.with_proxy(BLUEZ_SERVICE, adapter_path, PROXY_TIMEOUT);
gatt_manager
.method_call::<(), _, _, _>(
GATT_MANAGER_IFACE,
"RegisterApplication",
(Path::from(APP_PATH.to_string()), PropMap::new()),
)
.context("failed to register BLE GATT application")?;
let adv_manager = conn.with_proxy(BLUEZ_SERVICE, adapter_path, PROXY_TIMEOUT);
adv_manager
.method_call::<(), _, _, _>(
LE_ADVERTISING_MANAGER_IFACE,
"RegisterAdvertisement",
(Path::from(ADV_PATH.to_string()), PropMap::new()),
)
.context("failed to register BLE advertisement")?;
Ok(())
}
fn unregister_ble_objects(conn: &Connection, adapter_path: &str) -> Result<()> {
let adv_manager = conn.with_proxy(BLUEZ_SERVICE, adapter_path, PROXY_TIMEOUT);
let _ = adv_manager.method_call::<(), _, _, _>(
LE_ADVERTISING_MANAGER_IFACE,
"UnregisterAdvertisement",
(Path::from(ADV_PATH.to_string()),),
);
let gatt_manager = conn.with_proxy(BLUEZ_SERVICE, adapter_path, PROXY_TIMEOUT);
let _ = gatt_manager.method_call::<(), _, _, _>(
GATT_MANAGER_IFACE,
"UnregisterApplication",
(Path::from(APP_PATH.to_string()),),
);
Ok(())
}
fn bytes_to_string(value: &[u8]) -> String {
String::from_utf8_lossy(value)
.trim_end_matches('\0')
.trim()
.to_string()
}

View File

@@ -3,22 +3,35 @@
//! 通过 D-Bus 与 BlueZ 交互,注册 GATT 服务和 LE Advertisement。
//! 含 LocalName 双连接修复。
use crate::core::message::Message;
use crate::core::plugin::{Plugin, PluginContext, PluginInfo, Platform};
use anyhow::Result;
mod gatt;
use crate::core::message::{Destination, Envelope, Message};
use crate::core::plugin::{Platform, Plugin, PluginContext, PluginInfo};
use anyhow::{anyhow, Context, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
pub struct BlePlugin {
ctx: Option<PluginContext>,
stop: Arc<AtomicBool>,
worker: Option<JoinHandle<Result<()>>>,
}
impl BlePlugin {
pub fn new() -> Self {
Self { ctx: None }
Self {
ctx: None,
stop: Arc::new(AtomicBool::new(false)),
worker: None,
}
}
}
impl Plugin for BlePlugin {
fn id(&self) -> &'static str { "ble" }
fn id(&self) -> &'static str {
"ble"
}
fn info(&self) -> PluginInfo {
PluginInfo {
@@ -30,11 +43,55 @@ impl Plugin for BlePlugin {
}
fn init(&mut self, ctx: PluginContext) -> Result<()> {
self.stop.store(false, Ordering::SeqCst);
self.ctx = Some(ctx);
Ok(())
}
fn start(&mut self) -> Result<()> { Ok(()) }
fn handle_message(&mut self, _msg: Message) -> Result<()> { Ok(()) }
fn stop(&mut self) -> Result<()> { Ok(()) }
fn start(&mut self) -> Result<()> {
let ctx = self
.ctx
.as_ref()
.context("ble plugin context is not initialized")?;
self.stop.store(false, Ordering::SeqCst);
if !ctx.config.ble.enabled {
ctx.tx.send(Envelope {
from: "ble",
to: Destination::Manager,
message: Message::PluginReady("ble"),
})?;
return Ok(());
}
let device_name = ctx.config.ble.device_name.clone();
let tx = ctx.tx.clone();
let stop = Arc::clone(&self.stop);
self.worker = Some(thread::spawn(move || {
gatt::run_ble_service(device_name, tx, stop)
}));
Ok(())
}
fn handle_message(&mut self, msg: Message) -> Result<()> {
if let Message::Shutdown = msg {
self.stop.store(true, Ordering::SeqCst);
}
Ok(())
}
fn stop(&mut self) -> Result<()> {
self.stop.store(true, Ordering::SeqCst);
if let Some(worker) = self.worker.take() {
worker
.join()
.map_err(|_| anyhow!("BLE worker thread panicked"))??;
}
Ok(())
}
}

View File

@@ -2,17 +2,112 @@
//!
//! 基于 warp 的 HTTP 服务,提供播放控制、配置管理、视频管理等 API。
use crate::core::message::Message;
mod routes;
use crate::core::config::AppConfig;
use crate::core::message::{Envelope, Message};
use crate::core::plugin::{Plugin, PluginContext, PluginInfo, Platform};
use anyhow::Result;
use anyhow::{Context, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
struct PendingWifiResponse {
version: u64,
payload: Option<String>,
}
pub(crate) struct HttpState {
wifi_response: Mutex<PendingWifiResponse>,
wifi_response_cv: Condvar,
config: Mutex<Arc<AppConfig>>,
player_status: Mutex<crate::core::message::PlayerStatusData>,
ble_ready: AtomicBool,
}
impl HttpState {
fn new(config: Arc<AppConfig>) -> Self {
let player_status = crate::core::message::PlayerStatusData {
running: false,
paused: !config.playback.auto_start,
in_transition: false,
current_index: 0,
playlist_length: config.playlist.len(),
current_video: config.playlist.first().map(|item| item.id.clone()),
};
Self {
wifi_response: Mutex::new(PendingWifiResponse {
version: 0,
payload: None,
}),
wifi_response_cv: Condvar::new(),
config: Mutex::new(config),
player_status: Mutex::new(player_status),
ble_ready: AtomicBool::new(false),
}
}
fn publish_wifi_result(&self, payload: String) {
if let Ok(mut state) = self.wifi_response.lock() {
state.version += 1;
state.payload = Some(payload);
self.wifi_response_cv.notify_all();
}
}
pub(crate) fn config(&self) -> Arc<AppConfig> {
self.config
.lock()
.map(|config| Arc::clone(&config))
.expect("http config state poisoned")
}
fn replace_config(&self, config: Arc<AppConfig>) {
if let Ok(mut current) = self.config.lock() {
*current = Arc::clone(&config);
}
if let Ok(mut player_status) = self.player_status.lock() {
player_status.playlist_length = config.playlist.len();
if player_status.current_video.is_none() {
player_status.current_video = config.playlist.first().map(|item| item.id.clone());
}
}
}
pub(crate) fn player_status(&self) -> crate::core::message::PlayerStatusData {
self.player_status
.lock()
.map(|status| status.clone())
.expect("http player status state poisoned")
}
fn update_player_status(&self, status: crate::core::message::PlayerStatusData) {
if let Ok(mut current) = self.player_status.lock() {
*current = status;
}
}
pub(crate) fn ble_ready(&self) -> bool {
self.ble_ready.load(Ordering::SeqCst)
}
fn set_ble_ready(&self, ready: bool) {
self.ble_ready.store(ready, Ordering::SeqCst);
}
}
pub struct HttpPlugin {
ctx: Option<PluginContext>,
state: Option<Arc<HttpState>>,
}
impl HttpPlugin {
pub fn new() -> Self {
Self { ctx: None }
Self {
ctx: None,
state: None,
}
}
}
@@ -29,11 +124,86 @@ impl Plugin for HttpPlugin {
}
fn init(&mut self, ctx: PluginContext) -> Result<()> {
self.state = Some(Arc::new(HttpState::new(Arc::clone(&ctx.config))));
self.ctx = Some(ctx);
Ok(())
}
fn start(&mut self) -> Result<()> { Ok(()) }
fn handle_message(&mut self, _msg: Message) -> Result<()> { Ok(()) }
fn start(&mut self) -> Result<()> {
let ctx = self
.ctx
.as_ref()
.context("http plugin context is not initialized")?;
if !ctx.config.remote_control.enabled {
println!("[HttpPlugin] Remote control disabled, skip HTTP server startup");
return Ok(());
}
let host = ctx.config.remote_control.host.clone();
let port = ctx.config.remote_control.port;
let tx = ctx.tx.clone();
let state = Arc::clone(
self.state
.as_ref()
.context("http plugin state is not initialized")?,
);
std::thread::spawn(move || {
let runtime = match tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
{
Ok(runtime) => runtime,
Err(error) => {
eprintln!("[HttpPlugin] failed to create tokio runtime: {error}");
return;
}
};
runtime.block_on(async move {
let routes = routes::build_routes(tx.clone(), state);
let addr: std::net::SocketAddr = match format!("{host}:{port}").parse() {
Ok(addr) => addr,
Err(error) => {
eprintln!("[HttpPlugin] invalid listen address {host}:{port}: {error}");
return;
}
};
if let Err(error) = tx.send(Envelope {
from: "http",
to: crate::core::message::Destination::Manager,
message: Message::PluginReady("http"),
}) {
eprintln!("[HttpPlugin] failed to report ready state: {error}");
}
println!("[HttpPlugin] listening on http://{addr}");
warp::serve(routes).run(addr).await;
});
});
Ok(())
}
fn handle_message(&mut self, msg: Message) -> Result<()> {
let state = match self.state.as_ref() {
Some(state) => state,
None => return Ok(()),
};
match msg {
Message::WifiResult(payload) => state.publish_wifi_result(payload),
Message::PlayerStatus(status) => state.update_player_status(status),
Message::ConfigReloaded(config) => state.replace_config(config),
Message::PluginReady("ble") => state.set_ble_ready(true),
Message::Shutdown => state.set_ble_ready(false),
_ => {}
}
Ok(())
}
fn stop(&mut self) -> Result<()> { Ok(()) }
}

412
src/plugins/http/routes.rs Normal file
View File

@@ -0,0 +1,412 @@
use super::HttpState;
use crate::core::config;
use crate::core::message::{Destination, Envelope, Message, PlayerCommand, WifiCommand};
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use std::convert::Infallible;
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};
use warp::http::StatusCode;
use warp::{Filter, Reply};
#[derive(Deserialize)]
struct WifiConnectRequest {
ssid: String,
password: String,
}
#[derive(Deserialize)]
struct WifiApStartRequest {
ssid: String,
password: String,
}
#[derive(Serialize)]
struct ApiMessage<'a> {
status: &'a str,
message: String,
}
pub(crate) fn build_routes(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
let api = play_route(tx.clone())
.or(pause_route(tx.clone()))
.or(next_route(tx.clone()))
.or(previous_route(tx.clone()))
.or(goto_route(tx.clone()))
.or(trigger_route(tx.clone()))
.or(scene_route(tx.clone()))
.or(status_route(Arc::clone(&state)))
.or(config_get_route(Arc::clone(&state)))
.or(config_post_route(tx.clone(), Arc::clone(&state)))
.or(wifi_status_route(tx.clone(), Arc::clone(&state)))
.or(wifi_scan_route(tx.clone(), Arc::clone(&state)))
.or(wifi_connect_route(tx.clone(), Arc::clone(&state)))
.or(wifi_ap_start_route(tx.clone(), Arc::clone(&state)))
.or(wifi_ap_stop_route(tx, state));
let cors = warp::cors()
.allow_any_origin()
.allow_headers(["content-type"])
.allow_methods(["GET", "POST", "OPTIONS"]);
root_route().or(api).with(cors)
}
fn root_route() -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path::end().and(warp::get()).map(|| {
warp::reply::html(
"<!doctype html><html><head><meta charset=\"utf-8\"><title>ShowenV2 HTTP API</title></head><body><h1>ShowenV2 HTTP API</h1><p>HTTP API is running.</p></body></html>",
)
})
}
fn play_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "play")
.and(warp::post())
.and(with_tx(tx))
.and_then(|tx| command_reply(tx, Message::PlayerCommand(PlayerCommand::Play), "开始播放"))
}
fn pause_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "pause")
.and(warp::post())
.and(with_tx(tx))
.and_then(|tx| command_reply(tx, Message::PlayerCommand(PlayerCommand::Pause), "已暂停"))
}
fn next_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "next")
.and(warp::post())
.and(with_tx(tx))
.and_then(|tx| command_reply(tx, Message::PlayerCommand(PlayerCommand::Next), "切换到下一个视频"))
}
fn previous_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "previous")
.and(warp::post())
.and(with_tx(tx))
.and_then(|tx| command_reply(tx, Message::PlayerCommand(PlayerCommand::Previous), "切换到上一个视频"))
}
fn goto_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "goto" / usize)
.and(warp::post())
.and(with_tx(tx))
.and_then(|index, tx| {
command_reply(
tx,
Message::PlayerCommand(PlayerCommand::Goto(index)),
format!("跳转到视频 {index}"),
)
})
}
fn trigger_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "trigger" / String / String)
.and(warp::post())
.and(with_tx(tx))
.and_then(|name, value, tx| {
let message = format!("触发器 '{name}' 已发送,值: {value}");
command_reply(tx, Message::Trigger { name, value }, message)
})
}
fn scene_route(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "scene" / String)
.and(warp::post())
.and(with_tx(tx))
.and_then(|name, tx| {
let message = format!("切换到场景: {name}");
command_reply(tx, Message::PlayerCommand(PlayerCommand::ChangeScene(name)), message)
})
}
fn status_route(
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "status")
.and(warp::get())
.and(with_state(state))
.and_then(status_reply)
}
fn config_get_route(
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "config")
.and(warp::get())
.and(with_state(state))
.and_then(config_get_reply)
}
fn config_post_route(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "config")
.and(warp::post())
.and(warp::body::content_length_limit(1024 * 64))
.and(warp::body::bytes())
.and(with_tx(tx))
.and(with_state(state))
.and_then(handle_config_post)
}
fn wifi_status_route(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "wifi" / "status")
.and(warp::get())
.and(with_tx(tx))
.and(with_state(state))
.and_then(|tx, state| wifi_reply(tx, state, WifiCommand::Status))
}
fn wifi_scan_route(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "wifi" / "scan")
.and(warp::get())
.and(with_tx(tx))
.and(with_state(state))
.and_then(|tx, state| wifi_reply(tx, state, WifiCommand::Scan))
}
fn wifi_connect_route(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "wifi" / "connect")
.and(warp::post())
.and(warp::body::json())
.and(with_tx(tx))
.and(with_state(state))
.and_then(|req: WifiConnectRequest, tx, state| {
wifi_reply(
tx,
state,
WifiCommand::Connect {
ssid: req.ssid,
password: req.password,
},
)
})
}
fn wifi_ap_start_route(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "wifi" / "ap" / "start")
.and(warp::post())
.and(warp::body::json())
.and(with_tx(tx))
.and(with_state(state))
.and_then(|req: WifiApStartRequest, tx, state| {
wifi_reply(
tx,
state,
WifiCommand::ApStart {
ssid: req.ssid,
password: req.password,
},
)
})
}
fn wifi_ap_stop_route(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
warp::path!("api" / "wifi" / "ap" / "stop")
.and(warp::post())
.and(with_tx(tx))
.and(with_state(state))
.and_then(|tx, state| wifi_reply(tx, state, WifiCommand::ApStop))
}
async fn status_reply(state: Arc<HttpState>) -> Result<warp::reply::Response, Infallible> {
Ok(warp::reply::json(&json!({
"player": state.player_status(),
"ble_ready": state.ble_ready(),
}))
.into_response())
}
async fn config_get_reply(state: Arc<HttpState>) -> Result<warp::reply::Response, Infallible> {
Ok(warp::reply::json(state.config().as_ref()).into_response())
}
async fn handle_config_post(
body: bytes::Bytes,
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
) -> Result<warp::reply::Response, Infallible> {
let current_config = state.config();
let raw = match std::str::from_utf8(&body) {
Ok(raw) => raw,
Err(_) => return Ok(error_json(StatusCode::BAD_REQUEST, "请求体不是有效的 UTF-8")),
};
if let Err(error) = config::parse_str(raw, &current_config.source_path) {
return Ok(error_json(
StatusCode::BAD_REQUEST,
&format!("配置验证失败: {error}"),
));
}
if let Err(error) = std::fs::write(&current_config.source_path, raw) {
return Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("写入配置文件失败: {error}"),
));
}
if let Err(error) = tx.send(Envelope {
from: "http",
to: Destination::Manager,
message: Message::ConfigReloadRequest,
}) {
return Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("发送配置重载请求失败: {error}"),
));
}
Ok(success_json("配置已保存并请求重载"))
}
async fn command_reply(
tx: mpsc::Sender<Envelope>,
message: Message,
success_message: impl Into<String>,
) -> Result<warp::reply::Response, Infallible> {
match tx.send(Envelope {
from: "http",
to: Destination::Plugin("video"),
message,
}) {
Ok(()) => Ok(success_json(success_message)),
Err(error) => Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("发送命令失败: {error}"),
)),
}
}
async fn wifi_reply(
tx: mpsc::Sender<Envelope>,
state: Arc<HttpState>,
command: WifiCommand,
) -> Result<warp::reply::Response, Infallible> {
let version = match state.wifi_response.lock() {
Ok(guard) => guard.version,
Err(_) => {
return Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
"WiFi 响应状态锁已损坏",
));
}
};
if let Err(error) = tx.send(Envelope {
from: "http",
to: Destination::Plugin("wifi"),
message: Message::WifiCommand(command),
}) {
return Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
&format!("发送 WiFi 命令失败: {error}"),
));
}
let deadline = Instant::now() + Duration::from_secs(10);
let mut guard = match state.wifi_response.lock() {
Ok(guard) => guard,
Err(_) => {
return Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
"WiFi 响应状态锁已损坏",
));
}
};
while guard.version == version {
let now = Instant::now();
if now >= deadline {
return Ok(error_json(StatusCode::GATEWAY_TIMEOUT, "等待 WiFi 响应超时"));
}
let timeout = deadline.saturating_duration_since(now);
let (next_guard, wait_result) = match state.wifi_response_cv.wait_timeout(guard, timeout) {
Ok(result) => result,
Err(_) => {
return Ok(error_json(
StatusCode::INTERNAL_SERVER_ERROR,
"等待 WiFi 响应失败",
));
}
};
guard = next_guard;
if wait_result.timed_out() && guard.version == version {
return Ok(error_json(StatusCode::GATEWAY_TIMEOUT, "等待 WiFi 响应超时"));
}
}
Ok(warp::reply::with_status(guard.payload.clone().unwrap_or_default(), StatusCode::OK).into_response())
}
fn with_tx(
tx: mpsc::Sender<Envelope>,
) -> impl Filter<Extract = (mpsc::Sender<Envelope>,), Error = Infallible> + Clone {
warp::any().map(move || tx.clone())
}
fn with_state(
state: Arc<HttpState>,
) -> impl Filter<Extract = (Arc<HttpState>,), Error = Infallible> + Clone {
warp::any().map(move || Arc::clone(&state))
}
fn success_json(message: impl Into<String>) -> warp::reply::Response {
warp::reply::with_status(
warp::reply::json(&ApiMessage {
status: "ok",
message: message.into(),
}),
StatusCode::OK,
)
.into_response()
}
fn error_json(status: StatusCode, message: &str) -> warp::reply::Response {
warp::reply::with_status(
warp::reply::json(&json!({
"status": "error",
"message": message,
})),
status,
)
.into_response()
}

View File

@@ -1,27 +1,91 @@
//! VideoPlugin — 视频播放引擎
//!
//! 基于 OpenCV 的视频播放,支持状态机驱动、帧变换、过渡效果。
//! Phase 1 核心:迁移旧 video_processor.rs + state_machine.rs
pub mod processor;
pub mod state_machine;
use crate::core::message::Message;
use crate::core::plugin::{Plugin, PluginContext, PluginInfo, Platform};
use anyhow::Result;
use crate::core::message::{Destination, Envelope, Message, PlayerCommand, PlayerStatusData};
use crate::core::plugin::{Platform, Plugin, PluginContext, PluginInfo};
use anyhow::{anyhow, Context, Result};
use opencv::highgui;
use processor::VideoProcessor;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
pub struct VideoPlugin {
ctx: Option<PluginContext>,
processor: Option<Arc<Mutex<VideoProcessor>>>,
worker: Option<JoinHandle<()>>,
}
impl VideoPlugin {
pub fn new() -> Self {
Self { ctx: None }
Self {
ctx: None,
processor: None,
worker: None,
}
}
fn processor(&self) -> Result<&Arc<Mutex<VideoProcessor>>> {
self.processor
.as_ref()
.context("video processor is not initialized")
}
fn publish_status(&self) {
let Some(ctx) = &self.ctx else {
return;
};
let Some(processor) = &self.processor else {
return;
};
let status = match processor.lock() {
Ok(processor) => processor.status(),
Err(_) => return,
};
if let Err(error) = ctx.tx.send(Envelope {
from: self.id(),
to: Destination::Broadcast,
message: Message::PlayerStatus(status),
}) {
eprintln!("[VideoPlugin] failed to publish status: {error}");
}
}
fn publish_state_changed(&self, old_state: Option<String>, new_state: Option<String>) {
let Some(ctx) = &self.ctx else {
return;
};
let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
return;
};
if old_state == new_state {
return;
}
if let Err(error) = ctx.tx.send(Envelope {
from: self.id(),
to: Destination::Broadcast,
message: Message::StateChanged {
old_state,
new_state,
},
}) {
eprintln!("[VideoPlugin] failed to publish state change: {error}");
}
}
}
impl Plugin for VideoPlugin {
fn id(&self) -> &'static str { "video" }
fn id(&self) -> &'static str {
"video"
}
fn info(&self) -> PluginInfo {
PluginInfo {
@@ -38,17 +102,246 @@ impl Plugin for VideoPlugin {
}
fn start(&mut self) -> Result<()> {
// TODO: Commit 4 实现
let ctx = self
.ctx
.as_ref()
.context("video plugin context is not initialized")?;
let processor = Arc::new(Mutex::new(VideoProcessor::new((*ctx.config).clone())?));
let worker_processor = Arc::clone(&processor);
let tx = ctx.tx.clone();
let handle = std::thread::spawn(move || {
if let Err(error) = run_processor_loop(worker_processor, tx) {
eprintln!("[VideoPlugin] playback loop failed: {error}");
}
});
self.processor = Some(processor);
self.worker = Some(handle);
ctx.tx.send(Envelope {
from: self.id(),
to: Destination::Manager,
message: Message::PluginReady(self.id()),
})?;
self.publish_status();
Ok(())
}
fn handle_message(&mut self, _msg: Message) -> Result<()> {
// TODO: Commit 4 实现
fn handle_message(&mut self, msg: Message) -> Result<()> {
match msg {
Message::PlayerCommand(command) => {
let processor = Arc::clone(self.processor()?);
let mut processor = lock_processor(&processor)?;
match command {
PlayerCommand::Play => {
processor.play()?;
}
PlayerCommand::Pause => {
processor.pause();
}
PlayerCommand::Next => {
processor.next()?;
}
PlayerCommand::Previous => {
processor.previous()?;
}
PlayerCommand::Goto(index) => {
processor.goto(index)?;
}
PlayerCommand::ChangeScene(name) => {
processor.change_scene(&name)?;
}
}
drop(processor);
self.publish_status();
}
Message::Trigger { name, value } => {
let processor = Arc::clone(self.processor()?);
let mut processor = lock_processor(&processor)?;
let old_state = processor.current_state().map(str::to_owned);
processor.trigger(&name, &value)?;
let new_state = processor.current_state().map(str::to_owned);
drop(processor);
self.publish_state_changed(old_state, new_state);
self.publish_status();
}
Message::ConfigReloaded(config) => {
let processor = Arc::new(Mutex::new(VideoProcessor::new((*config).clone())?));
if let Some(old) = self.processor.replace(Arc::clone(&processor)) {
if let Ok(mut old) = old.lock() {
let _ = old.stop();
}
}
if let Some(handle) = self.worker.take() {
let _ = handle.join();
}
let worker_processor = Arc::clone(&processor);
let tx = self
.ctx
.as_ref()
.context("video plugin context is not initialized")?
.tx
.clone();
self.worker = Some(std::thread::spawn(move || {
if let Err(error) = run_processor_loop(worker_processor, tx) {
eprintln!("[VideoPlugin] playback loop failed after reload: {error}");
}
}));
self.publish_status();
}
Message::Shutdown => {
self.stop()?;
}
_ => {}
}
Ok(())
}
fn stop(&mut self) -> Result<()> {
// TODO: Commit 4 实现
if let Some(processor) = &self.processor {
if let Ok(mut processor) = processor.lock() {
let _ = processor.stop();
}
}
if let Some(handle) = self.worker.take() {
let _ = handle.join();
}
self.publish_status();
Ok(())
}
}
fn run_processor_loop(
processor: Arc<Mutex<VideoProcessor>>,
tx: std::sync::mpsc::Sender<Envelope>,
) -> Result<()> {
{
let mut processor = lock_processor(&processor)?;
processor.start()?;
}
publish_processor_status(&tx, &processor)?;
loop {
let (outcome, old_state, new_state, old_status, new_status) = {
let mut processor = lock_processor(&processor)?;
let old_state = processor.current_state().map(str::to_owned);
let old_status = processor.status();
let outcome = processor.step()?;
let new_state = processor.current_state().map(str::to_owned);
let new_status = processor.status();
(outcome, old_state, new_state, old_status, new_status)
};
if let Some(frame) = outcome.frame {
let processor = lock_processor(&processor)?;
processor.display_frame(&outcome.window_name, &frame)?;
}
if old_state != new_state {
publish_state_changed(&tx, old_state, new_state)?;
}
if status_changed(&old_status, &new_status) {
publish_status_message(&tx, new_status.clone())?;
}
if !outcome.keep_running {
break;
}
let key = highgui::wait_key(outcome.delay)?;
let (old_state, new_state, old_status, new_status) = {
let mut processor = lock_processor(&processor)?;
let old_state = processor.current_state().map(str::to_owned);
let old_status = processor.status();
processor.handle_key_code(key)?;
let new_state = processor.current_state().map(str::to_owned);
let new_status = processor.status();
(old_state, new_state, old_status, new_status)
};
if old_state != new_state {
publish_state_changed(&tx, old_state, new_state)?;
}
if status_changed(&old_status, &new_status) {
publish_status_message(&tx, new_status.clone())?;
}
if !new_status.running {
break;
}
}
let mut processor = lock_processor(&processor)?;
processor.stop()
}
fn publish_processor_status(
tx: &std::sync::mpsc::Sender<Envelope>,
processor: &Arc<Mutex<VideoProcessor>>,
) -> Result<()> {
let status = lock_processor(processor)?.status();
publish_status_message(tx, status)
}
fn publish_status_message(
tx: &std::sync::mpsc::Sender<Envelope>,
status: PlayerStatusData,
) -> Result<()> {
tx.send(Envelope {
from: "video",
to: Destination::Broadcast,
message: Message::PlayerStatus(status),
})?;
Ok(())
}
fn publish_state_changed(
tx: &std::sync::mpsc::Sender<Envelope>,
old_state: Option<String>,
new_state: Option<String>,
) -> Result<()> {
let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
return Ok(());
};
if old_state == new_state {
return Ok(());
}
tx.send(Envelope {
from: "video",
to: Destination::Broadcast,
message: Message::StateChanged {
old_state,
new_state,
},
})?;
Ok(())
}
fn status_changed(old: &PlayerStatusData, new: &PlayerStatusData) -> bool {
old.running != new.running
|| old.paused != new.paused
|| old.in_transition != new.in_transition
|| old.current_index != new.current_index
|| old.playlist_length != new.playlist_length
|| old.current_video != new.current_video
}
fn lock_processor(
processor: &Arc<Mutex<VideoProcessor>>,
) -> Result<MutexGuard<'_, VideoProcessor>> {
processor
.lock()
.map_err(|_| anyhow!("video processor lock poisoned"))
}

File diff suppressed because it is too large Load Diff