This also found a small number of real bugs. For example, this one looked weird because of a 2to3 conversion, but it was wrong both before and after (the old form catches only IndexError and merely binds it to a local name `TypeError`):

```
- except IndexError as TypeError:
+ except (IndexError, TypeError):
```
268 lines
9.7 KiB
Python
Executable File
268 lines
9.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from nilmdb.utils.printf import printf, fprintf, sprintf
|
|
from nilmdb.utils.time import (timestamp_to_human,
|
|
timestamp_to_seconds, seconds_to_timestamp)
|
|
from nilmdb.utils.diskusage import human_size
|
|
from nilmdb.utils.interval import Interval
|
|
import nilmdb.client
|
|
import nilmdb.client.numpyclient
|
|
import nilmtools
|
|
import argparse
|
|
import configparser
|
|
import sys
|
|
import collections
|
|
import fnmatch
|
|
import re
|
|
import os
|
|
|
|
|
|
def warn(msg, *args):
    """Write a printf-style warning line to stderr.

    *msg* is a format string that is prefixed with "warning: " and
    terminated with a newline; *args* fill in its placeholders.
    """
    template = "".join(("warning: ", msg, "\n"))
    fprintf(sys.stderr, template, *args)
|
|
|
|
|
|
class TimePeriod(object):
    """A length of time parsed from strings such as "3w" or "3.5m".

    The string must end with one of the suffixes in ``_units``; the text
    before the suffix is parsed as a float count of that unit.
    """

    # suffix -> (singular unit name, seconds per unit)
    _units = {'h': ('hour', 60*60),
              'd': ('day', 60*60*24),
              'w': ('week', 60*60*24*7),
              'm': ('month', 60*60*24*30),
              'y': ('year', 60*60*24*365)}

    def __init__(self, val):
        """Parse *val*.

        Sets ``unit`` (singular name), ``scale`` (seconds per unit) and
        ``count`` (float).  Raises ValueError when no known suffix matches
        or when the numeric part is not a valid float.
        """
        for suffix, (name, factor) in self._units.items():
            if not val.endswith(suffix):
                continue
            self.unit = name
            self.scale = factor
            self.count = float(val[:-len(suffix)])
            return
        raise ValueError("unknown units: " + val)

    def seconds(self):
        """Return the total length of this period in seconds."""
        return self.count * self.scale

    def describe_seconds(self, seconds):
        """Express *seconds* in this period's unit, e.g. "3 weeks".

        Whole amounts are printed as integers; anything else uses two
        decimal places.  The unit is singular only for exactly 1.
        """
        amount = seconds / self.scale
        label = self.unit if amount == 1 else self.unit + "s"
        fmt = "%d %s" if amount == int(amount) else "%.2f %s"
        return sprintf(fmt, amount, label)

    def __str__(self):
        """Describe the whole configured period in its own unit."""
        total = self.seconds()
        return self.describe_seconds(total)
|
|
|
|
|
|
class StreamCleanupConfig(object):
    """Cleanup settings for one stream, seeded from a stream-list entry.

    *info* is indexed at [0] (path), [1] (layout), [4] and [5].  The rate is
    computed as info[4] / timestamp_to_seconds(info[5]), presumably rows
    over total data duration -- confirm against nilmdb's extended
    stream_list format.
    """

    def __init__(self, info):
        self.path = info[0]
        self.layout = info[1]
        if info[4] == 0 or info[5] == 0:
            # Nothing to derive a rate from.
            self.rate = None
        else:
            self.rate = info[4] / timestamp_to_seconds(info[5])
        # Defaults; main() may override these from the config file.
        self.keep = None              # TimePeriod, or None if unconfigured
        self.clean_decimated = True   # also clean "~decim-N" children
        self.decimated_from = None    # parent path if this is decimated
        self.also_clean_paths = []    # decimated children to clean too
|
|
|
|
|
|
def _parse_args(argv):
    """Parse the command line; ``configfile`` in the result is an open file."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""\
Clean up old data from streams using a configuration file to specify
which data to remove.

The format of the config file is as follows:

[/stream/path]
keep = 3w # keep up to 3 weeks of data
rate = 8000 # optional, used for the --estimate option
decimated = false # whether to delete decimated data too (default true)

[*/prep]
keep = 3.5m # or 2520h or 105d or 15w or 0.29y

The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
'm' for months, or 'y' for years.

Streams paths may include wildcards. If a path is matched by more than
one config section, data from the last config section counts.

Decimated streams (paths containing '~decim-') are treated specially:
- They don't match wildcards
- When deleting data from a parent stream, data is also deleted
from its decimated streams, unless decimated=false

Rate is optional and is only used for the --estimate option.
""")
    parser.add_argument("-v", "--version", action="version",
                        version=nilmtools.__version__)
    def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
    parser.add_argument("-u", "--url", action="store", default=def_url,
                        help="NilmDB server URL (default: %(default)s)")
    parser.add_argument("-y", "--yes", action="store_true",
                        default=False,
                        help="Actually remove the data (default: no)")
    parser.add_argument("-e", "--estimate", action="store_true",
                        default=False,
                        help="Estimate how much disk space will be used")
    parser.add_argument("configfile", type=argparse.FileType('r'),
                        help="Configuration file")
    return parser.parse_args(argv)


def _read_config(configfile):
    """Parse the already-open *configfile*, then close it unless it is stdin."""
    config = configparser.RawConfigParser()
    try:
        # readfp() was deprecated in Python 3.2 and removed in 3.12;
        # read_file() is the drop-in replacement.
        config.read_file(configfile)
    finally:
        # argparse.FileType maps "-" to sys.stdin; leave that one open.
        if configfile is not sys.stdin:
            configfile.close()
    return config


def _load_streams(client):
    """Return an OrderedDict of path -> StreamCleanupConfig for all streams.

    Paths ending in "~decim-<digits>" get ``decimated_from`` set to the
    path with that suffix removed.
    """
    streams = collections.OrderedDict()
    for s in client.stream_list(extended=True):
        streams[s[0]] = StreamCleanupConfig(s)
        m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
        if m:
            streams[s[0]].decimated_from = m.group(1)
    return streams


def _apply_config(config, streams):
    """Apply each config section to every stream it matches (in place).

    Sections are applied in file order, so later ones override earlier ones.
    Decimated streams match only a section named exactly like their path.
    """
    for section in config.sections():
        matched = False
        for path in streams.keys():
            # Decimated streams only allow exact matches
            if streams[path].decimated_from and path != section:
                continue
            if not fnmatch.fnmatch(path, section):
                continue
            matched = True
            options = config.options(section)

            # Keep period (days, weeks, months, years)
            if 'keep' in options:
                streams[path].keep = TimePeriod(config.get(section, 'keep'))
                options.remove('keep')

            # Rate
            if 'rate' in options:
                streams[path].rate = config.getfloat(section, 'rate')
                options.remove('rate')

            # Decimated
            if 'decimated' in options:
                val = config.getboolean(section, 'decimated')
                streams[path].clean_decimated = val
                options.remove('decimated')

            for leftover in options:
                warn("option '%s' for '%s' is unknown", leftover, section)

        if not matched:
            warn("config for '%s' did not match any existing streams", section)


def _prune_streams(streams):
    """Fold decimated streams into their parents; drop unconfigured streams.

    Mutates *streams* in place.
    """
    # List all decimated streams in the parent stream's info
    for path in list(streams.keys()):
        src = streams[path].decimated_from
        if src and src in streams:
            if streams[src].clean_decimated:
                streams[src].also_clean_paths.append(path)
            del streams[path]

    # Warn about streams that aren't getting cleaned up
    for path in list(streams.keys()):
        if streams[path].keep is None or streams[path].keep.seconds() < 0:
            warn("no config for existing stream '%s'", path)
            del streams[path]


def _estimate(streams):
    """Print estimated disk usage per stream and in total.

    Streams whose rate is missing, zero or negative are skipped with a
    warning.
    """
    total = 0
    for path, sc in streams.items():
        rate = sc.rate
        if not rate or rate < 0:
            warn("unable to estimate disk usage for stream '%s' because "
                 "the data rate is unknown", path)
            continue
        printf("%s:\n", path)
        layout = sc.layout
        dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
        per_row = dtype.itemsize
        per_sec = per_row * rate
        printf("%17s: %s per row, %s rows per second\n",
               "base rate",
               human_size(per_row),
               round(rate, 1))
        printf("%17s: %s per hour, %s per day\n",
               "base size",
               human_size(per_sec * 3600),
               human_size(per_sec * 3600 * 24))

        # If we'll be cleaning up decimated data, add an
        # estimation for how much room decimated data takes up.
        if sc.clean_decimated:
            d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
            d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
            # Assume the decimations will be a factor of 4.  The base
            # stream (k=0) is already in per_sec, so add the tail
            #   sum_{k=1..inf} (rate / factor^k) * d_per_row
            #   = d_per_row * (rate / factor) * 1 / (1 - 1/factor)
            d_per_row = d_dtype.itemsize
            factor = 4.0
            d_per_sec = (d_per_row *
                         (rate / factor) *
                         (1 / (1 - (1/factor))))
            per_sec += d_per_sec
            printf("%17s: %s per hour, %s per day\n",
                   "with decimation",
                   human_size(per_sec * 3600),
                   human_size(per_sec * 3600 * 24))

        keep = per_sec * sc.keep.seconds()
        printf("%17s: %s\n\n",
               "keep " + str(sc.keep), human_size(keep))
        total += keep
    printf("Total estimated disk usage for these streams:\n")
    printf(" %s\n", human_size(total))


def _cleanup(client, streams, yes):
    """Remove data older than each stream's keep period.

    Removal calls are only made when *yes* is true; otherwise the planned
    removals are just printed.
    """
    for path in streams:
        printf("%s: keep %s\n", path, streams[path].keep)

        # Figure out the earliest timestamp we should keep.  Intervals are
        # walked in reverse (presumably newest-first, assuming
        # stream_intervals yields oldest-first), accumulating durations
        # until the total exceeds the keep period; the cut point then lies
        # inside the current interval.
        intervals = [Interval(start, end) for (start, end) in
                     reversed(list(client.stream_intervals(path)))]
        total = 0
        keep = seconds_to_timestamp(streams[path].keep.seconds())
        for i in intervals:
            total += i.end - i.start
            if total <= keep:
                continue
            remove_before = i.start + (total - keep)
            break
        else:
            printf(" nothing to do (only %s of data present)\n",
                   streams[path].keep.describe_seconds(
                       timestamp_to_seconds(total)))
            continue
        printf(" removing data before %s\n",
               timestamp_to_human(remove_before))
        # Clean in reverse order.  Since we only use the primary stream and not
        # the decimated streams to figure out which data to remove, removing
        # the primary stream last means that we might recover more nicely if
        # we are interrupted and restarted.
        clean_paths = list(reversed(streams[path].also_clean_paths)) + [path]
        for p in clean_paths:
            printf(" removing from %s\n", p)
            if yes:
                client.stream_remove(p, None, remove_before)


def main(argv=None):
    """Remove old stream data from a NilmDB server per a config file.

    With --estimate, print projected disk usage and exit via SystemExit(0).
    Without --yes, only report what would be removed.
    """
    args = _parse_args(argv)

    # Parse config file
    config = _read_config(args.configfile)

    # List all streams and create config objects
    client = nilmdb.client.Client(args.url)
    streams = _load_streams(client)

    # Build up configuration, then drop streams we won't touch directly
    _apply_config(config, streams)
    _prune_streams(streams)

    if args.estimate:
        _estimate(streams)
        raise SystemExit(0)

    # Do the cleanup
    _cleanup(client, streams, args.yes)

    # All done
    if not args.yes:
        printf("Note: specify --yes to actually perform removals\n")
    return
|
|
|
|
|
|
# Run the cleanup tool when this file is executed as a script.
if __name__ == "__main__":
    main()
|