Compare commits

...

8 Commits

Author SHA1 Message Date
fd1b33401f Require a --yes argument before actually cleaning data 2013-04-09 20:13:38 -04:00
4c748ec00c Fix minor bugs 2013-04-09 20:08:25 -04:00
b72d6b6908 Warn if column count is wrong for this nharm value 2013-04-09 19:59:59 -04:00
80d642e52e Change nilm-cleanup config file format, tweak output 2013-04-09 19:43:41 -04:00
001b89b1d2 Support multiple shifted FFTs per period in nilm-prep.
New option --nshift controls how many shifted FFT windows to perform
per period.  "nilm-prep -N 2" is similar to old prep behavior.  Note
that this is redundant information and takes up extra storage space,
though.
2013-04-09 18:53:27 -04:00
f978823505 Fix prep scaling and fix comments 2013-04-09 17:44:13 -04:00
ffd6675979 Remove outdated code 2013-04-08 19:46:16 -04:00
5b67b68fd2 Don't import matplotlib if we don't need it 2013-04-08 18:59:23 -04:00
6 changed files with 371 additions and 52 deletions

View File

@@ -8,8 +8,11 @@ else
@echo "Try 'make install'"
endif
test:
src/decimate.py
test: test_cleanup
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null
@@ -21,12 +24,16 @@ test_copy:
test_prep:
@make install >/dev/null
src/prep.py -c 3 \
/lees-compressor/no-leak/raw \
/lees-compressor/no-leak/sinefit \
/lees-compressor/no-leak/prep \
-s '2013-02-19 18:00:00' \
-r 0
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
nilmtool create /test/raw float32_2
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
src/sinefit.py -c 1 /test/raw /test/sinefit
src/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true

22
extras/cleanup.cfg Normal file
View File

@@ -0,0 +1,22 @@
[/lees-compressor/no-leak/prep]
keep = 2d
rate = 60
[*/raw]
keep = 2d
[*/something]
rate = 10
[*/sinefit]
keep = 1w
decimated = False
[/test/raw]
keep = 0.01d
[/test/sinefit]
keep = 0.01d
[/test/prep]
keep = 0.01d

View File

@@ -78,6 +78,7 @@ setup(name='nilmtools',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
],
},
zip_safe = False,

254
src/cleanup.py Executable file
View File

@@ -0,0 +1,254 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp)
from nilmdb.utils.diskusage import human_size
from nilmdb.utils.interval import Interval
import nilmdb.client
import nilmdb.client.numpyclient
import nilmtools
import argparse
import ConfigParser
import sys
import collections
import fnmatch
import re
def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
'y': ('year', 60*60*24*365) }
def __init__(self, val):
for u in self._units:
if val.endswith(u):
self.unit = self._units[u][0]
self.scale = self._units[u][1]
self.count = float(val[:-len(u)])
break
else:
raise ValueError("unknown units: " + units)
def seconds(self):
return self.count * self.scale
def describe_seconds(self, seconds):
count = seconds / self.scale
units = self.unit if count == 1 else (self.unit + "s")
if count == int(count):
return sprintf("%d %s", count, units)
else:
return sprintf("%.2f %s", count, units)
def __str__(self):
return self.describe_seconds(self.seconds())
class StreamCleanupConfig(object):
def __init__(self, info):
self.path = info[0]
self.layout = info[1]
if info[4] != 0 and info[5] != 0:
self.rate = info[4] / timestamp_to_seconds(info[5])
else:
self.rate = None
self.keep = None
self.clean_decimated = True
self.decimated_from = None
self.also_clean_paths = []
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Clean up old data from streams using a configuration file to specify
which data to remove.
The format of the config file is as follows:
[/stream/path]
keep = 3w # keep up to 3 weeks of data
rate = 8000 # optional, used for the --estimate option
decimated = false # whether to delete decimated data too (default true)
[*/prep]
keep = 3.5m # or 2520h or 105d or 15w or 0.29y
The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
'm' for months, or 'y' for years.
Streams paths may include wildcards. If a path is matched by more than
one config section, data from the last config section counts.
Decimated streams (paths containing '~decim-') are treated specially:
- They don't match wildcards
- When deleting data from a parent stream, data is also deleted
from its decimated streams, unless decimated=false
Rate is optional and is only used for the --estimate option.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
parser.add_argument("configfile", type=argparse.FileType('r'),
help="Configuration file")
args = parser.parse_args(argv)
# Parse config file
config = ConfigParser.RawConfigParser()
config.readfp(args.configfile)
# List all streams
client = nilmdb.client.Client(args.url)
streamlist = client.stream_list(extended = True)
# Create config objects
streams = collections.OrderedDict()
for s in streamlist:
streams[s[0]] = StreamCleanupConfig(s)
m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
if m:
streams[s[0]].decimated_from = m.group(1)
# Build up configuration
for section in config.sections():
matched = False
for path in streams.iterkeys():
# Decimated streams only allow exact matches
if streams[path].decimated_from and path != section:
continue
if not fnmatch.fnmatch(path, section):
continue
matched = True
options = config.options(section)
# Keep period (days, weeks, months, years)
if 'keep' in options:
streams[path].keep = TimePeriod(config.get(section, 'keep'))
options.remove('keep')
# Rate
if 'rate' in options:
streams[path].rate = config.getfloat(section, 'rate')
options.remove('rate')
# Decimated
if 'decimated' in options:
val = config.getboolean(section, 'decimated')
streams[path].clean_decimated = val
options.remove('decimated')
for leftover in options:
warn("option '%s' for '%s' is unknown", leftover, section)
if not matched:
warn("config for '%s' did not match any existing streams", section)
# List all decimated streams in the parent stream's info
for path in streams.keys():
src = streams[path].decimated_from
if src and src in streams:
if streams[src].clean_decimated:
streams[src].also_clean_paths.append(path)
del streams[path]
# Warn about streams that aren't getting cleaned up
for path in streams.keys():
if streams[path].keep is None or streams[path].keep.seconds() < 0:
warn("no config for existing stream '%s'", path)
del streams[path]
if args.estimate:
# Estimate disk usage
total = 0
for path in streams.keys():
rate = streams[path].rate
if not rate or rate < 0:
warn("unable to estimate disk usage for stream '%s' because "
"the data rate is unknown", path)
continue
printf("%s:\n", path)
layout = streams[path].layout
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
per_row = dtype.itemsize
per_sec = per_row * rate
printf("%17s: %s per row, %s rows per second\n",
"base rate",
human_size(per_row),
round(rate,1))
printf("%17s: %s per hour, %s per day\n",
"base size",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
# If we'll be cleaning up decimated data, add an
# estimation for how much room decimated data takes up.
if streams[path].clean_decimated:
d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
# Assume the decimations will be a factor of 4
# sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
d_per_row = d_dtype.itemsize
factor = 4.0
d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor)))
per_sec += d_per_sec
printf("%17s: %s per hour, %s per day\n",
"with decimation",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
keep = per_sec * streams[path].keep.seconds()
printf("%17s: %s\n\n",
"keep " + str(streams[path].keep), human_size(keep))
total += keep
printf("Total estimated disk usage for these streams:\n")
printf(" %s\n", human_size(total))
raise SystemExit(0)
# Do the cleanup
for path in streams:
printf("%s: keep %s\n", path, streams[path].keep)
# Figure out the earliest timestamp we should keep.
intervals = [ Interval(start, end) for (start, end) in
reversed(list(client.stream_intervals(path))) ]
total = 0
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
else:
printf(" nothing to do (only %s of data present)\n",
streams[path].keep.describe_seconds(
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
if args.yes:
client.stream_remove(path, None, remove_before)
for ap in streams[path].also_clean_paths:
printf(" also removing from %s\n", ap)
if args.yes:
client.stream_remove(ap, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":
main()

View File

@@ -281,14 +281,6 @@ class Filter(object):
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
def batch(iterable, size):
c = itertools.count()
for k, g in itertools.groupby(iterable, lambda x: c.next() // size):
yield g
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,

View File

@@ -8,7 +8,7 @@ import nilmdb.client
from numpy import *
import scipy.fftpack
import scipy.signal
from matplotlib import pyplot as p
#from matplotlib import pyplot as p
import bisect
def main(argv = None):
@@ -19,12 +19,14 @@ def main(argv = None):
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
group.add_argument("-n", "--nharm", action="store", type=int, default=4,
help="number of odd harmonics to compute")
help="number of odd harmonics to compute (default 4)")
group.add_argument("-N", "--nshift", action="store", type=int, default=1,
help="number of shifted FFTs per period (default 1)")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rotate", action="store", type=float,
help="rotate FFT output by this many degrees")
help="rotate FFT output by this many degrees (default 0)")
exc.add_argument("-R", "--rotate-rad", action="store", type=float,
help="rotate FFT output by this many radians")
help="rotate FFT output by this many radians (default 0)")
group.add_argument("srcpath", action="store",
help="Path of raw input, e.g. /foo/raw")
@@ -44,6 +46,10 @@ def main(argv = None):
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
@@ -51,6 +57,9 @@ def main(argv = None):
if args.nharm < 1 or args.nharm > 32:
parser.error("number of odd harmonics must be 1-32")
if args.nshift < 1:
parser.error("number of shifted FFTs must be >= 1")
if args.rotate is not None:
rotation = args.rotate * 2.0 * pi / 360.0
else:
@@ -72,52 +81,86 @@ def main(argv = None):
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation))
args.nharm, rotation, args.nshift))
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation) = args
(client, sinefit_path, column, nharm, rotation, nshift) = args
rows = data.shape[0]
data_timestamps = data[:,0]
if rows < 2:
return 0
last_inserted = [nilmdb.utils.time.min_timestamp]
def insert_if_nonoverlapping(data):
"""Call insert_function to insert data, but only if this
data doesn't overlap with other data that we inserted."""
if data[0][0] <= last_inserted[0]:
return
last_inserted[0] = data[-1][0]
insert_function(data)
processed = 0
out = zeros((1, nharm * 2 + 1))
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
# Extract sinefit data to get zero crossing timestamps
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
are the timestamps of the start and end of one period.
Results are rotated by an additional extra_rot before
being inserted into the database. Returns the maximum
index processed, or None if the period couldn't be
processed.
"""
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max or idx_max >= len(data_timestamps):
return None
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) * 2.0 / N
# If we wanted more harmonics than the FFT gave us, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data.
out[0, 0] = round(t_min)
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rot * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
insert_if_nonoverlapping(out)
return idx_max
# Extract sinefit data to get zero crossing timestamps.
# t_min = beginning of period
# t_max = end of period
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max:
# something's wonky; ignore this period
continue
if idx_max >= len(data_timestamps):
# max is likely past the end of our chunk, so stop
# processing this chunk now.
break
# Compute prep over shifted windows of the period
# (nshift is typically 1)
for n in range(nshift):
# Compute timestamps and rotations for shifted window
time_shift = n * (t_max - t_min) / nshift
shifted_min = t_min + time_shift
shifted_max = t_max + time_shift
angle_shift = n * 2 * pi / nshift
shifted_rot = rotation - angle_shift
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) / N
# If we wanted more harmonics than we have, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data
out[0, 0] = t_min
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
# Insert it and continue
insert_function(out)
processed = idx_max
# Run prep computation
idx_max = prep_period(shifted_min, shifted_max, shifted_rot)
if not idx_max:
break
processed = idx_max
print "Processed", processed, "of", rows, "rows"
return processed