|
- #!.venv/bin/python
-
- # Scan filesystem to generate a list of files to back up, based on a
- # configuration file. Pass this list to borg to actually create the
- # backup. Execute a notification script on the remote server to
- # report the backup status.
-
- import os
- import re
- import sys
- import json
- import stat
- import time
- import select
- import pathlib
- import threading
- import subprocess
- import _thread # for interrupt_main
-
- import typing
-
- import yaml
- import wcmatch.glob # type: ignore
- import humanfriendly # type: ignore
-
def b2s(raw: bytes) -> str:
    """Best-effort bytes-to-str for log output: undecodable bytes are
    rendered as '\\xNN' escapes instead of raising."""
    return str(raw, 'utf-8', 'backslashreplace')
-
def format_size(n: int) -> str:
    """Render a byte count using fixed-width binary units (KiB, MiB, ...)."""
    return humanfriendly.format_size(n, binary=True, keep_width=True)
-
# Type corresponding to patterns that are generated by
# wcmatch.translate: two lists of compiled REs (a, b).  A path matches
# if it matches at least one regex in "a" and none in "b"; see
# Config.match_re for the matching logic.
MatchPatterns = typing.Tuple[typing.List[re.Pattern], typing.List[re.Pattern]]
-
class Config:
    """Backup configuration parsed from a YAML file."""

    roots: typing.List[bytes]
    one_file_system: bool
    exclude_caches: bool
    exclude: MatchPatterns
    unexclude: MatchPatterns
    max_size_rules: typing.List[typing.Tuple[int, MatchPatterns]]
    notify_email: typing.Optional[str]

    def __init__(self, configfile: str):
        """Load and parse the YAML configuration at *configfile*."""

        def compile_patterns(text: str) -> MatchPatterns:
            """Compile a newline-separated glob list into regex pairs.

            Relative patterns (no leading '/') are prefixed with '**/'
            so they can match anywhere in the tree.
            """
            entries = [p for p in text.encode().split(b'\n') if p]
            globs = [p if p.startswith(b'/') else b'**/' + p
                     for p in entries]
            positive, negative = wcmatch.glob.translate(
                globs, flags=(wcmatch.glob.GLOBSTAR |
                              wcmatch.glob.DOTGLOB |
                              wcmatch.glob.NODOTDIR |
                              wcmatch.glob.EXTGLOB |
                              wcmatch.glob.BRACE))
            return ([re.compile(p) for p in positive],
                    [re.compile(p) for p in negative])

        # Read config
        with open(configfile, 'r') as f:
            config = yaml.safe_load(f)

        self.one_file_system = config.get('one-file-system', False)
        self.exclude_caches = config.get('exclude-caches', False)

        # Non-empty root paths, shortest first (so parents come before
        # any nested roots).
        self.roots = sorted(
            (r for r in config.get('roots', '').encode().split(b'\n') if r),
            key=len)

        self.exclude = compile_patterns(config.get('exclude', ''))
        self.unexclude = compile_patterns(config.get('unexclude', ''))

        # Size limits, largest first, each paired with the patterns it
        # applies to.
        by_size = {humanfriendly.parse_size(k): v
                   for k, v in config.get('max-size-rules', {}).items()}
        self.max_size_rules = [
            (size, compile_patterns(by_size[size]))
            for size in sorted(by_size, reverse=True)
        ]

        self.notify_email = config.get('notify-email', None)

    def match_re(self, r: MatchPatterns, path: bytes):
        """True if *path* matches at least one regex in r[0] and none
        of the regexes in r[1]."""
        positive, negative = r
        for pat in positive:
            if pat.match(path):
                return not any(neg.match(path) for neg in negative)
        return False
-
class Backup:
    """Scan the filesystem for paths to back up and drive borg.

    run() walks the configured roots, writing each included path to an
    output stream.  run_borg() executes a borg command, feeding it input
    and capturing/classifying its --log-json output.
    """

    def __init__(self, config: Config, dry_run: bool):
        self.config = config
        self.dry_run = dry_run
        # Roots already visited, so duplicate/nested roots are only
        # scanned once.
        self.root_seen: typing.Dict[bytes, bool] = {}

        # Saved log messages (which includes borg output)
        self.logs: typing.List[typing.Tuple[str, str]] = []

    def out(self, path: bytes):
        # NUL-separated for borg's --paths-delimiter "\0"; newlines are
        # easier to read in a dry run.
        self.outfile.write(path + (b'\n' if self.dry_run else b'\0'))

    def log(self, letter: str, msg: str, bold: bool=False):
        """Print a colorized, letter-tagged message to stdout and save
        it in self.logs for the final summary/notification email."""
        colors = {
            'E': 31, # red: error
            'W': 33, # yellow: warning
            'N': 34, # blue: notice, a weaker warning (no email generated)
            'I': 36, # cyan: info, backup.py script output
            'O': 37, # white: regular output from borg
        }
        c = colors.get(letter, 0)
        b = "" if bold else "\033[22m"
        sys.stdout.write(f"\033[1;{c}m{letter}:{b} {msg}\033[0m\n")
        sys.stdout.flush()
        self.logs.append((letter, msg))

    def run(self, outfile: typing.IO[bytes]):
        """Scan every configured root, writing included paths to
        *outfile* via self.out()."""
        self.outfile = outfile
        for root in self.config.roots:
            if root in self.root_seen:
                self.log('I', f"ignoring root, already seen: {b2s(root)}")
                continue

            try:
                st = os.lstat(root)
                if not stat.S_ISDIR(st.st_mode):
                    raise NotADirectoryError
            except FileNotFoundError:
                self.log('E', f"root does not exist: {b2s(root)}")
                continue
            except NotADirectoryError:
                self.log('E', f"root is not a directory: {b2s(root)}")
                continue

            self.log('I', f"processing root {b2s(root)}")
            self.scan(root)

    def scan(self, path: bytes,
             parent_st: typing.Optional[os.stat_result]=None):
        """If the given path should be backed up, print it. If it's
        a directory and its contents should be included, recurse.
        """
        try:
            st = os.lstat(path)
            is_dir = stat.S_ISDIR(st.st_mode)
            is_reg = stat.S_ISREG(st.st_mode)
            # Allocated size (not apparent size) for the max-size rules.
            size = st.st_blocks * 512

            # Decorated path ends with a '/' if it's a directory.
            decorated_path = path
            if is_dir and not decorated_path.endswith(b'/'):
                decorated_path += b'/'

            # See if there's a reason to exclude it
            exclude_reason = None

            if self.config.match_re(self.config.exclude, decorated_path):
                # Config file says to exclude
                exclude_reason = ('I', "skipping, excluded by config file")

            elif (self.config.one_file_system
                  and parent_st is not None
                  and is_dir
                  and st.st_dev != parent_st.st_dev):
                # Crosses a mount point
                exclude_reason = ('I', "skipping, on different filesystem")

            elif (is_reg
                  and len(self.config.max_size_rules)
                  and size > self.config.max_size_rules[-1][0]):
                # Check file sizes against our list.
                # Only need to check if the size is bigger than the smallest
                # entry on the list; then, we need to check it against all
                # rules to see which one applies.  Rules are sorted largest
                # first; the first rule whose patterns match decides.
                for (max_size, patterns) in self.config.max_size_rules:
                    if self.config.match_re(patterns, decorated_path):
                        if size > max_size:
                            a = format_size(size)
                            b = format_size(max_size)
                            exclude_reason = (
                                'W', f"file size {a} exceeds limit {b}")
                        break

            # If we have a reason to exclude it, stop now unless it's
            # force-included
            force = self.config.match_re(self.config.unexclude, decorated_path)
            if exclude_reason and not force:
                self.log(exclude_reason[0],
                         f"{exclude_reason[1]}: {b2s(path)}")
                return

            # Print path for Borg
            self.out(path)

            # Process directories
            if is_dir:

                if path in self.config.roots:
                    self.root_seen[path] = True
                if decorated_path in self.config.roots:
                    self.root_seen[decorated_path] = True

                # Skip if it contains CACHEDIR.TAG
                # (mirroring the --exclude-caches borg option)
                if self.config.exclude_caches:
                    try:
                        tag = b'Signature: 8a477f597d28d172789f06886806bc55'
                        with open(path + b'/CACHEDIR.TAG', 'rb') as f:
                            if f.read(len(tag)) == tag:
                                self.log(
                                    'I', f"skipping, cache dir: {b2s(path)}")
                                return
                    except OSError:
                        # No tag file / unreadable: not a cache dir.
                        # (Was a bare "except:", which also swallowed
                        # KeyboardInterrupt and SystemExit.)
                        pass

                # Recurse
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(path=entry.path, parent_st=st)

        except (FileNotFoundError,
                IsADirectoryError,
                NotADirectoryError,
                PermissionError) as e:
            self.log('E', f"can't read {b2s(path)}: {str(e)}")
            return

    def run_borg(self, argv: typing.List[str],
                 stdin_writer: typing.Optional[
                     typing.Callable[[typing.IO[bytes]],
                                     typing.Any]]=None):
        """Run a borg command, capturing and displaying output, while feeding
        input using stdin_writer. Returns True on Borg success, False on error.
        """
        borg = subprocess.Popen(argv,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        if borg.stdin is None:
            raise Exception("no pipe")

        # Count warnings and errors from Borg, so we can interpret its
        # error codes correctly (e.g. ignoring exit codes if warnings
        # were all harmless).
        borg_saw_warnings = 0
        borg_saw_errors = 0

        # Use a thread to capture output (borg emits JSON lines on
        # stdout because of --log-json).
        def reader_thread(fh):
            nonlocal borg_saw_warnings
            nonlocal borg_saw_errors
            last_progress = 0
            for line in fh:
                try:
                    data = json.loads(line)

                    if data['type'] == 'log_message':
                        changed_msg = "file changed while we backed it up"
                        if data['levelname'] == 'WARNING':
                            if changed_msg in data['message']:
                                # harmless; don't count as a Borg warning
                                outlevel = 'N'
                            else:
                                borg_saw_warnings += 1
                                outlevel = 'W'
                            output = "warning: "
                        elif data['levelname'] not in ('DEBUG', 'INFO'):
                            borg_saw_errors += 1
                            outlevel = 'E'
                            output = "error: "
                        else:
                            outlevel = 'O'
                            output = ""
                        output += data['message']

                    elif (data['type'] == 'progress_message'
                          and 'message' in data):
                        outlevel = 'O'
                        output = data['message']

                    elif data['type'] == 'archive_progress':
                        # Throttle progress lines to one per 10 seconds.
                        now = time.time()
                        if now - last_progress > 10:
                            last_progress = now
                            def size(short: str, full: str) -> str:
                                return f" {short}={format_size(data[full])}"
                            outlevel = 'O'
                            output = (f"progress:" +
                                      f" files={data['nfiles']}" +
                                      size('orig', 'original_size') +
                                      size('comp', 'compressed_size') +
                                      size('dedup', 'deduplicated_size'))
                        else:
                            continue
                    else:
                        # ignore unknown progress line
                        continue
                except Exception as e:
                    # on error, print raw line with exception
                    outlevel = 'E'
                    output = f"[exception: {str(e)}] " + b2s(line).rstrip()
                self.log(outlevel, output)
            fh.close()

        def _reader_thread(fh):
            # Wrapper: a crash in the reader must not hang the backup;
            # interrupt the main thread instead.
            try:
                return reader_thread(fh)
            except BrokenPipeError:
                pass
            except Exception:
                _thread.interrupt_main()
        reader = threading.Thread(target=_reader_thread, args=(borg.stdout,))
        reader.daemon = True
        reader.start()

        try:
            if stdin_writer:
                # Give borg some time to start, just to clean up stdout
                time.sleep(1)
                stdin_writer(borg.stdin)
        except BrokenPipeError:
            self.log('E', "<broken pipe>")
        finally:
            try:
                borg.stdin.close()
            except BrokenPipeError:
                pass
            borg.wait()
            reader.join()

        # Map borg's exit status (plus our own warning/error counts)
        # to a boolean result.
        ret = borg.returncode
        if ret < 0:
            self.log('E', f"borg exited with signal {-ret}")
        elif ret == 2 or borg_saw_errors:
            self.log('E', f"borg exited with errors (ret={ret})")
        elif ret == 1:
            if borg_saw_warnings:
                self.log('W', f"borg exited with warnings (ret={ret})")
            else:
                # Exit code 1 but all warnings were harmless.
                return True
        elif ret != 0:
            self.log('E', f"borg exited with unknown error code {ret}")
        else:
            return True
        return False
-
def main(argv: typing.List[str]):
    """Entry point: parse arguments, scan the filesystem, run borg
    create/prune, and email a notification if anything went wrong.

    Returns a process exit code: 0 on success, 1 if any errors were
    logged."""
    import argparse

    # Parse args
    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Back up the local system using borg",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    base = pathlib.Path(__file__).parent
    parser.add_argument('-c', '--config',
                        help="Config file", default=str(base / "config.yaml"))
    parser.add_argument('-v', '--vars',
                        help="Variables file", default=str(base / "vars.sh"))
    parser.add_argument('-n', '--dry-run', action="store_true",
                        help="Just print log output, don't run borg")
    parser.add_argument('-d', '--debug', action="store_true",
                        help="Print filenames for --dry-run")

    args = parser.parse_args()
    config = Config(args.config)
    backup = Backup(config, args.dry_run)

    # Parse variables from vars.sh (falling back to sensible defaults
    # if the file is missing or unparseable)
    hostname = os.uname().nodename
    borg_sh = str(base / "borg.sh")
    notify_sh = str(base / "notify.sh")
    try:
        with open(args.vars) as f:
            for line in f:
                m = re.match(r"\s*export\s*([A-Z_]+)=(.*)", line)
                if not m:
                    continue
                var = m.group(1)
                value = m.group(2)
                if var == "HOSTNAME":
                    hostname = value
                if var == "BORG":
                    borg_sh = value
                if var == "BORG_DIR":
                    notify_sh = str(pathlib.Path(value) / "notify.sh")
    except Exception as e:
        backup.log('W', f"failed to parse variables from {args.vars}: {str(e)}")

    # Run backup
    if args.dry_run:
        if args.debug:
            backup.run(sys.stdout.buffer)
        else:
            with open(os.devnull, "wb") as out:
                backup.run(out)
        sys.stdout.flush()
    else:
        if backup.run_borg([borg_sh,
                            "create",
                            "--verbose",
                            "--progress",
                            "--log-json",
                            "--list",
                            "--filter", "E",
                            "--stats",
                            "--checkpoint-interval", "900",
                            "--compression", "zstd,3",
                            "--paths-from-stdin",
                            "--paths-delimiter", "\\0",
                            "::" + hostname + "-{now:%Y%m%d-%H%M%S}"],
                           stdin_writer=backup.run):

            # backup success; run prune. Note that this won't actually free
            # space until a "./borg.sh --rw compact", because we're in
            # append-only mode.
            backup.log('I', "pruning archives", bold=True)
            backup.run_borg([borg_sh,
                             "prune",
                             "--verbose",
                             "--list",
                             "--progress",
                             "--log-json",
                             "--stats",
                             "--keep-within=7d",
                             "--keep-daily=14",
                             "--keep-weekly=8",
                             "--keep-monthly=-1",
                             "--glob-archives", hostname + "-????????-??????"])

    # See if we had any errors
    warnings = sum(1 for (letter, msg) in backup.logs if letter == 'W')
    errors = sum(1 for (letter, msg) in backup.logs if letter == 'E')

    def plural(num: int, word: str) -> str:
        suffix = "" if num == 1 else "s"
        return f"{num} {word}{suffix}"

    warnmsg = plural(warnings, "warning") if warnings else None
    errmsg = plural(errors, "error") if errors else None

    if not warnings and not errors:
        backup.log('I', "backup successful", bold=True)

    else:
        if warnmsg:
            backup.log('W', f"reported {warnmsg}", bold=True)
        if errmsg:
            backup.log('E', f"reported {errmsg}", bold=True)

        # Send a notification of errors
        email = backup.config.notify_email
        if email and not args.dry_run:
            backup.log('I', f"sending error notification to {email}")

            def write_logs(title, only_include=None):
                # Build an email body section from the saved logs,
                # optionally filtered to a set of level letters.
                body = [ title ]
                for (letter, msg) in backup.logs:
                    if only_include and letter not in only_include:
                        continue
                    # Use a ":" prefix for warnings/errors/notices so that
                    # the mail reader highlights them.
                    if letter in "EWN":
                        prefix = ":"
                    else:
                        prefix = " "
                    body.append(f"{prefix}{letter}: {msg}")
                return "\n".join(body).encode()

            body_text = write_logs("Logged errors and warnings:", "EWN")
            # Bug fix: this was `body_text += "\n"` (str), which raised
            # TypeError when concatenated to the bytes body.
            body_text += b"\n"
            body_text += write_logs("All logs:")

            # Subject summary
            if errmsg and warnmsg:
                summary = f"{errmsg}, {warnmsg}"
            else:
                summary = errmsg or warnmsg or ""

            # Call notify.sh
            res = subprocess.run([notify_sh, summary, email], input=body_text)
            if res.returncode != 0:
                backup.log('E', "failed to send notification")
                errors += 1

    # Exit with an error code if we had any errors
    if errors:
        return 1
    return 0
-
if __name__ == "__main__":
    # sys is already imported at module scope; the redundant re-import
    # was removed.
    raise SystemExit(main(sys.argv))
|