diff --git a/Makefile b/Makefile index 0788ec5..cd33d76 100644 --- a/Makefile +++ b/Makefile @@ -21,12 +21,16 @@ test_copy: test_prep: @make install >/dev/null - src/prep.py -c 3 \ - /lees-compressor/no-leak/raw \ - /lees-compressor/no-leak/sinefit \ - /lees-compressor/no-leak/prep \ - -s '2013-02-19 18:00:00' \ - -r 0 + -nilmtool destroy -R /test/raw + -nilmtool destroy -R /test/sinefit + -nilmtool destroy -R /test/prep + nilmtool create /test/raw float32_2 + nilmtool create /test/sinefit float32_3 + nilmtool create /test/prep float32_8 + nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat + src/sinefit.py -c 1 /test/raw /test/sinefit + src/prep.py -c 2 /test/raw /test/sinefit /test/prep + nilmtool extract -s min -e max /test/prep | head -20 test_decimate: -@nilmtool destroy /lees-compressor/no-leak/raw/4 || true diff --git a/src/prep.py b/src/prep.py index f1043e2..cdd1fc9 100755 --- a/src/prep.py +++ b/src/prep.py @@ -19,12 +19,14 @@ def main(argv = None): group.add_argument("-c", "--column", action="store", type=int, help="Column number (first data column is 1)") group.add_argument("-n", "--nharm", action="store", type=int, default=4, - help="number of odd harmonics to compute") + help="number of odd harmonics to compute (default 4)") + group.add_argument("-N", "--nshift", action="store", type=int, default=1, + help="number of shifted FFTs per period (default 1)") exc = group.add_mutually_exclusive_group() exc.add_argument("-r", "--rotate", action="store", type=float, - help="rotate FFT output by this many degrees") + help="rotate FFT output by this many degrees (default 0)") exc.add_argument("-R", "--rotate-rad", action="store", type=float, - help="rotate FFT output by this many radians") + help="rotate FFT output by this many radians (default 0)") group.add_argument("srcpath", action="store", help="Path of raw input, e.g. /foo/raw") @@ -51,6 +53,9 @@ def main(argv = None): if args.nharm < 1 or args.nharm > 32: parser.error("number of odd harmonics must be 1-32") + if args.nshift < 1: + parser.error("number of shifted FFTs must be >= 1") + if args.rotate is not None: rotation = args.rotate * 2.0 * pi / 360.0 else: @@ -72,54 +77,86 @@ def main(argv = None): # Run the processing function on all data f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column, - args.nharm, rotation)) + args.nharm, rotation, args.nshift)) def process(data, interval, args, insert_function, final): - (client, sinefit_path, column, nharm, rotation) = args + (client, sinefit_path, column, nharm, rotation, nshift) = args rows = data.shape[0] data_timestamps = data[:,0] + if rows < 2: + return 0 + + last_inserted = [nilmdb.utils.time.min_timestamp] + def insert_if_nonoverlapping(data): + """Call insert_function to insert data, but only if this + data doesn't overlap with other data that we inserted.""" + if data[0][0] <= last_inserted[0]: + return + last_inserted[0] = data[-1][0] + insert_function(data) + processed = 0 out = zeros((1, nharm * 2 + 1)) # Pull out sinefit data for the entire time range of this block for sinefit_line in client.stream_extract(sinefit_path, data[0, 0], data[rows-1, 0]): + + def prep_period(t_min, t_max, rot): + """ + Compute prep coefficients from time t_min to t_max, which + are the timestamps of the start and end of one period. + Results are rotated by an additional extra_rot before + being inserted into the database. Returns the maximum + index processed, or None if the period couldn't be + processed. + """ + # Find the indices of data that correspond to (t_min, t_max) + idx_min = bisect.bisect_left(data_timestamps, t_min) + idx_max = bisect.bisect_left(data_timestamps, t_max) + if idx_min >= idx_max or idx_max >= len(data_timestamps): + return None + + # Perform FFT over those indices + N = idx_max - idx_min + d = data[idx_min:idx_max, column] + F = scipy.fftpack.fft(d) * 2.0 / N + + # If we wanted more harmonics than the FFT gave us, pad with zeros + if N < (nharm * 2): + F = r_[F, zeros(nharm * 2 - N)] + + # Fill output data. + out[0, 0] = round(t_min) + for k in range(nharm): + Fk = F[2 * k + 1] * e**(rot * 1j * (k+1)) + out[0, 2 * k + 1] = -imag(Fk) # Pk + out[0, 2 * k + 2] = real(Fk) # Qk + + insert_if_nonoverlapping(out) + return idx_max + # Extract sinefit data to get zero crossing timestamps. # t_min = beginning of period # t_max = end of period (t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ] t_max = t_min + 1e6 / f0 - # Find the indices of data that correspond to (t_min, t_max) - idx_min = bisect.bisect_left(data_timestamps, t_min) - idx_max = bisect.bisect_left(data_timestamps, t_max) - if idx_min >= idx_max: - # something's wonky; ignore this period - continue - if idx_max >= len(data_timestamps): - # max is likely past the end of our chunk, so stop - # processing this chunk now. - break - - # Perform FFT over those indices - N = idx_max - idx_min - d = data[idx_min:idx_max, column] - F = scipy.fftpack.fft(d) * 2.0 / N - - # If we wanted more harmonics than the FFT gave us, pad with zeros - if N < (nharm * 2): - F = r_[F, zeros(nharm * 2 - N)] - - # Fill output data. - out[0, 0] = t_min - for k in range(nharm): - Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1)) - out[0, 2 * k + 1] = -imag(Fk) # Pk - out[0, 2 * k + 2] = real(Fk) # Qk - - # Insert this point and continue - insert_function(out) - processed = idx_max + # Compute prep over shifted windows of the period + # (nshift is typically 1) + for n in range(nshift): + # Compute timestamps and rotations for shifted window + time_shift = n * (t_max - t_min) / nshift + shifted_min = t_min + time_shift + shifted_max = t_max + time_shift + angle_shift = n * 2 * pi / nshift + shifted_rot = rotation - angle_shift + + # Run prep computation + idx_max = prep_period(shifted_min, shifted_max, shifted_rot) + if not idx_max: + break + processed = idx_max print "Processed", processed, "of", rows, "rows" return processed