diff options
author | Teddy Wing | 2021-06-12 21:00:48 +0200 |
---|---|---|
committer | Teddy Wing | 2021-06-12 21:00:48 +0200 |
commit | e4a41075af126dbcac19fe91267f785b3b391ee0 (patch) | |
tree | 52d2a4a0a6f60ff386396d056625611824aa4e90 /src | |
parent | 46497bbae37f89e449b7a049663fe774843beb9b (diff) | |
download | reflectub-e4a41075af126dbcac19fe91267f785b3b391ee0.tar.bz2 |
Process repositories on multiple threads
Use 'rayon' to parallelise the repository processing. Each repository is
processed in a thread in the default 'rayon' pool.
In order to get thread-safe access to the database, I followed some
advice from a Stack Overflow answer by VasiliNovikov
(https://stackoverflow.com/users/1091436/vasilinovikov):
https://stackoverflow.com/questions/62560396/how-to-use-sqlite-via-rusqlite-from-multiple-threads/62560397#62560397
VasiliNovikov recommended creating a database connection pool using
'r2d2_sqlite'. This way we don't have to share a single database connection
between threads; instead, each thread gets its own connection from the pool.
This also means we can remove the `mut` requirements (`&mut self`,
`&mut database::Db`) in a bunch of places involving our `database::Db` type,
since connections are now taken from the pool instead of being managed by us directly.
Diffstat (limited to 'src')
-rw-r--r-- | src/database.rs | 44 | ||||
-rw-r--r-- | src/main.rs | 74 |
2 files changed, 75 insertions, 43 deletions
diff --git a/src/database.rs b/src/database.rs index 0785dd5..107a6d6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -16,6 +16,8 @@ // along with Reflectub. If not, see <https://www.gnu.org/licenses/>. +use r2d2; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{self, OptionalExtension}; use thiserror; @@ -55,31 +57,37 @@ impl From<&github::Repo> for Repo { pub enum Error { #[error("database error")] Db(#[from] rusqlite::Error), + + #[error("connection pool error")] + Pool(#[from] r2d2::Error), } #[derive(Debug)] pub struct Db { - connection: rusqlite::Connection, + pool: r2d2::Pool<SqliteConnectionManager>, } impl Db { /// Open a connection to the database. pub fn connect(path: &str) -> Result<Self, Error> { + let manager = SqliteConnectionManager::file(path) + .with_flags( + rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE + | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, + ); + Ok( Db { - connection: rusqlite::Connection::open_with_flags( - path, - rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE - | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, - )?, + pool: r2d2::Pool::new(manager)?, } ) } /// Initialise the database with tables and indexes. - pub fn create(&mut self) -> Result<(), Error> { - let tx = self.connection.transaction()?; + pub fn create(&self) -> Result<(), Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; tx.execute( r#" @@ -110,8 +118,9 @@ impl Db { /// /// Returns a `rusqlite::Error::QueryReturnedNoRows` error if the row /// doesn't exist. - pub fn repo_get(&mut self, id: i64) -> Result<Repo, Error> { - let tx = self.connection.transaction()?; + pub fn repo_get(&self, id: i64) -> Result<Repo, Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; let repo = tx.query_row( r#" @@ -142,8 +151,9 @@ impl Db { } /// Insert a new repository. 
- pub fn repo_insert(&mut self, repo: Repo) -> Result<(), Error> { - let tx = self.connection.transaction()?; + pub fn repo_insert(&self, repo: Repo) -> Result<(), Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; tx.execute( r#" @@ -170,10 +180,11 @@ impl Db { /// Compares the `updated_at` field to find out whether the repository was /// updated. pub fn repo_is_updated( - &mut self, + &self, repo: &Repo, ) -> Result<bool, Error> { - let tx = self.connection.transaction()?; + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; let is_updated = match tx.query_row( r#" @@ -201,8 +212,9 @@ impl Db { } /// Update an existing repository. - pub fn repo_update(&mut self, repo: &Repo) -> Result<(), Error> { - let tx = self.connection.transaction()?; + pub fn repo_update(&self, repo: &Repo) -> Result<(), Error> { + let mut pool = self.pool.get()?; + let tx = pool.transaction()?; tx.execute( r#" diff --git a/src/main.rs b/src/main.rs index 4e4d580..87b2320 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,8 @@ use filetime; // use futures::{self, executor, future}; use getopts::Options; use parse_size::parse_size; +use r2d2_sqlite::SqliteConnectionManager; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use rusqlite; use reflectub::{database, git, github}; @@ -112,8 +114,12 @@ fn run() -> anyhow::Result<()> { let repos = github::fetch_repos(username)?; - let mut db = database::Db::connect(&database_file) - .context("unable to connect to database")?; + let db = //Arc::new( + // Mutex::new( + database::Db::connect(&database_file) + .context("unable to connect to database")?; + // ) + // ); db.create() .context("unable to create database")?; @@ -121,31 +127,46 @@ fn run() -> anyhow::Result<()> { // let mut joins = futures::stream::FuturesUnordered::new(); // let mut joins = Vec::with_capacity(repos.len()); - let mut i = 0; - for repo in repos { - // let db = db.clone(); - let mirror_root = mirror_root.clone(); - let 
base_cgitrc = base_cgitrc.clone(); - - // let join = tokio::runtime::Handle::current().spawn(async move { - // let mut db = db.lock()?; - - process_repo( - &repo, - &mut db, - &mirror_root, - base_cgitrc, - max_repo_size_bytes, - )?; - // }); + // let mut i = 0; + // for repo in repos { + // // let db = db.clone(); + // let mirror_root = mirror_root.clone(); + // let base_cgitrc = base_cgitrc.clone(); + // + // // let join = tokio::runtime::Handle::current().spawn(async move { + // // let mut db = db.lock()?; + // + // process_repo( + // &repo, + // &mut db, + // &mirror_root, + // base_cgitrc, + // max_repo_size_bytes, + // )?; + // // }); + // + // // joins.push(join); + // + // if i == 2 { + // break; + // } + // i += 1; + // } - // joins.push(join); + let _results: anyhow::Result<()> = repos[..2].par_iter() + .map(|repo| { + dbg!("Thread", std::thread::current().id()); - if i == 5 { - break; - } - i += 1; - } + process_repo( + &repo, + &db, + &mirror_root, + base_cgitrc.clone(), // TODO: Can we avoid cloning + max_repo_size_bytes, + ) + }) + .collect(); + // TODO: Return errors // executor::block_on(future::join_all(joins)); // let mut joins = tokio_stream::iter(&mut joins); @@ -172,12 +193,11 @@ fn run() -> anyhow::Result<()> { /// Mirror or update `repo`. fn process_repo( repo: &github::Repo, - db: &mut database::Db, + db: &database::Db, mirror_root: &str, base_cgitrc: Option<PathBuf>, max_repo_size_bytes: Option<u64>, ) -> anyhow::Result<()> { - return anyhow::bail!("test"); if let Some(max_repo_size_bytes) = max_repo_size_bytes { if is_repo_oversize(repo.size, max_repo_size_bytes) { return Ok(()); |