fix BLE wifi status delivery and websocket compile issues
This commit is contained in:
@@ -6,10 +6,29 @@ mod routes;
|
||||
|
||||
use crate::core::config::AppConfig;
|
||||
use crate::core::message::{Envelope, Message};
|
||||
use crate::core::plugin::{Plugin, PluginContext, PluginInfo, Platform};
|
||||
use crate::core::plugin::{Platform, Plugin, PluginContext, PluginInfo};
|
||||
use anyhow::{Context, Result};
|
||||
use serde::Serialize;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct WsEvent<'a, T> {
|
||||
#[serde(rename = "type")]
|
||||
event_type: &'a str,
|
||||
data: T,
|
||||
}
|
||||
|
||||
fn encode_ws_event<T: Serialize>(event_type: &str, data: T) -> Option<String> {
|
||||
match serde_json::to_string(&WsEvent { event_type, data }) {
|
||||
Ok(payload) => Some(payload),
|
||||
Err(error) => {
|
||||
eprintln!("[HttpPlugin] failed to serialize websocket event '{event_type}': {error}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PendingWifiResponse {
|
||||
version: u64,
|
||||
@@ -22,10 +41,12 @@ pub(crate) struct HttpState {
|
||||
config: Mutex<Arc<AppConfig>>,
|
||||
player_status: Mutex<crate::core::message::PlayerStatusData>,
|
||||
ble_ready: AtomicBool,
|
||||
ws_events: broadcast::Sender<String>,
|
||||
}
|
||||
|
||||
impl HttpState {
|
||||
fn new(config: Arc<AppConfig>) -> Self {
|
||||
let (ws_events, _) = broadcast::channel(32);
|
||||
let player_status = crate::core::message::PlayerStatusData {
|
||||
running: false,
|
||||
paused: !config.playback.auto_start,
|
||||
@@ -44,6 +65,7 @@ impl HttpState {
|
||||
config: Mutex::new(config),
|
||||
player_status: Mutex::new(player_status),
|
||||
ble_ready: AtomicBool::new(false),
|
||||
ws_events,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,8 +114,42 @@ impl HttpState {
|
||||
self.ble_ready.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
fn publish_ws(&self, payload: String) {
|
||||
let _ = self.ws_events.send(payload);
|
||||
}
|
||||
|
||||
pub(crate) fn ws_snapshots(&self) -> Vec<String> {
|
||||
let mut snapshots = Vec::new();
|
||||
|
||||
if let Some(payload) = encode_ws_event("status_update", self.player_status()) {
|
||||
snapshots.push(payload);
|
||||
}
|
||||
|
||||
let config = self.config();
|
||||
if let Some(payload) = encode_ws_event("config_update", config.as_ref()) {
|
||||
snapshots.push(payload);
|
||||
}
|
||||
|
||||
if let Some(payload) = encode_ws_event(
|
||||
"ble_update",
|
||||
serde_json::json!({ "ready": self.ble_ready() }),
|
||||
) {
|
||||
snapshots.push(payload);
|
||||
}
|
||||
|
||||
snapshots
|
||||
}
|
||||
|
||||
pub(crate) fn ws_subscribe(&self) -> broadcast::Receiver<String> {
|
||||
self.ws_events.subscribe()
|
||||
}
|
||||
|
||||
fn set_ble_ready(&self, ready: bool) {
|
||||
self.ble_ready.store(ready, Ordering::SeqCst);
|
||||
if let Some(payload) = encode_ws_event("ble_update", serde_json::json!({ "ready": ready }))
|
||||
{
|
||||
self.publish_ws(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,7 +174,9 @@ impl Default for HttpPlugin {
|
||||
}
|
||||
|
||||
impl Plugin for HttpPlugin {
|
||||
fn id(&self) -> &'static str { "http" }
|
||||
fn id(&self) -> &'static str {
|
||||
"http"
|
||||
}
|
||||
|
||||
fn info(&self) -> PluginInfo {
|
||||
PluginInfo {
|
||||
@@ -129,6 +187,10 @@ impl Plugin for HttpPlugin {
|
||||
}
|
||||
}
|
||||
|
||||
fn dependencies(&self) -> Vec<&'static str> {
|
||||
vec!["video"]
|
||||
}
|
||||
|
||||
fn init(&mut self, ctx: PluginContext) -> Result<()> {
|
||||
self.state = Some(Arc::new(HttpState::new(Arc::clone(&ctx.config))));
|
||||
self.ctx = Some(ctx);
|
||||
@@ -201,8 +263,29 @@ impl Plugin for HttpPlugin {
|
||||
|
||||
match msg {
|
||||
Message::WifiResult(payload) => state.publish_wifi_result(payload),
|
||||
Message::PlayerStatus(status) => state.update_player_status(status),
|
||||
Message::ConfigReloaded(config) => state.replace_config(config),
|
||||
Message::PlayerStatus(status) => {
|
||||
state.update_player_status(status.clone());
|
||||
if let Some(payload) = encode_ws_event("status_update", &status) {
|
||||
state.publish_ws(payload);
|
||||
}
|
||||
}
|
||||
Message::ConfigReloaded(config) => {
|
||||
state.replace_config(Arc::clone(&config));
|
||||
if let Some(payload) = encode_ws_event("config_update", config.as_ref()) {
|
||||
state.publish_ws(payload);
|
||||
}
|
||||
}
|
||||
Message::StateChanged {
|
||||
old_state,
|
||||
new_state,
|
||||
} => {
|
||||
if let Some(payload) = encode_ws_event(
|
||||
"state_update",
|
||||
serde_json::json!({ "old_state": old_state, "new_state": new_state }),
|
||||
) {
|
||||
state.publish_ws(payload);
|
||||
}
|
||||
}
|
||||
Message::PluginReady("ble") => state.set_ble_ready(true),
|
||||
Message::Shutdown => state.set_ble_ready(false),
|
||||
_ => {}
|
||||
@@ -211,5 +294,7 @@ impl Plugin for HttpPlugin {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stop(&mut self) -> Result<()> { Ok(()) }
|
||||
fn stop(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,8 @@ use super::HttpState;
|
||||
use crate::core::config::{self, AppConfig};
|
||||
use crate::core::message::{Destination, Envelope, Message, PlayerCommand, WifiCommand};
|
||||
use bytes::Buf;
|
||||
use futures_util::TryStreamExt;
|
||||
use futures_util::{SinkExt, StreamExt, TryStreamExt};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::convert::Infallible;
|
||||
@@ -19,13 +20,13 @@ struct WifiConnectRequest {
|
||||
password: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Default, Deserialize)]
|
||||
struct WifiApStartRequest {
|
||||
ssid: Option<String>,
|
||||
password: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Default, Deserialize)]
|
||||
struct BleStartRequest {
|
||||
device_name: Option<String>,
|
||||
}
|
||||
@@ -82,9 +83,9 @@ pub(crate) fn build_routes(
|
||||
.or(wifi_ap_stop_route(tx.clone(), Arc::clone(&state)))
|
||||
.or(ble_start_route(Arc::clone(&state)))
|
||||
.or(ble_stop_route())
|
||||
.or(ble_status_route(state));
|
||||
.or(ble_status_route(Arc::clone(&state)));
|
||||
|
||||
root_route().or(api).with(
|
||||
root_route().or(ws_route(Arc::clone(&state))).or(api).with(
|
||||
warp::cors()
|
||||
.allow_any_origin()
|
||||
.allow_headers(["content-type"])
|
||||
@@ -98,6 +99,16 @@ fn root_route() -> impl Filter<Extract = impl Reply, Error = warp::Rejection> +
|
||||
.map(|| warp::reply::html(WEB_UI_HTML))
|
||||
}
|
||||
|
||||
fn ws_route(
|
||||
state: Arc<HttpState>,
|
||||
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
|
||||
warp::path("ws").and(warp::ws()).and(with_state(state)).map(
|
||||
|ws: warp::ws::Ws, state: Arc<HttpState>| {
|
||||
ws.on_upgrade(move |socket| websocket_session(socket, state))
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn status_route(
|
||||
state: Arc<HttpState>,
|
||||
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
|
||||
@@ -216,9 +227,9 @@ fn scene_route(
|
||||
fn trigger_route(
|
||||
tx: mpsc::Sender<Envelope>,
|
||||
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
|
||||
warp::path!("api" / "trigger" / String / String)
|
||||
let with_value = warp::path!("api" / "trigger" / String / String)
|
||||
.and(warp::post())
|
||||
.and(with_tx(tx))
|
||||
.and(with_tx(tx.clone()))
|
||||
.and_then(|name: String, value: String, tx| async move {
|
||||
send_video_command(
|
||||
tx,
|
||||
@@ -229,7 +240,24 @@ fn trigger_route(
|
||||
format!("触发器 '{name}' 已发送,值: {value}"),
|
||||
)
|
||||
.await
|
||||
})
|
||||
});
|
||||
|
||||
let without_value = warp::path!("api" / "trigger" / String)
|
||||
.and(warp::post())
|
||||
.and(with_tx(tx))
|
||||
.and_then(|name: String, tx| async move {
|
||||
send_video_command(
|
||||
tx,
|
||||
Message::Trigger {
|
||||
name: name.clone(),
|
||||
value: String::new(),
|
||||
},
|
||||
format!("触发器 '{name}' 已发送"),
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
with_value.or(without_value)
|
||||
}
|
||||
|
||||
fn config_get_route(
|
||||
@@ -357,10 +385,14 @@ fn wifi_ap_start_route(
|
||||
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
|
||||
warp::path!("api" / "wifi" / "ap" / "start")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.and(warp::body::bytes())
|
||||
.and(with_tx(tx))
|
||||
.and(with_state(state))
|
||||
.and_then(|req: WifiApStartRequest, tx, state| async move {
|
||||
.and_then(|body: bytes::Bytes, tx, state| async move {
|
||||
let req: WifiApStartRequest = match parse_optional_json(&body) {
|
||||
Ok(req) => req,
|
||||
Err(reply) => return Ok::<_, Infallible>(*reply),
|
||||
};
|
||||
let ssid = req.ssid.unwrap_or_else(|| "showen".to_string());
|
||||
let password = req.password.unwrap_or_else(|| "12345678".to_string());
|
||||
let success_ssid = ssid.clone();
|
||||
@@ -383,8 +415,10 @@ fn wifi_ap_stop_route(
|
||||
.and(with_tx(tx))
|
||||
.and(with_state(state))
|
||||
.and_then(|tx, state| async move {
|
||||
wifi_action_reply(tx, state, WifiCommand::ApStop, |_| "AP 热点已关闭".to_string())
|
||||
.await
|
||||
wifi_action_reply(tx, state, WifiCommand::ApStop, |_| {
|
||||
"AP 热点已关闭".to_string()
|
||||
})
|
||||
.await
|
||||
})
|
||||
}
|
||||
|
||||
@@ -393,11 +427,17 @@ fn ble_start_route(
|
||||
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
|
||||
warp::path!("api" / "ble" / "start")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.and(warp::body::bytes())
|
||||
.and(with_state(state))
|
||||
.and_then(|req: BleStartRequest, state: Arc<HttpState>| async move {
|
||||
.and_then(|body: bytes::Bytes, state: Arc<HttpState>| async move {
|
||||
let req: BleStartRequest = match parse_optional_json(&body) {
|
||||
Ok(req) => req,
|
||||
Err(reply) => return Ok::<_, Infallible>(*reply),
|
||||
};
|
||||
let config = state.config();
|
||||
let device_name = req.device_name.unwrap_or_else(|| config.ble.device_name.clone());
|
||||
let device_name = req
|
||||
.device_name
|
||||
.unwrap_or_else(|| config.ble.device_name.clone());
|
||||
Ok::<_, Infallible>(success_json(format!(
|
||||
"BLE 配网服务已内嵌运行中,设备名: {device_name}"
|
||||
)))
|
||||
@@ -436,7 +476,12 @@ async fn handle_config_update(
|
||||
) -> Result<warp::reply::Response, Infallible> {
|
||||
let raw = match std::str::from_utf8(&body) {
|
||||
Ok(raw) => raw,
|
||||
Err(_) => return Ok(error_json(StatusCode::BAD_REQUEST, "请求体不是有效的 UTF-8")),
|
||||
Err(_) => {
|
||||
return Ok(error_json(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"请求体不是有效的 UTF-8",
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let current = state.config();
|
||||
@@ -687,7 +732,10 @@ async fn wifi_request(
|
||||
while guard.version == version {
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
return Err(error_json(StatusCode::GATEWAY_TIMEOUT, "等待 WiFi 响应超时"));
|
||||
return Err(error_json(
|
||||
StatusCode::GATEWAY_TIMEOUT,
|
||||
"等待 WiFi 响应超时",
|
||||
));
|
||||
}
|
||||
|
||||
let result = state
|
||||
@@ -705,7 +753,10 @@ async fn wifi_request(
|
||||
guard = next_guard;
|
||||
|
||||
if wait_result.timed_out() && guard.version == version {
|
||||
return Err(error_json(StatusCode::GATEWAY_TIMEOUT, "等待 WiFi 响应超时"));
|
||||
return Err(error_json(
|
||||
StatusCode::GATEWAY_TIMEOUT,
|
||||
"等待 WiFi 响应超时",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,6 +782,64 @@ async fn wifi_request(
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
async fn websocket_session(ws: warp::ws::WebSocket, state: Arc<HttpState>) {
|
||||
let (mut sender, mut receiver) = ws.split();
|
||||
|
||||
for payload in state.ws_snapshots() {
|
||||
if sender.send(warp::ws::Message::text(payload)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let mut events = state.ws_subscribe();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = events.recv() => {
|
||||
match event {
|
||||
Ok(payload) => {
|
||||
if sender.send(warp::ws::Message::text(payload)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
incoming = receiver.next() => {
|
||||
match incoming {
|
||||
Some(Ok(message)) => {
|
||||
if message.is_ping() {
|
||||
if sender.send(warp::ws::Message::pong(message.as_bytes())).await.is_err() {
|
||||
break;
|
||||
}
|
||||
} else if message.is_close() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Some(Err(_)) | None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_optional_json<T>(body: &bytes::Bytes) -> Result<T, Box<warp::reply::Response>>
|
||||
where
|
||||
T: DeserializeOwned + Default,
|
||||
{
|
||||
if body.is_empty() {
|
||||
return Ok(T::default());
|
||||
}
|
||||
|
||||
serde_json::from_slice(body).map_err(|error| {
|
||||
Box::new(error_json(
|
||||
StatusCode::BAD_REQUEST,
|
||||
&format!("JSON 格式错误: {error}"),
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
fn list_video_files(dir: &Path) -> Vec<VideoFileInfo> {
|
||||
let mut files = Vec::new();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user