diff --git a/src/db.rs b/src/db.rs index b667bd8..cf8c05b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -25,12 +25,18 @@ impl Database { fn new_db(db: sled::Db) -> Result { let migrations = db.open_tree("migrations")?; - Ok(Database { + let database = Database { db: db.clone(), variables: Variables::new(&db)?, migrations: Migrations(migrations), rooms: Rooms::new(&db)?, - }) + }; + + //Start any event handlers. + database.rooms.start_handler(); + + info!("Opened database."); + Ok(database) } pub fn new>(path: P) -> Result { diff --git a/src/db/errors.rs b/src/db/errors.rs index 63fd2a5..6cfe7f7 100644 --- a/src/db/errors.rs +++ b/src/db/errors.rs @@ -26,6 +26,9 @@ pub enum DataError { #[error("expected i32, but i32 schema was violated")] I32SchemaViolation, + #[error("unexpected or corruptd data bytes")] + InvalidValue, + #[error("expected string, but utf8 schema was violated: {0}")] Utf8chemaViolation(#[from] std::str::Utf8Error), diff --git a/src/db/rooms.rs b/src/db/rooms.rs index c80db5a..f953ef6 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -1,9 +1,16 @@ use crate::db::errors::DataError; +use crate::db::schema::convert_u64; +use byteorder::BigEndian; +use log::{error, log_enabled, trace}; use sled::transaction::TransactionalTree; use sled::Transactional; -use sled::Tree; +use sled::{CompareAndSwapError, Tree}; use std::collections::HashSet; use std::str; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::task::JoinHandle; +use zerocopy::byteorder::U64; +use zerocopy::AsBytes; #[derive(Clone)] pub struct Rooms { @@ -15,8 +22,23 @@ pub struct Rooms { /// Username -> list of room IDs user is in. pub(in crate::db) username_roomids: Tree, + + /// Room ID(str) 0xff event ID(str) -> timestamp. Records event + /// IDs that we have received, so we do not process twice. + pub(in crate::db) roomeventid_timestamp: Tree, + + /// Room ID(str) 0xff timestamp(u64) -> event ID. Records event + /// IDs with timestamp as the primary key instead. Exists to allow + /// easy scanning of old roomeventid_timestamp records for + /// removal. Be careful with u64, it can have 0xff and 0xfe bytes. + /// A simple split on 0xff will not work with this key. Instead, + /// it is meant to be split on the first 0xff byte only, and + /// separated into room ID and timestamp. + pub(in crate::db) roomtimestamp_eventid: Tree, } +/// An enum that can hold either a regular sled Tree, or a +/// Transactional tree. #[derive(Clone, Copy)] enum TxableTree<'a> { Tree(&'a Tree), @@ -35,39 +57,107 @@ impl<'a> Into> for &'a TransactionalTree { } } -fn get_set<'a, T: Into>>(tree: T, key: &[u8]) -> Result, DataError> { - let set: HashSet = match tree.into() { - TxableTree::Tree(tree) => tree.get(key)?, - TxableTree::Tx(tx) => tx.get(key)?, +/// A set of functions that can be used with a sled::Tree that stores +/// HashSets as its values. Atomicity is partially handled. If the +/// Tree is a transactional tree, operations will be atomic. +/// Otherwise, there is a potential non-atomic step. +mod hashset_tree { + use super::*; + + fn insert_set<'a, T: Into>>( + tree: T, + key: &[u8], + set: HashSet, + ) -> Result<(), DataError> { + let serialized = bincode::serialize(&set)?; + match tree.into() { + TxableTree::Tree(tree) => tree.insert(key, serialized)?, + TxableTree::Tx(tx) => tx.insert(key, serialized)?, + }; + Ok(()) } - .map(|bytes| bincode::deserialize::>(&bytes)) - .unwrap_or(Ok(HashSet::new()))?; - Ok(set) + pub(super) fn get_set<'a, T: Into>>( + tree: T, + key: &[u8], + ) -> Result, DataError> { + let set: HashSet = match tree.into() { + TxableTree::Tree(tree) => tree.get(key)?, + TxableTree::Tx(tx) => tx.get(key)?, + } + .map(|bytes| bincode::deserialize::>(&bytes)) + .unwrap_or(Ok(HashSet::new()))?; + + Ok(set) + } + + pub(super) fn remove_from_set<'a, T: Into> + Copy>( + tree: T, + key: &[u8], + value_to_remove: &str, + ) -> Result<(), DataError> { + let mut set = get_set(tree, key)?; + set.remove(value_to_remove); + insert_set(tree, key, set)?; + Ok(()) + } + + pub(super) fn add_to_set<'a, T: Into> + Copy>( + tree: T, + key: &[u8], + value_to_add: String, + ) -> Result<(), DataError> { + let mut set = get_set(tree, key)?; + set.insert(value_to_add); + insert_set(tree, key, set)?; + Ok(()) + } } -fn remove_from_set<'a, T: Into> + Copy>( - tree: T, - key: &[u8], - value_to_remove: &str, -) -> Result<(), DataError> { - let mut set = get_set(tree, key)?; - set.remove(value_to_remove); - insert_set(tree, key, set)?; - Ok(()) -} +/// Functions that specifically relate to the "timestamp index" tree, +/// which is stored on the Room sinstance as a tree called +/// roomtimestamp_eventid. Tightly coupled to the event watcher in the +/// Rooms impl, and only factored out for unit testing. +mod timestamp_index { + use super::*; -fn insert_set<'a, T: Into>>( - tree: T, - key: &[u8], - set: HashSet, -) -> Result<(), DataError> { - let serialized = bincode::serialize(&set)?; - match tree.into() { - TxableTree::Tree(tree) => tree.insert(key, serialized)?, - TxableTree::Tx(tx) => tx.insert(key, serialized)?, - }; - Ok(()) + /// Insert an entry from the main roomeventid_timestamp Tree into + /// the timestamp index. Keys in this Tree are stored as room ID + /// 0xff timestamp, with the value being a hashset of event IDs + /// received at the time. The parameters come from an insert to + /// that Tree, where the key is room ID 0xff event ID, and the + /// value is the timestamp. + pub(super) fn insert( + roomtimestamp_eventid: &Tree, + key: &[u8], + timestamp_bytes: &[u8], + ) -> Result<(), DataError> { + let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect(); + if let [room_id, event_id] = parts[..] { + let mut ts_key = room_id.to_vec(); + ts_key.push(0xff); + ts_key.extend_from_slice(×tamp_bytes); + trace_index_record(room_id, event_id, ×tamp_bytes); + + let event_id = str::from_utf8(event_id)?; + hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?; + Ok(()) + } else { + Err(DataError::InvalidValue) + } + } + + /// Log a trace message. + fn trace_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) { + if log_enabled!(log::Level::Trace) { + trace!( + "Recording event {} | {} received at {} in timestamp index.", + str::from_utf8(room_id).unwrap_or("[invalid room id]"), + str::from_utf8(event_id).unwrap_or("[invalid event id]"), + convert_u64(timestamp).unwrap_or(0) + ); + } + } } impl Rooms { @@ -76,29 +166,81 @@ impl Rooms { roomid_roominfo: db.open_tree("roomid_roominfo")?, roomid_usernames: db.open_tree("roomid_usernames")?, username_roomids: db.open_tree("username_roomids")?, + roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?, + roomtimestamp_eventid: db.open_tree("roomtimestamp_eventid")?, }) } + /// Start an event subscriber that listens for inserts made by the + /// `should_process` function. This event handler duplicates the + /// entry by timestamp instead of event ID. + pub(in crate::db) fn start_handler(&self) -> JoinHandle<()> { + //Clone due to lifetime requirements. + let roomeventid_timestamp = self.roomeventid_timestamp.clone(); + let roomtimestamp_eventid = self.roomtimestamp_eventid.clone(); + + tokio::spawn(async move { + let mut subscriber = roomeventid_timestamp.watch_prefix(b""); + + // TODO make this handler receive kill messages somehow so + // we can unit test it and gracefully shut it down. + while let Some(event) = (&mut subscriber).await { + if let sled::Event::Insert { key, value } = event { + match timestamp_index::insert(&roomtimestamp_eventid, &key, &value) { + Err(e) => { + error!("Unable to update the timestamp index: {}", e); + } + _ => (), + } + } + } + }) + } + + /// Determine if an event in a room should be processed. The event + /// is atomically recorded and true returned if the database has + /// not seen tis event yet. If the event already exists in the + /// database, the function returns false. Events are recorded by + /// this function by inserting the (system-local) timestamp in + /// epoch seconds. + pub fn should_process(&self, room_id: &str, event_id: &str) -> Result { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(event_id.as_bytes()); + + let timestamp: U64 = U64::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Clock has gone backwards") + .as_secs(), + ); + + match self.roomeventid_timestamp.compare_and_swap( + key, + None as Option<&[u8]>, + Some(timestamp.as_bytes()), + )? { + Ok(()) => Ok(true), + Err(CompareAndSwapError { .. }) => Ok(false), + } + } + pub fn get_rooms_for_user(&self, username: &str) -> Result, DataError> { - get_set(&self.username_roomids, username.as_bytes()) + hashset_tree::get_set(&self.username_roomids, username.as_bytes()) } pub fn get_users_in_room(&self, room_id: &str) -> Result, DataError> { - get_set(&self.roomid_usernames, room_id.as_bytes()) + hashset_tree::get_set(&self.roomid_usernames, room_id.as_bytes()) } pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_rooms, tx_room_usernames)| { let username_key = &username.as_bytes(); - let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; - user_to_rooms.insert(room_id.to_string()); - insert_set(tx_username_rooms, username_key, user_to_rooms)?; + hashset_tree::add_to_set(tx_username_rooms, username_key, room_id.to_owned())?; let roomid_key = &room_id.as_bytes(); - let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; - room_to_users.insert(username.to_string()); - insert_set(tx_room_usernames, roomid_key, room_to_users)?; + hashset_tree::add_to_set(tx_room_usernames, roomid_key, username.to_owned())?; Ok(()) }, @@ -111,14 +253,10 @@ impl Rooms { (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_rooms, tx_room_usernames)| { let username_key = &username.as_bytes(); - let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; - user_to_rooms.remove(room_id); - insert_set(tx_username_rooms, username_key, user_to_rooms)?; + hashset_tree::remove_from_set(tx_username_rooms, username_key, room_id)?; let roomid_key = &room_id.as_bytes(); - let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; - room_to_users.remove(username); - insert_set(tx_room_usernames, roomid_key, room_to_users)?; + hashset_tree::remove_from_set(tx_room_usernames, roomid_key, username)?; Ok(()) }, @@ -131,11 +269,15 @@ impl Rooms { (&self.username_roomids, &self.roomid_usernames).transaction( |(tx_username_roomids, tx_roomid_usernames)| { let roomid_key = room_id.as_bytes(); - let users_in_room = get_set(tx_roomid_usernames, roomid_key)?; + let users_in_room = hashset_tree::get_set(tx_roomid_usernames, roomid_key)?; //Remove the room ID from every user's room ID list. for username in users_in_room { - remove_from_set(tx_username_roomids, username.as_bytes(), room_id)?; + hashset_tree::remove_from_set( + tx_username_roomids, + username.as_bytes(), + room_id, + )?; } //Remove this room entry for the room ID -> username tree. @@ -164,6 +306,7 @@ mod tests { #[test] fn add_user_to_room() { let rooms = create_test_instance(); + rooms .add_user_to_room("testuser", "myroom") .expect("Could not add user to room"); @@ -189,6 +332,7 @@ mod tests { #[test] fn remove_user_from_room() { let rooms = create_test_instance(); + rooms .add_user_to_room("testuser", "myroom") .expect("Could not add user to room"); @@ -212,6 +356,7 @@ mod tests { #[test] fn clear_info() { let rooms = create_test_instance(); + rooms .add_user_to_room("testuser", "myroom1") .expect("Could not add user to room1"); @@ -246,4 +391,37 @@ mod tests { assert_eq!(expected_users_in_room2, users_in_room2); assert_eq!(expected_rooms_for_user, rooms_for_user); } + + #[test] + fn insert_to_timestamp_index() { + let rooms = create_test_instance(); + + // Insertion into timestamp index based on data that would go + // into main room x eventID -> timestamp tree. + let mut key = b"myroom".to_vec(); + key.push(0xff); + key.extend_from_slice(b"myeventid"); + + let timestamp: U64 = U64::new(1000); + + let result = timestamp_index::insert( + &rooms.roomtimestamp_eventid, + key.as_bytes(), + timestamp.as_bytes(), + ); + + assert!(result.is_ok()); + + // Retrieval of data from the timestamp index tree. + let mut ts_key = b"myroom".to_vec(); + ts_key.push(0xff); + ts_key.extend_from_slice(timestamp.as_bytes()); + + let expected_events: HashSet = + vec![String::from("myeventid")].into_iter().collect(); + + let event_ids = hashset_tree::get_set(&rooms.roomtimestamp_eventid, &ts_key) + .expect("Could not get set out of Tree"); + assert_eq!(expected_events, event_ids); + } } diff --git a/src/db/schema.rs b/src/db/schema.rs index 61eec0a..35a31ef 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -1,6 +1,6 @@ use crate::db::errors::DataError; -use byteorder::LittleEndian; -use zerocopy::byteorder::{I32, U32}; +use byteorder::{BigEndian, LittleEndian}; +use zerocopy::byteorder::{I32, U32, U64}; use zerocopy::LayoutVerified; /// User variables are stored as little-endian 32-bit integers in the @@ -10,6 +10,11 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32>; type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32>; +#[allow(dead_code)] +type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64>; + +type BigEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64>; + /// Convert bytes to an i32 with zero-copy deserialization. An error /// is returned if the bytes do not represent an i32. pub(super) fn convert_i32(raw_value: &[u8]) -> Result { @@ -33,3 +38,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result { Err(DataError::I32SchemaViolation) } } + +#[allow(dead_code)] +pub(super) fn convert_u64(raw_value: &[u8]) -> Result { + let layout = BigEndianU64Layout::new_unaligned(raw_value.as_ref()); + + if let Some(layout) = layout { + let value: U64 = *layout; + Ok(value.get()) + } else { + Err(DataError::I32SchemaViolation) + } +}