Compare commits
4 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
320c32cfdc | |||
0f1e442cd4 | |||
3e78da12dc | |||
ef9277cbff |
21
Makefile
21
Makefile
@@ -1,13 +1,24 @@
|
||||
all:
|
||||
ifeq ($(INSIDE_EMACS), t)
|
||||
@make test
|
||||
else
|
||||
@echo "Try 'make install'"
|
||||
endif
|
||||
|
||||
test:
|
||||
python src/filter.py \
|
||||
-u "http://localhost:12380/" \
|
||||
-U "http://127.0.0.1:12380/" \
|
||||
/lees-compressor/no-leak/raw/1 \
|
||||
/lees-compressor/no-leak/raw/4
|
||||
|
||||
test2:
|
||||
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
|
||||
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
|
||||
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
|
||||
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
|
||||
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
|
||||
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
|
||||
|
||||
all:
|
||||
@echo "Try 'make install'"
|
||||
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
|
||||
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
|
||||
|
||||
version:
|
||||
python setup.py version
|
||||
|
13
scripts/copy-wildcard.sh
Executable file
13
scripts/copy-wildcard.sh
Executable file
@@ -0,0 +1,13 @@
|
||||
#!/bin/bash
|
||||
|
||||
# example: ./copy-wildcard.sh http://src:12380 http://dest:12380 /path/*
|
||||
|
||||
if [ $# != 3 ] ; then
|
||||
echo "usage: $0 src-url dest-url path-wildcard"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
set -e
|
||||
nilmtool -u "$1" list "$3" | sort | while read path layout; do
|
||||
nilm-copy -u "$1" -U "$2" "$path" "$path"
|
||||
done
|
@@ -1,5 +1,7 @@
|
||||
#!/bin/bash
|
||||
|
||||
# example: ./decimate-it.sh 1 2 3 4 5 6 7 8 9 10 11 12
|
||||
|
||||
for i in "$@"; do
|
||||
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
|
||||
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))
|
||||
|
5
setup.py
5
setup.py
@@ -30,7 +30,7 @@ except ImportError:
|
||||
# Versioneer manages version numbers from git tags.
|
||||
# https://github.com/warner/python-versioneer
|
||||
import versioneer
|
||||
versioneer.versionfile_source = 'nilmtools/_version.py'
|
||||
versioneer.versionfile_source = 'src/_version.py'
|
||||
versioneer.versionfile_build = 'nilmtools/_version.py'
|
||||
versioneer.tag_prefix = 'nilmtools-'
|
||||
versioneer.parentdir_prefix = 'nilmtools-'
|
||||
@@ -65,11 +65,12 @@ setup(name='nilmtools',
|
||||
],
|
||||
packages = [ 'nilmtools',
|
||||
],
|
||||
package_dir = { 'nilmtools': 'src' },
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'nilm-decimate = nilmtools.decimate:main',
|
||||
'nilm-insert = nilmtools.insert:main',
|
||||
'nilm-copy = nilmtools.copy:main',
|
||||
'nilm-copy = nilmtools.copy_:main',
|
||||
],
|
||||
},
|
||||
zip_safe = False,
|
||||
|
@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
|
||||
|
||||
tag_prefix = "nilmtools-"
|
||||
parentdir_prefix = "nilmtools-"
|
||||
versionfile_source = "nilmtools/_version.py"
|
||||
versionfile_source = "src/_version.py"
|
||||
|
||||
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
||||
variables = { "refnames": git_refnames, "full": git_full }
|
@@ -16,16 +16,17 @@ def main():
|
||||
print "Source is %s (%s)" % (e.src.path, e.src.layout)
|
||||
print "Destination %s doesn't exist" % (e.dest.path)
|
||||
print "You could make it with a command like:"
|
||||
print " nilmtool create", e.dest.path, e.src.layout
|
||||
print " nilmtool -u %s create %s %s" % (e.dest.url,
|
||||
e.dest.path, e.src.layout)
|
||||
raise SystemExit(1)
|
||||
|
||||
# Copy metadata
|
||||
meta = f.client.stream_get_metadata(f.src.path)
|
||||
meta = f.client_src.stream_get_metadata(f.src.path)
|
||||
f.check_dest_metadata(meta)
|
||||
|
||||
# Copy all rows of data as ASCII strings
|
||||
extractor = nilmdb.client.Client(args.url).stream_extract
|
||||
inserter = nilmdb.client.Client(args.url).stream_insert_context
|
||||
extractor = nilmdb.client.Client(f.src.url).stream_extract
|
||||
inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
|
||||
for i in f.intervals():
|
||||
print "Processing", f.interval_string(i)
|
||||
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
|
@@ -28,17 +28,18 @@ def main():
|
||||
else:
|
||||
rec = 'float32_' + str(src.layout_count * 3)
|
||||
print "You could make it with a command like:"
|
||||
print " nilmtool create", dest.path, rec
|
||||
print " nilmtool -u %s create %s %s" % (e.dest.url,
|
||||
e.dest.path, rec)
|
||||
raise SystemExit(1)
|
||||
|
||||
if not (args.factor >= 2):
|
||||
raise Exception("factor needs to be 2 or more")
|
||||
|
||||
f.check_dest_metadata({ "decimate_source": args.srcpath,
|
||||
f.check_dest_metadata({ "decimate_source": f.src.path,
|
||||
"decimate_factor": args.factor })
|
||||
|
||||
# If source is decimated, we have to decimate a bit differently
|
||||
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
|
||||
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
|
||||
n = f.src.layout_count // 3
|
||||
f.process_python(function = decimate_again, rows = args.factor,
|
||||
args = (n,))
|
@@ -1,6 +1,9 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import nilmdb.client
|
||||
from nilmdb.client import Client
|
||||
from nilmdb.utils.printf import *
|
||||
from nilmdb.utils.time import (parse_time, timestamp_to_human,
|
||||
timestamp_to_seconds)
|
||||
@@ -21,8 +24,10 @@ class MissingDestination(Exception):
|
||||
Exception.__init__(self, "destination path " + dest.path + " not found")
|
||||
|
||||
class StreamInfo(object):
|
||||
def __init__(self, info):
|
||||
def __init__(self, url, info, interhost):
|
||||
self.url = url
|
||||
self.info = info
|
||||
self.interhost = interhost
|
||||
try:
|
||||
self.path = info[0]
|
||||
self.layout = info[1]
|
||||
@@ -38,25 +43,38 @@ class StreamInfo(object):
|
||||
|
||||
def __str__(self):
|
||||
"""Print stream info as a string"""
|
||||
return sprintf("%s (%s), %.2fM rows, %.2f hours",
|
||||
res = ""
|
||||
if self.interhost:
|
||||
res = sprintf("[%s] ", self.url)
|
||||
res += sprintf("%s (%s), %.2fM rows, %.2f hours",
|
||||
self.path, self.layout, self.rows / 1e6,
|
||||
self.seconds / 3600.0)
|
||||
return res
|
||||
|
||||
class Filter(object):
|
||||
|
||||
def __init__(self):
|
||||
self._parser = None
|
||||
self._args = None
|
||||
self._client = None
|
||||
self._client_src = None
|
||||
self._client_dest = None
|
||||
self._using_client = False
|
||||
self.src = None
|
||||
self.dest = None
|
||||
self.start = None
|
||||
self.end = None
|
||||
self.interhost = False
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
def client_src(self):
|
||||
if self._using_client:
|
||||
raise Exception("Filter client is in use; make another")
|
||||
return self._client
|
||||
return self._client_src
|
||||
|
||||
@property
|
||||
def client_dest(self):
|
||||
if self._using_client:
|
||||
raise Exception("Filter client is in use; make another")
|
||||
return self._client_dest
|
||||
|
||||
def setup_parser(self, description = "Filter data"):
|
||||
parser = argparse.ArgumentParser(
|
||||
@@ -67,6 +85,9 @@ class Filter(object):
|
||||
group.add_argument("-u", "--url", action="store",
|
||||
default="http://localhost:12380/",
|
||||
help="Server URL (default: %(default)s)")
|
||||
group.add_argument("-U", "--dest-url", action="store",
|
||||
help="Destination server URL "
|
||||
"(default: same as source)")
|
||||
group.add_argument("-D", "--dry-run", action="store_true",
|
||||
default = False,
|
||||
help="Just print intervals that would be "
|
||||
@@ -93,22 +114,30 @@ class Filter(object):
|
||||
|
||||
def parse_args(self):
|
||||
args = self._parser.parse_args()
|
||||
self._args = args
|
||||
self._client = nilmdb.client.Client(args.url)
|
||||
|
||||
if args.srcpath == args.destpath:
|
||||
if args.dest_url is None:
|
||||
args.dest_url = args.url
|
||||
if args.url != args.dest_url:
|
||||
self.interhost = True
|
||||
|
||||
self._client_src = Client(args.url)
|
||||
self._client_dest = Client(args.dest_url)
|
||||
|
||||
if (not self.interhost) and (args.srcpath == args.destpath):
|
||||
raise Exception("source and destination path must be different")
|
||||
|
||||
# Open and print info about the streams
|
||||
src = self._client.stream_list(args.srcpath, extended = True)
|
||||
src = self._client_src.stream_list(args.srcpath, extended = True)
|
||||
if len(src) != 1:
|
||||
raise Exception("source path " + args.srcpath + " not found")
|
||||
self.src = StreamInfo(src[0])
|
||||
self.src = StreamInfo(args.url, src[0], self.interhost)
|
||||
|
||||
dest = self._client.stream_list(args.destpath, extended = True)
|
||||
dest = self._client_dest.stream_list(args.destpath, extended = True)
|
||||
if len(dest) != 1:
|
||||
raise MissingDestination(self.src, StreamInfo([args.destpath]))
|
||||
self.dest = StreamInfo(dest[0])
|
||||
raise MissingDestination(self.src,
|
||||
StreamInfo(args.dest_url, [args.destpath],
|
||||
self.interhost))
|
||||
self.dest = StreamInfo(args.dest_url, dest[0], self.interhost)
|
||||
|
||||
print "Source:", self.src
|
||||
print " Dest:", self.dest
|
||||
@@ -118,25 +147,51 @@ class Filter(object):
|
||||
print self.interval_string(interval)
|
||||
raise SystemExit(0)
|
||||
|
||||
self.start = args.start
|
||||
self.end = args.end
|
||||
|
||||
return args
|
||||
|
||||
def _optimize_int(self, it):
|
||||
"""Join and yield adjacent intervals from the iterator 'it'"""
|
||||
saved_int = None
|
||||
for interval in it:
|
||||
if saved_int is not None:
|
||||
if saved_int.end == interval.start:
|
||||
interval.start = saved_int.start
|
||||
else:
|
||||
yield saved_int
|
||||
saved_int = interval
|
||||
if saved_int is not None:
|
||||
yield saved_int
|
||||
|
||||
def intervals(self):
|
||||
"""Generate all the intervals that this filter should process"""
|
||||
self._using_client = True
|
||||
saved_int = None
|
||||
for (start, end) in self._client.stream_intervals(
|
||||
self._args.srcpath, diffpath = self._args.destpath,
|
||||
start = self._args.start, end = self._args.end):
|
||||
|
||||
# Join adjacent intervals
|
||||
if saved_int is not None:
|
||||
if saved_int.end == start:
|
||||
start = saved_int.start
|
||||
else:
|
||||
yield saved_int
|
||||
saved_int = Interval(start, end)
|
||||
if saved_int is not None:
|
||||
yield saved_int
|
||||
if self.interhost:
|
||||
# Do the difference ourselves
|
||||
s_intervals = ( Interval(start, end)
|
||||
for (start, end) in
|
||||
self._client_src.stream_intervals(
|
||||
self.src.path,
|
||||
start = self.start, end = self.end) )
|
||||
d_intervals = ( Interval(start, end)
|
||||
for (start, end) in
|
||||
self._client_dest.stream_intervals(
|
||||
self.dest.path,
|
||||
start = self.start, end = self.end) )
|
||||
intervals = nilmdb.utils.interval.set_difference(s_intervals,
|
||||
d_intervals)
|
||||
else:
|
||||
# Let the server do the difference for us
|
||||
intervals = ( Interval(start, end)
|
||||
for (start, end) in
|
||||
self._client_src.stream_intervals(
|
||||
self.src.path, diffpath = self.dest.path,
|
||||
start = self.start, end = self.end) )
|
||||
for interval in self._optimize_int(intervals):
|
||||
yield interval
|
||||
self._using_client = False
|
||||
|
||||
# Misc helpers
|
||||
@@ -151,7 +206,7 @@ class Filter(object):
|
||||
def check_dest_metadata(self, data):
|
||||
"""See if the metadata jives, and complain if it doesn't. If
|
||||
there's no conflict, update the metadata to match 'data'."""
|
||||
metadata = self._client.stream_get_metadata(self._args.destpath)
|
||||
metadata = self._client_dest.stream_get_metadata(self.dest.path)
|
||||
for key in data:
|
||||
wanted = str(data[key])
|
||||
val = metadata.get(key, wanted)
|
||||
@@ -166,7 +221,7 @@ class Filter(object):
|
||||
m += "this error.\n"
|
||||
raise Exception(m)
|
||||
# All good -- write the metadata in case it's not already there
|
||||
self._client.stream_update_metadata(self._args.destpath, data)
|
||||
self._client_dest.stream_update_metadata(self.dest.path, data)
|
||||
|
||||
# Main processing helper
|
||||
def process_python(self, function, rows, args = None, partial = False):
|
||||
@@ -192,10 +247,8 @@ class Filter(object):
|
||||
"""
|
||||
if args is None:
|
||||
args = []
|
||||
extractor = nilmdb.client.Client(self._args.url).stream_extract
|
||||
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
|
||||
src = self._args.srcpath
|
||||
dest = self._args.destpath
|
||||
extractor = Client(self.src.url).stream_extract
|
||||
inserter = Client(self.dest.url).stream_insert_context
|
||||
|
||||
# Parse input data. We use homogenous types for now, which
|
||||
# means the timestamp type will be either float or int.
|
||||
@@ -209,9 +262,11 @@ class Filter(object):
|
||||
|
||||
for interval in self.intervals():
|
||||
print "Processing", self.interval_string(interval)
|
||||
with inserter(dest, interval.start, interval.end) as insert_ctx:
|
||||
with inserter(self.dest.path,
|
||||
interval.start, interval.end) as insert_ctx:
|
||||
src_array = []
|
||||
for line in extractor(src, interval.start, interval.end):
|
||||
for line in extractor(self.src.path,
|
||||
interval.start, interval.end):
|
||||
# Read in data
|
||||
src_array.append([ float(x) for x in line.split() ])
|
||||
|
Reference in New Issue
Block a user