diff options
| author | Teddy Wing | 2021-06-12 21:00:48 +0200 | 
|---|---|---|
| committer | Teddy Wing | 2021-06-12 21:00:48 +0200 | 
| commit | e4a41075af126dbcac19fe91267f785b3b391ee0 (patch) | |
| tree | 52d2a4a0a6f60ff386396d056625611824aa4e90 | |
| parent | 46497bbae37f89e449b7a049663fe774843beb9b (diff) | |
| download | reflectub-e4a41075af126dbcac19fe91267f785b3b391ee0.tar.bz2 | |
Process repositories on multiple threads
Use 'rayon' to parallelise the repository processing. Each repository is
processed in a thread in the default 'rayon' pool.
In order to get thread-safe access to the database, I followed some
advice from a Stack Overflow answer by VasiliNovikov
(https://stackoverflow.com/users/1091436/vasilinovikov):
https://stackoverflow.com/questions/62560396/how-to-use-sqlite-via-rusqlite-from-multiple-threads/62560397#62560397
VasiliNovikov recommended creating a database connection pool using
'r2d2_sqlite'. This way we don't have to share a single database
connection between threads; instead, each thread checks out its own
connection from the pool.
This also means we can drop the `&mut self` requirement from the
`database::Db` methods (and the `&mut database::Db` parameter of
`process_repo`), since we're no longer managing the database connection
directly.
| -rw-r--r-- | Cargo.lock | 185 | ||||
| -rw-r--r-- | Cargo.toml | 3 | ||||
| -rw-r--r-- | src/database.rs | 44 | ||||
| -rw-r--r-- | src/main.rs | 74 | 
4 files changed, 263 insertions, 43 deletions
| @@ -76,6 +76,56 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e"  [[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if", + "lazy_static", +] + +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + +[[package]]  name = "exitcode"  version = "1.1.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -169,6 +219,15 @@ dependencies = [  ]  [[package]] +name = "hermit-abi" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +dependencies = [ + "libc", +] + +[[package]]  name = "idna"  version = "0.2.3"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -180,6 +239,15 @@ 
dependencies = [  ]  [[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if", +] + +[[package]]  name = "itoa"  version = "0.4.7"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -266,6 +334,15 @@ dependencies = [  ]  [[package]] +name = "lock_api" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb" +dependencies = [ + "scopeguard", +] + +[[package]]  name = "log"  version = "0.4.14"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -287,6 +364,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"  [[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + +[[package]]  name = "num-integer"  version = "0.1.44"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -306,6 +392,16 @@ dependencies = [  ]  [[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]]  name = "once_cell"  version = "1.7.2"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -331,6 +427,31 @@ dependencies = [  ]  [[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + 
+[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]]  name = "parse-size"  version = "1.0.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -367,6 +488,52 @@ dependencies = [  ]  [[package]] +name = "r2d2" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_sqlite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d24607049214c5e42d3df53ac1d8a23c34cc6a5eefe3122acb2c72174719959" +dependencies = [ + "r2d2", + "rusqlite", +] + +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + +[[package]]  name = "redox_syscall"  version = "0.2.8"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -386,6 +553,9 @@ dependencies = [   "getopts",   "git2",   "parse-size", + "r2d2", + "r2d2_sqlite", + "rayon",   "rusqlite",   "serde",   "thiserror", @@ -442,6 +612,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = 
"71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"  [[package]] +name = "scheduled-thread-pool" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7" +dependencies = [ + "parking_lot", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]]  name = "sct"  version = "0.6.1"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -11,6 +11,9 @@ filetime = "0.2.14"  getopts = "0.2.21"  git2 = "0.13.20"  parse-size = { version = "1.0.0", features = ["std"] } +r2d2 = "0.8.9" +r2d2_sqlite = "0.18.0" +rayon = "1.5.1"  rusqlite = "0.25.3"  serde = { version = "1.0.126", features = ["derive"] }  thiserror = "1.0.25" diff --git a/src/database.rs b/src/database.rs index 0785dd5..107a6d6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -16,6 +16,8 @@  // along with Reflectub. If not, see <https://www.gnu.org/licenses/>. +use r2d2; +use r2d2_sqlite::SqliteConnectionManager;  use rusqlite::{self, OptionalExtension};  use thiserror; @@ -55,31 +57,37 @@ impl From<&github::Repo> for Repo {  pub enum Error {      #[error("database error")]      Db(#[from] rusqlite::Error), + +    #[error("connection pool error")] +    Pool(#[from] r2d2::Error),  }  #[derive(Debug)]  pub struct Db { -    connection: rusqlite::Connection, +    pool: r2d2::Pool<SqliteConnectionManager>,  }  impl Db {      /// Open a connection to the database.      
pub fn connect(path: &str) -> Result<Self, Error> { +        let manager = SqliteConnectionManager::file(path) +            .with_flags( +                rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE +                | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, +            ); +          Ok(              Db { -                connection: rusqlite::Connection::open_with_flags( -                    path, -                    rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE -                    | rusqlite::OpenFlags::SQLITE_OPEN_CREATE, -                )?, +                pool: r2d2::Pool::new(manager)?,              }          )      }      /// Initialise the database with tables and indexes. -    pub fn create(&mut self) -> Result<(), Error> { -        let tx = self.connection.transaction()?; +    pub fn create(&self) -> Result<(), Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          tx.execute(              r#" @@ -110,8 +118,9 @@ impl Db {      ///      /// Returns a `rusqlite::Error::QueryReturnedNoRows` error if the row      /// doesn't exist. -    pub fn repo_get(&mut self, id: i64) -> Result<Repo, Error> { -        let tx = self.connection.transaction()?; +    pub fn repo_get(&self, id: i64) -> Result<Repo, Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          let repo = tx.query_row(              r#" @@ -142,8 +151,9 @@ impl Db {      }      /// Insert a new repository. -    pub fn repo_insert(&mut self, repo: Repo) -> Result<(), Error> { -        let tx = self.connection.transaction()?; +    pub fn repo_insert(&self, repo: Repo) -> Result<(), Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          tx.execute(              r#" @@ -170,10 +180,11 @@ impl Db {      /// Compares the `updated_at` field to find out whether the repository was      /// updated.      
pub fn repo_is_updated( -        &mut self, +        &self,          repo: &Repo,      ) -> Result<bool, Error> { -        let tx = self.connection.transaction()?; +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          let is_updated = match tx.query_row(              r#" @@ -201,8 +212,9 @@ impl Db {      }      /// Update an existing repository. -    pub fn repo_update(&mut self, repo: &Repo) -> Result<(), Error> { -        let tx = self.connection.transaction()?; +    pub fn repo_update(&self, repo: &Repo) -> Result<(), Error> { +        let mut pool = self.pool.get()?; +        let tx = pool.transaction()?;          tx.execute(              r#" diff --git a/src/main.rs b/src/main.rs index 4e4d580..87b2320 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,8 @@ use filetime;  // use futures::{self, executor, future};  use getopts::Options;  use parse_size::parse_size; +use r2d2_sqlite::SqliteConnectionManager; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator};  use rusqlite;  use reflectub::{database, git, github}; @@ -112,8 +114,12 @@ fn run() -> anyhow::Result<()> {      let repos = github::fetch_repos(username)?; -    let mut db = database::Db::connect(&database_file) -        .context("unable to connect to database")?; +    let db = //Arc::new( +        // Mutex::new( +            database::Db::connect(&database_file) +                .context("unable to connect to database")?; +        // ) +    // );      db.create()          .context("unable to create database")?; @@ -121,31 +127,46 @@ fn run() -> anyhow::Result<()> {      // let mut joins = futures::stream::FuturesUnordered::new();      // let mut joins = Vec::with_capacity(repos.len()); -    let mut i = 0; -    for repo in repos { -        // let db = db.clone(); -        let mirror_root = mirror_root.clone(); -        let base_cgitrc = base_cgitrc.clone(); - -        // let join = tokio::runtime::Handle::current().spawn(async move { -        // let 
mut db = db.lock()?; - -        process_repo( -            &repo, -            &mut db, -            &mirror_root, -            base_cgitrc, -            max_repo_size_bytes, -        )?; -        // }); +    // let mut i = 0; +    // for repo in repos { +    //     // let db = db.clone(); +    //     let mirror_root = mirror_root.clone(); +    //     let base_cgitrc = base_cgitrc.clone(); +    // +    //     // let join = tokio::runtime::Handle::current().spawn(async move { +    //     // let mut db = db.lock()?; +    // +    //     process_repo( +    //         &repo, +    //         &mut db, +    //         &mirror_root, +    //         base_cgitrc, +    //         max_repo_size_bytes, +    //     )?; +    //     // }); +    // +    //     // joins.push(join); +    // +    //     if i == 2 { +    //         break; +    //     } +    //     i += 1; +    // } -        // joins.push(join); +    let _results: anyhow::Result<()> = repos[..2].par_iter() +        .map(|repo| { +            dbg!("Thread", std::thread::current().id()); -        if i == 5 { -            break; -        } -        i += 1; -    } +            process_repo( +                &repo, +                &db, +                &mirror_root, +                base_cgitrc.clone(), // TODO: Can we avoid cloning +                max_repo_size_bytes, +            ) +        }) +        .collect(); +    // TODO: Return errors      // executor::block_on(future::join_all(joins));      // let mut joins = tokio_stream::iter(&mut joins); @@ -172,12 +193,11 @@ fn run() -> anyhow::Result<()> {  /// Mirror or update `repo`.  
fn process_repo(      repo: &github::Repo, -    db: &mut database::Db, +    db: &database::Db,      mirror_root: &str,      base_cgitrc: Option<PathBuf>,      max_repo_size_bytes: Option<u64>,  ) -> anyhow::Result<()> { -    return anyhow::bail!("test");      if let Some(max_repo_size_bytes) = max_repo_size_bytes {          if is_repo_oversize(repo.size, max_repo_size_bytes) {              return Ok(()); | 
