#!.venv/bin/python

# Scan filesystem to generate a list of files to back up, based on a
# configuration file.  Pass this list to borg to actually create the
# backup.  Execute a notification script on the remote server to
# report the backup status.

import os
import re
import sys
import stat
import time
import pathlib
import subprocess

import typing

import yaml
import wcmatch.glob # type: ignore
import humanfriendly # type: ignore


def pstr(path: bytes) -> str:
    """Decode a bytes path for display.

    Bytes that cannot be decoded are rendered as backslash escapes
    instead of raising, so any path can be logged.
    """
    return path.decode(errors='backslashreplace')


class Config:
    """Backup settings loaded from a YAML configuration file."""

    # Top-level directory to scan
    root: bytes
    # Regular files larger than this many bytes are skipped (None = no limit)
    max_file_size: typing.Optional[int]
    # Do not descend into directories that live on a different device
    one_file_system: bool
    # Do not descend into directories holding a valid CACHEDIR.TAG
    exclude_caches: bool
    # Glob patterns (after '**/' has been prepended to relative ones)
    exclude: list[bytes]
    force_include: list[bytes]
    notify_email: typing.Optional[str]

    def __init__(self, configfile: str):
        """Read `configfile` and compile its glob patterns.

        The keys read are 'root' (required), 'one-file-system',
        'exclude-caches', 'max-file-size', 'exclude', 'force-include'
        and 'notify-email'.  'exclude' and 'force-include' are
        newline-separated pattern lists.
        """
        # Read config
        with open(configfile, 'r') as f:
            config = yaml.safe_load(f)
        self.root = config['root'].encode()
        self.one_file_system = config.get('one-file-system', False)
        self.exclude_caches = config.get('exclude-caches', False)

        if 'max-file-size' in config:
            self.max_file_size = humanfriendly.parse_size(
                config['max-file-size'])
        else:
            self.max_file_size = None

        def process_match_list(config_name: str) -> list[bytes]:
            # A key that is present but empty loads as None; treat it
            # the same as a missing key.
            raw = (config.get(config_name) or '').encode().split(b'\n')
            # Prepend '**/' to any relative patterns, dropping blank lines
            return [x if x.startswith(b'/') else b'**/' + x
                    for x in raw if x]

        self.exclude = process_match_list('exclude')
        self.force_include = process_match_list('force-include')

        self.notify_email = config.get('notify-email', None)

        # Compile patterns
        flags = (wcmatch.glob.GLOBSTAR |
                 wcmatch.glob.DOTGLOB |
                 wcmatch.glob.NODOTDIR |
                 wcmatch.glob.EXTGLOB |
                 wcmatch.glob.BRACE)

        # Path matches if it matches at least one regex in "a" and no
        # regex in "b"
        (a, b) = wcmatch.glob.translate(self.exclude, flags=flags)
        self.exclude_re = ([ re.compile(x) for x in a ],
                           [ re.compile(x) for x in b ])

        (a, b) = wcmatch.glob.translate(self.force_include, flags=flags)
        self.force_include_re = ([ re.compile(x) for x in a ],
                                 [ re.compile(x) for x in b ])

    def match_re(self,
                 re: tuple[list[typing.Pattern],
                           list[typing.Pattern]],
                 path: bytes) -> bool:
        """Return True if `path` matches the compiled pattern pair `re`.

        Path matches if it matches at least one regex in re[0] and no
        regex in re[1].  (The parameter name shadows the `re` module
        inside this method only.)
        """
        if not any(pat.match(path) for pat in re[0]):
            return False
        return not any(pat.match(path) for pat in re[1])


class Backup:
    """Walk the configured root and emit the paths that should be backed up."""

    # Destination for emitted paths; assigned by run()
    outfile: typing.IO[bytes]

    def __init__(self, config: Config, dry_run: bool):
        self.config = config
        self.dry_run = dry_run

        # All logged messages, with severity
        self.logs: list[tuple[str, str]] = []

    def out(self, path: bytes) -> None:
        """Write one path to the output file.

        Paths are newline-terminated in dry-run mode and NUL-terminated
        otherwise.
        """
        self.outfile.write(path + (b'\n' if self.dry_run else b'\0'))

    def log(self, letter: str, msg: str) -> None:
        """Print a colored message to stderr and record it in self.logs.

        `letter` is the severity ('E', 'W' or 'I'); any other letter is
        printed with color code 0.
        """
        colors = { 'E': 31, 'W': 33, 'I': 36 }
        c = colors.get(letter, 0)
        sys.stderr.write(f"\033[1;{c}m{letter}:\033[22m {msg}\033[0m\n")
        self.logs.append((letter, msg))

    def run(self, outfile: typing.IO[bytes]) -> None:
        """Scan the configured root, writing selected paths to `outfile`."""
        self.outfile = outfile
        # Base should not end with a slash, but full path should
        if self.config.root.endswith(b'/'):
            base = self.config.root[:-1]
            path = self.config.root
        else:
            base = self.config.root
            path = self.config.root + b'/'
        self.scan(base, path)

    def scan(self, base: bytes, path: bytes,
             parent_st: typing.Optional[os.stat_result] = None) -> None:
        """If the given path should be backed up, print it.  If it's
        a directory and its contents should be included, recurse.

        `base` is the scan root without a trailing slash, `path` is the
        entry to examine (it must lie below `base`), and `parent_st` is
        the lstat result of the containing directory, or None for the
        root.  Raises Exception if `base` ends with '/' or `path` is
        not of the form base + '/...'.
        """
        if base.endswith(b'/'):
            raise Exception("base must not end with /")
        relpath = path[len(base):]
        if not relpath.startswith(b'/'):
            raise Exception(
                f"relative path (from {base!r}, {path!r}) must start with /")

        try:
            st = os.lstat(path)
            is_dir = stat.S_ISDIR(st.st_mode)
            is_reg = stat.S_ISREG(st.st_mode)
            # Size is computed from allocated blocks, not st_size
            size = st.st_blocks * 512

            # Decorated path ends with a '/' if it's a directory.
            decorated_path = path
            if is_dir and not decorated_path.endswith(b'/'):
                decorated_path += b'/'

            # See if there's a reason to exclude it
            exclude_reason = None

            if self.config.match_re(self.config.exclude_re, decorated_path):
                # Config file says to exclude
                exclude_reason = ('I', "skipping, excluded by config file")

            elif (self.config.one_file_system
                  and parent_st is not None
                  and is_dir
                  and st.st_dev != parent_st.st_dev):
                # Crosses a mount point
                exclude_reason = ('I', "skipping, on different filesystem")

            elif (is_reg
                  and self.config.max_file_size
                  and size > self.config.max_file_size):
                # Too big
                def format_size(n):
                    return humanfriendly.format_size(
                        n, keep_width=True, binary=True)
                a = format_size(size)
                b = format_size(self.config.max_file_size)
                exclude_reason = ('W', f"file size {a} exceeds limit {b}")

            # If we have a reason to exclude it, stop now unless it's
            # force-included
            force = self.config.match_re(
                self.config.force_include_re, decorated_path)
            if exclude_reason and not force:
                self.log(exclude_reason[0],
                         f"{exclude_reason[1]}: {pstr(path)}")
                return

            # Print path for Borg
            self.out(path)

            # Process directories
            if is_dir:

                # Skip if it contains CACHEDIR.TAG
                # (mirroring the --exclude-caches borg option)
                if self.config.exclude_caches:
                    tag = b'Signature: 8a477f597d28d172789f06886806bc55'
                    try:
                        with open(path + b'/CACHEDIR.TAG', 'rb') as f:
                            is_cache = f.read(len(tag)) == tag
                    except OSError:
                        # Missing or unreadable tag file: not a cache
                        # dir.  Only OS-level errors are ignored, so
                        # KeyboardInterrupt etc. still propagate.
                        is_cache = False
                    if is_cache:
                        self.log(
                            'I', f"skipping, cache dir: {pstr(path)}")
                        return

                # Recurse
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(base=base, path=entry.path,
                                  parent_st=st)

        except PermissionError:
            self.log('E', f"can't read {pstr(path)}")
            return


def main(argv: list[str]) -> int:
    """Parse `argv`, scan the filesystem and feed the path list to borg.

    `argv[0]` is used as the program name and `argv[1:]` are parsed as
    the options.  Returns 0 on success (and always in dry-run mode),
    borg's exit status if it is positive, or 1 if borg was killed by a
    signal.
    """
    import argparse

    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Back up the local system using borg",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    base = pathlib.Path(__file__).parent
    parser.add_argument('-c', '--config',
                        help="Config file", default=str(base / "config.yaml"))
    parser.add_argument('-b', '--borg',
                        help="Borg command", default=str(base / "borg.sh"))
    parser.add_argument('-n', '--dry-run', action="store_true",
                        help="Just print log output, don't run borg")
    parser.add_argument('-d', '--debug', action="store_true",
                        help="Print filenames for --dry-run")

    # Parse the arguments we were handed rather than implicitly
    # falling back to sys.argv.
    args = parser.parse_args(argv[1:])
    config = Config(args.config)

    backup = Backup(config, args.dry_run)
    if args.dry_run:
        if args.debug:
            backup.run(sys.stdout.buffer)
        else:
            with open(os.devnull, "wb") as out:
                backup.run(out)
    else:
        # NOTE(review): no shell is involved here, so the single quotes
        # in the archive name are passed to the borg command literally
        # -- confirm that is what borg.sh expects.
        borg = subprocess.Popen([args.borg,
                                 "create",
                                 "--verbose",
                                 "--list",
                                 "--filter", "E",
                                 "--stats",
                                 "--checkpoint-interval", "900",
                                 "--compression", "zstd,3",
                                 "--paths-from-stdin",
                                 "--paths-delimiter", "\\0",
                                 "::'{hostname}-{now:%Y%m%d-%H%M%S}'"],
                                stdin=subprocess.PIPE)
        if borg.stdin is None:
            raise Exception("no pipe")
        try:
            # Give borg some time to start, just to clean up stdout
            time.sleep(2)
            backup.run(borg.stdin)
        except BrokenPipeError:
            sys.stderr.write("broken pipe\n")
        finally:
            # Always close the pipe and reap the child, even if the
            # scan raised.
            try:
                borg.stdin.close()
            except BrokenPipeError:
                pass
            borg.wait()
        ret = borg.returncode
        if ret < 0:
            sys.stderr.write(f"error: process exited with signal {-ret}\n")
            return 1
        elif ret != 0:
            sys.stderr.write(f"error: process exited with return code {ret}\n")
            return ret
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv))