From 4ec5e3fb568f09e904f0cef36c7a4be9f8f47800 Mon Sep 17 00:00:00 2001 From: Joshua Higgins Date: Tue, 15 Oct 2024 13:42:48 -0400 Subject: [PATCH] Event polling not optimal but its tomorrow --- client/src/app.rs | 82 ++++++++++++++++++++++++++++++++++++++++++-- client/src/types.rs | 6 ++-- server/src/events.rs | 78 ++++++++--------------------------------- server/src/server.rs | 14 ++++++++ server/src/types.rs | 4 +-- 5 files changed, 114 insertions(+), 70 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 0c9742b..f717d4f 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -1,10 +1,14 @@ +use std::time::Duration; use tarpc::context; use tarpc::tokio_serde::formats::Json; use tokio::sync::broadcast; use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::task::JoinHandle; +use tokio::time::sleep; use tracing::log::*; use realm_auth::types::RealmAuthClient; -use realm_server::types::{RealmChatClient, Room, User}; +use realm_server::events::Event; +use realm_server::types::{RealmChatClient, Room}; use realm_shared::stoken; use realm_shared::types::ErrorCode::*; use realm_shared::types::ErrorCode; @@ -95,7 +99,12 @@ pub struct RealmApp { #[serde(skip)] pub delete_room_channel: (Sender>, Receiver>), #[serde(skip)] - pub room_changes_channel: (Sender), ErrorCode>>, Receiver), ErrorCode>>) + pub room_changes_channel: (Sender), ErrorCode>>, Receiver), ErrorCode>>), + + #[serde(skip)] + pub event_channel: (Sender<(String, (u32, Event))>, Receiver<(String, (u32, Event))>), + #[serde(skip)] + pub polling_threads: Vec<(String, JoinHandle<()>)>, } impl Default for RealmApp { @@ -142,6 +151,8 @@ impl Default for RealmApp { add_room_channel: broadcast::channel(256), delete_room_channel: broadcast::channel(256), room_changes_channel: broadcast::channel(256), + event_channel: broadcast::channel(256), + polling_threads: Vec::new(), } } } @@ -237,6 +248,8 @@ pub fn fetch_server_data(channel: Sender>, addresses: port, is_admin, is_owner, + last_event_index: 0, + messages: Vec::new(), rooms, })).unwrap(); }); @@ -534,6 +547,71 @@ impl eframe::App for RealmApp { Err(e) => error!("Error fetching room data: {:?}", e), } } + + // Polling events + while let Ok((serverid, (index, event))) = self.event_channel.1.try_recv() { + if let Some(active_servers) = &mut self.active_servers { + for server in active_servers { + if server.server_id.eq(&serverid) { + match event.clone() { + Event::NewMessage(message) => { + server.messages.push(message); + } + Event::NewRoom(room) => { + server.rooms.push(room); + } + Event::DeleteRoom(roomid) => { + server.rooms.retain(|r| !r.roomid.eq(&roomid)); + } + } + server.last_event_index = index; + } + } + } + } + + // Manage polling threads + if let Some(active_servers) = &mut self.active_servers { + if self.polling_threads.len() != active_servers.len() { + let running_thread_serverids = self.polling_threads.iter().map(|t| t.0.clone()).collect::>(); + let missing_servers = active_servers.clone().into_iter().filter(|s| !running_thread_serverids.contains(&s.server_id)).collect::>(); + for server in missing_servers { + let send_channel = self.event_channel.0.clone(); + let _handle = tokio::spawn(async move { + loop { + let mut transport = tarpc::serde_transport::tcp::connect(format!("{}:{}", server.domain, server.port), Json::default); + transport.config_mut().max_frame_length(usize::MAX); + + let result = transport.await; + let connection = match result { + Ok(connection) => connection, + Err(_) => { + break; + } + }; + + let client = RealmChatClient::new(tarpc::client::Config::default(), connection).spawn(); + + let result = client.poll_events_since( + context::current(), + server.last_event_index + ).await; + + match result { + Ok(events) => { + for event in events { + send_channel.send((server.server_id.clone(), (event.0, event.1))).unwrap(); + } + } + Err(_) => break, + } + + sleep(Duration::from_millis(100)).await; + } + }); + } + } + } // File -> Quit gui::top_panel(self, ctx); diff --git a/client/src/types.rs b/client/src/types.rs index 68316f3..104f98e 100644 --- a/client/src/types.rs +++ b/client/src/types.rs @@ -1,4 +1,4 @@ -use realm_server::types::{RealmChatClient, Room}; +use realm_server::types::{Message, RealmChatClient, Room}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct CUser { @@ -19,5 +19,7 @@ pub struct CServer { pub port: u16, pub is_admin: bool, pub is_owner: bool, - pub rooms: Vec + pub rooms: Vec, + pub last_event_index: u32, + pub messages: Vec, } \ No newline at end of file diff --git a/server/src/events.rs b/server/src/events.rs index b39d95e..7254d6b 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -1,64 +1,14 @@ -// use durian::bincode_packet; -// use crate::types::{Message, Room, User}; -// -// #[bincode_packet] -// pub struct Greet { -// pub id: u32 -// } -// -// #[bincode_packet] -// pub struct UserJoinedEvent { -// pub user: User, -// } -// -// #[bincode_packet] -// pub struct UserLeftEvent { -// pub user: User, -// } -// -// #[bincode_packet] -// pub struct NewMessageEvent { -// pub message: Message, -// } -// -// // #[bincode_packet] -// // pub struct StartTypingEvent { -// // pub user: User, -// // pub room: String, -// // } -// // -// // #[bincode_packet] -// // pub struct StopTypingEvent { -// // pub user: User, -// // pub room: String, -// // } -// -// #[bincode_packet] -// pub struct NewRoomEvent { -// pub room: Room, -// } -// -// #[bincode_packet] -// pub struct DeleteRoomEvent { -// pub roomid: String, -// } -// -// #[bincode_packet] -// pub struct KickedUserEvent { -// pub userid: String, -// } -// -// #[bincode_packet] -// pub struct BannedUserEvent { -// pub userid: String, -// } -// -// #[bincode_packet] -// pub struct PromotedUserEvent { -// pub userid: String, -// } -// -// #[bincode_packet] -// pub struct DemotedUserEvent { -// pub userid: String, -// } \ No newline at end of file +use crate::types::{Message, Room, User}; + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub enum Event { + // UserJoined(User), + // UserLeft(User), + NewMessage(Message), + NewRoom(Room), + DeleteRoom(String), + // KickedUser(KickedUser), + // BannedUser(BannedUser), + // PromotedUser(PromotedUser), + // DemotedUser(DemotedUser), +} \ No newline at end of file diff --git a/server/src/server.rs b/server/src/server.rs index 4141886..e25f114 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -25,6 +25,7 @@ pub struct RealmChatServer { pub db_pool: Pool, pub typing_users: Vec<(String, String)>, //NOTE: user.userid, room.roomid pub cache: Cache, + pub events: Vec<(u32, Event)>, } const FETCH_MESSAGE: &str = "SELECT message.*, @@ -46,6 +47,7 @@ impl RealmChatServer { .time_to_idle(Duration::from_secs(5*60)) .time_to_live(Duration::from_secs(60*60)) .build(), + events: Vec::new(), } } @@ -217,6 +219,18 @@ impl RealmChat for RealmChatServer { self.internal_is_user_owner(&userid).await } + async fn poll_events_since(self, _: Context, index: u32) -> Vec<(u32, Event)> { + let mut events_to_send = Vec::new(); + + for (i, event) in self.events { + if i > index { + events_to_send.push((i, event)); + } + } + + events_to_send + } + async fn join_server(self, _: Context, stoken: String, userid: String) -> Result { if !self.is_stoken_valid(&userid, &stoken).await { return Err(Unauthorized) diff --git a/server/src/types.rs b/server/src/types.rs index 4cd9e01..8977978 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -4,7 +4,7 @@ use sqlx::sqlite::SqliteRow; use tarpc::serde::{Deserialize, Serialize}; use realm_shared::types::ErrorCode; - +use crate::events::Event; use crate::types::MessageData::*; #[tarpc::service] @@ -14,7 +14,7 @@ pub trait RealmChat { async fn get_info() -> ServerInfo; async fn is_user_admin(stoken: String) -> bool; async fn is_user_owner(stoken: String) -> bool; - + async fn poll_events_since(index: u32) -> Vec<(u32, Event)>; async fn join_server(stoken: String, userid: String) -> Result; async fn leave_server(stoken: String, userid: String) -> Result<(), ErrorCode>;