514 lines
19 KiB
Python
Executable File
514 lines
19 KiB
Python
Executable File
#!.venv/bin/python
|
|
|
|
# Scan filesystem to generate a list of files to back up, based on a
|
|
# configuration file. Pass this list to borg to actually create the
|
|
# backup. Execute a notification script on the remote server to
|
|
# report the backup status.
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import stat
|
|
import time
|
|
import select
|
|
import pathlib
|
|
import threading
|
|
import subprocess
|
|
import _thread # for interrupt_main
|
|
|
|
import typing
|
|
|
|
import yaml
|
|
import wcmatch.glob # type: ignore
|
|
import humanfriendly # type: ignore
|
|
|
|
def b2s(raw: bytes) -> str:
    """Render raw bytes as a printable string; any byte that is not
    valid UTF-8 comes out as a backslash escape (e.g. '\\xff')."""
    return str(raw, 'utf-8', 'backslashreplace')
|
|
|
|
def format_size(n: int) -> str:
    """Format a byte count for humans using binary (KiB/MiB) units,
    keeping a fixed display width."""
    return humanfriendly.format_size(n, binary=True, keep_width=True)
|
|
|
|
# Type corresponding to patterns that are generated by
# wcmatch.translate: two lists of compiled REs (a, b).  A path matches
# if it matches at least one regex in "a" and none in "b".
# (See Config.match_re for the matching logic.)
MatchPatterns = typing.Tuple[typing.List[re.Pattern], typing.List[re.Pattern]]
|
|
|
|
class Config:
    """Backup configuration parsed from a YAML file."""

    # Root directories to scan, sorted shortest-first so parent roots
    # are processed before roots nested inside them.
    roots: typing.List[bytes]
    # If true, don't cross filesystem boundaries while scanning.
    one_file_system: bool
    # If true, skip directories that contain a CACHEDIR.TAG file.
    exclude_caches: bool
    # Patterns for paths to exclude from the backup.
    exclude: "MatchPatterns"
    # Patterns that override 'exclude' and size limits (force-include).
    unexclude: "MatchPatterns"
    # (max_size, patterns) pairs, sorted by descending size limit.
    max_size_rules: "typing.List[typing.Tuple[int, MatchPatterns]]"
    # Address to notify, or None to disable notification email.
    notify_email: typing.Optional[str]

    def __init__(self, configfile: str):
        """Load 'configfile' and compile its pattern lists.

        Raises OSError if the file can't be read, or yaml.YAMLError if
        it isn't valid YAML.
        """

        # Helper to process lists of patterns into regexes
        def process_match_list(config_entry: str) -> "MatchPatterns":
            raw = config_entry.encode().split(b'\n')
            # Prepend '**/' to any relative patterns so they can match
            # at any depth; absolute patterns are kept as-is.
            pats = [x if x.startswith(b'/') else b'**/' + x
                    for x in raw if len(x)]

            # Compile patterns.
            (a, b) = wcmatch.glob.translate(
                pats, flags=(wcmatch.glob.GLOBSTAR |
                             wcmatch.glob.DOTGLOB |
                             wcmatch.glob.NODOTDIR |
                             wcmatch.glob.EXTGLOB |
                             wcmatch.glob.BRACE))
            return ([re.compile(x) for x in a],
                    [re.compile(x) for x in b])

        # Read config.  An empty YAML document loads as None; treat it
        # as an empty mapping so the lookups below still work.
        with open(configfile, 'r') as f:
            config = yaml.safe_load(f) or {}
        self.one_file_system = config.get('one-file-system', False)
        self.exclude_caches = config.get('exclude-caches', False)

        # 'or '' ' also guards against an explicit null value in the
        # YAML (config.get would return None, not the default).
        raw = (config.get('roots') or '').encode().split(b'\n')
        self.roots = sorted((x for x in raw if len(x)), key=len)

        self.exclude = process_match_list(config.get('exclude') or '')
        self.unexclude = process_match_list(config.get('unexclude') or '')

        self.max_size_rules = []
        rules = { humanfriendly.parse_size(k): v
                  for k, v in (config.get('max-size-rules') or {}).items() }
        # Largest limit first; Backup.scan relies on the last entry
        # holding the smallest limit.
        for size in sorted(rules, reverse=True):
            self.max_size_rules.append(
                (size, process_match_list(rules[size])))

        self.notify_email = config.get('notify-email', None)

    def match_re(self, r: "MatchPatterns", path: bytes) -> bool:
        """Return True iff 'path' matches at least one regex in r[0]
        and no regex in r[1]."""
        for a in r[0]:
            if a.match(path):
                # Matched an include pattern; the deny list decides.
                return not any(b.match(path) for b in r[1])
        return False
|
|
|
|
class Backup:
    """Walk the configured roots, emit the paths to back up, and
    capture log output (including borg's) for later reporting."""

    def __init__(self, config: "Config", dry_run: bool):
        """Set up a backup run.

        dry_run: if True, out() writes newline-terminated paths for
        human consumption instead of NUL-terminated paths for borg.
        """
        self.config = config
        self.dry_run = dry_run
        # Roots already reached while scanning, so duplicate or nested
        # roots are only processed once.
        self.root_seen: typing.Dict[bytes, bool] = {}

        # Saved log messages (which includes borg output)
        self.logs: typing.List[typing.Tuple[str, str]] = []

    def out(self, path: bytes):
        """Emit one path on the output stream set up by run()."""
        # NUL separators match borg's --paths-delimiter "\0".
        self.outfile.write(path + (b'\n' if self.dry_run else b'\0'))

    def log(self, letter: str, msg: str, bold: bool=False):
        """Print a colorized "letter: msg" line and record it in
        self.logs.  Unknown letters get the default color."""
        colors = {
            'E': 31,  # red: error
            'W': 33,  # yellow: warning
            'N': 34,  # blue: notice, a weaker warning (no email generated)
            'I': 36,  # cyan: info, backup.py script output
            'O': 37,  # white: regular output from borg
        }
        c = colors.get(letter, 0)
        # The letter prints bold; "\033[22m" drops back to normal
        # intensity for the message unless bold was requested.
        b = "" if bold else "\033[22m"
        sys.stdout.write(f"\033[1;{c}m{letter}:{b} {msg}\033[0m\n")
        sys.stdout.flush()
        self.logs.append((letter, msg))

    def run(self, outfile: typing.IO[bytes]):
        """Scan every configured root, writing matched paths to
        'outfile'."""
        self.outfile = outfile
        for root in self.config.roots:
            if root in self.root_seen:
                self.log('I', f"ignoring root, already seen: {b2s(root)}")
                continue

            # Each root must exist and be a directory.
            try:
                st = os.lstat(root)
                if not stat.S_ISDIR(st.st_mode):
                    raise NotADirectoryError
            except FileNotFoundError:
                self.log('E', f"root does not exist: {b2s(root)}")
                continue
            except NotADirectoryError:
                self.log('E', f"root is not a directory: {b2s(root)}")
                continue

            self.log('I', f"processing root {b2s(root)}")
            self.scan(root)

    def scan(self, path: bytes,
             parent_st: typing.Optional[os.stat_result]=None):
        """If the given path should be backed up, print it.  If it's
        a directory and its contents should be included, recurse.
        """
        try:
            st = os.lstat(path)
            is_dir = stat.S_ISDIR(st.st_mode)
            is_reg = stat.S_ISREG(st.st_mode)
            # On-disk allocation, not apparent size, for limit checks.
            size = st.st_blocks * 512

            # Decorated path ends with a '/' if it's a directory.
            decorated_path = path
            if is_dir and not decorated_path.endswith(b'/'):
                decorated_path += b'/'

            # See if there's a reason to exclude it
            exclude_reason = None

            if self.config.match_re(self.config.exclude, decorated_path):
                # Config file says to exclude
                exclude_reason = ('I', "skipping, excluded by config file")

            elif (self.config.one_file_system
                  and parent_st is not None
                  and is_dir
                  and st.st_dev != parent_st.st_dev):
                # Crosses a mount point
                exclude_reason = ('I', "skipping, on different filesystem")

            elif (is_reg
                  and self.config.max_size_rules
                  and size > self.config.max_size_rules[-1][0]):
                # Check file sizes against our list.
                # Only need to check if the size is bigger than the
                # smallest entry on the list; then, we need to check it
                # against all rules to see which one applies.
                for (max_size, patterns) in self.config.max_size_rules:
                    if self.config.match_re(patterns, decorated_path):
                        if size > max_size:
                            a = format_size(size)
                            b = format_size(max_size)
                            exclude_reason = (
                                'W', f"file size {a} exceeds limit {b}")
                        break

            # If we have a reason to exclude it, stop now unless it's
            # force-included
            force = self.config.match_re(self.config.unexclude, decorated_path)
            if exclude_reason and not force:
                self.log(exclude_reason[0],
                         f"{exclude_reason[1]}: {b2s(path)}")
                return

            # Print path for Borg
            self.out(path)

            # Process directories
            if is_dir:
                # Remember configured roots as we reach them (with or
                # without the trailing slash) so run() won't rescan.
                if path in self.config.roots:
                    self.root_seen[path] = True
                if decorated_path in self.config.roots:
                    self.root_seen[decorated_path] = True

                # Skip if it contains CACHEDIR.TAG
                # (mirroring the --exclude-caches borg option)
                if self.config.exclude_caches:
                    try:
                        tag = b'Signature: 8a477f597d28d172789f06886806bc55'
                        with open(path + b'/CACHEDIR.TAG', 'rb') as f:
                            if f.read(len(tag)) == tag:
                                self.log(
                                    'I', f"skipping, cache dir: {b2s(path)}")
                                return
                    except OSError:
                        # No readable tag file; treat as a normal dir.
                        pass

                # Recurse
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(path=entry.path, parent_st=st)

        except (FileNotFoundError,
                IsADirectoryError,
                NotADirectoryError,
                PermissionError) as e:
            self.log('E', f"can't read {b2s(path)}: {str(e)}")
            return

    def run_borg(self, argv: typing.List[str],
                 stdin_writer: typing.Optional[
                     typing.Callable[[typing.IO[bytes]],
                                     typing.Any]]=None):
        """Run a borg command, capturing and displaying output, while feeding
        input using stdin_writer. Returns True on Borg success, False on error.
        """
        borg = subprocess.Popen(argv,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        if borg.stdin is None:
            raise Exception("no pipe")

        # Count warnings and errors from Borg, so we can interpret its
        # error codes correctly (e.g. ignoring exit codes if warnings
        # were all harmless).
        borg_saw_warnings = 0
        borg_saw_errors = 0

        # Use a thread to capture output
        def reader_thread(fh):
            # Parse borg's --log-json stream line by line, turning each
            # record into a log() call.
            nonlocal borg_saw_warnings
            nonlocal borg_saw_errors
            last_progress = 0
            for line in fh:
                try:
                    data = json.loads(line)

                    if data['type'] == 'log_message':
                        changed_msg = "file changed while we backed it up"
                        if data['levelname'] == 'WARNING':
                            if changed_msg in data['message']:
                                # harmless; don't count as a Borg warning
                                outlevel = 'N'
                            else:
                                borg_saw_warnings += 1
                                outlevel = 'W'
                            output = "warning: "
                        elif data['levelname'] not in ('DEBUG', 'INFO'):
                            borg_saw_errors += 1
                            outlevel = 'E'
                            output = "error: "
                        else:
                            outlevel = 'O'
                            output = ""
                        output += data['message']

                    elif (data['type'] == 'progress_message'
                          and 'message' in data):
                        outlevel = 'O'
                        output = data['message']

                    elif data['type'] == 'archive_progress':
                        # Rate-limit progress output to one line per 10s.
                        now = time.time()
                        if now - last_progress > 10:
                            last_progress = now
                            def size(short: str, full: str) -> str:
                                return f" {short}={format_size(data[full])}"
                            outlevel = 'O'
                            output = ("progress:" +
                                      f" files={data['nfiles']}" +
                                      size('orig', 'original_size') +
                                      size('comp', 'compressed_size') +
                                      size('dedup', 'deduplicated_size'))
                        else:
                            continue
                    else:
                        # ignore unknown progress line
                        continue
                except Exception as e:
                    # on error, print raw line with exception
                    outlevel = 'E'
                    output = f"[exception: {str(e)}] " + b2s(line).rstrip()
                self.log(outlevel, output)
            fh.close()

        def _reader_thread(fh):
            # Ignore broken pipes; escalate anything else to the main
            # thread so the backup aborts loudly.
            try:
                return reader_thread(fh)
            except BrokenPipeError:
                pass
            except Exception:
                _thread.interrupt_main()

        reader = threading.Thread(target=_reader_thread, args=(borg.stdout,))
        reader.daemon = True
        reader.start()

        try:
            if stdin_writer:
                # Give borg some time to start, just to clean up stdout
                time.sleep(1)
                stdin_writer(borg.stdin)
        except BrokenPipeError:
            self.log('E', "<broken pipe>")
        finally:
            try:
                borg.stdin.close()
            except BrokenPipeError:
                pass
        borg.wait()
        reader.join()

        ret = borg.returncode
        if ret < 0:
            self.log('E', f"borg exited with signal {-ret}")
        elif ret == 2 or borg_saw_errors:
            self.log('E', f"borg exited with errors (ret={ret})")
        elif ret == 1:
            # Borg uses exit code 1 for "completed with warnings"; it
            # only counts as failure if a warning was non-harmless.
            if borg_saw_warnings:
                self.log('W', f"borg exited with warnings (ret={ret})")
            else:
                return True
        elif ret != 0:
            self.log('E', f"borg exited with unknown error code {ret}")
        else:
            return True
        return False
|
|
|
|
def main(argv: typing.List[str]):
    """Parse arguments, scan the filesystem (or drive borg), prune old
    archives, and send a status notification.

    Returns the process exit code: 1 if any errors were logged,
    otherwise 0.
    """
    import argparse

    # Parse args
    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Back up the local system using borg",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    base = pathlib.Path(__file__).parent
    parser.add_argument('-c', '--config',
                        help="Config file", default=str(base / "config.yaml"))
    parser.add_argument('-v', '--vars',
                        help="Variables file", default=str(base / "vars.sh"))
    parser.add_argument('-n', '--dry-run', action="store_true",
                        help="Just print log output, don't run borg")
    parser.add_argument('-d', '--debug', action="store_true",
                        help="Print filenames for --dry-run")

    args = parser.parse_args()
    config = Config(args.config)
    backup = Backup(config, args.dry_run)

    # Parse variables from vars.sh; anything not overridden there
    # keeps its default below.
    hostname = os.uname().nodename
    borg_sh = str(base / "borg.sh")
    notify_sh = str(base / "notify.sh")
    try:
        with open(args.vars) as f:
            for line in f:
                # NOTE(review): the '\s*' after 'export' also accepts
                # "exportHOSTNAME=..."; harmless for well-formed files.
                m = re.match(r"\s*export\s*([A-Z_]+)=(.*)", line)
                if not m:
                    continue
                var = m.group(1)
                value = m.group(2)
                if var == "HOSTNAME":
                    hostname = value
                if var == "BORG":
                    borg_sh = value
                if var == "BORG_DIR":
                    notify_sh = str(pathlib.Path(value) / "notify.sh")
    except Exception as e:
        backup.log('W', f"failed to parse variables from {args.vars}: {str(e)}")

    # Run backup
    if args.dry_run:
        # Dry run: perform the scan only.  With --debug the selected
        # paths go to stdout; otherwise they are discarded.
        if args.debug:
            backup.run(sys.stdout.buffer)
        else:
            with open(os.devnull, "wb") as out:
                backup.run(out)
        sys.stdout.flush()
    else:
        if backup.run_borg([borg_sh,
                            "create",
                            "--verbose",
                            "--progress",
                            "--log-json",
                            "--list",
                            "--filter", "E",
                            "--stats",
                            "--checkpoint-interval", "900",
                            "--compression", "zstd,3",
                            "--paths-from-stdin",
                            "--paths-delimiter", "\\0",
                            "::" + hostname + "-{now:%Y%m%d-%H%M%S}"],
                           stdin_writer=backup.run):

            # backup success; run prune. Note that this won't actually free
            # space until a "./borg.sh --rw compact", because we're in
            # append-only mode.
            backup.log('I', "pruning archives", bold=True)
            backup.run_borg([borg_sh,
                             "prune",
                             "--verbose",
                             "--list",
                             "--progress",
                             "--log-json",
                             "--stats",
                             "--keep-within=7d",
                             "--keep-daily=14",
                             "--keep-weekly=8",
                             "--keep-monthly=-1",
                             "--glob-archives", hostname + "-????????-??????"])

    # See if we had any errors
    warnings = sum(1 for (letter, msg) in backup.logs if letter == 'W')
    errors = sum(1 for (letter, msg) in backup.logs if letter == 'E')

    def plural(num: int, word: str) -> str:
        # "1 error" but "2 errors"
        suffix = "" if num == 1 else "s"
        return f"{num} {word}{suffix}"

    warnmsg = plural(warnings, "warning") if warnings else None
    errmsg = plural(errors, "error") if errors else None

    if not warnings and not errors:
        backup.log('I', "backup successful", bold=True)

    else:
        if warnmsg:
            backup.log('W', f"reported {warnmsg}", bold=True)
        if errors:
            backup.log('E', f"reported {errmsg}", bold=True)

        # Send a notification of errors
        email = backup.config.notify_email
        if email and not args.dry_run:
            backup.log('I', f"sending error notification to {email}")

            def write_logs(title, only_include=None):
                # Build a text report of the captured logs; when
                # only_include is given, keep only those letters.
                body = [ title ]
                for (letter, msg) in backup.logs:
                    if only_include and letter not in only_include:
                        continue
                    # Use a ":" prefix for warnings/errors/notices so that
                    # the mail reader highlights them.
                    if letter in "EWN":
                        prefix = ":"
                    else:
                        prefix = " "
                    body.append(f"{prefix}{letter}: {msg}")
                return "\n".join(body).encode()

            body_text = write_logs("Logged errors and warnings:", "EWN")
            body_text += b"\n\n"
            body_text += write_logs("All log messages:")

            # Subject summary
            if errmsg and warnmsg:
                summary = f"{errmsg}, {warnmsg}"
            elif errors:
                summary = errmsg or ""
            else:
                summary = warnmsg or ""

            # Call notify.sh
            res = subprocess.run([notify_sh, summary, email], input=body_text)
            if res.returncode != 0:
                backup.log('E', "failed to send notification")
                errors += 1

    # Exit with an error code if we had any errors
    if errors:
        return 1
    return 0
|
|
|
|
if __name__ == "__main__":
    # sys is already imported at module level; exit with main()'s code.
    raise SystemExit(main(sys.argv))
|