Compare commits

..

4 Commits

Author SHA1 Message Date
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
10 changed files with 134 additions and 50 deletions

View File

@@ -1,13 +1,24 @@
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test: test:
python src/filter.py \
-u "http://localhost:12380/" \
-U "http://127.0.0.1:12380/" \
/lees-compressor/no-leak/raw/1 \
/lees-compressor/no-leak/raw/4
test2:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true -@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true -@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true -@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true -@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4 time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16 python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
all:
@echo "Try 'make install'"
version: version:
python setup.py version python setup.py version

13
scripts/copy-wildcard.sh Executable file
View File

@@ -0,0 +1,13 @@
#!/bin/bash
# Copy every stream whose path matches a wildcard from a source server
# to a destination server, using the same path on both sides.
#
# usage:   ./copy-wildcard.sh src-url dest-url path-wildcard
# example: ./copy-wildcard.sh http://src:12380 http://dest:12380 '/path/*'
#
# Quote the wildcard so the local shell hands it to nilmtool unexpanded
# instead of globbing it against the local filesystem.

if [ "$#" -ne 3 ] ; then
    # Usage errors belong on stderr so they never mix with piped output.
    echo "usage: $0 src-url dest-url path-wildcard" >&2
    exit 1
fi

# Abort on the first failing command.  pipefail extends that to a failing
# "nilmtool list" on the left-hand side of the pipeline below; without it
# only the exit status of the final "while" loop would be checked.
set -e
set -o pipefail

# Each listing line is read as "path [layout]"; only the path is used.
# -r keeps any backslashes in the listing literal.
nilmtool -u "$1" list "$3" | sort | while read -r path _layout; do
    # Skip blank lines rather than invoking nilm-copy with an empty path.
    [ -n "$path" ] || continue
    # Detach stdin so the copy command cannot consume the rest of the
    # listing that the loop is still reading.
    nilm-copy -u "$1" -U "$2" "$path" "$path" < /dev/null
done

View File

@@ -1,5 +1,7 @@
#!/bin/bash #!/bin/bash
# example: ./decimate-it.sh 1 2 3 4 5 6 7 8 9 10 11 12
for i in "$@"; do for i in "$@"; do
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18 nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i)) /usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags. # Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer # https://github.com/warner/python-versioneer
import versioneer import versioneer
versioneer.versionfile_source = 'nilmtools/_version.py' versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py' versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-' versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-' versioneer.parentdir_prefix = 'nilmtools-'
@@ -65,11 +65,12 @@ setup(name='nilmtools',
], ],
packages = [ 'nilmtools', packages = [ 'nilmtools',
], ],
package_dir = { 'nilmtools': 'src' },
entry_points = { entry_points = {
'console_scripts': [ 'console_scripts': [
'nilm-decimate = nilmtools.decimate:main', 'nilm-decimate = nilmtools.decimate:main',
'nilm-insert = nilmtools.insert:main', 'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy:main', 'nilm-copy = nilmtools.copy_:main',
], ],
}, },
zip_safe = False, zip_safe = False,

View File

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-" tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-" parentdir_prefix = "nilmtools-"
versionfile_source = "nilmtools/_version.py" versionfile_source = "src/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False): def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full } variables = { "refnames": git_refnames, "full": git_full }

View File

@@ -16,16 +16,17 @@ def main():
print "Source is %s (%s)" % (e.src.path, e.src.layout) print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path) print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:" print "You could make it with a command like:"
print " nilmtool create", e.dest.path, e.src.layout print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1) raise SystemExit(1)
# Copy metadata # Copy metadata
meta = f.client.stream_get_metadata(f.src.path) meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta) f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings # Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(args.url).stream_extract extractor = nilmdb.client.Client(f.src.url).stream_extract
inserter = nilmdb.client.Client(args.url).stream_insert_context inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
for i in f.intervals(): for i in f.intervals():
print "Processing", f.interval_string(i) print "Processing", f.interval_string(i)
with inserter(f.dest.path, i.start, i.end) as insert_ctx: with inserter(f.dest.path, i.start, i.end) as insert_ctx:

View File

@@ -28,17 +28,18 @@ def main():
else: else:
rec = 'float32_' + str(src.layout_count * 3) rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:" print "You could make it with a command like:"
print " nilmtool create", dest.path, rec print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1) raise SystemExit(1)
if not (args.factor >= 2): if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more") raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": args.srcpath, f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor }) "decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently # If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath): if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3 n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor, f.process_python(function = decimate_again, rows = args.factor,
args = (n,)) args = (n,))

View File

@@ -1,6 +1,9 @@
#!/usr/bin/python #!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client import nilmdb.client
from nilmdb.client import Client
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human, from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds) timestamp_to_seconds)
@@ -21,8 +24,10 @@ class MissingDestination(Exception):
Exception.__init__(self, "destination path " + dest.path + " not found") Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object): class StreamInfo(object):
def __init__(self, info): def __init__(self, url, info, interhost):
self.url = url
self.info = info self.info = info
self.interhost = interhost
try: try:
self.path = info[0] self.path = info[0]
self.layout = info[1] self.layout = info[1]
@@ -38,25 +43,38 @@ class StreamInfo(object):
def __str__(self): def __str__(self):
"""Print stream info as a string""" """Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours", res = ""
if self.interhost:
res = sprintf("[%s] ", self.url)
res += sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6, self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0) self.seconds / 3600.0)
return res
class Filter(object): class Filter(object):
def __init__(self): def __init__(self):
self._parser = None self._parser = None
self._args = None self._client_src = None
self._client = None self._client_dest = None
self._using_client = False self._using_client = False
self.src = None self.src = None
self.dest = None self.dest = None
self.start = None
self.end = None
self.interhost = False
@property @property
def client(self): def client_src(self):
if self._using_client: if self._using_client:
raise Exception("Filter client is in use; make another") raise Exception("Filter client is in use; make another")
return self._client return self._client_src
@property
def client_dest(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_dest
def setup_parser(self, description = "Filter data"): def setup_parser(self, description = "Filter data"):
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@@ -67,6 +85,9 @@ class Filter(object):
group.add_argument("-u", "--url", action="store", group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/", default="http://localhost:12380/",
help="Server URL (default: %(default)s)") help="Server URL (default: %(default)s)")
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true", group.add_argument("-D", "--dry-run", action="store_true",
default = False, default = False,
help="Just print intervals that would be " help="Just print intervals that would be "
@@ -93,22 +114,30 @@ class Filter(object):
def parse_args(self): def parse_args(self):
args = self._parser.parse_args() args = self._parser.parse_args()
self._args = args
self._client = nilmdb.client.Client(args.url)
if args.srcpath == args.destpath: if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
raise Exception("source and destination path must be different") raise Exception("source and destination path must be different")
# Open and print info about the streams # Open and print info about the streams
src = self._client.stream_list(args.srcpath, extended = True) src = self._client_src.stream_list(args.srcpath, extended = True)
if len(src) != 1: if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found") raise Exception("source path " + args.srcpath + " not found")
self.src = StreamInfo(src[0]) self.src = StreamInfo(args.url, src[0], self.interhost)
dest = self._client.stream_list(args.destpath, extended = True) dest = self._client_dest.stream_list(args.destpath, extended = True)
if len(dest) != 1: if len(dest) != 1:
raise MissingDestination(self.src, StreamInfo([args.destpath])) raise MissingDestination(self.src,
self.dest = StreamInfo(dest[0]) StreamInfo(args.dest_url, [args.destpath],
self.interhost))
self.dest = StreamInfo(args.dest_url, dest[0], self.interhost)
print "Source:", self.src print "Source:", self.src
print " Dest:", self.dest print " Dest:", self.dest
@@ -118,25 +147,51 @@ class Filter(object):
print self.interval_string(interval) print self.interval_string(interval)
raise SystemExit(0) raise SystemExit(0)
self.start = args.start
self.end = args.end
return args return args
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self): def intervals(self):
"""Generate all the intervals that this filter should process""" """Generate all the intervals that this filter should process"""
self._using_client = True self._using_client = True
saved_int = None
for (start, end) in self._client.stream_intervals(
self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end):
# Join adjacent intervals if self.interhost:
if saved_int is not None: # Do the difference ourselves
if saved_int.end == start: s_intervals = ( Interval(start, end)
start = saved_int.start for (start, end) in
else: self._client_src.stream_intervals(
yield saved_int self.src.path,
saved_int = Interval(start, end) start = self.start, end = self.end) )
if saved_int is not None: d_intervals = ( Interval(start, end)
yield saved_int for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
for interval in self._optimize_int(intervals):
yield interval
self._using_client = False self._using_client = False
# Misc helpers # Misc helpers
@@ -151,7 +206,7 @@ class Filter(object):
def check_dest_metadata(self, data): def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If """See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'.""" there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath) metadata = self._client_dest.stream_get_metadata(self.dest.path)
for key in data: for key in data:
wanted = str(data[key]) wanted = str(data[key])
val = metadata.get(key, wanted) val = metadata.get(key, wanted)
@@ -166,7 +221,7 @@ class Filter(object):
m += "this error.\n" m += "this error.\n"
raise Exception(m) raise Exception(m)
# All good -- write the metadata in case it's not already there # All good -- write the metadata in case it's not already there
self._client.stream_update_metadata(self._args.destpath, data) self._client_dest.stream_update_metadata(self.dest.path, data)
# Main processing helper # Main processing helper
def process_python(self, function, rows, args = None, partial = False): def process_python(self, function, rows, args = None, partial = False):
@@ -192,10 +247,8 @@ class Filter(object):
""" """
if args is None: if args is None:
args = [] args = []
extractor = nilmdb.client.Client(self._args.url).stream_extract extractor = Client(self.src.url).stream_extract
inserter = nilmdb.client.Client(self._args.url).stream_insert_context inserter = Client(self.dest.url).stream_insert_context
src = self._args.srcpath
dest = self._args.destpath
# Parse input data. We use homogenous types for now, which # Parse input data. We use homogenous types for now, which
# means the timestamp type will be either float or int. # means the timestamp type will be either float or int.
@@ -209,9 +262,11 @@ class Filter(object):
for interval in self.intervals(): for interval in self.intervals():
print "Processing", self.interval_string(interval) print "Processing", self.interval_string(interval)
with inserter(dest, interval.start, interval.end) as insert_ctx: with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
src_array = [] src_array = []
for line in extractor(src, interval.start, interval.end): for line in extractor(self.src.path,
interval.start, interval.end):
# Read in data # Read in data
src_array.append([ float(x) for x in line.split() ]) src_array.append([ float(x) for x in line.split() ])

View File