diff options
author | Teddy Wing | 2021-06-06 04:09:31 +0200 |
---|---|---|
committer | Teddy Wing | 2021-06-06 04:09:31 +0200 |
commit | 5539a9e8cc6e0b348f0428502b80ba32843ec36a (patch) | |
tree | 657434730a47df33de8830b74aea11250343e65b /src/main.rs | |
parent | 6d5a4095ba4fd78707dc6a8d6321d72ebfc80f1c (diff) | |
download | reflectub-5539a9e8cc6e0b348f0428502b80ba32843ec36a.tar.bz2 |
Make repo mirroring multi-threaded
I think, at least. Took a lot of research and trial and error to get
this to compile, working out how to set up the multi-threading for async
code. The idea here is to be able to process each repo in potentially
multiple threads and do that processing work in parallel.
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 178 |
1 files changed, 146 insertions, 32 deletions
diff --git a/src/main.rs b/src/main.rs index c736603..a151217 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{self, Context}; use chrono::DateTime; use exitcode; use filetime; +use futures::{executor, future}; use getopts::Options; use sqlx; use tokio; @@ -13,11 +14,11 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; use std::process; +use std::sync::Arc; -#[tokio::main] -async fn main() { - run().await.unwrap(); +fn main() { + run().unwrap(); } fn print_usage(opts: &Options) { @@ -27,7 +28,7 @@ fn print_usage(opts: &Options) { ); } -async fn run() -> anyhow::Result<()> { +fn run() -> anyhow::Result<()> { let args: Vec<String> = env::args().collect(); let mut opts = Options::new(); @@ -63,6 +64,9 @@ async fn run() -> anyhow::Result<()> { let base_cgitrc = opt_matches.opt_str("cgitrc") .map(|s| PathBuf::from(s)); + let rt = tokio::runtime::Runtime::new()?; + let _rt_guard = rt.enter(); + // let repos = github::fetch_repos(username).await? let test_repos = vec![ @@ -90,40 +94,150 @@ async fn run() -> anyhow::Result<()> { }, ]; - let mut db = database::Db::connect(&database_file).await?; + let mut db = Arc::new( + tokio::sync::Mutex::new( + executor::block_on(database::Db::connect(&database_file))?, + ) + ); - db.create().await?; + executor::block_on(async { + db.lock().await.create().await + })?; + + // let pool = executor::ThreadPool::builder() + // .pool_size(4) + // .create()?; + + // for repo in test_repos { + // let join = test_repos.par_iter() + // let join = tokio::spawn(async move { + // let id = repo.id; + // let path = clone_path(&mirror_root, &repo); + // let db_repo = database::Repo::from(&repo); + // + // match db.repo_get(id).await { + // // If we've already seen the repo and it's been updated, fetch the + // // latest. + // Ok(current_repo) => { + // if db.repo_is_updated(&db_repo).await? { + // update(&path, ¤t_repo, &repo)?; + // + // db.repo_update(&db_repo).await?; + // } + // }, + // + // // If the repo doesn't exist, mirror it and store it in the + // // database. + // Err(database::Error::Db(sqlx::Error::RowNotFound)) => { + // mirror( + // &path, + // &repo, + // base_cgitrc.as_ref(), + // )?; + // + // db.repo_insert(db_repo).await?; + // }, + // + // Err(e) => anyhow::bail!(e), + // } + // + // Ok(()) + // }); + + // let join = tokio::spawn( + // process_repo(&repo, &mut db, &mirror_root, base_cgitrc), + // ); + // + // join.await?; + // } + + // let tasks = test_repos.iter().map(|repo| + // pool.spawn_ok(process_repo(&repo, &mut db, &mirror_root, base_cgitrc)) + // ); + // + // executor::block_on(future::join_all(tasks)); + + // let tasks = test_repos.iter().map(|repo| + // process_repo(&repo, &mut db, &mirror_root, base_cgitrc)); + // + // executor::block_on(async { tokio::try_join!(tasks) })?; + + // let joins = test_repos.into_iter() + // .map(|repo| { + // let db = db.clone(); + // + // let result = process_repo( + // &repo, + // &mut db.lock().unwrap(), + // &mirror_root.clone(), + // base_cgitrc.clone(), + // ); + // + // result + // }) + // .map(tokio::spawn); + // .collect::<Vec<_>>(); + + let mut joins = Vec::new(); for repo in test_repos { - let id = repo.id; - let path = clone_path(&mirror_root, &repo); - let db_repo = database::Repo::from(&repo); - - match db.repo_get(id).await { - // If we've already seen the repo and it's been updated, fetch the - // latest. - Ok(current_repo) => { - if db.repo_is_updated(&db_repo).await? { - update(&path, ¤t_repo, &repo)?; - - db.repo_update(&db_repo).await?; - } - }, + let db = db.clone(); + let mirror_root = mirror_root.clone(); + let base_cgitrc = base_cgitrc.clone(); + + let join = tokio::spawn(async move { + let mut db = db.lock().await; + + process_repo( + &repo, + &mut db, + &mirror_root, + base_cgitrc, + ).await + }); + + joins.push(join); + } - // If the repo doesn't exist, mirror it and store it in the - // database. - Err(database::Error::Db(sqlx::Error::RowNotFound)) => { - mirror( - &path, - &repo, - base_cgitrc.as_ref(), - )?; + executor::block_on(future::join_all(joins)); - db.repo_insert(db_repo).await?; - }, + Ok(()) +} + +async fn process_repo( + repo: &github::Repo, + db: &mut database::Db, + mirror_root: &str, + base_cgitrc: Option<PathBuf>, +) -> anyhow::Result<()> { + let id = repo.id; + let path = clone_path(&mirror_root, &repo); + let db_repo = database::Repo::from(repo); + + match db.repo_get(id).await { + // If we've already seen the repo and it's been updated, fetch the + // latest. + Ok(current_repo) => { + if db.repo_is_updated(&db_repo).await? { + update(&path, ¤t_repo, &repo)?; + + db.repo_update(&db_repo).await?; + } + }, + + // If the repo doesn't exist, mirror it and store it in the + // database. + Err(database::Error::Db(sqlx::Error::RowNotFound)) => { + mirror( + &path, + &repo, + base_cgitrc.as_ref(), + )?; + + db.repo_insert(db_repo).await?; + }, - Err(e) => anyhow::bail!(e), - } + Err(e) => anyhow::bail!(e), } Ok(()) |