Event polling

not optimal but its tomorrow
This commit is contained in:
2024-10-15 13:42:48 -04:00
Unverified
parent 3e31066a5f
commit 4ec5e3fb56
5 changed files with 114 additions and 70 deletions

View File

@@ -1,10 +1,14 @@
use std::time::Duration;
use tarpc::context; use tarpc::context;
use tarpc::tokio_serde::formats::Json; use tarpc::tokio_serde::formats::Json;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::log::*; use tracing::log::*;
use realm_auth::types::RealmAuthClient; use realm_auth::types::RealmAuthClient;
use realm_server::types::{RealmChatClient, Room, User}; use realm_server::events::Event;
use realm_server::types::{RealmChatClient, Room};
use realm_shared::stoken; use realm_shared::stoken;
use realm_shared::types::ErrorCode::*; use realm_shared::types::ErrorCode::*;
use realm_shared::types::ErrorCode; use realm_shared::types::ErrorCode;
@@ -95,7 +99,12 @@ pub struct RealmApp {
#[serde(skip)] #[serde(skip)]
pub delete_room_channel: (Sender<Result<CServer, ErrorCode>>, Receiver<Result<CServer, ErrorCode>>), pub delete_room_channel: (Sender<Result<CServer, ErrorCode>>, Receiver<Result<CServer, ErrorCode>>),
#[serde(skip)] #[serde(skip)]
pub room_changes_channel: (Sender<Result<(CServer, Vec<Room>), ErrorCode>>, Receiver<Result<(CServer, Vec<Room>), ErrorCode>>) pub room_changes_channel: (Sender<Result<(CServer, Vec<Room>), ErrorCode>>, Receiver<Result<(CServer, Vec<Room>), ErrorCode>>),
#[serde(skip)]
pub event_channel: (Sender<(String, (u32, Event))>, Receiver<(String, (u32, Event))>),
#[serde(skip)]
pub polling_threads: Vec<(String, JoinHandle<()>)>,
} }
impl Default for RealmApp { impl Default for RealmApp {
@@ -142,6 +151,8 @@ impl Default for RealmApp {
add_room_channel: broadcast::channel(256), add_room_channel: broadcast::channel(256),
delete_room_channel: broadcast::channel(256), delete_room_channel: broadcast::channel(256),
room_changes_channel: broadcast::channel(256), room_changes_channel: broadcast::channel(256),
event_channel: broadcast::channel(256),
polling_threads: Vec::new(),
} }
} }
} }
@@ -237,6 +248,8 @@ pub fn fetch_server_data(channel: Sender<Result<CServer, ErrorCode>>, addresses:
port, port,
is_admin, is_admin,
is_owner, is_owner,
last_event_index: 0,
messages: Vec::new(),
rooms, rooms,
})).unwrap(); })).unwrap();
}); });
@@ -534,6 +547,71 @@ impl eframe::App for RealmApp {
Err(e) => error!("Error fetching room data: {:?}", e), Err(e) => error!("Error fetching room data: {:?}", e),
} }
} }
// Polling events
while let Ok((serverid, (index, event))) = self.event_channel.1.try_recv() {
if let Some(active_servers) = &mut self.active_servers {
for server in active_servers {
if server.server_id.eq(&serverid) {
match event.clone() {
Event::NewMessage(message) => {
server.messages.push(message);
}
Event::NewRoom(room) => {
server.rooms.push(room);
}
Event::DeleteRoom(roomid) => {
server.rooms.retain(|r| !r.roomid.eq(&roomid));
}
}
server.last_event_index = index;
}
}
}
}
// Manage polling threads
if let Some(active_servers) = &mut self.active_servers {
if self.polling_threads.len() != active_servers.len() {
let running_thread_serverids = self.polling_threads.iter().map(|t| t.0.clone()).collect::<Vec<String>>();
let missing_servers = active_servers.clone().into_iter().filter(|s| !running_thread_serverids.contains(&s.server_id)).collect::<Vec<CServer>>();
for server in missing_servers {
let send_channel = self.event_channel.0.clone();
let _handle = tokio::spawn(async move {
loop {
let mut transport = tarpc::serde_transport::tcp::connect(format!("{}:{}", server.domain, server.port), Json::default);
transport.config_mut().max_frame_length(usize::MAX);
let result = transport.await;
let connection = match result {
Ok(connection) => connection,
Err(_) => {
break;
}
};
let client = RealmChatClient::new(tarpc::client::Config::default(), connection).spawn();
let result = client.poll_events_since(
context::current(),
server.last_event_index
).await;
match result {
Ok(events) => {
for event in events {
send_channel.send((server.server_id.clone(), (event.0, event.1))).unwrap();
}
}
Err(_) => break,
}
sleep(Duration::from_millis(100)).await;
}
});
}
}
}
// File -> Quit // File -> Quit
gui::top_panel(self, ctx); gui::top_panel(self, ctx);

View File

@@ -1,4 +1,4 @@
use realm_server::types::{RealmChatClient, Room}; use realm_server::types::{Message, RealmChatClient, Room};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct CUser { pub struct CUser {
@@ -19,5 +19,7 @@ pub struct CServer {
pub port: u16, pub port: u16,
pub is_admin: bool, pub is_admin: bool,
pub is_owner: bool, pub is_owner: bool,
pub rooms: Vec<Room> pub rooms: Vec<Room>,
pub last_event_index: u32,
pub messages: Vec<Message>,
} }

View File

@@ -1,64 +1,14 @@
// use durian::bincode_packet; use crate::types::{Message, Room, User};
// use crate::types::{Message, Room, User};
// #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
// #[bincode_packet] pub enum Event {
// pub struct Greet { // UserJoined(User),
// pub id: u32 // UserLeft(User),
// } NewMessage(Message),
// NewRoom(Room),
// #[bincode_packet] DeleteRoom(String),
// pub struct UserJoinedEvent { // KickedUser(KickedUser),
// pub user: User, // BannedUser(BannedUser),
// } // PromotedUser(PromotedUser),
// // DemotedUser(DemotedUser),
// #[bincode_packet] }
// pub struct UserLeftEvent {
// pub user: User,
// }
//
// #[bincode_packet]
// pub struct NewMessageEvent {
// pub message: Message,
// }
//
// // #[bincode_packet]
// // pub struct StartTypingEvent {
// // pub user: User,
// // pub room: String,
// // }
// //
// // #[bincode_packet]
// // pub struct StopTypingEvent {
// // pub user: User,
// // pub room: String,
// // }
//
// #[bincode_packet]
// pub struct NewRoomEvent {
// pub room: Room,
// }
//
// #[bincode_packet]
// pub struct DeleteRoomEvent {
// pub roomid: String,
// }
//
// #[bincode_packet]
// pub struct KickedUserEvent {
// pub userid: String,
// }
//
// #[bincode_packet]
// pub struct BannedUserEvent {
// pub userid: String,
// }
//
// #[bincode_packet]
// pub struct PromotedUserEvent {
// pub userid: String,
// }
//
// #[bincode_packet]
// pub struct DemotedUserEvent {
// pub userid: String,
// }

View File

@@ -25,6 +25,7 @@ pub struct RealmChatServer {
pub db_pool: Pool<Sqlite>, pub db_pool: Pool<Sqlite>,
pub typing_users: Vec<(String, String)>, //NOTE: user.userid, room.roomid pub typing_users: Vec<(String, String)>, //NOTE: user.userid, room.roomid
pub cache: Cache<String, String>, pub cache: Cache<String, String>,
pub events: Vec<(u32, Event)>,
} }
const FETCH_MESSAGE: &str = "SELECT message.*, const FETCH_MESSAGE: &str = "SELECT message.*,
@@ -46,6 +47,7 @@ impl RealmChatServer {
.time_to_idle(Duration::from_secs(5*60)) .time_to_idle(Duration::from_secs(5*60))
.time_to_live(Duration::from_secs(60*60)) .time_to_live(Duration::from_secs(60*60))
.build(), .build(),
events: Vec::new(),
} }
} }
@@ -217,6 +219,18 @@ impl RealmChat for RealmChatServer {
self.internal_is_user_owner(&userid).await self.internal_is_user_owner(&userid).await
} }
async fn poll_events_since(self, _: Context, index: u32) -> Vec<(u32, Event)> {
let mut events_to_send = Vec::new();
for (i, event) in self.events {
if i > index {
events_to_send.push((i, event));
}
}
events_to_send
}
async fn join_server(self, _: Context, stoken: String, userid: String) -> Result<User, ErrorCode> { async fn join_server(self, _: Context, stoken: String, userid: String) -> Result<User, ErrorCode> {
if !self.is_stoken_valid(&userid, &stoken).await { if !self.is_stoken_valid(&userid, &stoken).await {
return Err(Unauthorized) return Err(Unauthorized)

View File

@@ -4,7 +4,7 @@ use sqlx::sqlite::SqliteRow;
use tarpc::serde::{Deserialize, Serialize}; use tarpc::serde::{Deserialize, Serialize};
use realm_shared::types::ErrorCode; use realm_shared::types::ErrorCode;
use crate::events::Event;
use crate::types::MessageData::*; use crate::types::MessageData::*;
#[tarpc::service] #[tarpc::service]
@@ -14,7 +14,7 @@ pub trait RealmChat {
async fn get_info() -> ServerInfo; async fn get_info() -> ServerInfo;
async fn is_user_admin(stoken: String) -> bool; async fn is_user_admin(stoken: String) -> bool;
async fn is_user_owner(stoken: String) -> bool; async fn is_user_owner(stoken: String) -> bool;
async fn poll_events_since(index: u32) -> Vec<(u32, Event)>;
async fn join_server(stoken: String, userid: String) -> Result<User, ErrorCode>; async fn join_server(stoken: String, userid: String) -> Result<User, ErrorCode>;
async fn leave_server(stoken: String, userid: String) -> Result<(), ErrorCode>; async fn leave_server(stoken: String, userid: String) -> Result<(), ErrorCode>;