From e4a41075af126dbcac19fe91267f785b3b391ee0 Mon Sep 17 00:00:00 2001 From: Teddy Wing Date: Sat, 12 Jun 2021 21:00:48 +0200 Subject: Process repositories on multiple threads Use 'rayon' to parallelise the repository processing. Each repository is processed in a thread in the default 'rayon' pool. In order to get thread-safe access to the database, I followed some advice from a Stack Overflow answer by VasiliNovikov (https://stackoverflow.com/users/1091436/vasilinovikov): https://stackoverflow.com/questions/62560396/how-to-use-sqlite-via-rusqlite-from-multiple-threads/62560397#62560397 VasiliNovikov recommended creating a database connection pool using 'r2d2_sqlite'. This way we don't have to share a database connection between threads, but each thread can have its own connection. This also means we can remove mutable requirements in a bunch of places involving our `database::Db` type since we're no longer managing the database connections directly. --- src/database.rs | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) (limited to 'src/database.rs') diff --git a/src/database.rs b/src/database.rs index 0785dd5..107a6d6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -16,6 +16,8 @@ // along with Reflectub. If not, see . +use r2d2; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{self, OptionalExtension}; use thiserror; @@ -55,31 +57,37 @@ impl From<&github::Repo> for Repo { pub enum Error { #[error("database error")] Db(#[from] rusqlite::Error), + + #[error("connection pool error")] + Pool(#[from] r2d2::Error), } #[derive(Debug)] pub struct Db { - connection: rusqlite::Connection, + pool: r2d2::Pool, } impl Db { /// Open a connection to the database. pub fn connect(path: &str) -> Result { + let manager = SqliteConnectionManager::file(path) + .with_flags( + rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE + | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, + ); + Ok( Db { - connection: rusqlite::Connection::open_with_flags( - path, - rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE - | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, - )?, + pool: r2d2::Pool::new(manager)?, } ) } /// Initialise the database with tables and indexes. - pub fn create(&mut self) -> Result<(), Error> { - let tx = self.connection.transaction()?; + pub fn create(&self) -> Result<(), Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; tx.execute( r#" @@ -110,8 +118,9 @@ impl Db { /// /// Returns a `rusqlite::Error::QueryReturnedNoRows` error if the row /// doesn't exist. - pub fn repo_get(&mut self, id: i64) -> Result { - let tx = self.connection.transaction()?; + pub fn repo_get(&self, id: i64) -> Result { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; let repo = tx.query_row( r#" @@ -142,8 +151,9 @@ impl Db { } /// Insert a new repository. - pub fn repo_insert(&mut self, repo: Repo) -> Result<(), Error> { - let tx = self.connection.transaction()?; + pub fn repo_insert(&self, repo: Repo) -> Result<(), Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; tx.execute( r#" @@ -170,10 +180,11 @@ impl Db { /// Compares the `updated_at` field to find out whether the repository was /// updated. pub fn repo_is_updated( - &mut self, + &self, repo: &Repo, ) -> Result { - let tx = self.connection.transaction()?; + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; let is_updated = match tx.query_row( r#" @@ -201,8 +212,9 @@ impl Db { } /// Update an existing repository. - pub fn repo_update(&mut self, repo: &Repo) -> Result<(), Error> { - let tx = self.connection.transaction()?; + pub fn repo_update(&self, repo: &Repo) -> Result<(), Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; tx.execute( r#" -- cgit v1.2.3