use git2::{Cred, Error, FetchOptions, Index, Progress, PushOptions, RemoteCallbacks, Repository}; use indicatif::{ProgressBar, ProgressStyle}; use log::{info, trace}; use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::string::String; use std::str; pub struct Job { pub id: u32, pub key: String, pub result_repository_path: PathBuf, remote_name: String, } pub struct BenchmarkRepository { base_path: PathBuf, repository: Repository, jobs: HashMap, ssh_user: String, } pub struct TargetPath { pub source: PathBuf, pub destination: PathBuf, } impl BenchmarkRepository { fn progress_bar_style() -> ProgressStyle { ProgressStyle::default_bar() .template("{bar:74.on_black} {percent:>3} %") .progress_chars("█▉▊▋▌▍▎▏ ") } fn transfer_progress_callback(progress: Progress, progress_bar: &mut ProgressBar) -> bool { progress_bar.set_length(progress.total_objects() as u64); progress_bar.set_position(progress.received_objects() as u64); // continue the transfer true } fn push_update_reference_callback(reference: &str, status: Option<&str>) -> Result<(), Error> { match status { None => trace!("reference “{}” pushed successfully to remote “origin”", reference), Some(error) => panic!("couldn’t push reference “{}” to remote “origin”: {}", reference, error), }; Ok(()) } fn reset_origin(&self, remote_url: &str) -> &Self { if let Err(error) = self.repository.find_remote("origin") .or_else(|_| self.repository.remote("origin", remote_url)) { panic!("could not reset remote “origin”: {}", error); } info!("reset origin to “{}”", remote_url); self } fn fetch_branch(&self, remote_name: &str, branch_name: &str) -> &Self { let mut progress_bar = ProgressBar::new(0); progress_bar.set_style(BenchmarkRepository::progress_bar_style()); { let mut remote_callbacks = RemoteCallbacks::new(); remote_callbacks.credentials( |_, _, _| { match Cred::ssh_key_from_agent(&self.ssh_user) { Ok(credentials) => Ok(credentials), Err(error) => panic!("could not retrieve key pair for SSH authentication as user “{}”: {}", self.ssh_user, error), } }); remote_callbacks.transfer_progress( |progress| BenchmarkRepository::transfer_progress_callback(progress, &mut progress_bar)); let mut fetch_options = FetchOptions::new(); fetch_options.remote_callbacks(remote_callbacks); let mut remote = match self.repository.find_remote(remote_name) { Ok(remote) => remote, Err(error) => panic!("could not find remote “{}”", error), }; info!("fetching branch “{}” on remote “{}”", branch_name, remote_name); if let Err(error) = remote.fetch(&[branch_name], Some(&mut fetch_options), None) { panic!("failed to fetch branch “{}” from remote “{}”: {}", branch_name, remote_name, error); } } progress_bar.finish_and_clear(); trace!("branch “{}” on remote “{}” is up-to-date", branch_name, remote_name); self } fn clean_up_remotes(&self) -> &Self { let remotes = match self.repository.remotes() { Ok(remotes) => remotes, Err(error) => panic!("could not enumerate remotes: {}", error), }; for remote in remotes.iter() { let remote = match remote { Some(remote) => remote, None => panic!("remote name not in UTF-8"), }; if remote == "origin" { continue; } match self.repository.remote_delete(remote) { Ok(()) => (), Err(error) => panic!("could not clean up remote “{}”: {}", remote, error), }; } trace!("cleaned up remotes"); self } fn init(base_path: &Path) -> Repository { let repository_path = base_path.join("upstream"); let repository = match Repository::init_bare(&repository_path) { Ok(repository) => repository, Err(error) => panic!("failed to initialize Git repository in “{}”: {}", repository_path.display(), error), }; info!("initialized Git repository in “{}”", repository_path.display()); repository } pub fn new(remote_url: &str, base_path: PathBuf, ssh_user: &str) -> BenchmarkRepository { let repository = match Repository::open(&base_path) { Ok(repository) => { info!("Using existing Git repository"); repository }, Err(_) => BenchmarkRepository::init(&base_path), }; let benchmark_repository = BenchmarkRepository { base_path: base_path, repository: repository, jobs: HashMap::new(), ssh_user: ssh_user.to_string(), }; benchmark_repository .reset_origin(remote_url) .fetch_branch("origin", "config") .fetch_branch("origin", "results") .clean_up_remotes(); benchmark_repository } pub fn directory_exists(&self, directory_path: &Path, branch_name: &str) -> bool { let tip_reference_name = format!("refs/remotes/origin/{}", branch_name); let tip_reference = match self.repository.find_reference(&tip_reference_name) { Ok(value) => value, Err(error) => panic!("Could not find reference “{}”: {}", tip_reference_name, error), }; let tree = match tip_reference.peel_to_tree() { Ok(value) => value, Err(error) => panic!("Could not peel reference to tree: {}", error), }; let object_id = match tree.get_path(directory_path) { Ok(tree_entry) => tree_entry.id(), Err(_) => return false, }; if let Err(_) = self.repository.find_tree(object_id) { return false; } true } pub fn read_file(&self, file_path: &Path, branch_name: &str) -> Option { let tip_reference_name = format!("refs/remotes/origin/{}", branch_name); let tip_reference = match self.repository.find_reference(&tip_reference_name) { Ok(value) => value, Err(error) => panic!("Could not find reference “{}”: {}", tip_reference_name, error), }; let tree = match tip_reference.peel_to_tree() { Ok(value) => value, Err(error) => panic!("Could not peel reference to tree: {}", error), }; let object_id = match tree.get_path(file_path) { Ok(tree_entry) => tree_entry.id(), Err(_) => return None, }; let blob = match self.repository.find_blob(object_id) { Ok(blob) => blob, Err(_) => return None, }; let content = match std::str::from_utf8(blob.content()) { Ok(content) => content, Err(error) => panic!("Could not interpret file “{}” as UTF-8: {}", file_path.display(), error), }; Some(content.to_owned()) } pub fn is_job_done(&self, key: &str) -> bool { self.directory_exists(Path::new(key), "results") } pub fn create_job(&mut self, key: String) -> &Job { let id = self.jobs.len() as u32; let remote_name = format!("job-{}", id); let result_repository_path = self.base_path.join(&remote_name); if result_repository_path.exists() { match std::fs::remove_dir_all(&result_repository_path) { Ok(_) => (), Err(error) => panic!("failed to initialize result Git repository for job {} in “{}”: {}", id, result_repository_path.display(), error), }; } match Repository::init_bare(&result_repository_path) { Ok(_) => (), Err(error) => panic!("failed to initialize result Git repository for job {} in “{}”: {}", id, result_repository_path.display(), error), }; match self.repository.remote(&remote_name, &format!("file://{}", std::fs::canonicalize(&result_repository_path).unwrap().display())) { Ok(_) => (), Err(error) => panic!("could not create remote: {}", error), }; info!("initialized result Git repository for job {} in “{}”", id, result_repository_path.display()); self.jobs.insert(id, Job{id, key, result_repository_path, remote_name}); self.jobs.get(&id).unwrap() } fn tip_commit(&self, remote_name: &str, branch_name: &str) -> Result { let tip_reference_name = format!("refs/remotes/{}/{}", remote_name, branch_name); self.repository.find_reference(&tip_reference_name).and_then(|tip_reference| tip_reference.peel_to_commit()) } pub fn join(&self) { let mut active_job_ids = HashSet::new(); for (job_id, _) in &self.jobs { active_job_ids.insert(job_id); } while !active_job_ids.is_empty() { info!("waiting for jobs to finish"); std::thread::sleep(std::time::Duration::from_secs(2)); let mut changed = false; active_job_ids.retain ( |job_id| { let job = self.jobs.get(job_id).unwrap(); self.fetch_branch(&job.remote_name, "master"); let job_commit = match self.tip_commit(&job.remote_name, "master") { Ok(value) => value, // Job is not done yet, so skip it until it is Err(_) => return true, }; info!("job {} finished ({})", job_id, job.key); let upstream_commit = match self.tip_commit("origin", "results") { Ok(value) => value, Err(error) => panic!("could not access tip commit of “results” branch: {}", error), }; let upstream_tree = match upstream_commit.tree() { Ok(value) => value, Err(error) => panic!("could not access tip commit tree of “results” branch: {}", error), }; let job_tree = match job_commit.tree() { Ok(value) => value, Err(error) => panic!("could not read results of job {}: {}", job_id, error), }; let mut upstream_index = match Index::new() { Ok(value) => value, Err(error) => panic!("could not create index: {}", error), }; match upstream_index.read_tree(&upstream_tree) { Ok(value) => value, Err(error) => panic!("could not read tree into index: {}", error), }; let mut job_index = match Index::new() { Ok(value) => value, Err(error) => panic!("could not create index: {}", error), }; match job_index.read_tree(&job_tree) { Ok(value) => value, Err(error) => panic!("could not read tree into index: {}", error), }; for mut job_index_entry in job_index.iter() { let result_path = Path::new(&job.key); job_index_entry.path = format!("{}/{}", result_path.display(), String::from_utf8_lossy(&job_index_entry.path)).as_bytes().to_owned(); match upstream_index.add(&job_index_entry) { Ok(_) => (), Err(error) => panic!("could not add results to index: {}", error), }; } let new_tree_object_id = match upstream_index.write_tree_to(&self.repository) { Ok(value) => value, Err(error) => panic!("could not write index to tree: {}", error), }; let new_tree = match self.repository.find_tree(new_tree_object_id) { Ok(value) => value, Err(error) => panic!("could not obtain tree: {}", error), }; trace!("created tree object “{}”", new_tree_object_id); let author = job_commit.author(); let committer = job_commit.committer(); let message = job_commit.message().unwrap_or(""); let tip_reference_name = "refs/remotes/origin/results"; let commit_id = match self.repository.commit(Some(&tip_reference_name), &author, &committer, &message, &new_tree, &[&upstream_commit]) { Ok(value) => value, Err(error) => panic!("could not write commit: {}", error), }; trace!("created commit “{}”", commit_id); changed = true; false } ); if changed { let push_refspec = format!("refs/remotes/origin/results:refs/heads/results"); trace!("using push refspec “{}”", push_refspec); let mut remote_callbacks = RemoteCallbacks::new(); remote_callbacks.credentials( |_, _, _| { match Cred::ssh_key_from_agent(&self.ssh_user) { Ok(value) => Ok(value), Err(error) => panic!("could not retrieve key pair for SSH authentication as user “{}”: {}", self.ssh_user, error), } }); remote_callbacks.push_update_reference( |reference, status| BenchmarkRepository::push_update_reference_callback(reference, status)); let mut push_options = PushOptions::new(); push_options.remote_callbacks(remote_callbacks); let mut remote = self.repository.find_remote("origin").expect(""); remote.push(&[&push_refspec], Some(&mut push_options)).expect("couldn’t push"); } } info!("all jobs done, all results uploaded"); } } #[cfg(test)] mod tests { }