about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
author Teddy Wing 2021-06-12 21:00:48 +0200
committer Teddy Wing 2021-06-12 21:00:48 +0200
commit e4a41075af126dbcac19fe91267f785b3b391ee0 (patch)
tree 52d2a4a0a6f60ff386396d056625611824aa4e90 /src
parent 46497bbae37f89e449b7a049663fe774843beb9b (diff)
download reflectub-e4a41075af126dbcac19fe91267f785b3b391ee0.tar.bz2
Process repositories on multiple threads
Use 'rayon' to parallelise the repository processing. Each repository is processed in a thread in the default 'rayon' pool. In order to get thread-safe access to the database, I followed some advice from a Stack Overflow answer by VasiliNovikov (https://stackoverflow.com/users/1091436/vasilinovikov): https://stackoverflow.com/questions/62560396/how-to-use-sqlite-via-rusqlite-from-multiple-threads/62560397#62560397 VasiliNovikov recommended creating a database connection pool using 'r2d2_sqlite'. This way we don't have to share a database connection between threads, but each thread can have its own connection. This also means we can remove mutable requirements in a bunch of places involving our `database::Db` type since we're no longer managing the database connections directly.
Diffstat (limited to 'src')
-rw-r--r-- src/database.rs 44
-rw-r--r-- src/main.rs 74
2 files changed, 75 insertions, 43 deletions
diff --git a/src/database.rs b/src/database.rs
index 0785dd5..107a6d6 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -16,6 +16,8 @@
// along with Reflectub. If not, see <https://www.gnu.org/licenses/>.
+use r2d2;
+use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{self, OptionalExtension};
use thiserror;
@@ -55,31 +57,37 @@ impl From<&github::Repo> for Repo {
pub enum Error {
#[error("database error")]
Db(#[from] rusqlite::Error),
+
+ #[error("connection pool error")]
+ Pool(#[from] r2d2::Error),
}
#[derive(Debug)]
pub struct Db {
- connection: rusqlite::Connection,
+ pool: r2d2::Pool<SqliteConnectionManager>,
}
impl Db {
/// Open a connection to the database.
pub fn connect(path: &str) -> Result<Self, Error> {
+ let manager = SqliteConnectionManager::file(path)
+ .with_flags(
+ rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
+ | rusqlite::OpenFlags::SQLITE_OPEN_CREATE,
+ );
+
Ok(
Db {
- connection: rusqlite::Connection::open_with_flags(
- path,
- rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
- | rusqlite::OpenFlags::SQLITE_OPEN_CREATE,
- )?,
+ pool: r2d2::Pool::new(manager)?,
}
)
}
/// Initialise the database with tables and indexes.
- pub fn create(&mut self) -> Result<(), Error> {
- let tx = self.connection.transaction()?;
+ pub fn create(&self) -> Result<(), Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
tx.execute(
r#"
@@ -110,8 +118,9 @@ impl Db {
///
/// Returns a `rusqlite::Error::QueryReturnedNoRows` error if the row
/// doesn't exist.
- pub fn repo_get(&mut self, id: i64) -> Result<Repo, Error> {
- let tx = self.connection.transaction()?;
+ pub fn repo_get(&self, id: i64) -> Result<Repo, Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
let repo = tx.query_row(
r#"
@@ -142,8 +151,9 @@ impl Db {
}
/// Insert a new repository.
- pub fn repo_insert(&mut self, repo: Repo) -> Result<(), Error> {
- let tx = self.connection.transaction()?;
+ pub fn repo_insert(&self, repo: Repo) -> Result<(), Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
tx.execute(
r#"
@@ -170,10 +180,11 @@ impl Db {
/// Compares the `updated_at` field to find out whether the repository was
/// updated.
pub fn repo_is_updated(
- &mut self,
+ &self,
repo: &Repo,
) -> Result<bool, Error> {
- let tx = self.connection.transaction()?;
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
let is_updated = match tx.query_row(
r#"
@@ -201,8 +212,9 @@ impl Db {
}
/// Update an existing repository.
- pub fn repo_update(&mut self, repo: &Repo) -> Result<(), Error> {
- let tx = self.connection.transaction()?;
+ pub fn repo_update(&self, repo: &Repo) -> Result<(), Error> {
+ let mut pool = self.pool.get()?;
+ let tx = pool.transaction()?;
tx.execute(
r#"
diff --git a/src/main.rs b/src/main.rs
index 4e4d580..87b2320 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,6 +23,8 @@ use filetime;
// use futures::{self, executor, future};
use getopts::Options;
use parse_size::parse_size;
+use r2d2_sqlite::SqliteConnectionManager;
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rusqlite;
use reflectub::{database, git, github};
@@ -112,8 +114,12 @@ fn run() -> anyhow::Result<()> {
let repos = github::fetch_repos(username)?;
- let mut db = database::Db::connect(&database_file)
- .context("unable to connect to database")?;
+ let db = //Arc::new(
+ // Mutex::new(
+ database::Db::connect(&database_file)
+ .context("unable to connect to database")?;
+ // )
+ // );
db.create()
.context("unable to create database")?;
@@ -121,31 +127,46 @@ fn run() -> anyhow::Result<()> {
// let mut joins = futures::stream::FuturesUnordered::new();
// let mut joins = Vec::with_capacity(repos.len());
- let mut i = 0;
- for repo in repos {
- // let db = db.clone();
- let mirror_root = mirror_root.clone();
- let base_cgitrc = base_cgitrc.clone();
-
- // let join = tokio::runtime::Handle::current().spawn(async move {
- // let mut db = db.lock()?;
-
- process_repo(
- &repo,
- &mut db,
- &mirror_root,
- base_cgitrc,
- max_repo_size_bytes,
- )?;
- // });
+ // let mut i = 0;
+ // for repo in repos {
+ // // let db = db.clone();
+ // let mirror_root = mirror_root.clone();
+ // let base_cgitrc = base_cgitrc.clone();
+ //
+ // // let join = tokio::runtime::Handle::current().spawn(async move {
+ // // let mut db = db.lock()?;
+ //
+ // process_repo(
+ // &repo,
+ // &mut db,
+ // &mirror_root,
+ // base_cgitrc,
+ // max_repo_size_bytes,
+ // )?;
+ // // });
+ //
+ // // joins.push(join);
+ //
+ // if i == 2 {
+ // break;
+ // }
+ // i += 1;
+ // }
- // joins.push(join);
+ let _results: anyhow::Result<()> = repos[..2].par_iter()
+ .map(|repo| {
+ dbg!("Thread", std::thread::current().id());
- if i == 5 {
- break;
- }
- i += 1;
- }
+ process_repo(
+ &repo,
+ &db,
+ &mirror_root,
+ base_cgitrc.clone(), // TODO: Can we avoid cloning
+ max_repo_size_bytes,
+ )
+ })
+ .collect();
+ // TODO: Return errors
// executor::block_on(future::join_all(joins));
// let mut joins = tokio_stream::iter(&mut joins);
@@ -172,12 +193,11 @@ fn run() -> anyhow::Result<()> {
/// Mirror or update `repo`.
fn process_repo(
repo: &github::Repo,
- db: &mut database::Db,
+ db: &database::Db,
mirror_root: &str,
base_cgitrc: Option<PathBuf>,
max_repo_size_bytes: Option<u64>,
) -> anyhow::Result<()> {
- return anyhow::bail!("test");
if let Some(max_repo_size_bytes) = max_repo_size_bytes {
if is_repo_oversize(repo.size, max_repo_size_bytes) {
return Ok(());