//! HttpPlugin — Web UI + REST API //! //! 基于 warp 的 HTTP 服务,提供播放控制、配置管理、视频管理等 API。 mod routes; use crate::core::config::AppConfig; use crate::core::message::{Envelope, Message}; use crate::core::plugin::{Platform, Plugin, PluginContext, PluginInfo}; use anyhow::{Context, Result}; use serde::Serialize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use tokio::sync::broadcast; #[derive(Serialize)] struct WsEvent<'a, T> { #[serde(rename = "type")] event_type: &'a str, data: T, } fn encode_ws_event(event_type: &str, data: T) -> Option { match serde_json::to_string(&WsEvent { event_type, data }) { Ok(payload) => Some(payload), Err(error) => { eprintln!("[HttpPlugin] failed to serialize websocket event '{event_type}': {error}"); None } } } struct PendingWifiResponse { version: u64, payload: Option, } pub(crate) struct HttpState { wifi_response: Mutex, wifi_response_cv: Condvar, last_wifi_result: Mutex>, config: Mutex>, player_status: Mutex, ble_ready: AtomicBool, ws_events: broadcast::Sender, } impl HttpState { fn new(config: Arc) -> Self { let (ws_events, _) = broadcast::channel(32); let player_status = crate::core::message::PlayerStatusData { running: false, paused: !config.playback.auto_start, in_transition: false, current_index: 0, playlist_length: config.playlist.len(), current_video: config.playlist.first().map(|item| item.id.clone()), }; Self { wifi_response: Mutex::new(PendingWifiResponse { version: 0, payload: None, }), wifi_response_cv: Condvar::new(), last_wifi_result: Mutex::new(None), config: Mutex::new(config), player_status: Mutex::new(player_status), ble_ready: AtomicBool::new(false), ws_events, } } fn publish_wifi_result(&self, payload: String) { if let Ok(mut state) = self.wifi_response.lock() { state.version += 1; state.payload = Some(payload.clone()); self.wifi_response_cv.notify_all(); } if let Ok(mut last_wifi_result) = self.last_wifi_result.lock() { *last_wifi_result = Some(payload.clone()); } let ws_payload = match serde_json::from_str::(&payload) { Ok(value) => encode_ws_event("wifi_update", value), Err(_) => encode_ws_event("wifi_update", serde_json::json!({ "raw": payload })), }; if let Some(ws_payload) = ws_payload { self.publish_ws(ws_payload); } } pub(crate) fn config(&self) -> Arc { self.config .lock() .map(|config| Arc::clone(&config)) .expect("http config state poisoned") } fn replace_config(&self, config: Arc) { if let Ok(mut current) = self.config.lock() { *current = Arc::clone(&config); } if let Ok(mut player_status) = self.player_status.lock() { player_status.playlist_length = config.playlist.len(); if player_status.current_video.is_none() { player_status.current_video = config.playlist.first().map(|item| item.id.clone()); } } } pub(crate) fn player_status(&self) -> crate::core::message::PlayerStatusData { self.player_status .lock() .map(|status| status.clone()) .expect("http player status state poisoned") } fn update_player_status(&self, status: crate::core::message::PlayerStatusData) { if let Ok(mut current) = self.player_status.lock() { *current = status; } } pub(crate) fn ble_ready(&self) -> bool { self.ble_ready.load(Ordering::SeqCst) } fn publish_ws(&self, payload: String) { let _ = self.ws_events.send(payload); } pub(crate) fn ws_snapshots(&self) -> Vec { let mut snapshots = Vec::new(); if let Some(payload) = encode_ws_event("status_update", self.player_status()) { snapshots.push(payload); } let config = self.config(); if let Some(payload) = encode_ws_event("config_update", config.as_ref()) { snapshots.push(payload); } if let Some(payload) = encode_ws_event( "ble_update", serde_json::json!({ "ready": self.ble_ready() }), ) { snapshots.push(payload); } if let Ok(last_wifi_result) = self.last_wifi_result.lock() { if let Some(raw) = last_wifi_result.as_ref() { let payload = match serde_json::from_str::(raw) { Ok(value) => encode_ws_event("wifi_update", value), Err(_) => encode_ws_event("wifi_update", serde_json::json!({ "raw": raw })), }; if let Some(payload) = payload { snapshots.push(payload); } } } snapshots } pub(crate) fn ws_subscribe(&self) -> broadcast::Receiver { self.ws_events.subscribe() } fn set_ble_ready(&self, ready: bool) { self.ble_ready.store(ready, Ordering::SeqCst); if let Some(payload) = encode_ws_event("ble_update", serde_json::json!({ "ready": ready })) { self.publish_ws(payload); } } } pub struct HttpPlugin { ctx: Option, state: Option>, } impl HttpPlugin { pub fn new() -> Self { Self { ctx: None, state: None, } } } impl Default for HttpPlugin { fn default() -> Self { Self::new() } } impl Plugin for HttpPlugin { fn id(&self) -> &'static str { "http" } fn info(&self) -> PluginInfo { PluginInfo { name: "HTTP API", version: "0.2.0", description: "Web UI + REST API (warp)", platform: Platform::Any, } } fn dependencies(&self) -> Vec<&'static str> { vec!["video"] } fn init(&mut self, ctx: PluginContext) -> Result<()> { self.state = Some(Arc::new(HttpState::new(Arc::clone(&ctx.config)))); self.ctx = Some(ctx); Ok(()) } fn start(&mut self) -> Result<()> { let ctx = self .ctx .as_ref() .context("http plugin context is not initialized")?; if !ctx.config.remote_control.enabled { println!("[HttpPlugin] Remote control disabled, skip HTTP server startup"); return Ok(()); } let host = ctx.config.remote_control.host.clone(); let port = ctx.config.remote_control.port; let tx = ctx.tx.clone(); let state = Arc::clone( self.state .as_ref() .context("http plugin state is not initialized")?, ); std::thread::spawn(move || { let runtime = match tokio::runtime::Builder::new_multi_thread() .enable_all() .build() { Ok(runtime) => runtime, Err(error) => { eprintln!("[HttpPlugin] failed to create tokio runtime: {error}"); return; } }; runtime.block_on(async move { let routes = routes::build_routes(tx.clone(), state); let addr: std::net::SocketAddr = match format!("{host}:{port}").parse() { Ok(addr) => addr, Err(error) => { eprintln!("[HttpPlugin] invalid listen address {host}:{port}: {error}"); return; } }; if let Err(error) = tx.send(Envelope { from: "http", to: crate::core::message::Destination::Manager, message: Message::PluginReady("http"), }) { eprintln!("[HttpPlugin] failed to report ready state: {error}"); } println!("[HttpPlugin] listening on http://{addr}"); warp::serve(routes).run(addr).await; }); }); Ok(()) } fn handle_message(&mut self, msg: Message) -> Result<()> { let state = match self.state.as_ref() { Some(state) => state, None => return Ok(()), }; match msg { Message::WifiResult(payload) => state.publish_wifi_result(payload), Message::PlayerStatus(status) => { state.update_player_status(status.clone()); if let Some(payload) = encode_ws_event("status_update", &status) { state.publish_ws(payload); } } Message::ConfigReloaded(config) => { state.replace_config(Arc::clone(&config)); if let Some(payload) = encode_ws_event("config_update", config.as_ref()) { state.publish_ws(payload); } } Message::StateChanged { old_state, new_state, } => { if let Some(payload) = encode_ws_event( "state_update", serde_json::json!({ "old_state": old_state, "new_state": new_state }), ) { state.publish_ws(payload); } } Message::PluginReady("ble") => state.set_ble_ready(true), Message::Shutdown => state.set_ble_ready(false), _ => {} } Ok(()) } fn stop(&mut self) -> Result<()> { Ok(()) } }