From c2ec816188cb948262688b9a2840bbf2cfc8b927 Mon Sep 17 00:00:00 2001 From: Joshua Higgins Date: Sat, 28 Sep 2024 13:01:53 -0400 Subject: [PATCH] Removing "name", Owner role, Promoting, Live events! --- auth/src/server.rs | 1 - .../20240727041731_create_everything.sql | 5 +- server/src/events.rs | 60 ++++- server/src/main.rs | 52 ++-- server/src/server.rs | 227 ++++++++++++++---- server/src/types.rs | 18 +- 6 files changed, 285 insertions(+), 78 deletions(-) diff --git a/auth/src/server.rs b/auth/src/server.rs index c1e5717..c0bb4ce 100644 --- a/auth/src/server.rs +++ b/auth/src/server.rs @@ -354,7 +354,6 @@ impl RealmAuth for RealmAuthServer { Ok(()) } - // TODO: find a way of supporting, post-capstone work // async fn change_username(self, _: Context, username: String, token: String, new_username: String) -> Result<(), ErrorCode> { // info!("API Request: change_username( username -> {}, token -> {}, new_username -> {} )", username, token, new_username); // diff --git a/server/migrations/20240727041731_create_everything.sql b/server/migrations/20240727041731_create_everything.sql index 88bd459..a50e8b9 100644 --- a/server/migrations/20240727041731_create_everything.sql +++ b/server/migrations/20240727041731_create_everything.sql @@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS room ( id INTEGER PRIMARY KEY, roomid VARCHAR(255) NOT NULL, - name VARCHAR(255) NOT NULL, +-- name VARCHAR(255) NOT NULL, admin_only_send BOOL NOT NULL, admin_only_view BOOL NOT NULL ); @@ -11,7 +11,8 @@ CREATE TABLE IF NOT EXISTS user ( id INTEGER PRIMARY KEY, userid VARCHAR(255) NOT NULL, name VARCHAR(255) NOT NULL, - online BOOL NOT NULL, +-- online BOOL NOT NULL, + owner BOOL NOT NULL, admin BOOL NOT NULL ); diff --git a/server/src/events.rs b/server/src/events.rs index 9dcf5cd..9ed25d4 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -1,6 +1,64 @@ use durian::bincode_packet; +use crate::types::{Message, Room, User}; #[bincode_packet] pub struct Greet { - id: u32 + pub id: u32 +} + +#[bincode_packet] +pub struct UserJoinedEvent { + pub user: User, +} + +#[bincode_packet] +pub struct UserLeftEvent { + pub user: User, +} + +#[bincode_packet] +pub struct NewMessageEvent { + pub message: Message, +} + +#[bincode_packet] +pub struct StartTypingEvent { + pub user: User, + pub room: String, +} + +#[bincode_packet] +pub struct StopTypingEvent { + pub user: User, + pub room: String, +} + +#[bincode_packet] +pub struct NewRoomEvent { + pub room: Room, +} + +#[bincode_packet] +pub struct DeleteRoomEvent { + pub roomid: String, +} + +#[bincode_packet] +pub struct KickedUserEvent { + pub userid: String, +} + +#[bincode_packet] +pub struct BannedUserEvent { + pub userid: String, +} + +#[bincode_packet] +pub struct PromotedUserEvent { + pub userid: String, +} + +#[bincode_packet] +pub struct DemotedUserEvent { + pub userid: String, } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 710b265..7261a63 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,7 @@ use std::env; use std::future::Future; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use std::sync::{Arc}; use dotenvy::dotenv; use durian::{PacketManager, ServerConfig}; use futures::future::{self}; @@ -13,9 +14,9 @@ use tarpc::{ }; use tarpc::server::incoming::Incoming; use tarpc::server::BaseChannel; +use tokio::sync::Mutex; use tracing::{info, subscriber, warn}; -use realm_server::events; -use realm_server::events::{GreetPacketBuilder}; +use realm_server::events::*; use realm_server::server::RealmChatServer; use realm_server::types::RealmChat; @@ -35,7 +36,7 @@ async fn main() -> anyhow::Result<()> { .with_target(false) .finish(); - subscriber::set_global_default(subscriber).unwrap(); + subscriber::set_global_default(subscriber)?; let database_url: &str = &env::var("DATABASE_URL").expect("DATABASE_URL must be set"); @@ -49,28 +50,47 @@ async fn main() -> anyhow::Result<()> { warn!("Database already exists"); } // TODO: Do in Docker with Sqlx-cli - let db_pool = SqlitePool::connect(database_url).await.unwrap(); + let db_pool = SqlitePool::connect(database_url).await?; info!("Running migrations..."); migrate!().run(&db_pool).await?; // TODO: Do in Docker with Sqlx-cli info!("Migrations complete!"); - let port = env::var("PORT").expect("PORT must be set").parse::().unwrap(); + let port = env::var("PORT").expect("PORT must be set").parse::()?; let server_addr = (IpAddr::V6(Ipv6Addr::LOCALHOST), port); - let mut manager = PacketManager::new(); - manager.init_server( + let mut inner_manager = PacketManager::new_for_async(); + + inner_manager.register_receive_packet::(GreetPacketBuilder)?; + inner_manager.register_receive_packet::(UserJoinedEventPacketBuilder)?; + inner_manager.register_receive_packet::(UserLeftEventPacketBuilder)?; + inner_manager.register_receive_packet::(NewMessageEventPacketBuilder)?; + inner_manager.register_receive_packet::(StartTypingEventPacketBuilder)?; + inner_manager.register_receive_packet::(StopTypingEventPacketBuilder)?; + inner_manager.register_receive_packet::(NewRoomEventPacketBuilder)?; + inner_manager.register_receive_packet::(DeleteRoomEventPacketBuilder)?; + inner_manager.register_receive_packet::(KickedUserEventPacketBuilder)?; + inner_manager.register_receive_packet::(BannedUserEventPacketBuilder)?; + + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + inner_manager.register_send_packet::()?; + + inner_manager.async_init_server( ServerConfig::new( SocketAddr::from((IpAddr::V6(Ipv6Addr::LOCALHOST), port-1)).to_string(), - 0, None, 2, 256))?; + 0, None, 10, 10)).await?; + + let manager = Arc::new(Mutex::new(inner_manager)); + info!("Listening on port {}", port-1); - let callback = |_| { - - }; - - manager.register_receive_packet::(GreetPacketBuilder).unwrap(); - - // JSON transport is provided by the json_transport tarpc module. It makes it easy // to start up a serde-powered json serialization strategy over TCP. let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?; @@ -85,7 +105,7 @@ async fn main() -> anyhow::Result<()> { // serve is generated by the service attribute. It takes as input any type implementing // the generated World trait. .map(|channel| { - let server = RealmChatServer::new(env::var("SERVER_ID").expect("SERVER_ID must be set"), channel.transport().peer_addr().unwrap(), db_pool.clone()); + let server = RealmChatServer::new(env::var("SERVER_ID").expect("SERVER_ID must be set"), channel.transport().peer_addr().unwrap(), db_pool.clone(), manager.clone()); channel.execute(server.serve()).for_each(spawn) }) // Max 10 channels. diff --git a/server/src/server.rs b/server/src/server.rs index 58e87c6..3fb6701 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -1,17 +1,20 @@ use std::env; -use std::net::{Ipv6Addr, SocketAddr}; +use std::net::SocketAddr; +use std::sync::{Arc}; use std::time::Duration; use chrono::{DateTime, Utc}; +use durian::PacketManager; use moka::future::Cache; use sqlx::{FromRow, Pool, query_as, Sqlite}; use sqlx::query; use tarpc::context::Context; use tarpc::tokio_serde::formats::Json; +use tokio::sync::Mutex; use tracing::error; use realm_auth::types::RealmAuthClient; use realm_shared::types::ErrorCode::*; use realm_shared::types::ErrorCode; - +use crate::events::*; use crate::types::{Attachment, Edit, FromRows, Message, MessageData, Reaction, RealmChat, Redaction, Reply, ReplyChain, Room, User}; #[derive(Clone)] @@ -23,15 +26,18 @@ pub struct RealmChatServer { pub db_pool: Pool, pub typing_users: Vec<(String, String)>, //NOTE: user.userid, room.roomid pub cache: Cache, + pub packet_manager: Arc>, } const FETCH_MESSAGE: &str = "SELECT message.*, - room.id AS 'room_id', room.roomid AS 'room_roomid', room.name AS 'room_name', room.admin_only_send AS 'room_admin_only_send', room.admin_only_view AS 'room_admin_only_view', - user.id AS 'user_id', user.userid AS 'user_userid', user.name AS 'user_name', user.online AS 'user_online', user.admin AS 'user_admin' + room.id AS 'room_id', room.roomid AS 'room_roomid', room.admin_only_send AS 'room_admin_only_send', room.admin_only_view AS 'room_admin_only_view', + user.id AS 'user_id', user.userid AS 'user_userid', user.name AS 'user_name', user.owner AS 'user_owner', user.admin AS 'user_admin' FROM message INNER JOIN room ON message.room = room.id INNER JOIN user ON message.user = user.id WHERE room.admin_only_view = ? OR false"; +// room.name AS 'room_name', +// user.online AS 'user_online', impl RealmChatServer { - pub fn new(server_id: String, socket: SocketAddr, db_pool: Pool) -> RealmChatServer { + pub fn new(server_id: String, socket: SocketAddr, db_pool: Pool, packet_manager: Arc>) -> RealmChatServer { RealmChatServer { server_id, port: env::var("PORT").unwrap().parse::().unwrap(), @@ -43,7 +49,8 @@ impl RealmChatServer { .max_capacity(10_000) .time_to_idle(Duration::from_secs(5*60)) .time_to_live(Duration::from_secs(60*60)) - .build() + .build(), + packet_manager, } } @@ -105,6 +112,22 @@ impl RealmChatServer { false } + async fn is_user_owner(&self, stoken: &str) -> bool { + if let Some(userid) = self.cache.get(stoken).await { + let result = query!("SELECT owner FROM user WHERE userid = ?", userid).fetch_one(&self.db_pool).await; + return match result { + Ok(record) => { + if record.owner { + return true + } + false + } + Err(_) => false + } + } + false + } + async fn is_user_in_server(&self, userid: &str) -> bool { let result = query!("SELECT NOT EXISTS (SELECT 1 FROM user WHERE userid = ?) AS does_exist", userid).fetch_one(&self.db_pool).await; @@ -172,6 +195,15 @@ impl RealmChatServer { Err(_) => Err(UserNotFound), } } + + async fn inner_get_all_users(&self) -> Result, ErrorCode> { + let result = query_as!(User, "SELECT * FROM user").fetch_all(&self.db_pool).await; + + match result { + Ok(users) => Ok(users), + Err(_) => Err(Error), + } + } async fn inner_get_message(&self, stoken: &str, id: i64) -> Result { let is_admin = self.is_user_admin(&stoken).await; @@ -192,7 +224,7 @@ impl RealmChat for RealmChatServer { format!("Hello, {name}!") } - async fn join_server(self, _: Context, stoken: String, user: User) -> Result<(), ErrorCode> { + async fn join_server(self, _: Context, stoken: String, user: User) -> Result { if !self.is_stoken_valid(&user.userid, &stoken).await { return Err(Unauthorized) } @@ -200,15 +232,28 @@ impl RealmChat for RealmChatServer { if self.is_user_in_server(&user.userid).await { return Err(AlreadyJoinedServer) } - - let result = query!( - "INSERT INTO user (userid, name, online, admin) VALUES (?,?,?,?)", - user.userid, user.name, false, false).execute(&self.db_pool).await; - //TODO: tell everyone + let is_owner = { + let all_users = self.inner_get_all_users().await?; + all_users.is_empty() + }; + + let result = query!("INSERT INTO user (userid, name, owner, admin) VALUES (?,?,?,?)", user.userid, user.name, is_owner, is_owner).execute(&self.db_pool).await; + match result { - Ok(_) => Ok(()), + Ok(_) => { + let new_user = self.inner_get_user(&user.userid).await?; + + let result = self.packet_manager.lock().await.broadcast(UserJoinedEvent { + user: new_user.clone(), + }); + if result.is_err() { + error!("Error broadcasting UserJoinedEvent!"); + } + + Ok(new_user) + }, Err(_) => Err(MalformedDBResponse), } } @@ -223,11 +268,19 @@ impl RealmChat for RealmChatServer { } let result = query!("DELETE FROM user WHERE userid = ?",user.userid).execute(&self.db_pool).await; - - //TODO: tell everyone match result { - Ok(_) => Ok(()), + Ok(_) => { + let result = self.packet_manager.lock().await.broadcast(UserLeftEvent { + user: user.clone(), + }); + + if result.is_err() { + error!("Error broadcasting UserLeftEvent!"); + } + + Ok(()) + }, Err(_) => Err(MalformedDBResponse), } } @@ -238,7 +291,7 @@ impl RealmChat for RealmChatServer { } // Assert all the data in message is correct - message.user = self.inner_get_user(&message.user.userid).await.unwrap(); + message.user = self.inner_get_user(&message.user.userid).await?; match &message.data { // Check that the sender is the owner of the referencing msg MessageData::Edit(e) => { @@ -268,7 +321,7 @@ impl RealmChat for RealmChatServer { return Err(RoomNotFound) } - message.room = self.inner_get_room(&stoken, &message.room.roomid).await.unwrap(); + message.room = self.inner_get_room(&stoken, &message.room.roomid).await?; let result = match &message.data { MessageData::Text(text) => { @@ -300,8 +353,14 @@ impl RealmChat for RealmChatServer { }; match result { - Ok(id) => { - //TODO: Tell everyone + Ok(_) => { + let result = self.packet_manager.lock().await.broadcast(NewMessageEvent { + message: message.clone(), + }); + + if result.is_err() { + error!("Error broadcasting NewMessageEvent!"); + } Ok(message) }, @@ -309,7 +368,7 @@ impl RealmChat for RealmChatServer { } } - async fn start_typing(self, _: Context, stoken: String, userid: String, roomid: String) -> ErrorCode { //TODO: auth for all of these + async fn start_typing(self, _: Context, stoken: String, userid: String, roomid: String) -> ErrorCode { todo!() } @@ -379,35 +438,37 @@ impl RealmChat for RealmChatServer { } async fn get_users(self, _: Context) -> Result, ErrorCode> { - let result = query_as!(User, "SELECT * FROM user").fetch_all(&self.db_pool).await; - - match result { - Ok(users) => Ok(users), - Err(_) => Err(Error), - } + self.inner_get_all_users().await } - async fn get_online_users(self, _: Context) -> Result, ErrorCode> { - let result = query_as!(User, "SELECT * FROM user WHERE online = true").fetch_all(&self.db_pool).await; - - match result { - Ok(users) => Ok(users), - Err(_) => Err(Error), - } - } + // async fn get_online_users(self, _: Context) -> Result, ErrorCode> { + // let result = query_as!(User, "SELECT * FROM user WHERE online = true").fetch_all(&self.db_pool).await; + // + // match result { + // Ok(users) => Ok(users), + // Err(_) => Err(Error), + // } + // } async fn create_room(self, _: Context, stoken: String, room: Room) -> Result { if !self.is_user_admin(&stoken).await { return Err(Unauthorized) } - let result = query!("INSERT INTO room (roomid, name, admin_only_send, admin_only_view) VALUES (?,?,?,?)", - room.roomid, room.name, room.admin_only_send, room.admin_only_view) + let result = query!("INSERT INTO room (roomid, admin_only_send, admin_only_view) VALUES (?,?,?)", + room.roomid, room.admin_only_send, room.admin_only_view) .execute(&self.db_pool).await; match result { Ok(_) => { - // TODO: tell everyone + let result = self.packet_manager.lock().await.broadcast(NewRoomEvent { + room: room.clone(), + }); + + if result.is_err() { + error!("Error broadcasting NewRoomEvent!"); + } + Ok(room) } Err(_) => Err(MalformedDBResponse) @@ -423,23 +484,76 @@ impl RealmChat for RealmChatServer { match result { Ok(_) => { - // TODO: tell everyone + let result = self.packet_manager.lock().await.broadcast(DeleteRoomEvent { + roomid, + }); + + if result.is_err() { + error!("Error broadcasting DeleteRoomEvent!"); + } + Ok(()) } Err(_) => Err(MalformedDBResponse) } } - async fn rename_room(self, _: Context, stoken: String, roomid: String, new_name: String) -> Result<(), ErrorCode> { - if !self.is_user_admin(&stoken).await { + // async fn rename_room(self, _: Context, stoken: String, roomid: String, new_name: String) -> Result<(), ErrorCode> { + // if !self.is_user_admin(&stoken).await { + // return Err(Unauthorized) + // } + // + // let result = query!("UPDATE room SET name = ? WHERE roomid = ?", new_name, roomid).execute(&self.db_pool).await; + // + // match result { + // Ok(_) => { + // // TODO: tell everyone + // Ok(()) + // } + // Err(_) => Err(MalformedDBResponse) + // } + // } + + async fn promote_user(self, _: Context, stoken: String, userid: String) -> Result<(), ErrorCode> { + if !self.is_user_owner(&stoken).await { return Err(Unauthorized) } - - let result = query!("UPDATE room SET name = ? WHERE roomid = ?", new_name, roomid).execute(&self.db_pool).await; - + + let result = query!("UPDATE user SET admin = true WHERE userid = ?", userid).execute(&self.db_pool).await; + match result { Ok(_) => { - // TODO: tell everyone + let result = self.packet_manager.lock().await.broadcast(PromotedUserEvent { + userid, + }); + + if result.is_err() { + error!("Error broadcasting PromotedUserEvent!"); + } + + Ok(()) + } + Err(_) => Err(MalformedDBResponse) + } + } + + async fn demote_user(self, _: Context, stoken: String, userid: String) -> Result<(), ErrorCode> { + if !self.is_user_owner(&stoken).await { + return Err(Unauthorized) + } + + let result = query!("UPDATE user SET admin = false WHERE userid = ?", userid).execute(&self.db_pool).await; + + match result { + Ok(_) => { + let result = self.packet_manager.lock().await.broadcast(DemotedUserEvent { + userid, + }); + + if result.is_err() { + error!("Error broadcasting DemotedUserEvent!"); + } + Ok(()) } Err(_) => Err(MalformedDBResponse) @@ -455,7 +569,14 @@ impl RealmChat for RealmChatServer { match result { Ok(_) => { - // TODO: tell everyone + let result = self.packet_manager.lock().await.broadcast(KickedUserEvent { + userid, + }); + + if result.is_err() { + error!("Error broadcasting KickedUserEvent!"); + } + Ok(()) } Err(_) => Err(MalformedDBResponse) @@ -472,7 +593,14 @@ impl RealmChat for RealmChatServer { match result { Ok(_) => { - // TODO: tell everyone + let result = self.packet_manager.lock().await.broadcast(BannedUserEvent { + userid, + }); + + if result.is_err() { + error!("Error broadcasting BannedUserEvent!"); + } + Ok(()) } Err(_) => Err(MalformedDBResponse) @@ -487,10 +615,7 @@ impl RealmChat for RealmChatServer { let result = query!("DELETE FROM banned WHERE userid = ?", userid).execute(&self.db_pool).await; match result { - Ok(_) => { - // TODO: tell everyone - Ok(()) - } + Ok(_) => Ok(()), Err(_) => Err(MalformedDBResponse) } } diff --git a/server/src/types.rs b/server/src/types.rs index 20ad024..c119c48 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -11,7 +11,7 @@ use crate::types::MessageData::*; pub trait RealmChat { async fn test(name: String) -> String; - async fn join_server(stoken: String, user: User) -> Result<(), ErrorCode>; + async fn join_server(stoken: String, user: User) -> Result; async fn leave_server(stoken: String, user: User) -> Result<(), ErrorCode>; //NOTE: Any user authorized as themselves @@ -29,10 +29,12 @@ pub trait RealmChat { async fn get_room(stoken: String, roomid: String) -> Result; async fn get_user(userid: String) -> Result; async fn get_users() -> Result, ErrorCode>; - async fn get_online_users() -> Result, ErrorCode>; + // async fn get_online_users() -> Result, ErrorCode>; async fn create_room(stoken: String, room: Room) -> Result; async fn delete_room(stoken: String, roomid: String) -> Result<(), ErrorCode>; - async fn rename_room(stoken: String, roomid: String, new_name: String) -> Result<(), ErrorCode>; + // async fn rename_room(stoken: String, roomid: String, new_name: String) -> Result<(), ErrorCode>; + async fn promote_user(stoken: String, userid: String) -> Result<(), ErrorCode>; + async fn demote_user(stoken: String, userid: String) -> Result<(), ErrorCode>; async fn kick_user(stoken: String, userid: String) -> Result<(), ErrorCode>; async fn ban_user(stoken: String, userid: String) -> Result<(), ErrorCode>; async fn pardon_user(stoken: String, userid: String) -> Result<(), ErrorCode>; @@ -73,13 +75,14 @@ impl FromRow<'_, SqliteRow> for Message { id: row.try_get("user_id")?, userid: row.try_get("user_userid")?, name: row.try_get("user_name")?, - online: row.try_get("user_online")?, + // online: row.try_get("user_online")?, + owner: row.try_get("user_owner")?, admin: row.try_get("user_admin")?, }, room: Room { id: row.try_get("room_id")?, roomid: row.try_get("room_roomid")?, - name: row.try_get("room_name")?, + // name: row.try_get("room_name")?, admin_only_send: row.try_get("room_admin_only_send")?, admin_only_view: row.try_get("room_admin_only_view")?, }, @@ -153,7 +156,8 @@ pub struct User { pub id: i64, pub userid: String, pub name: String, - pub online: bool, + // pub online: bool, + pub owner: bool, pub admin: bool, } @@ -161,7 +165,7 @@ pub struct User { pub struct Room { pub id: i64, pub roomid: String, - pub name: String, + // pub name: String, pub admin_only_send: bool, pub admin_only_view: bool, }