This found a small number of real bugs too, for example, this one that looked weird because of a 2to3 conversion, but was wrong both before and after: - except IndexError as TypeError: + except (IndexError, TypeError):
394 lines · 16 KiB · Python
#!/usr/bin/env python3
|
|
|
|
import nilmdb.client
|
|
from nilmdb.client import Client
|
|
from nilmdb.client.numpyclient import NumpyClient
|
|
from nilmdb.utils.printf import printf, sprintf
|
|
from nilmdb.utils.interval import Interval
|
|
|
|
import nilmtools
|
|
|
|
import os
|
|
import argparse
|
|
import numpy as np
|
|
import functools
|
|
|
|
|
|
class ArgumentError(Exception):
    """Raised when filter arguments are missing, invalid, or inconsistent."""
|
|
|
|
|
|
class MissingDestination(Exception):
    """Raised when the destination stream does not exist.

    Carries the parsed command-line arguments along with the source and
    destination stream descriptions so the caller can report the problem
    or create the missing stream.
    """

    def __init__(self, args, src, dest):
        self.parsed_args = args
        self.src = src
        self.dest = dest
        message = f"destination path {dest.path} not found"
        super().__init__(message)
|
|
|
|
|
|
class StreamInfo(object):
    """Parsed metadata about a single stream, built from the extended
    info list returned by Client.stream_list(extended=True)."""

    def __init__(self, url, info):
        """Parse the extended stream info list 'info' from server 'url'.

        'info' is expected to look like:
          [path, layout, timestamp_min, timestamp_max, rows, time]
        A shorter or malformed list is tolerated: fields that cannot
        be parsed are left as None.  (Previously the object was left
        partially initialized in that case, so reading e.g. .rows
        raised AttributeError; callers deliberately construct partial
        objects like StreamInfo(url, [path]).)
        """
        self.url = url
        self.info = info
        # Defaults so every attribute always exists, even when 'info'
        # is short or malformed.
        self.path = None
        self.layout = None
        self.layout_type = None
        self.layout_count = None
        self.total_count = None
        self.timestamp_min = None
        self.timestamp_max = None
        self.rows = None
        self.seconds = None
        try:
            self.path = info[0]
            self.layout = info[1]
            # Layout looks like "<type>_<count>", e.g. "float32_8"
            self.layout_type = self.layout.split('_')[0]
            self.layout_count = int(self.layout.split('_')[1])
            # +1 for the timestamp column
            self.total_count = self.layout_count + 1
            self.timestamp_min = info[2]
            self.timestamp_max = info[3]
            self.rows = info[4]
            self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
        except (IndexError, TypeError):
            # Partial info is acceptable; remaining fields stay None.
            pass

    def string(self, interhost):
        """Return stream info as a string.  If interhost is true,
        include the host URL."""
        if interhost:
            return sprintf("[%s] ", self.url) + str(self)
        return str(self)

    def __str__(self):
        """Return stream info as a string."""
        return sprintf("%s (%s), %.2fM rows, %.2f hours",
                       self.path, self.layout, self.rows / 1e6,
                       self.seconds / 3600.0)
|
|
|
|
|
|
def get_stream_info(client, path):
    """Return a StreamInfo object about the given path, or None if it
    doesn't exist"""
    matches = client.stream_list(path, extended=True)
    if len(matches) == 1:
        return StreamInfo(client.geturl(), matches[0])
    return None
|
|
|
|
|
|
# Filter processing for a single interval of data.
|
|
# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
                           function, args=None):
    """For the given 'interval' of data, extract data, process it
    through 'function', and insert the result.

    'extractor' should be a function like NumpyClient.stream_extract_numpy
    but with the the interval 'start' and 'end' as the only parameters,
    e.g.:
      extractor = functools.partial(NumpyClient.stream_extract_numpy,
                                    src_path, layout = l, maxrows = m)

    'inserter' should be a function like NumpyClient.stream_insert_context
    but with the interval 'start' and 'end' as the only parameters, e.g.:
      inserter = functools.partial(NumpyClient.stream_insert_context,
                                   dest_path)

    If 'warn_rows' is not None, print a warning to stdout when the
    number of unprocessed rows exceeds this amount.

    See process_numpy for details on 'function' and 'args'.
    """
    if args is None:
        args = []

    with inserter(interval.start, interval.end) as insert_ctx:
        insert_func = insert_ctx.insert
        # Rows that 'function' declined to process on a previous chunk;
        # carried forward and prepended to the next chunk.
        old_array = np.array([])
        for new_array in extractor(interval.start, interval.end):
            # If we still had old data left, combine it
            if old_array.shape[0] != 0:
                array = np.vstack((old_array, new_array))
            else:
                array = new_array

            # Pass the data to the user provided function.
            # 'False' means this is not the final chunk of the interval.
            processed = function(array, interval, args, insert_func, False)

            # Send any pending data that the user function inserted
            insert_ctx.send()

            # Save the unprocessed parts
            if processed >= 0:
                old_array = array[processed:]
            else:
                raise Exception(
                    sprintf("%s return value %s must be >= 0",
                            str(function), str(processed)))

            # Warn if there's too much data remaining
            if warn_rows is not None and old_array.shape[0] > warn_rows:
                printf("warning: %d unprocessed rows in buffer\n",
                       old_array.shape[0])

        # Last call for this contiguous interval; 'final=True' tells the
        # user function this is its last chance to consume the data.
        if old_array.shape[0] != 0:
            processed = function(old_array, interval, args,
                                 insert_func, True)
            if processed != old_array.shape[0]:
                # Truncate the interval we're inserting at the first
                # unprocessed data point.  This ensures that
                # we'll not miss any data when we run again later.
                # (Assumes column 0 of the array is the timestamp --
                # consistent with the layout used elsewhere in this file.)
                insert_ctx.update_end(old_array[processed][0])
|
|
|
|
|
|
def example_callback_function(data, interval, args, insert_func, final):
    """Template showing the signature of the callback passed to
    process_numpy_interval.

    Parameters:

    'data': array of data to process -- may be empty

    'interval': overall interval we're processing (but not necessarily
    the interval of this particular chunk of data)

    'args': opaque arguments passed to process_numpy

    'insert_func': function to call in order to insert array of data.
    Should be passed a 2-dimensional array of data to insert.
    Data timestamps must be within the provided interval.

    'final': True if this is the last bit of data for this
    contiguous interval, False otherwise.

    The callback returns the number of data rows it processed.  Rows it
    did not process are handed back to it on a later call (unless
    'final' is True).  If unprocessed rows remain after the 'final'
    call, the inserted interval is truncated at the timestamp of the
    first unprocessed row.
    """
    raise NotImplementedError("example_callback_function does nothing")
|
|
|
|
|
|
class Filter(object):
    """Base class for filter scripts that read data from a source
    stream, process it, and write results to a destination stream.

    Typical use: construct with a parser description (which parses
    sys.argv immediately), or call setup_parser() / parse_args() /
    set_args() manually, then call process_numpy() with a callback.
    """

    def __init__(self, parser_description=None):
        """Initialize filter state.  If 'parser_description' is given,
        immediately build the argument parser and parse the command
        line (which also opens the source/destination clients)."""
        # Argument parser, set by setup_parser()
        self._parser = None
        # nilmdb clients for source and destination servers
        self._client_src = None
        self._client_dest = None
        # True while intervals() is actively using the clients
        self._using_client = False
        # StreamInfo for source and destination, set by set_args()
        self.src = None
        self.dest = None
        # Optional time range limits
        self.start = None
        self.end = None
        # True when source and destination are on different servers
        self._interhost = False
        self._force_metadata = False
        self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
        if parser_description is not None:
            self.setup_parser(parser_description)
            self.parse_args()

    @property
    def client_src(self):
        """Source client, guarded against concurrent use by intervals()."""
        if self._using_client:
            raise Exception("Filter src client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        """Destination client, guarded against concurrent use by intervals()."""
        if self._using_client:
            raise Exception("Filter dest client is in use; make another")
        return self._client_dest

    def setup_parser(self, description="Filter data", skip_paths=False):
        """Build and return the argparse parser for this filter.

        If 'skip_paths' is True, the positional 'srcpath'/'destpath'
        arguments are omitted so subclasses can add their own; those
        argument names must still exist after parsing."""
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default=self.def_url,
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default=False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("-q", "--quiet", action="store_true",
                           default=False,
                           help="Don't print source and dest stream info")
        group.add_argument("-F", "--force-metadata", action="store_true",
                           default=False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        group.add_argument("-v", "--version", action="version",
                           version=nilmtools.__version__)

        if not skip_paths:
            # Individual filter scripts might want to add these arguments
            # themselves, to include multiple sources in a different order
            # (for example). "srcpath" and "destpath" arguments must exist,
            # though.
            group.add_argument("srcpath", action="store",
                               help="Path of source stream, eg. /foo/bar")
            group.add_argument("destpath", action="store",
                               help="Path of destination stream, eg. /foo/bar")
        self._parser = parser
        return parser

    def set_args(self, url, dest_url, srcpath, destpath, start, end,
                 parsed_args=None, quiet=True):
        """Set arguments directly from parameters"""
        if dest_url is None:
            dest_url = url
        if url != dest_url:
            self._interhost = True

        self._client_src = Client(url)
        self._client_dest = Client(dest_url)

        # Same-server, same-path would make the filter read its own output.
        if (not self._interhost) and (srcpath == destpath):
            raise ArgumentError(
                "source and destination path must be different")

        # Open the streams
        self.src = get_stream_info(self._client_src, srcpath)
        if not self.src:
            raise ArgumentError("source path " + srcpath + " not found")

        self.dest = get_stream_info(self._client_dest, destpath)
        if not self.dest:
            # Pass a partial StreamInfo (path only) so the handler knows
            # which destination was requested.
            raise MissingDestination(parsed_args, self.src,
                                     StreamInfo(dest_url, [destpath]))

        self.start = start
        self.end = end

        # Print info
        if not quiet:
            print("Source:", self.src.string(self._interhost))
            print(" Dest:", self.dest.string(self._interhost))

    def parse_args(self, argv=None):
        """Parse arguments from a command line"""
        args = self._parser.parse_args(argv)

        self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
                      args.start, args.end, quiet=args.quiet, parsed_args=args)

        self._force_metadata = args.force_metadata
        if args.dry_run:
            # Dry run: list what would be processed, then exit cleanly.
            for interval in self.intervals():
                print(interval.human_string())
            raise SystemExit(0)
        return args

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        # Mark the clients busy; the client_src/client_dest properties
        # refuse access while this generator is running.
        self._using_client = True

        if self._interhost:
            # Do the difference ourselves
            s_intervals = (Interval(start, end)
                           for (start, end) in
                           self._client_src.stream_intervals(
                               self.src.path,
                               start=self.start, end=self.end))
            d_intervals = (Interval(start, end)
                           for (start, end) in
                           self._client_dest.stream_intervals(
                               self.dest.path,
                               start=self.start, end=self.end))
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = (Interval(start, end)
                         for (start, end) in
                         self._client_src.stream_intervals(
                             self.src.path, diffpath=self.dest.path,
                             start=self.start, end=self.end))
        # Optimize intervals: join intervals that are adjacent
        for interval in nilmdb.utils.interval.optimize(intervals):
            yield interval
        # NOTE(review): if the caller abandons this generator early,
        # _using_client stays True until the generator is resumed or
        # collected -- the flag is only cleared on normal exhaustion.
        self._using_client = False

    # Misc helpers
    @staticmethod
    def arg_time(toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            # argparse expects ArgumentTypeError for bad values
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't. For
        each key in data, if the stream contains the key, it must match
        values. If the stream does not contain the key, it is created."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self._force_metadata:
            for key in data:
                wanted = data[key]
                if not isinstance(wanted, str):
                    wanted = str(wanted)
                # Missing keys default to the wanted value, so only a
                # present-but-different value can trigger the error.
                val = metadata.get(key, wanted)
                if val != wanted and self.dest.rows > 0:
                    m = "Metadata in destination stream:\n"
                    m += " %s = %s\n" % (key, val)
                    m += "doesn't match desired data:\n"
                    m += " %s = %s\n" % (key, wanted)
                    m += "Refusing to change it. To prevent this error, "
                    m += "change or delete the metadata with nilmtool,\n"
                    m += "remove existing data from the stream, or "
                    m += "retry with --force-metadata."
                    raise Exception(m)
        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)

    # The main filter processing method.
    def process_numpy(self, function, args=None, rows=100000,
                      intervals=None):
        """Calls process_numpy_interval for each interval that currently
        exists in self.src, but doesn't exist in self.dest. It will
        process the data in chunks as follows:

        For each chunk of data, call 'function' with a Numpy array
        corresponding to the data. The data is converted to a Numpy
        array in chunks of 'rows' rows at a time.

        If 'intervals' is not None, process those intervals instead of
        the default list.

        'function' should be defined with the same interface as
        nilmtools.filter.example_callback_function. See the
        documentation of that for details. 'args' are passed to
        'function'.
        """
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

        extractor_func = functools.partial(extractor, self.src.path,
                                           layout=self.src.layout,
                                           maxrows=rows)
        inserter_func = functools.partial(inserter, self.dest.path)

        for interval in (intervals or self.intervals()):
            print("Processing", interval.human_string())
            # Warn when the unprocessed buffer exceeds 3 chunks' worth.
            process_numpy_interval(interval, extractor_func, inserter_func,
                                   rows * 3, function, args)
|
|
|
|
|
|
def main(argv=None):
    # This is just a dummy function; actual filters can use the other
    # functions to prepare stuff, and then do something with the data.
    filt = Filter()
    parser = filt.setup_parser()  # noqa: F841
    args = filt.parse_args(argv)  # noqa: F841
    for interval in filt.intervals():
        print("Generic filter: need to handle", interval.human_string())
|
|
|
|
|
|
# Run the generic (dummy) filter when executed as a script.
if __name__ == "__main__":
    main()
|