@@ -23,7 +23,7 @@ def pstr(path: bytes) -> str:
return path.decode(errors='backslashreplace')
class Config:
root: bytes
roots : list[ bytes]
max_file_size: typing.Optional[int]
one_file_system: bool
exclude_caches: bool
@@ -35,7 +35,6 @@ class Config:
# Read config
with open(configfile, 'r') as f:
config = yaml.safe_load(f)
self.root = config['root'].encode()
self.one_file_system = config.get('one-file-system', False)
self.exclude_caches = config.get('exclude-caches', False)
@@ -45,6 +44,14 @@ class Config:
else:
self.max_file_size = None
raw = config.get('roots', '').encode().split(b'\n')
self.roots = []
for x in raw:
if not len(x):
continue
self.roots.append(x)
self.roots.sort(key=len)
def process_match_list(config_name):
raw = config.get(config_name, '').encode().split(b'\n')
pats = []
@@ -96,6 +103,7 @@ class Backup:
def __init__(self, config: Config, dry_run: bool):
self.config = config
self.dry_run = dry_run
self.root_seen: dict[bytes, bool] = {}
# All logged messages, with severity
self.logs: list[tuple[str, str]] = []
@@ -114,28 +122,29 @@ class Backup:
def run(self, outfile: typing.IO[bytes]):
self.outfile = outfile
# Base should not end with a slash, but full path should
if self.config.root.endswith(b'/'):
base = self.config.root[:-1]
path = self.config.root
else:
base = self.config.root
path = self.config.root + b'/'
self.scan(base, path)
for root in self.config.roots:
if root in self.root_seen:
self.log('I', f"ignoring root, already seen: {pstr(root)}")
continue
def scan(self, base: bytes, path: bytes,
parent_st: os.stat_result=None):
try:
st = os.lstat(root)
if not stat.S_ISDIR(st.st_mode):
raise NotADirectoryError
except FileNotFoundError:
self.log('W', f"ignoring root, does not exist: {pstr(root)}")
continue
except NotADirectoryError:
self.log('W', f"ignoring root, not a directory: {pstr(root)}")
continue
self.log('I', f"processing root {pstr(root)}")
self.scan(root)
def scan(self, path: bytes, parent_st: os.stat_result=None):
"""If the given path should be backed up, print it. If it's
a directory and its contents should be included, recurse.
"""
if base.endswith(b'/'):
raise Exception("base must not end with /")
relpath = path[len(base):]
if not relpath.startswith(b'/'):
raise Exception(f"relative path (from {repr(base)}, {repr(path)})"
+ f" must start with /")
try:
st = os.lstat(path)
is_dir = stat.S_ISDIR(st.st_mode)
@@ -187,6 +196,11 @@ class Backup:
# Process directories
if is_dir:
if path in self.config.roots:
self.root_seen[path] = True
if decorated_path in self.config.roots:
self.root_seen[decorated_path] = True
# Skip if it contains CACHEDIR.TAG
# (mirroring the --exclude-caches borg option)
if self.config.exclude_caches:
@@ -203,8 +217,7 @@ class Backup:
# Recurse
with os.scandir(path) as it:
for entry in it:
self.scan(base=base, path=entry.path,
parent_st=st)
self.scan(path=entry.path, parent_st=st)
except PermissionError as e:
self.log('E', f"can't read {pstr(path)}")