|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- #!/usr/bin/python3
-
- import os
- import sys
- import stat
- from typing import Optional
- import humanfriendly # type: ignore
- import wcmatch.glob # type: ignore
- import re
- import dataclasses
- import enum
-
class MatchResult(enum.Enum):
    """Three-way outcome for deciding whether a path is listed.

    NOTE(review): nothing in the visible part of this file references
    this enum yet; the meaning of each member is inferred from its
    name only -- confirm against the intended use.
    """
    INCLUDE_IF_SIZE_OK = 0
    INCLUDE_ALWAYS = 1
    EXCLUDE_ALWAYS = 2
-
@dataclasses.dataclass
class PatternRule:
    """A set of include and exclude regexes that are evaluated together."""
    # Compiled regexes; the rule can only apply if at least one matches.
    re_inc: list[re.Pattern]
    # Compiled regexes; any match here vetoes the rule.
    re_exc: list[re.Pattern]

    def match(self, path: str) -> bool:
        """Return True if `path` is selected by this rule.

        The path is selected when it matches at least one regex in
        `re_inc` and none of the regexes in `re_exc`.  Matching uses
        `re.Pattern.match`, i.e. it is anchored at the start of `path`.
        A rule with no include regexes never matches.
        """
        if not any(inc.match(path) for inc in self.re_inc):
            return False
        return not any(exc.match(path) for exc in self.re_exc)
-
class Lister:
    """Walk directory trees and write the paths to back up to stdout.

    Paths are handled as bytes and written to the process's standard
    output, each followed by a NUL byte and a newline.  Informational
    messages, warnings and errors are written to stderr.
    """

    def __init__(self, one_file_system: bool, max_size: Optional[int]):
        """Set up a lister.

        one_file_system: if true, a directory whose device differs from
            its parent's is skipped.
        max_size: regular files whose st_size exceeds this are skipped;
            None means no limit.
        """
        self.one_file_system = one_file_system
        # Store the effective limit.  Using infinity for "no limit"
        # keeps the size comparison in scan() valid without a special
        # case (comparing an int against None would raise TypeError).
        self.max_size = float('inf') if max_size is None else max_size
        self.stdout = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)

        # Remember files we've skipped because they were too big, so that
        # we can warn again at the end.
        self.skipped_size: set[bytes] = set()

        # Remember errors
        self.skipped_error: set[bytes] = set()

    def __del__(self):
        # __init__ may have failed before the stream was created.
        stream = getattr(self, 'stdout', None)
        if stream is not None:
            stream.close()

    def out(self, path: bytes):
        """Write one path to the output stream."""
        # Use '\0\n' as a separator, so that we can both separate it
        # cleanly in Borg, and also view it on stdout.
        self.stdout.write(path + b'\0\n')

    def log(self, letter: str, msg: str):
        """Write a colored message to stderr.

        letter: severity tag; 'E', 'W' and 'I' each get their own
            color, anything else is printed without one.
        msg: text of the message.
        """
        colors = {'E': 31, 'W': 33, 'I': 36}
        c = colors.get(letter, 0)
        sys.stderr.write(f"\033[1;{c}m{letter}:\033[22m {msg}\033[0m\n")

    def _load_nobackup(self, path: bytes,
                       pathstr: str) -> "Optional[PatternRule]":
        """Parse the .nobackup file in directory `path`, if there is one.

        Each non-comment, non-blank line is translated from a glob into
        regexes, which are then rebased onto this directory.  Returns
        the resulting PatternRule, or None when the directory has no
        .nobackup file.  Raises ValueError if a translated regex does
        not start with '^'.
        """
        def prepend_base(regex):
            # The translated regex is expected to start with '^'; the
            # directory prefix is spliced in right after that anchor.
            if not regex.startswith('^'):
                raise ValueError(f'bad regex: {regex}')
            # The directory name is literal text, not a pattern, so it
            # must be escaped before becoming part of the regex.
            base = os.path.join(pathstr, '')
            return '^' + re.escape(base) + regex[1:]

        try:
            f = open(os.path.join(path, b".nobackup"))
        except FileNotFoundError:
            return None

        rule = PatternRule([], [])
        with f:
            flags = (wcmatch.glob.NEGATE |
                     wcmatch.glob.GLOBSTAR |
                     wcmatch.glob.DOTGLOB |
                     wcmatch.glob.EXTGLOB |
                     wcmatch.glob.BRACE)
            for line in f:
                if line.startswith('#'):
                    continue
                pattern = line.rstrip('\r\n')
                if not pattern:
                    # Blank lines carry no pattern.
                    continue
                (inc, exc) = wcmatch.glob.translate([pattern], flags=flags)
                for x in inc:
                    rule.re_inc.append(re.compile(prepend_base(x)))
                for x in exc:
                    rule.re_exc.append(re.compile(prepend_base(x)))
        return rule

    def scan(self, path: bytes,
             parent_st: Optional[os.stat_result] = None,
             rules: "Optional[list[PatternRule]]" = None):
        """If the given path should be backed up, print it.  If it's
        a directory and its contents should be included, recurse.

        path: path to examine.
        parent_st: lstat result of the containing directory, used for
            the mount point check; None for a root of the scan.
        rules: rules inherited from enclosing directories; a path that
            matches any of them is ignored.  The list is not modified.
        """
        if rules is None:
            rules = []

        # Copy the path in string form, for logging and pathspec
        # parsing.  Otherwise, we use bytes directly.
        pathstr = path.decode(errors='backslashreplace')

        try:
            # See if we match any rules
            for r in rules:
                if r.match(pathstr):
                    self.log('I', f"ignore {pathstr}")
                    return

            # Stat the path
            st = os.lstat(path)

            if stat.S_ISDIR(st.st_mode):
                # Skip if it crosses a mount point
                if (self.one_file_system
                        and parent_st is not None
                        and st.st_dev != parent_st.st_dev):
                    self.log('I', f"skipping {pathstr}: "
                             "on different filesystem")
                    return

                # Add contents of any .nobackup file to our parser
                # rules.  Build a new list rather than appending to the
                # inherited one, so these rules apply only beneath this
                # directory and never leak to siblings or parents.
                rule = self._load_nobackup(path, pathstr)
                child_rules = rules if rule is None else rules + [rule]

                # Recurse and process each entry
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(entry.path, st, child_rules)

            else:
                # For regular files, ensure they're not too big
                if stat.S_ISREG(st.st_mode) and st.st_size > self.max_size:
                    a = humanfriendly.format_size(
                        st.st_size, keep_width=True, binary=True)
                    b = humanfriendly.format_size(
                        self.max_size, keep_width=True, binary=True)
                    self.log('W', f"skipping {pathstr}: "
                             + f"file size {a} exceeds limit {b}")
                    self.skipped_size.add(path)
                    return

                # Every other filename gets printed; devices, symlinks, etc
                # will get handled by Borg
                # NOTE(review): assumes directories themselves are not
                # printed, only their contents -- confirm against the
                # original indentation of this statement.
                self.out(path)

        except PermissionError:
            self.log('E', f"can't read {pathstr}")
            self.skipped_error.add(path)
            return
-
def main(argv):
    """Parse the command line in `argv` and list every requested tree.

    argv: full argument vector.  argv[0] is used as the program name in
        help output; the remaining items are parsed as options and
        directories.
    """
    import argparse

    def humansize(string):
        # humanfriendly reports bad input with its own InvalidSize
        # exception, which argparse does not treat as a usage error;
        # translate it so the user gets a normal error message.
        try:
            return humanfriendly.parse_size(string)
        except humanfriendly.InvalidSize as e:
            raise argparse.ArgumentTypeError(str(e)) from e

    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Build up a directory and file list for backups")

    parser.add_argument('-s', '--max-size', type=humansize,
                        help="Ignore files bigger than this, by default")
    parser.add_argument('-x', '--one-file-system', action='store_true',
                        help="Don't cross mount points when recursing")
    parser.add_argument('dirs', metavar='DIR', nargs='+',
                        help="Root directories to scan recursively")

    # Parse the vector we were handed rather than implicitly falling
    # back to sys.argv, so callers can supply their own arguments.
    args = parser.parse_args(argv[1:])

    lister = Lister(one_file_system=args.one_file_system,
                    max_size=args.max_size)
    for p in args.dirs:
        lister.scan(os.fsencode(p))
-
# Script entry point: hand the raw command line to main() only when this
# file is executed directly, not when it is imported as a module.
if __name__ == '__main__':
    import sys
    main(sys.argv)
|