diff options
| author | Teddy Wing | 2021-06-12 21:00:48 +0200 | 
|---|---|---|
| committer | Teddy Wing | 2021-06-12 21:00:48 +0200 | 
| commit | e4a41075af126dbcac19fe91267f785b3b391ee0 (patch) | |
| tree | 52d2a4a0a6f60ff386396d056625611824aa4e90 /src/database.rs | |
| parent | 46497bbae37f89e449b7a049663fe774843beb9b (diff) | |
| download | reflectub-e4a41075af126dbcac19fe91267f785b3b391ee0.tar.bz2 | |
Process repositories on multiple threads
Use 'rayon' to parallelise the repository processing. Each repository is
processed in a thread in the default 'rayon' pool.
In order to get thread-safe access to the database, I followed some
advice from a Stack Overflow answer by VasiliNovikov
(https://stackoverflow.com/users/1091436/vasilinovikov):
https://stackoverflow.com/questions/62560396/how-to-use-sqlite-via-rusqlite-from-multiple-threads/62560397#62560397
VasiliNovikov recommended creating a database connection pool using
'r2d2_sqlite'. This way we don't have to share a database connection
between threads, but each thread can have its own connection.
This also means we can remove mutable requirements in a bunch of places
involving our `database::Db` type since we're no longer managing the
database connections directly.
Diffstat (limited to 'src/database.rs')
| -rw-r--r-- | src/database.rs | 44 | 
1 files changed, 28 insertions, 16 deletions
| diff --git a/src/database.rs b/src/database.rs index 0785dd5..107a6d6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -16,6 +16,8 @@  // along with Reflectub. If not, see <https://www.gnu.org/licenses/>. +use r2d2; +use r2d2_sqlite::SqliteConnectionManager;  use rusqlite::{self, OptionalExtension};  use thiserror; @@ -55,31 +57,37 @@ impl From<&github::Repo> for Repo {  pub enum Error {      #[error("database error")]      Db(#[from] rusqlite::Error), + +    #[error("connection pool error")] +    Pool(#[from] r2d2::Error),  }  #[derive(Debug)]  pub struct Db { -    connection: rusqlite::Connection, +    pool: r2d2::Pool<SqliteConnectionManager>,  }  impl Db {      /// Open a connection to the database.      pub fn connect(path: &str) -> Result<Self, Error> { +        let manager = SqliteConnectionManager::file(path) +            .with_flags( +                rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE +                | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, +            ); +          Ok(              Db { -                connection: rusqlite::Connection::open_with_flags( -                    path, -                    rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE -                    | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, -                )?, +                pool: r2d2::Pool::new(manager)?,              }          )      }      /// Initialise the database with tables and indexes. -    pub fn create(&mut self) -> Result<(), Error> { -        let tx = self.connection.transaction()?; +    pub fn create(&self) -> Result<(), Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          tx.execute(              r#" @@ -110,8 +118,9 @@ impl Db {      ///      /// Returns a `rusqlite::Error::QueryReturnedNoRows` error if the row      /// doesn't exist. -    pub fn repo_get(&mut self, id: i64) -> Result<Repo, Error> { -        let tx = self.connection.transaction()?; +    pub fn repo_get(&self, id: i64) -> Result<Repo, Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          let repo = tx.query_row(              r#" @@ -142,8 +151,9 @@ impl Db {      }      /// Insert a new repository. -    pub fn repo_insert(&mut self, repo: Repo) -> Result<(), Error> { -        let tx = self.connection.transaction()?; +    pub fn repo_insert(&self, repo: Repo) -> Result<(), Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          tx.execute(              r#" @@ -170,10 +180,11 @@ impl Db {      /// Compares the `updated_at` field to find out whether the repository was      /// updated.      pub fn repo_is_updated( -        &mut self, +        &self,          repo: &Repo,      ) -> Result<bool, Error> { -        let tx = self.connection.transaction()?; +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          let is_updated = match tx.query_row(              r#" @@ -201,8 +212,9 @@ impl Db {      }      /// Update an existing repository. -    pub fn repo_update(&mut self, repo: &Repo) -> Result<(), Error> { -        let tx = self.connection.transaction()?; +    pub fn repo_update(&self, repo: &Repo) -> Result<(), Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          tx.execute(              r#" | 
