From 8e2f34819e54027a17d9e6be43fad48e27997621 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Fri, 6 Nov 2020 21:52:49 +0000 Subject: [PATCH 01/11] Half implemented room state management foundations. --- src/bot.rs | 35 +++++++++++++++++++-- src/db.rs | 4 +++ src/db/rooms.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 src/db/rooms.rs diff --git a/src/bot.rs b/src/bot.rs index a687551..0aaf6f4 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -11,9 +11,9 @@ use matrix_sdk::Error as MatrixError; use matrix_sdk::{ self, events::{ - room::member::MemberEventContent, + room::member::{MemberEventContent, MembershipState}, room::message::{MessageEventContent, NoticeMessageEventContent, TextMessageEventContent}, - AnyMessageEventContent, StrippedStateEvent, SyncMessageEvent, + AnyMessageEventContent, StrippedStateEvent, SyncMessageEvent, SyncStateEvent, }, Client, ClientConfig, EventEmitter, JsonStore, Room, SyncRoom, SyncSettings, }; @@ -229,6 +229,37 @@ async fn should_process<'a>( /// Originally adapted from the matrix-rust-sdk examples. #[async_trait] impl EventEmitter for DiceBot { + async fn on_room_member( + &self, + room: SyncRoom, + room_member: &SyncStateEvent, + ) { + //When joining a channel, we get join events from other users. + //content is MemberContent, and it has a membership type. + + //Ignore if state_key is our username, because we only care about other users. + let event_affects_us = if let Some(our_user_id) = self.client.user_id().await { + room_member.state_key == our_user_id + } else { + false + }; + + let should_add = match room_member.content.membership { + MembershipState::Join => true, + MembershipState::Leave | MembershipState::Ban => false, + _ => return, + }; + + //if event affects us and is leave/ban, delete all our info. + //if event does not affect us, delete info only for that user. + + //TODO replace with call to new db.rooms thing. 
+ println!( + "member {} recorded with action {:?} to/from db.", + room_member.state_key, should_add + ); + } + async fn on_stripped_state_member( &self, room: SyncRoom, diff --git a/src/db.rs b/src/db.rs index 62a94e6..b667bd8 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,5 +1,6 @@ use crate::db::errors::{DataError, MigrationError}; use crate::db::migrations::{get_migration_version, Migrations}; +use crate::db::rooms::Rooms; use crate::db::variables::Variables; use log::info; use sled::{Config, Db}; @@ -8,6 +9,7 @@ use std::path::Path; pub mod data_migrations; pub mod errors; pub mod migrations; +pub mod rooms; pub mod schema; pub mod variables; @@ -16,6 +18,7 @@ pub struct Database { db: Db, pub(crate) variables: Variables, pub(crate) migrations: Migrations, + pub(crate) rooms: Rooms, } impl Database { @@ -26,6 +29,7 @@ impl Database { db: db.clone(), variables: Variables::new(&db)?, migrations: Migrations(migrations), + rooms: Rooms::new(&db)?, }) } diff --git a/src/db/rooms.rs b/src/db/rooms.rs new file mode 100644 index 0000000..3d507fb --- /dev/null +++ b/src/db/rooms.rs @@ -0,0 +1,82 @@ +use crate::db::errors::DataError; +use crate::db::schema::convert_i32; +use byteorder::LittleEndian; +use sled::transaction::{abort, TransactionalTree}; +use sled::Transactional; +use sled::Tree; +use std::collections::HashMap; +use std::collections::HashSet; +use std::convert::From; +use std::str; +use zerocopy::byteorder::I32; +use zerocopy::AsBytes; + +#[derive(Clone)] +pub struct Rooms { + /// Room ID -> RoomInfo struct (single entries) + pub(in crate::db) roomid_roominfo: Tree, + + /// Room ID -> list of usernames in room. + pub(in crate::db) roomid_usernames: Tree, + + /// Username -> list of room IDs user is in. + pub(in crate::db) username_roomids: Tree, +} + +// /// Request soemthing by a username and room ID. 
+// pub struct UserAndRoom<'a>(pub &'a str, pub &'a str); + +// fn to_vec(value: &UserAndRoom<'_>) -> Vec { +// let mut bytes = vec![]; +// bytes.extend_from_slice(value.0.as_bytes()); +// bytes.push(0xfe); +// bytes.extend_from_slice(value.1.as_bytes()); +// bytes +// } + +// impl From> for Vec { +// fn from(value: UserAndRoom) -> Vec { +// to_vec(&value) +// } +// } + +// impl From<&UserAndRoom<'_>> for Vec { +// fn from(value: &UserAndRoom) -> Vec { +// to_vec(value) +// } +// } + +impl Rooms { + pub(in crate::db) fn new(db: &sled::Db) -> Result { + Ok(Rooms { + roomid_roominfo: db.open_tree("roomid_roominfo")?, + roomid_usernames: db.open_tree("roomid_usernames")?, + username_roomids: db.open_tree("username_roomids")?, + }) + } + + pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { + //in txn: + //get or create list of users in room + //get or create list of rooms user is in + //deserialize/create set and add username to set for roomid + //deserialize/create set and add roomid to set for username + //store both again + let user_to_rooms: HashSet = self + .username_roomids + .get(username.as_bytes())? + .map(|bytes| bincode::deserialize::>(&bytes)) + .unwrap_or(Ok(HashSet::new()))?; + + let room_to_users: HashSet = self + .roomid_usernames + .get(room_id.as_bytes())? + .map(|bytes| bincode::deserialize::>(&bytes)) + .unwrap_or(Ok(HashSet::new()))?; + Ok(()) + } + + pub fn remove_user_from_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { + Ok(()) + } +} -- 2.40.1 From 51ba3e3f424b9d8cd5089eafbeb4bd4933373c62 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Sat, 7 Nov 2020 14:37:56 +0000 Subject: [PATCH 02/11] Record user join/leaves in rooms. Move event emitter to its own file. 
--- src/bot.rs | 146 ++----------------------------- src/bot/event_handlers.rs | 163 +++++++++++++++++++++++++++++++++++ src/db/errors.rs | 18 ++-- src/db/rooms.rs | 175 ++++++++++++++++++++++++++++++++------ src/lib.rs | 1 + src/models.rs | 5 ++ 6 files changed, 334 insertions(+), 174 deletions(-) create mode 100644 src/bot/event_handlers.rs create mode 100644 src/models.rs diff --git a/src/bot.rs b/src/bot.rs index 0aaf6f4..8928b2a 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -4,27 +4,25 @@ use crate::context::Context; use crate::db::Database; use crate::error::BotError; use crate::state::DiceBotState; -use async_trait::async_trait; use dirs; -use log::{debug, error, info, warn}; +use log::{error, info}; use matrix_sdk::Error as MatrixError; use matrix_sdk::{ self, events::{ - room::member::{MemberEventContent, MembershipState}, - room::message::{MessageEventContent, NoticeMessageEventContent, TextMessageEventContent}, - AnyMessageEventContent, StrippedStateEvent, SyncMessageEvent, SyncStateEvent, + room::message::{MessageEventContent, NoticeMessageEventContent}, + AnyMessageEventContent, }, - Client, ClientConfig, EventEmitter, JsonStore, Room, SyncRoom, SyncSettings, + Client, ClientConfig, JsonStore, Room, SyncSettings, }; //use matrix_sdk_common_macros::async_trait; use std::clone::Clone; -use std::ops::Sub; use std::path::PathBuf; use std::sync::{Arc, RwLock}; -use std::time::{Duration, SystemTime}; use url::Url; +pub mod event_handlers; + /// The DiceBot struct represents an active dice bot. The bot is not /// connected to Matrix until its run() function is called. pub struct DiceBot { @@ -167,135 +165,3 @@ impl DiceBot { } } } - -/// Check if a message is recent enough to actually process. If the -/// message is within "oldest_message_age" seconds, this function -/// returns true. If it's older than that, it returns false and logs a -/// debug message. 
-fn check_message_age( - event: &SyncMessageEvent, - oldest_message_age: u64, -) -> bool { - let sending_time = event.origin_server_ts; - let oldest_timestamp = SystemTime::now().sub(Duration::new(oldest_message_age, 0)); - - if sending_time > oldest_timestamp { - true - } else { - let age = match oldest_timestamp.duration_since(sending_time) { - Ok(n) => format!("{} seconds too old", n.as_secs()), - Err(_) => "before the UNIX epoch".to_owned(), - }; - - debug!("Ignoring message because it is {}: {:?}", age, event); - false - } -} - -async fn should_process<'a>( - bot: &DiceBot, - event: &SyncMessageEvent, -) -> Result<(String, String), BotError> { - //Ignore messages that are older than configured duration. - if !check_message_age(event, bot.config.oldest_message_age()) { - let state_check = bot.state.read().unwrap(); - if !((*state_check).logged_skipped_old_messages()) { - drop(state_check); - let mut state = bot.state.write().unwrap(); - (*state).skipped_old_messages(); - } - - return Err(BotError::ShouldNotProcessError); - } - - let (msg_body, sender_username) = if let SyncMessageEvent { - content: MessageEventContent::Text(TextMessageEventContent { body, .. }), - sender, - .. - } = event - { - ( - body.clone(), - format!("@{}:{}", sender.localpart(), sender.server_name()), - ) - } else { - (String::new(), String::new()) - }; - - Ok((msg_body, sender_username)) -} - -/// This event emitter listens for messages with dice rolling commands. -/// Originally adapted from the matrix-rust-sdk examples. -#[async_trait] -impl EventEmitter for DiceBot { - async fn on_room_member( - &self, - room: SyncRoom, - room_member: &SyncStateEvent, - ) { - //When joining a channel, we get join events from other users. - //content is MemberContent, and it has a membership type. - - //Ignore if state_key is our username, because we only care about other users. 
- let event_affects_us = if let Some(our_user_id) = self.client.user_id().await { - room_member.state_key == our_user_id - } else { - false - }; - - let should_add = match room_member.content.membership { - MembershipState::Join => true, - MembershipState::Leave | MembershipState::Ban => false, - _ => return, - }; - - //if event affects us and is leave/ban, delete all our info. - //if event does not affect us, delete info only for that user. - - //TODO replace with call to new db.rooms thing. - println!( - "member {} recorded with action {:?} to/from db.", - room_member.state_key, should_add - ); - } - - async fn on_stripped_state_member( - &self, - room: SyncRoom, - room_member: &StrippedStateEvent, - _: Option, - ) { - if let SyncRoom::Invited(room) = room { - if let Some(user_id) = self.client.user_id().await { - if room_member.state_key != user_id { - return; - } - } - - let room = room.read().await; - info!("Autojoining room {}", room.display_name()); - - if let Err(e) = self.client.join_room_by_id(&room.room_id).await { - warn!("Could not join room: {}", e.to_string()) - } - } - } - - async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent) { - if let SyncRoom::Joined(room) = room { - let (msg_body, sender_username) = - if let Ok((msg_body, sender_username)) = should_process(self, &event).await { - (msg_body, sender_username) - } else { - return; - }; - - //we clone here to hold the lock for as little time as possible. 
- let real_room = room.read().await.clone(); - - self.execute_commands(&real_room, &sender_username, &msg_body) - .await; - } - } -} diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs new file mode 100644 index 0000000..526bb64 --- /dev/null +++ b/src/bot/event_handlers.rs @@ -0,0 +1,163 @@ +use crate::error::BotError; +use async_trait::async_trait; +use log::{debug, error, info, warn}; +use matrix_sdk::{ + self, + events::{ + room::member::{MemberEventContent, MembershipState}, + room::message::{MessageEventContent, TextMessageEventContent}, + StrippedStateEvent, SyncMessageEvent, SyncStateEvent, + }, + EventEmitter, SyncRoom, +}; +//use matrix_sdk_common_macros::async_trait; +use super::DiceBot; +use std::clone::Clone; +use std::ops::Sub; +use std::time::{Duration, SystemTime}; + +/// Check if a message is recent enough to actually process. If the +/// message is within "oldest_message_age" seconds, this function +/// returns true. If it's older than that, it returns false and logs a +/// debug message. +fn check_message_age( + event: &SyncMessageEvent, + oldest_message_age: u64, +) -> bool { + let sending_time = event.origin_server_ts; + let oldest_timestamp = SystemTime::now().sub(Duration::new(oldest_message_age, 0)); + + if sending_time > oldest_timestamp { + true + } else { + let age = match oldest_timestamp.duration_since(sending_time) { + Ok(n) => format!("{} seconds too old", n.as_secs()), + Err(_) => "before the UNIX epoch".to_owned(), + }; + + debug!("Ignoring message because it is {}: {:?}", age, event); + false + } +} + +async fn should_process<'a>( + bot: &DiceBot, + event: &SyncMessageEvent, +) -> Result<(String, String), BotError> { + //Ignore messages that are older than configured duration. 
+ if !check_message_age(event, bot.config.oldest_message_age()) { + let state_check = bot.state.read().unwrap(); + if !((*state_check).logged_skipped_old_messages()) { + drop(state_check); + let mut state = bot.state.write().unwrap(); + (*state).skipped_old_messages(); + } + + return Err(BotError::ShouldNotProcessError); + } + + let (msg_body, sender_username) = if let SyncMessageEvent { + content: MessageEventContent::Text(TextMessageEventContent { body, .. }), + sender, + .. + } = event + { + ( + body.clone(), + format!("@{}:{}", sender.localpart(), sender.server_name()), + ) + } else { + (String::new(), String::new()) + }; + + Ok((msg_body, sender_username)) +} + +/// This event emitter listens for messages with dice rolling commands. +/// Originally adapted from the matrix-rust-sdk examples. +#[async_trait] +impl EventEmitter for DiceBot { + async fn on_room_member( + &self, + room: SyncRoom, + room_member: &SyncStateEvent, + ) { + if let SyncRoom::Joined(room) = room { + let event_affects_us = if let Some(our_user_id) = self.client.user_id().await { + room_member.state_key == our_user_id + } else { + false + }; + + let adding_user = match room_member.content.membership { + MembershipState::Join => true, + MembershipState::Leave | MembershipState::Ban => false, + _ => return, + }; + + //Clone to avoid holding lock. 
+ let room = room.read().await.clone(); + let (room_id, username) = (room.room_id.as_str(), &room_member.state_key); + + let result = if event_affects_us && !adding_user { + debug!("Clearing all information for room ID {}", room_id); + self.db.rooms.clear_info(room_id) + } else if !event_affects_us && adding_user { + debug!("Adding {} to room ID {}", username, room_id); + self.db.rooms.add_user_to_room(username, room_id) + } else if !event_affects_us && !adding_user { + debug!("Removing {} from room ID {}", username, room_id); + self.db.rooms.remove_user_from_room(username, room_id) + } else { + debug!("Ignoring a room member event: {:#?}", room_member); + Ok(()) + }; + + if let Err(e) = result { + error!("Could not update room information: {}", e.to_string()); + } else { + debug!("Successfully processed room member update."); + } + } + } + + async fn on_stripped_state_member( + &self, + room: SyncRoom, + room_member: &StrippedStateEvent, + _: Option, + ) { + if let SyncRoom::Invited(room) = room { + if let Some(user_id) = self.client.user_id().await { + if room_member.state_key != user_id { + return; + } + } + + //Clone to avoid holding lock. + let room = room.read().await.clone(); + info!("Autojoining room {}", room.display_name()); + + if let Err(e) = self.client.join_room_by_id(&room.room_id).await { + warn!("Could not join room: {}", e.to_string()) + } + } + } + + async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent) { + if let SyncRoom::Joined(room) = room { + let (msg_body, sender_username) = + if let Ok((msg_body, sender_username)) = should_process(self, &event).await { + (msg_body, sender_username) + } else { + return; + }; + + //we clone here to hold the lock for as little time as possible. 
+ let real_room = room.read().await.clone(); + + self.execute_commands(&real_room, &sender_username, &msg_body) + .await; + } + } +} diff --git a/src/db/errors.rs b/src/db/errors.rs index cb25c9b..63fd2a5 100644 --- a/src/db/errors.rs +++ b/src/db/errors.rs @@ -62,13 +62,11 @@ impl From> for DataError { } } -// impl From> for DataError { -// fn from(error: ConflictableTransactionError) -> Self { -// match error { -// ConflictableTransactionError::Abort(data_err) => data_err, -// ConflictableTransactionError::Storage(storage_err) => { -// DataError::TransactionError(TransactionError::Storage(storage_err)) -// } -// } -// } -// } +/// Automatically aborts transactions that hit a DataError by using +/// the try (question mark) operator when this trait implementation is +/// in scope. +impl From for sled::transaction::ConflictableTransactionError { + fn from(error: DataError) -> Self { + sled::transaction::ConflictableTransactionError::Abort(error) + } +} diff --git a/src/db/rooms.rs b/src/db/rooms.rs index 3d507fb..61ccd57 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -1,15 +1,9 @@ use crate::db::errors::DataError; -use crate::db::schema::convert_i32; -use byteorder::LittleEndian; -use sled::transaction::{abort, TransactionalTree}; +use sled::transaction::TransactionalTree; use sled::Transactional; use sled::Tree; -use std::collections::HashMap; use std::collections::HashSet; -use std::convert::From; use std::str; -use zerocopy::byteorder::I32; -use zerocopy::AsBytes; #[derive(Clone)] pub struct Rooms { @@ -45,6 +39,46 @@ pub struct Rooms { // to_vec(value) // } // } +enum TxableTree<'a> { + Tree(&'a Tree), + Tx(&'a TransactionalTree), +} + +impl<'a> Into> for &'a Tree { + fn into(self) -> TxableTree<'a> { + TxableTree::Tree(self) + } +} + +impl<'a> Into> for &'a TransactionalTree { + fn into(self) -> TxableTree<'a> { + TxableTree::Tx(self) + } +} + +fn get_set<'a, T: Into>>(tree: T, key: &[u8]) -> Result, DataError> { + let set: HashSet = match tree.into() { 
+ TxableTree::Tree(tree) => tree.get(key)?, + TxableTree::Tx(tx) => tx.get(key)?, + } + .map(|bytes| bincode::deserialize::>(&bytes)) + .unwrap_or(Ok(HashSet::new()))?; + + Ok(set) +} + +fn insert_set<'a, T: Into>>( + tree: T, + key: &[u8], + set: HashSet, +) -> Result<(), DataError> { + let serialized = bincode::serialize(&set)?; + match tree.into() { + TxableTree::Tree(tree) => tree.insert(key, serialized)?, + TxableTree::Tx(tx) => tx.insert(key, serialized)?, + }; + Ok(()) +} impl Rooms { pub(in crate::db) fn new(db: &sled::Db) -> Result { @@ -55,28 +89,121 @@ impl Rooms { }) } - pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { - //in txn: - //get or create list of users in room - //get or create list of rooms user is in - //deserialize/create set and add username to set for roomid - //deserialize/create set and add roomid to set for username - //store both again - let user_to_rooms: HashSet = self - .username_roomids - .get(username.as_bytes())? 
- .map(|bytes| bincode::deserialize::>(&bytes)) - .unwrap_or(Ok(HashSet::new()))?; + pub fn get_rooms_for_user(&self, username: &str) -> Result, DataError> { + get_set(&self.username_roomids, username.as_bytes()) + } + + pub fn get_users_in_room(&self, room_id: &str) -> Result, DataError> { + get_set(&self.roomid_usernames, room_id.as_bytes()) + } + + pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { + (&self.username_roomids, &self.roomid_usernames).transaction( + |(tx_username_rooms, tx_room_usernames)| { + let username_key = &username.as_bytes(); + let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; + user_to_rooms.insert(room_id.to_string()); + insert_set(tx_username_rooms, username_key, user_to_rooms)?; + + let roomid_key = &room_id.as_bytes(); + let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; + room_to_users.insert(username.to_string()); + insert_set(tx_room_usernames, roomid_key, room_to_users)?; + + Ok(()) + }, + )?; - let room_to_users: HashSet = self - .roomid_usernames - .get(room_id.as_bytes())? 
- .map(|bytes| bincode::deserialize::>(&bytes)) - .unwrap_or(Ok(HashSet::new()))?; Ok(()) } pub fn remove_user_from_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { + (&self.username_roomids, &self.roomid_usernames).transaction( + |(tx_username_rooms, tx_room_usernames)| { + let username_key = &username.as_bytes(); + let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; + user_to_rooms.remove(room_id); + insert_set(tx_username_rooms, username_key, user_to_rooms)?; + + let roomid_key = &room_id.as_bytes(); + let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; + room_to_users.remove(username); + insert_set(tx_room_usernames, roomid_key, room_to_users)?; + + Ok(()) + }, + )?; + + Ok(()) + } + + pub fn clear_info(&self, _room_id: &str) -> Result<(), DataError> { + //TODO implement me + //when bot leaves a room, it must, atomically: + // - delete roominfo struct from room info tree. + // - load list of users it knows about in room. + // - remove room id from every user's list. (cannot reuse existing fn because atomicity) + // - delete list of users in room from tree. 
Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use sled::Config; + + fn create_test_instance() -> Rooms { + let config = Config::new().temporary(true); + let db = config.open().unwrap(); + Rooms::new(&db).unwrap() + } + + #[test] + fn add_user_to_room() { + let rooms = create_test_instance(); + rooms + .add_user_to_room("testuser", "myroom") + .expect("Could not add user to room"); + + let users_in_room = rooms + .get_users_in_room("myroom") + .expect("Could not retrieve users in room"); + + let rooms_for_user = rooms + .get_rooms_for_user("testuser") + .expect("Could not get rooms for user"); + + let expected_users_in_room: HashSet = + vec![String::from("testuser")].into_iter().collect(); + + let expected_rooms_for_user: HashSet = + vec![String::from("myroom")].into_iter().collect(); + + assert_eq!(expected_users_in_room, users_in_room); + assert_eq!(expected_rooms_for_user, rooms_for_user); + } + + #[test] + fn remove_user_from_room() { + let rooms = create_test_instance(); + rooms + .add_user_to_room("testuser", "myroom") + .expect("Could not add user to room"); + + rooms + .remove_user_from_room("testuser", "myroom") + .expect("Could not remove user from room"); + + let users_in_room = rooms + .get_users_in_room("myroom") + .expect("Could not retrieve users in room"); + + let rooms_for_user = rooms + .get_rooms_for_user("testuser") + .expect("Could not get rooms for user"); + + assert_eq!(HashSet::new(), users_in_room); + assert_eq!(HashSet::new(), rooms_for_user); + } +} diff --git a/src/lib.rs b/src/lib.rs index 3b78482..50ed009 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod db; pub mod dice; pub mod error; mod help; +pub mod models; mod parser; pub mod state; pub mod variables; diff --git a/src/models.rs b/src/models.rs new file mode 100644 index 0000000..5d76f96 --- /dev/null +++ b/src/models.rs @@ -0,0 +1,5 @@ +/// RoomInfo has basic metadata about a room: its name, ID, etc. 
+pub struct RoomInfo { + pub room_id: String, + pub room_name: String, +} -- 2.40.1 From b5c78bcee59e50a8f3e44d9e69c84ff15c7b3580 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Sat, 7 Nov 2020 14:39:43 +0000 Subject: [PATCH 03/11] Remove userandroom struct from rooms db --- src/db/rooms.rs | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/src/db/rooms.rs b/src/db/rooms.rs index 61ccd57..9eb8249 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -17,28 +17,6 @@ pub struct Rooms { pub(in crate::db) username_roomids: Tree, } -// /// Request soemthing by a username and room ID. -// pub struct UserAndRoom<'a>(pub &'a str, pub &'a str); - -// fn to_vec(value: &UserAndRoom<'_>) -> Vec { -// let mut bytes = vec![]; -// bytes.extend_from_slice(value.0.as_bytes()); -// bytes.push(0xfe); -// bytes.extend_from_slice(value.1.as_bytes()); -// bytes -// } - -// impl From> for Vec { -// fn from(value: UserAndRoom) -> Vec { -// to_vec(&value) -// } -// } - -// impl From<&UserAndRoom<'_>> for Vec { -// fn from(value: &UserAndRoom) -> Vec { -// to_vec(value) -// } -// } enum TxableTree<'a> { Tree(&'a Tree), Tx(&'a TransactionalTree), -- 2.40.1 From d7aaed9e001ff30b2b2fed9f0c23a115709bfb44 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Sat, 7 Nov 2020 20:47:49 +0000 Subject: [PATCH 04/11] Implement room clearing, log to info for member updates. 
--- src/bot/event_handlers.rs | 6 ++-- src/db/rooms.rs | 76 +++++++++++++++++++++++++++++++++++---- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs index 526bb64..f115a39 100644 --- a/src/bot/event_handlers.rs +++ b/src/bot/event_handlers.rs @@ -100,13 +100,13 @@ impl EventEmitter for DiceBot { let (room_id, username) = (room.room_id.as_str(), &room_member.state_key); let result = if event_affects_us && !adding_user { - debug!("Clearing all information for room ID {}", room_id); + info!("Clearing all information for room ID {}", room_id); self.db.rooms.clear_info(room_id) } else if !event_affects_us && adding_user { - debug!("Adding {} to room ID {}", username, room_id); + info!("Adding user {} to room ID {}", username, room_id); self.db.rooms.add_user_to_room(username, room_id) } else if !event_affects_us && !adding_user { - debug!("Removing {} from room ID {}", username, room_id); + info!("Removing user {} from room ID {}", username, room_id); self.db.rooms.remove_user_from_room(username, room_id) } else { debug!("Ignoring a room member event: {:#?}", room_member); diff --git a/src/db/rooms.rs b/src/db/rooms.rs index 9eb8249..c80db5a 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -17,6 +17,7 @@ pub struct Rooms { pub(in crate::db) username_roomids: Tree, } +#[derive(Clone, Copy)] enum TxableTree<'a> { Tree(&'a Tree), Tx(&'a TransactionalTree), @@ -45,6 +46,17 @@ fn get_set<'a, T: Into>>(tree: T, key: &[u8]) -> Result> + Copy>( + tree: T, + key: &[u8], + value_to_remove: &str, +) -> Result<(), DataError> { + let mut set = get_set(tree, key)?; + set.remove(value_to_remove); + insert_set(tree, key, set)?; + Ok(()) +} + fn insert_set<'a, T: Into>>( tree: T, key: &[u8], @@ -115,13 +127,25 @@ impl Rooms { Ok(()) } - pub fn clear_info(&self, _room_id: &str) -> Result<(), DataError> { - //TODO implement me - //when bot leaves a room, it must, atomically: - // - delete roominfo struct from room 
info tree. - // - load list of users it knows about in room. - // - remove room id from every user's list. (cannot reuse existing fn because atomicity) - // - delete list of users in room from tree. + pub fn clear_info(&self, room_id: &str) -> Result<(), DataError> { + (&self.username_roomids, &self.roomid_usernames).transaction( + |(tx_username_roomids, tx_roomid_usernames)| { + let roomid_key = room_id.as_bytes(); + let users_in_room = get_set(tx_roomid_usernames, roomid_key)?; + + //Remove the room ID from every user's room ID list. + for username in users_in_room { + remove_from_set(tx_username_roomids, username.as_bytes(), room_id)?; + } + + //Remove this room entry for the room ID -> username tree. + tx_roomid_usernames.remove(roomid_key)?; + + //TODO: delete roominfo struct from room info tree. + Ok(()) + }, + )?; + Ok(()) } } @@ -184,4 +208,42 @@ mod tests { assert_eq!(HashSet::new(), users_in_room); assert_eq!(HashSet::new(), rooms_for_user); } + + #[test] + fn clear_info() { + let rooms = create_test_instance(); + rooms + .add_user_to_room("testuser", "myroom1") + .expect("Could not add user to room1"); + + rooms + .add_user_to_room("testuser", "myroom2") + .expect("Could not add user to room2"); + + rooms + .clear_info("myroom1") + .expect("Could not clear room info"); + + let users_in_room1 = rooms + .get_users_in_room("myroom1") + .expect("Could not retrieve users in room1"); + + let users_in_room2 = rooms + .get_users_in_room("myroom2") + .expect("Could not retrieve users in room2"); + + let rooms_for_user = rooms + .get_rooms_for_user("testuser") + .expect("Could not get rooms for user"); + + let expected_users_in_room2: HashSet = + vec![String::from("testuser")].into_iter().collect(); + + let expected_rooms_for_user: HashSet = + vec![String::from("myroom2")].into_iter().collect(); + + assert_eq!(HashSet::new(), users_in_room1); + assert_eq!(expected_users_in_room2, users_in_room2); + assert_eq!(expected_rooms_for_user, rooms_for_user); + } } -- 
2.40.1 From fb24090952fc8028af4975d7131c170463c110f4 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Mon, 9 Nov 2020 21:14:37 +0000 Subject: [PATCH 05/11] Keep seen events in database, don't process already-seen events. Adds a new function `should_process` to rooms impl that determines if calling code should proceed with processing an event ID. Event IDs are recorded (along with room ID) as a key pointing to the system-local timestamp of when the event was received. If the key was not originally present, we instruct calling code to process the event. Events are also asynchronously recorded by timestamp using a sled event watcher that listens to inserts in the main tree (described above). This secondary tree will allow easy cleanup of old events in the future. --- src/db.rs | 10 +- src/db/errors.rs | 3 + src/db/rooms.rs | 268 +++++++++++++++++++++++++++++++++++++++-------- src/db/schema.rs | 21 +++- 4 files changed, 253 insertions(+), 49 deletions(-) diff --git a/src/db.rs b/src/db.rs index b667bd8..cf8c05b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -25,12 +25,18 @@ impl Database { fn new_db(db: sled::Db) -> Result { let migrations = db.open_tree("migrations")?; - Ok(Database { + let database = Database { db: db.clone(), variables: Variables::new(&db)?, migrations: Migrations(migrations), rooms: Rooms::new(&db)?, - }) + }; + + //Start any event handlers. 
+ database.rooms.start_handler(); + + info!("Opened database."); + Ok(database) } pub fn new>(path: P) -> Result { diff --git a/src/db/errors.rs b/src/db/errors.rs index 63fd2a5..6cfe7f7 100644 --- a/src/db/errors.rs +++ b/src/db/errors.rs @@ -26,6 +26,9 @@ pub enum DataError { #[error("expected i32, but i32 schema was violated")] I32SchemaViolation, + #[error("unexpected or corruptd data bytes")] + InvalidValue, + #[error("expected string, but utf8 schema was violated: {0}")] Utf8chemaViolation(#[from] std::str::Utf8Error), diff --git a/src/db/rooms.rs b/src/db/rooms.rs index c80db5a..f953ef6 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -1,9 +1,16 @@ use crate::db::errors::DataError; +use crate::db::schema::convert_u64; +use byteorder::BigEndian; +use log::{error, log_enabled, trace}; use sled::transaction::TransactionalTree; use sled::Transactional; -use sled::Tree; +use sled::{CompareAndSwapError, Tree}; use std::collections::HashSet; use std::str; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::task::JoinHandle; +use zerocopy::byteorder::U64; +use zerocopy::AsBytes; #[derive(Clone)] pub struct Rooms { @@ -15,8 +22,23 @@ pub struct Rooms { /// Username -> list of room IDs user is in. pub(in crate::db) username_roomids: Tree, + + /// Room ID(str) 0xff event ID(str) -> timestamp. Records event + /// IDs that we have received, so we do not process twice. + pub(in crate::db) roomeventid_timestamp: Tree, + + /// Room ID(str) 0xff timestamp(u64) -> event ID. Records event + /// IDs with timestamp as the primary key instead. Exists to allow + /// easy scanning of old roomeventid_timestamp records for + /// removal. Be careful with u64, it can have 0xff and 0xfe bytes. + /// A simple split on 0xff will not work with this key. Instead, + /// it is meant to be split on the first 0xff byte only, and + /// separated into room ID and timestamp. 
+ pub(in crate::db) roomtimestamp_eventid: Tree, } +/// An enum that can hold either a regular sled Tree, or a +/// Transactional tree. #[derive(Clone, Copy)] enum TxableTree<'a> { Tree(&'a Tree), @@ -35,39 +57,107 @@ impl<'a> Into> for &'a TransactionalTree { } } -fn get_set<'a, T: Into>>(tree: T, key: &[u8]) -> Result, DataError> { - let set: HashSet = match tree.into() { - TxableTree::Tree(tree) => tree.get(key)?, - TxableTree::Tx(tx) => tx.get(key)?, +/// A set of functions that can be used with a sled::Tree that stores +/// HashSets as its values. Atomicity is partially handled. If the +/// Tree is a transactional tree, operations will be atomic. +/// Otherwise, there is a potential non-atomic step. +mod hashset_tree { + use super::*; + + fn insert_set<'a, T: Into>>( + tree: T, + key: &[u8], + set: HashSet, + ) -> Result<(), DataError> { + let serialized = bincode::serialize(&set)?; + match tree.into() { + TxableTree::Tree(tree) => tree.insert(key, serialized)?, + TxableTree::Tx(tx) => tx.insert(key, serialized)?, + }; + Ok(()) } - .map(|bytes| bincode::deserialize::>(&bytes)) - .unwrap_or(Ok(HashSet::new()))?; - Ok(set) + pub(super) fn get_set<'a, T: Into>>( + tree: T, + key: &[u8], + ) -> Result, DataError> { + let set: HashSet = match tree.into() { + TxableTree::Tree(tree) => tree.get(key)?, + TxableTree::Tx(tx) => tx.get(key)?, + } + .map(|bytes| bincode::deserialize::>(&bytes)) + .unwrap_or(Ok(HashSet::new()))?; + + Ok(set) + } + + pub(super) fn remove_from_set<'a, T: Into> + Copy>( + tree: T, + key: &[u8], + value_to_remove: &str, + ) -> Result<(), DataError> { + let mut set = get_set(tree, key)?; + set.remove(value_to_remove); + insert_set(tree, key, set)?; + Ok(()) + } + + pub(super) fn add_to_set<'a, T: Into> + Copy>( + tree: T, + key: &[u8], + value_to_add: String, + ) -> Result<(), DataError> { + let mut set = get_set(tree, key)?; + set.insert(value_to_add); + insert_set(tree, key, set)?; + Ok(()) + } } -fn remove_from_set<'a, T: Into> + Copy>( - 
tree: T, - key: &[u8], - value_to_remove: &str, -) -> Result<(), DataError> { - let mut set = get_set(tree, key)?; - set.remove(value_to_remove); - insert_set(tree, key, set)?; - Ok(()) -} +/// Functions that specifically relate to the "timestamp index" tree, +/// which is stored on the Room sinstance as a tree called +/// roomtimestamp_eventid. Tightly coupled to the event watcher in the +/// Rooms impl, and only factored out for unit testing. +mod timestamp_index { + use super::*; -fn insert_set<'a, T: Into<TxableTree<'a>>>( - tree: T, - key: &[u8], - set: HashSet<String>, -) -> Result<(), DataError> { - let serialized = bincode::serialize(&set)?; - match tree.into() { - TxableTree::Tree(tree) => tree.insert(key, serialized)?, - TxableTree::Tx(tx) => tx.insert(key, serialized)?, - }; - Ok(()) + /// Insert an entry from the main roomeventid_timestamp Tree into + /// the timestamp index. Keys in this Tree are stored as room ID + /// 0xff timestamp, with the value being a hashset of event IDs + /// received at the time. The parameters come from an insert to + /// that Tree, where the key is room ID 0xff event ID, and the + /// value is the timestamp. + pub(super) fn insert( + roomtimestamp_eventid: &Tree, + key: &[u8], + timestamp_bytes: &[u8], + ) -> Result<(), DataError> { + let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect(); + if let [room_id, event_id] = parts[..] { + let mut ts_key = room_id.to_vec(); + ts_key.push(0xff); + ts_key.extend_from_slice(&timestamp_bytes); + trace_index_record(room_id, event_id, &timestamp_bytes); + + let event_id = str::from_utf8(event_id)?; + hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?; + Ok(()) + } else { + Err(DataError::InvalidValue) + } + } + + /// Log a trace message. 
+ fn trace_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) { + if log_enabled!(log::Level::Trace) { + trace!( + "Recording event {} | {} received at {} in timestamp index.", + str::from_utf8(room_id).unwrap_or("[invalid room id]"), + str::from_utf8(event_id).unwrap_or("[invalid event id]"), + convert_u64(timestamp).unwrap_or(0) + ); + } + } } impl Rooms { @@ -76,29 +166,81 @@ impl Rooms { roomid_roominfo: db.open_tree("roomid_roominfo")?, roomid_usernames: db.open_tree("roomid_usernames")?, username_roomids: db.open_tree("username_roomids")?, + roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?, + roomtimestamp_eventid: db.open_tree("roomtimestamp_eventid")?, }) } + /// Start an event subscriber that listens for inserts made by the + /// `should_process` function. This event handler duplicates the + /// entry by timestamp instead of event ID. + pub(in crate::db) fn start_handler(&self) -> JoinHandle<()> { + //Clone due to lifetime requirements. + let roomeventid_timestamp = self.roomeventid_timestamp.clone(); + let roomtimestamp_eventid = self.roomtimestamp_eventid.clone(); + + tokio::spawn(async move { + let mut subscriber = roomeventid_timestamp.watch_prefix(b""); + + // TODO make this handler receive kill messages somehow so + // we can unit test it and gracefully shut it down. + while let Some(event) = (&mut subscriber).await { + if let sled::Event::Insert { key, value } = event { + match timestamp_index::insert(&roomtimestamp_eventid, &key, &value) { + Err(e) => { + error!("Unable to update the timestamp index: {}", e); + } + _ => (), + } + } + } + }) + } + + /// Determine if an event in a room should be processed. The event + /// is atomically recorded and true returned if the database has + /// not seen tis event yet. If the event already exists in the + /// database, the function returns false. Events are recorded by + /// this function by inserting the (system-local) timestamp in + /// epoch seconds. 
+ pub fn should_process(&self, room_id: &str, event_id: &str) -> Result { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(event_id.as_bytes()); + + let timestamp: U64 = U64::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Clock has gone backwards") + .as_secs(), + ); + + match self.roomeventid_timestamp.compare_and_swap( + key, + None as Option<&[u8]>, + Some(timestamp.as_bytes()), + )? { + Ok(()) => Ok(true), + Err(CompareAndSwapError { .. }) => Ok(false), + } + } + pub fn get_rooms_for_user(&self, username: &str) -> Result, DataError> { - get_set(&self.username_roomids, username.as_bytes()) + hashset_tree::get_set(&self.username_roomids, username.as_bytes()) } pub fn get_users_in_room(&self, room_id: &str) -> Result, DataError> { - get_set(&self.roomid_usernames, room_id.as_bytes()) + hashset_tree::get_set(&self.roomid_usernames, room_id.as_bytes()) } pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_rooms, tx_room_usernames)| { let username_key = &username.as_bytes(); - let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; - user_to_rooms.insert(room_id.to_string()); - insert_set(tx_username_rooms, username_key, user_to_rooms)?; + hashset_tree::add_to_set(tx_username_rooms, username_key, room_id.to_owned())?; let roomid_key = &room_id.as_bytes(); - let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; - room_to_users.insert(username.to_string()); - insert_set(tx_room_usernames, roomid_key, room_to_users)?; + hashset_tree::add_to_set(tx_room_usernames, roomid_key, username.to_owned())?; Ok(()) }, @@ -111,14 +253,10 @@ impl Rooms { (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_rooms, tx_room_usernames)| { let username_key = &username.as_bytes(); - let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; - 
user_to_rooms.remove(room_id); - insert_set(tx_username_rooms, username_key, user_to_rooms)?; + hashset_tree::remove_from_set(tx_username_rooms, username_key, room_id)?; let roomid_key = &room_id.as_bytes(); - let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; - room_to_users.remove(username); - insert_set(tx_room_usernames, roomid_key, room_to_users)?; + hashset_tree::remove_from_set(tx_room_usernames, roomid_key, username)?; Ok(()) }, @@ -131,11 +269,15 @@ impl Rooms { (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_roomids, tx_roomid_usernames)| { let roomid_key = room_id.as_bytes(); - let users_in_room = get_set(tx_roomid_usernames, roomid_key)?; + let users_in_room = hashset_tree::get_set(tx_roomid_usernames, roomid_key)?; //Remove the room ID from every user's room ID list. for username in users_in_room { - remove_from_set(tx_username_roomids, username.as_bytes(), room_id)?; + hashset_tree::remove_from_set( + tx_username_roomids, + username.as_bytes(), + room_id, + )?; } //Remove this room entry for the room ID -> username tree. @@ -164,6 +306,7 @@ mod tests { #[test] fn add_user_to_room() { let rooms = create_test_instance(); + rooms .add_user_to_room("testuser", "myroom") .expect("Could not add user to room"); @@ -189,6 +332,7 @@ mod tests { #[test] fn remove_user_from_room() { let rooms = create_test_instance(); + rooms .add_user_to_room("testuser", "myroom") .expect("Could not add user to room"); @@ -212,6 +356,7 @@ mod tests { #[test] fn clear_info() { let rooms = create_test_instance(); + rooms .add_user_to_room("testuser", "myroom1") .expect("Could not add user to room1"); @@ -246,4 +391,37 @@ mod tests { assert_eq!(expected_users_in_room2, users_in_room2); assert_eq!(expected_rooms_for_user, rooms_for_user); } + + #[test] + fn insert_to_timestamp_index() { + let rooms = create_test_instance(); + + // Insertion into timestamp index based on data that would go + // into main room x eventID -> timestamp tree. 
+ let mut key = b"myroom".to_vec(); + key.push(0xff); + key.extend_from_slice(b"myeventid"); + + let timestamp: U64 = U64::new(1000); + + let result = timestamp_index::insert( + &rooms.roomtimestamp_eventid, + key.as_bytes(), + timestamp.as_bytes(), + ); + + assert!(result.is_ok()); + + // Retrieval of data from the timestamp index tree. + let mut ts_key = b"myroom".to_vec(); + ts_key.push(0xff); + ts_key.extend_from_slice(timestamp.as_bytes()); + + let expected_events: HashSet = + vec![String::from("myeventid")].into_iter().collect(); + + let event_ids = hashset_tree::get_set(&rooms.roomtimestamp_eventid, &ts_key) + .expect("Could not get set out of Tree"); + assert_eq!(expected_events, event_ids); + } } diff --git a/src/db/schema.rs b/src/db/schema.rs index 61eec0a..35a31ef 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -1,6 +1,6 @@ use crate::db::errors::DataError; -use byteorder::LittleEndian; -use zerocopy::byteorder::{I32, U32}; +use byteorder::{BigEndian, LittleEndian}; +use zerocopy::byteorder::{I32, U32, U64}; use zerocopy::LayoutVerified; /// User variables are stored as little-endian 32-bit integers in the @@ -10,6 +10,11 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32>; type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32>; +#[allow(dead_code)] +type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64>; + +type BigEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64>; + /// Convert bytes to an i32 with zero-copy deserialization. An error /// is returned if the bytes do not represent an i32. 
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> { @@ -33,3 +38,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result<u32, DataError> { Err(DataError::I32SchemaViolation) } } + +#[allow(dead_code)] +pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> { + let layout = BigEndianU64Layout::new_unaligned(raw_value.as_ref()); + + if let Some(layout) = layout { + let value: U64<BigEndian> = *layout; + Ok(value.get()) + } else { + Err(DataError::I32SchemaViolation) + } +} -- 2.40.1 From 74d0b88e807299ba5c25ce9011cc32364a25bff1 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Mon, 9 Nov 2020 21:13:47 +0000 Subject: [PATCH 06/11] Add should process check to room member event --- src/bot/event_handlers.rs | 46 +++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs index f115a39..3068560 100644 --- a/src/bot/event_handlers.rs +++ b/src/bot/event_handlers.rs @@ -1,10 +1,11 @@ +use crate::db::Database; use crate::error::BotError; use async_trait::async_trait; use log::{debug, error, info, warn}; use matrix_sdk::{ self, events::{ - room::member::{MemberEventContent, MembershipState}, + room::member::{MemberEventContent, MembershipChange}, room::message::{MessageEventContent, TextMessageEventContent}, StrippedStateEvent, SyncMessageEvent, SyncStateEvent, }, @@ -73,32 +74,45 @@ async fn should_process<'a>( Ok((msg_body, sender_username)) } +fn should_process_event(db: &Database, room_id: &str, event_id: &str) -> bool { + db.rooms + .should_process(room_id, event_id) + .unwrap_or_else(|e| { + error!( + "Database error when checking if we should process an event: {}", + e.to_string() + ); + false + }) +} + /// This event emitter listens for messages with dice rolling commands. /// Originally adapted from the matrix-rust-sdk examples. 
#[async_trait] impl EventEmitter for DiceBot { - async fn on_room_member( - &self, - room: SyncRoom, - room_member: &SyncStateEvent, - ) { - if let SyncRoom::Joined(room) = room { + async fn on_room_member(&self, room: SyncRoom, event: &SyncStateEvent) { + if let SyncRoom::Joined(room) | SyncRoom::Left(room) = room { + //Clone to avoid holding lock. + let room = room.read().await.clone(); + let (room_id, username) = (room.room_id.as_str(), &event.state_key); + + if !should_process_event(&self.db, room_id, event.event_id.as_str()) { + return; + } + let event_affects_us = if let Some(our_user_id) = self.client.user_id().await { - room_member.state_key == our_user_id + event.state_key == our_user_id } else { false }; - let adding_user = match room_member.content.membership { - MembershipState::Join => true, - MembershipState::Leave | MembershipState::Ban => false, + use MembershipChange::*; + let adding_user = match event.membership_change() { + Joined => true, + Banned | Left | Kicked | KickedAndBanned => false, _ => return, }; - //Clone to avoid holding lock. - let room = room.read().await.clone(); - let (room_id, username) = (room.room_id.as_str(), &room_member.state_key); - let result = if event_affects_us && !adding_user { info!("Clearing all information for room ID {}", room_id); self.db.rooms.clear_info(room_id) @@ -109,7 +123,7 @@ impl EventEmitter for DiceBot { info!("Removing user {} from room ID {}", username, room_id); self.db.rooms.remove_user_from_room(username, room_id) } else { - debug!("Ignoring a room member event: {:#?}", room_member); + debug!("Ignoring a room member event: {:#?}", event); Ok(()) }; -- 2.40.1 From 9349dd5f00c9724fb76ed6817b15fa8eac3b76c1 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Mon, 9 Nov 2020 21:05:44 +0000 Subject: [PATCH 07/11] Add event processing check to received messages. Also rename the existing `should_process` function to be more clear, given presence of another similarly named function: should_process_message. 
--- src/bot/event_handlers.rs | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs index 3068560..a8af648 100644 --- a/src/bot/event_handlers.rs +++ b/src/bot/event_handlers.rs @@ -41,7 +41,13 @@ fn check_message_age( } } -async fn should_process<'a>( +/// Determine whether or not to process a received message. This check +/// is necessary in addition to the event processing check because we +/// may receive message events when entering a room for the first +/// time, and we don't want to respond to things before the bot was in +/// the channel, but we do want to respond to things that were sent if +/// the bot left and rejoined quickly. +async fn should_process_message<'a>( bot: &DiceBot, event: &SyncMessageEvent, ) -> Result<(String, String), BotError> { @@ -138,12 +144,12 @@ impl EventEmitter for DiceBot { async fn on_stripped_state_member( &self, room: SyncRoom, - room_member: &StrippedStateEvent, + event: &StrippedStateEvent, _: Option, ) { if let SyncRoom::Invited(room) = room { if let Some(user_id) = self.client.user_id().await { - if room_member.state_key != user_id { + if event.state_key != user_id { return; } } @@ -160,17 +166,22 @@ impl EventEmitter for DiceBot { async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent) { if let SyncRoom::Joined(room) = room { - let (msg_body, sender_username) = - if let Ok((msg_body, sender_username)) = should_process(self, &event).await { - (msg_body, sender_username) - } else { - return; - }; + //Clone to avoid holding lock. + let room = room.read().await.clone(); + let room_id = room.room_id.as_str(); + if !should_process_event(&self.db, room_id, event.event_id.as_str()) { + return; + } - //we clone here to hold the lock for as little time as possible. 
- let real_room = room.read().await.clone(); + let (msg_body, sender_username) = if let Ok((msg_body, sender_username)) = + should_process_message(self, &event).await + { + (msg_body, sender_username) + } else { + return; + }; - self.execute_commands(&real_room, &sender_username, &msg_body) + self.execute_commands(&room, &sender_username, &msg_body) .await; } } -- 2.40.1 From 0939feee844a01624ecf9340568f99b4bd626edd Mon Sep 17 00:00:00 2001 From: projectmoon Date: Mon, 9 Nov 2020 21:14:15 +0000 Subject: [PATCH 08/11] Placeholder to record all user info when joining room --- src/bot/event_handlers.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs index a8af648..809e1be 100644 --- a/src/bot/event_handlers.rs +++ b/src/bot/event_handlers.rs @@ -122,6 +122,9 @@ impl EventEmitter for DiceBot { let result = if event_affects_us && !adding_user { info!("Clearing all information for room ID {}", room_id); self.db.rooms.clear_info(room_id) + } else if event_affects_us && adding_user { + info!("Joined room {}; recording user information", room_id); + Ok(()) } else if !event_affects_us && adding_user { info!("Adding user {} to room ID {}", username, room_id); self.db.rooms.add_user_to_room(username, room_id) -- 2.40.1 From 9ed2a81dd3f2ee673716e69044cefb9e5c8ad540 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Tue, 10 Nov 2020 20:18:00 +0000 Subject: [PATCH 09/11] Record all users in room when joining. 
--- src/bot/event_handlers.rs | 23 +++++++++++++++++++++-- src/db/rooms.rs | 9 +++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs index 809e1be..6b1b4c5 100644 --- a/src/bot/event_handlers.rs +++ b/src/bot/event_handlers.rs @@ -9,7 +9,8 @@ use matrix_sdk::{ room::message::{MessageEventContent, TextMessageEventContent}, StrippedStateEvent, SyncMessageEvent, SyncStateEvent, }, - EventEmitter, SyncRoom, + identifiers::RoomId, + Client, EventEmitter, SyncRoom, }; //use matrix_sdk_common_macros::async_trait; use super::DiceBot; @@ -92,6 +93,19 @@ fn should_process_event(db: &Database, room_id: &str, event_id: &str) -> bool { }) } +async fn get_users_in_room(client: &Client, room_id: &RoomId) -> Vec { + if let Some(joined_room) = client.get_joined_room(room_id).await { + let joined_room: matrix_sdk::Room = joined_room.read().await.clone(); + joined_room + .joined_members + .keys() + .map(|user_id| format!("@{}:{}", user_id.localpart(), user_id.server_name())) + .collect() + } else { + vec![] + } +} + /// This event emitter listens for messages with dice rolling commands. /// Originally adapted from the matrix-rust-sdk examples. #[async_trait] @@ -124,7 +138,12 @@ impl EventEmitter for DiceBot { self.db.rooms.clear_info(room_id) } else if event_affects_us && adding_user { info!("Joined room {}; recording user information", room_id); - Ok(()) + let usernames = get_users_in_room(&self.client, &room.room_id).await; + usernames + .into_iter() + .filter(|username| username != &event.state_key) + .map(|username| self.db.rooms.add_user_to_room(&username, room_id)) + .collect() //Make use of collect impl on Result. 
} else if !event_affects_us && adding_user { info!("Adding user {} to room ID {}", username, room_id); self.db.rooms.add_user_to_room(username, room_id) diff --git a/src/db/rooms.rs b/src/db/rooms.rs index f953ef6..0dd37fe 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -1,7 +1,7 @@ use crate::db::errors::DataError; use crate::db::schema::convert_u64; use byteorder::BigEndian; -use log::{error, log_enabled, trace}; +use log::{debug, error, log_enabled, trace}; use sled::transaction::TransactionalTree; use sled::Transactional; use sled::{CompareAndSwapError, Tree}; @@ -134,12 +136,14 @@ mod timestamp_index { ) -> Result<(), DataError> { let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect(); if let [room_id, event_id] = parts[..] { + let event_id = str::from_utf8(event_id)?; + debug!("Adding event ID {} to timestamp index", event_id); + let mut ts_key = room_id.to_vec(); ts_key.push(0xff); ts_key.extend_from_slice(&timestamp_bytes); trace_index_record(room_id, event_id, &timestamp_bytes); - let event_id = str::from_utf8(event_id)?; hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?; Ok(()) } else { @@ -234,6 +236,7 @@ impl Rooms { } pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { + debug!("Adding user {} to room {}", username, room_id); (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_rooms, tx_room_usernames)| { let username_key = &username.as_bytes(); @@ -250,6 +253,7 @@ impl Rooms { } pub fn remove_user_from_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { + debug!("Removing user {} from room {}", username, room_id); (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_rooms, tx_room_usernames)| { let username_key = &username.as_bytes(); @@ -266,6 +270,7 @@ impl Rooms { } pub fn clear_info(&self, room_id: &str) -> Result<(), DataError> { + debug!("Clearing all information for room {}", room_id); (&self.username_roomids, 
&self.roomid_usernames).transaction( |(tx_username_roomids, tx_roomid_usernames)| { let roomid_key = room_id.as_bytes(); -- 2.40.1 From 551f21a49a5653481b79a1895b5ef80f67d83c3d Mon Sep 17 00:00:00 2001 From: projectmoon Date: Tue, 10 Nov 2020 20:22:18 +0000 Subject: [PATCH 10/11] Fix minor typo in rooms db code. --- src/db/rooms.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/rooms.rs b/src/db/rooms.rs index 0dd37fe..d86e8e5 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -115,7 +115,7 @@ mod hashset_tree { } /// Functions that specifically relate to the "timestamp index" tree, -/// which is stored on the Room sinstance as a tree called +/// which is stored on the Rooms instance as a tree called /// roomtimestamp_eventid. Tightly coupled to the event watcher in the /// Rooms impl, and only factored out for unit testing. mod timestamp_index { -- 2.40.1 From 0e04e67f6e6280f5b5f4846100920654c4b5154c Mon Sep 17 00:00:00 2001 From: projectmoon Date: Tue, 10 Nov 2020 20:35:27 +0000 Subject: [PATCH 11/11] Log debug instead of trace for timestamp index inserts. --- src/db/rooms.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/db/rooms.rs b/src/db/rooms.rs index d86e8e5..02a80ea 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -1,7 +1,7 @@ use crate::db::errors::DataError; use crate::db::schema::convert_u64; use byteorder::BigEndian; -use log::{debug, error, log_enabled, trace}; +use log::{debug, error, log_enabled}; use sled::transaction::TransactionalTree; use sled::Transactional; use sled::{CompareAndSwapError, Tree}; @@ -134,14 +134,12 @@ mod timestamp_index { ) -> Result<(), DataError> { let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect(); if let [room_id, event_id] = parts[..] 
{ - let event_id = str::from_utf8(event_id)?; - debug!("Adding event ID {} to timestamp index", event_id); - let mut ts_key = room_id.to_vec(); ts_key.push(0xff); ts_key.extend_from_slice(&timestamp_bytes); - trace_index_record(room_id, event_id, &timestamp_bytes); + log_index_record(room_id, event_id, &timestamp_bytes); + let event_id = str::from_utf8(event_id)?; hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?; Ok(()) } else { @@ -149,10 +147,10 @@ mod timestamp_index { } } - /// Log a trace message. - fn trace_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) { - if log_enabled!(log::Level::Trace) { - trace!( + /// Log a debug message. + fn log_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) { + if log_enabled!(log::Level::Debug) { + debug!( "Recording event {} | {} received at {} in timestamp index.", str::from_utf8(room_id).unwrap_or("[invalid room id]"), str::from_utf8(event_id).unwrap_or("[invalid event id]"), -- 2.40.1