|
|
@@ -95,13 +95,36 @@ class Filter(object): |
|
|
|
|
|
|
|
# Misc helpers |
|
|
|
def stream_info_string(self, info): |
|
|
|
"""Print stream info as a string""" |
|
|
|
return sprintf("%s (%s), %.2fM rows, %.2f hours", |
|
|
|
info[0], info[1], info[4] / 1e6, info[5] / 3600) |
|
|
|
|
|
|
|
def interval_string(self, interval): |
|
|
|
"""Print interval as a string""" |
|
|
|
return sprintf("[ %s -> %s ]", format_time(interval[0]), |
|
|
|
format_time(interval[1])) |
|
|
|
|
|
|
|
# Main processing helper |
|
|
|
def process(self, function, maxlen, args): |
|
|
|
"""Process data in chunks. |
|
|
|
|
|
|
|
function: function to process the data |
|
|
|
maxlen: maximum length of data to pass to function, in seconds |
|
|
|
args: tuple containing extra arguments to pass to function |
|
|
|
|
|
|
|
'function' should be defined like: |
|
|
|
function(data, start, end, *args) |
|
|
|
It will be passed a block of data from the source stream, |
|
|
|
the start and end times of that block, and any arguments |
|
|
|
that were passed to process in 'args'. The total |
|
|
|
length of the interval will be at most 'maxlen' seconds. |
|
|
|
|
|
|
|
'function' should transform the data as desired, and return |
|
|
|
a new list of data, which will be inserted into the |
|
|
|
destination stream.""" |
|
|
|
for (start, end) in self.intervals(): |
|
|
|
if (end - start) |
|
|
|
return |
|
|
|
|
|
|
|
def main(): |
|
|
|
# This is just a dummy function; actual filters can use the other |
|
|
|