diff --git a/server/Cargo.toml b/server/Cargo.toml index 0f86cba..3e6b489 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -9,10 +9,11 @@ futures = "0.3.30" tarpc = { version = "0.34.0", features = ["full"] } tokio = { version = "1.39.1", features = ["macros", "net", "rt-multi-thread"] } tracing = "0.1.40" +tracing-subscriber = "0.3.18" serde = { version = "1.0.204", features = ["derive"] } emojis = "0.6.3" chrono = { version = "0.4.38", features = ["serde"] } -sqlx = { version = "0.8.0", features = [ "runtime-tokio", "tls-rustls", "mysql", "chrono" ] } +sqlx = { version = "0.8.0", features = [ "runtime-tokio", "tls-rustls", "sqlite", "chrono" ] } dotenvy = "0.15.7" realm_auth = { path = "../auth" } realm_shared = { path = "../shared" } \ No newline at end of file diff --git a/server/migrations/20240727041731_create_everything.sql b/server/migrations/20240727041731_create_everything.sql new file mode 100644 index 0000000..1f4d4c7 --- /dev/null +++ b/server/migrations/20240727041731_create_everything.sql @@ -0,0 +1,27 @@ +-- Add migration script here +CREATE TABLE IF NOT EXISTS room ( + id INTEGER PRIMARY KEY, + roomid VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + admin_only_send BOOL NOT NULL, + admin_only_view BOOL NOT NULL + ); + +CREATE TABLE IF NOT EXISTS user ( + id INTEGER PRIMARY KEY, + userid VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + online BOOL NOT NULL, + admin BOOL NOT NULL + ); + +CREATE TABLE IF NOT EXISTS message ( + id INTEGER PRIMARY KEY, + timestamp DATETIME NOT NULL, + user INT NOT NULL, + room INT NOT NULL, + msg_type VARCHAR CHECK( msg_type IN ('text', 'attachment', 'reply', 'edit', 'reaction', 'redaction')) NOT NULL, + msg_text TEXT, + referencing_id INTEGER, + emoji TEXT + ); \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 9bd11e5..5b9d865 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,13 +4,16 @@ use std::net::{IpAddr, Ipv6Addr}; use dotenvy::dotenv; use futures::future::{self}; use futures::StreamExt; -use sqlx::mysql::MySqlPoolOptions; +use sqlx::migrate::MigrateDatabase; +use sqlx::{migrate, Sqlite, SqlitePool}; +use sqlx::sqlite::SqlitePoolOptions; use tarpc::{ server::{Channel}, tokio_serde::formats::Json, }; use tarpc::server::incoming::Incoming; use tarpc::server::BaseChannel; +use tracing::{info, subscriber, warn}; use realm_server::server::RealmChatServer; use realm_server::types::RealmChat; @@ -21,56 +24,41 @@ async fn spawn(fut: impl Future + Send + 'static) { #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv().ok(); - - let db_pool = MySqlPoolOptions::new() - .max_connections(64) - .connect(env::var("DATABASE_URL").expect("DATABASE_URL must be set").as_str()).await?; - - sqlx::query( - "CREATE DATABASE IF NOT EXISTS realmchat; USE realmchat;" - ).fetch_one(&db_pool).await?; - sqlx::query( - "CREATE TABLE IF NOT EXISTS room ( - id SERIAL, - room_id VARCHAR(255) NOT NULL, - name VARCHAR(255) NOT NULL, - admin_only_send BOOL NOT NULL, - admin_only_view BOOL NOT NULL - );" - ).execute(&db_pool).await?; + let subscriber = tracing_subscriber::fmt() + .compact() + .with_file(true) + .with_line_number(true) + .with_thread_ids(true) + .with_target(false) + .finish(); - sqlx::query( - "CREATE TABLE IF NOT EXISTS user ( - id SERIAL, - user_id VARCHAR(255) NOT NULL, - name VARCHAR(255) NOT NULL, - online BOOL NOT NULL, - admin BOOL NOT NULL - );" - ).execute(&db_pool).await?; + subscriber::set_global_default(subscriber).unwrap(); - sqlx::query( - "CREATE TABLE IF NOT EXISTS message ( - id SERIAL, - timestamp DATETIME NOT NULL, - user INT NOT NULL, - room INT NOT NULL, - type ENUM('text', 'attachment', 'reply', 'edit', 'reaction', 'redaction') NOT NULL, + let database_url: &str = &env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - msgText TEXT, - referencingID INT, - emoji TEXT, - redaction BOOL - );" - ).execute(&db_pool).await?; + if !Sqlite::database_exists(database_url).await.unwrap_or(false) { + info!("Creating database {}", database_url); + match Sqlite::create_database(database_url).await { + Ok(_) => info!("Create db success"), + Err(error) => panic!("error: {}", error), + } + } else { + warn!("Database already exists"); + } // TODO: Do in Docker with Sqlx-cli + + let db_pool = SqlitePool::connect(database_url).await.unwrap(); + + info!("Running migrations..."); + migrate!().run(&db_pool).await?; // TODO: Do in Docker with Sqlx-cli + info!("Migrations complete!"); let server_addr = (IpAddr::V6(Ipv6Addr::LOCALHOST), env::var("PORT").expect("PORT must be set").parse::().unwrap()); // JSON transport is provided by the json_transport tarpc module. It makes it easy // to start up a serde-powered json serialization strategy over TCP. let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?; - tracing::info!("Listening on port {}", listener.local_addr().port()); + info!("Listening on port {}", listener.local_addr().port()); listener.config_mut().max_frame_length(usize::MAX); listener // Ignore accept errors. diff --git a/server/src/server.rs b/server/src/server.rs index 66c129c..32017a1 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -1,17 +1,20 @@ use std::net::SocketAddr; + use chrono::{DateTime, Utc}; -use sqlx::{Error, MySql, Pool, Row}; -use sqlx::mysql::MySqlRow; +use sqlx::{FromRow, Pool, query_as, Sqlite}; +use sqlx::query; use tarpc::context::Context; -use crate::types::{Edit, Message, MessageData, Reaction, RealmChat, Redaction, Reply, Room, User}; + use realm_shared::types::ErrorCode::*; use realm_shared::types::ErrorCode; +use crate::types::{Message, MessageData, RealmChat, Room, User}; + #[derive(Clone)] pub struct RealmChatServer { pub server_id: String, pub socket: SocketAddr, - pub db_pool: Pool, + pub db_pool: Pool, pub typing_users: Vec<(u32, u32)> //NOTE: userid, roomid } //TODO: Cache for auth @@ -25,29 +28,29 @@ impl RealmChat for RealmChatServer { let result = match &message.data { MessageData::Text(text) => { - sqlx::query("INSERT INTO message (timestamp, user, room, type, msgText) VALUES (?, ?, ?, 'text', ?)") - .bind(message.timestamp).bind(message.user.id).bind(message.room.id).bind(text) + query!("INSERT INTO message (timestamp, user, room, msg_type, msg_text) VALUES (?, ?, ?, 'text', ?)", + message.timestamp, message.user.id, message.room.id, text) .execute(&self.db_pool).await } MessageData::Attachment(attachment) => { todo!() } MessageData::Reply(reply) => { - sqlx::query("INSERT INTO message (timestamp, user, room, type, msgText, referencingID) VALUES (?, ?, ?, 'reply', ?, ?)") - .bind(message.timestamp).bind(message.user.id).bind(message.room.id).bind(reply.text.clone()).bind(reply.referencing_id) + query!("INSERT INTO message (timestamp, user, room, msg_type, msg_text, referencing_id) VALUES (?, ?, ?, 'reply', ?, ?)", + message.timestamp, message.user.id, message.room.id, reply.text, reply.referencing_id) .execute(&self.db_pool).await } MessageData::Edit(edit) => { - sqlx::query("INSERT INTO message (timestamp, user, room, type, msgText, referencingID) VALUES (?, ?, ?, 'edit', ?, ?)") - .bind(message.timestamp).bind(message.user.id).bind(message.room.id).bind(edit.text.clone()).bind(edit.referencing_id) + query!("INSERT INTO message (timestamp, user, room, msg_type, msg_text, referencing_id) VALUES (?, ?, ?, 'edit', ?, ?)", + message.timestamp, message.user.id, message.room.id, edit.text, edit.referencing_id) .execute(&self.db_pool).await } MessageData::Reaction(reaction) => { - sqlx::query("INSERT INTO message (timestamp, user, room, type, emoji, referencingID) VALUES (?, ?, ?, 'reaction', ?, ?)") - .bind(message.timestamp).bind(message.user.id).bind(message.room.id).bind(reaction.emoji.clone()).bind(reaction.referencing_id) + query!("INSERT INTO message (timestamp, user, room, msg_type, emoji, referencing_id) VALUES (?, ?, ?, 'reaction', ?, ?)", + message.timestamp, message.user.id, message.room.id, reaction.emoji, reaction.referencing_id) .execute(&self.db_pool).await } MessageData::Redaction(redaction) => { - sqlx::query("INSERT INTO message (timestamp, user, room, type, redaction, referencingID) VALUES (?, ?, ?, 'redaction', ?, ?)") - .bind(message.timestamp).bind(message.user.id).bind(message.room.id).bind(true).bind(redaction.referencing_id) + query!("INSERT INTO message (timestamp, user, room, msg_type, referencing_id) VALUES (?, ?, ?, 'redaction', ?)", + message.timestamp, message.user.id, message.room.id, redaction.referencing_id) .execute(&self.db_pool).await } }; @@ -58,7 +61,7 @@ impl RealmChat for RealmChatServer { Ok(message) }, - Err(_) => Err(ErrorCode::Error), + Err(_) => Err(Error), } } @@ -76,13 +79,16 @@ impl RealmChat for RealmChatServer { async fn get_message_from_id(self, _: Context, auth_token: String, id: u32) -> Result { //TODO: Auth for admin room - let result = sqlx::query( - "SELECT * FROM message INNER JOIN room ON message.room = room.id INNER JOIN user ON message.user = user.id WHERE message.id = ?" - ).bind(id).fetch_one(&self.db_pool).await; + let result = sqlx::query("SELECT message.*, + room.id AS 'room_id', room.roomid AS 'room_roomid', room.name AS 'room_name', room.admin_only_send AS 'room_admin_only_send', room.admin_only_view AS 'room_admin_only_view', + user.id AS 'user_id', user.userid AS 'user_userid', user.name AS 'user_name', user.online AS 'user_online', user.admin AS 'user_admin' + FROM message INNER JOIN room ON message.room = room.id INNER JOIN user ON message.user = user.id WHERE message.id = ?") + .bind(id) + .fetch_one(&self.db_pool).await; match result { Ok(row) => { - self.dbmessage_to_message(row) + Ok(Message::from_row(&row).unwrap()) }, Err(_) => { Err(MessageNotFound) @@ -97,74 +103,54 @@ impl RealmChat for RealmChatServer { async fn get_rooms(self, _: Context, auth_token: String) -> Result, ErrorCode> { //TODO: Auth for admin rooms! - let result = sqlx::query("SELECT * FROM room").fetch_all(&self.db_pool).await; - let mut rooms: Vec = Vec::new(); + let result = query_as!(Room, "SELECT * FROM room").fetch_all(&self.db_pool).await; match result { - Ok(rows) => { - for row in rows { - let room = self.dbroom_to_room(row); - if let Some(err) = room.clone().err() { - return Err(err) - } - rooms.push(room.unwrap()); - } - Ok(rooms) - }, - Err(_) => { - Err(Error) - }, + Ok(rooms) => Ok(rooms), + Err(_) => Err(Error), } } async fn get_room(self, _: Context, auth_token: String, roomid: String) -> Result { //TODO: Auth for admin rooms! - let result = sqlx::query("SELECT * FROM room WHERE room_id = ?").bind(roomid).fetch_one(&self.db_pool).await; + let result = query_as!(Room, "SELECT * FROM room WHERE roomid = ?", roomid).fetch_one(&self.db_pool).await; match result { - Ok(row) => { self.dbroom_to_room(row) }, + Ok(room) => { Ok(room) }, Err(_) => Err(RoomNotFound), } } async fn get_user(self, _: Context, userid: String) -> Result { - let result = sqlx::query("SELECT * FROM user WHERE user_id = ?").bind(userid).fetch_one(&self.db_pool).await; + let result = query_as!(User, "SELECT * FROM user WHERE userid = ?", userid).fetch_one(&self.db_pool).await; match result { - Ok(row) => { self.dbuser_to_user(row) }, + Ok(user) => { Ok(user) }, Err(_) => Err(UserNotFound), } } - async fn get_users(self, _: Context, get_only_online: bool) -> Result, ErrorCode> { - let mut query = sqlx::query("SELECT * FROM user"); - if get_only_online { - query = sqlx::query("SELECT * FROM user WHERE online = true"); - } - - let result = query.fetch_all(&self.db_pool).await; - let mut users: Vec = Vec::new(); + async fn get_users(self, _: Context) -> Result, ErrorCode> { + let result = query_as!(User, "SELECT * FROM user").fetch_all(&self.db_pool).await; match result { - Ok(rows) => { - for row in rows { - let user = self.dbuser_to_user(row); - if let Some(err) = user.clone().err() { - return Err(err) - } - users.push(user.unwrap()) - } - Ok(users) - }, - Err(_) => { - Err(Error) - }, - } + Ok(users) => Ok(users), + Err(_) => Err(Error), + } + } + + async fn get_online_users(self, _: Context) -> Result, ErrorCode> { + let result = query_as!(User, "SELECT * FROM user WHERE online = true").fetch_all(&self.db_pool).await; + + match result { + Ok(users) => Ok(users), + Err(_) => Err(Error), + } } } impl RealmChatServer { - pub fn new(server_id: String, socket: SocketAddr, db_pool: Pool) -> RealmChatServer { + pub fn new(server_id: String, socket: SocketAddr, db_pool: Pool) -> RealmChatServer { RealmChatServer { server_id, socket, @@ -172,131 +158,4 @@ impl RealmChatServer { typing_users: Vec::new(), } } - - fn dbroom_to_room(&self, row: MySqlRow) -> Result { - let id: Result = row.try_get("id"); - let roomid: Result = row.try_get("user_id"); - let name: Result = row.try_get("name"); - let admin_only_send: Result = row.try_get("admin_only_send"); - let admin_only_view: Result = row.try_get("admin_only_view"); - - if id.is_err() { - return Err(MalformedDBResponse) - } - - Ok(Room { - id: id.unwrap(), - roomid: roomid.unwrap(), - name: name.unwrap(), - admin_only_send: admin_only_send.unwrap(), - admin_only_view: admin_only_view.unwrap(), - }) - } - - fn dbuser_to_user(&self, row: MySqlRow) -> Result { - let id: Result = row.try_get("id"); - let userid: Result = row.try_get("user_id"); - let name: Result = row.try_get("name"); - let online: Result = row.try_get("online"); - let admin: Result = row.try_get("admin"); - - if id.is_err() { - return Err(MalformedDBResponse) - } - - Ok(User { - id: id.unwrap(), - userid: userid.unwrap(), - name: name.unwrap(), - online: online.unwrap(), - admin: admin.unwrap(), - }) - } - - fn dbmessage_to_message(&self, row: MySqlRow) -> Result { //NOTE: Query results passed in should have a join - let result: Result<&str, Error> = row.try_get("type"); - let type_enum: &str = match result { - Ok(string) => { string } - Err(_) => { "" } - }; - - if type_enum == "" { - return Err(MalformedDBResponse) - } - - let id: u32 = row.try_get("message.id").unwrap(); - let timestamp: DateTime = row.try_get("timestamp").unwrap(); - - let room = Room { - id: row.try_get("room").unwrap(), - roomid: row.try_get("room_id").unwrap(), - name: row.try_get("room.name").unwrap(), - admin_only_send: row.try_get("admin_only_send").unwrap(), - admin_only_view: row.try_get("admin_only_view").unwrap(), - }; - - let user = User { - id: row.try_get("user.id").unwrap(), - userid: row.try_get("user_id").unwrap(), - name: row.try_get("user.name").unwrap(), - online: row.try_get("online").unwrap(), - admin: row.try_get("admin").unwrap(), - }; - - match type_enum { - "text" => { - let text: String = row.try_get("msgText").unwrap(); - Ok(Message { - id, timestamp, user, room, - data: MessageData::Text(text), - }) - } - "attachment" => { - todo!() - } - "reply" => { - let text: &str = row.try_get("msgText").unwrap(); - let referencing_id: u32 = row.try_get("referencingID").unwrap(); - Ok(Message { - id, timestamp, user, room, - data: MessageData::Reply(Reply { - referencing_id, - text: text.to_string(), - }), - }) - } - "edit" => { - let text: &str = row.try_get("msgText").unwrap(); - let referencing_id: u32 = row.try_get("referencingID").unwrap(); - Ok(Message { - id, timestamp, user, room, - data: MessageData::Edit(Edit { - referencing_id, - text: text.to_string(), - }), - }) - } - "reaction" => { - let emoji: &str = row.try_get("emoji").unwrap(); - let referencing_id: u32 = row.try_get("referencingID").unwrap(); - Ok(Message { - id, timestamp, user, room, - data: MessageData::Reaction(Reaction { - referencing_id, - emoji: emoji.to_string(), - }), - }) - } - "redaction" => { - let referencing_id: u32 = row.try_get("referencingID").unwrap(); - Ok(Message { - id, timestamp, user, room, - data: MessageData::Redaction(Redaction { - referencing_id, - }), - }) - } - _ => { Err(MalformedDBResponse) } - } - } } \ No newline at end of file diff --git a/server/src/types.rs b/server/src/types.rs index af3823e..3d84df8 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -1,7 +1,12 @@ -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; +use sqlx::{FromRow, Row}; +use sqlx::sqlite::SqliteRow; use tarpc::serde::{Deserialize, Serialize}; + use realm_shared::types::ErrorCode; +use crate::types::MessageData::*; + #[tarpc::service] pub trait RealmChat { async fn test(name: String) -> String; @@ -18,7 +23,8 @@ pub trait RealmChat { async fn get_rooms(auth_token: String) -> Result, ErrorCode>; async fn get_room(auth_token: String, roomid: String) -> Result; async fn get_user(userid: String) -> Result; - async fn get_users(get_only_online: bool) -> Result, ErrorCode>; + async fn get_users() -> Result, ErrorCode>; + async fn get_online_users() -> Result, ErrorCode>; //TODO: Admin access only! // async fn create_room() -> Result; @@ -29,15 +35,59 @@ pub trait RealmChat { // unban user } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct Message { - pub id: u32, - pub timestamp: DateTime, //TODO: Does the database already have timestamps for us? + pub id: i64, + pub timestamp: DateTime, pub user: User, pub room: Room, + #[sqlx(flatten)] pub data: MessageData, } +impl FromRow<'_, SqliteRow> for Message { + fn from_row(row: &SqliteRow) -> sqlx::Result { + Ok(Self { + id: row.try_get("id")?, + timestamp: row.try_get("timestamp")?, + user: User { + id: row.try_get("user_id")?, + userid: row.try_get("user_userid")?, + name: row.try_get("user_name")?, + online: row.try_get("user_online")?, + admin: row.try_get("user_admin")?, + }, + room: Room { + id: row.try_get("room_id")?, + roomid: row.try_get("room_roomid")?, + name: row.try_get("room_name")?, + admin_only_send: row.try_get("room_admin_only_send")?, + admin_only_view: row.try_get("room_admin_only_view")?, + }, + data: match row.try_get("msg_type")? { + "text" => Text(row.try_get("msg_text")?), + "attachment" => todo!(), + "reply" => Reply(Reply { + referencing_id: row.try_get("referencing_id")?, + text: row.try_get("msg_text")?, + }), + "edit" => Edit(Edit { + referencing_id: row.try_get("referencing_id")?, + text: row.try_get("msg_text")?, + }), + "reaction" => Reaction(Reaction { + referencing_id: row.try_get("referencing_id")?, + emoji: row.try_get("emoji")?, + }), + "redaction" => Redaction(Redaction { + referencing_id: row.try_get("referencing_id")?, + }), + _ => { panic!() } + }, + }) + } +} + //TODO: Maybe have multipart messages #[derive(Debug, Clone, Serialize, Deserialize)] pub enum MessageData { @@ -77,19 +127,18 @@ pub struct Redaction { pub referencing_id: u32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct User { - pub id: u32, + pub id: i64, pub userid: String, pub name: String, pub online: bool, pub admin: bool, - //TODO: auth stuff needed, should be Option } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct Room { - pub id: u32, + pub id: i64, pub roomid: String, pub name: String, pub admin_only_send: bool, diff --git a/server/surrealdb.sh b/server/surrealdb.sh deleted file mode 100755 index b332701..0000000 --- a/server/surrealdb.sh +++ /dev/null @@ -1,2 +0,0 @@ -docker run --rm --pull always --name surrealdb -p 8000:8000 surrealdb/surrealdb:latest start --log trace --user root --pass root memory -