sync: Save snapshot to a file before receive
This commit is contained in:
@ -11,7 +11,7 @@ pub fn run(opt: &Opt) -> anyhow::Result<()> {
|
||||
let presence = planner::presence(&local_list, &remote_list);
|
||||
|
||||
println!(
|
||||
"found {} out of {} folders that need backup",
|
||||
"found that {} out of {} folders need backup",
|
||||
plan.transfers.len(),
|
||||
presence.len(),
|
||||
);
|
||||
@ -19,7 +19,7 @@ pub fn run(opt: &Opt) -> anyhow::Result<()> {
|
||||
if !plan.transfers.is_empty() {
|
||||
println!("plan:");
|
||||
for (item, item_plan) in plan.transfers {
|
||||
println!("- {}: {:?}", item, item_plan);
|
||||
println!("- {:?}: {:?}", item, item_plan);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,13 +1,17 @@
|
||||
use crate::local;
|
||||
use crate::planner::{self, TransferKind};
|
||||
use crate::remote;
|
||||
use crate::util::format_duration;
|
||||
use crate::util::{format_duration, path_as_utf8};
|
||||
use crate::Opt;
|
||||
use ssh2::Session;
|
||||
use std::io::{self, Read};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::process::{Command, Stdio};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
const TMP_FOLDER: &str = ".tmp";
|
||||
|
||||
pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> {
|
||||
// TODO: currently we only sync the latest local files
|
||||
// --all will force a sync of ALL files on local which does not exist on remote
|
||||
@ -46,6 +50,59 @@ pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct CmdOutput {
|
||||
exit_status: i32,
|
||||
stdout: String,
|
||||
stderr: String,
|
||||
}
|
||||
|
||||
fn do_cmd(session: &Session, cmd: &str) -> anyhow::Result<CmdOutput> {
|
||||
let mut ch = session.channel_session()?;
|
||||
|
||||
ch.exec(cmd)?;
|
||||
ch.send_eof()?;
|
||||
|
||||
let mut stdout = String::new();
|
||||
let mut stderr = String::new();
|
||||
|
||||
ch.stderr().read_to_string(&mut stderr)?;
|
||||
ch.read_to_string(&mut stdout)?;
|
||||
|
||||
ch.close()?;
|
||||
ch.wait_close()?;
|
||||
let exit_status = ch.exit_status()?;
|
||||
|
||||
Ok(CmdOutput {
|
||||
stdout,
|
||||
stderr,
|
||||
exit_status,
|
||||
})
|
||||
}
|
||||
|
||||
fn clear_tmp_dir(opt: &Opt, session: &Session) -> anyhow::Result<bool> {
|
||||
let tmp_path = opt.remote.path.join(TMP_FOLDER);
|
||||
let tmp_path = path_as_utf8(&tmp_path)?;
|
||||
let cmd = format!(r#"rm -r "{}""#, tmp_path);
|
||||
let result = do_cmd(session, &cmd)?;
|
||||
|
||||
let success = result.exit_status == 0;
|
||||
|
||||
Ok(success)
|
||||
}
|
||||
|
||||
fn create_tmp_dir(opt: &Opt, session: &Session) -> anyhow::Result<()> {
|
||||
let tmp_path = opt.remote.path.join(TMP_FOLDER);
|
||||
let tmp_path = path_as_utf8(&tmp_path)?;
|
||||
let cmd = format!(r#"mkdir "{}""#, tmp_path);
|
||||
let result = do_cmd(session, &cmd)?;
|
||||
|
||||
if result.exit_status != 0 {
|
||||
anyhow::bail!("failed to create {} dir on remote", TMP_FOLDER);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_snapshot(
|
||||
opt: &Opt,
|
||||
session: &Session,
|
||||
@ -58,6 +115,17 @@ fn send_snapshot(
|
||||
info!("[{}] sending snapshot delta", snapshot);
|
||||
}
|
||||
|
||||
if clear_tmp_dir(opt, session)? {
|
||||
warn!(
|
||||
"[{}] {} dir did already exist, it is likely that a previous upload failed.",
|
||||
snapshot, TMP_FOLDER
|
||||
);
|
||||
}
|
||||
|
||||
create_tmp_dir(opt, session)?;
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
// spawn btrfs send
|
||||
let mut send = Command::new("btrfs")
|
||||
.arg("send")
|
||||
@ -75,44 +143,84 @@ fn send_snapshot(
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::format_err!("failed to take stdout"))?;
|
||||
|
||||
// start btrfs receive
|
||||
let remote_path = opt
|
||||
.remote
|
||||
.path
|
||||
.to_str()
|
||||
.ok_or_else(|| anyhow::format_err!("path not utf-8"))?;
|
||||
let mut receive = session.channel_session()?;
|
||||
receive.exec(&format!(r#"btrfs receive "{}""#, remote_path,))?;
|
||||
// #### UPLOAD SNAPSHOT FILE ####
|
||||
const CHUNK_SIZE: usize = 1024 * 1024 * 100; // 100MB
|
||||
|
||||
// pipe send to receive
|
||||
let start_time = Instant::now();
|
||||
let num_bytes = io::copy(&mut send_stdout, &mut receive)?;
|
||||
let (data_tx, data_rx) = mpsc::sync_channel(10);
|
||||
let tmp_path = opt.remote.path.join(TMP_FOLDER);
|
||||
|
||||
// wait for send to complete
|
||||
let local_out = send.wait_with_output()?;
|
||||
if !local_out.status.success() {
|
||||
let stderr = std::str::from_utf8(&local_out.stderr)
|
||||
.unwrap_or("failed to parse stderr, not valid utf8");
|
||||
anyhow::bail!("btrfs send failed:\n{}", stderr);
|
||||
// spawn a thread to stream data from btrfs send in chunks
|
||||
thread::spawn(move || -> io::Result<()> {
|
||||
'outer: for _ in 0.. {
|
||||
let mut buf: Vec<u8> = vec![0u8; CHUNK_SIZE];
|
||||
let mut len = 0;
|
||||
loop {
|
||||
let free = &mut buf[len..];
|
||||
let n = send_stdout.read(free)?;
|
||||
len += n;
|
||||
|
||||
if n == 0 || n == free.len() {
|
||||
buf.truncate(len);
|
||||
if data_tx.send(buf).is_err() {
|
||||
break 'outer;
|
||||
}
|
||||
|
||||
// wait for receive to complete
|
||||
receive.send_eof()?;
|
||||
let mut remote_err = String::new();
|
||||
receive.stderr().read_to_string(&mut remote_err)?;
|
||||
receive.wait_close()?;
|
||||
let status = receive.exit_status()?;
|
||||
if status != 0 {
|
||||
anyhow::bail!("btrfs receive failed\nstderr:\n{}", remote_err);
|
||||
// check if we reached EOF
|
||||
if n == 0 {
|
||||
break 'outer;
|
||||
} else {
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let mut byte_count = 0;
|
||||
let mut i = 0;
|
||||
while let Ok(data) = data_rx.recv() {
|
||||
byte_count += data.len();
|
||||
info!("[{}] uploading {} bytes...", snapshot, byte_count);
|
||||
let snapshot_file = tmp_path.join(format!("{:016}", i));
|
||||
let mut ch = session.scp_send(&snapshot_file, 0o600, data.len() as u64, None)?;
|
||||
ch.write_all(&data)?;
|
||||
i += 1;
|
||||
}
|
||||
|
||||
info!(
|
||||
"[{}] re-creating snapshot (this can take a while)",
|
||||
snapshot
|
||||
);
|
||||
let remote_path = path_as_utf8(&opt.remote.path)?;
|
||||
let tmp_path = path_as_utf8(&tmp_path)?;
|
||||
let cmd = format!(
|
||||
r#"cat "{}"/* | btrfs receive -e "{}""#,
|
||||
tmp_path, remote_path
|
||||
);
|
||||
let out = do_cmd(session, &cmd)?;
|
||||
|
||||
let time_elapsed = start_time.elapsed();
|
||||
|
||||
if out.exit_status != 0 {
|
||||
anyhow::bail!(
|
||||
"btrfs receive failed\nstdout:\n{}\nstderr:\n{}",
|
||||
out.stdout,
|
||||
out.stderr
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
"[{}] sent {} bytes in {}",
|
||||
"[{}] snapshot was {} bytes, time taken was {}",
|
||||
snapshot,
|
||||
num_bytes,
|
||||
byte_count,
|
||||
format_duration(time_elapsed)
|
||||
);
|
||||
|
||||
if !clear_tmp_dir(opt, session)? {
|
||||
anyhow::bail!("failed to remove {} dir", TMP_FOLDER);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -15,14 +15,15 @@ pub struct Remote {
|
||||
}
|
||||
|
||||
pub fn connect(opt: &Opt) -> anyhow::Result<Session> {
|
||||
let stream = TcpStream::connect(&opt.remote.remote)?;
|
||||
let mut session = Session::new()?;
|
||||
session.set_tcp_stream(stream);
|
||||
session.handshake()?;
|
||||
info!(
|
||||
r#"connecting to {}@{}"#,
|
||||
opt.remote.username, opt.remote.remote,
|
||||
);
|
||||
|
||||
let stream = TcpStream::connect(&opt.remote.remote)?;
|
||||
let mut session = Session::new()?;
|
||||
session.set_tcp_stream(stream);
|
||||
session.handshake()?;
|
||||
session.userauth_pubkey_file(
|
||||
&opt.remote.username,
|
||||
None,
|
||||
@ -32,6 +33,7 @@ pub fn connect(opt: &Opt) -> anyhow::Result<Session> {
|
||||
if !session.authenticated() {
|
||||
anyhow::bail!("ssh not authenticated");
|
||||
}
|
||||
session.set_allow_sigpipe(true);
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn format_duration(d: Duration) -> String {
|
||||
@ -12,3 +13,8 @@ pub fn format_duration(d: Duration) -> String {
|
||||
(_, _) => format!("{}h {}m {:.2}s", hours, minutes, seconds),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn path_as_utf8(path: &Path) -> anyhow::Result<&str> {
|
||||
path.to_str()
|
||||
.ok_or_else(|| anyhow::format_err!("path not utf-8"))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user