Compare commits

...

4 Commits

Author SHA1 Message Date
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
10 changed files with 134 additions and 50 deletions

View File

@@ -1,13 +1,24 @@
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test:
python src/filter.py \
-u "http://localhost:12380/" \
-U "http://127.0.0.1:12380/" \
/lees-compressor/no-leak/raw/1 \
/lees-compressor/no-leak/raw/4
test2:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
all:
@echo "Try 'make install'"
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version

13
scripts/copy-wildcard.sh Executable file
View File

@@ -0,0 +1,13 @@
#!/bin/bash
# Copy every stream whose path matches a wildcard from one server to
# another, using the same path on the destination.
#
# usage:   ./copy-wildcard.sh src-url dest-url path-wildcard
# example: ./copy-wildcard.sh http://src:12380 http://dest:12380 '/path/*'
#
# Quote the wildcard so that the local shell does not try to expand it
# against the local filesystem before nilmtool sees it.

if [ $# -ne 3 ] ; then
    # Usage errors belong on stderr so they never pollute piped output.
    echo "usage: $0 src-url dest-url path-wildcard" >&2
    exit 1
fi

# Stop at the first failing command.  pipefail is needed as well: without
# it the pipeline's status is that of the "while" loop alone, so a failing
# "nilmtool list" would be silently treated as "nothing to copy".
set -e
set -o pipefail

# Each listing line is split into its first word (the stream path) and the
# remainder (named "layout" here; presumably the stream layout -- only the
# path is used).  -r keeps any backslashes in the listing literal.
nilmtool -u "$1" list "$3" | sort | while read -r path layout; do
    nilm-copy -u "$1" -U "$2" "$path" "$path"
done

View File

@@ -1,5 +1,7 @@
#!/bin/bash
# example: ./decimate-it.sh 1 2 3 4 5 6 7 8 9 10 11 12
for i in "$@"; do
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
@@ -65,11 +65,12 @@ setup(name='nilmtools',
],
packages = [ 'nilmtools',
],
package_dir = { 'nilmtools': 'src' },
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy:main',
'nilm-copy = nilmtools.copy_:main',
],
},
zip_safe = False,

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "nilmtools/_version.py"
versionfile_source = "src/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

View File

@@ -16,16 +16,17 @@ def main():
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool create", e.dest.path, e.src.layout
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
# Copy metadata
meta = f.client.stream_get_metadata(f.src.path)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(args.url).stream_extract
inserter = nilmdb.client.Client(args.url).stream_insert_context
extractor = nilmdb.client.Client(f.src.url).stream_extract
inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
for i in f.intervals():
print "Processing", f.interval_string(i)
with inserter(f.dest.path, i.start, i.end) as insert_ctx:

View File

@@ -28,17 +28,18 @@ def main():
else:
rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool create", dest.path, rec
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": args.srcpath,
f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))

View File

@@ -1,6 +1,9 @@
#!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
@@ -21,8 +24,10 @@ class MissingDestination(Exception):
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, info):
def __init__(self, url, info, interhost):
self.url = url
self.info = info
self.interhost = interhost
try:
self.path = info[0]
self.layout = info[1]
@@ -38,25 +43,38 @@ class StreamInfo(object):
def __str__(self):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
res = ""
if self.interhost:
res = sprintf("[%s] ", self.url)
res += sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
return res
class Filter(object):
def __init__(self):
self._parser = None
self._args = None
self._client = None
self._client_src = None
self._client_dest = None
self._using_client = False
self.src = None
self.dest = None
self.start = None
self.end = None
self.interhost = False
@property
def client(self):
def client_src(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client
return self._client_src
@property
def client_dest(self):
    """Return the client object for the destination server.

    Raises Exception while the '_using_client' flag is set, in which
    case the caller has to create a separate client of its own.
    """
    if not self._using_client:
        return self._client_dest
    raise Exception("Filter client is in use; make another")
def setup_parser(self, description = "Filter data"):
parser = argparse.ArgumentParser(
@@ -67,6 +85,9 @@ class Filter(object):
group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="Server URL (default: %(default)s)")
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
@@ -93,22 +114,30 @@ class Filter(object):
def parse_args(self):
args = self._parser.parse_args()
self._args = args
self._client = nilmdb.client.Client(args.url)
if args.srcpath == args.destpath:
if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
raise Exception("source and destination path must be different")
# Open and print info about the streams
src = self._client.stream_list(args.srcpath, extended = True)
src = self._client_src.stream_list(args.srcpath, extended = True)
if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found")
self.src = StreamInfo(src[0])
self.src = StreamInfo(args.url, src[0], self.interhost)
dest = self._client.stream_list(args.destpath, extended = True)
dest = self._client_dest.stream_list(args.destpath, extended = True)
if len(dest) != 1:
raise MissingDestination(self.src, StreamInfo([args.destpath]))
self.dest = StreamInfo(dest[0])
raise MissingDestination(self.src,
StreamInfo(args.dest_url, [args.destpath],
self.interhost))
self.dest = StreamInfo(args.dest_url, dest[0], self.interhost)
print "Source:", self.src
print " Dest:", self.dest
@@ -118,25 +147,51 @@ class Filter(object):
print self.interval_string(interval)
raise SystemExit(0)
self.start = args.start
self.end = args.end
return args
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
saved_int = None
for (start, end) in self._client.stream_intervals(
self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end):
# Join adjacent intervals
if saved_int is not None:
if saved_int.end == start:
start = saved_int.start
else:
yield saved_int
saved_int = Interval(start, end)
if saved_int is not None:
yield saved_int
if self.interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path,
start = self.start, end = self.end) )
d_intervals = ( Interval(start, end)
for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
for interval in self._optimize_int(intervals):
yield interval
self._using_client = False
# Misc helpers
@@ -151,7 +206,7 @@ class Filter(object):
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath)
metadata = self._client_dest.stream_get_metadata(self.dest.path)
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
@@ -166,7 +221,7 @@ class Filter(object):
m += "this error.\n"
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client.stream_update_metadata(self._args.destpath, data)
self._client_dest.stream_update_metadata(self.dest.path, data)
# Main processing helper
def process_python(self, function, rows, args = None, partial = False):
@@ -192,10 +247,8 @@ class Filter(object):
"""
if args is None:
args = []
extractor = nilmdb.client.Client(self._args.url).stream_extract
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
src = self._args.srcpath
dest = self._args.destpath
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context
# Parse input data. We use homogenous types for now, which
# means the timestamp type will be either float or int.
@@ -209,9 +262,11 @@ class Filter(object):
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(dest, interval.start, interval.end) as insert_ctx:
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
src_array = []
for line in extractor(src, interval.start, interval.end):
for line in extractor(self.src.path,
interval.start, interval.end):
# Read in data
src_array.append([ float(x) for x in line.split() ])