Browse Source

Add process_numpy function

This converts data into big Numpy arrays and lets the user-provided
function process as much as they'd like at a time.
Jim Paris 9 years ago
1 changed file with 79 additions and 0 deletions
  1. +79

+ 79
- 0
src/ View File

@@ -16,6 +16,8 @@ import time
import sys
import re
import argparse
import numpy as np
import cStringIO

class MissingDestination(Exception):
def __init__(self, src, dest):
@@ -190,6 +192,7 @@ class Filter(object):
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in self._optimize_int(intervals):
yield interval
self._using_client = False
@@ -287,6 +290,82 @@ class Filter(object):
out = [ formatter(row) for row in dest_array ]

# Like process_python, but provides Numpy arrays and allows for
# partial processing.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
the data. The data is converted to a Numpy array in chunks of
'rows' rows at a time.

'function' should be defined as:
def function(data, interval, args, insert_func, final)

'data': array of data to process -- may be empty

'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)

'args': opaque arguments passed to process_numpy

'insert_func': function to call in order to insert array of data.
Data timestamps must be within the provided interval.

'final': True if this is the last bit of data for this
contiguous interval, False otherwise.

Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
if args is None:
args = []
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context

# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"

def batch(iterable, size):
c = itertools.count()
for k, g in itertools.groupby(iterable, lambda x: // size):
yield g

for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
def insert_function(array):
s = cStringIO.StringIO()
np.savetxt(s, array)

extract = extractor(self.src.path, interval.start, interval.end)
old_array = np.array([])
for batched in batch(extract, rows):
# Read in this batch of data
new_array = np.loadtxt(batched)

# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
array = new_array

# Pass it to the process function
processed = function(array, interval, args,
insert_function, False)

# Save the unprocessed parts
if processed > 0:
old_array = array[processed:]
old_array = array

# Last call for this contiguous interval
if old_array.shape[0] != 0:
function(old_array, interval, args, insert_function, True)

def main():
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.