aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Teddy Wing 2021-06-12 21:00:48 +0200
committer Teddy Wing 2021-06-12 21:00:48 +0200
commit e4a41075af126dbcac19fe91267f785b3b391ee0 (patch)
tree 52d2a4a0a6f60ff386396d056625611824aa4e90
parent 46497bbae37f89e449b7a049663fe774843beb9b (diff)
download reflectub-e4a41075af126dbcac19fe91267f785b3b391ee0.tar.bz2
Process repositories on multiple threads
Use 'rayon' to parallelise the repository processing. Each repository is processed in a thread in the default 'rayon' pool.

In order to get thread-safe access to the database, I followed some advice from a Stack Overflow answer by VasiliNovikov (https://stackoverflow.com/users/1091436/vasilinovikov):

https://stackoverflow.com/questions/62560396/how-to-use-sqlite-via-rusqlite-from-multiple-threads/62560397#62560397

VasiliNovikov recommended creating a database connection pool using 'r2d2_sqlite'. This way we don't have to share a database connection between threads, but each thread can have its own connection.

This also means we can remove mutable requirements in a bunch of places involving our `database::Db` type since we're no longer managing the database connections directly.
-rw-r--r-- Cargo.lock 185
-rw-r--r-- Cargo.toml 3
-rw-r--r-- src/database.rs 44
-rw-r--r-- src/main.rs 74
4 files changed, 263 insertions, 43 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0ee0ec4..53cf6c8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -76,6 +76,56 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e"
[[package]]
+name = "crossbeam-channel"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
+dependencies = [
+ "cfg-if",
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "lazy_static",
+ "memoffset",
+ "scopeguard",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
+dependencies = [
+ "cfg-if",
+ "lazy_static",
+]
+
+[[package]]
+name = "either"
+version = "1.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
+
+[[package]]
name = "exitcode"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -169,6 +219,15 @@ dependencies = [
]
[[package]]
+name = "hermit-abi"
+version = "0.1.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "idna"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -180,6 +239,15 @@ dependencies = [
]
[[package]]
+name = "instant"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -266,6 +334,15 @@ dependencies = [
]
[[package]]
+name = "lock_api"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb"
+dependencies = [
+ "scopeguard",
+]
+
+[[package]]
name = "log"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -287,6 +364,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
+name = "memoffset"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -306,6 +392,16 @@ dependencies = [
]
[[package]]
+name = "num_cpus"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
+[[package]]
name = "once_cell"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -331,6 +427,31 @@ dependencies = [
]
[[package]]
+name = "parking_lot"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
+dependencies = [
+ "instant",
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
+dependencies = [
+ "cfg-if",
+ "instant",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "winapi",
+]
+
+[[package]]
name = "parse-size"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -367,6 +488,52 @@ dependencies = [
]
[[package]]
+name = "r2d2"
+version = "0.8.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f"
+dependencies = [
+ "log",
+ "parking_lot",
+ "scheduled-thread-pool",
+]
+
+[[package]]
+name = "r2d2_sqlite"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d24607049214c5e42d3df53ac1d8a23c34cc6a5eefe3122acb2c72174719959"
+dependencies = [
+ "r2d2",
+ "rusqlite",
+]
+
+[[package]]
+name = "rayon"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
+dependencies = [
+ "autocfg",
+ "crossbeam-deque",
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-utils",
+ "lazy_static",
+ "num_cpus",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -386,6 +553,9 @@ dependencies = [
"getopts",
"git2",
"parse-size",
+ "r2d2",
+ "r2d2_sqlite",
+ "rayon",
"rusqlite",
"serde",
"thiserror",
@@ -442,6 +612,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7"
+dependencies = [
+ "parking_lot",
+]
+
+[[package]]
+name = "scopeguard"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
+
+[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index c56b025..6e61a17 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,9 @@ filetime = "0.2.14"
getopts = "0.2.21"
git2 = "0.13.20"
parse-size = { version = "1.0.0", features = ["std"] }
+r2d2 = "0.8.9"
+r2d2_sqlite = "0.18.0"
+rayon = "1.5.1"
rusqlite = "0.25.3"
serde = { version = "1.0.126", features = ["derive"] }
thiserror = "1.0.25"
diff --git a/src/database.rs b/src/database.rs
index 0785dd5..107a6d6 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -16,6 +16,8 @@
// along with Reflectub. If not, see <https://www.gnu.org/licenses/>.
+use r2d2;
+use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{self, OptionalExtension};
use thiserror;
@@ -55,31 +57,37 @@ impl From<&github::Repo> for Repo {
pub enum Error {
#[error("database error")]
Db(#[from] rusqlite::Error),
+
+ #[error("connection pool error")]
+ Pool(#[from] r2d2::Error),
}
#[derive(Debug)]
pub struct Db {
- connection: rusqlite::Connection,
+ pool: r2d2::Pool<SqliteConnectionManager>,
}
impl Db {
/// Open a connection to the database.
pub fn connect(path: &str) -> Result<Self, Error> {
+ let manager = SqliteConnectionManager::file(path)
+ .with_flags(
+ rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
+ | rusqlite::OpenFlags::SQLITE_OPEN_CREATE,
+ );
+
Ok(
Db {
- connection: rusqlite::Connection::open_with_flags(
- path,
- rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
- | rusqlite::OpenFlags::SQLITE_OPEN_CREATE,
- )?,
+ pool: r2d2::Pool::new(manager)?,
}
)
}
/// Initialise the database with tables and indexes.
- pub fn create(&mut self) -> Result<(), Error> {
- let tx = self.connection.transaction()?;
+ pub fn create(&self) -> Result<(), Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
tx.execute(
r#"
@@ -110,8 +118,9 @@ impl Db {
///
/// Returns a `rusqlite::Error::QueryReturnedNoRows` error if the row
/// doesn't exist.
- pub fn repo_get(&mut self, id: i64) -> Result<Repo, Error> {
- let tx = self.connection.transaction()?;
+ pub fn repo_get(&self, id: i64) -> Result<Repo, Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
let repo = tx.query_row(
r#"
@@ -142,8 +151,9 @@ impl Db {
}
/// Insert a new repository.
- pub fn repo_insert(&mut self, repo: Repo) -> Result<(), Error> {
- let tx = self.connection.transaction()?;
+ pub fn repo_insert(&self, repo: Repo) -> Result<(), Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
tx.execute(
r#"
@@ -170,10 +180,11 @@ impl Db {
/// Compares the `updated_at` field to find out whether the repository was
/// updated.
pub fn repo_is_updated(
- &mut self,
+ &self,
repo: &Repo,
) -> Result<bool, Error> {
- let tx = self.connection.transaction()?;
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
let is_updated = match tx.query_row(
r#"
@@ -201,8 +212,9 @@ impl Db {
}
/// Update an existing repository.
- pub fn repo_update(&mut self, repo: &Repo) -> Result<(), Error> {
- let tx = self.connection.transaction()?;
+ pub fn repo_update(&self, repo: &Repo) -> Result<(), Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
tx.execute(
r#"
diff --git a/src/main.rs b/src/main.rs
index 4e4d580..87b2320 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,6 +23,8 @@ use filetime;
// use futures::{self, executor, future};
use getopts::Options;
use parse_size::parse_size;
+use r2d2_sqlite::SqliteConnectionManager;
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rusqlite;
use reflectub::{database, git, github};
@@ -112,8 +114,12 @@ fn run() -> anyhow::Result<()> {
let repos = github::fetch_repos(username)?;
- let mut db = database::Db::connect(&database_file)
- .context("unable to connect to database")?;
+ let db = //Arc::new(
+ // Mutex::new(
+ database::Db::connect(&database_file)
+ .context("unable to connect to database")?;
+ // )
+ // );
db.create()
.context("unable to create database")?;
@@ -121,31 +127,46 @@ fn run() -> anyhow::Result<()> {
// let mut joins = futures::stream::FuturesUnordered::new();
// let mut joins = Vec::with_capacity(repos.len());
- let mut i = 0;
- for repo in repos {
- // let db = db.clone();
- let mirror_root = mirror_root.clone();
- let base_cgitrc = base_cgitrc.clone();
-
- // let join = tokio::runtime::Handle::current().spawn(async move {
- // let mut db = db.lock()?;
-
- process_repo(
- &repo,
- &mut db,
- &mirror_root,
- base_cgitrc,
- max_repo_size_bytes,
- )?;
- // });
+ // let mut i = 0;
+ // for repo in repos {
+ // // let db = db.clone();
+ // let mirror_root = mirror_root.clone();
+ // let base_cgitrc = base_cgitrc.clone();
+ //
+ // // let join = tokio::runtime::Handle::current().spawn(async move {
+ // // let mut db = db.lock()?;
+ //
+ // process_repo(
+ // &repo,
+ // &mut db,
+ // &mirror_root,
+ // base_cgitrc,
+ // max_repo_size_bytes,
+ // )?;
+ // // });
+ //
+ // // joins.push(join);
+ //
+ // if i == 2 {
+ // break;
+ // }
+ // i += 1;
+ // }
- // joins.push(join);
+ let _results: anyhow::Result<()> = repos[..2].par_iter()
+ .map(|repo| {
+ dbg!("Thread", std::thread::current().id());
- if i == 5 {
- break;
- }
- i += 1;
- }
+ process_repo(
+ &repo,
+ &db,
+ &mirror_root,
+ base_cgitrc.clone(), // TODO: Can we avoid cloning
+ max_repo_size_bytes,
+ )
+ })
+ .collect();
+ // TODO: Return errors
// executor::block_on(future::join_all(joins));
// let mut joins = tokio_stream::iter(&mut joins);
@@ -172,12 +193,11 @@ fn run() -> anyhow::Result<()> {
/// Mirror or update `repo`.
fn process_repo(
repo: &github::Repo,
- db: &mut database::Db,
+ db: &database::Db,
mirror_root: &str,
base_cgitrc: Option<PathBuf>,
max_repo_size_bytes: Option<u64>,
) -> anyhow::Result<()> {
- return anyhow::bail!("test");
if let Some(max_repo_size_bytes) = max_repo_size_bytes {
if is_repo_oversize(repo.size, max_repo_size_bytes) {
return Ok(());