Compare commits

..

4 Commits

Author SHA1 Message Date
projectmoon 0939feee84 Placeholder to record all user info when joining room
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details
2020-11-09 21:17:03 +00:00
projectmoon 9349dd5f00 Add event processing check to received messages.
Also rename the existing `should_process` function to be more clear,
given presence of another similarly named function:
should_process_message.
2020-11-09 21:16:20 +00:00
projectmoon 74d0b88e80 Add should process check to room member event 2020-11-09 21:16:07 +00:00
projectmoon fb24090952 Keep seen events in database, don't process already-seen events.
Adds a new function `should_process` to rooms impl that determines if
calling could should proceed with processing an event ID. Event IDs
are recorded (along with room ID) as a key pointing to the
system-local timestamp of when the event was received. If the key was
not originally present, we instruct calling code to process the event.

Events are also asychronously recorded by timestamp using a sled event
watcher that listens to inserts in the main tree (described above).
This secondary tree will allow easy cleanup of old events in the
future.
2020-11-09 21:16:07 +00:00
5 changed files with 228 additions and 64 deletions

View File

@ -41,7 +41,13 @@ fn check_message_age(
} }
} }
async fn should_process<'a>( /// Determine whether or not to process a received message. This check
/// is necessary in addition to the event processing check because we
/// may receive message events when entering a room for the first
/// time, and we don't want to respond to things before the bot was in
/// the channel, but we do want to respond to things that were sent if
/// the bot left and rejoined quickly.
async fn should_process_message<'a>(
bot: &DiceBot, bot: &DiceBot,
event: &SyncMessageEvent<MessageEventContent>, event: &SyncMessageEvent<MessageEventContent>,
) -> Result<(String, String), BotError> { ) -> Result<(String, String), BotError> {
@ -141,12 +147,12 @@ impl EventEmitter for DiceBot {
async fn on_stripped_state_member( async fn on_stripped_state_member(
&self, &self,
room: SyncRoom, room: SyncRoom,
room_member: &StrippedStateEvent<MemberEventContent>, event: &StrippedStateEvent<MemberEventContent>,
_: Option<MemberEventContent>, _: Option<MemberEventContent>,
) { ) {
if let SyncRoom::Invited(room) = room { if let SyncRoom::Invited(room) = room {
if let Some(user_id) = self.client.user_id().await { if let Some(user_id) = self.client.user_id().await {
if room_member.state_key != user_id { if event.state_key != user_id {
return; return;
} }
} }
@ -163,17 +169,22 @@ impl EventEmitter for DiceBot {
async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent<MessageEventContent>) { async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent<MessageEventContent>) {
if let SyncRoom::Joined(room) = room { if let SyncRoom::Joined(room) = room {
let (msg_body, sender_username) = //Clone to avoid holding lock.
if let Ok((msg_body, sender_username)) = should_process(self, &event).await { let room = room.read().await.clone();
(msg_body, sender_username) let room_id = room.room_id.as_str();
} else { if !should_process_event(&self.db, room_id, event.event_id.as_str()) {
return; return;
}; }
//we clone here to hold the lock for as little time as possible. let (msg_body, sender_username) = if let Ok((msg_body, sender_username)) =
let real_room = room.read().await.clone(); should_process_message(self, &event).await
{
(msg_body, sender_username)
} else {
return;
};
self.execute_commands(&real_room, &sender_username, &msg_body) self.execute_commands(&room, &sender_username, &msg_body)
.await; .await;
} }
} }

View File

@ -25,12 +25,18 @@ impl Database {
fn new_db(db: sled::Db) -> Result<Database, DataError> { fn new_db(db: sled::Db) -> Result<Database, DataError> {
let migrations = db.open_tree("migrations")?; let migrations = db.open_tree("migrations")?;
Ok(Database { let database = Database {
db: db.clone(), db: db.clone(),
variables: Variables::new(&db)?, variables: Variables::new(&db)?,
migrations: Migrations(migrations), migrations: Migrations(migrations),
rooms: Rooms::new(&db)?, rooms: Rooms::new(&db)?,
}) };
//Start any event handlers.
database.rooms.start_handler();
info!("Opened database.");
Ok(database)
} }
pub fn new<P: AsRef<Path>>(path: P) -> Result<Database, DataError> { pub fn new<P: AsRef<Path>>(path: P) -> Result<Database, DataError> {

View File

@ -26,6 +26,9 @@ pub enum DataError {
#[error("expected i32, but i32 schema was violated")] #[error("expected i32, but i32 schema was violated")]
I32SchemaViolation, I32SchemaViolation,
#[error("unexpected or corruptd data bytes")]
InvalidValue,
#[error("expected string, but utf8 schema was violated: {0}")] #[error("expected string, but utf8 schema was violated: {0}")]
Utf8chemaViolation(#[from] std::str::Utf8Error), Utf8chemaViolation(#[from] std::str::Utf8Error),

View File

@ -1,11 +1,14 @@
use crate::db::errors::DataError; use crate::db::errors::DataError;
use byteorder::LittleEndian; use crate::db::schema::convert_u64;
use byteorder::BigEndian;
use log::{error, log_enabled, trace};
use sled::transaction::TransactionalTree; use sled::transaction::TransactionalTree;
use sled::Transactional; use sled::Transactional;
use sled::{CompareAndSwapError, Tree}; use sled::{CompareAndSwapError, Tree};
use std::collections::HashSet; use std::collections::HashSet;
use std::str; use std::str;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
use zerocopy::byteorder::U64; use zerocopy::byteorder::U64;
use zerocopy::AsBytes; use zerocopy::AsBytes;
@ -20,10 +23,22 @@ pub struct Rooms {
/// Username -> list of room IDs user is in. /// Username -> list of room IDs user is in.
pub(in crate::db) username_roomids: Tree, pub(in crate::db) username_roomids: Tree,
/// Room ID 0xff event ID -> received timestamp. /// Room ID(str) 0xff event ID(str) -> timestamp. Records event
/// IDs that we have received, so we do not process twice.
pub(in crate::db) roomeventid_timestamp: Tree, pub(in crate::db) roomeventid_timestamp: Tree,
/// Room ID(str) 0xff timestamp(u64) -> event ID. Records event
/// IDs with timestamp as the primary key instead. Exists to allow
/// easy scanning of old roomeventid_timestamp records for
/// removal. Be careful with u64, it can have 0xff and 0xfe bytes.
/// A simple split on 0xff will not work with this key. Instead,
/// it is meant to be split on the first 0xff byte only, and
/// separated into room ID and timestamp.
pub(in crate::db) roomtimestamp_eventid: Tree,
} }
/// An enum that can hold either a regular sled Tree, or a
/// Transactional tree.
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum TxableTree<'a> { enum TxableTree<'a> {
Tree(&'a Tree), Tree(&'a Tree),
@ -42,39 +57,107 @@ impl<'a> Into<TxableTree<'a>> for &'a TransactionalTree {
} }
} }
fn get_set<'a, T: Into<TxableTree<'a>>>(tree: T, key: &[u8]) -> Result<HashSet<String>, DataError> { /// A set of functions that can be used with a sled::Tree that stores
let set: HashSet<String> = match tree.into() { /// HashSets as its values. Atomicity is partially handled. If the
TxableTree::Tree(tree) => tree.get(key)?, /// Tree is a transactional tree, operations will be atomic.
TxableTree::Tx(tx) => tx.get(key)?, /// Otherwise, there is a potential non-atomic step.
mod hashset_tree {
use super::*;
fn insert_set<'a, T: Into<TxableTree<'a>>>(
tree: T,
key: &[u8],
set: HashSet<String>,
) -> Result<(), DataError> {
let serialized = bincode::serialize(&set)?;
match tree.into() {
TxableTree::Tree(tree) => tree.insert(key, serialized)?,
TxableTree::Tx(tx) => tx.insert(key, serialized)?,
};
Ok(())
} }
.map(|bytes| bincode::deserialize::<HashSet<String>>(&bytes))
.unwrap_or(Ok(HashSet::new()))?;
Ok(set) pub(super) fn get_set<'a, T: Into<TxableTree<'a>>>(
tree: T,
key: &[u8],
) -> Result<HashSet<String>, DataError> {
let set: HashSet<String> = match tree.into() {
TxableTree::Tree(tree) => tree.get(key)?,
TxableTree::Tx(tx) => tx.get(key)?,
}
.map(|bytes| bincode::deserialize::<HashSet<String>>(&bytes))
.unwrap_or(Ok(HashSet::new()))?;
Ok(set)
}
pub(super) fn remove_from_set<'a, T: Into<TxableTree<'a>> + Copy>(
tree: T,
key: &[u8],
value_to_remove: &str,
) -> Result<(), DataError> {
let mut set = get_set(tree, key)?;
set.remove(value_to_remove);
insert_set(tree, key, set)?;
Ok(())
}
pub(super) fn add_to_set<'a, T: Into<TxableTree<'a>> + Copy>(
tree: T,
key: &[u8],
value_to_add: String,
) -> Result<(), DataError> {
let mut set = get_set(tree, key)?;
set.insert(value_to_add);
insert_set(tree, key, set)?;
Ok(())
}
} }
fn remove_from_set<'a, T: Into<TxableTree<'a>> + Copy>( /// Functions that specifically relate to the "timestamp index" tree,
tree: T, /// which is stored on the Room sinstance as a tree called
key: &[u8], /// roomtimestamp_eventid. Tightly coupled to the event watcher in the
value_to_remove: &str, /// Rooms impl, and only factored out for unit testing.
) -> Result<(), DataError> { mod timestamp_index {
let mut set = get_set(tree, key)?; use super::*;
set.remove(value_to_remove);
insert_set(tree, key, set)?;
Ok(())
}
fn insert_set<'a, T: Into<TxableTree<'a>>>( /// Insert an entry from the main roomeventid_timestamp Tree into
tree: T, /// the timestamp index. Keys in this Tree are stored as room ID
key: &[u8], /// 0xff timestamp, with the value being a hashset of event IDs
set: HashSet<String>, /// received at the time. The parameters come from an insert to
) -> Result<(), DataError> { /// that Tree, where the key is room ID 0xff event ID, and the
let serialized = bincode::serialize(&set)?; /// value is the timestamp.
match tree.into() { pub(super) fn insert(
TxableTree::Tree(tree) => tree.insert(key, serialized)?, roomtimestamp_eventid: &Tree,
TxableTree::Tx(tx) => tx.insert(key, serialized)?, key: &[u8],
}; timestamp_bytes: &[u8],
Ok(()) ) -> Result<(), DataError> {
let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect();
if let [room_id, event_id] = parts[..] {
let mut ts_key = room_id.to_vec();
ts_key.push(0xff);
ts_key.extend_from_slice(&timestamp_bytes);
trace_index_record(room_id, event_id, &timestamp_bytes);
let event_id = str::from_utf8(event_id)?;
hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?;
Ok(())
} else {
Err(DataError::InvalidValue)
}
}
/// Log a trace message.
fn trace_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) {
if log_enabled!(log::Level::Trace) {
trace!(
"Recording event {} | {} received at {} in timestamp index.",
str::from_utf8(room_id).unwrap_or("[invalid room id]"),
str::from_utf8(event_id).unwrap_or("[invalid event id]"),
convert_u64(timestamp).unwrap_or(0)
);
}
}
} }
impl Rooms { impl Rooms {
@ -84,6 +167,33 @@ impl Rooms {
roomid_usernames: db.open_tree("roomid_usernames")?, roomid_usernames: db.open_tree("roomid_usernames")?,
username_roomids: db.open_tree("username_roomids")?, username_roomids: db.open_tree("username_roomids")?,
roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?, roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?,
roomtimestamp_eventid: db.open_tree("roomtimestamp_eventid")?,
})
}
/// Start an event subscriber that listens for inserts made by the
/// `should_process` function. This event handler duplicates the
/// entry by timestamp instead of event ID.
pub(in crate::db) fn start_handler(&self) -> JoinHandle<()> {
//Clone due to lifetime requirements.
let roomeventid_timestamp = self.roomeventid_timestamp.clone();
let roomtimestamp_eventid = self.roomtimestamp_eventid.clone();
tokio::spawn(async move {
let mut subscriber = roomeventid_timestamp.watch_prefix(b"");
// TODO make this handler receive kill messages somehow so
// we can unit test it and gracefully shut it down.
while let Some(event) = (&mut subscriber).await {
if let sled::Event::Insert { key, value } = event {
match timestamp_index::insert(&roomtimestamp_eventid, &key, &value) {
Err(e) => {
error!("Unable to update the timestamp index: {}", e);
}
_ => (),
}
}
}
}) })
} }
@ -98,7 +208,7 @@ impl Rooms {
key.push(0xff); key.push(0xff);
key.extend_from_slice(event_id.as_bytes()); key.extend_from_slice(event_id.as_bytes());
let timestamp: U64<LittleEndian> = U64::new( let timestamp: U64<BigEndian> = U64::new(
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.expect("Clock has gone backwards") .expect("Clock has gone backwards")
@ -116,25 +226,21 @@ impl Rooms {
} }
pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> { pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.username_roomids, username.as_bytes()) hashset_tree::get_set(&self.username_roomids, username.as_bytes())
} }
pub fn get_users_in_room(&self, room_id: &str) -> Result<HashSet<String>, DataError> { pub fn get_users_in_room(&self, room_id: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.roomid_usernames, room_id.as_bytes()) hashset_tree::get_set(&self.roomid_usernames, room_id.as_bytes())
} }
pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> {
(&self.username_roomids, &self.roomid_usernames).transaction( (&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_rooms, tx_room_usernames)| { |(tx_username_rooms, tx_room_usernames)| {
let username_key = &username.as_bytes(); let username_key = &username.as_bytes();
let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; hashset_tree::add_to_set(tx_username_rooms, username_key, room_id.to_owned())?;
user_to_rooms.insert(room_id.to_string());
insert_set(tx_username_rooms, username_key, user_to_rooms)?;
let roomid_key = &room_id.as_bytes(); let roomid_key = &room_id.as_bytes();
let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; hashset_tree::add_to_set(tx_room_usernames, roomid_key, username.to_owned())?;
room_to_users.insert(username.to_string());
insert_set(tx_room_usernames, roomid_key, room_to_users)?;
Ok(()) Ok(())
}, },
@ -147,14 +253,10 @@ impl Rooms {
(&self.username_roomids, &self.roomid_usernames).transaction( (&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_rooms, tx_room_usernames)| { |(tx_username_rooms, tx_room_usernames)| {
let username_key = &username.as_bytes(); let username_key = &username.as_bytes();
let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; hashset_tree::remove_from_set(tx_username_rooms, username_key, room_id)?;
user_to_rooms.remove(room_id);
insert_set(tx_username_rooms, username_key, user_to_rooms)?;
let roomid_key = &room_id.as_bytes(); let roomid_key = &room_id.as_bytes();
let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; hashset_tree::remove_from_set(tx_room_usernames, roomid_key, username)?;
room_to_users.remove(username);
insert_set(tx_room_usernames, roomid_key, room_to_users)?;
Ok(()) Ok(())
}, },
@ -167,11 +269,15 @@ impl Rooms {
(&self.username_roomids, &self.roomid_usernames).transaction( (&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_roomids, tx_roomid_usernames)| { |(tx_username_roomids, tx_roomid_usernames)| {
let roomid_key = room_id.as_bytes(); let roomid_key = room_id.as_bytes();
let users_in_room = get_set(tx_roomid_usernames, roomid_key)?; let users_in_room = hashset_tree::get_set(tx_roomid_usernames, roomid_key)?;
//Remove the room ID from every user's room ID list. //Remove the room ID from every user's room ID list.
for username in users_in_room { for username in users_in_room {
remove_from_set(tx_username_roomids, username.as_bytes(), room_id)?; hashset_tree::remove_from_set(
tx_username_roomids,
username.as_bytes(),
room_id,
)?;
} }
//Remove this room entry for the room ID -> username tree. //Remove this room entry for the room ID -> username tree.
@ -200,6 +306,7 @@ mod tests {
#[test] #[test]
fn add_user_to_room() { fn add_user_to_room() {
let rooms = create_test_instance(); let rooms = create_test_instance();
rooms rooms
.add_user_to_room("testuser", "myroom") .add_user_to_room("testuser", "myroom")
.expect("Could not add user to room"); .expect("Could not add user to room");
@ -225,6 +332,7 @@ mod tests {
#[test] #[test]
fn remove_user_from_room() { fn remove_user_from_room() {
let rooms = create_test_instance(); let rooms = create_test_instance();
rooms rooms
.add_user_to_room("testuser", "myroom") .add_user_to_room("testuser", "myroom")
.expect("Could not add user to room"); .expect("Could not add user to room");
@ -248,6 +356,7 @@ mod tests {
#[test] #[test]
fn clear_info() { fn clear_info() {
let rooms = create_test_instance(); let rooms = create_test_instance();
rooms rooms
.add_user_to_room("testuser", "myroom1") .add_user_to_room("testuser", "myroom1")
.expect("Could not add user to room1"); .expect("Could not add user to room1");
@ -282,4 +391,37 @@ mod tests {
assert_eq!(expected_users_in_room2, users_in_room2); assert_eq!(expected_users_in_room2, users_in_room2);
assert_eq!(expected_rooms_for_user, rooms_for_user); assert_eq!(expected_rooms_for_user, rooms_for_user);
} }
#[test]
fn insert_to_timestamp_index() {
let rooms = create_test_instance();
// Insertion into timestamp index based on data that would go
// into main room x eventID -> timestamp tree.
let mut key = b"myroom".to_vec();
key.push(0xff);
key.extend_from_slice(b"myeventid");
let timestamp: U64<BigEndian> = U64::new(1000);
let result = timestamp_index::insert(
&rooms.roomtimestamp_eventid,
key.as_bytes(),
timestamp.as_bytes(),
);
assert!(result.is_ok());
// Retrieval of data from the timestamp index tree.
let mut ts_key = b"myroom".to_vec();
ts_key.push(0xff);
ts_key.extend_from_slice(timestamp.as_bytes());
let expected_events: HashSet<String> =
vec![String::from("myeventid")].into_iter().collect();
let event_ids = hashset_tree::get_set(&rooms.roomtimestamp_eventid, &ts_key)
.expect("Could not get set out of Tree");
assert_eq!(expected_events, event_ids);
}
} }

View File

@ -1,5 +1,5 @@
use crate::db::errors::DataError; use crate::db::errors::DataError;
use byteorder::LittleEndian; use byteorder::{BigEndian, LittleEndian};
use zerocopy::byteorder::{I32, U32, U64}; use zerocopy::byteorder::{I32, U32, U64};
use zerocopy::LayoutVerified; use zerocopy::LayoutVerified;
@ -13,6 +13,8 @@ type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>;
#[allow(dead_code)] #[allow(dead_code)]
type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<LittleEndian>>; type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<LittleEndian>>;
type BigEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<BigEndian>>;
/// Convert bytes to an i32 with zero-copy deserialization. An error /// Convert bytes to an i32 with zero-copy deserialization. An error
/// is returned if the bytes do not represent an i32. /// is returned if the bytes do not represent an i32.
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> { pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> {
@ -39,10 +41,10 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result<u32, DataError> {
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> { pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> {
let layout = LittleEndianU64Layout::new_unaligned(raw_value.as_ref()); let layout = BigEndianU64Layout::new_unaligned(raw_value.as_ref());
if let Some(layout) = layout { if let Some(layout) = layout {
let value: U64<LittleEndian> = *layout; let value: U64<BigEndian> = *layout;
Ok(value.get()) Ok(value.get())
} else { } else {
Err(DataError::I32SchemaViolation) Err(DataError::I32SchemaViolation)