Compare commits

...

3 Commits

Author SHA1 Message Date
60f09427cf Update decimate to use process_numpy 2013-04-06 15:56:36 -04:00
d6d31190eb Fix fromstring usage 2013-04-06 13:40:09 -04:00
2ec574c59d Use np.fromstring instead of np.loadtxt 2013-04-06 13:32:16 -04:00
3 changed files with 44 additions and 34 deletions

View File

@@ -9,6 +9,9 @@ else
endif endif
test: test:
src/decimate.py
test_insert:
@make install >/dev/null @make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null src/insert.py --file --dry-run /test/foo </dev/null

View File

@@ -41,41 +41,45 @@ def main(argv = None):
# If source is decimated, we have to decimate a bit differently # If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath): if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3 again = True
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))
else: else:
n = f.src.layout_count again = False
f.process_python(function = decimate_first, rows = args.factor, f.process_numpy(decimate, args = (args.factor, again))
args = (n,))
def decimate_first(data, n): def decimate(data, interval, args, insert_function, final):
"""Decimate original data -- result has 3 times as many columns""" """Decimate data"""
# For this simple calculation, converting to a Numpy array (factor, again) = args
# and doing the math is slower than just doing it directly. (n, m) = data.shape
rows = iter(data)
r_sum = r_min = r_max = rows.next()
for row in rows:
r_sum = map(operator.add, r_sum, row)
r_min = map(min, r_min, row)
r_max = map(max, r_max, row)
r_mean = [ x / len(data) for x in r_sum ]
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
def decimate_again(data, n): # Figure out which columns to use as the source for mean, min, and max,
"""Decimate already-decimated data -- result has the same number # depending on whether this is the first decimation or we're decimating
of columns""" # again. Note that we include the timestamp in the means.
rows = iter(data) if again:
r = rows.next() c = (m - 1) // 3
r_sum = r[0:(n+1)] # e.g. c = 3
r_min = r[(n+1):(2*n+1)] # ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
r_max = r[(2*n+1):(3*n+1)] mean_col = slice(0, c + 1)
for r in rows: min_col = slice(c + 1, 2 * c + 1)
r_sum = map(operator.add, r_sum, r[0:(n+1)]) max_col = slice(2 * c + 1, 3 * c + 1)
r_min = map(min, r_min, r[(n+1):(2*n+1)]) else:
r_max = map(max, r_max, r[(2*n+1):(3*n+1)]) mean_col = slice(0, m)
r_mean = [ x / len(data) for x in r_sum ] min_col = slice(1, m)
return [ r_mean + r_min + r_max ] max_col = slice(1, m)
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data.shape = (n // factor, factor, m)
# Fill the result
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -367,8 +367,11 @@ class Filter(object):
extract = extractor(self.src.path, interval.start, interval.end) extract = extractor(self.src.path, interval.start, interval.end)
old_array = np.array([]) old_array = np.array([])
for batched in batch(extract, rows): for batched in batch(extract, rows):
# Read in this batch of data # Read in this batch of data. This turns out to
new_array = np.loadtxt(batched) # be a very fast way to read and convert it (order
# of magnitude faster than numpy.loadtxt)
new_array = np.fromstring("\n".join(batched), sep=' ')
new_array = new_array.reshape(-1, self.src.total_count)
# If we still had old data left, combine it # If we still had old data left, combine it
if old_array.shape[0] != 0: if old_array.shape[0] != 0: