|
|
@@ -4,63 +4,113 @@ import nilmdb.client |
|
|
|
from nilmdb.utils.printf import * |
|
|
|
from nilmdb.utils.time import parse_time, format_time |
|
|
|
|
|
|
|
import nilmtools |
|
|
|
import time |
|
|
|
import sys |
|
|
|
import re |
|
|
|
import argparse |
|
|
|
|
|
|
|
class MissingDestination(Exception): |
|
|
|
def __init__(self, src, layout, dest): |
|
|
|
self.src = src |
|
|
|
self.layout = layout |
|
|
|
self.layout_type = layout.split('_')[0] |
|
|
|
self.layout_count = int(layout.split('_')[1]) |
|
|
|
self.dest = dest |
|
|
|
Exception.__init__(self, "destination path " + dest + " not found") |
|
|
|
|
|
|
|
class Filter(object): |
|
|
|
|
|
|
|
def __init__(self, description = "Filter data"): |
|
|
|
self.args = None |
|
|
|
def __init__(self): |
|
|
|
self._parser = None |
|
|
|
self._args = None |
|
|
|
self._client = None |
|
|
|
self.parse_args(description) |
|
|
|
self._using_client = False |
|
|
|
self.srcinfo = None |
|
|
|
self.destinfo = None |
|
|
|
|
|
|
|
@property |
|
|
|
def client(self): |
|
|
|
if self._using_client: |
|
|
|
raise Exception("Filter client is in use; make another") |
|
|
|
return self._client |
|
|
|
|
|
|
|
def parse_args(self, description): |
|
|
|
def setup_parser(self, description = "Filter data"): |
|
|
|
parser = argparse.ArgumentParser( |
|
|
|
description = description, |
|
|
|
formatter_class = argparse.RawDescriptionHelpFormatter) |
|
|
|
parser.add_argument("-u", "--url", action="store", |
|
|
|
default="http://localhost:12380/", |
|
|
|
help="Server URL (default: %(default)s)") |
|
|
|
parser.add_argument("srcpath", action="store", |
|
|
|
help="Path of source stream, e.g. /foo/bar") |
|
|
|
parser.add_argument("destpath", action="store", |
|
|
|
help="Path of destination stream, e.g. /foo/bar") |
|
|
|
self.args = parser.parse_args() |
|
|
|
formatter_class = argparse.RawDescriptionHelpFormatter, |
|
|
|
version = nilmtools.__version__, |
|
|
|
description = description) |
|
|
|
group = parser.add_argument_group("General filter arguments") |
|
|
|
group.add_argument("-u", "--url", action="store", |
|
|
|
default="http://localhost:12380/", |
|
|
|
help="Server URL (default: %(default)s)") |
|
|
|
group.add_argument("-D", "--dry-run", action="store_true", |
|
|
|
default = False, |
|
|
|
help="Just print intervals that would be " |
|
|
|
"processed") |
|
|
|
group.add_argument("srcpath", action="store", |
|
|
|
help="Path of source stream, e.g. /foo/bar") |
|
|
|
group.add_argument("destpath", action="store", |
|
|
|
help="Path of destination stream, e.g. /foo/bar") |
|
|
|
self._parser = parser |
|
|
|
return parser |
|
|
|
|
|
|
|
def parse_args(self): |
|
|
|
args = self._parser.parse_args() |
|
|
|
self._args = args |
|
|
|
self._client = nilmdb.client.Client(args.url) |
|
|
|
|
|
|
|
if args.srcpath == args.destpath: |
|
|
|
raise Exception("source and destination path must be different") |
|
|
|
|
|
|
|
# Open and print info about the streams |
|
|
|
def stream_info_string(info): |
|
|
|
return sprintf("%s (%s), %.2fM rows, %.2f hours\n", |
|
|
|
info[0], info[1], info[4] / 1e6, info[5] / 3600) |
|
|
|
|
|
|
|
src = self._client.stream_list(args.srcpath, extended = True) |
|
|
|
if len(src) != 1: |
|
|
|
raise Exception("source path " + args.srcpath + " not found") |
|
|
|
print "Source:", stream_info_string(src[0]) |
|
|
|
self.srcinfo = src[0] |
|
|
|
|
|
|
|
dest = self._client.stream_list(args.destpath, extended = True) |
|
|
|
if len(dest) != 1: |
|
|
|
raise Exception("destination path " + args.destpath + " not found") |
|
|
|
print " Dest:", stream_info_string(dest[0]) |
|
|
|
raise MissingDestination(self.srcinfo[0], self.srcinfo[1], |
|
|
|
args.destpath) |
|
|
|
self.destinfo = dest[0] |
|
|
|
|
|
|
|
print "Source:", self.stream_info_string(self.srcinfo) |
|
|
|
print " Dest:", self.stream_info_string(self.destinfo) |
|
|
|
|
|
|
|
if args.dry_run: |
|
|
|
for interval in self.intervals(): |
|
|
|
print self.interval_string(interval) |
|
|
|
raise SystemExit(0) |
|
|
|
|
|
|
|
return args |
|
|
|
|
|
|
|
def intervals(self): |
|
|
|
"""Generate all the intervals that this filter should process""" |
|
|
|
self._using_client = True |
|
|
|
for i in self._client.stream_intervals( |
|
|
|
args.srcpath, diffpath = args.destpath): |
|
|
|
self._args.srcpath, diffpath = self._args.destpath): |
|
|
|
yield i |
|
|
|
self._using_client = False |
|
|
|
|
|
|
|
# Misc helpers |
|
|
|
def stream_info_string(self, info): |
|
|
|
return sprintf("%s (%s), %.2fM rows, %.2f hours", |
|
|
|
info[0], info[1], info[4] / 1e6, info[5] / 3600) |
|
|
|
|
|
|
|
def interval_string(self, interval): |
|
|
|
return sprintf("[ %s -> %s ]", format_time(interval[0]), |
|
|
|
format_time(interval[1])) |
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
|
# This is just a dummy function; actual filters can use the other |
|
|
|
# functions to prepare stuff, and then do something with the data. |
|
|
|
f = Filter() |
|
|
|
for interval in f.intervals(): |
|
|
|
print "Generic filter: need to handle interval:", interval |
|
|
|
parser = f.setup_parser() |
|
|
|
args = f.parse_args() |
|
|
|
for (start, end) in f.intervals(): |
|
|
|
print "Generic filter: need to handle", start, " to ", end |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
main() |