#!.venv/bin/python
# Scan filesystem to generate a list of files to back up, based on a
# configuration file.  Pass this list to borg to actually create the
# backup.  Execute a notification script on the remote server to
# report the backup status.

import os
import re
import sys
import json
import stat
import time
import select
import pathlib
import threading
import subprocess
import _thread  # for interrupt_main
import typing

import yaml
import wcmatch.glob  # type: ignore
import humanfriendly  # type: ignore


def b2s(raw: bytes) -> str:
    """Decode bytes to str, escaping undecodable bytes rather than failing."""
    return raw.decode(errors='backslashreplace')


def format_size(n: int) -> str:
    """Render a byte count as a fixed-width, binary (KiB/MiB/...) size."""
    return humanfriendly.format_size(n, keep_width=True, binary=True)


# Type corresponding to patterns that are generated by
# wcmatch.translate: two lists of compiled REs (a, b).  A path matches
# if it matches at least one regex in "a" and none in "b".
MatchPatterns = typing.Tuple[typing.List[re.Pattern], typing.List[re.Pattern]]


class Config:
    """Parsed backup configuration loaded from a YAML file."""

    roots: typing.List[bytes]
    one_file_system: bool
    exclude_caches: bool
    exclude: MatchPatterns
    unexclude: MatchPatterns
    max_size_rules: typing.List[typing.Tuple[int, MatchPatterns]]
    notify_email: typing.Optional[str]

    def __init__(self, configfile: str):
        # Helper to process lists of patterns into regexes
        def process_match_list(config_entry) -> MatchPatterns:
            raw = config_entry.encode().split(b'\n')
            pats = []
            # Prepend '**/' to any relative patterns
            for x in raw:
                if not len(x):
                    continue
                if x.startswith(b'/'):
                    pats.append(x)
                else:
                    pats.append(b'**/' + x)
            # Compile patterns.
            (a, b) = wcmatch.glob.translate(
                pats, flags=(wcmatch.glob.GLOBSTAR |
                             wcmatch.glob.DOTGLOB |
                             wcmatch.glob.NODOTDIR |
                             wcmatch.glob.EXTGLOB |
                             wcmatch.glob.BRACE))
            return ([re.compile(x) for x in a],
                    [re.compile(x) for x in b])

        # Read config
        with open(configfile, 'r') as f:
            config = yaml.safe_load(f)

        self.one_file_system = config.get('one-file-system', False)
        self.exclude_caches = config.get('exclude-caches', False)

        raw = config.get('roots', '').encode().split(b'\n')
        self.roots = []
        for x in raw:
            if not len(x):
                continue
            self.roots.append(x)
        # Sort by length so parent roots are processed before children.
        self.roots.sort(key=len)

        self.exclude = process_match_list(config.get('exclude', ''))
        self.unexclude = process_match_list(config.get('unexclude', ''))

        # Size rules are stored largest-first; scan() relies on the last
        # entry being the smallest threshold for its fast pre-check.
        self.max_size_rules = []
        rules = {
            humanfriendly.parse_size(k): v
            for k, v in config.get('max-size-rules', {}).items()
        }
        for size in reversed(sorted(rules)):
            self.max_size_rules.append(
                (size, process_match_list(rules[size])))

        self.notify_email = config.get('notify-email', None)

    def match_re(self, r: MatchPatterns, path: bytes) -> bool:
        # Path matches if it matches at least one regex in
        # r[0] and no regex in r[1].
        for a in r[0]:
            if a.match(path):
                for b in r[1]:
                    if b.match(path):
                        return False
                return True
        return False


class Backup:
    """Walk the configured roots, stream paths to borg, and track logs."""

    def __init__(self, config: Config, dry_run: bool):
        self.config = config
        self.dry_run = dry_run
        self.root_seen: typing.Dict[bytes, bool] = {}

        # Saved log messages (which includes borg output)
        self.logs: typing.List[typing.Tuple[str, str]] = []

    def out(self, path: bytes):
        # NUL-delimited for borg's --paths-delimiter; newline when a
        # human is reading the dry-run output.
        self.outfile.write(path + (b'\n' if self.dry_run else b'\0'))

    def log(self, letter: str, msg: str, bold: bool = False):
        """Print a colorized log line to stdout and remember it in self.logs."""
        colors = {
            'E': 31,  # red: error
            'W': 33,  # yellow: warning
            'N': 34,  # blue: notice, a weaker warning (no email generated)
            'I': 36,  # cyan: info, backup.py script output
            'O': 37,  # white: regular output from borg
        }
        c = colors[letter] if letter in colors else 0
        b = "" if bold else "\033[22m"
        sys.stdout.write(f"\033[1;{c}m{letter}:{b} {msg}\033[0m\n")
        sys.stdout.flush()
        self.logs.append((letter, msg))

    def run(self, outfile: typing.IO[bytes]):
        """Scan every configured root, writing matched paths to outfile."""
        self.outfile = outfile
        for root in self.config.roots:
            if root in self.root_seen:
                self.log('I', f"ignoring root, already seen: {b2s(root)}")
                continue
            try:
                st = os.lstat(root)
                if not stat.S_ISDIR(st.st_mode):
                    raise NotADirectoryError
            except FileNotFoundError:
                self.log('E', f"root does not exist: {b2s(root)}")
                continue
            except NotADirectoryError:
                self.log('E', f"root is not a directory: {b2s(root)}")
                continue
            self.log('I', f"processing root {b2s(root)}")
            self.scan(root)

    def scan(self, path: bytes,
             parent_st: typing.Optional[os.stat_result] = None):
        """If the given path should be backed up, print it.  If it's
        a directory and its contents should be included, recurse.
        """
        try:
            st = os.lstat(path)
            is_dir = stat.S_ISDIR(st.st_mode)
            is_reg = stat.S_ISREG(st.st_mode)
            # Disk usage, not apparent size (sparse files stay cheap).
            size = st.st_blocks * 512

            # Decorated path ends with a '/' if it's a directory.
            decorated_path = path
            if is_dir and not decorated_path.endswith(b'/'):
                decorated_path += b'/'

            # See if there's a reason to exclude it
            exclude_reason = None

            if self.config.match_re(self.config.exclude, decorated_path):
                # Config file says to exclude
                exclude_reason = ('I', "skipping, excluded by config file")

            elif (self.config.one_file_system
                  and parent_st is not None
                  and is_dir
                  and st.st_dev != parent_st.st_dev):
                # Crosses a mount point
                exclude_reason = ('I', "skipping, on different filesystem")

            elif (is_reg
                  and len(self.config.max_size_rules)
                  and size > self.config.max_size_rules[-1][0]):
                # Check file sizes against our list.
                # Only need to check if the size is bigger than the
                # smallest entry on the list; then, we need to check it
                # against all rules to see which one applies.
                for (max_size, patterns) in self.config.max_size_rules:
                    if self.config.match_re(patterns, decorated_path):
                        if size > max_size:
                            a = format_size(size)
                            b = format_size(max_size)
                            exclude_reason = (
                                'W', f"file size {a} exceeds limit {b}")
                        break

            # If we have a reason to exclude it, stop now unless it's
            # force-included
            force = self.config.match_re(self.config.unexclude,
                                         decorated_path)
            if exclude_reason and not force:
                self.log(exclude_reason[0],
                         f"{exclude_reason[1]}: {b2s(path)}")
                return

            # Print path for Borg
            self.out(path)

            # Process directories
            if is_dir:
                if path in self.config.roots:
                    self.root_seen[path] = True
                if decorated_path in self.config.roots:
                    self.root_seen[decorated_path] = True

                # Skip if it contains CACHEDIR.TAG
                # (mirroring the --exclude-caches borg option)
                if self.config.exclude_caches:
                    try:
                        tag = b'Signature: 8a477f597d28d172789f06886806bc55'
                        with open(path + b'/CACHEDIR.TAG', 'rb') as f:
                            if f.read(len(tag)) == tag:
                                self.log(
                                    'I', f"skipping, cache dir: {b2s(path)}")
                                return
                    except OSError:
                        # Best-effort: a missing/unreadable tag file just
                        # means the directory isn't excluded.
                        pass

                # Recurse
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(path=entry.path, parent_st=st)

        except (FileNotFoundError, IsADirectoryError,
                NotADirectoryError, PermissionError) as e:
            self.log('E', f"can't read {b2s(path)}: {str(e)}")
            return

    def run_borg(self, argv: typing.List[str],
                 stdin_writer: typing.Optional[typing.Callable[
                     [typing.IO[bytes]], typing.Any]] = None):
        """Run a borg command, capturing and displaying output, while
        feeding input using stdin_writer.  Returns True on Borg
        success, False on error.
        """
        borg = subprocess.Popen(argv,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        if borg.stdin is None:
            raise Exception("no pipe")

        # Count warnings and errors from Borg, so we can interpret its
        # error codes correctly (e.g. ignoring exit codes if warnings
        # were all harmless).
        borg_saw_warnings = 0
        borg_saw_errors = 0

        # Use a thread to capture output
        def reader_thread(fh):
            nonlocal borg_saw_warnings
            nonlocal borg_saw_errors
            last_progress = 0
            for line in fh:
                try:
                    data = json.loads(line)
                    if data['type'] == 'log_message':
                        changed_msg = "file changed while we backed it up"
                        if data['levelname'] == 'WARNING':
                            if changed_msg in data['message']:
                                # harmless; don't count as a Borg warning
                                outlevel = 'N'
                            else:
                                borg_saw_warnings += 1
                                outlevel = 'W'
                            output = "warning: "
                        elif data['levelname'] not in ('DEBUG', 'INFO'):
                            borg_saw_errors += 1
                            outlevel = 'E'
                            output = "error: "
                        else:
                            outlevel = 'O'
                            output = ""
                        output += data['message']
                    elif (data['type'] == 'progress_message'
                          and 'message' in data):
                        outlevel = 'O'
                        output = data['message']
                    elif data['type'] == 'archive_progress':
                        # Throttle progress reports to one per 10 seconds.
                        now = time.time()
                        if now - last_progress > 10:
                            last_progress = now

                            def size(short: str, full: str) -> str:
                                return f" {short}={format_size(data[full])}"
                            outlevel = 'O'
                            output = (f"progress:"
                                      + f" files={data['nfiles']}"
                                      + size('orig', 'original_size')
                                      + size('comp', 'compressed_size')
                                      + size('dedup', 'deduplicated_size'))
                        else:
                            continue
                    else:
                        # ignore unknown progress line
                        continue
                except Exception as e:
                    # on error, print raw line with exception
                    outlevel = 'E'
                    output = f"[exception: {str(e)}] " + b2s(line).rstrip()
                self.log(outlevel, output)
            fh.close()

        def _reader_thread(fh):
            try:
                return reader_thread(fh)
            except BrokenPipeError:
                pass
            except Exception:
                # Propagate unexpected reader failures to the main thread.
                _thread.interrupt_main()

        reader = threading.Thread(target=_reader_thread,
                                  args=(borg.stdout,))
        reader.daemon = True
        reader.start()

        try:
            if stdin_writer:
                # Give borg some time to start, just to clean up stdout
                time.sleep(1)
                stdin_writer(borg.stdin)
        except BrokenPipeError:
            self.log('E', "broken pipe while writing to borg stdin")
        finally:
            try:
                borg.stdin.close()
            except BrokenPipeError:
                pass

        borg.wait()
        reader.join()

        # Interpret borg's exit status together with the message counts:
        # ret==1 with only harmless warnings still counts as success.
        ret = borg.returncode
        if ret < 0:
            self.log('E', f"borg exited with signal {-ret}")
        elif ret == 2 or borg_saw_errors:
            self.log('E', f"borg exited with errors (ret={ret})")
        elif ret == 1:
            if borg_saw_warnings:
                self.log('W', f"borg exited with warnings (ret={ret})")
            else:
                return True
        elif ret != 0:
            self.log('E', f"borg exited with unknown error code {ret}")
        else:
            return True
        return False


def main(argv: typing.List[str]):
    import argparse

    # Parse args
    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Back up the local system using borg",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    base = pathlib.Path(__file__).parent
    parser.add_argument('-c', '--config',
                        help="Config file",
                        default=str(base / "config.yaml"))
    parser.add_argument('-v', '--vars',
                        help="Variables file",
                        default=str(base / "vars.sh"))
    parser.add_argument('-n', '--dry-run', action="store_true",
                        help="Just print log output, don't run borg")
    parser.add_argument('-d', '--debug', action="store_true",
                        help="Print filenames for --dry-run")
    args = parser.parse_args()

    config = Config(args.config)
    backup = Backup(config, args.dry_run)

    # Parse variables from vars.sh
    hostname = os.uname().nodename
    borg_sh = str(base / "borg.sh")
    notify_sh = str(base / "notify.sh")
    try:
        with open(args.vars) as f:
            for line in f:
                m = re.match(r"\s*export\s*([A-Z_]+)=(.*)", line)
                if not m:
                    continue
                var = m.group(1)
                value = m.group(2)
                if var == "HOSTNAME":
                    hostname = value
                if var == "BORG":
                    borg_sh = value
                if var == "BORG_DIR":
                    notify_sh = str(pathlib.Path(value) / "notify.sh")
    except Exception as e:
        backup.log('W', f"failed to parse variables from {args.vars}: {str(e)}")

    # Run backup
    if args.dry_run:
        if args.debug:
            backup.run(sys.stdout.buffer)
        else:
            with open(os.devnull, "wb") as out:
                backup.run(out)
        sys.stdout.flush()
    else:
        if backup.run_borg([borg_sh, "create", "--verbose",
                            "--progress", "--log-json", "--list",
                            "--filter", "E", "--stats",
                            "--checkpoint-interval", "900",
                            "--compression", "zstd,3",
                            "--paths-from-stdin",
                            "--paths-delimiter", "\\0",
                            "::" + hostname + "-{now:%Y%m%d-%H%M%S}"],
                           stdin_writer=backup.run):
            # backup success; run prune.  Note that this won't actually
            # free space until a "./borg.sh --rw compact", because we're
            # in append-only mode.
            backup.log('I', "pruning archives", bold=True)
            backup.run_borg([borg_sh, "prune", "--verbose", "--list",
                             "--progress", "--log-json", "--stats",
                             "--keep-within=7d",
                             "--keep-daily=14",
                             "--keep-weekly=8",
                             "--keep-monthly=-1",
                             "--glob-archives",
                             hostname + "-????????-??????"])

    # See if we had any errors
    warnings = sum(1 for (letter, msg) in backup.logs if letter == 'W')
    errors = sum(1 for (letter, msg) in backup.logs if letter == 'E')

    def plural(num: int, word: str) -> str:
        suffix = "" if num == 1 else "s"
        return f"{num} {word}{suffix}"

    warnmsg = plural(warnings, "warning") if warnings else None
    errmsg = plural(errors, "error") if errors else None
    if not warnings and not errors:
        backup.log('I', "backup successful", bold=True)
    else:
        if warnmsg:
            backup.log('W', f"reported {warnmsg}", bold=True)
        if errors:
            backup.log('E', f"reported {errmsg}", bold=True)

        # Send a notification of errors
        email = backup.config.notify_email
        if email and not args.dry_run:
            backup.log('I', f"sending error notification to {email}")

            def write_logs(title, only_include=None):
                body = [title]
                for (letter, msg) in backup.logs:
                    if only_include and letter not in only_include:
                        continue
                    # Use a ":" prefix for warnings/errors/notices so that
                    # the mail reader highlights them.
                    if letter in "EWN":
                        prefix = ":"
                    else:
                        prefix = " "
                    body.append(f"{prefix}{letter}: {msg}")
                return "\n".join(body).encode()

            body_text = write_logs("Logged errors and warnings:", "EWN")
            body_text += b"\n\n"
            body_text += write_logs("All log messages:")

            # Subject summary
            if errmsg and warnmsg:
                summary = f"{errmsg}, {warnmsg}"
            elif errors:
                summary = errmsg or ""
            else:
                summary = warnmsg or ""

            # Call notify.sh
            res = subprocess.run([notify_sh, summary, email],
                                 input=body_text)
            if res.returncode != 0:
                backup.log('E', "failed to send notification")
                errors += 1

    # Exit with an error code if we had any errors
    if errors:
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv))