Compare commits

...

8 Commits

SHA1 Message Date
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
de68956f76 Update copy tool 2013-03-16 23:13:45 -04:00
e73dd313d5 Reworked things to match nilmdb updates; a bit faster 2013-03-16 14:46:49 -04:00
d23fa9ee78 Remove unnecessary option group 2013-03-12 19:02:29 -04:00
2b9ecc6697 Add nilm-copy tool 2013-03-12 18:52:39 -04:00
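
Context for 3e78da12dc: Python 2 searches the script's directory (or the current working directory) first on sys.path, so a stray copy.py shadows the standard library's copy module for anything that later runs "import copy", including the nilmdb -> datetime_tz -> pytz -> gettext chain mentioned above. A minimal sketch of the failure mode, assuming a file named copy.py sits in the working directory:

    # shadow_demo.py -- run with a stray copy.py in the same directory.
    # sys.path[0] is the script's directory, so "import copy" resolves
    # to the local file instead of the standard library module.
    import sys
    print sys.path[0]         # the script directory: searched first

    import copy               # picks up ./copy.py, not the stdlib copy
    print copy.__file__       # shows the local path

    copy.deepcopy([1, 2, 3])  # AttributeError unless ./copy.py defines deepcopy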
12 changed files with 482 additions and 314 deletions

Makefile

@@ -1,11 +1,24 @@
-test:
-	nilmtool remove /lees-compressor/noleak/raw~4 -s 2000 -e 2020
-	nilmtool remove /lees-compressor/noleak/raw~16 -s 2000 -e 2020
-	python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw /lees-compressor/noleak/raw~4
-	python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw~4 /lees-compressor/noleak/raw~16
+all:
+ifeq ($(INSIDE_EMACS), t)
+	@make test
+else
+	@echo "Try 'make install'"
+endif
+
+test:
+	python src/filter.py \
+	    -u "http://localhost:12380/" \
+	    -U "http://127.0.0.1:12380/" \
+	    /lees-compressor/no-leak/raw/1 \
+	    /lees-compressor/no-leak/raw/4
+
+test2:
+	-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
+	-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
+	-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
+	-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
+	time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
+	python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
 
 version:
 	python setup.py version

nilmtools/decimate.py (deleted)

@@ -1,70 +0,0 @@
#!/usr/bin/python

import nilmtools.filter
import nilmdb.client
import numpy as np

def main():
    f = nilmtools.filter.Filter()
    parser = f.setup_parser("Decimate a stream")
    group = parser.add_argument_group("Decimate options")
    group.add_argument('-f', '--factor', action='store', default=4, type=int,
                       help='Decimation factor (default: %(default)s)')

    # Parse arguments
    try:
        args = f.parse_args()
    except nilmtools.filter.MissingDestination as e:
        # If no destination, suggest how to create it by figuring out
        # a recommended layout.
        print "Source is %s (%s)" % (e.src, e.layout)
        print "Destination %s doesn't exist" % (e.dest)
        if "decimate_source" in f.client.stream_get_metadata(e.src):
            rec = e.layout
        elif 'int32' in e.layout_type or 'float64' in e.layout_type:
            rec = 'float64_' + str(e.layout_count * 3)
        else:
            rec = 'float32_' + str(e.layout_count * 3)
        print "You could make it with a command like:"
        print "  nilmtool create", e.dest, rec
        raise SystemExit(1)

    f.check_dest_metadata({ "decimate_source": args.srcpath,
                            "decimate_factor": args.factor })

    # If source is decimated, we have to decimate a bit differently
    if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
        f.process(function = decimate_again, rows = args.factor)
    else:
        f.process(function = decimate_first, rows = args.factor)

def decimate_first(data):
    """Decimate original data -- result has 3 times as many columns"""
    data = np.array(data)
    rows, cols = data.shape
    n = cols - 1
    out = np.zeros(1 + 3 * n)
    out[0] = np.mean(data[:, 0], 0)
    out[    1 :   n+1] = np.mean(data[:,   1 :   n+1], 0)
    out[  n+1 : 2*n+1] = np.min( data[:,   1 :   n+1], 0)
    out[2*n+1 : 3*n+1] = np.max( data[:,   1 :   n+1], 0)
    return [out]

def decimate_again(data):
    """Decimate already-decimated data -- result has the same number
    of columns"""
    data = np.array(data)
    rows, cols = data.shape
    n = (cols - 1) // 3
    out = np.zeros(1 + 3 * n)
    out[0] = np.mean(data[:, 0], 0)
    out[    1 :   n+1] = np.mean(data[:,     1 :   n+1], 0)
    out[  n+1 : 2*n+1] = np.min( data[:,   n+1 : 2*n+1], 0)
    out[2*n+1 : 3*n+1] = np.max( data[:, 2*n+1 : 3*n+1], 0)
    return [out]

if __name__ == "__main__":
    main()

nilmtools/filter.py (deleted)

@@ -1,220 +0,0 @@
#!/usr/bin/python

import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time

import nilmtools
import itertools
import time
import sys
import re
import argparse

class MissingDestination(Exception):
    def __init__(self, src, layout, dest):
        self.src = src
        self.layout = layout
        self.layout_type = layout.split('_')[0]
        self.layout_count = int(layout.split('_')[1])
        self.dest = dest
        Exception.__init__(self, "destination path " + dest + " not found")

class Filter(object):
    def __init__(self):
        self._parser = None
        self._args = None
        self._client = None
        self._using_client = False
        self.srcinfo = None
        self.destinfo = None

    @property
    def client(self):
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client

    def setup_parser(self, description = "Filter data"):
        parser = argparse.ArgumentParser(
            formatter_class = argparse.RawDescriptionHelpFormatter,
            version = nilmtools.__version__,
            description = description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default="http://localhost:12380/",
                           help="Server URL (default: %(default)s)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        group.add_argument("srcpath", action="store",
                           help="Path of source stream, e.g. /foo/bar")
        group.add_argument("destpath", action="store",
                           help="Path of destination stream, e.g. /foo/bar")
        self._parser = parser
        return parser

    def parse_args(self):
        args = self._parser.parse_args()
        self._args = args
        self._client = nilmdb.client.Client(args.url)
        if args.srcpath == args.destpath:
            raise Exception("source and destination path must be different")

        # Open and print info about the streams
        src = self._client.stream_list(args.srcpath, extended = True)
        if len(src) != 1:
            raise Exception("source path " + args.srcpath + " not found")
        self.srcinfo = src[0]
        dest = self._client.stream_list(args.destpath, extended = True)
        if len(dest) != 1:
            raise MissingDestination(self.srcinfo[0], self.srcinfo[1],
                                     args.destpath)
        self.destinfo = dest[0]
        print "Source:", self.stream_info_string(self.srcinfo)
        print "  Dest:", self.stream_info_string(self.destinfo)

        if args.dry_run:
            for interval in self.intervals():
                print self.interval_string(interval)
            raise SystemExit(0)
        return args

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        self._using_client = True
        for i in self._client.stream_intervals(
                self._args.srcpath, diffpath = self._args.destpath,
                start = self._args.start, end = self._args.end):
            yield i
        self._using_client = False

    # Misc helpers
    def arg_time(self, toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse).totimestamp()
        except ValueError as e:
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def stream_info_string(self, info):
        """Print stream info as a string"""
        return sprintf("%s (%s), %.2fM rows, %.2f hours",
                       info[0], info[1], info[4] / 1e6, info[5] / 3600)

    def interval_string(self, interval):
        """Print interval as a string"""
        return sprintf("[ %s -> %s ]", format_time(interval[0]),
                       format_time(interval[1]))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't.  If
        there's no conflict, update the metadata to match 'data'."""
        metadata = self._client.stream_get_metadata(self._args.destpath)
        rows = self.destinfo[4]
        for key in data:
            wanted = str(data[key])
            val = metadata.get(key, wanted)
            if val != wanted and rows > 0:
                m = "Metadata in destination stream:\n"
                m += "  %s = %s\n" % (key, val)
                m += "doesn't match desired data:\n"
                m += "  %s = %s\n" % (key, wanted)
                m += "Refusing to change it.  You can change the stream's "
                m += "metadata manually, or\n"
                m += "remove existing data from the stream, to prevent "
                m += "this error.\n"
                raise Exception(m)
        # All good -- write the metadata in case it's not already there
        self._client.stream_update_metadata(self._args.destpath, data)

    # Main processing helper
    def process(self, function, rows, partial = True, args = None):
        """Process data in chunks of 'rows' data at a time.

        function: function to process the data
        rows: maximum number of rows to pass to 'function' at once
        args: tuple containing extra arguments to pass to 'function'
        partial: if true, less than 'rows' may be passed to 'function'.
                 if false, partial data at the end of an interval will
                 be dropped.

        'function' should be defined like:
          function(data, *args)
        It will be passed an array containing up to 'rows' rows of
        data from the source stream, and any arguments passed in
        'args'.  It should transform the data as desired, and return a
        new array of data, which will be inserted into the destination
        stream.
        """
        if args is None:
            args = []
        extractor = nilmdb.client.Client(self._args.url).stream_extract
        inserter = nilmdb.client.Client(self._args.url).stream_insert_context
        src = self._args.srcpath
        dest = self._args.destpath

        islice = itertools.islice

        # Figure out how to format output data (was split('_')[1], which
        # grabbed the column count instead of the type and always chose
        # the float formatter)
        dest_layout = self.destinfo[1].split('_')[0]
        def int_formatter(row):
            return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:])
        def float_formatter(row):
            return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:])
        if "int" in dest_layout:
            formatter = int_formatter
        else:
            formatter = float_formatter

        for (start, end) in self.intervals():
            print "Processing", self.interval_string((start, end))
            with inserter(dest, start, end) as insert_ctx:
                src_array = []
                for line in extractor(src, start, end):
                    # Read in data
                    src_array.append([ float(x) for x in line.split() ])
                    if len(src_array) == rows:
                        # Pass through filter function
                        dest_array = function(src_array, *args)
                        # Write result to destination
                        out = [ formatter(row) for row in dest_array ]
                        insert_ctx.insert("\n".join(out) + "\n")
                        # Clear source array
                        src_array = []
                # Take care of partial chunk
                if len(src_array) and partial:
                    dest_array = function(src_array, *args)
                    out = [ formatter(row) for row in dest_array ]
                    insert_ctx.insert("\n".join(out) + "\n")

def main():
    # This is just a dummy function; actual filters can use the other
    # functions to prepare stuff, and then do something with the data.
    f = Filter()
    parser = f.setup_parser()
    args = f.parse_args()
    for (start, end) in f.intervals():
        print "Generic filter: need to handle", start, " to ", end

if __name__ == "__main__":
    main()

scripts/copy-wildcard.sh (new executable file)

@@ -0,0 +1,13 @@
#!/bin/bash

# example: ./copy-wildcard.sh http://src:12380 http://dest:12380 '/path/*'
# (quote the wildcard so the local shell doesn't expand it)

if [ $# != 3 ] ; then
    echo "usage: $0 src-url dest-url path-wildcard"
    exit 1
fi

set -e

nilmtool -u "$1" list "$3" | sort | while read path layout; do
    nilm-copy -u "$1" -U "$2" "$path" "$path"
done

scripts/decimate-it.sh (new executable file)

@@ -0,0 +1,8 @@
#!/bin/bash

# example: ./decimate-it.sh 1 2 3 4 5 6 7 8 9 10 11 12
for i in "$@"; do
    nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
    /usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))
done
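
Each pass through this loop adds one rung to the decimation ladder: level i holds the source decimated by a total factor of 4**i, so nilm-decimate (default factor 4) reads level i-1 and writes level i. A rough sketch of the resulting data rates, assuming a hypothetical 8 kHz stream at level 0:

    # Hypothetical level-0 rate of 8000 Hz; the 4**i factors come from the script.
    rate = 8000.0
    for i in range(1, 13):
        print "level %2d: factor %8d, %10.4f rows/sec" % (i, 4**i, rate / 4**i)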

setup.py

@@ -30,7 +30,7 @@ except ImportError:
 # Versioneer manages version numbers from git tags.
 # https://github.com/warner/python-versioneer
 import versioneer
-versioneer.versionfile_source = 'nilmtools/_version.py'
+versioneer.versionfile_source = 'src/_version.py'
 versioneer.versionfile_build = 'nilmtools/_version.py'
 versioneer.tag_prefix = 'nilmtools-'
 versioneer.parentdir_prefix = 'nilmtools-'
@@ -65,10 +65,12 @@ setup(name='nilmtools',
           ],
       packages = [ 'nilmtools',
                    ],
+      package_dir = { 'nilmtools': 'src' },
       entry_points = {
          'console_scripts': [
              'nilm-decimate = nilmtools.decimate:main',
              'nilm-insert = nilmtools.insert:main',
+             'nilm-copy = nilmtools.copy_:main',
              ],
          },
       zip_safe = False,
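
The packaging bits are what make the nilmtools -> src rename invisible to installs: package_dir maps the nilmtools package onto the src/ directory, while versionfile_build keeps the generated _version.py inside the installed package. A stripped-down sketch of the relevant setup() arguments (everything else omitted):

    # Sketch only; the real setup.py also configures versioneer, metadata, etc.
    from setuptools import setup

    setup(name = 'nilmtools',
          packages = [ 'nilmtools' ],
          # the 'nilmtools' package's source lives in the 'src' directory
          package_dir = { 'nilmtools': 'src' },
          entry_points = {
              'console_scripts': [
                  'nilm-decimate = nilmtools.decimate:main',
                  'nilm-insert = nilmtools.insert:main',
                  'nilm-copy = nilmtools.copy_:main',
                  ],
              })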

src/_version.py

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
 tag_prefix = "nilmtools-"
 parentdir_prefix = "nilmtools-"
-versionfile_source = "nilmtools/_version.py"
+versionfile_source = "src/_version.py"
 
 def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
     variables = { "refnames": git_refnames, "full": git_full }

src/copy_.py (new executable file)

@@ -0,0 +1,37 @@
#!/usr/bin/python

import nilmtools.filter
import nilmdb.client
import numpy as np
import sys

def main():
    f = nilmtools.filter.Filter()
    parser = f.setup_parser("Copy a stream")

    # Parse arguments
    try:
        args = f.parse_args()
    except nilmtools.filter.MissingDestination as e:
        print "Source is %s (%s)" % (e.src.path, e.src.layout)
        print "Destination %s doesn't exist" % (e.dest.path)
        print "You could make it with a command like:"
        print "  nilmtool -u %s create %s %s" % (e.dest.url,
                                                 e.dest.path, e.src.layout)
        raise SystemExit(1)

    # Copy metadata
    meta = f.client_src.stream_get_metadata(f.src.path)
    f.check_dest_metadata(meta)

    # Copy all rows of data as ASCII strings
    extractor = nilmdb.client.Client(f.src.url).stream_extract
    inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
    for i in f.intervals():
        print "Processing", f.interval_string(i)
        with inserter(f.dest.path, i.start, i.end) as insert_ctx:
            for row in extractor(f.src.path, i.start, i.end):
                insert_ctx.insert(row + "\n")

if __name__ == "__main__":
    main()

src/decimate.py (new executable file)

@@ -0,0 +1,80 @@
#!/usr/bin/python

import nilmtools.filter
import nilmdb.client
import numpy as np
import operator

def main():
    f = nilmtools.filter.Filter()
    parser = f.setup_parser("Decimate a stream")
    group = parser.add_argument_group("Decimate options")
    group.add_argument('-f', '--factor', action='store', default=4, type=int,
                       help='Decimation factor (default: %(default)s)')

    # Parse arguments
    try:
        args = f.parse_args()
    except nilmtools.filter.MissingDestination as e:
        # If no destination, suggest how to create it by figuring out
        # a recommended layout.
        src = e.src
        dest = e.dest
        print "Source is %s (%s)" % (src.path, src.layout)
        print "Destination %s doesn't exist" % (dest.path)
        if "decimate_source" in f.client_src.stream_get_metadata(src.path):
            rec = src.layout
        elif 'int32' in src.layout_type or 'float64' in src.layout_type:
            rec = 'float64_' + str(src.layout_count * 3)
        else:
            rec = 'float32_' + str(src.layout_count * 3)
        print "You could make it with a command like:"
        print "  nilmtool -u %s create %s %s" % (e.dest.url,
                                                 e.dest.path, rec)
        raise SystemExit(1)

    if args.factor < 2:
        raise Exception("factor needs to be 2 or more")

    f.check_dest_metadata({ "decimate_source": f.src.path,
                            "decimate_factor": args.factor })

    # If source is decimated, we have to decimate a bit differently
    if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
        n = f.src.layout_count // 3
        f.process_python(function = decimate_again, rows = args.factor,
                         args = (n,))
    else:
        n = f.src.layout_count
        f.process_python(function = decimate_first, rows = args.factor,
                         args = (n,))

def decimate_first(data, n):
    """Decimate original data -- result has 3 times as many columns"""
    # For this simple calculation, converting to a Numpy array
    # and doing the math is slower than just doing it directly.
    rows = iter(data)
    r_sum = r_min = r_max = rows.next()
    for row in rows:
        r_sum = map(operator.add, r_sum, row)
        r_min = map(min, r_min, row)
        r_max = map(max, r_max, row)
    r_mean = [ x / len(data) for x in r_sum ]
    return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]

def decimate_again(data, n):
    """Decimate already-decimated data -- result has the same number
    of columns"""
    rows = iter(data)
    r = rows.next()
    r_sum = r[0:(n+1)]
    r_min = r[(n+1):(2*n+1)]
    r_max = r[(2*n+1):(3*n+1)]
    for r in rows:
        r_sum = map(operator.add, r_sum, r[0:(n+1)])
        r_min = map(min, r_min, r[(n+1):(2*n+1)])
        r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
    r_mean = [ x / len(data) for x in r_sum ]
    return [ r_mean + r_min + r_max ]

if __name__ == "__main__":
    main()
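
To make the column layout concrete: decimating a float32_2 source (timestamp plus two data columns) by a factor of 4 turns each four-row chunk into one row of the float32_6 destination, laid out as mean timestamp, then per-column means, mins, and maxes. A small worked example with hypothetical values, assuming decimate_first from above is in scope:

    # One 4-row chunk from a hypothetical float32_2 stream: [timestamp, c1, c2]
    chunk = [ [ 0.0, 1.0, 10.0 ],
              [ 1.0, 2.0, 20.0 ],
              [ 2.0, 3.0, 30.0 ],
              [ 3.0, 4.0, 40.0 ] ]
    print decimate_first(chunk, 2)[0]
    # -> [1.5, 2.5, 25.0, 1.0, 10.0, 4.0, 40.0]
    #    (mean ts, mean c1, mean c2, min c1, min c2, max c1, max c2)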

src/filter.py (new file)

@@ -0,0 +1,300 @@
#!/usr/bin/python
from __future__ import absolute_import

import nilmdb.client
from nilmdb.client import Client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
                               timestamp_to_seconds)
from nilmdb.utils.interval import Interval

import nilmtools
import itertools
import time
import sys
import re
import argparse

class MissingDestination(Exception):
    def __init__(self, src, dest):
        self.src = src
        self.dest = dest
        Exception.__init__(self, "destination path " + dest.path + " not found")

class StreamInfo(object):
    def __init__(self, url, info, interhost):
        self.url = url
        self.info = info
        self.interhost = interhost
        try:
            self.path = info[0]
            self.layout = info[1]
            self.layout_type = self.layout.split('_')[0]
            self.layout_count = int(self.layout.split('_')[1])
            self.total_count = self.layout_count + 1
            self.timestamp_min = info[2]
            self.timestamp_max = info[3]
            self.rows = info[4]
            self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
        except (IndexError, TypeError):
            pass

    def __str__(self):
        """Print stream info as a string"""
        res = ""
        if self.interhost:
            res = sprintf("[%s] ", self.url)
        res += sprintf("%s (%s), %.2fM rows, %.2f hours",
                       self.path, self.layout, self.rows / 1e6,
                       self.seconds / 3600.0)
        return res

class Filter(object):
    def __init__(self):
        self._parser = None
        self._client_src = None
        self._client_dest = None
        self._using_client = False
        self.src = None
        self.dest = None
        self.start = None
        self.end = None
        self.interhost = False

    @property
    def client_src(self):
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_dest

    def setup_parser(self, description = "Filter data"):
        parser = argparse.ArgumentParser(
            formatter_class = argparse.RawDescriptionHelpFormatter,
            version = nilmtools.__version__,
            description = description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default="http://localhost:12380/",
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        group.add_argument("srcpath", action="store",
                           help="Path of source stream, e.g. /foo/bar")
        group.add_argument("destpath", action="store",
                           help="Path of destination stream, e.g. /foo/bar")
        self._parser = parser
        return parser

    def interval_string(self, interval):
        return sprintf("[ %s -> %s ]",
                       timestamp_to_human(interval.start),
                       timestamp_to_human(interval.end))

    def parse_args(self):
        args = self._parser.parse_args()

        if args.dest_url is None:
            args.dest_url = args.url
        if args.url != args.dest_url:
            self.interhost = True

        self._client_src = Client(args.url)
        self._client_dest = Client(args.dest_url)

        if (not self.interhost) and (args.srcpath == args.destpath):
            raise Exception("source and destination path must be different")

        # Open and print info about the streams
        src = self._client_src.stream_list(args.srcpath, extended = True)
        if len(src) != 1:
            raise Exception("source path " + args.srcpath + " not found")
        self.src = StreamInfo(args.url, src[0], self.interhost)

        dest = self._client_dest.stream_list(args.destpath, extended = True)
        if len(dest) != 1:
            raise MissingDestination(self.src,
                                     StreamInfo(args.dest_url, [args.destpath],
                                                self.interhost))
        self.dest = StreamInfo(args.dest_url, dest[0], self.interhost)

        print "Source:", self.src
        print "  Dest:", self.dest

        if args.dry_run:
            for interval in self.intervals():
                print self.interval_string(interval)
            raise SystemExit(0)

        self.start = args.start
        self.end = args.end
        return args

    def _optimize_int(self, it):
        """Join and yield adjacent intervals from the iterator 'it'"""
        saved_int = None
        for interval in it:
            if saved_int is not None:
                if saved_int.end == interval.start:
                    interval.start = saved_int.start
                else:
                    yield saved_int
            saved_int = interval
        if saved_int is not None:
            yield saved_int

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        self._using_client = True
        if self.interhost:
            # Do the difference ourselves
            s_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_src.stream_intervals(
                                self.src.path,
                                start = self.start, end = self.end) )
            d_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_dest.stream_intervals(
                                self.dest.path,
                                start = self.start, end = self.end) )
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = ( Interval(start, end)
                          for (start, end) in
                          self._client_src.stream_intervals(
                              self.src.path, diffpath = self.dest.path,
                              start = self.start, end = self.end) )
        for interval in self._optimize_int(intervals):
            yield interval
        self._using_client = False

    # Misc helpers
    def arg_time(self, toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't.  If
        there's no conflict, update the metadata to match 'data'."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        for key in data:
            wanted = str(data[key])
            val = metadata.get(key, wanted)
            if val != wanted and self.dest.rows > 0:
                m = "Metadata in destination stream:\n"
                m += "  %s = %s\n" % (key, val)
                m += "doesn't match desired data:\n"
                m += "  %s = %s\n" % (key, wanted)
                m += "Refusing to change it.  You can change the stream's "
                m += "metadata manually, or\n"
                m += "remove existing data from the stream, to prevent "
                m += "this error.\n"
                raise Exception(m)
        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)

    # Main processing helper
    def process_python(self, function, rows, args = None, partial = False):
        """Process data in chunks of 'rows' data at a time.

        This provides data as nested Python lists and expects the same
        back.

        function: function to process the data
        rows: maximum number of rows to pass to 'function' at once
        args: tuple containing extra arguments to pass to 'function'
        partial: if true, less than 'rows' may be passed to 'function'.
                 if false, partial data at the end of an interval will
                 be dropped.

        'function' should be defined like:
          function(data, *args)
        It will be passed a list containing up to 'rows' rows of
        data from the source stream, and any arguments passed in
        'args'.  It should transform the data as desired, and return a
        new list of data, which will be inserted into the destination
        stream.
        """
        if args is None:
            args = []
        extractor = Client(self.src.url).stream_extract
        inserter = Client(self.dest.url).stream_insert_context

        # Parse input data.  We use homogeneous types for now, which
        # means the timestamp type will be either float or int.
        if "int" in self.src.layout_type:
            parser = lambda line: [ int(x) for x in line.split() ]
        else:
            parser = lambda line: [ float(x) for x in line.split() ]

        # Format output data.
        formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"

        for interval in self.intervals():
            print "Processing", self.interval_string(interval)
            with inserter(self.dest.path,
                          interval.start, interval.end) as insert_ctx:
                src_array = []
                for line in extractor(self.src.path,
                                      interval.start, interval.end):
                    # Read in data
                    src_array.append(parser(line))
                    if len(src_array) == rows:
                        # Pass through filter function
                        dest_array = function(src_array, *args)
                        # Write result to destination
                        out = [ formatter(row) for row in dest_array ]
                        insert_ctx.insert("".join(out))
                        # Clear source array
                        src_array = []
                # Take care of partial chunk
                if len(src_array) and partial:
                    dest_array = function(src_array, *args)
                    out = [ formatter(row) for row in dest_array ]
                    insert_ctx.insert("".join(out))

def main():
    # This is just a dummy function; actual filters can use the other
    # functions to prepare stuff, and then do something with the data.
    f = Filter()
    parser = f.setup_parser()
    args = f.parse_args()
    for i in f.intervals():
        print "Generic filter: need to handle", f.interval_string(i)

if __name__ == "__main__":
    main()
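
Two pieces of the interval logic above deserve a standalone illustration: intervals() yields only the source ranges missing from the destination (a set difference, computed on the server when both streams live there, client-side otherwise), and _optimize_int() then fuses back-to-back ranges so each insert context covers as long a span as possible. A self-contained sketch of the fusing step, using plain (start, end) tuples in place of nilmdb Interval objects:

    # Standalone sketch of _optimize_int's behavior with (start, end) tuples.
    def join_adjacent(intervals):
        saved = None
        for (start, end) in intervals:
            if saved is not None:
                if saved[1] == start:
                    # Adjacent: extend the saved interval instead of yielding it
                    start = saved[0]
                else:
                    yield saved
            saved = (start, end)
        if saved is not None:
            yield saved

    print list(join_adjacent([ (0, 5), (5, 8), (10, 12) ]))
    # -> [(0, 8), (10, 12)]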

src/insert.py

@@ -2,7 +2,9 @@
 import nilmdb.client
 from nilmdb.utils.printf import *
-from nilmdb.utils.time import parse_time, format_time
+from nilmdb.utils.time import (parse_time, timestamp_to_human,
+                               timestamp_to_seconds, seconds_to_timestamp,
+                               rate_to_period, now as time_now)
 import nilmtools
 import time
@@ -61,20 +63,22 @@ def main(args = None):
     # data_ts is the timestamp that we'll use for the current line
     data_ts_base = 0
     data_ts_inc = 0
-    data_ts_step = 1.0 / args.rate
+    data_ts_rate = args.rate
 
     # clock_ts is the imprecise "real" timestamp (from the filename,
     # comments, or system clock)
     clock_ts = None
 
     def print_clock_updated():
-        printf("Clock time updated to %s\n", format_time(clock_ts))
+        printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
         if data_ts_base != 0:
             diff = data_ts - clock_ts
             if diff >= 0:
-                printf("  (data timestamp ahead by %.6f sec)\n", diff)
+                printf("  (data timestamp ahead by %.6f sec)\n",
+                       timestamp_to_seconds(diff))
             else:
-                printf("  (data timestamp behind by %.6f sec)\n", -diff)
+                printf("  (data timestamp behind by %.6f sec)\n",
+                       timestamp_to_seconds(-diff))
 
     with client.stream_insert_context(args.path) as stream:
         for f in args.infile:
@@ -92,7 +96,7 @@ def main(args = None):
                 # Subtract 1 hour because files are created at the end
                 # of the hour.  Hopefully, we'll be able to use
                 # internal comments and this value won't matter anyway.
-                clock_ts = parse_time(filename).totimestamp() - 3600
+                clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
                 print_clock_updated()
             except ValueError:
                 pass
@@ -101,7 +105,8 @@ def main(args = None):
             # Read each line
             for line in f:
-                data_ts = data_ts_base + data_ts_inc * data_ts_step
+                data_ts = data_ts_base + rate_to_period(data_ts_rate,
+                                                        data_ts_inc)
 
                 # If no content other than the newline, skip it
                 if len(line) <= 1:
@@ -110,7 +115,7 @@ def main(args = None):
                 # If line starts with a comment, look for a timestamp
                 if line[0] == '#':
                     try:
-                        clock_ts = parse_time(line[1:]).totimestamp()
+                        clock_ts = parse_time(line[1:])
                         print_clock_updated()
                     except ValueError:
                         pass
@@ -118,30 +123,30 @@ def main(args = None):
                 # If inserting live, use clock timestamp
                 if live:
-                    clock_ts = time.time()
+                    clock_ts = time_now()
 
                 # If we have a real timestamp, compare it to the data
                 # timestamp, and make sure things match up.
                 if clock_ts is not None:
-                    if (data_ts - 10) > clock_ts:
+                    if (data_ts - seconds_to_timestamp(10)) > clock_ts:
                         # Accumulated line timestamps are in the future.
                         # If we were to set data_ts=clock_ts, we'd create
                         # an overlap, so we have to just bail out here.
                         err = sprintf("Data is coming in too fast: data time "
                                       "is %s but clock time is only %s",
-                                      format_time(data_ts),
-                                      format_time(clock_ts))
+                                      timestamp_to_human(data_ts),
+                                      timestamp_to_human(clock_ts))
                         raise ParseError(filename, err)
-                    if (data_ts + 10) < clock_ts:
+                    if (data_ts + seconds_to_timestamp(10)) < clock_ts:
                         # Accumulated line timestamps are in the past.  We
                         # can just skip some time and leave a gap in the
                         # data.
                         if data_ts_base != 0:
                             printf("Skipping data timestamp forward from "
                                    "%s to %s to match clock time\n",
-                                   format_time(data_ts),
-                                   format_time(clock_ts))
+                                   timestamp_to_human(data_ts),
+                                   timestamp_to_human(clock_ts))
                             stream.finalize()
                         data_ts_base = data_ts = clock_ts
                         data_ts_inc = 0
@@ -166,7 +171,7 @@ def main(args = None):
                     continue
 
                 # Insert it
-                stream.insert("%.6f %s" % (data_ts, line))
+                stream.insert("%d %s" % (data_ts, line))
 
     print "Done"
 
 if __name__ == "__main__":
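
The running theme of this diff is the switch from floating-point seconds to nilmdb's integer timestamp type: parse_time() now returns a timestamp directly, seconds_to_timestamp() converts second offsets, rate_to_period() computes the Nth sample's offset in one step instead of accumulating data_ts_inc * data_ts_step float error, and rows are written with %d rather than %.6f. A rough sketch of that arithmetic, assuming microsecond ticks (hypothetical stand-ins; nilmdb.utils.time's own helpers are authoritative):

    TICKS_PER_SECOND = 1000000      # assumption: integer microsecond ticks

    def sec_to_ts(seconds):         # hypothetical seconds_to_timestamp()
        return int(round(seconds * TICKS_PER_SECOND))

    def rate_period(rate, n):       # hypothetical rate_to_period()
        # offset of sample n at 'rate' Hz, computed in one step so
        # rounding error does not accumulate as n grows
        return int(round(n * TICKS_PER_SECOND / float(rate)))

    data_ts_base = sec_to_ts(1363000000)    # some base time
    for data_ts_inc in range(3):
        print data_ts_base + rate_period(8000, data_ts_inc)
    # -> ...000, ...125, ...250  (8 kHz gives a 125-tick period)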