|
|
@@ -3,6 +3,7 @@ |
|
|
|
from __future__ import absolute_import |
|
|
|
|
|
|
|
import nilmdb.client |
|
|
|
from nilmdb.client import Client |
|
|
|
from nilmdb.utils.printf import * |
|
|
|
from nilmdb.utils.time import (parse_time, timestamp_to_human, |
|
|
|
timestamp_to_seconds) |
|
|
@@ -23,8 +24,10 @@ class MissingDestination(Exception): |
|
|
|
Exception.__init__(self, "destination path " + dest.path + " not found") |
|
|
|
|
|
|
|
class StreamInfo(object): |
|
|
|
def __init__(self, info): |
|
|
|
def __init__(self, url, info, interhost): |
|
|
|
self.url = url |
|
|
|
self.info = info |
|
|
|
self.interhost = interhost |
|
|
|
try: |
|
|
|
self.path = info[0] |
|
|
|
self.layout = info[1] |
|
|
@@ -40,25 +43,38 @@ class StreamInfo(object): |
|
|
|
|
|
|
|
def __str__(self):
    """Print stream info as a string.

    The result has the form ``"<path> (<layout>), <rows>M rows,
    <hours> hours"``.  When ``self.interhost`` is true the string is
    prefixed with ``"[<url>] "``.

    NOTE(review): ``self.rows`` and ``self.seconds`` are not assigned in
    the visible part of ``__init__``; they are presumably set further
    down in the constructor -- confirm.
    """
    # Only show the server URL when two different servers are involved.
    res = ""
    if self.interhost:
        res = sprintf("[%s] ", self.url)
    res += sprintf("%s (%s), %.2fM rows, %.2f hours",
                   self.path, self.layout, self.rows / 1e6,
                   self.seconds / 3600.0)
    return res
|
|
|
|
|
|
|
class Filter(object): |
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
self._parser = None |
|
|
|
self._args = None |
|
|
|
self._client = None |
|
|
|
self._client_src = None |
|
|
|
self._client_dest = None |
|
|
|
self._using_client = False |
|
|
|
self.src = None |
|
|
|
self.dest = None |
|
|
|
self.start = None |
|
|
|
self.end = None |
|
|
|
self.interhost = False |
|
|
|
|
|
|
|
@property |
|
|
|
def client(self): |
|
|
|
def client_src(self): |
|
|
|
if self._using_client: |
|
|
|
raise Exception("Filter client is in use; make another") |
|
|
|
return self._client |
|
|
|
return self._client_src |
|
|
|
|
|
|
|
@property |
|
|
|
def client_dest(self): |
|
|
|
if self._using_client: |
|
|
|
raise Exception("Filter client is in use; make another") |
|
|
|
return self._client_dest |
|
|
|
|
|
|
|
def setup_parser(self, description = "Filter data"): |
|
|
|
parser = argparse.ArgumentParser( |
|
|
@@ -69,6 +85,9 @@ class Filter(object): |
|
|
|
group.add_argument("-u", "--url", action="store", |
|
|
|
default="http://localhost:12380/", |
|
|
|
help="Server URL (default: %(default)s)") |
|
|
|
group.add_argument("-U", "--dest-url", action="store", |
|
|
|
help="Destination server URL " |
|
|
|
"(default: same as source)") |
|
|
|
group.add_argument("-D", "--dry-run", action="store_true", |
|
|
|
default = False, |
|
|
|
help="Just print intervals that would be " |
|
|
@@ -95,22 +114,30 @@ class Filter(object): |
|
|
|
|
|
|
|
def parse_args(self): |
|
|
|
args = self._parser.parse_args() |
|
|
|
self._args = args |
|
|
|
self._client = nilmdb.client.Client(args.url) |
|
|
|
|
|
|
|
if args.srcpath == args.destpath: |
|
|
|
if args.dest_url is None: |
|
|
|
args.dest_url = args.url |
|
|
|
if args.url != args.dest_url: |
|
|
|
self.interhost = True |
|
|
|
|
|
|
|
self._client_src = Client(args.url) |
|
|
|
self._client_dest = Client(args.dest_url) |
|
|
|
|
|
|
|
if (not self.interhost) and (args.srcpath == args.destpath): |
|
|
|
raise Exception("source and destination path must be different") |
|
|
|
|
|
|
|
# Open and print info about the streams |
|
|
|
src = self._client.stream_list(args.srcpath, extended = True) |
|
|
|
src = self._client_src.stream_list(args.srcpath, extended = True) |
|
|
|
if len(src) != 1: |
|
|
|
raise Exception("source path " + args.srcpath + " not found") |
|
|
|
self.src = StreamInfo(src[0]) |
|
|
|
self.src = StreamInfo(args.url, src[0], self.interhost) |
|
|
|
|
|
|
|
dest = self._client.stream_list(args.destpath, extended = True) |
|
|
|
dest = self._client_dest.stream_list(args.destpath, extended = True) |
|
|
|
if len(dest) != 1: |
|
|
|
raise MissingDestination(self.src, StreamInfo([args.destpath])) |
|
|
|
self.dest = StreamInfo(dest[0]) |
|
|
|
raise MissingDestination(self.src, |
|
|
|
StreamInfo(args.dest_url, [args.destpath], |
|
|
|
self.interhost)) |
|
|
|
self.dest = StreamInfo(args.dest_url, dest[0], self.interhost) |
|
|
|
|
|
|
|
print "Source:", self.src |
|
|
|
print " Dest:", self.dest |
|
|
@@ -120,25 +147,51 @@ class Filter(object): |
|
|
|
print self.interval_string(interval) |
|
|
|
raise SystemExit(0) |
|
|
|
|
|
|
|
self.start = args.start |
|
|
|
self.end = args.end |
|
|
|
|
|
|
|
return args |
|
|
|
|
|
|
|
def intervals(self): |
|
|
|
"""Generate all the intervals that this filter should process""" |
|
|
|
self._using_client = True |
|
|
|
def _optimize_int(self, it): |
|
|
|
"""Join and yield adjacent intervals from the iterator 'it'""" |
|
|
|
saved_int = None |
|
|
|
for (start, end) in self._client.stream_intervals( |
|
|
|
self._args.srcpath, diffpath = self._args.destpath, |
|
|
|
start = self._args.start, end = self._args.end): |
|
|
|
|
|
|
|
# Join adjacent intervals |
|
|
|
for interval in it: |
|
|
|
if saved_int is not None: |
|
|
|
if saved_int.end == start: |
|
|
|
start = saved_int.start |
|
|
|
if saved_int.end == interval.start: |
|
|
|
interval.start = saved_int.start |
|
|
|
else: |
|
|
|
yield saved_int |
|
|
|
saved_int = Interval(start, end) |
|
|
|
saved_int = interval |
|
|
|
if saved_int is not None: |
|
|
|
yield saved_int |
|
|
|
|
|
|
|
def intervals(self):
    """Generate all the intervals that this filter should process.

    These are the intervals present in the source stream but not in the
    destination stream, limited to ``self.start`` .. ``self.end``, with
    adjacent intervals joined by ``_optimize_int()``.

    While the generator is active ``_using_client`` is set, so the
    client properties refuse to hand out the clients used here.  The
    flag is cleared in a ``finally`` clause so that it is also reset
    when the consumer stops iterating early or a server call fails.

    Yields:
        Interval objects to be processed.
    """
    self._using_client = True
    try:
        if self.interhost:
            # Do the difference ourselves
            s_intervals = (Interval(start, end)
                           for (start, end) in
                           self._client_src.stream_intervals(
                               self.src.path,
                               start = self.start, end = self.end))
            d_intervals = (Interval(start, end)
                           for (start, end) in
                           self._client_dest.stream_intervals(
                               self.dest.path,
                               start = self.start, end = self.end))
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = (Interval(start, end)
                         for (start, end) in
                         self._client_src.stream_intervals(
                             self.src.path, diffpath = self.dest.path,
                             start = self.start, end = self.end))
        for interval in self._optimize_int(intervals):
            yield interval
    finally:
        # Runs on normal exhaustion, on generator close() and on errors.
        self._using_client = False
|
|
|
|
|
|
|
# Misc helpers |
|
|
@@ -153,7 +206,7 @@ class Filter(object): |
|
|
|
def check_dest_metadata(self, data): |
|
|
|
"""See if the metadata jives, and complain if it doesn't. If |
|
|
|
there's no conflict, update the metadata to match 'data'.""" |
|
|
|
metadata = self._client.stream_get_metadata(self._args.destpath) |
|
|
|
metadata = self._client_dest.stream_get_metadata(self.dest.path) |
|
|
|
for key in data: |
|
|
|
wanted = str(data[key]) |
|
|
|
val = metadata.get(key, wanted) |
|
|
@@ -168,7 +221,7 @@ class Filter(object): |
|
|
|
m += "this error.\n" |
|
|
|
raise Exception(m) |
|
|
|
# All good -- write the metadata in case it's not already there |
|
|
|
self._client.stream_update_metadata(self._args.destpath, data) |
|
|
|
self._client_dest.stream_update_metadata(self.dest.path, data) |
|
|
|
|
|
|
|
# Main processing helper |
|
|
|
def process_python(self, function, rows, args = None, partial = False): |
|
|
@@ -194,10 +247,8 @@ class Filter(object): |
|
|
|
""" |
|
|
|
if args is None: |
|
|
|
args = [] |
|
|
|
extractor = nilmdb.client.Client(self._args.url).stream_extract |
|
|
|
inserter = nilmdb.client.Client(self._args.url).stream_insert_context |
|
|
|
src = self._args.srcpath |
|
|
|
dest = self._args.destpath |
|
|
|
extractor = Client(self.src.url).stream_extract |
|
|
|
inserter = Client(self.dest.url).stream_insert_context |
|
|
|
|
|
|
|
# Parse input data. We use homogenous types for now, which |
|
|
|
# means the timestamp type will be either float or int. |
|
|
@@ -211,9 +262,11 @@ class Filter(object): |
|
|
|
|
|
|
|
for interval in self.intervals(): |
|
|
|
print "Processing", self.interval_string(interval) |
|
|
|
with inserter(dest, interval.start, interval.end) as insert_ctx: |
|
|
|
with inserter(self.dest.path, |
|
|
|
interval.start, interval.end) as insert_ctx: |
|
|
|
src_array = [] |
|
|
|
for line in extractor(src, interval.start, interval.end): |
|
|
|
for line in extractor(self.src.path, |
|
|
|
interval.start, interval.end): |
|
|
|
# Read in data |
|
|
|
src_array.append([ float(x) for x in line.split() ]) |
|
|
|
|
|
|
|