nilmtools/nilmtools/filter.py

#!/usr/bin/env python3
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import printf, sprintf
from nilmdb.utils.interval import Interval
import nilmtools
import os
import argparse
import numpy as np
import functools


class ArgumentError(Exception):
    pass


class MissingDestination(Exception):
    def __init__(self, args, src, dest):
        self.parsed_args = args
        self.src = src
        self.dest = dest
        Exception.__init__(self, f"destination path {dest.path} not found")


class StreamInfo(object):
    def __init__(self, url, info):
        self.url = url
        self.info = info
        try:
            self.path = info[0]
            self.layout = info[1]
            self.layout_type = self.layout.split('_')[0]
            self.layout_count = int(self.layout.split('_')[1])
            self.total_count = self.layout_count + 1
            self.timestamp_min = info[2]
            self.timestamp_max = info[3]
            self.rows = info[4]
            self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
        except (IndexError, TypeError):
            pass

    def string(self, interhost):
        """Return stream info as a string. If interhost is true,
        include the host URL."""
        if interhost:
            return sprintf("[%s] ", self.url) + str(self)
        return str(self)

    def __str__(self):
        """Return stream info as a string."""
        return sprintf("%s (%s), %.2fM rows, %.2f hours",
                       self.path, self.layout, self.rows / 1e6,
                       self.seconds / 3600.0)


def get_stream_info(client, path):
    """Return a StreamInfo object about the given path, or None if it
    doesn't exist"""
    streams = client.stream_list(path, extended=True)
    if len(streams) != 1:
        return None
    return StreamInfo(client.geturl(), streams[0])


# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
                           function, args=None):
    """For the given 'interval' of data, extract data, process it
    through 'function', and insert the result.

    'extractor' should be a function like NumpyClient.stream_extract_numpy
    but with the interval 'start' and 'end' as the only parameters,
    e.g.:
       extractor = functools.partial(NumpyClient.stream_extract_numpy,
                                     src_path, layout = l, maxrows = m)

    'inserter' should be a function like NumpyClient.stream_insert_context
    but with the interval 'start' and 'end' as the only parameters, e.g.:
       inserter = functools.partial(NumpyClient.stream_insert_context,
                                    dest_path)

    If 'warn_rows' is not None, print a warning to stdout when the
    number of unprocessed rows exceeds this amount.

    See process_numpy for details on 'function' and 'args'.
    """
    if args is None:
        args = []

    with inserter(interval.start, interval.end) as insert_ctx:
        insert_func = insert_ctx.insert
        old_array = np.array([])
        for new_array in extractor(interval.start, interval.end):
            # If we still had old data left, combine it
            if old_array.shape[0] != 0:
                array = np.vstack((old_array, new_array))
            else:
                array = new_array

            # Pass the data to the user provided function
            processed = function(array, interval, args, insert_func, False)

            # Send any pending data that the user function inserted
            insert_ctx.send()

            # Save the unprocessed parts
            if processed >= 0:
                old_array = array[processed:]
            else:
                raise Exception(
                    sprintf("%s return value %s must be >= 0",
                            str(function), str(processed)))

            # Warn if there's too much data remaining
            if warn_rows is not None and old_array.shape[0] > warn_rows:
                printf("warning: %d unprocessed rows in buffer\n",
                       old_array.shape[0])

        # Last call for this contiguous interval
        if old_array.shape[0] != 0:
            processed = function(old_array, interval, args,
                                 insert_func, True)
            if processed != old_array.shape[0]:
                # Truncate the interval we're inserting at the first
                # unprocessed data point. This ensures that
                # we'll not miss any data when we run again later.
                insert_ctx.update_end(old_array[processed][0])
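

# A minimal sketch (not part of the original module) of wiring the pieces
# above together by hand for a single interval; Filter.process_numpy()
# below does essentially this for every pending interval.  The URLs, paths,
# layout, and row counts here are placeholder assumptions.
def _example_manual_processing(src_url, dest_url, src_path, dest_path,
                               interval, callback):
    # Use separate clients for extraction and insertion, as process_numpy
    # does, so the streaming extract doesn't block the inserts.
    src_client = NumpyClient(src_url)
    dest_client = NumpyClient(dest_url)
    extractor = functools.partial(src_client.stream_extract_numpy,
                                  src_path, layout="float32_8",
                                  maxrows=100000)
    inserter = functools.partial(dest_client.stream_insert_numpy_context,
                                 dest_path)
    process_numpy_interval(interval, extractor, inserter,
                           warn_rows=300000, function=callback)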


def example_callback_function(data, interval, args, insert_func, final):
    """Example of the signature for the function that gets passed
    to process_numpy_interval.

    'data': array of data to process -- may be empty

    'interval': overall interval we're processing (but not necessarily
    the interval of this particular chunk of data)

    'args': opaque arguments passed to process_numpy

    'insert_func': function to call in order to insert array of data.
    Should be passed a 2-dimensional array of data to insert.
    Data timestamps must be within the provided interval.

    'final': True if this is the last bit of data for this
    contiguous interval, False otherwise.

    Return value of 'function' is the number of data rows processed.
    Unprocessed data will be provided again in a subsequent call
    (unless 'final' is True).

    If unprocessed data remains after 'final' is True, the interval
    being inserted will be ended at the timestamp of the first
    unprocessed data point.
    """
    raise NotImplementedError("example_callback_function does nothing")
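

# A concrete sketch of a callback satisfying the contract documented above
# (this helper is not part of the original module): it scales every
# non-timestamp column by args[0], inserts the result, and reports all rows
# as processed.  Column 0 holds timestamps, as provided by
# NumpyClient.stream_extract_numpy.
def _example_scaling_callback(data, interval, args, insert_func, final):
    if data.shape[0] == 0:
        return 0
    out = data.copy()
    out[:, 1:] *= args[0]      # scale data columns; leave timestamps alone
    insert_func(out)
    return data.shape[0]       # every row was consumed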


class Filter(object):
    def __init__(self, parser_description=None):
        self._parser = None
        self._client_src = None
        self._client_dest = None
        self._using_client = False
        self.src = None
        self.dest = None
        self.start = None
        self.end = None
        self._interhost = False
        self._force_metadata = False
        self.def_url = os.environ.get("NILMDB_URL",
                                      "http://localhost/nilmdb/")
        if parser_description is not None:
            self.setup_parser(parser_description)
            self.parse_args()

    @property
    def client_src(self):
        if self._using_client:
            raise Exception("Filter src client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        if self._using_client:
            raise Exception("Filter dest client is in use; make another")
        return self._client_dest

    def setup_parser(self, description="Filter data", skip_paths=False):
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default=self.def_url,
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default=False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("-q", "--quiet", action="store_true",
                           default=False,
                           help="Don't print source and dest stream info")
        group.add_argument("-F", "--force-metadata", action="store_true",
                           default=False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        group.add_argument("-v", "--version", action="version",
                           version=nilmtools.__version__)
        if not skip_paths:
            # Individual filter scripts might want to add these arguments
            # themselves, to include multiple sources in a different order
            # (for example). "srcpath" and "destpath" arguments must exist,
            # though.
            group.add_argument("srcpath", action="store",
                               help="Path of source stream, eg. /foo/bar")
            group.add_argument("destpath", action="store",
                               help="Path of destination stream, "
                               "eg. /foo/bar")
        self._parser = parser
        return parser

    def set_args(self, url, dest_url, srcpath, destpath, start, end,
                 parsed_args=None, quiet=True):
        """Set arguments directly from parameters"""
        if dest_url is None:
            dest_url = url
        if url != dest_url:
            self._interhost = True

        self._client_src = Client(url)
        self._client_dest = Client(dest_url)

        if (not self._interhost) and (srcpath == destpath):
            raise ArgumentError(
                "source and destination path must be different")

        # Open the streams
        self.src = get_stream_info(self._client_src, srcpath)
        if not self.src:
            raise ArgumentError("source path " + srcpath + " not found")

        self.dest = get_stream_info(self._client_dest, destpath)
        if not self.dest:
            raise MissingDestination(parsed_args, self.src,
                                     StreamInfo(dest_url, [destpath]))
        self.start = start
        self.end = end

        # Print info
        if not quiet:
            print("Source:", self.src.string(self._interhost))
            print("  Dest:", self.dest.string(self._interhost))

    def parse_args(self, argv=None):
        """Parse arguments from a command line"""
        args = self._parser.parse_args(argv)
        self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
                      args.start, args.end, quiet=args.quiet,
                      parsed_args=args)

        self._force_metadata = args.force_metadata

        if args.dry_run:
            for interval in self.intervals():
                print(interval.human_string())
            raise SystemExit(0)
        return args

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        self._using_client = True

        if self._interhost:
            # Do the difference ourselves
            s_intervals = (Interval(start, end)
                           for (start, end) in
                           self._client_src.stream_intervals(
                               self.src.path,
                               start=self.start, end=self.end))
            d_intervals = (Interval(start, end)
                           for (start, end) in
                           self._client_dest.stream_intervals(
                               self.dest.path,
                               start=self.start, end=self.end))
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = (Interval(start, end)
                         for (start, end) in
                         self._client_src.stream_intervals(
                             self.src.path, diffpath=self.dest.path,
                             start=self.start, end=self.end))

        # Optimize intervals: join intervals that are adjacent
        for interval in nilmdb.utils.interval.optimize(intervals):
            yield interval
        self._using_client = False

    # Misc helpers
    @staticmethod
    def arg_time(toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't.  For
        each key in 'data', if the destination stream already contains
        that key, its value must match; if the stream does not contain
        the key, it is created."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self._force_metadata:
            for key in data:
                wanted = data[key]
                if not isinstance(wanted, str):
                    wanted = str(wanted)
                val = metadata.get(key, wanted)
                if val != wanted and self.dest.rows > 0:
                    m = "Metadata in destination stream:\n"
                    m += "  %s = %s\n" % (key, val)
                    m += "doesn't match desired data:\n"
                    m += "  %s = %s\n" % (key, wanted)
                    m += "Refusing to change it.  To prevent this error, "
                    m += "change or delete the metadata with nilmtool,\n"
                    m += "remove existing data from the stream, or "
                    m += "retry with --force-metadata."
                    raise Exception(m)

        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)
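
    # A typical (hypothetical) call from a filter script, made before any
    # processing starts, might look like:
    #   f.check_dest_metadata({"example_filter_source": f.src.path,
    #                          "example_filter_scale": 2.0})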

    # The main filter processing method.
    def process_numpy(self, function, args=None, rows=100000,
                      intervals=None):
        """Calls process_numpy_interval for each interval that currently
        exists in self.src, but doesn't exist in self.dest.  It will
        process the data in chunks as follows:

        For each chunk of data, call 'function' with a Numpy array
        corresponding to the data.  The data is converted to a Numpy
        array in chunks of 'rows' rows at a time.

        If 'intervals' is not None, process those intervals instead of
        the default list.

        'function' should be defined with the same interface as
        nilmtools.filter.example_callback_function.  See the
        documentation of that for details.  'args' are passed to
        'function'.
        """
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

        extractor_func = functools.partial(extractor, self.src.path,
                                           layout=self.src.layout,
                                           maxrows=rows)
        inserter_func = functools.partial(inserter, self.dest.path)

        for interval in (intervals or self.intervals()):
            print("Processing", interval.human_string())
            process_numpy_interval(interval, extractor_func, inserter_func,
                                   rows * 3, function, args)
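

# The sketch below (not part of the original module) shows how a complete
# filter script might tie the pieces together: parse arguments, record some
# metadata on the destination, and run a callback over all pending
# intervals.  The metadata keys and the scale factor are illustrative only.
def _example_filter_main(argv=None):
    f = Filter()
    f.setup_parser("Example filter: scale all data columns by a constant")
    try:
        f.parse_args(argv)
    except MissingDestination as e:
        # Real filter scripts usually create the destination stream here,
        # using e.dest.url, e.dest.path, and the source layout.
        raise SystemExit(f"error: destination {e.dest.path} doesn't exist")
    f.check_dest_metadata({"example_filter_source": f.src.path,
                           "example_filter_scale": 2.0})
    f.process_numpy(_example_scaling_callback, args=[2.0])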


def main(argv=None):
    # This is just a dummy function; actual filters can use the other
    # functions to prepare stuff, and then do something with the data.
    f = Filter()
    parser = f.setup_parser()  # noqa: F841
    args = f.parse_args(argv)  # noqa: F841
    for i in f.intervals():
        print("Generic filter: need to handle", i.human_string())


if __name__ == "__main__":
    main()