|
- #!.venv/bin/python
-
- # Scan filesystem to generate a list of files to back up, based on a
- # configuration file. Pass this list to borg to actually create the
- # backup. Execute a notification script on the remote server to
- # report the backup status.
-
- import os
- import re
- import sys
- import stat
- import pathlib
-
- import typing
-
- import yaml
- import wcmatch.glob # type: ignore
- import humanfriendly # type: ignore
-
class Config:
    """Backup settings loaded from a YAML configuration file.

    Paths and glob patterns are held as ``bytes`` so that they can be
    matched against raw filesystem names.

    Attributes set by ``__init__``:
      root              -- starting path of the scan (bytes)
      max_file_size     -- size limit in bytes, or None when not configured
      one_file_system   -- value of the 'one-file-system' key (default False)
      exclude           -- non-empty lines of the 'exclude' key
      force_include     -- non-empty lines of the 'force-include' key
      notify_email      -- value of the 'notify-email' key, or None
      exclude_re        -- compiled form of ``exclude`` (see match_compiled)
      force_include_re  -- compiled form of ``force_include``
    """

    root: bytes
    max_file_size: typing.Optional[int]
    one_file_system: bool
    exclude: list[bytes]
    force_include: list[bytes]
    notify_email: typing.Optional[str]

    def __init__(self, configfile: str):
        """Read ``configfile`` (YAML) and compile its glob patterns.

        Raises whatever ``open`` / ``yaml.safe_load`` raise, and KeyError
        when the mandatory 'root' key is missing.
        """
        # Patterns are encoded as UTF-8 below, so read the file as UTF-8
        # too instead of relying on the locale's default encoding.
        with open(configfile, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)

        self.root = config['root'].encode()
        self.one_file_system = config.get('one-file-system', False)

        if 'max-file-size' in config:
            # str() lets a bare integer in the YAML (e.g. "1000") through;
            # parse_size only accepts text.
            self.max_file_size = humanfriendly.parse_size(
                str(config['max-file-size']))
        else:
            self.max_file_size = None

        self.exclude = self._parse_patterns(config.get('exclude'))
        self.force_include = self._parse_patterns(config.get('force-include'))

        self.notify_email = config.get('notify-email', None)

        # Compile patterns
        flags = (wcmatch.glob.GLOBSTAR |
                 wcmatch.glob.DOTGLOB |
                 wcmatch.glob.NODOTDIR |
                 wcmatch.glob.EXTGLOB |
                 wcmatch.glob.BRACE)
        self.exclude_re = self._compile_patterns(self.exclude, flags)
        self.force_include_re = self._compile_patterns(
            self.force_include, flags)

    @staticmethod
    def _parse_patterns(text: typing.Optional[str]) -> list[bytes]:
        """Split a newline-separated pattern block into non-empty lines.

        A missing or empty key (None or '') yields an empty list.
        """
        encoded = (text or '').encode()
        return [line for line in encoded.split(b'\n') if line]

    @staticmethod
    def _compile_patterns(patterns: list[bytes], flags: int):
        """Translate glob patterns to a pair of compiled regex lists.

        A path matches if it matches at least one regex in the first
        list and no regex in the second.
        """
        (a, b) = wcmatch.glob.translate(patterns, flags=flags)
        return ([re.compile(x) for x in a],
                [re.compile(x) for x in b])

    def match_compiled(self, re: tuple[list[typing.Pattern],
                                       list[typing.Pattern]],
                       path: bytes):
        """Return True if ``path`` matches the compiled pattern pair ``re``.

        The path matches when at least one regex in ``re[0]`` matches it
        and no regex in ``re[1]`` does.
        """
        positive, negative = re
        if not any(p.match(path) for p in positive):
            return False
        return not any(n.match(path) for n in negative)

    def _to_dict(self) -> dict:
        """Build the plain-text mapping that ``__str__`` serializes.

        Byte values are decoded (undecodable bytes become backslash
        escapes) so that YAML shows readable text rather than binary
        blobs. Unset or empty options are left out.
        """
        d = {'root': self.root.decode(errors='backslashreplace')}
        if self.max_file_size:
            d['max-file-size'] = self.max_file_size
        if self.exclude:
            utf = b'\n'.join(self.exclude)
            d['exclude'] = utf.decode(errors='backslashreplace')
        if self.force_include:
            utf = b'\n'.join(self.force_include)
            d['force-include'] = utf.decode(errors='backslashreplace')
        if self.notify_email:
            d['notify-email'] = self.notify_email
        return d

    def __str__(self):
        """Return the configuration rendered as block-style YAML."""
        return yaml.dump(self._to_dict(), default_flow_style=False)
-
class Backup:
    """Walk the filesystem and write out every path that should be backed up.

    Paths are written to the binary stream given at construction time,
    separated by newlines in dry-run mode and by NUL bytes otherwise.
    Diagnostics go to stderr and are also collected in ``self.logs``.
    """

    def __init__(self, config: "Config", dry_run: bool, out: typing.BinaryIO):
        """Store the configuration, the dry-run flag and the output stream."""
        self.config = config
        self.outfile = out
        self.dry_run = dry_run

        # All logged messages, with severity
        self.logs: list[tuple[str, str]] = []

    def out(self, path: bytes):
        """Write one path to the output stream, followed by its separator."""
        self.outfile.write(path + (b'\n' if self.dry_run else b'\0'))

    def log(self, letter: str, msg: str):
        """Print a colored message to stderr and record it in ``self.logs``.

        ``letter`` is the severity ('E', 'W' or 'I'); any other letter
        is printed with color code 0.
        """
        colors = {'E': 31, 'W': 33, 'I': 36}
        c = colors.get(letter, 0)
        sys.stderr.write(f"\033[1;{c}m{letter}:\033[22m {msg}\033[0m\n")
        self.logs.append((letter, msg))

    def run(self):
        """Scan the tree starting at the configured root."""
        self.scan(self.config.root)

    def scan(self, path: bytes,
             parent_st: typing.Optional[os.stat_result] = None):
        """If the given path should be backed up, print it. If it's
        a directory and its contents should be included, recurse.

        ``parent_st`` is the lstat result of the containing directory;
        it is None only for the top-level call.

        A path that cannot be read is logged and skipped. A path that
        disappears while the scan is running is logged as a warning and
        skipped too, except for the top-level path, whose absence is
        re-raised as FileNotFoundError.
        """

        # Copy the path in string form, for logging. Otherwise, we use
        # bytes directly.
        pathstr = path.decode(errors='backslashreplace')

        try:
            # Only stat the file when we need it
            cached_st = None

            def st():
                nonlocal cached_st
                if cached_st is None:
                    cached_st = os.lstat(path)
                return cached_st

            # See if there's a reason to exclude it
            exclude_reason = None

            if self.config.match_compiled(self.config.exclude_re, path):
                # Config file says to exclude
                exclude_reason = ('I', "skipping, excluded by config file")

            elif (stat.S_ISDIR(st().st_mode)
                  and self.config.one_file_system
                  and parent_st is not None
                  and st().st_dev != parent_st.st_dev):
                # Crosses a mount point
                exclude_reason = ('I', "skipping, on different filesystem")

            elif (stat.S_ISREG(st().st_mode)
                  and self.config.max_file_size
                  and st().st_size > self.config.max_file_size):
                # Too big
                def format_size(n):
                    return humanfriendly.format_size(
                        n, keep_width=True, binary=True)
                a = format_size(st().st_size)
                b = format_size(self.config.max_file_size)
                exclude_reason = ('W', f"file size {a} exceeds limit {b}")

            # If we have a reason to exclude it, stop now unless it's
            # force-included
            if (exclude_reason
                    and not self.config.match_compiled(
                        self.config.force_include_re, path)):
                self.log(exclude_reason[0], f"{exclude_reason[1]}: {pathstr}")
                return

            # Print name of this path
            self.out(path)

            # If it's a directory, recurse
            if stat.S_ISDIR(st().st_mode):
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(path=entry.path, parent_st=st())

        except PermissionError:
            self.log('E', f"can't read {pathstr}")
        except FileNotFoundError:
            # An entry can be deleted between being listed by its parent
            # and being examined here; that must not abort the whole run.
            # A missing top-level path is a real error, so let it through.
            if parent_st is None:
                raise
            self.log('W', f"skipping, vanished during scan: {pathstr}")
-
def main(argv: list[str]):
    """Parse the command line in ``argv`` and run the backup scan.

    ``argv[0]`` is used as the program name in help output; the
    remaining items are the options. The resulting file list is written
    to standard output in binary form.
    """
    import argparse

    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Back up the local system using borg",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    default_config = str(pathlib.Path(__file__).parent / "config.yaml")
    parser.add_argument('-c', '--config',
                        help="Config file", default=default_config)
    parser.add_argument('-n', '--dry-run', action="store_true",
                        help="Just print filenames, don't run borg")

    # Parse the arguments we were given rather than letting argparse
    # fall back to sys.argv behind the caller's back.
    args = parser.parse_args(argv[1:])
    config = Config(args.config)
    backup = Backup(config, args.dry_run, sys.stdout.buffer)
    backup.run()
-
# Script entry point: hand the process command line to main(). The
# module-level "import sys" at the top of the file already provides sys.
if __name__ == "__main__":
    main(sys.argv)
|