Browse Source

Add initial version of backup file lister

master
Jim Paris 10 months ago
parent
commit
2dd60aaf28
5 changed files with 159 additions and 2 deletions
  1. +1
    -0
      .gitignore
  2. +10
    -2
      Makefile
  3. +6
    -0
      README.md
  4. +139
    -0
      lister.py
  5. +3
    -0
      requirements.txt

+ 1
- 0
.gitignore View File

@@ -0,0 +1 @@
venv

+ 10
- 2
Makefile View File

@@ -1,6 +1,14 @@
.PHONY: all
all: check
@echo "Use 'make deploy' to copy to https://psy.jim.sh/borg-setup.sh"
all: test-list

venv: requirements.txt
python3 -m venv venv
venv/bin/pip3 install -r requirements.txt

.PHONY: test-list
test-list: venv
venv/bin/mypy lister.py
venv/bin/python lister.py --max-size 1GiB --one-file-system /tmp >/dev/null

.PHONY: check
check:


+ 6
- 0
README.md View File

@@ -14,6 +14,12 @@

- Systemd timers start daily backups

# Setup

python3 -m venv venv
venv/bin/pip3 install -r requirements.txt
venv/bin/python3 lister.py

# Usage

Run on client:


+ 139
- 0
lister.py View File

@@ -0,0 +1,139 @@
#!/usr/bin/python3

import os
import sys
import stat
from typing import Optional
import humanfriendly # type: ignore
import igittigitt

class Lister:
    """Walk directory trees and print paths that should be backed up.

    Paths are written to stdout as raw bytes, terminated by '\\0\\n', so
    the list can be split cleanly on NUL by Borg while remaining viewable
    in a terminal.  Exclusions come from gitignore-style rules collected
    from ".nobackup" files encountered during the walk.
    """

    def __init__(self, one_file_system: bool, max_size: Optional[int]):
        """
        one_file_system: if True, don't descend into directories that are
            on a different filesystem than their parent.
        max_size: skip regular files larger than this many bytes;
            None means no size limit.
        """
        self.one_file_system = one_file_system
        # BUG FIX: the original stored max_size first and then rebound
        # only the *local* name to float('inf'), leaving self.max_size as
        # None and breaking the 'st.st_size > self.max_size' comparison
        # in scan().  Normalize before storing.
        self.max_size = float('inf') if max_size is None else max_size

        # Binary (bytes) view of stdout; closefd=False so closing this
        # wrapper doesn't close the underlying file descriptor.
        self.stdout = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)

        # Remember files we've skipped because they were too big, so that
        # we can warn again at the end.
        self.skipped_size: set[bytes] = set()

        # Remember errors
        self.skipped_error: set[bytes] = set()

        # Parse gitignore-style rules to exclude files from backup
        self.parser = igittigitt.IgnoreParser()

    def __del__(self):
        # Flush and release our buffered wrapper (fd itself stays open).
        self.stdout.close()

    def out(self, path: bytes):
        # Use '\0\n' as a separator, so that we can both separate it
        # cleanly in Borg, and also view it on stdout.
        self.stdout.write(path + b'\0\n')

    def log(self, letter: str, msg: str):
        """Write a colorized one-line message to stderr.

        letter selects the color: E(rror)=red, W(arning)=yellow,
        I(nfo)=cyan; anything else is uncolored.
        """
        colors = {'E': 31, 'W': 33, 'I': 36}
        c = colors.get(letter, 0)
        sys.stderr.write(f"\033[1;{c}m{letter}:\033[22m {msg}\033[0m\n")

    def path_string(self, path: bytes) -> str:
        """Decode a filesystem path for display, escaping undecodable
        bytes rather than raising."""
        return path.decode(errors='backslashreplace')

    def scan(self, path: bytes, parent_st: Optional[os.stat_result] = None):
        """If the given path should be backed up, print it.  If it's
        a directory and its contents should be included, recurse."""

        # Need the path in string form, for igittigitt parser
        pathstr = self.path_string(path)

        try:
            # Stat the path
            st = os.lstat(path)

            is_dir = stat.S_ISDIR(st.st_mode)

            # Check ignore rules; a negation rule may re-include a path
            # that a plain rule matched.
            match = self.parser._match_rules(pathstr, not is_dir)
            if match:
                match = self.parser._match_negation_rules(pathstr)

            if match:
                self.log('I', f"ignored {pathstr}")
                return

            if is_dir:
                # Skip if it crosses a mount point
                if self.one_file_system:
                    if parent_st is not None and st.st_dev != parent_st.st_dev:
                        self.log('I', f"skipping {pathstr}: "
                                 "on different filesystem")
                        return

                # Add contents of any .nobackup file to our
                # parser rules
                try:
                    with open(os.path.join(path, b".nobackup")) as f:
                        for line in f:
                            self.parser.add_rule(line, base_path=pathstr)
                except FileNotFoundError:
                    pass

                # Recurse and process each entry
                with os.scandir(path) as it:
                    for entry in it:
                        self.scan(entry.path, st)

            else:
                # For regular files, ensure they're not too big
                if stat.S_ISREG(st.st_mode) and st.st_size > self.max_size:
                    def format_size(n):
                        return humanfriendly.format_size(
                            n, keep_width=True, binary=True)
                    a = format_size(st.st_size)
                    b = format_size(self.max_size)
                    self.log('W', f"skipping {self.path_string(path)}: "
                             + f"file size {a} exceeds limit {b}")
                    self.skipped_size.add(path)
                    return

            # Every other filename gets printed; devices, symlinks, etc
            # will get handled by Borg
            self.out(path)

        except PermissionError as e:
            self.log('E', f"can't read {self.path_string(path)}")
            self.skipped_error.add(path)
            return

def main(argv):
    """Parse command-line options and list each requested directory.

    argv: the full argument vector; argv[0] is the program name used
        for usage output, argv[1:] are the options and directories.
    """
    import argparse

    def humansize(string):
        # Accepts human-friendly sizes like '1GiB' and returns bytes.
        return humanfriendly.parse_size(string)

    parser = argparse.ArgumentParser(
        prog=argv[0],
        description="Build up a directory and file list for backups")

    parser.add_argument('-s', '--max-size', type=humansize,
                        help="Ignore files bigger than this, by default")
    parser.add_argument('-x', '--one-file-system', action='store_true',
                        help="Don't cross mount points when recursing")
    parser.add_argument('dirs', metavar='DIR', nargs='+',
                        help="Root directories to scan recursively")

    # BUG FIX: the original called parse_args() with no arguments, which
    # reads sys.argv implicitly and silently ignores the argv parameter
    # (except for prog).  Parse the vector we were actually given.
    args = parser.parse_args(argv[1:])

    lister = Lister(one_file_system=args.one_file_system,
                    max_size=args.max_size)
    for p in args.dirs:
        lister.scan(os.fsencode(p))

# Script entry point.  Note: 'sys' is already imported at the top of the
# file, so the redundant local 'import sys' here was removed.
if __name__ == "__main__":
    main(sys.argv)

+ 3
- 0
requirements.txt View File

@@ -0,0 +1,3 @@
humanfriendly>=9.2
igittigitt>=2.0.4
mypy

Loading…
Cancel
Save