aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs178
1 files changed, 146 insertions, 32 deletions
diff --git a/src/main.rs b/src/main.rs
index c736603..a151217 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@ use anyhow::{self, Context};
use chrono::DateTime;
use exitcode;
use filetime;
+use futures::{executor, future};
use getopts::Options;
use sqlx;
use tokio;
@@ -13,11 +14,11 @@ use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::process;
+use std::sync::Arc;
-#[tokio::main]
-async fn main() {
- run().await.unwrap();
+fn main() {
+ run().unwrap();
}
fn print_usage(opts: &Options) {
@@ -27,7 +28,7 @@ fn print_usage(opts: &Options) {
);
}
-async fn run() -> anyhow::Result<()> {
+fn run() -> anyhow::Result<()> {
let args: Vec<String> = env::args().collect();
let mut opts = Options::new();
@@ -63,6 +64,9 @@ async fn run() -> anyhow::Result<()> {
let base_cgitrc = opt_matches.opt_str("cgitrc")
.map(|s| PathBuf::from(s));
+ let rt = tokio::runtime::Runtime::new()?;
+ let _rt_guard = rt.enter();
+
// let repos = github::fetch_repos(username).await?
let test_repos = vec![
@@ -90,40 +94,150 @@ async fn run() -> anyhow::Result<()> {
},
];
- let mut db = database::Db::connect(&database_file).await?;
+ let mut db = Arc::new(
+ tokio::sync::Mutex::new(
+ executor::block_on(database::Db::connect(&database_file))?,
+ )
+ );
- db.create().await?;
+ executor::block_on(async {
+ db.lock().await.create().await
+ })?;
+
+ // let pool = executor::ThreadPool::builder()
+ // .pool_size(4)
+ // .create()?;
+
+ // for repo in test_repos {
+ // let join = test_repos.par_iter()
+ // let join = tokio::spawn(async move {
+ // let id = repo.id;
+ // let path = clone_path(&mirror_root, &repo);
+ // let db_repo = database::Repo::from(&repo);
+ //
+ // match db.repo_get(id).await {
+ // // If we've already seen the repo and it's been updated, fetch the
+ // // latest.
+ // Ok(current_repo) => {
+ // if db.repo_is_updated(&db_repo).await? {
+ // update(&path, &current_repo, &repo)?;
+ //
+ // db.repo_update(&db_repo).await?;
+ // }
+ // },
+ //
+ // // If the repo doesn't exist, mirror it and store it in the
+ // // database.
+ // Err(database::Error::Db(sqlx::Error::RowNotFound)) => {
+ // mirror(
+ // &path,
+ // &repo,
+ // base_cgitrc.as_ref(),
+ // )?;
+ //
+ // db.repo_insert(db_repo).await?;
+ // },
+ //
+ // Err(e) => anyhow::bail!(e),
+ // }
+ //
+ // Ok(())
+ // });
+
+ // let join = tokio::spawn(
+ // process_repo(&repo, &mut db, &mirror_root, base_cgitrc),
+ // );
+ //
+ // join.await?;
+ // }
+
+ // let tasks = test_repos.iter().map(|repo|
+ // pool.spawn_ok(process_repo(&repo, &mut db, &mirror_root, base_cgitrc))
+ // );
+ //
+ // executor::block_on(future::join_all(tasks));
+
+ // let tasks = test_repos.iter().map(|repo|
+ // process_repo(&repo, &mut db, &mirror_root, base_cgitrc));
+ //
+ // executor::block_on(async { tokio::try_join!(tasks) })?;
+
+ // let joins = test_repos.into_iter()
+ // .map(|repo| {
+ // let db = db.clone();
+ //
+ // let result = process_repo(
+ // &repo,
+ // &mut db.lock().unwrap(),
+ // &mirror_root.clone(),
+ // base_cgitrc.clone(),
+ // );
+ //
+ // result
+ // })
+ // .map(tokio::spawn);
+ // .collect::<Vec<_>>();
+
+ let mut joins = Vec::new();
for repo in test_repos {
- let id = repo.id;
- let path = clone_path(&mirror_root, &repo);
- let db_repo = database::Repo::from(&repo);
-
- match db.repo_get(id).await {
- // If we've already seen the repo and it's been updated, fetch the
- // latest.
- Ok(current_repo) => {
- if db.repo_is_updated(&db_repo).await? {
- update(&path, &current_repo, &repo)?;
-
- db.repo_update(&db_repo).await?;
- }
- },
+ let db = db.clone();
+ let mirror_root = mirror_root.clone();
+ let base_cgitrc = base_cgitrc.clone();
+
+ let join = tokio::spawn(async move {
+ let mut db = db.lock().await;
+
+ process_repo(
+ &repo,
+ &mut db,
+ &mirror_root,
+ base_cgitrc,
+ ).await
+ });
+
+ joins.push(join);
+ }
- // If the repo doesn't exist, mirror it and store it in the
- // database.
- Err(database::Error::Db(sqlx::Error::RowNotFound)) => {
- mirror(
- &path,
- &repo,
- base_cgitrc.as_ref(),
- )?;
+ executor::block_on(future::join_all(joins));
- db.repo_insert(db_repo).await?;
- },
+ Ok(())
+}
+
+async fn process_repo(
+ repo: &github::Repo,
+ db: &mut database::Db,
+ mirror_root: &str,
+ base_cgitrc: Option<PathBuf>,
+) -> anyhow::Result<()> {
+ let id = repo.id;
+ let path = clone_path(&mirror_root, &repo);
+ let db_repo = database::Repo::from(repo);
+
+ match db.repo_get(id).await {
+ // If we've already seen the repo and it's been updated, fetch the
+ // latest.
+ Ok(current_repo) => {
+ if db.repo_is_updated(&db_repo).await? {
+ update(&path, &current_repo, &repo)?;
+
+ db.repo_update(&db_repo).await?;
+ }
+ },
+
+ // If the repo doesn't exist, mirror it and store it in the
+ // database.
+ Err(database::Error::Db(sqlx::Error::RowNotFound)) => {
+ mirror(
+ &path,
+ &repo,
+ base_cgitrc.as_ref(),
+ )?;
+
+ db.repo_insert(db_repo).await?;
+ },
- Err(e) => anyhow::bail!(e),
- }
+ Err(e) => anyhow::bail!(e),
}
Ok(())