Compare commits

...

4 Commits

SHA1 Message Date
60f09427cf Update decimate to use process_numpy 2013-04-06 15:56:36 -04:00
d6d31190eb Fix fromstring usage 2013-04-06 13:40:09 -04:00
2ec574c59d Use np.fromstring instead of np.loadtxt 2013-04-06 13:32:16 -04:00
1988955671 Accumulate delta separately from data timestamp 2013-04-05 17:41:48 -04:00
4 changed files with 50 additions and 39 deletions

View File

@@ -9,6 +9,9 @@ else
 endif
 test:
+	src/decimate.py
+
+test_insert:
 	@make install >/dev/null
 	src/insert.py --file --dry-run /test/foo </dev/null

View File

@@ -41,41 +41,45 @@ def main(argv = None):
     # If source is decimated, we have to decimate a bit differently
     if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
-        n = f.src.layout_count // 3
-        f.process_python(function = decimate_again, rows = args.factor,
-                         args = (n,))
+        again = True
     else:
-        n = f.src.layout_count
-        f.process_python(function = decimate_first, rows = args.factor,
-                         args = (n,))
-
-def decimate_first(data, n):
-    """Decimate original data -- result has 3 times as many columns"""
-    # For this simple calculation, converting to a Numpy array
-    # and doing the math is slower than just doing it directly.
-    rows = iter(data)
-    r_sum = r_min = r_max = rows.next()
-    for row in rows:
-        r_sum = map(operator.add, r_sum, row)
-        r_min = map(min, r_min, row)
-        r_max = map(max, r_max, row)
-    r_mean = [ x / len(data) for x in r_sum ]
-    return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
-
-def decimate_again(data, n):
-    """Decimate already-decimated data -- result has the same number
-    of columns"""
-    rows = iter(data)
-    r = rows.next()
-    r_sum = r[0:(n+1)]
-    r_min = r[(n+1):(2*n+1)]
-    r_max = r[(2*n+1):(3*n+1)]
-    for r in rows:
-        r_sum = map(operator.add, r_sum, r[0:(n+1)])
-        r_min = map(min, r_min, r[(n+1):(2*n+1)])
-        r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
-    r_mean = [ x / len(data) for x in r_sum ]
-    return [ r_mean + r_min + r_max ]
+        again = False
+    f.process_numpy(decimate, args = (args.factor, again))
+
+def decimate(data, interval, args, insert_function, final):
+    """Decimate data"""
+    (factor, again) = args
+    (n, m) = data.shape
+
+    # Figure out which columns to use as the source for mean, min, and max,
+    # depending on whether this is the first decimation or we're decimating
+    # again.  Note that we include the timestamp in the means.
+    if again:
+        c = (m - 1) // 3
+        # e.g. c = 3
+        # ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
+        mean_col = slice(0, c + 1)
+        min_col = slice(c + 1, 2 * c + 1)
+        max_col = slice(2 * c + 1, 3 * c + 1)
+    else:
+        mean_col = slice(0, m)
+        min_col = slice(1, m)
+        max_col = slice(1, m)
+
+    # Discard extra rows that aren't a multiple of factor
+    n = n // factor * factor
+    data = data[:n,:]
+
+    # Reshape it into 3D so we can process 'factor' rows at a time
+    data.shape = (n // factor, factor, m)
+
+    # Fill the result
+    out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
+                 np.min(data[:,:,min_col], axis=1),
+                 np.max(data[:,:,max_col], axis=1) ]
+
+    insert_function(out)
+    return n
 
 if __name__ == "__main__":
     main()
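
Note on the rewritten decimate(): the speedup comes from grouping every
'factor' rows along a new axis and reducing each group with one vectorized
call per statistic. A minimal standalone sketch of the same reshape-and-reduce
technique (array contents and variable names here are illustrative, not from
the repo), following the first-decimation branch where min/max skip the
timestamp column:

    import numpy as np

    factor = 4
    data = np.arange(20.0).reshape(10, 2)     # 10 rows: [timestamp, value]

    # Discard trailing rows that don't fill a complete group of 'factor'
    n = data.shape[0] // factor * factor
    data = data[:n, :]

    # View as (groups, factor, columns) and reduce along the middle axis
    grouped = data.reshape(n // factor, factor, data.shape[1])
    out = np.c_[np.mean(grouped, axis=1),           # mean of all columns (incl. ts)
                np.min(grouped[:, :, 1:], axis=1),  # min of the data columns
                np.max(grouped[:, :, 1:], axis=1)]  # max of the data columns

    print(out.shape)   # (2, 4): two decimated rows; 2 of 10 input rows discarded

As the old docstrings noted, the first pass triples the data columns
(mean/min/max per column, timestamp included only in the mean), and every
later pass with again = True keeps the column count the same by reducing the
existing mean/min/max groups.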

View File

@@ -367,8 +367,11 @@ class Filter(object):
         extract = extractor(self.src.path, interval.start, interval.end)
         old_array = np.array([])
         for batched in batch(extract, rows):
-            # Read in this batch of data
-            new_array = np.loadtxt(batched)
+            # Read in this batch of data.  This turns out to
+            # be a very fast way to read and convert it (order
+            # of magnitude faster than numpy.loadtxt)
+            new_array = np.fromstring("\n".join(batched), sep=' ')
+            new_array = new_array.reshape(-1, self.src.total_count)
 
             # If we still had old data left, combine it
             if old_array.shape[0] != 0:

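Note on the loadtxt → fromstring change: np.loadtxt parses line by line in
Python, while np.fromstring with a separator does a single C-level scan and
returns a flat 1-D array, which is why the reshape to (-1, total_count) is
needed to restore the row structure. A rough sketch of the technique with
made-up sample data (total_count stands in for self.src.total_count):

    import numpy as np

    # Stand-ins for what the extractor yields: one ASCII line per row
    # of space-separated values.
    batched = ["1365267396.0 1.0 2.0",
               "1365267396.1 1.5 2.5",
               "1365267396.2 2.0 3.0"]
    total_count = 3

    # One C-level parse of the whole batch; '\n' counts as whitespace
    # around the separator, so the joined lines parse as one float stream.
    flat = np.fromstring("\n".join(batched), sep=' ')
    new_array = flat.reshape(-1, total_count)   # restore row structure

    print(new_array.shape)   # (3, 3)

Text-mode fromstring still works on current NumPy, though the gap has since
narrowed: loadtxt itself was reimplemented in C in NumPy 1.23.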
View File

@@ -132,9 +132,10 @@ def main(argv = None):
     data_ts_base = 0
     data_ts_inc = 0
     data_ts_rate = args.rate
+    data_ts_delta = 0
     def get_data_ts():
         if args.delta:
-            return data_ts_base
+            return data_ts_base + data_ts_delta
         else:
             return data_ts_base + rate_to_period(data_ts_rate,
                                                  data_ts_inc)
@@ -207,12 +208,12 @@ def main(argv = None):
                 pass
             continue
 
-        # If --delta mode, increment data_ts_base by the delta
-        # from the file.
+        # If --delta mode, increment data_ts_delta by the
+        # delta from the file.
         if args.delta:
             try:
                 (delta, line) = line.split(None, 1)
-                data_ts_base += float(delta)
+                data_ts_delta += float(delta)
             except ValueError:
                 raise ParseError(filename, "can't parse delta")
@@ -247,7 +248,7 @@ def main(argv = None):
                                  timestamp_to_human(clock_ts))
                 stream.finalize()
                 data_ts_base = data_ts = clock_ts
-                data_ts_inc = 0
+                data_ts_inc = data_ts_delta = 0
 
             # Don't use this clock time anymore until we update it
             clock_ts = None
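
Note on the delta bookkeeping: keeping the running per-line delta in its own
data_ts_delta variable, instead of folding it into data_ts_base, lets a clock
resync reset the accumulator (data_ts_inc = data_ts_delta = 0) while the base
picks up the new absolute time. A condensed sketch of the resulting behavior,
with the parsing loop elided and a made-up clock value:

    data_ts_base = 0.0    # last absolute timestamp (set on clock resync)
    data_ts_delta = 0.0   # deltas accumulated since that resync

    def get_data_ts():
        # --delta mode: absolute time is base plus accumulated deltas
        return data_ts_base + data_ts_delta

    for delta in (0.5, 0.25, 0.25):   # deltas read from the input file
        data_ts_delta += delta
    print(get_data_ts())              # 1.0

    # Clock resync: the base absorbs the real time and the accumulator
    # restarts, instead of both being mixed into a single variable.
    clock_ts = 1365267396.0           # made-up absolute time
    data_ts_base, data_ts_delta = clock_ts, 0.0
    print(get_data_ts())              # 1365267396.0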