Compare commits

...

2 Commits

Author SHA1 Message Date
projectmoon bf151a0a40 maintain list of events in db. must convert allh handlers to use it.
continuous-integration/drone/push Build is passing Details
2020-11-08 22:25:55 +00:00
projectmoon a77d066daf Do not process old membership events. 2020-11-08 21:47:01 +00:00
3 changed files with 86 additions and 18 deletions

View File

@ -1,10 +1,11 @@
use crate::db::Database;
use crate::error::BotError; use crate::error::BotError;
use async_trait::async_trait; use async_trait::async_trait;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use matrix_sdk::{ use matrix_sdk::{
self, self,
events::{ events::{
room::member::{MemberEventContent, MembershipState}, room::member::{MemberEventContent, MembershipChange},
room::message::{MessageEventContent, TextMessageEventContent}, room::message::{MessageEventContent, TextMessageEventContent},
StrippedStateEvent, SyncMessageEvent, SyncStateEvent, StrippedStateEvent, SyncMessageEvent, SyncStateEvent,
}, },
@ -73,35 +74,51 @@ async fn should_process<'a>(
Ok((msg_body, sender_username)) Ok((msg_body, sender_username))
} }
fn should_process_event(db: &Database, room_id: &str, event_id: &str) -> bool {
db.rooms
.should_process(room_id, event_id)
.unwrap_or_else(|e| {
error!(
"Database error when checking if we should process an event: {}",
e.to_string()
);
false
})
}
/// This event emitter listens for messages with dice rolling commands. /// This event emitter listens for messages with dice rolling commands.
/// Originally adapted from the matrix-rust-sdk examples. /// Originally adapted from the matrix-rust-sdk examples.
#[async_trait] #[async_trait]
impl EventEmitter for DiceBot { impl EventEmitter for DiceBot {
async fn on_room_member( async fn on_room_member(&self, room: SyncRoom, event: &SyncStateEvent<MemberEventContent>) {
&self, if let SyncRoom::Joined(room) | SyncRoom::Left(room) = room {
room: SyncRoom, //Clone to avoid holding lock.
room_member: &SyncStateEvent<MemberEventContent>, let room = room.read().await.clone();
) { let (room_id, username) = (room.room_id.as_str(), &event.state_key);
if let SyncRoom::Joined(room) = room {
if !should_process_event(&self.db, room_id, event.event_id.as_str()) {
return;
}
let event_affects_us = if let Some(our_user_id) = self.client.user_id().await { let event_affects_us = if let Some(our_user_id) = self.client.user_id().await {
room_member.state_key == our_user_id event.state_key == our_user_id
} else { } else {
false false
}; };
let adding_user = match room_member.content.membership { use MembershipChange::*;
MembershipState::Join => true, let adding_user = match event.membership_change() {
MembershipState::Leave | MembershipState::Ban => false, Joined => true,
Banned | Left | Kicked | KickedAndBanned => false,
_ => return, _ => return,
}; };
//Clone to avoid holding lock.
let room = room.read().await.clone();
let (room_id, username) = (room.room_id.as_str(), &room_member.state_key);
let result = if event_affects_us && !adding_user { let result = if event_affects_us && !adding_user {
info!("Clearing all information for room ID {}", room_id); info!("Clearing all information for room ID {}", room_id);
self.db.rooms.clear_info(room_id) self.db.rooms.clear_info(room_id)
} else if event_affects_us && adding_user {
info!("Joined room {}; recording user information", room_id);
Ok(())
} else if !event_affects_us && adding_user { } else if !event_affects_us && adding_user {
info!("Adding user {} to room ID {}", username, room_id); info!("Adding user {} to room ID {}", username, room_id);
self.db.rooms.add_user_to_room(username, room_id) self.db.rooms.add_user_to_room(username, room_id)
@ -109,7 +126,7 @@ impl EventEmitter for DiceBot {
info!("Removing user {} from room ID {}", username, room_id); info!("Removing user {} from room ID {}", username, room_id);
self.db.rooms.remove_user_from_room(username, room_id) self.db.rooms.remove_user_from_room(username, room_id)
} else { } else {
debug!("Ignoring a room member event: {:#?}", room_member); debug!("Ignoring a room member event: {:#?}", event);
Ok(()) Ok(())
}; };

View File

@ -1,9 +1,13 @@
use crate::db::errors::DataError; use crate::db::errors::DataError;
use byteorder::LittleEndian;
use sled::transaction::TransactionalTree; use sled::transaction::TransactionalTree;
use sled::Transactional; use sled::Transactional;
use sled::Tree; use sled::{CompareAndSwapError, Tree};
use std::collections::HashSet; use std::collections::HashSet;
use std::str; use std::str;
use std::time::{SystemTime, UNIX_EPOCH};
use zerocopy::byteorder::U64;
use zerocopy::AsBytes;
#[derive(Clone)] #[derive(Clone)]
pub struct Rooms { pub struct Rooms {
@ -15,6 +19,9 @@ pub struct Rooms {
/// Username -> list of room IDs user is in. /// Username -> list of room IDs user is in.
pub(in crate::db) username_roomids: Tree, pub(in crate::db) username_roomids: Tree,
/// Room ID 0xff event ID -> received timestamp.
pub(in crate::db) roomeventid_timestamp: Tree,
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -76,9 +83,38 @@ impl Rooms {
roomid_roominfo: db.open_tree("roomid_roominfo")?, roomid_roominfo: db.open_tree("roomid_roominfo")?,
roomid_usernames: db.open_tree("roomid_usernames")?, roomid_usernames: db.open_tree("roomid_usernames")?,
username_roomids: db.open_tree("username_roomids")?, username_roomids: db.open_tree("username_roomids")?,
roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?,
}) })
} }
/// Determine if an event in a room should be processed. The event
/// is atomically recorded and true returned if the database has
/// not seen tis event yet. If the event already exists in the
/// database, the function returns false. Events are recorded by
/// this function by inserting the (system-local) timestamp in
/// epoch seconds.
pub fn should_process(&self, room_id: &str, event_id: &str) -> Result<bool, DataError> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(event_id.as_bytes());
let timestamp: U64<LittleEndian> = U64::new(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Clock has gone backwards")
.as_secs(),
);
match self.roomeventid_timestamp.compare_and_swap(
key,
None as Option<&[u8]>,
Some(timestamp.as_bytes()),
)? {
Ok(()) => Ok(true),
Err(CompareAndSwapError { .. }) => Ok(false),
}
}
pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> { pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> {
get_set(&self.username_roomids, username.as_bytes()) get_set(&self.username_roomids, username.as_bytes())
} }

View File

@ -1,6 +1,6 @@
use crate::db::errors::DataError; use crate::db::errors::DataError;
use byteorder::LittleEndian; use byteorder::LittleEndian;
use zerocopy::byteorder::{I32, U32}; use zerocopy::byteorder::{I32, U32, U64};
use zerocopy::LayoutVerified; use zerocopy::LayoutVerified;
/// User variables are stored as little-endian 32-bit integers in the /// User variables are stored as little-endian 32-bit integers in the
@ -10,6 +10,9 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32<LittleEndian>>;
type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>; type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>;
#[allow(dead_code)]
type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<LittleEndian>>;
/// Convert bytes to an i32 with zero-copy deserialization. An error /// Convert bytes to an i32 with zero-copy deserialization. An error
/// is returned if the bytes do not represent an i32. /// is returned if the bytes do not represent an i32.
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> { pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> {
@ -33,3 +36,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result<u32, DataError> {
Err(DataError::I32SchemaViolation) Err(DataError::I32SchemaViolation)
} }
} }
#[allow(dead_code)]
pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> {
let layout = LittleEndianU64Layout::new_unaligned(raw_value.as_ref());
if let Some(layout) = layout {
let value: U64<LittleEndian> = *layout;
Ok(value.get())
} else {
Err(DataError::I32SchemaViolation)
}
}