diff --git a/src/actions/show_plan.rs b/src/actions/show_plan.rs index 5ef0966..55c65c6 100644 --- a/src/actions/show_plan.rs +++ b/src/actions/show_plan.rs @@ -11,7 +11,7 @@ pub fn run(opt: &Opt) -> anyhow::Result<()> { let presence = planner::presence(&local_list, &remote_list); println!( - "found {} out of {} folders that need backup", + "found that {} out of {} folders need backup", plan.transfers.len(), presence.len(), ); @@ -19,7 +19,7 @@ pub fn run(opt: &Opt) -> anyhow::Result<()> { if !plan.transfers.is_empty() { println!("plan:"); for (item, item_plan) in plan.transfers { - println!("- {}: {:?}", item, item_plan); + println!("- {:?}: {:?}", item, item_plan); } } diff --git a/src/actions/sync.rs b/src/actions/sync.rs index 55af3e9..5cc369e 100644 --- a/src/actions/sync.rs +++ b/src/actions/sync.rs @@ -1,13 +1,17 @@ use crate::local; use crate::planner::{self, TransferKind}; use crate::remote; -use crate::util::format_duration; +use crate::util::{format_duration, path_as_utf8}; use crate::Opt; use ssh2::Session; -use std::io::{self, Read}; +use std::io::{self, Read, Write}; use std::process::{Command, Stdio}; +use std::sync::mpsc; +use std::thread; use std::time::Instant; +const TMP_FOLDER: &str = ".tmp"; + pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> { // TODO: currently we only sync the latest local files // --all will force a sync of ALL files on local which does not exist on remote @@ -46,6 +50,59 @@ pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> { Ok(()) } +struct CmdOutput { + exit_status: i32, + stdout: String, + stderr: String, +} + +fn do_cmd(session: &Session, cmd: &str) -> anyhow::Result { + let mut ch = session.channel_session()?; + + ch.exec(cmd)?; + ch.send_eof()?; + + let mut stdout = String::new(); + let mut stderr = String::new(); + + ch.stderr().read_to_string(&mut stderr)?; + ch.read_to_string(&mut stdout)?; + + ch.close()?; + ch.wait_close()?; + let exit_status = ch.exit_status()?; + + Ok(CmdOutput { + stdout, + stderr, + exit_status, + }) +} + +fn clear_tmp_dir(opt: &Opt, session: &Session) -> anyhow::Result { + let tmp_path = opt.remote.path.join(TMP_FOLDER); + let tmp_path = path_as_utf8(&tmp_path)?; + let cmd = format!(r#"rm -r "{}""#, tmp_path); + let result = do_cmd(session, &cmd)?; + + let success = result.exit_status == 0; + + Ok(success) +} + +fn create_tmp_dir(opt: &Opt, session: &Session) -> anyhow::Result<()> { + let tmp_path = opt.remote.path.join(TMP_FOLDER); + let tmp_path = path_as_utf8(&tmp_path)?; + let cmd = format!(r#"mkdir "{}""#, tmp_path); + let result = do_cmd(session, &cmd)?; + + if result.exit_status != 0 { + anyhow::bail!("failed to create {} dir on remote", TMP_FOLDER); + } + + Ok(()) +} + fn send_snapshot( opt: &Opt, session: &Session, @@ -58,6 +115,17 @@ fn send_snapshot( info!("[{}] sending snapshot delta", snapshot); } + if clear_tmp_dir(opt, session)? { + warn!( + "[{}] {} dir did already exist, it is likely that a previous upload failed.", + snapshot, TMP_FOLDER + ); + } + + create_tmp_dir(opt, session)?; + + let start_time = Instant::now(); + // spawn btrfs send let mut send = Command::new("btrfs") .arg("send") @@ -75,44 +143,84 @@ fn send_snapshot( .take() .ok_or_else(|| anyhow::format_err!("failed to take stdout"))?; - // start btrfs receive - let remote_path = opt - .remote - .path - .to_str() - .ok_or_else(|| anyhow::format_err!("path not utf-8"))?; - let mut receive = session.channel_session()?; - receive.exec(&format!(r#"btrfs receive "{}""#, remote_path,))?; + // #### UPLOAD SNAPSHOT FILE #### + const CHUNK_SIZE: usize = 1024 * 1024 * 100; // 100MB - // pipe send to receive - let start_time = Instant::now(); - let num_bytes = io::copy(&mut send_stdout, &mut receive)?; + let (data_tx, data_rx) = mpsc::sync_channel(10); + let tmp_path = opt.remote.path.join(TMP_FOLDER); - // wait for send to complete - let local_out = send.wait_with_output()?; - if !local_out.status.success() { - let stderr = std::str::from_utf8(&local_out.stderr) - .unwrap_or("failed to parse stderr, not valid utf8"); - anyhow::bail!("btrfs send failed:\n{}", stderr); + // spawn a thread to stream data from btrfs send in chunks + thread::spawn(move || -> io::Result<()> { + 'outer: for _ in 0.. { + let mut buf: Vec = vec![0u8; CHUNK_SIZE]; + let mut len = 0; + loop { + let free = &mut buf[len..]; + let n = send_stdout.read(free)?; + len += n; + + if n == 0 || n == free.len() { + buf.truncate(len); + if data_tx.send(buf).is_err() { + break 'outer; + } + + // check if we reached EOF + if n == 0 { + break 'outer; + } else { + continue 'outer; + } + } + } + } + + Ok(()) + }); + + let mut byte_count = 0; + let mut i = 0; + while let Ok(data) = data_rx.recv() { + byte_count += data.len(); + info!("[{}] uploading {} bytes...", snapshot, byte_count); + let snapshot_file = tmp_path.join(format!("{:016}", i)); + let mut ch = session.scp_send(&snapshot_file, 0o600, data.len() as u64, None)?; + ch.write_all(&data)?; + i += 1; } - // wait for receive to complete - receive.send_eof()?; - let mut remote_err = String::new(); - receive.stderr().read_to_string(&mut remote_err)?; - receive.wait_close()?; - let status = receive.exit_status()?; - if status != 0 { - anyhow::bail!("btrfs receive failed\nstderr:\n{}", remote_err); - } + info!( + "[{}] re-creating snapshot (this can take a while)", + snapshot + ); + let remote_path = path_as_utf8(&opt.remote.path)?; + let tmp_path = path_as_utf8(&tmp_path)?; + let cmd = format!( + r#"cat "{}"/* | btrfs receive -e "{}""#, + tmp_path, remote_path + ); + let out = do_cmd(session, &cmd)?; let time_elapsed = start_time.elapsed(); + + if out.exit_status != 0 { + anyhow::bail!( + "btrfs receive failed\nstdout:\n{}\nstderr:\n{}", + out.stdout, + out.stderr + ); + } + info!( - "[{}] sent {} bytes in {}", + "[{}] snapshot was {} bytes, time taken was {}", snapshot, - num_bytes, + byte_count, format_duration(time_elapsed) ); + if !clear_tmp_dir(opt, session)? { + anyhow::bail!("failed to remove {} dir", TMP_FOLDER); + } + Ok(()) } diff --git a/src/remote.rs b/src/remote.rs index 29893ab..fc23533 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -15,14 +15,15 @@ pub struct Remote { } pub fn connect(opt: &Opt) -> anyhow::Result { - let stream = TcpStream::connect(&opt.remote.remote)?; - let mut session = Session::new()?; - session.set_tcp_stream(stream); - session.handshake()?; info!( r#"connecting to {}@{}"#, opt.remote.username, opt.remote.remote, ); + + let stream = TcpStream::connect(&opt.remote.remote)?; + let mut session = Session::new()?; + session.set_tcp_stream(stream); + session.handshake()?; session.userauth_pubkey_file( &opt.remote.username, None, @@ -32,6 +33,7 @@ pub fn connect(opt: &Opt) -> anyhow::Result { if !session.authenticated() { anyhow::bail!("ssh not authenticated"); } + session.set_allow_sigpipe(true); Ok(session) } diff --git a/src/util.rs b/src/util.rs index 8107138..1119790 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,3 +1,4 @@ +use std::path::Path; use std::time::Duration; pub fn format_duration(d: Duration) -> String { @@ -12,3 +13,8 @@ pub fn format_duration(d: Duration) -> String { (_, _) => format!("{}h {}m {:.2}s", hours, minutes, seconds), } } + +pub fn path_as_utf8(path: &Path) -> anyhow::Result<&str> { + path.to_str() + .ok_or_else(|| anyhow::format_err!("path not utf-8")) +}