Compare commits
4 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
320c32cfdc | |||
0f1e442cd4 | |||
3e78da12dc | |||
ef9277cbff |
21
Makefile
21
Makefile
@@ -1,13 +1,24 @@
|
|||||||
|
all:
|
||||||
|
ifeq ($(INSIDE_EMACS), t)
|
||||||
|
@make test
|
||||||
|
else
|
||||||
|
@echo "Try 'make install'"
|
||||||
|
endif
|
||||||
|
|
||||||
test:
|
test:
|
||||||
|
python src/filter.py \
|
||||||
|
-u "http://localhost:12380/" \
|
||||||
|
-U "http://127.0.0.1:12380/" \
|
||||||
|
/lees-compressor/no-leak/raw/1 \
|
||||||
|
/lees-compressor/no-leak/raw/4
|
||||||
|
|
||||||
|
test2:
|
||||||
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
|
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
|
||||||
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
|
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
|
||||||
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
|
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
|
||||||
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
|
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
|
||||||
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
|
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
|
||||||
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
|
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
|
||||||
|
|
||||||
all:
|
|
||||||
@echo "Try 'make install'"
|
|
||||||
|
|
||||||
version:
|
version:
|
||||||
python setup.py version
|
python setup.py version
|
||||||
|
13
scripts/copy-wildcard.sh
Executable file
13
scripts/copy-wildcard.sh
Executable file
@@ -0,0 +1,13 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# example: ./copy-wildcard.sh http://src:12380 http://dest:12380 /path/*
|
||||||
|
|
||||||
|
if [ $# != 3 ] ; then
|
||||||
|
echo "usage: $0 src-url dest-url path-wildcard"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
set -e
|
||||||
|
nilmtool -u "$1" list "$3" | sort | while read path layout; do
|
||||||
|
nilm-copy -u "$1" -U "$2" "$path" "$path"
|
||||||
|
done
|
@@ -1,5 +1,7 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
|
# example: ./decimate-it.sh 1 2 3 4 5 6 7 8 9 10 11 12
|
||||||
|
|
||||||
for i in "$@"; do
|
for i in "$@"; do
|
||||||
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
|
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
|
||||||
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))
|
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))
|
||||||
|
5
setup.py
5
setup.py
@@ -30,7 +30,7 @@ except ImportError:
|
|||||||
# Versioneer manages version numbers from git tags.
|
# Versioneer manages version numbers from git tags.
|
||||||
# https://github.com/warner/python-versioneer
|
# https://github.com/warner/python-versioneer
|
||||||
import versioneer
|
import versioneer
|
||||||
versioneer.versionfile_source = 'nilmtools/_version.py'
|
versioneer.versionfile_source = 'src/_version.py'
|
||||||
versioneer.versionfile_build = 'nilmtools/_version.py'
|
versioneer.versionfile_build = 'nilmtools/_version.py'
|
||||||
versioneer.tag_prefix = 'nilmtools-'
|
versioneer.tag_prefix = 'nilmtools-'
|
||||||
versioneer.parentdir_prefix = 'nilmtools-'
|
versioneer.parentdir_prefix = 'nilmtools-'
|
||||||
@@ -65,11 +65,12 @@ setup(name='nilmtools',
|
|||||||
],
|
],
|
||||||
packages = [ 'nilmtools',
|
packages = [ 'nilmtools',
|
||||||
],
|
],
|
||||||
|
package_dir = { 'nilmtools': 'src' },
|
||||||
entry_points = {
|
entry_points = {
|
||||||
'console_scripts': [
|
'console_scripts': [
|
||||||
'nilm-decimate = nilmtools.decimate:main',
|
'nilm-decimate = nilmtools.decimate:main',
|
||||||
'nilm-insert = nilmtools.insert:main',
|
'nilm-insert = nilmtools.insert:main',
|
||||||
'nilm-copy = nilmtools.copy:main',
|
'nilm-copy = nilmtools.copy_:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
zip_safe = False,
|
zip_safe = False,
|
||||||
|
@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
|
|||||||
|
|
||||||
tag_prefix = "nilmtools-"
|
tag_prefix = "nilmtools-"
|
||||||
parentdir_prefix = "nilmtools-"
|
parentdir_prefix = "nilmtools-"
|
||||||
versionfile_source = "nilmtools/_version.py"
|
versionfile_source = "src/_version.py"
|
||||||
|
|
||||||
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
||||||
variables = { "refnames": git_refnames, "full": git_full }
|
variables = { "refnames": git_refnames, "full": git_full }
|
@@ -16,16 +16,17 @@ def main():
|
|||||||
print "Source is %s (%s)" % (e.src.path, e.src.layout)
|
print "Source is %s (%s)" % (e.src.path, e.src.layout)
|
||||||
print "Destination %s doesn't exist" % (e.dest.path)
|
print "Destination %s doesn't exist" % (e.dest.path)
|
||||||
print "You could make it with a command like:"
|
print "You could make it with a command like:"
|
||||||
print " nilmtool create", e.dest.path, e.src.layout
|
print " nilmtool -u %s create %s %s" % (e.dest.url,
|
||||||
|
e.dest.path, e.src.layout)
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
|
|
||||||
# Copy metadata
|
# Copy metadata
|
||||||
meta = f.client.stream_get_metadata(f.src.path)
|
meta = f.client_src.stream_get_metadata(f.src.path)
|
||||||
f.check_dest_metadata(meta)
|
f.check_dest_metadata(meta)
|
||||||
|
|
||||||
# Copy all rows of data as ASCII strings
|
# Copy all rows of data as ASCII strings
|
||||||
extractor = nilmdb.client.Client(args.url).stream_extract
|
extractor = nilmdb.client.Client(f.src.url).stream_extract
|
||||||
inserter = nilmdb.client.Client(args.url).stream_insert_context
|
inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
|
||||||
for i in f.intervals():
|
for i in f.intervals():
|
||||||
print "Processing", f.interval_string(i)
|
print "Processing", f.interval_string(i)
|
||||||
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
|
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
|
@@ -28,17 +28,18 @@ def main():
|
|||||||
else:
|
else:
|
||||||
rec = 'float32_' + str(src.layout_count * 3)
|
rec = 'float32_' + str(src.layout_count * 3)
|
||||||
print "You could make it with a command like:"
|
print "You could make it with a command like:"
|
||||||
print " nilmtool create", dest.path, rec
|
print " nilmtool -u %s create %s %s" % (e.dest.url,
|
||||||
|
e.dest.path, rec)
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
|
|
||||||
if not (args.factor >= 2):
|
if not (args.factor >= 2):
|
||||||
raise Exception("factor needs to be 2 or more")
|
raise Exception("factor needs to be 2 or more")
|
||||||
|
|
||||||
f.check_dest_metadata({ "decimate_source": args.srcpath,
|
f.check_dest_metadata({ "decimate_source": f.src.path,
|
||||||
"decimate_factor": args.factor })
|
"decimate_factor": args.factor })
|
||||||
|
|
||||||
# If source is decimated, we have to decimate a bit differently
|
# If source is decimated, we have to decimate a bit differently
|
||||||
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
|
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
|
||||||
n = f.src.layout_count // 3
|
n = f.src.layout_count // 3
|
||||||
f.process_python(function = decimate_again, rows = args.factor,
|
f.process_python(function = decimate_again, rows = args.factor,
|
||||||
args = (n,))
|
args = (n,))
|
@@ -1,6 +1,9 @@
|
|||||||
#!/usr/bin/python
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
import nilmdb.client
|
import nilmdb.client
|
||||||
|
from nilmdb.client import Client
|
||||||
from nilmdb.utils.printf import *
|
from nilmdb.utils.printf import *
|
||||||
from nilmdb.utils.time import (parse_time, timestamp_to_human,
|
from nilmdb.utils.time import (parse_time, timestamp_to_human,
|
||||||
timestamp_to_seconds)
|
timestamp_to_seconds)
|
||||||
@@ -21,8 +24,10 @@ class MissingDestination(Exception):
|
|||||||
Exception.__init__(self, "destination path " + dest.path + " not found")
|
Exception.__init__(self, "destination path " + dest.path + " not found")
|
||||||
|
|
||||||
class StreamInfo(object):
|
class StreamInfo(object):
|
||||||
def __init__(self, info):
|
def __init__(self, url, info, interhost):
|
||||||
|
self.url = url
|
||||||
self.info = info
|
self.info = info
|
||||||
|
self.interhost = interhost
|
||||||
try:
|
try:
|
||||||
self.path = info[0]
|
self.path = info[0]
|
||||||
self.layout = info[1]
|
self.layout = info[1]
|
||||||
@@ -38,25 +43,38 @@ class StreamInfo(object):
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
"""Print stream info as a string"""
|
"""Print stream info as a string"""
|
||||||
return sprintf("%s (%s), %.2fM rows, %.2f hours",
|
res = ""
|
||||||
|
if self.interhost:
|
||||||
|
res = sprintf("[%s] ", self.url)
|
||||||
|
res += sprintf("%s (%s), %.2fM rows, %.2f hours",
|
||||||
self.path, self.layout, self.rows / 1e6,
|
self.path, self.layout, self.rows / 1e6,
|
||||||
self.seconds / 3600.0)
|
self.seconds / 3600.0)
|
||||||
|
return res
|
||||||
|
|
||||||
class Filter(object):
|
class Filter(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._parser = None
|
self._parser = None
|
||||||
self._args = None
|
self._client_src = None
|
||||||
self._client = None
|
self._client_dest = None
|
||||||
self._using_client = False
|
self._using_client = False
|
||||||
self.src = None
|
self.src = None
|
||||||
self.dest = None
|
self.dest = None
|
||||||
|
self.start = None
|
||||||
|
self.end = None
|
||||||
|
self.interhost = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def client(self):
|
def client_src(self):
|
||||||
if self._using_client:
|
if self._using_client:
|
||||||
raise Exception("Filter client is in use; make another")
|
raise Exception("Filter client is in use; make another")
|
||||||
return self._client
|
return self._client_src
|
||||||
|
|
||||||
|
@property
|
||||||
|
def client_dest(self):
|
||||||
|
if self._using_client:
|
||||||
|
raise Exception("Filter client is in use; make another")
|
||||||
|
return self._client_dest
|
||||||
|
|
||||||
def setup_parser(self, description = "Filter data"):
|
def setup_parser(self, description = "Filter data"):
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
@@ -67,6 +85,9 @@ class Filter(object):
|
|||||||
group.add_argument("-u", "--url", action="store",
|
group.add_argument("-u", "--url", action="store",
|
||||||
default="http://localhost:12380/",
|
default="http://localhost:12380/",
|
||||||
help="Server URL (default: %(default)s)")
|
help="Server URL (default: %(default)s)")
|
||||||
|
group.add_argument("-U", "--dest-url", action="store",
|
||||||
|
help="Destination server URL "
|
||||||
|
"(default: same as source)")
|
||||||
group.add_argument("-D", "--dry-run", action="store_true",
|
group.add_argument("-D", "--dry-run", action="store_true",
|
||||||
default = False,
|
default = False,
|
||||||
help="Just print intervals that would be "
|
help="Just print intervals that would be "
|
||||||
@@ -93,22 +114,30 @@ class Filter(object):
|
|||||||
|
|
||||||
def parse_args(self):
|
def parse_args(self):
|
||||||
args = self._parser.parse_args()
|
args = self._parser.parse_args()
|
||||||
self._args = args
|
|
||||||
self._client = nilmdb.client.Client(args.url)
|
|
||||||
|
|
||||||
if args.srcpath == args.destpath:
|
if args.dest_url is None:
|
||||||
|
args.dest_url = args.url
|
||||||
|
if args.url != args.dest_url:
|
||||||
|
self.interhost = True
|
||||||
|
|
||||||
|
self._client_src = Client(args.url)
|
||||||
|
self._client_dest = Client(args.dest_url)
|
||||||
|
|
||||||
|
if (not self.interhost) and (args.srcpath == args.destpath):
|
||||||
raise Exception("source and destination path must be different")
|
raise Exception("source and destination path must be different")
|
||||||
|
|
||||||
# Open and print info about the streams
|
# Open and print info about the streams
|
||||||
src = self._client.stream_list(args.srcpath, extended = True)
|
src = self._client_src.stream_list(args.srcpath, extended = True)
|
||||||
if len(src) != 1:
|
if len(src) != 1:
|
||||||
raise Exception("source path " + args.srcpath + " not found")
|
raise Exception("source path " + args.srcpath + " not found")
|
||||||
self.src = StreamInfo(src[0])
|
self.src = StreamInfo(args.url, src[0], self.interhost)
|
||||||
|
|
||||||
dest = self._client.stream_list(args.destpath, extended = True)
|
dest = self._client_dest.stream_list(args.destpath, extended = True)
|
||||||
if len(dest) != 1:
|
if len(dest) != 1:
|
||||||
raise MissingDestination(self.src, StreamInfo([args.destpath]))
|
raise MissingDestination(self.src,
|
||||||
self.dest = StreamInfo(dest[0])
|
StreamInfo(args.dest_url, [args.destpath],
|
||||||
|
self.interhost))
|
||||||
|
self.dest = StreamInfo(args.dest_url, dest[0], self.interhost)
|
||||||
|
|
||||||
print "Source:", self.src
|
print "Source:", self.src
|
||||||
print " Dest:", self.dest
|
print " Dest:", self.dest
|
||||||
@@ -118,25 +147,51 @@ class Filter(object):
|
|||||||
print self.interval_string(interval)
|
print self.interval_string(interval)
|
||||||
raise SystemExit(0)
|
raise SystemExit(0)
|
||||||
|
|
||||||
|
self.start = args.start
|
||||||
|
self.end = args.end
|
||||||
|
|
||||||
return args
|
return args
|
||||||
|
|
||||||
|
def _optimize_int(self, it):
|
||||||
|
"""Join and yield adjacent intervals from the iterator 'it'"""
|
||||||
|
saved_int = None
|
||||||
|
for interval in it:
|
||||||
|
if saved_int is not None:
|
||||||
|
if saved_int.end == interval.start:
|
||||||
|
interval.start = saved_int.start
|
||||||
|
else:
|
||||||
|
yield saved_int
|
||||||
|
saved_int = interval
|
||||||
|
if saved_int is not None:
|
||||||
|
yield saved_int
|
||||||
|
|
||||||
def intervals(self):
|
def intervals(self):
|
||||||
"""Generate all the intervals that this filter should process"""
|
"""Generate all the intervals that this filter should process"""
|
||||||
self._using_client = True
|
self._using_client = True
|
||||||
saved_int = None
|
|
||||||
for (start, end) in self._client.stream_intervals(
|
|
||||||
self._args.srcpath, diffpath = self._args.destpath,
|
|
||||||
start = self._args.start, end = self._args.end):
|
|
||||||
|
|
||||||
# Join adjacent intervals
|
if self.interhost:
|
||||||
if saved_int is not None:
|
# Do the difference ourselves
|
||||||
if saved_int.end == start:
|
s_intervals = ( Interval(start, end)
|
||||||
start = saved_int.start
|
for (start, end) in
|
||||||
|
self._client_src.stream_intervals(
|
||||||
|
self.src.path,
|
||||||
|
start = self.start, end = self.end) )
|
||||||
|
d_intervals = ( Interval(start, end)
|
||||||
|
for (start, end) in
|
||||||
|
self._client_dest.stream_intervals(
|
||||||
|
self.dest.path,
|
||||||
|
start = self.start, end = self.end) )
|
||||||
|
intervals = nilmdb.utils.interval.set_difference(s_intervals,
|
||||||
|
d_intervals)
|
||||||
else:
|
else:
|
||||||
yield saved_int
|
# Let the server do the difference for us
|
||||||
saved_int = Interval(start, end)
|
intervals = ( Interval(start, end)
|
||||||
if saved_int is not None:
|
for (start, end) in
|
||||||
yield saved_int
|
self._client_src.stream_intervals(
|
||||||
|
self.src.path, diffpath = self.dest.path,
|
||||||
|
start = self.start, end = self.end) )
|
||||||
|
for interval in self._optimize_int(intervals):
|
||||||
|
yield interval
|
||||||
self._using_client = False
|
self._using_client = False
|
||||||
|
|
||||||
# Misc helpers
|
# Misc helpers
|
||||||
@@ -151,7 +206,7 @@ class Filter(object):
|
|||||||
def check_dest_metadata(self, data):
|
def check_dest_metadata(self, data):
|
||||||
"""See if the metadata jives, and complain if it doesn't. If
|
"""See if the metadata jives, and complain if it doesn't. If
|
||||||
there's no conflict, update the metadata to match 'data'."""
|
there's no conflict, update the metadata to match 'data'."""
|
||||||
metadata = self._client.stream_get_metadata(self._args.destpath)
|
metadata = self._client_dest.stream_get_metadata(self.dest.path)
|
||||||
for key in data:
|
for key in data:
|
||||||
wanted = str(data[key])
|
wanted = str(data[key])
|
||||||
val = metadata.get(key, wanted)
|
val = metadata.get(key, wanted)
|
||||||
@@ -166,7 +221,7 @@ class Filter(object):
|
|||||||
m += "this error.\n"
|
m += "this error.\n"
|
||||||
raise Exception(m)
|
raise Exception(m)
|
||||||
# All good -- write the metadata in case it's not already there
|
# All good -- write the metadata in case it's not already there
|
||||||
self._client.stream_update_metadata(self._args.destpath, data)
|
self._client_dest.stream_update_metadata(self.dest.path, data)
|
||||||
|
|
||||||
# Main processing helper
|
# Main processing helper
|
||||||
def process_python(self, function, rows, args = None, partial = False):
|
def process_python(self, function, rows, args = None, partial = False):
|
||||||
@@ -192,10 +247,8 @@ class Filter(object):
|
|||||||
"""
|
"""
|
||||||
if args is None:
|
if args is None:
|
||||||
args = []
|
args = []
|
||||||
extractor = nilmdb.client.Client(self._args.url).stream_extract
|
extractor = Client(self.src.url).stream_extract
|
||||||
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
|
inserter = Client(self.dest.url).stream_insert_context
|
||||||
src = self._args.srcpath
|
|
||||||
dest = self._args.destpath
|
|
||||||
|
|
||||||
# Parse input data. We use homogenous types for now, which
|
# Parse input data. We use homogenous types for now, which
|
||||||
# means the timestamp type will be either float or int.
|
# means the timestamp type will be either float or int.
|
||||||
@@ -209,9 +262,11 @@ class Filter(object):
|
|||||||
|
|
||||||
for interval in self.intervals():
|
for interval in self.intervals():
|
||||||
print "Processing", self.interval_string(interval)
|
print "Processing", self.interval_string(interval)
|
||||||
with inserter(dest, interval.start, interval.end) as insert_ctx:
|
with inserter(self.dest.path,
|
||||||
|
interval.start, interval.end) as insert_ctx:
|
||||||
src_array = []
|
src_array = []
|
||||||
for line in extractor(src, interval.start, interval.end):
|
for line in extractor(self.src.path,
|
||||||
|
interval.start, interval.end):
|
||||||
# Read in data
|
# Read in data
|
||||||
src_array.append([ float(x) for x in line.split() ])
|
src_array.append([ float(x) for x in line.split() ])
|
||||||
|
|
Reference in New Issue
Block a user