Compare commits

...

4 Commits

Author SHA1 Message Date
projectmoon 0939feee84 Placeholder to record all user info when joining room
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details
2020-11-09 21:17:03 +00:00
projectmoon 9349dd5f00 Add event processing check to received messages.
Also rename the existing `should_process` function to be more clear,
given presence of another similarly named function:
should_process_message.
2020-11-09 21:16:20 +00:00
projectmoon 74d0b88e80 Add should process check to room member event 2020-11-09 21:16:07 +00:00
projectmoon fb24090952 Keep seen events in database, don't process already-seen events.
Adds a new function `should_process` to rooms impl that determines if
calling code should proceed with processing an event ID. Event IDs
are recorded (along with room ID) as a key pointing to the
system-local timestamp of when the event was received. If the key was
not originally present, we instruct calling code to process the event.

Events are also asynchronously recorded by timestamp using a sled event
watcher that listens to inserts in the main tree (described above).
This secondary tree will allow easy cleanup of old events in the
future.
2020-11-09 21:16:07 +00:00
5 changed files with 309 additions and 77 deletions

View File

@ -1,10 +1,11 @@
use crate::db::Database;
use crate::error::BotError;
use async_trait::async_trait;
use log::{debug, error, info, warn};
use matrix_sdk::{
self,
events::{
room::member::{MemberEventContent, MembershipState},
room::member::{MemberEventContent, MembershipChange},
room::message::{MessageEventContent, TextMessageEventContent},
StrippedStateEvent, SyncMessageEvent, SyncStateEvent,
},
@ -40,7 +41,13 @@ fn check_message_age(
}
}
async fn should_process<'a>(
/// Determine whether or not to process a received message. This check
/// is necessary in addition to the event processing check because we
/// may receive message events when entering a room for the first
/// time, and we don't want to respond to things before the bot was in
/// the channel, but we do want to respond to things that were sent if
/// the bot left and rejoined quickly.
async fn should_process_message<'a>(
bot: &DiceBot,
event: &SyncMessageEvent<MessageEventContent>,
) -> Result<(String, String), BotError> {
@ -73,35 +80,51 @@ async fn should_process<'a>(
Ok((msg_body, sender_username))
}
/// Ask the rooms database whether the event with the given ID in the
/// given room should be processed. A database error is logged and
/// treated as "do not process", so this function always yields a plain
/// boolean for the event handlers.
fn should_process_event(db: &Database, room_id: &str, event_id: &str) -> bool {
    match db.rooms.should_process(room_id, event_id) {
        Ok(should_process) => should_process,
        Err(e) => {
            error!(
                "Database error when checking if we should process an event: {}",
                e.to_string()
            );
            // Fail closed: without an answer from the database, skip the event.
            false
        }
    }
}
/// This event emitter listens for messages with dice rolling commands.
/// Originally adapted from the matrix-rust-sdk examples.
#[async_trait]
impl EventEmitter for DiceBot {
async fn on_room_member(
&self,
room: SyncRoom,
room_member: &SyncStateEvent<MemberEventContent>,
) {
if let SyncRoom::Joined(room) = room {
async fn on_room_member(&self, room: SyncRoom, event: &SyncStateEvent<MemberEventContent>) {
if let SyncRoom::Joined(room) | SyncRoom::Left(room) = room {
//Clone to avoid holding lock.
let room = room.read().await.clone();
let (room_id, username) = (room.room_id.as_str(), &event.state_key);
if !should_process_event(&self.db, room_id, event.event_id.as_str()) {
return;
}
let event_affects_us = if let Some(our_user_id) = self.client.user_id().await {
room_member.state_key == our_user_id
event.state_key == our_user_id
} else {
false
};
let adding_user = match room_member.content.membership {
MembershipState::Join => true,
MembershipState::Leave | MembershipState::Ban => false,
use MembershipChange::*;
let adding_user = match event.membership_change() {
Joined => true,
Banned | Left | Kicked | KickedAndBanned => false,
_ => return,
};
//Clone to avoid holding lock.
let room = room.read().await.clone();
let (room_id, username) = (room.room_id.as_str(), &room_member.state_key);
let result = if event_affects_us && !adding_user {
info!("Clearing all information for room ID {}", room_id);
self.db.rooms.clear_info(room_id)
} else if event_affects_us && adding_user {
info!("Joined room {}; recording user information", room_id);
Ok(())
} else if !event_affects_us && adding_user {
info!("Adding user {} to room ID {}", username, room_id);
self.db.rooms.add_user_to_room(username, room_id)
@ -109,7 +132,7 @@ impl EventEmitter for DiceBot {
info!("Removing user {} from room ID {}", username, room_id);
self.db.rooms.remove_user_from_room(username, room_id)
} else {
debug!("Ignoring a room member event: {:#?}", room_member);
debug!("Ignoring a room member event: {:#?}", event);
Ok(())
};
@ -124,12 +147,12 @@ impl EventEmitter for DiceBot {
async fn on_stripped_state_member(
&self,
room: SyncRoom,
room_member: &StrippedStateEvent<MemberEventContent>,
event: &StrippedStateEvent<MemberEventContent>,
_: Option<MemberEventContent>,
) {
if let SyncRoom::Invited(room) = room {
if let Some(user_id) = self.client.user_id().await {
if room_member.state_key != user_id {
if event.state_key != user_id {
return;
}
}
@ -146,17 +169,22 @@ impl EventEmitter for DiceBot {
async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent<MessageEventContent>) {
if let SyncRoom::Joined(room) = room {
let (msg_body, sender_username) =
if let Ok((msg_body, sender_username)) = should_process(self, &event).await {
//Clone to avoid holding lock.
let room = room.read().await.clone();
let room_id = room.room_id.as_str();
if !should_process_event(&self.db, room_id, event.event_id.as_str()) {
return;
}
let (msg_body, sender_username) = if let Ok((msg_body, sender_username)) =
should_process_message(self, &event).await
{
(msg_body, sender_username)
} else {
return;
};
//we clone here to hold the lock for as little time as possible.
let real_room = room.read().await.clone();
self.execute_commands(&real_room, &sender_username, &msg_body)
self.execute_commands(&room, &sender_username, &msg_body)
.await;
}
}

View File

@ -25,12 +25,18 @@ impl Database {
fn new_db(db: sled::Db) -> Result<Database, DataError> {
let migrations = db.open_tree("migrations")?;
Ok(Database {
let database = Database {
db: db.clone(),
variables: Variables::new(&db)?,
migrations: Migrations(migrations),
rooms: Rooms::new(&db)?,
})
};
//Start any event handlers.
database.rooms.start_handler();
info!("Opened database.");
Ok(database)
}
pub fn new<P: AsRef<Path>>(path: P) -> Result<Database, DataError> {

View File

@ -26,6 +26,9 @@ pub enum DataError {
#[error("expected i32, but i32 schema was violated")]
I32SchemaViolation,
#[error("unexpected or corruptd data bytes")]
InvalidValue,
#[error("expected string, but utf8 schema was violated: {0}")]
Utf8chemaViolation(#[from] std::str::Utf8Error),

View File

@ -1,9 +1,16 @@
use crate::db::errors::DataError;
use crate::db::schema::convert_u64;
use byteorder::BigEndian;
use log::{error, log_enabled, trace};
use sled::transaction::TransactionalTree;
use sled::Transactional;
use sled::Tree;
use sled::{CompareAndSwapError, Tree};
use std::collections::HashSet;
use std::str;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
use zerocopy::byteorder::U64;
use zerocopy::AsBytes;
#[derive(Clone)]
pub struct Rooms {
@ -15,8 +22,23 @@ pub struct Rooms {
/// Username -> list of room IDs user is in.
pub(in crate::db) username_roomids: Tree,
/// Room ID(str) 0xff event ID(str) -> timestamp. Records event
/// IDs that we have received, so we do not process twice.
pub(in crate::db) roomeventid_timestamp: Tree,
/// Room ID(str) 0xff timestamp(u64) -> event ID. Records event
/// IDs with timestamp as the primary key instead. Exists to allow
/// easy scanning of old roomeventid_timestamp records for
/// removal. Be careful with u64, it can have 0xff and 0xfe bytes.
/// A simple split on 0xff will not work with this key. Instead,
/// it is meant to be split on the first 0xff byte only, and
/// separated into room ID and timestamp.
pub(in crate::db) roomtimestamp_eventid: Tree,
}
/// An enum that can hold either a regular sled Tree, or a
/// Transactional tree.
#[derive(Clone, Copy)]
enum TxableTree<'a> {
Tree(&'a Tree),
@ -35,7 +57,30 @@ impl<'a> Into<TxableTree<'a>> for &'a TransactionalTree {
}
}
fn get_set<'a, T: Into<TxableTree<'a>>>(tree: T, key: &[u8]) -> Result<HashSet<String>, DataError> {
/// A set of functions that can be used with a sled::Tree that stores
/// HashSets as its values. Atomicity is partially handled. If the
/// Tree is a transactional tree, operations will be atomic.
/// Otherwise, there is a potential non-atomic step.
mod hashset_tree {
use super::*;
fn insert_set<'a, T: Into<TxableTree<'a>>>(
tree: T,
key: &[u8],
set: HashSet<String>,
) -> Result<(), DataError> {
let serialized = bincode::serialize(&set)?;
match tree.into() {
TxableTree::Tree(tree) => tree.insert(key, serialized)?,
TxableTree::Tx(tx) => tx.insert(key, serialized)?,
};
Ok(())
}
pub(super) fn get_set<'a, T: Into<TxableTree<'a>>>(
tree: T,
key: &[u8],
) -> Result<HashSet<String>, DataError> {
let set: HashSet<String> = match tree.into() {
TxableTree::Tree(tree) => tree.get(key)?,
TxableTree::Tx(tx) => tx.get(key)?,
@ -44,30 +89,75 @@ fn get_set<'a, T: Into<TxableTree<'a>>>(tree: T, key: &[u8]) -> Result<HashSet<S
.unwrap_or(Ok(HashSet::new()))?;
Ok(set)
}
}
fn remove_from_set<'a, T: Into<TxableTree<'a>> + Copy>(
pub(super) fn remove_from_set<'a, T: Into<TxableTree<'a>> + Copy>(
tree: T,
key: &[u8],
value_to_remove: &str,
) -> Result<(), DataError> {
) -> Result<(), DataError> {
let mut set = get_set(tree, key)?;
set.remove(value_to_remove);
insert_set(tree, key, set)?;
Ok(())
}
}
fn insert_set<'a, T: Into<TxableTree<'a>>>(
pub(super) fn add_to_set<'a, T: Into<TxableTree<'a>> + Copy>(
tree: T,
key: &[u8],
set: HashSet<String>,
) -> Result<(), DataError> {
let serialized = bincode::serialize(&set)?;
match tree.into() {
TxableTree::Tree(tree) => tree.insert(key, serialized)?,
TxableTree::Tx(tx) => tx.insert(key, serialized)?,
};
value_to_add: String,
) -> Result<(), DataError> {
let mut set = get_set(tree, key)?;
set.insert(value_to_add);
insert_set(tree, key, set)?;
Ok(())
}
}
/// Functions that specifically relate to the "timestamp index" tree,
/// which is stored on the Room sinstance as a tree called
/// roomtimestamp_eventid. Tightly coupled to the event watcher in the
/// Rooms impl, and only factored out for unit testing.
mod timestamp_index {
use super::*;
/// Insert an entry from the main roomeventid_timestamp Tree into
/// the timestamp index. Keys in this Tree are stored as room ID
/// 0xff timestamp, with the value being a hashset of event IDs
/// received at the time. The parameters come from an insert to
/// that Tree, where the key is room ID 0xff event ID, and the
/// value is the timestamp.
pub(super) fn insert(
roomtimestamp_eventid: &Tree,
key: &[u8],
timestamp_bytes: &[u8],
) -> Result<(), DataError> {
let parts: Vec<&[u8]> = key.split(|&b| b == 0xff).collect();
if let [room_id, event_id] = parts[..] {
let mut ts_key = room_id.to_vec();
ts_key.push(0xff);
ts_key.extend_from_slice(&timestamp_bytes);
trace_index_record(room_id, event_id, &timestamp_bytes);
let event_id = str::from_utf8(event_id)?;
hashset_tree::add_to_set(roomtimestamp_eventid, &ts_key, event_id.to_owned())?;
Ok(())
} else {
Err(DataError::InvalidValue)
}
}
/// Log a trace message.
fn trace_index_record(room_id: &[u8], event_id: &[u8], timestamp: &[u8]) {
if log_enabled!(log::Level::Trace) {
trace!(
"Recording event {} | {} received at {} in timestamp index.",
str::from_utf8(room_id).unwrap_or("[invalid room id]"),
str::from_utf8(event_id).unwrap_or("[invalid event id]"),
convert_u64(timestamp).unwrap_or(0)
);
}
}
}
impl Rooms {
@ -76,29 +166,81 @@ impl Rooms {
roomid_roominfo: db.open_tree("roomid_roominfo")?,
roomid_usernames: db.open_tree("roomid_usernames")?,
username_roomids: db.open_tree("username_roomids")?,
roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?,
roomtimestamp_eventid: db.open_tree("roomtimestamp_eventid")?,
})
}
/// Spawn a background task that subscribes to the
/// roomeventid_timestamp tree (the one written by `should_process`)
/// and, for every insert it sees, records the same entry in the
/// roomtimestamp_eventid tree keyed by timestamp instead of event ID.
/// Failures to update the index are logged and do not stop the task.
///
/// NOTE(review): this calls `tokio::spawn`, so it presumably must be
/// invoked from within a running Tokio runtime -- confirm callers.
pub(in crate::db) fn start_handler(&self) -> JoinHandle<()> {
    // The spawned task must own its tree handles, so clone them.
    let source_tree = self.roomeventid_timestamp.clone();
    let index_tree = self.roomtimestamp_eventid.clone();

    tokio::spawn(async move {
        let mut subscriber = source_tree.watch_prefix(b"");

        // TODO make this handler receive kill messages somehow so
        // we can unit test it and gracefully shut it down.
        while let Some(event) = (&mut subscriber).await {
            // Only inserts are mirrored into the index.
            let (key, value) = match event {
                sled::Event::Insert { key, value } => (key, value),
                _ => continue,
            };

            if let Err(e) = timestamp_index::insert(&index_tree, &key, &value) {
                error!("Unable to update the timestamp index: {}", e);
            }
        }
    })
}
/// Decide whether an event in a room should be processed.
///
/// The event is recorded with a compare-and-swap that only succeeds
/// when no entry exists yet for room ID 0xff event ID. Returns
/// `Ok(true)` if the database had not seen this event (it is now
/// recorded), and `Ok(false)` if an entry was already present. The
/// recorded value is the system-local time in epoch seconds, stored
/// as a big-endian u64.
///
/// Panics if the system clock reports a time before the Unix epoch.
pub fn should_process(&self, room_id: &str, event_id: &str) -> Result<bool, DataError> {
    let (room_bytes, event_bytes) = (room_id.as_bytes(), event_id.as_bytes());
    let mut key = Vec::with_capacity(room_bytes.len() + 1 + event_bytes.len());
    key.extend_from_slice(room_bytes);
    key.push(0xff);
    key.extend_from_slice(event_bytes);

    let seconds_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Clock has gone backwards")
        .as_secs();
    let timestamp: U64<BigEndian> = U64::new(seconds_since_epoch);

    // Expecting "no current value" makes the swap succeed only for the
    // first writer of this key.
    let swap_outcome = self.roomeventid_timestamp.compare_and_swap(
        key,
        None::<&[u8]>,
        Some(timestamp.as_bytes()),
    )?;

    Ok(swap_outcome.is_ok())
}
pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.username_roomids, username.as_bytes())
hashset_tree::get_set(&self.username_roomids, username.as_bytes())
}
pub fn get_users_in_room(&self, room_id: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.roomid_usernames, room_id.as_bytes())
hashset_tree::get_set(&self.roomid_usernames, room_id.as_bytes())
}
pub fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> {
(&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_rooms, tx_room_usernames)| {
let username_key = &username.as_bytes();
let mut user_to_rooms = get_set(tx_username_rooms, username_key)?;
user_to_rooms.insert(room_id.to_string());
insert_set(tx_username_rooms, username_key, user_to_rooms)?;
hashset_tree::add_to_set(tx_username_rooms, username_key, room_id.to_owned())?;
let roomid_key = &room_id.as_bytes();
let mut room_to_users = get_set(tx_room_usernames, roomid_key)?;
room_to_users.insert(username.to_string());
insert_set(tx_room_usernames, roomid_key, room_to_users)?;
hashset_tree::add_to_set(tx_room_usernames, roomid_key, username.to_owned())?;
Ok(())
},
@ -111,14 +253,10 @@ impl Rooms {
(&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_rooms, tx_room_usernames)| {
let username_key = &username.as_bytes();
let mut user_to_rooms = get_set(tx_username_rooms, username_key)?;
user_to_rooms.remove(room_id);
insert_set(tx_username_rooms, username_key, user_to_rooms)?;
hashset_tree::remove_from_set(tx_username_rooms, username_key, room_id)?;
let roomid_key = &room_id.as_bytes();
let mut room_to_users = get_set(tx_room_usernames, roomid_key)?;
room_to_users.remove(username);
insert_set(tx_room_usernames, roomid_key, room_to_users)?;
hashset_tree::remove_from_set(tx_room_usernames, roomid_key, username)?;
Ok(())
},
@ -131,11 +269,15 @@ impl Rooms {
(&self.username_roomids, &self.roomid_usernames).transaction(
|(tx_username_roomids, tx_roomid_usernames)| {
let roomid_key = room_id.as_bytes();
let users_in_room = get_set(tx_roomid_usernames, roomid_key)?;
let users_in_room = hashset_tree::get_set(tx_roomid_usernames, roomid_key)?;
//Remove the room ID from every user's room ID list.
for username in users_in_room {
remove_from_set(tx_username_roomids, username.as_bytes(), room_id)?;
hashset_tree::remove_from_set(
tx_username_roomids,
username.as_bytes(),
room_id,
)?;
}
//Remove this room entry for the room ID -> username tree.
@ -164,6 +306,7 @@ mod tests {
#[test]
fn add_user_to_room() {
let rooms = create_test_instance();
rooms
.add_user_to_room("testuser", "myroom")
.expect("Could not add user to room");
@ -189,6 +332,7 @@ mod tests {
#[test]
fn remove_user_from_room() {
let rooms = create_test_instance();
rooms
.add_user_to_room("testuser", "myroom")
.expect("Could not add user to room");
@ -212,6 +356,7 @@ mod tests {
#[test]
fn clear_info() {
let rooms = create_test_instance();
rooms
.add_user_to_room("testuser", "myroom1")
.expect("Could not add user to room1");
@ -246,4 +391,37 @@ mod tests {
assert_eq!(expected_users_in_room2, users_in_room2);
assert_eq!(expected_rooms_for_user, rooms_for_user);
}
#[test]
fn insert_to_timestamp_index() {
    let rooms = create_test_instance();
    let timestamp: U64<BigEndian> = U64::new(1000);

    // Key as it would appear in the main room x eventID -> timestamp
    // tree: room ID, 0xff separator, event ID.
    let mut main_key: Vec<u8> = Vec::new();
    main_key.extend_from_slice(b"myroom");
    main_key.push(0xff);
    main_key.extend_from_slice(b"myeventid");

    let result = timestamp_index::insert(
        &rooms.roomtimestamp_eventid,
        &main_key,
        timestamp.as_bytes(),
    );
    assert!(result.is_ok());

    // Key under which the entry should now be found in the timestamp
    // index: room ID, 0xff separator, timestamp bytes.
    let mut index_key: Vec<u8> = Vec::new();
    index_key.extend_from_slice(b"myroom");
    index_key.push(0xff);
    index_key.extend_from_slice(timestamp.as_bytes());

    let mut expected_events: HashSet<String> = HashSet::new();
    expected_events.insert(String::from("myeventid"));

    let event_ids = hashset_tree::get_set(&rooms.roomtimestamp_eventid, &index_key)
        .expect("Could not get set out of Tree");

    assert_eq!(expected_events, event_ids);
}
}

View File

@ -1,6 +1,6 @@
use crate::db::errors::DataError;
use byteorder::LittleEndian;
use zerocopy::byteorder::{I32, U32};
use byteorder::{BigEndian, LittleEndian};
use zerocopy::byteorder::{I32, U32, U64};
use zerocopy::LayoutVerified;
/// User variables are stored as little-endian 32-bit integers in the
@ -10,6 +10,11 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32<LittleEndian>>;
type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>;
#[allow(dead_code)]
type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<LittleEndian>>;
type BigEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<BigEndian>>;
/// Convert bytes to an i32 with zero-copy deserialization. An error
/// is returned if the bytes do not represent an i32.
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> {
@ -33,3 +38,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result<u32, DataError> {
Err(DataError::I32SchemaViolation)
}
}
/// Convert bytes to a u64 with zero-copy deserialization, reading
/// them as a big-endian value. An error is returned if the bytes
/// cannot be laid out as a u64.
///
/// NOTE(review): the error variant returned is `I32SchemaViolation`
/// even though the value is a u64 -- confirm whether a dedicated
/// variant is wanted.
#[allow(dead_code)]
pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> {
    BigEndianU64Layout::new_unaligned(raw_value)
        .map(|layout| {
            let value: U64<BigEndian> = *layout;
            value.get()
        })
        .ok_or(DataError::I32SchemaViolation)
}