Compare commits

...

5 Commits

7 changed files with 197 additions and 39 deletions

12
Cargo.lock generated
View File

@ -42,7 +42,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "butterup" name = "butterup"
version = "0.1.0" version = "1.0.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"chrono", "chrono",
@ -260,6 +260,15 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "openssl-src"
version = "111.16.0+1.1.1l"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab2173f69416cf3ec12debb5823d244127d23a9b127d5a5189aa97c5fa2859f"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "openssl-sys" name = "openssl-sys"
version = "0.9.67" version = "0.9.67"
@ -269,6 +278,7 @@ dependencies = [
"autocfg", "autocfg",
"cc", "cc",
"libc", "libc",
"openssl-src",
"pkg-config", "pkg-config",
"vcpkg", "vcpkg",
] ]

View File

@ -1,7 +1,7 @@
[package] [package]
name = "butterup" name = "butterup"
description = "Backup btrfs snapshots over SSH" description = "Backup btrfs snapshots over SSH"
version = "0.1.0" version = "1.0.0"
authors = ["Joakim Hulthe <joakim@hulthe.net>"] authors = ["Joakim Hulthe <joakim@hulthe.net>"]
license = "MPL-2.0" license = "MPL-2.0"
edition = "2018" edition = "2018"
@ -12,4 +12,7 @@ chrono = "0.4.19"
clap = "3.0.0-beta.4" clap = "3.0.0-beta.4"
log = "0.4" log = "0.4"
pretty_env_logger = "0.4" pretty_env_logger = "0.4"
ssh2 = "0.9.1"
[dependencies.ssh2]
version = "0.9.1"
features = ["vendored-openssl"]

View File

@ -11,7 +11,7 @@ pub fn run(opt: &Opt) -> anyhow::Result<()> {
let presence = planner::presence(&local_list, &remote_list); let presence = planner::presence(&local_list, &remote_list);
println!( println!(
"found {} out of {} folders that need backup", "found that {} out of {} folders need backup",
plan.transfers.len(), plan.transfers.len(),
presence.len(), presence.len(),
); );
@ -19,7 +19,7 @@ pub fn run(opt: &Opt) -> anyhow::Result<()> {
if !plan.transfers.is_empty() { if !plan.transfers.is_empty() {
println!("plan:"); println!("plan:");
for (item, item_plan) in plan.transfers { for (item, item_plan) in plan.transfers {
println!("- {}: {:?}", item, item_plan); println!("- {:?}: {:?}", item, item_plan);
} }
} }

View File

@ -1,10 +1,16 @@
use crate::local; use crate::local;
use crate::planner::{self, TransferKind}; use crate::planner::{self, TransferKind};
use crate::remote; use crate::remote;
use crate::util::{format_duration, path_as_utf8};
use crate::Opt; use crate::Opt;
use ssh2::Session; use ssh2::Session;
use std::io::{self, Read}; use std::io::{self, Read, Write};
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::Instant;
const TMP_FOLDER: &str = ".tmp";
pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> { pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> {
// TODO: currently we only sync the latest local files // TODO: currently we only sync the latest local files
@ -44,13 +50,81 @@ pub fn run(opt: &Opt, sync_all: bool) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
struct CmdOutput {
exit_status: i32,
stdout: String,
stderr: String,
}
fn do_cmd(session: &Session, cmd: &str) -> anyhow::Result<CmdOutput> {
let mut ch = session.channel_session()?;
ch.exec(cmd)?;
ch.send_eof()?;
let mut stdout = String::new();
let mut stderr = String::new();
ch.stderr().read_to_string(&mut stderr)?;
ch.read_to_string(&mut stdout)?;
ch.close()?;
ch.wait_close()?;
let exit_status = ch.exit_status()?;
Ok(CmdOutput {
stdout,
stderr,
exit_status,
})
}
fn clear_tmp_dir(opt: &Opt, session: &Session) -> anyhow::Result<bool> {
let tmp_path = opt.remote.path.join(TMP_FOLDER);
let tmp_path = path_as_utf8(&tmp_path)?;
let cmd = format!(r#"rm -r "{}""#, tmp_path);
let result = do_cmd(session, &cmd)?;
let success = result.exit_status == 0;
Ok(success)
}
fn create_tmp_dir(opt: &Opt, session: &Session) -> anyhow::Result<()> {
let tmp_path = opt.remote.path.join(TMP_FOLDER);
let tmp_path = path_as_utf8(&tmp_path)?;
let cmd = format!(r#"mkdir "{}""#, tmp_path);
let result = do_cmd(session, &cmd)?;
if result.exit_status != 0 {
anyhow::bail!("failed to create {} dir on remote", TMP_FOLDER);
}
Ok(())
}
fn send_snapshot( fn send_snapshot(
opt: &Opt, opt: &Opt,
session: &Session, session: &Session,
snapshot: &str, snapshot: &str,
parent: Option<&str>, parent: Option<&str>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("[{}] transmitting delta", snapshot); if parent.is_none() {
info!("[{}] sending full snapshot data", snapshot);
} else {
info!("[{}] sending snapshot delta", snapshot);
}
if clear_tmp_dir(opt, session)? {
warn!(
"[{}] {} dir did already exist, it is likely that a previous upload failed.",
snapshot, TMP_FOLDER
);
}
create_tmp_dir(opt, session)?;
let start_time = Instant::now();
// spawn btrfs send // spawn btrfs send
let mut send = Command::new("btrfs") let mut send = Command::new("btrfs")
@ -59,7 +133,7 @@ fn send_snapshot(
.arg(snapshot) .arg(snapshot)
.current_dir(&opt.path) .current_dir(&opt.path)
.stdin(Stdio::null()) .stdin(Stdio::null())
.stderr(Stdio::null()) .stderr(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.spawn()?; .spawn()?;
@ -69,35 +143,83 @@ fn send_snapshot(
.take() .take()
.ok_or_else(|| anyhow::format_err!("failed to take stdout"))?; .ok_or_else(|| anyhow::format_err!("failed to take stdout"))?;
// start btrfs receive // #### UPLOAD SNAPSHOT FILE ####
let remote_path = opt const CHUNK_SIZE: usize = 1024 * 1024 * 100; // 100MB
.remote
.path
.to_str()
.ok_or_else(|| anyhow::format_err!("path not utf-8"))?;
let mut receive = session.channel_session()?;
receive.exec(&format!(r#"btrfs receive "{}""#, remote_path,))?;
// pipe send to receive let (data_tx, data_rx) = mpsc::sync_channel(10);
let num_bytes = io::copy(&mut send_stdout, &mut receive)?; let tmp_path = opt.remote.path.join(TMP_FOLDER);
info!("[{}] sent {} bytes", snapshot, num_bytes);
// wait for send to complete // spawn a thread to stream data from btrfs send in chunks
let local_out = send.wait_with_output()?; thread::spawn(move || -> io::Result<()> {
if !local_out.status.success() { 'outer: for _ in 0.. {
let stderr = std::str::from_utf8(&local_out.stderr) let mut buf: Vec<u8> = vec![0u8; CHUNK_SIZE];
.unwrap_or("failed to parse stderr, not valid utf8"); let mut len = 0;
anyhow::bail!("btrfs send failed\nstderr:\n{}", stderr); loop {
let free = &mut buf[len..];
let n = send_stdout.read(free)?;
len += n;
if n == 0 || n == free.len() {
buf.truncate(len);
if data_tx.send(buf).is_err() {
break 'outer;
}
// check if we reached EOF
if n == 0 {
break 'outer;
} else {
continue 'outer;
}
}
}
}
Ok(())
});
let mut byte_count = 0;
let mut i = 0;
while let Ok(data) = data_rx.recv() {
byte_count += data.len();
info!("[{}] uploading {} bytes...", snapshot, byte_count);
let snapshot_file = tmp_path.join(format!("{:016}", i));
let mut ch = session.scp_send(&snapshot_file, 0o600, data.len() as u64, None)?;
ch.write_all(&data)?;
i += 1;
} }
// wait for receive to complete info!(
receive.send_eof()?; "[{}] re-creating snapshot (this can take a while)",
let mut remote_err = String::new(); snapshot
receive.stderr().read_to_string(&mut remote_err)?; );
receive.wait_close()?; let remote_path = path_as_utf8(&opt.remote.path)?;
let status = receive.exit_status()?; let tmp_path = path_as_utf8(&tmp_path)?;
if status != 0 { let cmd = format!(
anyhow::bail!("btrfs receive failed\nstderr:\n{}", remote_err); r#"cat "{}"/* | btrfs receive -e "{}""#,
tmp_path, remote_path
);
let out = do_cmd(session, &cmd)?;
let time_elapsed = start_time.elapsed();
if out.exit_status != 0 {
anyhow::bail!(
"btrfs receive failed\nstdout:\n{}\nstderr:\n{}",
out.stdout,
out.stderr
);
}
info!(
"[{}] snapshot was {} bytes, time taken was {}",
snapshot,
byte_count,
format_duration(time_elapsed)
);
if !clear_tmp_dir(opt, session)? {
anyhow::bail!("failed to remove {} dir", TMP_FOLDER);
} }
Ok(()) Ok(())

View File

@ -6,10 +6,11 @@ mod local;
mod planner; mod planner;
mod remote; mod remote;
mod snapshot; mod snapshot;
mod util;
use actions::{list, show_plan, sync}; use actions::{list, show_plan, sync};
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use clap::{AppSettings, Clap}; use clap::{crate_version, AppSettings, Clap};
use remote::Remote; use remote::Remote;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::PathBuf; use std::path::PathBuf;
@ -19,7 +20,7 @@ pub type FileList = BTreeMap<TimeStamp, String>;
/// Backup btrfs snapshots over SSH /// Backup btrfs snapshots over SSH
#[derive(Clap)] #[derive(Clap)]
#[clap(setting = AppSettings::ColoredHelp)] #[clap(version = crate_version!(), setting = AppSettings::ColoredHelp)]
pub struct Opt { pub struct Opt {
/// The path of the backup directory on the local filesystem /// The path of the backup directory on the local filesystem
#[clap(short = 'l', long)] #[clap(short = 'l', long)]

View File

@ -15,14 +15,15 @@ pub struct Remote {
} }
pub fn connect(opt: &Opt) -> anyhow::Result<Session> { pub fn connect(opt: &Opt) -> anyhow::Result<Session> {
let stream = TcpStream::connect(&opt.remote.remote)?;
let mut session = Session::new()?;
session.set_tcp_stream(stream);
session.handshake()?;
info!( info!(
r#"connecting to {}@{}"#, r#"connecting to {}@{}"#,
opt.remote.username, opt.remote.remote, opt.remote.username, opt.remote.remote,
); );
let stream = TcpStream::connect(&opt.remote.remote)?;
let mut session = Session::new()?;
session.set_tcp_stream(stream);
session.handshake()?;
session.userauth_pubkey_file( session.userauth_pubkey_file(
&opt.remote.username, &opt.remote.username,
None, None,
@ -32,6 +33,7 @@ pub fn connect(opt: &Opt) -> anyhow::Result<Session> {
if !session.authenticated() { if !session.authenticated() {
anyhow::bail!("ssh not authenticated"); anyhow::bail!("ssh not authenticated");
} }
session.set_allow_sigpipe(true);
Ok(session) Ok(session)
} }

20
src/util.rs Normal file
View File

@ -0,0 +1,20 @@
use std::path::Path;
use std::time::Duration;
pub fn format_duration(d: Duration) -> String {
let seconds = d.as_secs_f32() % 60.0;
let minutes = d.as_secs() / 60 % 60;
let hours = d.as_secs() / 60 / 60;
match (hours, minutes) {
(0, 0) => format!("{:.2}s", seconds),
(0, _) => format!("{}m {:.2}s", minutes, seconds),
(_, 0) => format!("{}h {:.2}s", hours, seconds),
(_, _) => format!("{}h {}m {:.2}s", hours, minutes, seconds),
}
}
pub fn path_as_utf8(path: &Path) -> anyhow::Result<&str> {
path.to_str()
.ok_or_else(|| anyhow::format_err!("path not utf-8"))
}