maintain list of events in db. must convert allh handlers to use it.
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
This commit is contained in:
parent
a77d066daf
commit
bf151a0a40
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::db::Database;
|
||||||
use crate::error::BotError;
|
use crate::error::BotError;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
|
@ -16,11 +17,6 @@ use std::clone::Clone;
|
||||||
use std::ops::Sub;
|
use std::ops::Sub;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
fn check_age(timestamp: &SystemTime, how_far_back: u64) -> bool {
|
|
||||||
let oldest_timestamp = SystemTime::now().sub(Duration::new(how_far_back, 0));
|
|
||||||
timestamp > &oldest_timestamp
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if a message is recent enough to actually process. If the
|
/// Check if a message is recent enough to actually process. If the
|
||||||
/// message is within "oldest_message_age" seconds, this function
|
/// message is within "oldest_message_age" seconds, this function
|
||||||
/// returns true. If it's older than that, it returns false and logs a
|
/// returns true. If it's older than that, it returns false and logs a
|
||||||
|
@ -78,17 +74,32 @@ async fn should_process<'a>(
|
||||||
Ok((msg_body, sender_username))
|
Ok((msg_body, sender_username))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn should_process_event(db: &Database, room_id: &str, event_id: &str) -> bool {
|
||||||
|
db.rooms
|
||||||
|
.should_process(room_id, event_id)
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
error!(
|
||||||
|
"Database error when checking if we should process an event: {}",
|
||||||
|
e.to_string()
|
||||||
|
);
|
||||||
|
false
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// This event emitter listens for messages with dice rolling commands.
|
/// This event emitter listens for messages with dice rolling commands.
|
||||||
/// Originally adapted from the matrix-rust-sdk examples.
|
/// Originally adapted from the matrix-rust-sdk examples.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EventEmitter for DiceBot {
|
impl EventEmitter for DiceBot {
|
||||||
async fn on_room_member(&self, room: SyncRoom, event: &SyncStateEvent<MemberEventContent>) {
|
async fn on_room_member(&self, room: SyncRoom, event: &SyncStateEvent<MemberEventContent>) {
|
||||||
if !check_age(&event.origin_server_ts, 2) {
|
if let SyncRoom::Joined(room) | SyncRoom::Left(room) = room {
|
||||||
debug!("Ignoring old event: {:#?}", event);
|
//Clone to avoid holding lock.
|
||||||
|
let room = room.read().await.clone();
|
||||||
|
let (room_id, username) = (room.room_id.as_str(), &event.state_key);
|
||||||
|
|
||||||
|
if !should_process_event(&self.db, room_id, event.event_id.as_str()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let SyncRoom::Joined(room) | SyncRoom::Left(room) = room {
|
|
||||||
let event_affects_us = if let Some(our_user_id) = self.client.user_id().await {
|
let event_affects_us = if let Some(our_user_id) = self.client.user_id().await {
|
||||||
event.state_key == our_user_id
|
event.state_key == our_user_id
|
||||||
} else {
|
} else {
|
||||||
|
@ -102,10 +113,6 @@ impl EventEmitter for DiceBot {
|
||||||
_ => return,
|
_ => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
//Clone to avoid holding lock.
|
|
||||||
let room = room.read().await.clone();
|
|
||||||
let (room_id, username) = (room.room_id.as_str(), &event.state_key);
|
|
||||||
|
|
||||||
let result = if event_affects_us && !adding_user {
|
let result = if event_affects_us && !adding_user {
|
||||||
info!("Clearing all information for room ID {}", room_id);
|
info!("Clearing all information for room ID {}", room_id);
|
||||||
self.db.rooms.clear_info(room_id)
|
self.db.rooms.clear_info(room_id)
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
use crate::db::errors::DataError;
|
use crate::db::errors::DataError;
|
||||||
|
use byteorder::LittleEndian;
|
||||||
use sled::transaction::TransactionalTree;
|
use sled::transaction::TransactionalTree;
|
||||||
use sled::Transactional;
|
use sled::Transactional;
|
||||||
use sled::Tree;
|
use sled::{CompareAndSwapError, Tree};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::str;
|
use std::str;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
use zerocopy::byteorder::U64;
|
||||||
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Rooms {
|
pub struct Rooms {
|
||||||
|
@ -15,6 +19,9 @@ pub struct Rooms {
|
||||||
|
|
||||||
/// Username -> list of room IDs user is in.
|
/// Username -> list of room IDs user is in.
|
||||||
pub(in crate::db) username_roomids: Tree,
|
pub(in crate::db) username_roomids: Tree,
|
||||||
|
|
||||||
|
/// Room ID 0xff event ID -> received timestamp.
|
||||||
|
pub(in crate::db) roomeventid_timestamp: Tree,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
|
@ -76,9 +83,38 @@ impl Rooms {
|
||||||
roomid_roominfo: db.open_tree("roomid_roominfo")?,
|
roomid_roominfo: db.open_tree("roomid_roominfo")?,
|
||||||
roomid_usernames: db.open_tree("roomid_usernames")?,
|
roomid_usernames: db.open_tree("roomid_usernames")?,
|
||||||
username_roomids: db.open_tree("username_roomids")?,
|
username_roomids: db.open_tree("username_roomids")?,
|
||||||
|
roomeventid_timestamp: db.open_tree("roomeventid_timestamp")?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Determine if an event in a room should be processed. The event
|
||||||
|
/// is atomically recorded and true returned if the database has
|
||||||
|
/// not seen tis event yet. If the event already exists in the
|
||||||
|
/// database, the function returns false. Events are recorded by
|
||||||
|
/// this function by inserting the (system-local) timestamp in
|
||||||
|
/// epoch seconds.
|
||||||
|
pub fn should_process(&self, room_id: &str, event_id: &str) -> Result<bool, DataError> {
|
||||||
|
let mut key = room_id.as_bytes().to_vec();
|
||||||
|
key.push(0xff);
|
||||||
|
key.extend_from_slice(event_id.as_bytes());
|
||||||
|
|
||||||
|
let timestamp: U64<LittleEndian> = U64::new(
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.expect("Clock has gone backwards")
|
||||||
|
.as_secs(),
|
||||||
|
);
|
||||||
|
|
||||||
|
match self.roomeventid_timestamp.compare_and_swap(
|
||||||
|
key,
|
||||||
|
None as Option<&[u8]>,
|
||||||
|
Some(timestamp.as_bytes()),
|
||||||
|
)? {
|
||||||
|
Ok(()) => Ok(true),
|
||||||
|
Err(CompareAndSwapError { .. }) => Ok(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> {
|
pub fn get_rooms_for_user(&self, username: &str) -> Result<HashSet<String>, DataError> {
|
||||||
get_set(&self.username_roomids, username.as_bytes())
|
get_set(&self.username_roomids, username.as_bytes())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use crate::db::errors::DataError;
|
use crate::db::errors::DataError;
|
||||||
use byteorder::LittleEndian;
|
use byteorder::LittleEndian;
|
||||||
use zerocopy::byteorder::{I32, U32};
|
use zerocopy::byteorder::{I32, U32, U64};
|
||||||
use zerocopy::LayoutVerified;
|
use zerocopy::LayoutVerified;
|
||||||
|
|
||||||
/// User variables are stored as little-endian 32-bit integers in the
|
/// User variables are stored as little-endian 32-bit integers in the
|
||||||
|
@ -10,6 +10,9 @@ type LittleEndianI32Layout<'a> = LayoutVerified<&'a [u8], I32<LittleEndian>>;
|
||||||
|
|
||||||
type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>;
|
type LittleEndianU32Layout<'a> = LayoutVerified<&'a [u8], U32<LittleEndian>>;
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
type LittleEndianU64Layout<'a> = LayoutVerified<&'a [u8], U64<LittleEndian>>;
|
||||||
|
|
||||||
/// Convert bytes to an i32 with zero-copy deserialization. An error
|
/// Convert bytes to an i32 with zero-copy deserialization. An error
|
||||||
/// is returned if the bytes do not represent an i32.
|
/// is returned if the bytes do not represent an i32.
|
||||||
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> {
|
pub(super) fn convert_i32(raw_value: &[u8]) -> Result<i32, DataError> {
|
||||||
|
@ -33,3 +36,15 @@ pub(super) fn convert_u32(raw_value: &[u8]) -> Result<u32, DataError> {
|
||||||
Err(DataError::I32SchemaViolation)
|
Err(DataError::I32SchemaViolation)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub(super) fn convert_u64(raw_value: &[u8]) -> Result<u64, DataError> {
|
||||||
|
let layout = LittleEndianU64Layout::new_unaligned(raw_value.as_ref());
|
||||||
|
|
||||||
|
if let Some(layout) = layout {
|
||||||
|
let value: U64<LittleEndian> = *layout;
|
||||||
|
Ok(value.get())
|
||||||
|
} else {
|
||||||
|
Err(DataError::I32SchemaViolation)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue