You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

246 lines
9.4 KiB

  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  5. timestamp_to_seconds)
  6. from nilmdb.utils.interval import Interval
  7. import nilmtools
  8. import itertools
  9. import time
  10. import sys
  11. import re
  12. import argparse
  13. class MissingDestination(Exception):
  14. def __init__(self, src, dest):
  15. self.src = src
  16. self.dest = dest
  17. Exception.__init__(self, "destination path " + dest.path + " not found")
  18. class StreamInfo(object):
  19. def __init__(self, info):
  20. self.info = info
  21. try:
  22. self.path = info[0]
  23. self.layout = info[1]
  24. self.layout_type = self.layout.split('_')[0]
  25. self.layout_count = int(self.layout.split('_')[1])
  26. self.total_count = self.layout_count + 1
  27. self.timestamp_min = info[2]
  28. self.timestamp_max = info[3]
  29. self.rows = info[4]
  30. self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
  31. except IndexError, TypeError:
  32. pass
  33. def __str__(self):
  34. """Print stream info as a string"""
  35. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  36. self.path, self.layout, self.rows / 1e6,
  37. self.seconds / 3600.0)
  38. class Filter(object):
  39. def __init__(self):
  40. self._parser = None
  41. self._args = None
  42. self._client = None
  43. self._using_client = False
  44. self.src = None
  45. self.dest = None
  46. @property
  47. def client(self):
  48. if self._using_client:
  49. raise Exception("Filter client is in use; make another")
  50. return self._client
  51. def setup_parser(self, description = "Filter data"):
  52. parser = argparse.ArgumentParser(
  53. formatter_class = argparse.RawDescriptionHelpFormatter,
  54. version = nilmtools.__version__,
  55. description = description)
  56. group = parser.add_argument_group("General filter arguments")
  57. group.add_argument("-u", "--url", action="store",
  58. default="http://localhost:12380/",
  59. help="Server URL (default: %(default)s)")
  60. group.add_argument("-D", "--dry-run", action="store_true",
  61. default = False,
  62. help="Just print intervals that would be "
  63. "processed")
  64. group.add_argument("-s", "--start",
  65. metavar="TIME", type=self.arg_time,
  66. help="Starting timestamp for intervals "
  67. "(free-form, inclusive)")
  68. group.add_argument("-e", "--end",
  69. metavar="TIME", type=self.arg_time,
  70. help="Ending timestamp for intervals "
  71. "(free-form, noninclusive)")
  72. group.add_argument("srcpath", action="store",
  73. help="Path of source stream, e.g. /foo/bar")
  74. group.add_argument("destpath", action="store",
  75. help="Path of destination stream, e.g. /foo/bar")
  76. self._parser = parser
  77. return parser
  78. def interval_string(self, interval):
  79. return sprintf("[ %s -> %s ]",
  80. timestamp_to_human(interval.start),
  81. timestamp_to_human(interval.end))
  82. def parse_args(self):
  83. args = self._parser.parse_args()
  84. self._args = args
  85. self._client = nilmdb.client.Client(args.url)
  86. if args.srcpath == args.destpath:
  87. raise Exception("source and destination path must be different")
  88. # Open and print info about the streams
  89. src = self._client.stream_list(args.srcpath, extended = True)
  90. if len(src) != 1:
  91. raise Exception("source path " + args.srcpath + " not found")
  92. self.src = StreamInfo(src[0])
  93. dest = self._client.stream_list(args.destpath, extended = True)
  94. if len(dest) != 1:
  95. raise MissingDestination(self.src, StreamInfo([args.destpath]))
  96. self.dest = StreamInfo(dest[0])
  97. print "Source:", self.src
  98. print " Dest:", self.dest
  99. if args.dry_run:
  100. for interval in self.intervals():
  101. print self.interval_string(interval)
  102. raise SystemExit(0)
  103. return args
  104. def intervals(self):
  105. """Generate all the intervals that this filter should process"""
  106. self._using_client = True
  107. saved_int = None
  108. for (start, end) in self._client.stream_intervals(
  109. self._args.srcpath, diffpath = self._args.destpath,
  110. start = self._args.start, end = self._args.end):
  111. # Join adjacent intervals
  112. if saved_int is not None:
  113. if saved_int.end == start:
  114. start = saved_int.start
  115. else:
  116. yield saved_int
  117. saved_int = Interval(start, end)
  118. if saved_int is not None:
  119. yield saved_int
  120. self._using_client = False
  121. # Misc helpers
  122. def arg_time(self, toparse):
  123. """Parse a time string argument"""
  124. try:
  125. return nilmdb.utils.time.parse_time(toparse)
  126. except ValueError as e:
  127. raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
  128. str(e), toparse))
  129. def check_dest_metadata(self, data):
  130. """See if the metadata jives, and complain if it doesn't. If
  131. there's no conflict, update the metadata to match 'data'."""
  132. metadata = self._client.stream_get_metadata(self._args.destpath)
  133. for key in data:
  134. wanted = str(data[key])
  135. val = metadata.get(key, wanted)
  136. if val != wanted and self.dest.rows > 0:
  137. m = "Metadata in destination stream:\n"
  138. m += " %s = %s\n" % (key, val)
  139. m += "doesn't match desired data:\n"
  140. m += " %s = %s\n" % (key, wanted)
  141. m += "Refusing to change it. You can change the stream's "
  142. m += "metadata manually, or\n"
  143. m += "remove existing data from the stream, to prevent "
  144. m += "this error.\n"
  145. raise Exception(m)
  146. # All good -- write the metadata in case it's not already there
  147. self._client.stream_update_metadata(self._args.destpath, data)
  148. # Main processing helper
  149. def process_python(self, function, rows, args = None, partial = False):
  150. """Process data in chunks of 'rows' data at a time.
  151. This provides data as nested Python lists and expects the same
  152. back.
  153. function: function to process the data
  154. rows: maximum number of rows to pass to 'function' at once
  155. args: tuple containing extra arguments to pass to 'function'
  156. partial: if true, less than 'rows' may be passed to 'function'.
  157. if false, partial data at the end of an interval will
  158. be dropped.
  159. 'function' should be defined like:
  160. function(data, *args)
  161. It will be passed a list containing up to 'rows' rows of
  162. data from the source stream, and any arguments passed in
  163. 'args'. It should transform the data as desired, and return a
  164. new list of rdata, which will be inserted into the destination
  165. stream.
  166. """
  167. if args is None:
  168. args = []
  169. extractor = nilmdb.client.Client(self._args.url).stream_extract
  170. inserter = nilmdb.client.Client(self._args.url).stream_insert_context
  171. src = self._args.srcpath
  172. dest = self._args.destpath
  173. # Parse input data. We use homogenous types for now, which
  174. # means the timestamp type will be either float or int.
  175. if "int" in self.src.layout_type:
  176. parser = lambda line: [ int(x) for x in line.split() ]
  177. else:
  178. parser = lambda line: [ float(x) for x in line.split() ]
  179. # Format output data.
  180. formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
  181. for interval in self.intervals():
  182. print "Processing", self.interval_string(interval)
  183. with inserter(dest, interval.start, interval.end) as insert_ctx:
  184. src_array = []
  185. for line in extractor(src, interval.start, interval.end):
  186. # Read in data
  187. src_array.append([ float(x) for x in line.split() ])
  188. if len(src_array) == rows:
  189. # Pass through filter function
  190. dest_array = function(src_array, *args)
  191. # Write result to destination
  192. out = [ formatter(row) for row in dest_array ]
  193. insert_ctx.insert("".join(out))
  194. # Clear source array
  195. src_array = []
  196. # Take care of partial chunk
  197. if len(src_array) and partial:
  198. dest_array = function(src_array, *args)
  199. out = [ formatter(row) for row in dest_array ]
  200. insert_ctx.insert("".join(out))
  201. def main():
  202. # This is just a dummy function; actual filters can use the other
  203. # functions to prepare stuff, and then do something with the data.
  204. f = Filter()
  205. parser = f.setup_parser()
  206. args = f.parse_args()
  207. for i in f.intervals():
  208. print "Generic filter: need to handle", f.interval_string(i)
  209. if __name__ == "__main__":
  210. main()