Compare commits

...

10 Commits

5 changed files with 106 additions and 42 deletions

View File

@@ -12,7 +12,7 @@ test: test_cleanup
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py -D extras/cleanup.cfg
src/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null

View File

@@ -19,7 +19,7 @@ def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60*24),
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
@@ -96,9 +96,9 @@ def main(argv = None):
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-D", "--dry-run", action="store_true",
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Don't actually remove any data")
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
@@ -228,7 +228,7 @@ def main(argv = None):
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total < keep:
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
@@ -238,14 +238,16 @@ def main(argv = None):
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
if not args.dry_run:
if args.yes:
client.stream_remove(path, None, remove_before)
for ap in streams[path].also_clean_paths:
printf(" also removing from %s\n", ap)
if not args.dry_run:
if args.yes:
client.stream_remove(ap, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":

View File

@@ -4,15 +4,19 @@ import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = "1.0",
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
@@ -23,20 +27,36 @@ def main(argv = None):
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store",
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
info = nilmtools.filter.get_stream_info(client, args.path)
if not info:
raise Exception("path " + args.path + " not found")
# Find list of paths to process
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, unicode(path))
if not new:
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
meta = client.stream_get_metadata(args.path)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print "Stream", args.path, "was decimated from", meta["decimate_source"]
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
@@ -53,7 +73,7 @@ def main(argv = None):
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (args.path, factor)
new_path = "%s~decim-%d" % (path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
@@ -72,5 +92,7 @@ def main(argv = None):
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
return
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,8 @@
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import *
@@ -46,6 +48,10 @@ def main(argv = None):
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
@@ -73,7 +79,8 @@ def main(argv = None):
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column })
"prep_column": args.column,
"prep_rotation": rotation })
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
@@ -101,7 +108,6 @@ def process(data, interval, args, insert_function, final):
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
@@ -158,7 +164,15 @@ def process(data, interval, args, insert_function, final):
break
processed = idx_max
print "Processed", processed, "of", rows, "rows"
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
printf("%s: processed %d of %d rows\n",
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":

View File

@@ -18,6 +18,15 @@ def main(argv = None):
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
group.add_argument('-m', '--min-freq', action='store', type=float,
help='Minimum valid frequency '
'(default: approximate frequency / 2))')
group.add_argument('-M', '--max-freq', action='store', type=float,
help='Maximum valid frequency '
'(default: approximate frequency * 2))')
group.add_argument('-a', '--min-amp', action='store', type=float,
default=20.0,
help='Minimum signal amplitude (default: %(default)s)')
# Parse arguments
try:
@@ -34,13 +43,24 @@ def main(argv = None):
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
if args.min_freq is None:
args.min_freq = args.frequency / 2
if args.max_freq is None:
args.max_freq = args.frequency * 2
if (args.min_freq > args.max_freq or
args.min_freq > args.frequency or
args.max_freq < args.frequency):
parser.error("invalid min or max frequency")
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency))
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
def process(data, interval, args, insert_function, final):
(column, f_expected) = args
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
@@ -66,8 +86,14 @@ def process(data, interval, args, insert_function, final):
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < (f_expected/2) or f0 > (f_expected*2):
print "frequency", f0, "too far from expected value", f_expected
if f0 < f_min or f0 > f_max:
print "frequency", f0, "outside valid range", f_min, "-", f_max
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
print "amplitude", A, "below minimum threshold", a_min
start += N
continue
@@ -158,30 +184,30 @@ def sfit4(data, fs):
# Convert to Hz
f0 = i * fs / N
## Fit it
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = -arctan2(s[1], s[0]) # eqn B.22
except TypeError:
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down, just return zeros
return (0, 0, 0, 0)
C = s[2]
return (A, f0, phi, C)
if __name__ == "__main__":
main()