From 8ed9c93c8ece3522e1b937279322698abbcd0a36 Mon Sep 17 00:00:00 2001 From: showen Date: Thu, 12 Mar 2026 08:07:21 +0800 Subject: [PATCH] fix BLE wifi status delivery and websocket compile issues --- Cargo.lock | 356 +++++++++++++++++++++++++++++++++++- Cargo.toml | 2 +- TEAM_CHAT.md | 89 +++++++++ src/core/plugin.rs | 7 +- src/core/service_manager.rs | 107 ++++++++++- src/plugins/ble/gatt.rs | 96 +++++++++- src/plugins/ble/mod.rs | 4 + src/plugins/http/mod.rs | 95 +++++++++- src/plugins/http/routes.rs | 145 +++++++++++++-- src/plugins/wifi/mod.rs | 34 +++- 10 files changed, 886 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 216a1c7..c9e72bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,12 @@ dependencies = [ "objc2", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -134,6 +140,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "dbus" version = "0.9.10" @@ -176,6 +188,17 @@ dependencies = [ "objc2", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dunce" version = "1.0.5" @@ -331,7 +354,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -354,7 +377,7 @@ dependencies = [ "base64", "bytes", "headers-core", - "http", + "http 0.2.12", "httpdate", "mime", "sha1", @@ -366,7 +389,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.12", ] [[package]] @@ -380,6 +403,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -387,7 +420,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] @@ -414,7 +447,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", @@ -427,6 +460,108 @@ dependencies = [ "want", ] +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -478,6 +613,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + [[package]] name = "log" version = "0.4.29" @@ -532,7 +673,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 0.2.12", "httparse", "log", "memchr", @@ -657,6 +798,15 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -873,6 +1023,12 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + [[package]] name = "socket2" version = "0.5.10" @@ -899,6 +1055,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "syn" version = "2.0.117" @@ -910,6 +1072,47 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.50.0" @@ -937,6 +1140,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -982,6 +1197,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.4.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -1000,6 +1234,30 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1031,7 +1289,7 @@ dependencies = [ "futures-channel", "futures-util", "headers", - "http", + "http 0.2.12", "hyper", "log", "mime", @@ -1044,6 +1302,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-tungstenite", "tokio-util", "tower-service", "tracing", @@ -1167,6 +1426,35 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +[[package]] +name = "writeable" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.8.42" @@ -1187,6 +1475,60 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index 0a3c80f..b20698d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ ctrlc = "3" # 跨平台插件依赖 opencv = { version = "0.66", default-features = false, features = ["highgui", "imgproc", "videoio"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "process", "sync"] } -warp = { version = "0.3.7", default-features = false, features = ["multipart"] } +warp = { version = "0.3.7", default-features = false, features = ["multipart", "websocket"] } bytes = "1" futures-util = "0.3" diff --git a/TEAM_CHAT.md b/TEAM_CHAT.md index 3f5d0b0..f10fab1 100644 --- a/TEAM_CHAT.md +++ b/TEAM_CHAT.md @@ -5,6 +5,7 @@ [时间] 发送者 → 接收者: 内容 ``` + --- ## 2026-03-12 @@ -1515,3 +1516,91 @@ git log --format="%h %an %ae %s" -5 3. 考虑更换团队成员或改变工作方式 --- + +[第2天] 陈逸飞(CEO) → 全体: **第三轮任务已派发** + +## 任务分配 + +| 成员 | 任务 | 优先级 | +|------|------|--------| +| 张明远 | ServiceManager WifiResult转发 + Plugin dependencies() + 拓扑排序 | P0+P1 | +| 李思琪 | VideoPlugin 质量验证(与旧代码对比)+ 单元测试 | P1 | +| 王浩然 | BLE/WiFi 插件验证(与旧代码对比)+ BLE notify修复 | P1 | +| 赵雨薇 | HTTP/Screen 插件验证(与旧代码对比) | P1 | + +## 当前状态 +- 12个文件已有改动(未提交) +- 编译状态:❌ 4个错误,全在 src/plugins/http/routes.rs + - E0599: `ws_snapshots()` 和 `ws_subscribe()` 方法不存在 + - E0282: WebSocket message 类型推断失败 + - E0382: `state` Arc 被 move 后又 borrow +- 团队仍在运行中(12个 kilo 进程) + +## CEO指令 +- 赵雨薇负责修复 routes.rs 的编译错误,这是她的文件 +- 所有成员必须确保 cargo check 通过后再 git commit +- **验证标准**:只看 git commit(author + 实际代码改动),不看文字汇报 + +--- + +--- + +[2026-03-12 08:03:27] 副CEO → TEAM: 第1次监督检查 + +- kilo 进程数: 12 +- 最新提交: +7548064 ceo: 重要教训 - 不能盲信文件内容,必须验证 +c342262 ceo: 监督检查总结 - 2人完成,发现2个关键问题 +35fa6d4 ceo: 强制执行机制 - 直接启动所有团队成员 + +- 工作区状态: +M Cargo.lock + M Cargo.toml + M TEAM_CHAT.md + M src/core/plugin.rs + M src/core/service_manager.rs + M src/plugins/ble/gatt.rs + M src/plugins/ble/mod.rs + M src/plugins/http/mod.rs + M src/plugins/http/routes.rs + M src/plugins/mod.rs + M src/plugins/screen/mod.rs + M src/plugins/video/mod.rs + M src/plugins/video/processor.rs + M src/plugins/video/state_machine.rs + M src/plugins/wifi/mod.rs +?? CEO_BACKUP.md + +- cargo check (通过): +Blocking waiting for file lock on build directory + Checking showen_v2 v0.2.0 (/home/showen/Showen/ShowenV2) + Finished `dev` profile [unoptimized + debuginfo] target(s) in 3m 31s + +- 结果:cargo check 通过,暂未检测到新的 commit。 +[08:03] Kilo(副CEO) → 全体: 第1/10次监督巡检 +- kilo 进程数: 12 +- 最近3条提交: + 7548064 ceo: 重要教训 - 不能盲信文件内容,必须验证 + c342262 ceo: 监督检查总结 - 2人完成,发现2个关键问题 + 35fa6d4 ceo: 强制执行机制 - 直接启动所有团队成员 +- 工作区改动: + M Cargo.lock + M Cargo.toml + M TEAM_CHAT.md + M src/core/plugin.rs + M src/core/service_manager.rs + M src/plugins/ble/gatt.rs + M src/plugins/ble/mod.rs + M src/plugins/http/mod.rs + M src/plugins/http/routes.rs + M src/plugins/mod.rs + M src/plugins/screen/mod.rs + M src/plugins/video/mod.rs + M src/plugins/video/processor.rs + M src/plugins/video/state_machine.rs + M src/plugins/wifi/mod.rs + ?? CEO_BACKUP.md +- 编译结果: + cargo check: PASS + Finished `dev` profile [unoptimized + debuginfo] target(s) in 3m 22s + diff --git a/src/core/plugin.rs b/src/core/plugin.rs index 9d6de1d..58f4183 100644 --- a/src/core/plugin.rs +++ b/src/core/plugin.rs @@ -1,7 +1,7 @@ +use crate::core::config::AppConfig; use crate::core::message::{Envelope, Message}; use anyhow::Result; use std::sync::{mpsc, Arc}; -use crate::core::config::AppConfig; /// 所有功能都通过实现此 trait 接入系统 pub trait Plugin: Send { @@ -11,6 +11,11 @@ pub trait Plugin: Send { /// 插件信息 fn info(&self) -> PluginInfo; + /// 声明启动所需的插件依赖 + fn dependencies(&self) -> Vec<&'static str> { + vec![] + } + /// 初始化:获取发送通道,声明订阅的消息类型 fn init(&mut self, ctx: PluginContext) -> Result<()>; diff --git a/src/core/service_manager.rs b/src/core/service_manager.rs index bfec471..7c4facc 100644 --- a/src/core/service_manager.rs +++ b/src/core/service_manager.rs @@ -1,7 +1,8 @@ use crate::core::config::AppConfig; use crate::core::message::{Destination, Envelope, Message}; use crate::core::plugin::{Plugin, PluginContext}; -use anyhow::Result; +use anyhow::{anyhow, Result}; +use std::collections::{HashMap, HashSet}; use std::sync::{mpsc, Arc}; /// 中央调度器:插件注册、生命周期管理、消息路由 @@ -33,6 +34,8 @@ impl ServiceManager { /// 按注册顺序 init() + start() 所有插件 pub fn start_all(&mut self) -> Result<()> { + self.validate_and_sort_plugins()?; + // init for plugin in &mut self.plugins { let ctx = PluginContext { @@ -105,18 +108,120 @@ impl ServiceManager { println!("[ServiceManager] 收到 Shutdown 指令"); self.broadcast_message(Message::Shutdown); } + Message::WifiResult(payload) => { + self.broadcast_message(Message::WifiResult(payload)); + } + Message::PlayerStatus(status) => { + self.broadcast_message(Message::PlayerStatus(status)); + } + Message::StateChanged { + old_state, + new_state, + } => { + self.broadcast_message(Message::StateChanged { + old_state, + new_state, + }); + } + Message::WifiProvisioned { ssid, ip } => { + self.broadcast_message(Message::WifiProvisioned { ssid, ip }); + } Message::ConfigReloadRequest => { println!("[ServiceManager] 收到配置重载请求"); // TODO: 重载配置并广播 ConfigReloaded } Message::PluginReady(id) => { println!("[ServiceManager] 插件 '{}' 就绪", id); + self.broadcast_message(Message::PluginReady(id)); } _ => {} } Ok(()) } + fn validate_and_sort_plugins(&mut self) -> Result<()> { + let mut plugin_ids = Vec::with_capacity(self.plugins.len()); + let mut plugin_set = HashSet::with_capacity(self.plugins.len()); + let mut dependency_map = HashMap::with_capacity(self.plugins.len()); + + for plugin in &self.plugins { + let id = plugin.id(); + if !plugin_set.insert(id) { + return Err(anyhow!("duplicate plugin id registered: '{id}'")); + } + + plugin_ids.push(id); + dependency_map.insert(id, plugin.dependencies()); + } + + for (plugin_id, dependencies) in &dependency_map { + for dependency in dependencies { + if dependency == plugin_id { + return Err(anyhow!("plugin '{plugin_id}' cannot depend on itself")); + } + + if !plugin_set.contains(dependency) { + return Err(anyhow!( + "plugin '{plugin_id}' depends on missing plugin '{dependency}'" + )); + } + } + } + + let mut resolved = HashSet::with_capacity(plugin_ids.len()); + let mut sorted_ids = Vec::with_capacity(plugin_ids.len()); + + while sorted_ids.len() < plugin_ids.len() { + let mut progressed = false; + + for plugin_id in &plugin_ids { + if resolved.contains(plugin_id) { + continue; + } + + let dependencies = dependency_map + .get(plugin_id) + .expect("plugin dependency map must contain all registered ids"); + + if dependencies + .iter() + .all(|dependency| resolved.contains(dependency)) + { + resolved.insert(*plugin_id); + sorted_ids.push(*plugin_id); + progressed = true; + } + } + + if !progressed { + let unresolved = plugin_ids + .iter() + .copied() + .filter(|plugin_id| !resolved.contains(plugin_id)) + .collect::>() + .join(", "); + + return Err(anyhow!( + "plugin dependency cycle detected among: {unresolved}" + )); + } + } + + let mut remaining_plugins = std::mem::take(&mut self.plugins); + let mut ordered_plugins = Vec::with_capacity(remaining_plugins.len()); + + for plugin_id in sorted_ids { + let index = remaining_plugins + .iter() + .position(|plugin| plugin.id() == plugin_id) + .ok_or_else(|| anyhow!("plugin '{plugin_id}' disappeared during sorting"))?; + ordered_plugins.push(remaining_plugins.remove(index)); + } + + self.plugins = ordered_plugins; + Ok(()) + } + fn broadcast_message(&mut self, msg: Message) { let should_shutdown = matches!(&msg, Message::Shutdown); diff --git a/src/plugins/ble/gatt.rs b/src/plugins/ble/gatt.rs index b71ea77..2370453 100644 --- a/src/plugins/ble/gatt.rs +++ b/src/plugins/ble/gatt.rs @@ -4,6 +4,7 @@ use dbus::arg::{PropMap, Variant}; use dbus::blocking::stdintf::org_freedesktop_dbus::{ObjectManager, Properties}; use dbus::blocking::Connection; use dbus::channel::MatchingReceiver; +use dbus::channel::Sender; use dbus::message::MatchRule; use dbus::Path; use dbus_crossroads::{Crossroads, IfaceBuilder, IfaceToken, MethodErr}; @@ -46,6 +47,8 @@ struct SharedState { ssid: Arc>, password: Arc>, status: Arc>, + notifying: Arc, + pending_notify: Arc, } impl SharedState { @@ -55,6 +58,8 @@ impl SharedState { ssid: Arc::new(Mutex::new(String::new())), password: Arc::new(Mutex::new(String::new())), status: Arc::new(Mutex::new(r#"{"ok":true,"action":"idle"}"#.to_string())), + notifying: Arc::new(AtomicBool::new(false)), + pending_notify: Arc::new(AtomicBool::new(false)), } } @@ -62,6 +67,15 @@ impl SharedState { self.status.lock().unwrap().as_bytes().to_vec() } + fn read_value(&self, kind: &CharacteristicKind) -> Vec { + match kind { + CharacteristicKind::Status => self.read_status(), + CharacteristicKind::Ssid + | CharacteristicKind::Password + | CharacteristicKind::Command => Vec::new(), + } + } + fn set_ssid(&self, value: &[u8]) { *self.ssid.lock().unwrap() = bytes_to_string(value); } @@ -72,6 +86,22 @@ impl SharedState { fn set_status(&self, status: impl Into) { *self.status.lock().unwrap() = status.into(); + self.pending_notify.store(true, Ordering::SeqCst); + } + + fn set_notifying(&self, notifying: bool) { + self.notifying.store(notifying, Ordering::SeqCst); + if notifying { + self.pending_notify.store(true, Ordering::SeqCst); + } + } + + fn is_notifying(&self) -> bool { + self.notifying.load(Ordering::SeqCst) + } + + fn take_pending_notification(&self) -> bool { + self.pending_notify.swap(false, Ordering::SeqCst) } fn dispatch_command(&self, raw: &[u8]) -> Result<()> { @@ -294,7 +324,7 @@ fn run_server_connection( service: Path::from(SERVICE_PATH.to_string()), flags: vec!["read".to_string(), "notify".to_string()], kind: CharacteristicKind::Status, - shared, + shared: shared.clone(), }, ); cr.insert( @@ -330,6 +360,9 @@ fn run_server_connection( .map_err(|_| anyhow!("failed to notify BLE server readiness"))?; while !stop.load(Ordering::SeqCst) { + if shared.is_notifying() && shared.take_pending_notification() { + emit_status_notification(&conn_server, &shared)?; + } conn_server .process(SERVER_TIMEOUT) .context("BLE server connection process loop failed")?; @@ -377,18 +410,18 @@ fn register_characteristic_iface( .get(|_, data| Ok(data.flags.clone())); b.property::>, _>("Descriptors") .get(|_, _| Ok(Vec::new())); + b.property::, _>("Value") + .get(|_, data| Ok(data.shared.read_value(&data.kind))); + b.property::("Notifying").get(|_, data| { + Ok(matches!(data.kind, CharacteristicKind::Status) && data.shared.is_notifying()) + }); b.method( "ReadValue", ("options",), ("value",), |_, data, (_options,): (PropMap,)| { - let value = match data.kind { - CharacteristicKind::Ssid => Vec::new(), - CharacteristicKind::Password => Vec::new(), - CharacteristicKind::Command => Vec::new(), - CharacteristicKind::Status => data.shared.read_status(), - }; + let value = data.shared.read_value(&data.kind); Ok((value,)) }, ); @@ -413,8 +446,24 @@ fn register_characteristic_iface( }, ); - b.method("StartNotify", (), (), |_, _, ()| Ok(())); - b.method("StopNotify", (), (), |_, _, ()| Ok(())); + b.method("StartNotify", (), (), |_, data, ()| match data.kind { + CharacteristicKind::Status => { + data.shared.set_notifying(true); + Ok(()) + } + _ => Err(MethodErr::failed( + "notify is only supported on status characteristic", + )), + }); + b.method("StopNotify", (), (), |_, data, ()| match data.kind { + CharacteristicKind::Status => { + data.shared.set_notifying(false); + Ok(()) + } + _ => Err(MethodErr::failed( + "notify is only supported on status characteristic", + )), + }); }, ) } @@ -482,7 +531,14 @@ fn build_managed_objects() -> ManagedObjects { Variant(Box::new(Path::from(SERVICE_PATH))), ); props.insert("UUID".into(), Variant(Box::new(uuid.to_string()))); + let value = if path == CHAR_STATUS_PATH { + r#"{"ok":true,"action":"idle"}"#.as_bytes().to_vec() + } else { + Vec::new() + }; props.insert("Flags".into(), Variant(Box::new(flags))); + props.insert("Value".into(), Variant(Box::new(value))); + props.insert("Notifying".into(), Variant(Box::new(false))); props.insert( "Descriptors".into(), Variant(Box::new(Vec::>::new())), @@ -588,3 +644,25 @@ fn drain_control_messages(shared: &SharedState, control_rx: &Receiver Result<()> { + let mut changed = PropMap::new(); + changed.insert("Value".into(), Variant(Box::new(shared.read_status()))); + changed.insert("Notifying".into(), Variant(Box::new(shared.is_notifying()))); + + let signal = dbus::Message::signal( + &Path::from(CHAR_STATUS_PATH), + &"org.freedesktop.DBus.Properties".into(), + &"PropertiesChanged".into(), + ) + .append3( + "org.bluez.GattCharacteristic1".to_string(), + changed, + Vec::::new(), + ); + + conn.send(signal) + .map_err(|_| anyhow!("failed to emit BLE status notification"))?; + + Ok(()) +} diff --git a/src/plugins/ble/mod.rs b/src/plugins/ble/mod.rs index 3460f88..64ef347 100644 --- a/src/plugins/ble/mod.rs +++ b/src/plugins/ble/mod.rs @@ -51,6 +51,10 @@ impl Plugin for BlePlugin { } } + fn dependencies(&self) -> Vec<&'static str> { + vec![] + } + fn init(&mut self, ctx: PluginContext) -> Result<()> { self.stop.store(false, Ordering::SeqCst); self.ctx = Some(ctx); diff --git a/src/plugins/http/mod.rs b/src/plugins/http/mod.rs index b3f1ed2..1697fbc 100644 --- a/src/plugins/http/mod.rs +++ b/src/plugins/http/mod.rs @@ -6,10 +6,29 @@ mod routes; use crate::core::config::AppConfig; use crate::core::message::{Envelope, Message}; -use crate::core::plugin::{Plugin, PluginContext, PluginInfo, Platform}; +use crate::core::plugin::{Platform, Plugin, PluginContext, PluginInfo}; use anyhow::{Context, Result}; +use serde::Serialize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; +use tokio::sync::broadcast; + +#[derive(Serialize)] +struct WsEvent<'a, T> { + #[serde(rename = "type")] + event_type: &'a str, + data: T, +} + +fn encode_ws_event(event_type: &str, data: T) -> Option { + match serde_json::to_string(&WsEvent { event_type, data }) { + Ok(payload) => Some(payload), + Err(error) => { + eprintln!("[HttpPlugin] failed to serialize websocket event '{event_type}': {error}"); + None + } + } +} struct PendingWifiResponse { version: u64, @@ -22,10 +41,12 @@ pub(crate) struct HttpState { config: Mutex>, player_status: Mutex, ble_ready: AtomicBool, + ws_events: broadcast::Sender, } impl HttpState { fn new(config: Arc) -> Self { + let (ws_events, _) = broadcast::channel(32); let player_status = crate::core::message::PlayerStatusData { running: false, paused: !config.playback.auto_start, @@ -44,6 +65,7 @@ impl HttpState { config: Mutex::new(config), player_status: Mutex::new(player_status), ble_ready: AtomicBool::new(false), + ws_events, } } @@ -92,8 +114,42 @@ impl HttpState { self.ble_ready.load(Ordering::SeqCst) } + fn publish_ws(&self, payload: String) { + let _ = self.ws_events.send(payload); + } + + pub(crate) fn ws_snapshots(&self) -> Vec { + let mut snapshots = Vec::new(); + + if let Some(payload) = encode_ws_event("status_update", self.player_status()) { + snapshots.push(payload); + } + + let config = self.config(); + if let Some(payload) = encode_ws_event("config_update", config.as_ref()) { + snapshots.push(payload); + } + + if let Some(payload) = encode_ws_event( + "ble_update", + serde_json::json!({ "ready": self.ble_ready() }), + ) { + snapshots.push(payload); + } + + snapshots + } + + pub(crate) fn ws_subscribe(&self) -> broadcast::Receiver { + self.ws_events.subscribe() + } + fn set_ble_ready(&self, ready: bool) { self.ble_ready.store(ready, Ordering::SeqCst); + if let Some(payload) = encode_ws_event("ble_update", serde_json::json!({ "ready": ready })) + { + self.publish_ws(payload); + } } } @@ -118,7 +174,9 @@ impl Default for HttpPlugin { } impl Plugin for HttpPlugin { - fn id(&self) -> &'static str { "http" } + fn id(&self) -> &'static str { + "http" + } fn info(&self) -> PluginInfo { PluginInfo { @@ -129,6 +187,10 @@ impl Plugin for HttpPlugin { } } + fn dependencies(&self) -> Vec<&'static str> { + vec!["video"] + } + fn init(&mut self, ctx: PluginContext) -> Result<()> { self.state = Some(Arc::new(HttpState::new(Arc::clone(&ctx.config)))); self.ctx = Some(ctx); @@ -201,8 +263,29 @@ impl Plugin for HttpPlugin { match msg { Message::WifiResult(payload) => state.publish_wifi_result(payload), - Message::PlayerStatus(status) => state.update_player_status(status), - Message::ConfigReloaded(config) => state.replace_config(config), + Message::PlayerStatus(status) => { + state.update_player_status(status.clone()); + if let Some(payload) = encode_ws_event("status_update", &status) { + state.publish_ws(payload); + } + } + Message::ConfigReloaded(config) => { + state.replace_config(Arc::clone(&config)); + if let Some(payload) = encode_ws_event("config_update", config.as_ref()) { + state.publish_ws(payload); + } + } + Message::StateChanged { + old_state, + new_state, + } => { + if let Some(payload) = encode_ws_event( + "state_update", + serde_json::json!({ "old_state": old_state, "new_state": new_state }), + ) { + state.publish_ws(payload); + } + } Message::PluginReady("ble") => state.set_ble_ready(true), Message::Shutdown => state.set_ble_ready(false), _ => {} @@ -211,5 +294,7 @@ impl Plugin for HttpPlugin { Ok(()) } - fn stop(&mut self) -> Result<()> { Ok(()) } + fn stop(&mut self) -> Result<()> { + Ok(()) + } } diff --git a/src/plugins/http/routes.rs b/src/plugins/http/routes.rs index 5d1b641..d1e01f7 100644 --- a/src/plugins/http/routes.rs +++ b/src/plugins/http/routes.rs @@ -2,7 +2,8 @@ use super::HttpState; use crate::core::config::{self, AppConfig}; use crate::core::message::{Destination, Envelope, Message, PlayerCommand, WifiCommand}; use bytes::Buf; -use futures_util::TryStreamExt; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::convert::Infallible; @@ -19,13 +20,13 @@ struct WifiConnectRequest { password: String, } -#[derive(Deserialize)] +#[derive(Default, Deserialize)] struct WifiApStartRequest { ssid: Option, password: Option, } -#[derive(Deserialize)] +#[derive(Default, Deserialize)] struct BleStartRequest { device_name: Option, } @@ -82,9 +83,9 @@ pub(crate) fn build_routes( .or(wifi_ap_stop_route(tx.clone(), Arc::clone(&state))) .or(ble_start_route(Arc::clone(&state))) .or(ble_stop_route()) - .or(ble_status_route(state)); + .or(ble_status_route(Arc::clone(&state))); - root_route().or(api).with( + root_route().or(ws_route(Arc::clone(&state))).or(api).with( warp::cors() .allow_any_origin() .allow_headers(["content-type"]) @@ -98,6 +99,16 @@ fn root_route() -> impl Filter + .map(|| warp::reply::html(WEB_UI_HTML)) } +fn ws_route( + state: Arc, +) -> impl Filter + Clone { + warp::path("ws").and(warp::ws()).and(with_state(state)).map( + |ws: warp::ws::Ws, state: Arc| { + ws.on_upgrade(move |socket| websocket_session(socket, state)) + }, + ) +} + fn status_route( state: Arc, ) -> impl Filter + Clone { @@ -216,9 +227,9 @@ fn scene_route( fn trigger_route( tx: mpsc::Sender, ) -> impl Filter + Clone { - warp::path!("api" / "trigger" / String / String) + let with_value = warp::path!("api" / "trigger" / String / String) .and(warp::post()) - .and(with_tx(tx)) + .and(with_tx(tx.clone())) .and_then(|name: String, value: String, tx| async move { send_video_command( tx, @@ -229,7 +240,24 @@ fn trigger_route( format!("触发器 '{name}' 已发送,值: {value}"), ) .await - }) + }); + + let without_value = warp::path!("api" / "trigger" / String) + .and(warp::post()) + .and(with_tx(tx)) + .and_then(|name: String, tx| async move { + send_video_command( + tx, + Message::Trigger { + name: name.clone(), + value: String::new(), + }, + format!("触发器 '{name}' 已发送"), + ) + .await + }); + + with_value.or(without_value) } fn config_get_route( @@ -357,10 +385,14 @@ fn wifi_ap_start_route( ) -> impl Filter + Clone { warp::path!("api" / "wifi" / "ap" / "start") .and(warp::post()) - .and(warp::body::json()) + .and(warp::body::bytes()) .and(with_tx(tx)) .and(with_state(state)) - .and_then(|req: WifiApStartRequest, tx, state| async move { + .and_then(|body: bytes::Bytes, tx, state| async move { + let req: WifiApStartRequest = match parse_optional_json(&body) { + Ok(req) => req, + Err(reply) => return Ok::<_, Infallible>(*reply), + }; let ssid = req.ssid.unwrap_or_else(|| "showen".to_string()); let password = req.password.unwrap_or_else(|| "12345678".to_string()); let success_ssid = ssid.clone(); @@ -383,8 +415,10 @@ fn wifi_ap_stop_route( .and(with_tx(tx)) .and(with_state(state)) .and_then(|tx, state| async move { - wifi_action_reply(tx, state, WifiCommand::ApStop, |_| "AP 热点已关闭".to_string()) - .await + wifi_action_reply(tx, state, WifiCommand::ApStop, |_| { + "AP 热点已关闭".to_string() + }) + .await }) } @@ -393,11 +427,17 @@ fn ble_start_route( ) -> impl Filter + Clone { warp::path!("api" / "ble" / "start") .and(warp::post()) - .and(warp::body::json()) + .and(warp::body::bytes()) .and(with_state(state)) - .and_then(|req: BleStartRequest, state: Arc| async move { + .and_then(|body: bytes::Bytes, state: Arc| async move { + let req: BleStartRequest = match parse_optional_json(&body) { + Ok(req) => req, + Err(reply) => return Ok::<_, Infallible>(*reply), + }; let config = state.config(); - let device_name = req.device_name.unwrap_or_else(|| config.ble.device_name.clone()); + let device_name = req + .device_name + .unwrap_or_else(|| config.ble.device_name.clone()); Ok::<_, Infallible>(success_json(format!( "BLE 配网服务已内嵌运行中,设备名: {device_name}" ))) @@ -436,7 +476,12 @@ async fn handle_config_update( ) -> Result { let raw = match std::str::from_utf8(&body) { Ok(raw) => raw, - Err(_) => return Ok(error_json(StatusCode::BAD_REQUEST, "请求体不是有效的 UTF-8")), + Err(_) => { + return Ok(error_json( + StatusCode::BAD_REQUEST, + "请求体不是有效的 UTF-8", + )) + } }; let current = state.config(); @@ -687,7 +732,10 @@ async fn wifi_request( while guard.version == version { let now = Instant::now(); if now >= deadline { - return Err(error_json(StatusCode::GATEWAY_TIMEOUT, "等待 WiFi 响应超时")); + return Err(error_json( + StatusCode::GATEWAY_TIMEOUT, + "等待 WiFi 响应超时", + )); } let result = state @@ -705,7 +753,10 @@ async fn wifi_request( guard = next_guard; if wait_result.timed_out() && guard.version == version { - return Err(error_json(StatusCode::GATEWAY_TIMEOUT, "等待 WiFi 响应超时")); + return Err(error_json( + StatusCode::GATEWAY_TIMEOUT, + "等待 WiFi 响应超时", + )); } } @@ -731,6 +782,64 @@ async fn wifi_request( Ok(payload) } +async fn websocket_session(ws: warp::ws::WebSocket, state: Arc) { + let (mut sender, mut receiver) = ws.split(); + + for payload in state.ws_snapshots() { + if sender.send(warp::ws::Message::text(payload)).await.is_err() { + return; + } + } + + let mut events = state.ws_subscribe(); + + loop { + tokio::select! { + event = events.recv() => { + match event { + Ok(payload) => { + if sender.send(warp::ws::Message::text(payload)).await.is_err() { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + incoming = receiver.next() => { + match incoming { + Some(Ok(message)) => { + if message.is_ping() { + if sender.send(warp::ws::Message::pong(message.as_bytes())).await.is_err() { + break; + } + } else if message.is_close() { + break; + } + } + Some(Err(_)) | None => break, + } + } + } + } +} + +fn parse_optional_json(body: &bytes::Bytes) -> Result> +where + T: DeserializeOwned + Default, +{ + if body.is_empty() { + return Ok(T::default()); + } + + serde_json::from_slice(body).map_err(|error| { + Box::new(error_json( + StatusCode::BAD_REQUEST, + &format!("JSON 格式错误: {error}"), + )) + }) +} + fn list_video_files(dir: &Path) -> Vec { let mut files = Vec::new(); diff --git a/src/plugins/wifi/mod.rs b/src/plugins/wifi/mod.rs index 8a7a4e6..988ba49 100644 --- a/src/plugins/wifi/mod.rs +++ b/src/plugins/wifi/mod.rs @@ -6,7 +6,7 @@ use crate::core::{message::*, plugin::*}; use anyhow::{anyhow, Context, Result}; use serde::Serialize; use serde_json::json; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::process::Command; use std::thread; use std::time::Duration; @@ -60,7 +60,7 @@ impl WifiPlugin { ctx.tx.send(Envelope { from: "wifi", - to: Destination::Manager, + to: Destination::Broadcast, message: Message::WifiResult(payload), })?; @@ -95,9 +95,12 @@ impl WifiPlugin { let networks = output .lines() .filter(|line| !line.trim().is_empty()) - .map(|line| { + .filter_map(|line| { let mut parts = line.splitn(3, ':'); let ssid = parts.next().unwrap_or_default().trim().to_string(); + if ssid.is_empty() { + return None; + } let signal = parts .next() .unwrap_or_default() @@ -106,13 +109,22 @@ impl WifiPlugin { .unwrap_or_default(); let security = parts.next().unwrap_or_default().trim().to_string(); - WifiNetwork { + Some(WifiNetwork { ssid, signal, security, - } + }) }) - .collect::>(); + .fold( + (HashSet::new(), Vec::new()), + |(mut seen, mut networks), network| { + if seen.insert(network.ssid.clone()) { + networks.push(network); + } + (seen, networks) + }, + ) + .1; Ok(json!({ "ok": true, @@ -122,7 +134,11 @@ impl WifiPlugin { } fn connect_network(&self, ssid: &str, password: &str) -> Result { - let output = Self::run_nmcli(&["device", "wifi", "connect", ssid, "password", password])?; + let mut args = vec!["device", "wifi", "connect", ssid]; + if !password.trim().is_empty() { + args.extend(["password", password]); + } + let output = Self::run_nmcli(&args)?; Ok(json!({ "ok": true, @@ -234,6 +250,10 @@ impl Plugin for WifiPlugin { } } + fn dependencies(&self) -> Vec<&'static str> { + vec![] + } + fn init(&mut self, ctx: PluginContext) -> Result<()> { self.ctx = Some(ctx); Ok(())