aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs24
3 files changed, 20 insertions, 6 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 183afd6..fe42006 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1056,6 +1056,7 @@ dependencies = [
"sqlx",
"thiserror",
"tokio",
+ "tokio-stream",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 14e779a..4188321 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ reqwest = { version = "0.11.3", features = ["json"] }
serde = { version = "1.0.126", features = ["derive"] }
thiserror = "1.0.25"
tokio = { version = "1.6.1", features = ["macros", "rt-multi-thread"] }
+tokio-stream = "0.1.6"
[dependencies.sqlx]
version = "0.5.5"
diff --git a/src/main.rs b/src/main.rs
index a8c35be..a476ef0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,11 +2,12 @@ use anyhow::{self, Context};
use chrono::DateTime;
use exitcode;
use filetime;
-use futures::{executor, future};
+use futures::{self, executor, future};
use getopts::Options;
use parse_size::parse_size;
use sqlx;
use tokio;
+use tokio_stream::StreamExt;
use reflectub::{database, git, github};
@@ -99,20 +100,21 @@ fn run() -> anyhow::Result<()> {
.build()?;
let _rt_guard = rt.enter();
- let repos = executor::block_on(github::fetch_repos(username))?;
+ let repos = rt.block_on(github::fetch_repos(username))?;
let db = Arc::new(
tokio::sync::Mutex::new(
- executor::block_on(database::Db::connect(&database_file))?,
+ rt.block_on(database::Db::connect(&database_file))?,
)
);
- executor::block_on(async {
+ rt.block_on(async {
db.lock().await
.create().await
})?;
- let mut joins = Vec::new();
+ // let mut joins = futures::stream::FuturesUnordered::new();
+ let mut joins = Vec::with_capacity(repos.len());
for repo in repos {
let db = db.clone();
@@ -134,7 +136,17 @@ fn run() -> anyhow::Result<()> {
joins.push(join);
}
- executor::block_on(future::join_all(joins));
+ // executor::block_on(future::join_all(joins));
+ rt.block_on(async {
+ let mut joins = tokio_stream::iter(&joins);
+
+ while let Some(task) = joins.next().await {
+ let a = task.await?;
+ dbg!(a);
+ }
+
+ Ok::<(), anyhow::Error>(())
+ })?;
Ok(())
}