Use ON CONFLICT and transactions where appropriate.
This commit is contained in:
parent
7eee16961e
commit
a3b39ee42c
|
@ -51,18 +51,15 @@ impl Rooms for Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn insert_room_info(&self, info: &RoomInfo) -> Result<(), DataError> {
|
async fn insert_room_info(&self, info: &RoomInfo) -> Result<(), DataError> {
|
||||||
//Clear out old info first, because we want this to be an "upsert."
|
sqlx::query(
|
||||||
sqlx::query("DELETE FROM room_info where room_id = ?")
|
r#"INSERT INTO room_info (room_id, room_name) VALUES (?, ?)
|
||||||
.bind(&info.room_id)
|
ON CONFLICT(room_id) DO UPDATE SET room_name = ?"#,
|
||||||
.execute(&self.conn)
|
)
|
||||||
.await
|
.bind(&info.room_id)
|
||||||
.ok();
|
.bind(&info.room_name)
|
||||||
|
.bind(&info.room_name)
|
||||||
sqlx::query(r#"INSERT INTO room_info (room_id, room_name) VALUES (?, ?)"#)
|
.execute(&self.conn)
|
||||||
.bind(&info.room_id)
|
.await?;
|
||||||
.bind(&info.room_name)
|
|
||||||
.execute(&self.conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -70,7 +67,7 @@ impl Rooms for Database {
|
||||||
async fn get_room_info(&self, room_id: &str) -> Result<Option<RoomInfo>, DataError> {
|
async fn get_room_info(&self, room_id: &str) -> Result<Option<RoomInfo>, DataError> {
|
||||||
let info = sqlx::query!(
|
let info = sqlx::query!(
|
||||||
r#"SELECT room_id, room_name FROM room_info
|
r#"SELECT room_id, room_name FROM room_info
|
||||||
WHERE room_id = ?"#,
|
WHERE room_id = ?"#,
|
||||||
room_id
|
room_id
|
||||||
)
|
)
|
||||||
.fetch_optional(&self.conn)
|
.fetch_optional(&self.conn)
|
||||||
|
@ -85,7 +82,7 @@ impl Rooms for Database {
|
||||||
async fn get_rooms_for_user(&self, user_id: &str) -> Result<HashSet<String>, DataError> {
|
async fn get_rooms_for_user(&self, user_id: &str) -> Result<HashSet<String>, DataError> {
|
||||||
let room_ids = sqlx::query!(
|
let room_ids = sqlx::query!(
|
||||||
r#"SELECT room_id FROM room_users
|
r#"SELECT room_id FROM room_users
|
||||||
WHERE username = ?"#,
|
WHERE username = ?"#,
|
||||||
user_id
|
user_id
|
||||||
)
|
)
|
||||||
.fetch_all(&self.conn)
|
.fetch_all(&self.conn)
|
||||||
|
@ -107,16 +104,14 @@ impl Rooms for Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> {
|
async fn add_user_to_room(&self, username: &str, room_id: &str) -> Result<(), DataError> {
|
||||||
// This is here because it is possible to process a bunch of
|
sqlx::query(
|
||||||
// user join/leave events at once, and we don't want to cause
|
"INSERT INTO room_users (room_id, username) VALUES (?, ?)
|
||||||
// constraint violation errors.
|
ON CONFLICT DO NOTHING",
|
||||||
self.remove_user_from_room(username, room_id).await.ok();
|
)
|
||||||
|
.bind(room_id)
|
||||||
sqlx::query("INSERT INTO room_users (room_id, username) VALUES (?, ?)")
|
.bind(username)
|
||||||
.bind(room_id)
|
.execute(&self.conn)
|
||||||
.bind(username)
|
.await?;
|
||||||
.execute(&self.conn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -134,16 +129,19 @@ impl Rooms for Database {
|
||||||
async fn clear_info(&self, room_id: &str) -> Result<(), DataError> {
|
async fn clear_info(&self, room_id: &str) -> Result<(), DataError> {
|
||||||
// We do not clear event history here, because if we rejoin a
|
// We do not clear event history here, because if we rejoin a
|
||||||
// room, we would re-process events we've already seen.
|
// room, we would re-process events we've already seen.
|
||||||
|
let mut tx = self.conn.begin().await?;
|
||||||
|
|
||||||
sqlx::query("DELETE FROM room_info where room_id = ?")
|
sqlx::query("DELETE FROM room_info where room_id = ?")
|
||||||
.bind(room_id)
|
.bind(room_id)
|
||||||
.execute(&self.conn)
|
.execute(&mut tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
sqlx::query("DELETE FROM room_users where room_id = ?")
|
sqlx::query("DELETE FROM room_users where room_id = ?")
|
||||||
.bind(room_id)
|
.bind(room_id)
|
||||||
.execute(&self.conn)
|
.execute(&mut tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
tx.commit().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue