Compare commits
2 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
dc26e32b6e | |||
981f23ff14 |
@@ -133,6 +133,34 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
|
||||
# we'll not miss any data when we run again later.
|
||||
insert_ctx.update_end(old_array[processed][0])
|
||||
|
||||
def example_callback_function(data, interval, args, insert_func, final):
|
||||
"""Example of the signature for the function that gets passed
|
||||
to process_numpy_interval.
|
||||
|
||||
'data': array of data to process -- may be empty
|
||||
|
||||
'interval': overall interval we're processing (but not necessarily
|
||||
the interval of this particular chunk of data)
|
||||
|
||||
'args': opaque arguments passed to process_numpy
|
||||
|
||||
'insert_func': function to call in order to insert array of data.
|
||||
Should be passed a 2-dimensional array of data to insert.
|
||||
Data timestamps must be within the provided interval.
|
||||
|
||||
'final': True if this is the last bit of data for this
|
||||
contiguous interval, False otherwise.
|
||||
|
||||
Return value of 'function' is the number of data rows processed.
|
||||
Unprocessed data will be provided again in a subsequent call
|
||||
(unless 'final' is True).
|
||||
|
||||
If unprocessed data remains after 'final' is True, the interval
|
||||
being inserted will be ended at the timestamp of the first
|
||||
unprocessed data point.
|
||||
"""
|
||||
raise NotImplementedError("example_callback_function does nothing")
|
||||
|
||||
class Filter(object):
|
||||
|
||||
def __init__(self, parser_description = None):
|
||||
@@ -144,8 +172,8 @@ class Filter(object):
|
||||
self.dest = None
|
||||
self.start = None
|
||||
self.end = None
|
||||
self.interhost = False
|
||||
self.force_metadata = False
|
||||
self._interhost = False
|
||||
self._force_metadata = False
|
||||
if parser_description is not None:
|
||||
self.setup_parser(parser_description)
|
||||
self.parse_args()
|
||||
@@ -208,12 +236,12 @@ class Filter(object):
|
||||
if dest_url is None:
|
||||
dest_url = url
|
||||
if url != dest_url:
|
||||
self.interhost = True
|
||||
self._interhost = True
|
||||
|
||||
self._client_src = Client(url)
|
||||
self._client_dest = Client(dest_url)
|
||||
|
||||
if (not self.interhost) and (srcpath == destpath):
|
||||
if (not self._interhost) and (srcpath == destpath):
|
||||
raise ArgumentError("source and destination path must be different")
|
||||
|
||||
# Open the streams
|
||||
@@ -231,8 +259,8 @@ class Filter(object):
|
||||
|
||||
# Print info
|
||||
if not quiet:
|
||||
print "Source:", self.src.string(self.interhost)
|
||||
print " Dest:", self.dest.string(self.interhost)
|
||||
print "Source:", self.src.string(self._interhost)
|
||||
print " Dest:", self.dest.string(self._interhost)
|
||||
|
||||
def parse_args(self, argv = None):
|
||||
"""Parse arguments from a command line"""
|
||||
@@ -241,7 +269,7 @@ class Filter(object):
|
||||
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
|
||||
args.start, args.end, quiet = False, parsed_args = args)
|
||||
|
||||
self.force_metadata = args.force_metadata
|
||||
self._force_metadata = args.force_metadata
|
||||
if args.dry_run:
|
||||
for interval in self.intervals():
|
||||
print interval.human_string()
|
||||
@@ -252,7 +280,7 @@ class Filter(object):
|
||||
"""Generate all the intervals that this filter should process"""
|
||||
self._using_client = True
|
||||
|
||||
if self.interhost:
|
||||
if self._interhost:
|
||||
# Do the difference ourselves
|
||||
s_intervals = ( Interval(start, end)
|
||||
for (start, end) in
|
||||
@@ -289,10 +317,11 @@ class Filter(object):
|
||||
str(e), toparse))
|
||||
|
||||
def check_dest_metadata(self, data):
|
||||
"""See if the metadata jives, and complain if it doesn't. If
|
||||
there's no conflict, update the metadata to match 'data'."""
|
||||
"""See if the metadata jives, and complain if it doesn't. For
|
||||
each key in data, if the stream contains the key, it must match
|
||||
values. If the stream does not contain the key, it is created."""
|
||||
metadata = self._client_dest.stream_get_metadata(self.dest.path)
|
||||
if not self.force_metadata:
|
||||
if not self._force_metadata:
|
||||
for key in data:
|
||||
wanted = data[key]
|
||||
if not isinstance(wanted, basestring):
|
||||
@@ -329,30 +358,10 @@ class Filter(object):
|
||||
If 'intervals' is not None, process those intervals instead of
|
||||
the default list.
|
||||
|
||||
'function' should be defined as:
|
||||
# def function(data, interval, args, insert_func, final)
|
||||
|
||||
'data': array of data to process -- may be empty
|
||||
|
||||
'interval': overall interval we're processing (but not necessarily
|
||||
the interval of this particular chunk of data)
|
||||
|
||||
'args': opaque arguments passed to process_numpy
|
||||
|
||||
'insert_func': function to call in order to insert array of data.
|
||||
Should be passed a 2-dimensional array of data to insert.
|
||||
Data timestamps must be within the provided interval.
|
||||
|
||||
'final': True if this is the last bit of data for this
|
||||
contiguous interval, False otherwise.
|
||||
|
||||
Return value of 'function' is the number of data rows processed.
|
||||
Unprocessed data will be provided again in a subsequent call
|
||||
(unless 'final' is True).
|
||||
|
||||
If unprocessed data remains after 'final' is True, the interval
|
||||
being inserted will be ended at the timestamp of the first
|
||||
unprocessed data point.
|
||||
'function' should be defined with the same interface as
|
||||
nilmtools.filter.example_callback_function. See the
|
||||
documentation of that for details. 'args' are passed to
|
||||
'function'.
|
||||
"""
|
||||
extractor = NumpyClient(self.src.url).stream_extract_numpy
|
||||
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
|
||||
|
Reference in New Issue
Block a user