From bf151a0a404c507c981b3f4d72caeb68a124e3d3 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Sun, 8 Nov 2020 22:25:55 +0000 Subject: [PATCH] maintain list of events in db. must convert allh handlers to use it. --- src/bot/event_handlers.rs | 35 +++++++++++++++++++++-------------- src/db/rooms.rs | 38 +++++++++++++++++++++++++++++++++++++- src/db/schema.rs | 17 ++++++++++++++++- 3 files changed, 74 insertions(+), 16 deletions(-) diff --git a/src/bot/event_handlers.rs b/src/bot/event_handlers.rs index 861f672..14af592 100644 --- a/src/bot/event_handlers.rs +++ b/src/bot/event_handlers.rs @@ -1,3 +1,4 @@ +use crate::db::Database; use crate::error::BotError; use async_trait::async_trait; use log::{debug, error, info, warn}; @@ -16,11 +17,6 @@ use std::clone::Clone; use std::ops::Sub; use std::time::{Duration, SystemTime}; -fn check_age(timestamp: &SystemTime, how_far_back: u64) -> bool { - let oldest_timestamp = SystemTime::now().sub(Duration::new(how_far_back, 0)); - timestamp > &oldest_timestamp -} - /// Check if a message is recent enough to actually process. If the /// message is within "oldest_message_age" seconds, this function /// returns true. If it's older than that, it returns false and logs a @@ -78,17 +74,32 @@ async fn should_process<'a>( Ok((msg_body, sender_username)) } +fn should_process_event(db: &Database, room_id: &str, event_id: &str) -> bool { + db.rooms + .should_process(room_id, event_id) + .unwrap_or_else(|e| { + error!( + "Database error when checking if we should process an event: {}", + e.to_string() + ); + false + }) +} + /// This event emitter listens for messages with dice rolling commands. /// Originally adapted from the matrix-rust-sdk examples. #[async_trait] impl EventEmitter for DiceBot { async fn on_room_member(&self, room: SyncRoom, event: &SyncStateEvent) { - if !check_age(&event.origin_server_ts, 2) { - debug!("Ignoring old event: {:#?}", event); - return; - } - if let SyncRoom::Joined(room) | SyncRoom::Left(room) = room { + //Clone to avoid holding lock. + let room = room.read().await.clone(); + let (room_id, username) = (room.room_id.as_str(), &event.state_key); + + if !should_process_event(&self.db, room_id, event.event_id.as_str()) { + return; + } + let event_affects_us = if let Some(our_user_id) = self.client.user_id().await { event.state_key == our_user_id } else { @@ -102,10 +113,6 @@ impl EventEmitter for DiceBot { _ => return, }; - //Clone to avoid holding lock. - let room = room.read().await.clone(); - let (room_id, username) = (room.room_id.as_str(), &event.state_key); - let result = if event_affects_us && !adding_user { info!("Clearing all information for room ID {}", room_id); self.db.rooms.clear_info(room_id) diff --git a/src/db/rooms.rs b/src/db/rooms.rs index c80db5a..1cdb988 100644 --- a/src/db/rooms.rs +++ b/src/db/rooms.rs @@ -1,9 +1,13 @@ use crate::db::errors::DataError; +use byteorder::LittleEndian; use sled::transaction::TransactionalTree; use sled::Transactional; -use sled::Tree; +use sled::{CompareAndSwapError, Tree}; use std::collections::HashSet; use std::str; +use std::time::{SystemTime, UNIX_EPOCH}; +use zerocopy::byteorder::U64; +use zerocopy::AsBytes; #[derive(Clone)] pub struct Rooms { @@ -15,6 +19,9 @@ pub struct Rooms { /// Username -> list of room IDs user is in. pub(in crate::db) username_roomids: Tree, + + /// Room ID 0xff event ID -> received timestamp. + pub(in crate::db) roomeventid_timestamp: Tree, } #[derive(Clone, Copy)] @@ -76,9 +83,38 @@ impl Rooms { roomid_roominfo: db.open_tree("roomid_roominfo")?, roomid_usernames: db.open_tree("roomid_usernames")?, username_roomids: db.open_tree("username_roomids")?, + roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?, }) } + /// Determine if an event in a room should be processed. The event + /// is atomically recorded and true returned if the database has + /// not seen tis event yet. If the event already exists in the + /// database, the function returns false. Events are recorded by + /// this function by inserting the (system-local) timestamp in + /// epoch seconds. + pub fn should_process(&self, room_id: &str, event_id: &str) -> Result { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(event_id.as_bytes()); + + let timestamp: U64 = U64::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Clock has gone backwards") + .as_secs(), + ); + + match self.roomeventid_timestamp.compare_and_swap( + key, + None as Option<&[u8]>, + Some(timestamp.as_bytes()), + )? { + Ok(()) => Ok(true), + Err(CompareAndSwapError { .. }) => Ok(false), + } + } + pub fn get_rooms_for_user(&self, username: &str) -> Result, DataError> { get_set(&self.username_roomids, username.as_bytes()) } diff --git a/src/db/schema.rs b/src/db/schema.rs index 61eec0a..1dd4978 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -1,6 +1,6 @@ use crate::db::errors::DataError; use byteorder::LittleEndian; -use zerocopy::byteorder::{I32, U32}; +use zerocopy::byteorder::{I32, U32, U64}; use zerocopy::LayoutVerified; /// User variables are stored as little-endian 32-bit integers in the @@ -10,6 +10,9 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32>; type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32>; +#[allow(dead_code)] +type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64>; + /// Convert bytes to an i32 with zero-copy deserialization. An error /// is returned if the bytes do not represent an i32. pub(super) fn convert_i32(raw_value: &[u8]) -> Result { @@ -33,3 +36,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result { Err(DataError::I32SchemaViolation) } } + +#[allow(dead_code)] +pub(super) fn convert_u64(raw_value: &[u8]) -> Result { + let layout = LittleEndianU64Layout::new_unaligned(raw_value.as_ref()); + + if let Some(layout) = layout { + let value: U64 = *layout; + Ok(value.get()) + } else { + Err(DataError::I32SchemaViolation) + } +}