diff options
| author | Teddy Wing | 2021-06-06 04:09:31 +0200 | 
|---|---|---|
| committer | Teddy Wing | 2021-06-06 04:09:31 +0200 | 
| commit | 5539a9e8cc6e0b348f0428502b80ba32843ec36a (patch) | |
| tree | 657434730a47df33de8830b74aea11250343e65b /src | |
| parent | 6d5a4095ba4fd78707dc6a8d6321d72ebfc80f1c (diff) | |
| download | reflectub-5539a9e8cc6e0b348f0428502b80ba32843ec36a.tar.bz2 | |
Make repo mirroring multi-threaded
I think, at least. Took a lot of research and trial and error to get
this to compile, working out how to set up the multi-threading for async
code. The idea here is to be able to process each repo in potentially
multiple threads and do that processing work in parallel.
Diffstat (limited to 'src')
| -rw-r--r-- | src/main.rs | 178 | 
1 files changed, 146 insertions, 32 deletions
| diff --git a/src/main.rs b/src/main.rs index c736603..a151217 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{self, Context};  use chrono::DateTime;  use exitcode;  use filetime; +use futures::{executor, future};  use getopts::Options;  use sqlx;  use tokio; @@ -13,11 +14,11 @@ use std::fs;  use std::io;  use std::path::{Path, PathBuf};  use std::process; +use std::sync::Arc; -#[tokio::main] -async fn main() { -    run().await.unwrap(); +fn main() { +    run().unwrap();  }  fn print_usage(opts: &Options) { @@ -27,7 +28,7 @@ fn print_usage(opts: &Options) {      );  } -async fn run() -> anyhow::Result<()> { +fn run() -> anyhow::Result<()> {      let args: Vec<String> = env::args().collect();      let mut opts = Options::new(); @@ -63,6 +64,9 @@ async fn run() -> anyhow::Result<()> {      let base_cgitrc = opt_matches.opt_str("cgitrc")          .map(|s| PathBuf::from(s)); +    let rt = tokio::runtime::Runtime::new()?; +    let _rt_guard = rt.enter(); +      // let repos = github::fetch_repos(username).await?      let test_repos = vec![ @@ -90,40 +94,150 @@ async fn run() -> anyhow::Result<()> {          },      ]; -    let mut db = database::Db::connect(&database_file).await?; +    let mut db = Arc::new( +        tokio::sync::Mutex::new( +            executor::block_on(database::Db::connect(&database_file))?, +        ) +    ); -    db.create().await?; +    executor::block_on(async { +        db.lock().await.create().await +    })?; + +    // let pool = executor::ThreadPool::builder() +    //     .pool_size(4) +    //     .create()?; + +    // for repo in test_repos { +    // let join = test_repos.par_iter() +    // let join = tokio::spawn(async move { +    //     let id = repo.id; +    //     let path = clone_path(&mirror_root, &repo); +    //     let db_repo = database::Repo::from(&repo); +    // +    //     match db.repo_get(id).await { +    //         // If we've already seen the repo and it's been updated, fetch the +    //         // latest. +    //         Ok(current_repo) => { +    //             if db.repo_is_updated(&db_repo).await? { +    //                 update(&path, ¤t_repo, &repo)?; +    // +    //                 db.repo_update(&db_repo).await?; +    //             } +    //         }, +    // +    //         // If the repo doesn't exist, mirror it and store it in the +    //         // database. +    //         Err(database::Error::Db(sqlx::Error::RowNotFound)) => { +    //             mirror( +    //                 &path, +    //                 &repo, +    //                 base_cgitrc.as_ref(), +    //             )?; +    // +    //             db.repo_insert(db_repo).await?; +    //         }, +    // +    //         Err(e) => anyhow::bail!(e), +    //     } +    // +    //     Ok(()) +    // }); + +        // let join = tokio::spawn( +        //     process_repo(&repo, &mut db, &mirror_root, base_cgitrc), +        // ); +        // +        // join.await?; +    // } + +    // let tasks = test_repos.iter().map(|repo| +    //     pool.spawn_ok(process_repo(&repo, &mut db, &mirror_root, base_cgitrc)) +    // ); +    // +    // executor::block_on(future::join_all(tasks)); + +    // let tasks = test_repos.iter().map(|repo| +    //     process_repo(&repo, &mut db, &mirror_root, base_cgitrc)); +    // +    // executor::block_on(async { tokio::try_join!(tasks) })?; + +    // let joins = test_repos.into_iter() +    //     .map(|repo| { +    //         let db = db.clone(); +    // +    //         let result = process_repo( +    //             &repo, +    //             &mut db.lock().unwrap(), +    //             &mirror_root.clone(), +    //             base_cgitrc.clone(), +    //         ); +    // +    //         result +    //     }) +    //     .map(tokio::spawn); +        // .collect::<Vec<_>>(); + +    let mut joins = Vec::new();      for repo in test_repos { -        let id = repo.id; -        let path = clone_path(&mirror_root, &repo); -        let db_repo = database::Repo::from(&repo); - -        match db.repo_get(id).await { -            // If we've already seen the repo and it's been updated, fetch the -            // latest. -            Ok(current_repo) => { -                if db.repo_is_updated(&db_repo).await? { -                    update(&path, ¤t_repo, &repo)?; - -                    db.repo_update(&db_repo).await?; -                } -            }, +        let db = db.clone(); +        let mirror_root = mirror_root.clone(); +        let base_cgitrc = base_cgitrc.clone(); + +        let join = tokio::spawn(async move { +            let mut db = db.lock().await; + +            process_repo( +                &repo, +                &mut db, +                &mirror_root, +                base_cgitrc, +            ).await +        }); + +        joins.push(join); +    } -            // If the repo doesn't exist, mirror it and store it in the -            // database. -            Err(database::Error::Db(sqlx::Error::RowNotFound)) => { -                mirror( -                    &path, -                    &repo, -                    base_cgitrc.as_ref(), -                )?; +    executor::block_on(future::join_all(joins)); -                db.repo_insert(db_repo).await?; -            }, +    Ok(()) +} + +async fn process_repo( +    repo: &github::Repo, +    db: &mut database::Db, +    mirror_root: &str, +    base_cgitrc: Option<PathBuf>, +) -> anyhow::Result<()> { +    let id = repo.id; +    let path = clone_path(&mirror_root, &repo); +    let db_repo = database::Repo::from(repo); + +    match db.repo_get(id).await { +        // If we've already seen the repo and it's been updated, fetch the +        // latest. +        Ok(current_repo) => { +            if db.repo_is_updated(&db_repo).await? { +                update(&path, ¤t_repo, &repo)?; + +                db.repo_update(&db_repo).await?; +            } +        }, + +        // If the repo doesn't exist, mirror it and store it in the +        // database. +        Err(database::Error::Db(sqlx::Error::RowNotFound)) => { +            mirror( +                &path, +                &repo, +                base_cgitrc.as_ref(), +            )?; + +            db.repo_insert(db_repo).await?; +        }, -            Err(e) => anyhow::bail!(e), -        } +        Err(e) => anyhow::bail!(e),      }      Ok(()) | 
