Compare commits

..

2 Commits

SHA1        Message                                            Date
dc26e32b6e  Make interhost, force_metadata private to Filter   2013-08-02 23:14:19 -04:00
981f23ff14  Better documentation for callback function         2013-08-02 23:14:19 -04:00


@@ -133,6 +133,34 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
         # we'll not miss any data when we run again later.
         insert_ctx.update_end(old_array[processed][0])
 
+def example_callback_function(data, interval, args, insert_func, final):
+    """Example of the signature for the function that gets passed
+    to process_numpy_interval.
+
+    'data': array of data to process -- may be empty
+
+    'interval': overall interval we're processing (but not necessarily
+    the interval of this particular chunk of data)
+
+    'args': opaque arguments passed to process_numpy
+
+    'insert_func': function to call in order to insert array of data.
+    Should be passed a 2-dimensional array of data to insert.
+    Data timestamps must be within the provided interval.
+
+    'final': True if this is the last bit of data for this
+    contiguous interval, False otherwise.
+
+    Return value of 'function' is the number of data rows processed.
+    Unprocessed data will be provided again in a subsequent call
+    (unless 'final' is True).
+
+    If unprocessed data remains after 'final' is True, the interval
+    being inserted will be ended at the timestamp of the first
+    unprocessed data point.
+    """
+    raise NotImplementedError("example_callback_function does nothing")
+
 class Filter(object):
 
     def __init__(self, parser_description = None):
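For reference, a concrete callback matching this documented signature could look like the sketch below. It assumes the usual nilmdb layout where 'data' is a 2-D array with the timestamp in column 0, and it treats 'args' as a single scale factor; the name scale_callback and that use of 'args' are illustrative, not part of nilmtools.

import numpy

def scale_callback(data, interval, args, insert_func, final):
    # 'args' is assumed to be a single numeric scale factor chosen by
    # the caller; it is passed through opaquely by the filter.
    scale = args
    if len(data) == 0:
        # Nothing to process in this chunk.
        return 0
    out = numpy.copy(data)
    # Column 0 holds timestamps; scale only the data columns.
    out[:, 1:] *= scale
    # Insert the processed rows (timestamps stay within 'interval').
    insert_func(out)
    # Report every row as processed so none are re-sent next call.
    return len(data)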
@@ -144,8 +172,8 @@ class Filter(object):
         self.dest = None
         self.start = None
         self.end = None
-        self.interhost = False
-        self.force_metadata = False
+        self._interhost = False
+        self._force_metadata = False
 
         if parser_description is not None:
             self.setup_parser(parser_description)
             self.parse_args()
@@ -208,12 +236,12 @@ class Filter(object):
         if dest_url is None:
             dest_url = url
         if url != dest_url:
-            self.interhost = True
+            self._interhost = True
 
         self._client_src = Client(url)
         self._client_dest = Client(dest_url)
 
-        if (not self.interhost) and (srcpath == destpath):
+        if (not self._interhost) and (srcpath == destpath):
             raise ArgumentError("source and destination path must be different")
 
         # Open the streams
@@ -231,8 +259,8 @@ class Filter(object):
         # Print info
         if not quiet:
-            print "Source:", self.src.string(self.interhost)
-            print "  Dest:", self.dest.string(self.interhost)
+            print "Source:", self.src.string(self._interhost)
+            print "  Dest:", self.dest.string(self._interhost)
 
     def parse_args(self, argv = None):
         """Parse arguments from a command line"""
@@ -241,7 +269,7 @@ class Filter(object):
         self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
                       args.start, args.end, quiet = False, parsed_args = args)
 
-        self.force_metadata = args.force_metadata
+        self._force_metadata = args.force_metadata
 
         if args.dry_run:
             for interval in self.intervals():
                 print interval.human_string()
@@ -252,7 +280,7 @@ class Filter(object):
         """Generate all the intervals that this filter should process"""
         self._using_client = True
 
-        if self.interhost:
+        if self._interhost:
             # Do the difference ourselves
             s_intervals = ( Interval(start, end)
                             for (start, end) in
@@ -289,10 +317,11 @@ class Filter(object):
                                             str(e), toparse))
 
     def check_dest_metadata(self, data):
-        """See if the metadata jives, and complain if it doesn't.  If
-        there's no conflict, update the metadata to match 'data'."""
+        """See if the metadata jives, and complain if it doesn't.  For
+        each key in data, if the stream contains the key, it must match
+        values.  If the stream does not contain the key, it is created."""
         metadata = self._client_dest.stream_get_metadata(self.dest.path)
-        if not self.force_metadata:
+        if not self._force_metadata:
             for key in data:
                 wanted = data[key]
                 if not isinstance(wanted, basestring):
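The rule described by the new docstring amounts to a dictionary merge with conflict detection. A standalone sketch of that rule, using a hypothetical merge_metadata helper and leaving out the nilmdb client calls that Filter actually makes, might be:

def merge_metadata(existing, wanted, force=False):
    """Return a merged metadata dict, or raise if a key conflicts."""
    merged = dict(existing)
    for key, value in wanted.items():
        value = str(value)          # metadata values are stored as strings
        if not force and key in existing and str(existing[key]) != value:
            raise ValueError("metadata mismatch for %r: %r != %r"
                             % (key, existing[key], value))
        merged[key] = value
    return merged

Passing force=True corresponds to the force_metadata option, where conflicting keys are overwritten instead of treated as an error.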
@@ -329,30 +358,10 @@ class Filter(object):
         If 'intervals' is not None, process those intervals instead of
         the default list.
 
-        'function' should be defined as:
-        # def function(data, interval, args, insert_func, final)
-
-        'data': array of data to process -- may be empty
-
-        'interval': overall interval we're processing (but not necessarily
-        the interval of this particular chunk of data)
-
-        'args': opaque arguments passed to process_numpy
-
-        'insert_func': function to call in order to insert array of data.
-        Should be passed a 2-dimensional array of data to insert.
-        Data timestamps must be within the provided interval.
-
-        'final': True if this is the last bit of data for this
-        contiguous interval, False otherwise.
-
-        Return value of 'function' is the number of data rows processed.
-        Unprocessed data will be provided again in a subsequent call
-        (unless 'final' is True).
-
-        If unprocessed data remains after 'final' is True, the interval
-        being inserted will be ended at the timestamp of the first
-        unprocessed data point.
+        'function' should be defined with the same interface as
+        nilmtools.filter.example_callback_function.  See the
+        documentation of that for details.  'args' are passed to
+        'function'.
         """
         extractor = NumpyClient(self.src.url).stream_extract_numpy
         inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
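Putting this together, the method containing that docstring appears to be Filter.process_numpy, and a filter script built on the class would presumably be wired up along these lines. The keyword name 'args', the metadata key, and the overall flow are assumptions based only on what this diff shows, not a verbatim nilmtools example.

#!/usr/bin/python
import nilmtools.filter

def copy_callback(data, interval, args, insert_func, final):
    # Trivial pass-through: insert everything as-is and report all
    # rows as processed so nothing is re-sent on the next call.
    if len(data):
        insert_func(data)
    return len(data)

def main():
    # Supplying a description makes the constructor set up the
    # argument parser and parse the command line immediately.
    f = nilmtools.filter.Filter("Copy data from one stream to another")

    # Complain about conflicting destination metadata (unless the
    # force_metadata option was given); missing keys are created.
    f.check_dest_metadata({"copied_by": "copy-example"})

    # Run the callback over every interval that still needs processing;
    # the 'args' keyword name is assumed from the docstring.
    f.process_numpy(copy_callback, args=None)

if __name__ == "__main__":
    main()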