Keep seen events in database, don't process already-seen events.

Adds a new function `should_process` to rooms impl that determines if
calling code should proceed with processing an event ID. Event IDs
are recorded (along with room ID) as a key pointing to the
system-local timestamp of when the event was received. If the key was
not originally present, we instruct calling code to process the event.

Events are also asynchronously recorded by timestamp using a sled event
watcher that listens to inserts in the main tree (described above).
This secondary tree will allow easy cleanup of old events in the
future.
This commit is contained in:
projectmoon 2020-11-09 21:14:37 +00:00
parent d7aaed9e00
commit fb24090952
4 changed files with 253 additions and 49 deletions

View File

@ -25,12 +25,18 @@ impl Database {
fn new_db(db: sled::Db) -> Result<Database, DataError> { fn new_db(db: sled::Db) -> Result<Database, DataError> {
let migrations = db.open_tree("migrations")?; let migrations = db.open_tree("migrations")?;
Ok(Database { let database = Database {
db: db.clone(), db: db.clone(),
variables: Variables::new(&db)?, variables: Variables::new(&db)?,
migrations: Migrations(migrations), migrations: Migrations(migrations),
rooms: Rooms::new(&db)?, rooms: Rooms::new(&db)?,
}) };
//Start any event handlers.
database.rooms.start_handler();
info!("Opened database.");
Ok(database)
} }
pub fn new<P: AsRef<Path>>(path: P) -> Result<Database, DataError> { pub fn new<P: AsRef<Path>>(path: P) -> Result<Database, DataError> {

View File

@ -26,6 +26,9 @@ pub enum DataError {
#[error("expected i32, but i32 schema was violated")] #[error("expected i32, but i32 schema was violated")]
I32SchemaViolation, I32SchemaViolation,
#[error("unexpected or corruptd data bytes")]
InvalidValue,
#[error("expected string, but utf8 schema was violated: {0}")] #[error("expected string, but utf8 schema was violated: {0}")]
Utf8chemaViolation(#[from] std::str::Utf8Error), Utf8chemaViolation(#[from] std::str::Utf8Error),

View File

@ -1,9 +1,16 @@
use crate::db::errors::DataError; use crate::db::errors::DataError;
use crate::db::schema::convert_u64;
use byteorder::BigEndian;
use log::{error, log_enabled, trace};
use sled::transaction::TransactionalTree; use sled::transaction::TransactionalTree;
use sled::Transactional; use sled::Transactional;
use sled::Tree; use sled::{CompareAndSwapError, Tree};
use std::collections::HashSet; use std::collections::HashSet;
use std::str; use std::str;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
use zerocopy::byteorder::U64;
use zerocopy::AsBytes;
#[derive(Clone)] #[derive(Clone)]
pub struct Rooms { pub struct Rooms {
@ -15,8 +22,23 @@ pub struct Rooms {
/// Username -> list of room IDs user is in. /// Username -> list of room IDs user is in.
pub(in crate::db) username_roomids: Tree, pub(in crate::db) username_roomids: Tree,
/// Room ID(str) 0xff event ID(str) -> timestamp. Records event
/// IDs that we have received, so we do not process twice.
pub(in crate::db) roomeventid_timestamp: Tree,
/// Room ID(str) 0xff timestamp(u64) -> event ID. Records event
/// IDs with timestamp as the primary key instead. Exists to allow
/// easy scanning of old roomeventid_timestamp records for
/// removal. Be careful with u64, it can have 0xff and 0xfe bytes.
/// A simple split on 0xff will not work with this key. Instead,
/// it is meant to be split on the first 0xff byte only, and
/// separated into room ID and timestamp.
pub(in crate::db) roomtimestamp_eventid: Tree,
} }
/// An enum that can hold either a regular sled Tree, or a
/// Transactional tree.
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum TxableTree<'a> { enum TxableTree<'a> {
Tree(&'a Tree), Tree(&'a Tree),
@ -35,39 +57,107 @@ impl<'a> Into<TxableTree<'a>> for &'a TransactionalTree {
} }
} }
fn get_set<'a, T: Into<TxableTree<'a>>>(tree: T, key: &[u8]) -> Result<HashSet<String>, DataError> { /// A set of functions that can be used with a sled::Tree that stores
let set: HashSet<String> = match tree.into() { /// HashSets as its values. Atomicity is partially handled. If the
TxableTree::Tree(tree) => tree.get(key)?, /// Tree is a transactional tree, operations will be atomic.
TxableTree::Tx(tx) => tx.get(key)?, /// Otherwise, there is a potential non-atomic step.
mod hashset_tree {
use super::*;
fn insert_set<'a, T: Into<TxableTree<'a>>>(
tree: T,
key: &[u8],
set: HashSet<String>,
) -> Result<(), DataError> {
let serialized = bincode::serialize(&set)?;
match tree.into() {
TxableTree::Tree(tree) => tree.insert(key, serialized)?,
TxableTree::Tx(tx) => tx.insert(key, serialized)?,
};
Ok(())
} }
.map(|bytes| bincode::deserialize::<HashSet<String>>(&bytes))
.unwrap_or(Ok(HashSet::new()))?;
Ok(set) pub(super) fn get_set<'a, T: Into<TxableTree<'a>>>(
tree: T,
key: &[u8],
) -> Result<HashSet<String>, DataError> {
let set: HashSet<String> = match tree.into() {
TxableTree::Tree(tree) => tree.get(key)?,
TxableTree::Tx(tx) => tx.get(key)?,
}
.map(|bytes| bincode::deserialize::<HashSet<String>>(&bytes))
.unwrap_or(Ok(HashSet::new()))?;
Ok(set)
}
/// Remove `value_to_remove` from the set stored under `key`.
///
/// The set is read, modified in memory, and written back, even when
/// the value was not a member. On a plain `Tree` the read and the
/// write are two separate operations.
pub(super) fn remove_from_set<'a, T: Into<TxableTree<'a>> + Copy>(
    tree: T,
    key: &[u8],
    value_to_remove: &str,
) -> Result<(), DataError> {
    let mut members = get_set(tree, key)?;
    members.remove(value_to_remove);
    // Write the (possibly unchanged) set back under the same key.
    insert_set(tree, key, members)
}
/// Add `value_to_add` to the set stored under `key`.
///
/// The set is read, modified in memory, and written back. A missing
/// key is handled by `get_set`, which yields an empty set in that
/// case. On a plain `Tree` the read and the write are two separate
/// operations.
pub(super) fn add_to_set<'a, T: Into<TxableTree<'a>> + Copy>(
    tree: T,
    key: &[u8],
    value_to_add: String,
) -> Result<(), DataError> {
    let mut members = get_set(tree, key)?;
    members.insert(value_to_add);
    // Persist the updated set under the same key.
    insert_set(tree, key, members)
}
} }
fn remove_from_set<'a, T: Into<TxableTree<'a>> + Copy>( /// Functions that specifically relate to the "timestamp index" tree,
tree: T, /// which is stored on the Rooms instance as a tree called
key: &[u8], /// roomtimestamp_eventid. Tightly coupled to the event watcher in the
value_to_remove: &str, /// Rooms impl, and only factored out for unit testing.
) -> Result<(), DataError> { mod timestamp_index {
let mut set = get_set(tree, key)?; use super::*;
set.remove(value_to_remove);
insert_set(tree, key, set)?;
Ok(())
}
fn insert_set<'a, T: Into<TxableTree<'a>>>( /// Insert an entry from the main roomeventid_timestamp Tree into
tree: T, /// the timestamp index. Keys in this Tree are stored as room ID
key: &[u8], /// 0xff timestamp, with the value being a hashset of event IDs
set: HashSet<String>, /// received at the time. The parameters come from an insert to
) -> Result<(), DataError> { /// that Tree, where the key is room ID 0xff event ID, and the
let serialized = bincode::serialize(&set)?; /// value is the timestamp.
match tree.into() { pub(super) fn insert(
TxableTree::Tree(tree) => tree.insert(key, serialized)?, roomtimestamp_eventid: &Tree,
TxableTree::Tx(tx) => tx.insert(key, serialized)?, key: &[u8],
}; timestamp_bytes: &[u8],
Ok(()) ) -> Result<(), DataError> {
let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect();
if let [room_id, event_id] = parts[..] {
let mut ts_key = room_id.to_vec();
ts_key.push(0xff);
ts_key.extend_from_slice(&timestamp_bytes);
trace_index_record(room_id, event_id, &timestamp_bytes);
let event_id = str::from_utf8(event_id)?;
hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?;
Ok(())
} else {
Err(DataError::InvalidValue)
}
}
/// Log a trace message.
fn trace_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) {
if log_enabled!(log::Level::Trace) {
trace!(
"Recording event {} | {} received at {} in timestamp index.",
str::from_utf8(room_id).unwrap_or("[invalid room id]"),
str::from_utf8(event_id).unwrap_or("[invalid event id]"),
convert_u64(timestamp).unwrap_or(0)
);
}
}
} }
impl Rooms { impl Rooms {
@ -76,29 +166,81 @@ impl Rooms {
roomid_roominfo: db.open_tree("roomid_roominfo")?, roomid_roominfo: db.open_tree("roomid_roominfo")?,
roomid_usernames: db.open_tree("roomid_usernames")?, roomid_usernames: db.open_tree("roomid_usernames")?,
username_roomids: db.open_tree("username_roomids")?, username_roomids: db.open_tree("username_roomids")?,
roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?,
roomtimestamp_eventid: db.open_tree("roomtimestamp_eventid")?,
}) })
} }
/// Spawn a background task that mirrors inserts from the
/// `roomeventid_timestamp` tree into the `roomtimestamp_eventid`
/// tree (the timestamp index).
///
/// The task subscribes to the main tree with an empty prefix and
/// forwards every insert event to `timestamp_index::insert`. Other
/// event kinds are ignored. Failures to update the index are logged
/// and do not stop the task. Returns the handle of the spawned task.
pub(in crate::db) fn start_handler(&self) -> JoinHandle<()> {
    // The spawned task must own its trees, so clone the handles.
    let source_tree = self.roomeventid_timestamp.clone();
    let index_tree = self.roomtimestamp_eventid.clone();

    tokio::spawn(async move {
        let mut subscriber = source_tree.watch_prefix(b"");

        // TODO make this handler receive kill messages somehow so
        // we can unit test it and gracefully shut it down.
        while let Some(event) = (&mut subscriber).await {
            if let sled::Event::Insert { key, value } = event {
                if let Err(e) = timestamp_index::insert(&index_tree, &key, &value) {
                    error!("Unable to update the timestamp index: {}", e);
                }
            }
        }
    })
}
/// Decide whether an event in a room should be processed.
///
/// The event is recorded with a compare-and-swap that only succeeds
/// when no entry exists yet for the room ID / event ID pair. Returns
/// `Ok(true)` if the database has not seen this event before (and it
/// has now been recorded), or `Ok(false)` if an entry was already
/// present. The stored value is the system-local timestamp in epoch
/// seconds, encoded as a big-endian u64.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
pub fn should_process(&self, room_id: &str, event_id: &str) -> Result<bool, DataError> {
    // Key layout: room ID bytes, a 0xff separator, then event ID bytes.
    let mut key = Vec::with_capacity(room_id.len() + 1 + event_id.len());
    key.extend_from_slice(room_id.as_bytes());
    key.push(0xff);
    key.extend_from_slice(event_id.as_bytes());

    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Clock has gone backwards")
        .as_secs();
    let timestamp: U64<BigEndian> = U64::new(now_secs);

    // Expecting "no current value" makes the swap succeed only for
    // the first caller to record this event.
    let outcome = self.roomeventid_timestamp.compare_and_swap(
        key,
        None as Option<&[u8]>,
        Some(timestamp.as_bytes()),
    )?;

    Ok(match outcome {
        Ok(()) => true,
        Err(CompareAndSwapError { .. }) => false,
    })
}
pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> { pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.username_roomids, username.as_bytes()) hashset_tree::get_set(&self.username_roomids, username.as_bytes())
} }
pub fn get_users_in_room(&self, room_id: &str) -> Result<HashSet<String>, DataError> { pub fn get_users_in_room(&self, room_id: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.roomid_usernames, room_id.as_bytes()) hashset_tree::get_set(&self.roomid_usernames, room_id.as_bytes())
} }
pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> { pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> {
(&self.username_roomids, &self.roomid_usernames).transaction( (&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_rooms, tx_room_usernames)| { |(tx_username_rooms, tx_room_usernames)| {
let username_key = &username.as_bytes(); let username_key = &username.as_bytes();
let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; hashset_tree::add_to_set(tx_username_rooms, username_key, room_id.to_owned())?;
user_to_rooms.insert(room_id.to_string());
insert_set(tx_username_rooms, username_key, user_to_rooms)?;
let roomid_key = &room_id.as_bytes(); let roomid_key = &room_id.as_bytes();
let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; hashset_tree::add_to_set(tx_room_usernames, roomid_key, username.to_owned())?;
room_to_users.insert(username.to_string());
insert_set(tx_room_usernames, roomid_key, room_to_users)?;
Ok(()) Ok(())
}, },
@ -111,14 +253,10 @@ impl Rooms {
(&self.username_roomids, &self.roomid_usernames).transaction( (&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_rooms, tx_room_usernames)| { |(tx_username_rooms, tx_room_usernames)| {
let username_key = &username.as_bytes(); let username_key = &username.as_bytes();
let mut user_to_rooms = get_set(tx_username_rooms, username_key)?; hashset_tree::remove_from_set(tx_username_rooms, username_key, room_id)?;
user_to_rooms.remove(room_id);
insert_set(tx_username_rooms, username_key, user_to_rooms)?;
let roomid_key = &room_id.as_bytes(); let roomid_key = &room_id.as_bytes();
let mut room_to_users = get_set(tx_room_usernames, roomid_key)?; hashset_tree::remove_from_set(tx_room_usernames, roomid_key, username)?;
room_to_users.remove(username);
insert_set(tx_room_usernames, roomid_key, room_to_users)?;
Ok(()) Ok(())
}, },
@ -131,11 +269,15 @@ impl Rooms {
(&self.username_roomids, &self.roomid_usernames).transaction( (&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_roomids, tx_roomid_usernames)| { |(tx_username_roomids, tx_roomid_usernames)| {
let roomid_key = room_id.as_bytes(); let roomid_key = room_id.as_bytes();
let users_in_room = get_set(tx_roomid_usernames, roomid_key)?; let users_in_room = hashset_tree::get_set(tx_roomid_usernames, roomid_key)?;
//Remove the room ID from every user's room ID list. //Remove the room ID from every user's room ID list.
for username in users_in_room { for username in users_in_room {
remove_from_set(tx_username_roomids, username.as_bytes(), room_id)?; hashset_tree::remove_from_set(
tx_username_roomids,
username.as_bytes(),
room_id,
)?;
} }
//Remove this room entry for the room ID -> username tree. //Remove this room entry for the room ID -> username tree.
@ -164,6 +306,7 @@ mod tests {
#[test] #[test]
fn add_user_to_room() { fn add_user_to_room() {
let rooms = create_test_instance(); let rooms = create_test_instance();
rooms rooms
.add_user_to_room("testuser", "myroom") .add_user_to_room("testuser", "myroom")
.expect("Could not add user to room"); .expect("Could not add user to room");
@ -189,6 +332,7 @@ mod tests {
#[test] #[test]
fn remove_user_from_room() { fn remove_user_from_room() {
let rooms = create_test_instance(); let rooms = create_test_instance();
rooms rooms
.add_user_to_room("testuser", "myroom") .add_user_to_room("testuser", "myroom")
.expect("Could not add user to room"); .expect("Could not add user to room");
@ -212,6 +356,7 @@ mod tests {
#[test] #[test]
fn clear_info() { fn clear_info() {
let rooms = create_test_instance(); let rooms = create_test_instance();
rooms rooms
.add_user_to_room("testuser", "myroom1") .add_user_to_room("testuser", "myroom1")
.expect("Could not add user to room1"); .expect("Could not add user to room1");
@ -246,4 +391,37 @@ mod tests {
assert_eq!(expected_users_in_room2, users_in_room2); assert_eq!(expected_users_in_room2, users_in_room2);
assert_eq!(expected_rooms_for_user, rooms_for_user); assert_eq!(expected_rooms_for_user, rooms_for_user);
} }
#[test]
fn insert_to_timestamp_index() {
    let rooms = create_test_instance();
    let timestamp: U64<BigEndian> = U64::new(1000);

    // Key as it would appear in the main tree: room ID, 0xff
    // separator, event ID. The value there would be the timestamp.
    let mut source_key = Vec::new();
    source_key.extend_from_slice(b"myroom");
    source_key.push(0xff);
    source_key.extend_from_slice(b"myeventid");

    let result = timestamp_index::insert(
        &rooms.roomtimestamp_eventid,
        &source_key,
        timestamp.as_bytes(),
    );
    assert!(result.is_ok());

    // The index entry is keyed by room ID, 0xff separator, timestamp.
    let mut ts_key = Vec::new();
    ts_key.extend_from_slice(b"myroom");
    ts_key.push(0xff);
    ts_key.extend_from_slice(timestamp.as_bytes());

    let mut expected_events: HashSet<String> = HashSet::new();
    expected_events.insert(String::from("myeventid"));

    let event_ids = hashset_tree::get_set(&rooms.roomtimestamp_eventid, &ts_key)
        .expect("Could not get set out of Tree");

    assert_eq!(expected_events, event_ids);
}
} }

View File

@ -1,6 +1,6 @@
use crate::db::errors::DataError; use crate::db::errors::DataError;
use byteorder::LittleEndian; use byteorder::{BigEndian, LittleEndian};
use zerocopy::byteorder::{I32, U32}; use zerocopy::byteorder::{I32, U32, U64};
use zerocopy::LayoutVerified; use zerocopy::LayoutVerified;
/// User variables are stored as little-endian 32-bit integers in the /// User variables are stored as little-endian 32-bit integers in the
@ -10,6 +10,11 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32<LittleEndian>>;
type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>; type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>;
#[allow(dead_code)]
type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<LittleEndian>>;
type BigEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<BigEndian>>;
/// Convert bytes to an i32 with zero-copy deserialization. An error /// Convert bytes to an i32 with zero-copy deserialization. An error
/// is returned if the bytes do not represent an i32. /// is returned if the bytes do not represent an i32.
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> { pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> {
@ -33,3 +38,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result<u32, DataError> {
Err(DataError::I32SchemaViolation) Err(DataError::I32SchemaViolation)
} }
} }
/// Convert bytes to a u64 with zero-copy deserialization, reading
/// the value as big-endian. An error is returned if the bytes do not
/// pass the u64 layout check.
///
/// NOTE(review): the failure case reports `I32SchemaViolation` even
/// though the value is a u64. This matches what `convert_u32` does.
#[allow(dead_code)]
pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> {
    match BigEndianU64Layout::new_unaligned(raw_value) {
        Some(layout) => Ok((*layout).get()),
        None => Err(DataError::I32SchemaViolation),
    }
}