#![cfg_attr(feature = "backtrace", feature(error_generic_member_access))]
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};
use tokio_stream::StreamExt;
use basic_frame::DynamicFrame;
use bui_backend_session_types::ConnectionKey;
use event_stream_types::{ConnectionEvent, ConnectionEventType, EventChunkSender};
pub use http_video_streaming_types::{
CircleParams, DrawableShape, FirehoseCallbackInner, Point, Shape, ToClient,
};
/// Shorthand `Result` whose error type is this file's [`Error`].
type Result<T> = std::result::Result<T, Error>;
/// Errors reported by the items in this file.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Displayed as "unknown path"; carries a backtrace when the `backtrace` feature is enabled.
///
/// NOTE(review): not constructed anywhere in the visible source; confirm where it is raised.
#[error("unknown path")]
UnknownPath(#[cfg(feature = "backtrace")] std::backtrace::Backtrace),
/// Transparent wrapper around `convert_image::Error`.
///
/// The `#[from]` attribute lets `?` convert image-conversion failures into this variant.
#[error(transparent)]
ConvertImageError(
#[from]
#[cfg_attr(feature = "backtrace", backtrace)]
convert_image::Error,
),
}
/// An image together with the overlay data that is forwarded to clients alongside it.
///
/// `PerSender::service` encodes `frame` as a JPEG data URL and copies the remaining fields
/// into the outgoing `ToClient` message.
#[derive(Debug)]
pub struct AnnotatedFrame {
/// The image itself; encoded to JPEG when it is served to a connection.
pub frame: DynamicFrame,
/// Points copied verbatim into `ToClient::found_points`.
pub found_points: Vec<Point>,
/// Optional shape copied verbatim into `ToClient::valid_display`.
pub valid_display: Option<Shape>,
/// Shapes copied verbatim into `ToClient::annotations`.
pub annotations: Vec<DrawableShape>,
}
/// Compile-time check: fails to build unless `AnnotatedFrame` is `Send`.
fn _test_annotated_frame_is_send() {
    fn assert_send<T>()
    where
        T: Send,
    {
    }
    assert_send::<AnnotatedFrame>();
}
/// A client acknowledgement, stamped with the time at which it was received.
///
/// `TaskState::handle_callback` routes it to the connection named by `inner.ck`, and
/// `PerSender::got_callback` then marks that connection as ready for another frame.
#[derive(Debug)]
pub struct FirehoseCallback {
/// Arrival time of the callback; compared against `inner.ts_rfc3339` to log latency.
pub arrival_time: chrono::DateTime<chrono::Utc>,
/// Payload of the callback (connection key and the RFC 3339 timestamp being echoed).
pub inner: FirehoseCallbackInner,
}
/// Per-connection sending state.
struct PerSender {
/// Channel on which encoded event chunks are sent to the connection.
out: EventChunkSender,
/// Most recently pushed frame, if any; reset to `None` at the end of every `service` call.
frame_lifo: Option<Arc<Mutex<AnnotatedFrame>>>,
/// `true` initially and after a callback arrives; set to `false` once a frame has been sent.
ready_to_send: bool,
/// Key of the connection; echoed to the client as `ToClient::ck`.
conn_key: ConnectionKey,
/// Count of frames pushed to this sender since creation; sent as `ToClient::fno`.
fno: u64,
}
/// Compile-time check: fails to build unless `PerSender` is `Send`.
fn _test_per_sender_is_send() {
    fn assert_send<T>()
    where
        T: Send,
    {
    }
    assert_send::<PerSender>();
}
/// Selection by name: everything, nothing, or one specific name.
///
/// NOTE(review): not referenced in the visible part of this file; what the names refer to
/// must be confirmed against the callers.
#[derive(Debug)]
pub enum NameSelector {
/// Select everything.
All,
/// Select nothing.
None,
/// Select only the entry with this name.
Name(String),
}
impl PerSender {
fn new(
out: EventChunkSender,
conn_key: ConnectionKey,
frame: Arc<Mutex<AnnotatedFrame>>,
) -> PerSender {
PerSender {
out,
frame_lifo: Some(frame),
ready_to_send: true,
conn_key,
fno: 0,
}
}
fn push(&mut self, frame: Arc<Mutex<AnnotatedFrame>>) {
self.fno += 1;
self.frame_lifo = Some(frame);
}
fn got_callback(&mut self, msg: FirehoseCallback) {
match chrono::DateTime::parse_from_rfc3339(&msg.inner.ts_rfc3339) {
Ok(sent_time) => {
let latency = msg.arrival_time.signed_duration_since(sent_time);
tracing::trace!("latency: {:?}", latency);
}
Err(e) => {
tracing::error!("failed to parse timestamp in callback: {:?}", e);
}
}
self.ready_to_send = true;
}
async fn service(&mut self) -> Result<()> {
if let Some(ref most_recent_frame_data) = self.frame_lifo {
if self.ready_to_send {
let sent_time: chrono::DateTime<chrono::Utc> = chrono::Utc::now();
let tc = {
let most_recent_frame_data = most_recent_frame_data.lock();
let bytes = basic_frame::match_all_dynamic_fmts!(
&most_recent_frame_data.frame,
x,
convert_image::frame_to_image(x, convert_image::ImageOptions::Jpeg(80),)
)?;
let firehose_frame_base64 = base64::encode(&bytes);
let data_url = format!("data:image/jpeg;base64,{}", firehose_frame_base64);
let found_points = most_recent_frame_data.found_points.clone();
ToClient {
firehose_frame_data_url: data_url,
found_points,
valid_display: most_recent_frame_data.valid_display.clone(),
annotations: most_recent_frame_data.annotations.clone(),
fno: self.fno,
ts_rfc3339: sent_time.to_rfc3339(),
ck: self.conn_key,
}
};
let buf = serde_json::to_string(&tc).expect("encode");
let buf = format!(
"event: {}\ndata: {}\n\n",
http_video_streaming_types::VIDEO_STREAM_EVENT_NAME,
buf
);
let hc = http_body::Frame::data(bytes::Bytes::from(buf));
match self.out.send(Ok(hc)).await {
Ok(()) => {}
Err(_) => {
tracing::info!("failed to send data to connection. dropping.");
}
}
self.ready_to_send = false;
}
}
self.frame_lifo = None;
Ok(())
}
}
/// State owned by `firehose_task`: all live connections plus the latest frame.
struct TaskState {
/// One `PerSender` per connected client, keyed by connection key.
per_sender_map: HashMap<ConnectionKey, PerSender>,
/// Most recently received frame; handed to every newly connecting client.
frame: Arc<Mutex<AnnotatedFrame>>,
}
/// Compile-time check: fails to build unless `TaskState` is `Send`.
fn _test_task_state_is_send() {
    fn assert_send<T>()
    where
        T: Send,
    {
    }
    assert_send::<TaskState>();
}
impl TaskState {
    /// Gives every connection a chance to send its pending frame.
    ///
    /// Stops at, and returns, the first error reported by a connection.
    async fn service(&mut self) -> Result<()> {
        for sender in self.per_sender_map.values_mut() {
            sender.service().await?;
        }
        Ok(())
    }

    /// Registers a new connection (seeded with the latest frame) or forgets a closed one.
    fn handle_connection(&mut self, conn_evt: ConnectionEvent) -> Result<()> {
        let key = conn_evt.connection_key;
        match conn_evt.typ {
            ConnectionEventType::Disconnect => {
                self.per_sender_map.remove(&key);
            }
            ConnectionEventType::Connect(chunk_sender) => {
                let sender = PerSender::new(chunk_sender, key, Arc::clone(&self.frame));
                self.per_sender_map.insert(key, sender);
            }
        }
        Ok(())
    }

    /// Stores `new_frame` as the latest frame and queues it on every connection.
    fn handle_frame(&mut self, new_frame: AnnotatedFrame) -> Result<()> {
        self.frame = Arc::new(Mutex::new(new_frame));
        let latest = &self.frame;
        self.per_sender_map
            .values_mut()
            .for_each(|sender| sender.push(Arc::clone(latest)));
        Ok(())
    }

    /// Forwards a client callback to the connection it names, warning if that connection
    /// is no longer known.
    fn handle_callback(&mut self, callback: FirehoseCallback) -> Result<()> {
        match self.per_sender_map.get_mut(&callback.inner.ck) {
            Some(sender) => sender.got_callback(callback),
            None => {
                tracing::warn!(
                    "Got firehose_callback for non-existant connection key. \
                     Did connection disconnect?"
                );
            }
        }
        Ok(())
    }
}
/// Runs the firehose: fans incoming frames out to every connected client.
///
/// The task first waits for an initial frame, then loops over three inputs:
///
/// * `connection_callback_rx`: clients connecting and disconnecting,
/// * `firehose_rx`: new frames to distribute,
/// * `firehose_callback_rx`: client acknowledgements that re-arm a connection.
///
/// After each event, every connection is serviced once so that pending frames are sent to
/// clients that are ready for them.
///
/// The task finishes with `Ok(())` as soon as any of the three channels is closed. That
/// includes `firehose_rx` closing before the first frame arrives, which is treated as a
/// normal shutdown rather than a panic.
///
/// # Errors
///
/// Returns an error if servicing a connection fails, i.e. a frame cannot be encoded.
pub async fn firehose_task(
    connection_callback_rx: tokio::sync::mpsc::Receiver<ConnectionEvent>,
    mut firehose_rx: tokio::sync::mpsc::Receiver<AnnotatedFrame>,
    firehose_callback_rx: tokio::sync::mpsc::Receiver<FirehoseCallback>,
) -> Result<()> {
    // A frame is needed up front so that new connections always have something to show.
    // If every frame sender is gone before producing one, shut down cleanly exactly as the
    // main loop does when this channel closes.
    let first_frame = match firehose_rx.recv().await {
        Some(frame) => frame,
        None => {
            tracing::debug!("frame senders done before first frame. firehose task done.");
            return Ok(());
        }
    };
    let frame = Arc::new(Mutex::new(first_frame));
    let mut task_state = TaskState {
        per_sender_map: HashMap::new(),
        frame,
    };
    let mut connection_callback_rx =
        tokio_stream::wrappers::ReceiverStream::new(connection_callback_rx);
    let mut firehose_callback_rx =
        tokio_stream::wrappers::ReceiverStream::new(firehose_callback_rx);
    loop {
        tokio::select! {
            opt_new_connection = connection_callback_rx.next() => {
                match opt_new_connection {
                    Some(new_connection) => {
                        task_state.handle_connection(new_connection)?;
                    }
                    None => {
                        tracing::debug!("new connection senders done.");
                        break;
                    }
                }
            }
            opt_new_frame = firehose_rx.recv() => {
                match opt_new_frame {
                    Some(new_frame) => {
                        task_state.handle_frame(new_frame)?;
                    }
                    None => {
                        tracing::debug!("new frame senders done.");
                        break;
                    }
                }
            },
            opt_callback = firehose_callback_rx.next() => {
                match opt_callback {
                    Some(callback) => {
                        task_state.handle_callback(callback)?;
                    }
                    None => {
                        tracing::debug!("new callback senders done.");
                        break;
                    }
                }
            },
        }
        // Whatever just happened may have made a connection sendable; try them all.
        task_state.service().await?;
    }
    tracing::debug!("firehose task done.");
    Ok(())
}