diff options
-rw-r--r-- | Cargo.lock | 7 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | src/main.rs | 178 |
3 files changed, 152 insertions, 36 deletions
@@ -188,11 +188,10 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4feb231f0d4d6af81aed15928e58ecf5816aa62a2393e2c82f46973e92a9a278" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "autocfg", "cfg-if", "lazy_static", ] @@ -322,6 +321,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -1041,6 +1041,7 @@ dependencies = [ "chrono", "exitcode", "filetime", + "futures", "getopts", "git2", "reqwest", @@ -8,12 +8,13 @@ anyhow = "1.0.40" chrono = "0.4.19" exitcode = "1.1.2" filetime = "0.2.14" +futures = { version = "0.3.15", features = ["thread-pool"] } getopts = "0.2.21" git2 = "0.13.20" reqwest = { version = "0.11.3", features = ["json"] } serde = { version = "1.0.126", features = ["derive"] } thiserror = "1.0.25" -tokio = { version = "1.6.1", features = ["macros", "rt"] } +tokio = { version = "1.6.1", features = ["macros", "rt-multi-thread"] } [dependencies.sqlx] version = "0.5.5" diff --git a/src/main.rs b/src/main.rs index c736603..a151217 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{self, Context}; use chrono::DateTime; use exitcode; use filetime; +use futures::{executor, future}; use getopts::Options; use sqlx; use tokio; @@ -13,11 +14,11 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; use std::process; +use std::sync::Arc; -#[tokio::main] -async fn main() { - run().await.unwrap(); +fn main() { + run().unwrap(); } fn print_usage(opts: &Options) { @@ -27,7 +28,7 @@ fn print_usage(opts: &Options) { ); } -async fn run() -> anyhow::Result<()> { +fn run() -> anyhow::Result<()> { let args: Vec<String> = env::args().collect(); let mut opts = Options::new(); @@ -63,6 +64,9 @@ async fn run() -> anyhow::Result<()> { let base_cgitrc = opt_matches.opt_str("cgitrc") .map(|s| PathBuf::from(s)); + let rt = tokio::runtime::Runtime::new()?; + let _rt_guard = rt.enter(); + // let repos = github::fetch_repos(username).await? let test_repos = vec![ @@ -90,40 +94,150 @@ async fn run() -> anyhow::Result<()> { }, ]; - let mut db = database::Db::connect(&database_file).await?; + let mut db = Arc::new( + tokio::sync::Mutex::new( + executor::block_on(database::Db::connect(&database_file))?, + ) + ); - db.create().await?; + executor::block_on(async { + db.lock().await.create().await + })?; + + // let pool = executor::ThreadPool::builder() + // .pool_size(4) + // .create()?; + + // for repo in test_repos { + // let join = test_repos.par_iter() + // let join = tokio::spawn(async move { + // let id = repo.id; + // let path = clone_path(&mirror_root, &repo); + // let db_repo = database::Repo::from(&repo); + // + // match db.repo_get(id).await { + // // If we've already seen the repo and it's been updated, fetch the + // // latest. + // Ok(current_repo) => { + // if db.repo_is_updated(&db_repo).await? { + // update(&path, ¤t_repo, &repo)?; + // + // db.repo_update(&db_repo).await?; + // } + // }, + // + // // If the repo doesn't exist, mirror it and store it in the + // // database. + // Err(database::Error::Db(sqlx::Error::RowNotFound)) => { + // mirror( + // &path, + // &repo, + // base_cgitrc.as_ref(), + // )?; + // + // db.repo_insert(db_repo).await?; + // }, + // + // Err(e) => anyhow::bail!(e), + // } + // + // Ok(()) + // }); + + // let join = tokio::spawn( + // process_repo(&repo, &mut db, &mirror_root, base_cgitrc), + // ); + // + // join.await?; + // } + + // let tasks = test_repos.iter().map(|repo| + // pool.spawn_ok(process_repo(&repo, &mut db, &mirror_root, base_cgitrc)) + // ); + // + // executor::block_on(future::join_all(tasks)); + + // let tasks = test_repos.iter().map(|repo| + // process_repo(&repo, &mut db, &mirror_root, base_cgitrc)); + // + // executor::block_on(async { tokio::try_join!(tasks) })?; + + // let joins = test_repos.into_iter() + // .map(|repo| { + // let db = db.clone(); + // + // let result = process_repo( + // &repo, + // &mut db.lock().unwrap(), + // &mirror_root.clone(), + // base_cgitrc.clone(), + // ); + // + // result + // }) + // .map(tokio::spawn); + // .collect::<Vec<_>>(); + + let mut joins = Vec::new(); for repo in test_repos { - let id = repo.id; - let path = clone_path(&mirror_root, &repo); - let db_repo = database::Repo::from(&repo); - - match db.repo_get(id).await { - // If we've already seen the repo and it's been updated, fetch the - // latest. - Ok(current_repo) => { - if db.repo_is_updated(&db_repo).await? { - update(&path, ¤t_repo, &repo)?; - - db.repo_update(&db_repo).await?; - } - }, + let db = db.clone(); + let mirror_root = mirror_root.clone(); + let base_cgitrc = base_cgitrc.clone(); + + let join = tokio::spawn(async move { + let mut db = db.lock().await; + + process_repo( + &repo, + &mut db, + &mirror_root, + base_cgitrc, + ).await + }); + + joins.push(join); + } - // If the repo doesn't exist, mirror it and store it in the - // database. - Err(database::Error::Db(sqlx::Error::RowNotFound)) => { - mirror( - &path, - &repo, - base_cgitrc.as_ref(), - )?; + executor::block_on(future::join_all(joins)); - db.repo_insert(db_repo).await?; - }, + Ok(()) +} + +async fn process_repo( + repo: &github::Repo, + db: &mut database::Db, + mirror_root: &str, + base_cgitrc: Option<PathBuf>, +) -> anyhow::Result<()> { + let id = repo.id; + let path = clone_path(&mirror_root, &repo); + let db_repo = database::Repo::from(repo); + + match db.repo_get(id).await { + // If we've already seen the repo and it's been updated, fetch the + // latest. + Ok(current_repo) => { + if db.repo_is_updated(&db_repo).await? { + update(&path, ¤t_repo, &repo)?; + + db.repo_update(&db_repo).await?; + } + }, + + // If the repo doesn't exist, mirror it and store it in the + // database. + Err(database::Error::Db(sqlx::Error::RowNotFound)) => { + mirror( + &path, + &repo, + base_cgitrc.as_ref(), + )?; + + db.repo_insert(db_repo).await?; + }, - Err(e) => anyhow::bail!(e), - } + Err(e) => anyhow::bail!(e), } Ok(()) |