|
|
@@ -247,72 +247,7 @@ class Filter(object): |
|
|
|
# All good -- write the metadata in case it's not already there |
|
|
|
self._client_dest.stream_update_metadata(self.dest.path, data) |
|
|
|
|
|
|
|
# Main processing helper |
|
|
|
def process_python(self, function, rows, args = None, partial = False):
    """Process data in chunks of 'rows' data at a time.

    This provides data as nested Python lists and expects the same
    back.

    function: function to process the data
    rows: maximum number of rows to pass to 'function' at once
    args: tuple containing extra arguments to pass to 'function'
    partial: if true, less than 'rows' may be passed to 'function'.
             if false, partial data at the end of an interval will
             be dropped.

    'function' should be defined like:
        function(data, *args)
    It will be passed a list containing up to 'rows' rows of
    data from the source stream, and any arguments passed in
    'args'.  It should transform the data as desired, and return a
    new list of data, which will be inserted into the destination
    stream.

    Each row handed to 'function' is a list of numbers: ints when
    the source layout type contains "int", floats otherwise.
    """
    if args is None:
        args = ()

    extractor = Client(self.src.url).stream_extract
    inserter = Client(self.dest.url).stream_insert_context

    # Parse input data.  We use homogeneous types for now, which
    # means the timestamp type will be either float or int.
    convert = int if "int" in self.src.layout_type else float

    def parser(line):
        # One whitespace-separated text line -> list of numbers
        return [ convert(x) for x in line.split() ]

    # Format output data.
    def formatter(row):
        return " ".join([ repr(x) for x in row ]) + "\n"

    def process_chunk(insert_ctx, chunk):
        # Pass one chunk through the filter function and write the
        # result to the destination as a single block of text.
        dest_array = function(chunk, *args)
        insert_ctx.insert("".join([ formatter(row) for row in dest_array ]))

    for interval in self.intervals():
        # Single-argument form prints identically under Python 2
        # and remains valid syntax under Python 3.
        print("Processing %s" % (self.interval_string(interval),))
        with inserter(self.dest.path,
                      interval.start, interval.end) as insert_ctx:
            src_array = []
            for line in extractor(self.src.path,
                                  interval.start, interval.end):
                # Read in data, using the layout-appropriate parser
                # so that integer layouts stay integers on output.
                src_array.append(parser(line))

                if len(src_array) == rows:
                    process_chunk(insert_ctx, src_array)
                    # Start a fresh list; the old one may still be
                    # referenced by 'function'.
                    src_array = []

            # Take care of partial chunk left at the end of the
            # interval; it is dropped unless 'partial' is set.
            if src_array and partial:
                process_chunk(insert_ctx, src_array)
|
|
|
|
|
|
|
# Like process_python, but provides Numpy arrays and allows for |
|
|
|
# partial processing. |
|
|
|
# The main filter processing method. |
|
|
|
def process_numpy(self, function, args = None, rows = 100000): |
|
|
|
"""For all intervals that exist in self.src but don't exist in |
|
|
|
self.dest, call 'function' with a Numpy array corresponding to |
|
|
|