Convert project to workspace with Tonic for gRPC. #84
|
@ -2,6 +2,7 @@
|
||||||
#![type_length_limit = "7605144"]
|
#![type_length_limit = "7605144"]
|
||||||
use futures::try_join;
|
use futures::try_join;
|
||||||
use log::error;
|
use log::error;
|
||||||
|
use matrix_sdk::Client;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use tenebrous_dicebot::bot::DiceBot;
|
use tenebrous_dicebot::bot::DiceBot;
|
||||||
|
@ -15,12 +16,13 @@ use tracing_subscriber::filter::EnvFilter;
|
||||||
/// Attempt to create config object and ddatabase connection pool from
|
/// Attempt to create config object and ddatabase connection pool from
|
||||||
/// the given config path. An error is returned if config creation or
|
/// the given config path. An error is returned if config creation or
|
||||||
/// database pool creation fails for some reason.
|
/// database pool creation fails for some reason.
|
||||||
async fn init(config_path: &str) -> Result<(Arc<Config>, Database), BotError> {
|
async fn init(config_path: &str) -> Result<(Arc<Config>, Database, Client), BotError> {
|
||||||
let cfg = read_config(config_path)?;
|
let cfg = read_config(config_path)?;
|
||||||
let cfg = Arc::new(cfg);
|
let cfg = Arc::new(cfg);
|
||||||
let sqlite_path = format!("{}/dicebot.sqlite", cfg.database_path());
|
let sqlite_path = format!("{}/dicebot.sqlite", cfg.database_path());
|
||||||
let db = Database::new(&sqlite_path).await?;
|
let db = Database::new(&sqlite_path).await?;
|
||||||
Ok((cfg, db))
|
let client = tenebrous_dicebot::matrix::create_client(&cfg)?;
|
||||||
|
Ok((cfg, db, client))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
@ -47,9 +49,9 @@ async fn run() -> Result<(), BotError> {
|
||||||
.next()
|
.next()
|
||||||
.expect("Need a config as an argument");
|
.expect("Need a config as an argument");
|
||||||
|
|
||||||
let (cfg, db) = init(&config_path).await?;
|
let (cfg, db, client) = init(&config_path).await?;
|
||||||
let grpc = rpc::serve_grpc(&cfg, &db);
|
let grpc = rpc::serve_grpc(&cfg, &db, &client);
|
||||||
let bot = run_bot(&cfg, &db);
|
let bot = run_bot(&cfg, &db, &client);
|
||||||
|
|
||||||
match try_join!(bot, grpc) {
|
match try_join!(bot, grpc) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
|
@ -59,10 +61,10 @@ async fn run() -> Result<(), BotError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_bot(cfg: &Arc<Config>, db: &Database) -> Result<(), BotError> {
|
async fn run_bot(cfg: &Arc<Config>, db: &Database, client: &Client) -> Result<(), BotError> {
|
||||||
let state = Arc::new(RwLock::new(DiceBotState::new(&cfg)));
|
let state = Arc::new(RwLock::new(DiceBotState::new(&cfg)));
|
||||||
|
|
||||||
match DiceBot::new(cfg, &state, db) {
|
match DiceBot::new(cfg, &state, db, client) {
|
||||||
Ok(bot) => bot.run().await?,
|
Ok(bot) => bot.run().await?,
|
||||||
Err(e) => println!("Error connecting: {:?}", e),
|
Err(e) => println!("Error connecting: {:?}", e),
|
||||||
};
|
};
|
||||||
|
|
|
@ -5,16 +5,21 @@ use tenebrous_rpc::protos::dicebot::{dicebot_client::DicebotClient, GetVariableR
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let mut client = DicebotClient::connect("http://0.0.0.0:9090").await?;
|
let mut client = DicebotClient::connect("http://0.0.0.0:9090").await?;
|
||||||
|
|
||||||
let request = tonic::Request::new(GetVariableRequest {
|
// let request = tonic::Request::new(GetVariableRequest {
|
||||||
|
// user_id: "@projectmoon:agnos.is".into(),
|
||||||
|
// room_id: "!agICWvldGfuCywUVUM:agnos.is".into(),
|
||||||
|
// variable_name: "stuff".into(),
|
||||||
|
// });
|
||||||
|
|
||||||
|
// let response = client.get_variable(request).await?.into_inner();
|
||||||
|
|
||||||
|
let request = tonic::Request::new(UserIdRequest {
|
||||||
user_id: "@projectmoon:agnos.is".into(),
|
user_id: "@projectmoon:agnos.is".into(),
|
||||||
room_id: "!agICWvldGfuCywUVUM:agnos.is".into(),
|
|
||||||
variable_name: "stuff".into(),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = client.get_variable(request).await?.into_inner();
|
let response = client.rooms_for_user(request).await?.into_inner();
|
||||||
|
// println!("RESPONSE={:?}", response);
|
||||||
println!("RESPONSE={:?}", response);
|
// println!("User friendly response is: {:?}", response.value);
|
||||||
println!("User friendly response is: {:?}", response.value);
|
println!("Rooms: {:?}", response.rooms);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,22 +35,6 @@ pub struct DiceBot {
|
||||||
db: Database,
|
db: Database,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cache_dir() -> Result<PathBuf, BotError> {
|
|
||||||
let mut dir = dirs::cache_dir().ok_or(BotError::NoCacheDirectoryError)?;
|
|
||||||
dir.push("matrix-dicebot");
|
|
||||||
Ok(dir)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates the matrix client.
|
|
||||||
fn create_client(config: &Config) -> Result<Client, BotError> {
|
|
||||||
let cache_dir = cache_dir()?;
|
|
||||||
//let store = JsonStore::open(&cache_dir)?;
|
|
||||||
let client_config = ClientConfig::new().store_path(cache_dir);
|
|
||||||
let homeserver_url = Url::parse(&config.matrix_homeserver())?;
|
|
||||||
|
|
||||||
Ok(Client::new_with_config(homeserver_url, client_config)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DiceBot {
|
impl DiceBot {
|
||||||
/// Create a new dicebot with the given configuration and state
|
/// Create a new dicebot with the given configuration and state
|
||||||
/// actor. This function returns a Result because it is possible
|
/// actor. This function returns a Result because it is possible
|
||||||
|
@ -60,9 +44,10 @@ impl DiceBot {
|
||||||
config: &Arc<Config>,
|
config: &Arc<Config>,
|
||||||
state: &Arc<RwLock<DiceBotState>>,
|
state: &Arc<RwLock<DiceBotState>>,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
|
client: &Client,
|
||||||
) -> Result<Self, BotError> {
|
) -> Result<Self, BotError> {
|
||||||
Ok(DiceBot {
|
Ok(DiceBot {
|
||||||
client: create_client(&config)?,
|
client: client.clone(),
|
||||||
config: config.clone(),
|
config: config.clone(),
|
||||||
state: state.clone(),
|
state: state.clone(),
|
||||||
db: db.clone(),
|
db: db.clone(),
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use futures::stream::{self, StreamExt, TryStreamExt};
|
use futures::stream::{self, StreamExt, TryStreamExt};
|
||||||
use log::error;
|
use log::error;
|
||||||
use matrix_sdk::{events::room::message::NoticeMessageEventContent, room::Joined};
|
use matrix_sdk::{events::room::message::NoticeMessageEventContent, room::Joined, ClientConfig};
|
||||||
use matrix_sdk::{
|
use matrix_sdk::{
|
||||||
events::room::message::{InReplyTo, Relation},
|
events::room::message::{InReplyTo, Relation},
|
||||||
events::room::message::{MessageEventContent, MessageType},
|
events::room::message::{MessageEventContent, MessageType},
|
||||||
|
@ -9,6 +11,15 @@ use matrix_sdk::{
|
||||||
Error as MatrixError,
|
Error as MatrixError,
|
||||||
};
|
};
|
||||||
use matrix_sdk::{identifiers::RoomId, identifiers::UserId, Client};
|
use matrix_sdk::{identifiers::RoomId, identifiers::UserId, Client};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
use crate::{config::Config, error::BotError};
|
||||||
|
|
||||||
|
fn cache_dir() -> Result<PathBuf, BotError> {
|
||||||
|
let mut dir = dirs::cache_dir().ok_or(BotError::NoCacheDirectoryError)?;
|
||||||
|
dir.push("matrix-dicebot");
|
||||||
|
Ok(dir)
|
||||||
|
}
|
||||||
|
|
||||||
/// Extracts more detailed error messages out of a matrix SDK error.
|
/// Extracts more detailed error messages out of a matrix SDK error.
|
||||||
fn extract_error_message(error: MatrixError) -> String {
|
fn extract_error_message(error: MatrixError) -> String {
|
||||||
|
@ -20,6 +31,15 @@ fn extract_error_message(error: MatrixError) -> String {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates the matrix client.
|
||||||
|
pub fn create_client(config: &Config) -> Result<Client, BotError> {
|
||||||
|
let cache_dir = cache_dir()?;
|
||||||
|
let client_config = ClientConfig::new().store_path(cache_dir);
|
||||||
|
let homeserver_url = Url::parse(&config.matrix_homeserver())?;
|
||||||
|
|
||||||
|
Ok(Client::new_with_config(homeserver_url, client_config)?)
|
||||||
|
}
|
||||||
|
|
||||||
/// Retrieve a list of users in a given room.
|
/// Retrieve a list of users in a given room.
|
||||||
pub async fn get_users_in_room(
|
pub async fn get_users_in_room(
|
||||||
client: &Client,
|
client: &Client,
|
||||||
|
|
|
@ -1,10 +1,16 @@
|
||||||
use crate::db::{errors::DataError, Variables};
|
use crate::db::{errors::DataError, Variables};
|
||||||
use crate::error::BotError;
|
use crate::error::BotError;
|
||||||
|
use crate::matrix;
|
||||||
use crate::{config::Config, db::sqlite::Database};
|
use crate::{config::Config, db::sqlite::Database};
|
||||||
|
use futures::stream;
|
||||||
|
use futures::{StreamExt, TryFutureExt, TryStreamExt};
|
||||||
use log::info;
|
use log::info;
|
||||||
|
use matrix_sdk::{identifiers::UserId, Client};
|
||||||
|
use std::convert::TryFrom;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tenebrous_rpc::protos::dicebot::{
|
use tenebrous_rpc::protos::dicebot::{
|
||||||
dicebot_server::{Dicebot, DicebotServer},
|
dicebot_server::{Dicebot, DicebotServer},
|
||||||
|
rooms_list_reply::Room,
|
||||||
GetAllVariablesReply, GetAllVariablesRequest, RoomsListReply, SetVariableReply,
|
GetAllVariablesReply, GetAllVariablesRequest, RoomsListReply, SetVariableReply,
|
||||||
SetVariableRequest, UserIdRequest,
|
SetVariableRequest, UserIdRequest,
|
||||||
};
|
};
|
||||||
|
@ -26,6 +32,7 @@ impl From<DataError> for Status {
|
||||||
pub struct DicebotRpcService {
|
pub struct DicebotRpcService {
|
||||||
config: Arc<Config>,
|
config: Arc<Config>,
|
||||||
db: Database,
|
db: Database,
|
||||||
|
client: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
|
@ -34,6 +41,17 @@ impl Dicebot for DicebotRpcService {
|
||||||
&self,
|
&self,
|
||||||
request: Request<SetVariableRequest>,
|
request: Request<SetVariableRequest>,
|
||||||
) -> Result<Response<SetVariableReply>, Status> {
|
) -> Result<Response<SetVariableReply>, Status> {
|
||||||
|
let SetVariableRequest {
|
||||||
|
user_id,
|
||||||
|
room_id,
|
||||||
|
variable_name,
|
||||||
|
value,
|
||||||
|
} = request.into_inner();
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.set_user_variable(&user_id, &room_id, &variable_name, value)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(Response::new(SetVariableReply { success: true }))
|
Ok(Response::new(SetVariableReply { success: true }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,28 +72,63 @@ impl Dicebot for DicebotRpcService {
|
||||||
&self,
|
&self,
|
||||||
request: Request<GetAllVariablesRequest>,
|
request: Request<GetAllVariablesRequest>,
|
||||||
) -> Result<Response<GetAllVariablesReply>, Status> {
|
) -> Result<Response<GetAllVariablesReply>, Status> {
|
||||||
Ok(Response::new(GetAllVariablesReply {
|
let request = request.into_inner();
|
||||||
variables: std::collections::HashMap::new(),
|
let variables = self
|
||||||
}))
|
.db
|
||||||
|
.get_user_variables(&request.user_id, &request.room_id)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Response::new(GetAllVariablesReply { variables }))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn rooms_for_user(
|
async fn rooms_for_user(
|
||||||
&self,
|
&self,
|
||||||
request: Request<UserIdRequest>,
|
request: Request<UserIdRequest>,
|
||||||
) -> Result<Response<RoomsListReply>, Status> {
|
) -> Result<Response<RoomsListReply>, Status> {
|
||||||
Ok(Response::new(RoomsListReply {
|
let UserIdRequest { user_id } = request.into_inner();
|
||||||
room_ids: Vec::new(),
|
let user_id = UserId::try_from(user_id).map_err(BotError::from)?;
|
||||||
}))
|
|
||||||
|
let rooms_for_user = matrix::get_rooms_for_user(&self.client, &user_id)
|
||||||
|
.err_into::<BotError>()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut rooms: Vec<Room> = stream::iter(rooms_for_user)
|
||||||
|
.filter_map(|room| async move {
|
||||||
|
let rooms = room.display_name().await.map(|room_name| Room {
|
||||||
|
room_id: room.room_id().to_string(),
|
||||||
|
display_name: room_name,
|
||||||
|
});
|
||||||
|
|
||||||
|
Some(rooms)
|
||||||
|
})
|
||||||
|
.err_into::<BotError>()
|
||||||
|
.try_collect()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let sort = |r1: &Room, r2: &Room| {
|
||||||
|
r1.display_name
|
||||||
|
.to_lowercase()
|
||||||
|
.cmp(&r2.display_name.to_lowercase())
|
||||||
|
};
|
||||||
|
|
||||||
|
rooms.sort_by(sort);
|
||||||
|
|
||||||
|
Ok(Response::new(RoomsListReply { rooms }))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn serve_grpc(config: &Arc<Config>, db: &Database) -> Result<(), BotError> {
|
pub async fn serve_grpc(
|
||||||
|
config: &Arc<Config>,
|
||||||
|
db: &Database,
|
||||||
|
client: &Client,
|
||||||
|
) -> Result<(), BotError> {
|
||||||
match config.rpc_addr() {
|
match config.rpc_addr() {
|
||||||
Some(addr) => {
|
Some(addr) => {
|
||||||
let addr = addr.parse()?;
|
let addr = addr.parse()?;
|
||||||
let rpc_service = DicebotRpcService {
|
let rpc_service = DicebotRpcService {
|
||||||
db: db.clone(),
|
db: db.clone(),
|
||||||
config: config.clone(),
|
config: config.clone(),
|
||||||
|
client: client.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Serving Dicebot gRPC service on {}", addr);
|
info!("Serving Dicebot gRPC service on {}", addr);
|
||||||
|
|
|
@ -43,5 +43,10 @@ message UserIdRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
message RoomsListReply {
|
message RoomsListReply {
|
||||||
repeated string room_ids = 1;
|
message Room {
|
||||||
|
string room_id = 1;
|
||||||
|
string display_name = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
repeated Room rooms = 1;
|
||||||
}
|
}
|
Loading…
Reference in New Issue