Threads, Get Messages Since

This commit is contained in:
2024-08-23 16:39:15 -04:00
Unverified
parent a0ccdb1def
commit 68b7ba6809
3 changed files with 72 additions and 24 deletions

View File

@@ -5,13 +5,14 @@ use chrono::{DateTime, Utc};
use moka::future::Cache; use moka::future::Cache;
use sqlx::{FromRow, Pool, query_as, Sqlite}; use sqlx::{FromRow, Pool, query_as, Sqlite};
use sqlx::query; use sqlx::query;
use sqlx::sqlite::SqliteRow;
use tarpc::context::Context; use tarpc::context::Context;
use tracing::error; use tracing::error;
use realm_auth::types::RealmAuthClient; use realm_auth::types::RealmAuthClient;
use realm_shared::types::ErrorCode::*; use realm_shared::types::ErrorCode::*;
use realm_shared::types::ErrorCode; use realm_shared::types::ErrorCode;
use crate::types::{Attachment, Edit, Message, MessageData, Reaction, RealmChat, Redaction, Reply, Room, User}; use crate::types::{Attachment, Edit, FromRows, Message, MessageData, Reaction, RealmChat, Redaction, Reply, ReplyChain, Room, User};
#[derive(Clone)] #[derive(Clone)]
pub struct RealmChatServer { pub struct RealmChatServer {
@@ -25,6 +26,11 @@ pub struct RealmChatServer {
pub cache: Cache<String, String>, pub cache: Cache<String, String>,
} }
const FETCH_MESSAGE: &str = "SELECT message.*,
room.id AS 'room_id', room.roomid AS 'room_roomid', room.name AS 'room_name', room.admin_only_send AS 'room_admin_only_send', room.admin_only_view AS 'room_admin_only_view',
user.id AS 'user_id', user.userid AS 'user_userid', user.name AS 'user_name', user.online AS 'user_online', user.admin AS 'user_admin'
FROM message INNER JOIN room ON message.room = room.id INNER JOIN user ON message.user = user.id WHERE room.admin_only_view = ? OR false";
impl RealmChatServer { impl RealmChatServer {
pub fn new(server_id: String, socket: SocketAddr, db_pool: Pool<Sqlite>, auth_client: RealmAuthClient) -> RealmChatServer { pub fn new(server_id: String, socket: SocketAddr, db_pool: Pool<Sqlite>, auth_client: RealmAuthClient) -> RealmChatServer {
RealmChatServer { RealmChatServer {
@@ -181,12 +187,9 @@ impl RealmChat for RealmChatServer {
async fn get_message_from_id(self, _: Context, stoken: String, id: i64) -> Result<Message, ErrorCode> { async fn get_message_from_id(self, _: Context, stoken: String, id: i64) -> Result<Message, ErrorCode> {
let is_admin = self.is_user_admin(&stoken).await; let is_admin = self.is_user_admin(&stoken).await;
let result = sqlx::query("SELECT message.*, let result = sqlx::query(&format!("{}{}", FETCH_MESSAGE, "AND message.id = ?"))
room.id AS 'room_id', room.roomid AS 'room_roomid', room.name AS 'room_name', room.admin_only_send AS 'room_admin_only_send', room.admin_only_view AS 'room_admin_only_view',
user.id AS 'user_id', user.userid AS 'user_userid', user.name AS 'user_name', user.online AS 'user_online', user.admin AS 'user_admin'
FROM message INNER JOIN room ON message.room = room.id INNER JOIN user ON message.user = user.id WHERE message.id = ? AND room.admin_only_view = ? OR false")
.bind(id)
.bind(is_admin) .bind(is_admin)
.bind(id)
.fetch_one(&self.db_pool).await; .fetch_one(&self.db_pool).await;
match result { match result {
@@ -200,34 +203,55 @@ impl RealmChat for RealmChatServer {
} }
async fn get_messages_since(self, _: Context, stoken: String, time: DateTime<Utc>) -> Result<Vec<Message>, ErrorCode> { async fn get_messages_since(self, _: Context, stoken: String, time: DateTime<Utc>) -> Result<Vec<Message>, ErrorCode> {
//TODO: Auth for admin rooms let is_admin = self.is_user_admin(&stoken).await;
todo!() let result = sqlx::query(&format!("{}{}", FETCH_MESSAGE, "AND message.timestamp >= ?"))
.bind(is_admin)
.bind(time)
.fetch_all(&self.db_pool).await;
match result {
Ok(rows) => Ok(Message::from_rows(rows).unwrap()),
Err(_) => Err(MalformedDBResponse)
}
} }
async fn get_all_direct_replies(self, _: Context, stoken: String, head: i64) -> Result<Vec<Message>, ErrorCode> { async fn get_all_direct_replies(self, _: Context, stoken: String, head: i64) -> Result<Vec<Message>, ErrorCode> {
let mut messages: Vec<Message> = Vec::new();
let is_admin = self.is_user_admin(&stoken).await; let is_admin = self.is_user_admin(&stoken).await;
let result = sqlx::query("SELECT message.*, let result = sqlx::query(&format!("{}{}", FETCH_MESSAGE, "AND message.referencing_id = ?"))
room.id AS 'room_id', room.roomid AS 'room_roomid', room.name AS 'room_name', room.admin_only_send AS 'room_admin_only_send', room.admin_only_view AS 'room_admin_only_view',
user.id AS 'user_id', user.userid AS 'user_userid', user.name AS 'user_name', user.online AS 'user_online', user.admin AS 'user_admin'
FROM message INNER JOIN room ON message.room = room.id INNER JOIN user ON message.user = user.id WHERE message.referencing_id = ? AND room.admin_only_view = ? OR false")
.bind(head)
.bind(is_admin) .bind(is_admin)
.bind(head)
.fetch_all(&self.db_pool).await; .fetch_all(&self.db_pool).await;
match result { match result {
Ok(rows) => { Ok(rows) => Ok(Message::from_rows(rows).unwrap()),
for row in rows { Err(_) => Err(MessageNotFound),
messages.push(Message::from_row(&row).unwrap()) }
} }
},
Err(_) => { async fn get_reply_chain(self, ctx: Context, stoken: String, head: Message, depth: u8) -> Result<ReplyChain, ErrorCode> {
return Err(MessageNotFound) if depth > 8 {
}, return Err(DepthTooLarge)
} }
Ok(messages) let direct_replies = self.clone().get_all_direct_replies(ctx, stoken.clone(), head.id).await?;
let replies = if direct_replies.is_empty() || depth == 0 {
None
} else {
let mut chains = Vec::new();
for reply in direct_replies {
chains.push(Box::pin(self.clone().get_reply_chain(ctx, stoken.clone(), reply, depth - 1)).await?);
}
Some(chains)
};
let chain = ReplyChain {
message: head,
replies,
};
Ok(chain)
} }
async fn get_rooms(self, _: Context, stoken: String) -> Result<Vec<Room>, ErrorCode> { async fn get_rooms(self, _: Context, stoken: String) -> Result<Vec<Room>, ErrorCode> {

View File

@@ -21,6 +21,7 @@ pub trait RealmChat {
async fn get_message_from_id(stoken: String, id: i64) -> Result<Message, ErrorCode>; async fn get_message_from_id(stoken: String, id: i64) -> Result<Message, ErrorCode>;
async fn get_messages_since(stoken: String, time: DateTime<Utc>) -> Result<Vec<Message>, ErrorCode>; async fn get_messages_since(stoken: String, time: DateTime<Utc>) -> Result<Vec<Message>, ErrorCode>;
async fn get_all_direct_replies(stoken: String, head: i64) -> Result<Vec<Message>, ErrorCode>; async fn get_all_direct_replies(stoken: String, head: i64) -> Result<Vec<Message>, ErrorCode>;
async fn get_reply_chain(stoken: String, head: Message, depth: u8) -> Result<ReplyChain, ErrorCode>;
async fn get_rooms(stoken: String) -> Result<Vec<Room>, ErrorCode>; async fn get_rooms(stoken: String) -> Result<Vec<Room>, ErrorCode>;
async fn get_room(stoken: String, roomid: String) -> Result<Room, ErrorCode>; async fn get_room(stoken: String, roomid: String) -> Result<Room, ErrorCode>;
async fn get_user(userid: String) -> Result<User, ErrorCode>; async fn get_user(userid: String) -> Result<User, ErrorCode>;
@@ -46,6 +47,22 @@ pub struct Message {
pub data: MessageData, pub data: MessageData,
} }
pub trait FromRows<R: Row>: Sized {
fn from_rows(rows: Vec<R>) -> Result<Vec<Self>, sqlx::Error>;
}
impl FromRows<SqliteRow> for Message {
fn from_rows(rows: Vec<SqliteRow>) -> sqlx::Result<Vec<Self>> {
let mut messages = Vec::new();
for row in rows {
messages.push(Message::from_row(&row)?);
}
Ok(messages)
}
}
impl FromRow<'_, SqliteRow> for Message { impl FromRow<'_, SqliteRow> for Message {
fn from_row(row: &SqliteRow) -> sqlx::Result<Self> { fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
Ok(Self { Ok(Self {
@@ -146,3 +163,9 @@ pub struct Room {
pub admin_only_view: bool, pub admin_only_view: bool,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplyChain {
pub message: Message,
pub replies: Option<Vec<ReplyChain>>,
}

View File

@@ -17,5 +17,6 @@ pub enum ErrorCode {
MessageNotFound, MessageNotFound,
RoomNotFound, RoomNotFound,
UserNotFound, UserNotFound,
DepthTooLarge,
MalformedDBResponse, MalformedDBResponse,
} }