#!/usr/bin/python

from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
                               timestamp_to_seconds, seconds_to_timestamp)
from nilmdb.utils.diskusage import human_size
from nilmdb.utils.interval import Interval
import nilmdb.client
import nilmdb.client.numpyclient
import nilmtools
import argparse
import ConfigParser
import sys
import collections
import fnmatch
import re


def warn(msg, *args):
    """Write a printf-style warning, prefixed with "warning: ", to stderr."""
    fprintf(sys.stderr, "warning: " + msg + "\n", *args)


class TimePeriod(object):
    """A duration parsed from a string such as '3w' or '3.5m'.

    The trailing character selects the unit (see `_units`); everything
    before it is parsed as a floating-point count.
    """

    # suffix -> (singular unit name, seconds per unit).  A month is
    # taken as 30 days and a year as 365 days.
    _units = {
        'h': ('hour', 60*60),
        'd': ('day', 60*60*24),
        'w': ('week', 60*60*24*7),
        'm': ('month', 60*60*24*30),
        'y': ('year', 60*60*24*365),
    }

    def __init__(self, val):
        """Parse `val` into a unit name, a scale and a count.

        Raises ValueError if `val` does not end with a known unit
        suffix, or if the part before the suffix is not a valid float.
        """
        for u in self._units:
            if val.endswith(u):
                self.unit = self._units[u][0]
                self.scale = self._units[u][1]
                self.count = float(val[:-len(u)])
                break
        else:
            # No suffix matched.  Report the offending value itself; the
            # previous code referenced an undefined name here and so
            # raised NameError instead of the intended ValueError.
            raise ValueError("unknown units: " + val)

    def seconds(self):
        """Return the length of this period in seconds."""
        return self.count * self.scale

    def describe_seconds(self, seconds):
        """Format `seconds` using this period's unit, e.g. "3 weeks".

        Whole counts are printed as integers, others with two decimals.
        """
        count = seconds / self.scale
        units = self.unit if count == 1 else (self.unit + "s")
        if count == int(count):
            return sprintf("%d %s", count, units)
        else:
            return sprintf("%.2f %s", count, units)

    def __str__(self):
        return self.describe_seconds(self.seconds())


class StreamCleanupConfig(object):
    """Per-stream cleanup settings, seeded from one stream_list() entry."""

    def __init__(self, info):
        """Initialize from `info`, one row of the extended stream list.

        info[0] is the stream path and info[1] its layout.
        NOTE(review): info[4] and info[5] are presumably the total row
        count and total time covered (in timestamp units) -- confirm
        against nilmdb.client.Client.stream_list(extended=True).
        """
        self.path = info[0]
        self.layout = info[1]
        if info[4] != 0 and info[5] != 0:
            self.rate = info[4] / timestamp_to_seconds(info[5])
        else:
            # Not enough information to derive a rate
            self.rate = None
        self.keep = None
        self.clean_decimated = True
        self.decimated_from = None
        self.also_clean_paths = []


def main(argv = None):
    """Command-line entry point: remove old stream data per a config file.

    `argv` is the argument list to parse; None makes argparse fall back
    to sys.argv.  Exits via SystemExit(0) after printing estimates when
    --estimate is given.  Data is removed only when --yes is given.
    """
    parser = argparse.ArgumentParser(
        formatter_class = argparse.RawDescriptionHelpFormatter,
        version = nilmtools.__version__,
        description = """\
    Clean up old data from streams using a configuration file to specify
    which data to remove.

    The format of the config file is as follows:

      [/stream/path]
      keep = 3w         # keep up to 3 weeks of data
      rate = 8000       # optional, used for the --estimate option
      decimated = false # whether to delete decimated data too (default true)

      [*/prep]
      keep = 3.5m       # or 2520h or 105d or 15w or 0.29y

    The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
    'm' for months, or 'y' for years.

    Streams paths may include wildcards. If a path is matched by more than
    one config section, data from the last config section counts.

    Decimated streams (paths containing '~decim-') are treated specially:
    - They don't match wildcards
    - When deleting data from a parent stream, data is also deleted
      from its decimated streams, unless decimated=false

    Rate is optional and is only used for the --estimate option.
    """)
    parser.add_argument("-u", "--url", action="store",
                        default="http://localhost/nilmdb/",
                        help="NilmDB server URL (default: %(default)s)")
    parser.add_argument("-y", "--yes", action="store_true",
                        default = False,
                        help="Actually remove the data (default: no)")
    parser.add_argument("-e", "--estimate", action="store_true",
                        default = False,
                        help="Estimate how much disk space will be used")
    parser.add_argument("configfile", type=argparse.FileType('r'),
                        help="Configuration file")
    args = parser.parse_args(argv)

    # Parse config file
    config = ConfigParser.RawConfigParser()
    config.readfp(args.configfile)

    # List all streams
    client = nilmdb.client.Client(args.url)
    streamlist = client.stream_list(extended = True)

    # Create config objects
    streams = collections.OrderedDict()
    for s in streamlist:
        streams[s[0]] = StreamCleanupConfig(s)
        m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
        if m:
            streams[s[0]].decimated_from = m.group(1)

    # Build up configuration.  Sections are applied in file order, so
    # when several sections match a path the last one wins.
    for section in config.sections():
        matched = False
        for path in streams:
            # Decimated streams only allow exact matches
            if streams[path].decimated_from and path != section:
                continue
            if not fnmatch.fnmatch(path, section):
                continue
            matched = True
            options = config.options(section)

            # Keep period (days, weeks, months, years)
            if 'keep' in options:
                streams[path].keep = TimePeriod(config.get(section, 'keep'))
                options.remove('keep')

            # Rate
            if 'rate' in options:
                streams[path].rate = config.getfloat(section, 'rate')
                options.remove('rate')

            # Decimated
            if 'decimated' in options:
                val = config.getboolean(section, 'decimated')
                streams[path].clean_decimated = val
                options.remove('decimated')

            # Anything still in the list was not recognized above
            for leftover in options:
                warn("option '%s' for '%s' is unknown", leftover, section)

        if not matched:
            warn("config for '%s' did not match any existing streams", section)

    # List all decimated streams in the parent stream's info.
    # Iterate over a snapshot of the keys because entries are deleted
    # from the dictionary inside the loop.
    for path in list(streams):
        src = streams[path].decimated_from
        if src and src in streams:
            if streams[src].clean_decimated:
                streams[src].also_clean_paths.append(path)
            # The decimated stream is handled through its parent (or
            # deliberately left alone), so drop its own entry.
            del streams[path]

    # Warn about streams that aren't getting cleaned up
    for path in list(streams):
        if streams[path].keep is None or streams[path].keep.seconds() < 0:
            warn("no config for existing stream '%s'", path)
            del streams[path]

    if args.estimate:
        # Estimate disk usage
        total = 0
        for path in streams:
            rate = streams[path].rate
            if not rate or rate < 0:
                warn("unable to estimate disk usage for stream '%s' because "
                     "the data rate is unknown", path)
                continue
            printf("%s:\n", path)
            layout = streams[path].layout
            dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
            per_row = dtype.itemsize
            per_sec = per_row * rate
            printf("%17s: %s per row, %s rows per second\n",
                   "base rate",
                   human_size(per_row),
                   round(rate,1))
            printf("%17s: %s per hour, %s per day\n",
                   "base size",
                   human_size(per_sec * 3600),
                   human_size(per_sec * 3600 * 24))

            # If we'll be cleaning up decimated data, add an
            # estimation for how much room decimated data takes up.
            if streams[path].clean_decimated:
                # The decimated layout is taken to be float32 with
                # three times as many columns as the source layout.
                d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
                d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
                # Assume the decimations will be a factor of 4
                # sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
                d_per_row = d_dtype.itemsize
                factor = 4.0
                d_per_sec = (d_per_row * (rate / factor)
                             * (1 / (1 - (1/factor))))
                per_sec += d_per_sec
                printf("%17s: %s per hour, %s per day\n",
                       "with decimation",
                       human_size(per_sec * 3600),
                       human_size(per_sec * 3600 * 24))

            keep = per_sec * streams[path].keep.seconds()
            printf("%17s: %s\n\n",
                   "keep " + str(streams[path].keep),
                   human_size(keep))
            total += keep
        printf("Total estimated disk usage for these streams:\n")
        printf(" %s\n", human_size(total))
        raise SystemExit(0)

    # Do the cleanup
    for path in streams:
        printf("%s: keep %s\n", path, streams[path].keep)

        # Figure out the earliest timestamp we should keep.  Walk the
        # intervals newest-first, accumulating their lengths until the
        # total exceeds the keep period.
        intervals = [ Interval(start, end) for (start, end) in
                      reversed(list(client.stream_intervals(path))) ]
        total = 0
        keep = seconds_to_timestamp(streams[path].keep.seconds())
        for i in intervals:
            total += i.end - i.start
            if total <= keep:
                continue
            # This interval pushed us over the limit; the cut point
            # lies (total - keep) past its start.
            remove_before = i.start + (total - keep)
            break
        else:
            # Ran out of intervals without exceeding the keep period
            printf(" nothing to do (only %s of data present)\n",
                   streams[path].keep.describe_seconds(
                       timestamp_to_seconds(total)))
            continue
        printf(" removing data before %s\n",
               timestamp_to_human(remove_before))

        # Clean in reverse order.  Since we only use the primary stream and not
        # the decimated streams to figure out which data to remove, removing
        # the primary stream last means that we might recover more nicely if
        # we are interrupted and restarted.
        clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
        for p in clean_paths:
            printf(" removing from %s\n", p)
            if args.yes:
                client.stream_remove(p, None, remove_before)

    # All done
    if not args.yes:
        printf("Note: specify --yes to actually perform removals\n")
    return


if __name__ == "__main__":
    main()