#!/usr/bin/python from __future__ import absolute_import import nilmdb.client from nilmdb.client import Client from nilmdb.client.numpyclient import NumpyClient from nilmdb.utils.printf import * from nilmdb.utils.time import (parse_time, timestamp_to_human, timestamp_to_seconds) from nilmdb.utils.interval import Interval import nilmtools import itertools import time import sys import re import argparse import numpy as np import cStringIO import functools class ArgumentError(Exception): pass class MissingDestination(Exception): def __init__(self, args, src, dest): self.parsed_args = args self.src = src self.dest = dest Exception.__init__(self, "destination path " + dest.path + " not found") class StreamInfo(object): def __init__(self, url, info): self.url = url self.info = info try: self.path = info[0] self.layout = info[1] self.layout_type = self.layout.split('_')[0] self.layout_count = int(self.layout.split('_')[1]) self.total_count = self.layout_count + 1 self.timestamp_min = info[2] self.timestamp_max = info[3] self.rows = info[4] self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5]) except IndexError, TypeError: pass def string(self, interhost): """Return stream info as a string. If interhost is true, include the host URL.""" if interhost: return sprintf("[%s] ", self.url) + str(self) return str(self) def __str__(self): """Return stream info as a string.""" return sprintf("%s (%s), %.2fM rows, %.2f hours", self.path, self.layout, self.rows / 1e6, self.seconds / 3600.0) def get_stream_info(client, path): """Return a StreamInfo object about the given path, or None if it doesn't exist""" streams = client.stream_list(path, extended = True) if len(streams) != 1: return None return StreamInfo(client.geturl(), streams[0]) # Filter processing for a single interval of data. def process_numpy_interval(interval, extractor, inserter, warn_rows, function, args = None): """For the given 'interval' of data, extract data, process it through 'function', and insert the result. 'extractor' should be a function like NumpyClient.stream_extract_numpy but with the the interval 'start' and 'end' as the only parameters, e.g.: extractor = functools.partial(NumpyClient.stream_extract_numpy, src_path, layout = l, maxrows = m) 'inserter' should be a function like NumpyClient.stream_insert_context but with the interval 'start' and 'end' as the only parameters, e.g.: inserter = functools.partial(NumpyClient.stream_insert_context, dest_path) If 'warn_rows' is not None, print a warning to stdout when the number of unprocessed rows exceeds this amount. See process_numpy for details on 'function' and 'args'. """ if args is None: args = [] with inserter(interval.start, interval.end) as insert_ctx: insert_func = insert_ctx.insert old_array = np.array([]) for new_array in extractor(interval.start, interval.end): # If we still had old data left, combine it if old_array.shape[0] != 0: array = np.vstack((old_array, new_array)) else: array = new_array # Pass the data to the user provided function processed = function(array, interval, args, insert_func, False) # Send any pending data that the user function inserted insert_ctx.send() # Save the unprocessed parts if processed >= 0: old_array = array[processed:] else: raise Exception( sprintf("%s return value %s must be >= 0", str(function), str(processed))) # Warn if there's too much data remaining if warn_rows is not None and old_array.shape[0] > warn_rows: printf("warning: %d unprocessed rows in buffer\n", old_array.shape[0]) # Last call for this contiguous interval if old_array.shape[0] != 0: processed = function(old_array, interval, args, insert_func, True) if processed != old_array.shape[0]: # Truncate the interval we're inserting at the first # unprocessed data point. This ensures that # we'll not miss any data when we run again later. insert_ctx.update_end(old_array[processed][0]) def example_callback_function(data, interval, args, insert_func, final): """Example of the signature for the function that gets passed to process_numpy_interval. 'data': array of data to process -- may be empty 'interval': overall interval we're processing (but not necessarily the interval of this particular chunk of data) 'args': opaque arguments passed to process_numpy 'insert_func': function to call in order to insert array of data. Should be passed a 2-dimensional array of data to insert. Data timestamps must be within the provided interval. 'final': True if this is the last bit of data for this contiguous interval, False otherwise. Return value of 'function' is the number of data rows processed. Unprocessed data will be provided again in a subsequent call (unless 'final' is True). If unprocessed data remains after 'final' is True, the interval being inserted will be ended at the timestamp of the first unprocessed data point. """ raise NotImplementedError("example_callback_function does nothing") class Filter(object): def __init__(self, parser_description = None): self._parser = None self._client_src = None self._client_dest = None self._using_client = False self.src = None self.dest = None self.start = None self.end = None self.interhost = False self.force_metadata = False if parser_description is not None: self.setup_parser(parser_description) self.parse_args() @property def client_src(self): if self._using_client: raise Exception("Filter client is in use; make another") return self._client_src @property def client_dest(self): if self._using_client: raise Exception("Filter client is in use; make another") return self._client_dest def setup_parser(self, description = "Filter data", skip_paths = False): parser = argparse.ArgumentParser( formatter_class = argparse.RawDescriptionHelpFormatter, version = nilmtools.__version__, description = description) group = parser.add_argument_group("General filter arguments") group.add_argument("-u", "--url", action="store", default="http://localhost/nilmdb/", help="Server URL (default: %(default)s)") group.add_argument("-U", "--dest-url", action="store", help="Destination server URL " "(default: same as source)") group.add_argument("-D", "--dry-run", action="store_true", default = False, help="Just print intervals that would be " "processed") group.add_argument("--force-metadata", action="store_true", default = False, help="Force metadata changes if the dest " "doesn't match") group.add_argument("-s", "--start", metavar="TIME", type=self.arg_time, help="Starting timestamp for intervals " "(free-form, inclusive)") group.add_argument("-e", "--end", metavar="TIME", type=self.arg_time, help="Ending timestamp for intervals " "(free-form, noninclusive)") if not skip_paths: # Individual filter scripts might want to add these arguments # themselves, to include multiple sources in a different order # (for example). "srcpath" and "destpath" arguments must exist, # though. group.add_argument("srcpath", action="store", help="Path of source stream, e.g. /foo/bar") group.add_argument("destpath", action="store", help="Path of destination stream, e.g. /foo/bar") self._parser = parser return parser def set_args(self, url, dest_url, srcpath, destpath, start, end, parsed_args = None, quiet = True): """Set arguments directly from parameters""" if dest_url is None: dest_url = url if url != dest_url: self.interhost = True self._client_src = Client(url) self._client_dest = Client(dest_url) if (not self.interhost) and (srcpath == destpath): raise ArgumentError("source and destination path must be different") # Open the streams self.src = get_stream_info(self._client_src, srcpath) if not self.src: raise ArgumentError("source path " + srcpath + " not found") self.dest = get_stream_info(self._client_dest, destpath) if not self.dest: raise MissingDestination(parsed_args, self.src, StreamInfo(dest_url, [destpath])) self.start = start self.end = end # Print info if not quiet: print "Source:", self.src.string(self.interhost) print " Dest:", self.dest.string(self.interhost) def parse_args(self, argv = None): """Parse arguments from a command line""" args = self._parser.parse_args(argv) self.set_args(args.url, args.dest_url, args.srcpath, args.destpath, args.start, args.end, quiet = False, parsed_args = args) self.force_metadata = args.force_metadata if args.dry_run: for interval in self.intervals(): print interval.human_string() raise SystemExit(0) return args def intervals(self): """Generate all the intervals that this filter should process""" self._using_client = True if self.interhost: # Do the difference ourselves s_intervals = ( Interval(start, end) for (start, end) in self._client_src.stream_intervals( self.src.path, start = self.start, end = self.end) ) d_intervals = ( Interval(start, end) for (start, end) in self._client_dest.stream_intervals( self.dest.path, start = self.start, end = self.end) ) intervals = nilmdb.utils.interval.set_difference(s_intervals, d_intervals) else: # Let the server do the difference for us intervals = ( Interval(start, end) for (start, end) in self._client_src.stream_intervals( self.src.path, diffpath = self.dest.path, start = self.start, end = self.end) ) # Optimize intervals: join intervals that are adjacent for interval in nilmdb.utils.interval.optimize(intervals): yield interval self._using_client = False # Misc helpers @staticmethod def arg_time(toparse): """Parse a time string argument""" try: return nilmdb.utils.time.parse_time(toparse) except ValueError as e: raise argparse.ArgumentTypeError(sprintf("%s \"%s\"", str(e), toparse)) def check_dest_metadata(self, data): """See if the metadata jives, and complain if it doesn't. If there's no conflict, update the metadata to match 'data'.""" metadata = self._client_dest.stream_get_metadata(self.dest.path) if not self.force_metadata: for key in data: wanted = data[key] if not isinstance(wanted, basestring): wanted = str(wanted) val = metadata.get(key, wanted) # Force UTF-8 encoding for comparison and display wanted = wanted.encode('utf-8') val = val.encode('utf-8') key = key.encode('utf-8') if val != wanted and self.dest.rows > 0: m = "Metadata in destination stream:\n" m += " %s = %s\n" % (key, val) m += "doesn't match desired data:\n" m += " %s = %s\n" % (key, wanted) m += "Refusing to change it. To prevent this error, " m += "change or delete the metadata with nilmtool,\n" m += "remove existing data from the stream, or " m += "retry with --force-metadata." raise Exception(m) # All good -- write the metadata in case it's not already there self._client_dest.stream_update_metadata(self.dest.path, data) # The main filter processing method. def process_numpy(self, function, args = None, rows = 100000, intervals = None): """Calls process_numpy_interval for each interval that currently exists in self.src, but doesn't exist in self.dest. It will process the data in chunks as follows: For each chunk of data, call 'function' with a Numpy array corresponding to the data. The data is converted to a Numpy array in chunks of 'rows' rows at a time. If 'intervals' is not None, process those intervals instead of the default list. 'function' should be defined with the same interface as nilmtools.filter.example_callback_function. See the documentation of that for details. 'args' are passed to 'function'. """ extractor = NumpyClient(self.src.url).stream_extract_numpy inserter = NumpyClient(self.dest.url).stream_insert_numpy_context extractor_func = functools.partial(extractor, self.src.path, layout = self.src.layout, maxrows = rows) inserter_func = functools.partial(inserter, self.dest.path) for interval in (intervals or self.intervals()): print "Processing", interval.human_string() process_numpy_interval(interval, extractor_func, inserter_func, rows * 3, function, args) def main(argv = None): # This is just a dummy function; actual filters can use the other # functions to prepare stuff, and then do something with the data. f = Filter() parser = f.setup_parser() args = f.parse_args(argv) for i in f.intervals(): print "Generic filter: need to handle", i.human_string() if __name__ == "__main__": main()