From 80d642e52ef17d6f01de436dc26f05d0964e9428 Mon Sep 17 00:00:00 2001 From: Jim Paris Date: Tue, 9 Apr 2013 19:43:41 -0400 Subject: [PATCH] Change nilm-cleanup config file format, tweak output --- Makefile | 7 +- extras/cleanup.cfg | 22 ++++ setup.py | 1 + src/cleanup.py | 252 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 280 insertions(+), 2 deletions(-) create mode 100644 extras/cleanup.cfg create mode 100755 src/cleanup.py diff --git a/Makefile b/Makefile index cd33d76..b415b23 100644 --- a/Makefile +++ b/Makefile @@ -8,8 +8,11 @@ else @echo "Try 'make install'" endif -test: - src/decimate.py +test: test_cleanup + +test_cleanup: + src/cleanup.py -e extras/cleanup.cfg + src/cleanup.py -D extras/cleanup.cfg test_insert: @make install >/dev/null diff --git a/extras/cleanup.cfg b/extras/cleanup.cfg new file mode 100644 index 0000000..466606d --- /dev/null +++ b/extras/cleanup.cfg @@ -0,0 +1,22 @@ +[/lees-compressor/no-leak/prep] +keep = 2d +rate = 60 + +[*/raw] +keep = 2d + +[*/something] +rate = 10 + +[*/sinefit] +keep = 1w +decimated = False + +[/test/raw] +keep = 0.01d + +[/test/sinefit] +keep = 0.01d + +[/test/prep] +keep = 0.01d diff --git a/setup.py b/setup.py index f9ea3d8..7b2cb91 100755 --- a/setup.py +++ b/setup.py @@ -78,6 +78,7 @@ setup(name='nilmtools', 'nilm-prep = nilmtools.prep:main', 'nilm-copy-wildcard = nilmtools.copy_wildcard:main', 'nilm-sinefit = nilmtools.sinefit:main', + 'nilm-cleanup = nilmtools.cleanup:main', ], }, zip_safe = False, diff --git a/src/cleanup.py b/src/cleanup.py new file mode 100755 index 0000000..128191d --- /dev/null +++ b/src/cleanup.py @@ -0,0 +1,252 @@ +#!/usr/bin/python + +from nilmdb.utils.printf import * +from nilmdb.utils.time import (parse_time, timestamp_to_human, + timestamp_to_seconds, seconds_to_timestamp) +from nilmdb.utils.diskusage import human_size +from nilmdb.utils.interval import Interval +import nilmdb.client +import nilmdb.client.numpyclient +import nilmtools +import argparse +import ConfigParser +import sys +import collections +import fnmatch +import re + +def warn(msg, *args): + fprintf(sys.stderr, "warning: " + msg + "\n", *args) + +class TimePeriod(object): + _units = { 'h': ('hour', 60*60*24), + 'd': ('day', 60*60*24), + 'w': ('week', 60*60*24*7), + 'm': ('month', 60*60*24*30), + 'y': ('year', 60*60*24*365) } + + def __init__(self, val): + for u in self._units: + if val.endswith(u): + self.unit = self._units[u][0] + self.scale = self._units[u][1] + self.count = float(val[:-len(u)]) + break + else: + raise ValueError("unknown units: " + units) + + def seconds(self): + return self.count * self.scale + + def describe_seconds(self, seconds): + count = seconds / self.scale + units = self.unit if count == 1 else (self.unit + "s") + if count == int(count): + return sprintf("%d %s", count, units) + else: + return sprintf("%.2f %s", count, units) + + def __str__(self): + return self.describe_seconds(self.seconds()) + +class StreamCleanupConfig(object): + def __init__(self, info): + self.path = info[0] + self.layout = info[1] + if info[4] != 0 and info[5] != 0: + self.rate = info[4] / timestamp_to_seconds(info[5]) + else: + self.rate = None + self.keep = None + self.clean_decimated = True + self.decimated_from = None + self.also_clean_paths = [] + +def main(argv = None): + parser = argparse.ArgumentParser( + formatter_class = argparse.RawDescriptionHelpFormatter, + version = nilmtools.__version__, + description = """\ + Clean up old data from streams using a configuration file to specify + which data to remove. + + The format of the config file is as follows: + + [/stream/path] + keep = 3w # keep up to 3 weeks of data + rate = 8000 # optional, used for the --estimate option + decimated = false # whether to delete decimated data too (default true) + + [*/prep] + keep = 3.5m # or 2520h or 105d or 15w or 0.29y + + The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks, + 'm' for months, or 'y' for years. + + Streams paths may include wildcards. If a path is matched by more than + one config section, data from the last config section counts. + + Decimated streams (paths containing '~decim-') are treated specially: + - They don't match wildcards + - When deleting data from a parent stream, data is also deleted + from its decimated streams, unless decimated=false + + Rate is optional and is only used for the --estimate option. + """) + parser.add_argument("-u", "--url", action="store", + default="http://localhost/nilmdb/", + help="NilmDB server URL (default: %(default)s)") + parser.add_argument("-D", "--dry-run", action="store_true", + default = False, + help="Don't actually remove any data") + parser.add_argument("-e", "--estimate", action="store_true", + default = False, + help="Estimate how much disk space will be used") + parser.add_argument("configfile", type=argparse.FileType('r'), + help="Configuration file") + args = parser.parse_args(argv) + + # Parse config file + config = ConfigParser.RawConfigParser() + config.readfp(args.configfile) + + # List all streams + client = nilmdb.client.Client(args.url) + streamlist = client.stream_list(extended = True) + + # Create config objects + streams = collections.OrderedDict() + for s in streamlist: + streams[s[0]] = StreamCleanupConfig(s) + m = re.search(r"^(.*)~decim-[0-9]+$", s[0]) + if m: + streams[s[0]].decimated_from = m.group(1) + + # Build up configuration + for section in config.sections(): + matched = False + for path in streams.iterkeys(): + # Decimated streams only allow exact matches + if streams[path].decimated_from and path != section: + continue + if not fnmatch.fnmatch(path, section): + continue + matched = True + options = config.options(section) + + # Keep period (days, weeks, months, years) + if 'keep' in options: + streams[path].keep = TimePeriod(config.get(section, 'keep')) + options.remove('keep') + + # Rate + if 'rate' in options: + streams[path].rate = config.getfloat(section, 'rate') + options.remove('rate') + + # Decimated + if 'decimated' in options: + val = config.getboolean(section, 'decimated') + streams[path].clean_decimated = val + options.remove('decimated') + + for leftover in options: + warn("option '%s' for '%s' is unknown", leftover, section) + + if not matched: + warn("config for '%s' did not match any existing streams", section) + + # List all decimated streams in the parent stream's info + for path in streams.keys(): + src = streams[path].decimated_from + if src and src in streams: + if streams[src].clean_decimated: + streams[src].also_clean_paths.append(path) + del streams[path] + + # Warn about streams that aren't getting cleaned up + for path in streams.keys(): + if streams[path].keep is None or streams[path].keep.seconds() < 0: + warn("no config for existing stream '%s'", path) + del streams[path] + + if args.estimate: + # Estimate disk usage + total = 0 + for path in streams.keys(): + rate = streams[path].rate + if not rate or rate < 0: + warn("unable to estimate disk usage for stream '%s' because " + "the data rate is unknown", path) + continue + printf("%s:\n", path) + layout = streams[path].layout + dtype = nilmdb.client.numpyclient.layout_to_dtype(layout) + per_row = dtype.itemsize + per_sec = per_row * rate + printf("%17s: %s per row, %s rows per second\n", + "base rate", + human_size(per_row), + round(rate,1)) + printf("%17s: %s per hour, %s per day\n", + "base size", + human_size(per_sec * 3600), + human_size(per_sec * 3600 * 24)) + + # If we'll be cleaning up decimated data, add an + # estimation for how much room decimated data takes up. + if streams[path].clean_decimated: + d_layout = "float32_" + str(3*(int(layout.split('_')[1]))) + d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout) + # Assume the decimations will be a factor of 4 + # sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize + d_per_row = d_dtype.itemsize + factor = 4.0 + d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor))) + per_sec += d_per_sec + printf("%17s: %s per hour, %s per day\n", + "with decimation", + human_size(per_sec * 3600), + human_size(per_sec * 3600 * 24)) + + keep = per_sec * streams[path].keep.seconds() + printf("%17s: %s\n\n", + "keep " + str(streams[path].keep), human_size(keep)) + total += keep + printf("Total estimated disk usage for these streams:\n") + printf(" %s\n", human_size(total)) + raise SystemExit(0) + + # Do the cleanup + for path in streams: + printf("%s: keep %s\n", path, streams[path].keep) + + # Figure out the earliest timestamp we should keep. + intervals = [ Interval(start, end) for (start, end) in + reversed(list(client.stream_intervals(path))) ] + total = 0 + keep = seconds_to_timestamp(streams[path].keep.seconds()) + for i in intervals: + total += i.end - i.start + if total < keep: + continue + remove_before = i.start + (total - keep) + break + else: + printf(" nothing to do (only %s of data present)\n", + streams[path].keep.describe_seconds( + timestamp_to_seconds(total))) + continue + printf(" removing data before %s\n", timestamp_to_human(remove_before)) + if not args.dry_run: + client.stream_remove(path, None, remove_before) + for ap in streams[path].also_clean_paths: + printf(" also removing from %s\n", ap) + if not args.dry_run: + client.stream_remove(ap, None, remove_before) + + # All done + return + +if __name__ == "__main__": + main()