You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

221 lines
8.6 KiB

  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils.time import parse_time, format_time
  5. import nilmtools
  6. import itertools
  7. import time
  8. import sys
  9. import re
  10. import argparse
  11. class MissingDestination(Exception):
  12. def __init__(self, src, layout, dest):
  13. self.src = src
  14. self.layout = layout
  15. self.layout_type = layout.split('_')[0]
  16. self.layout_count = int(layout.split('_')[1])
  17. self.dest = dest
  18. Exception.__init__(self, "destination path " + dest + " not found")
  19. class Filter(object):
  20. def __init__(self):
  21. self._parser = None
  22. self._args = None
  23. self._client = None
  24. self._using_client = False
  25. self.srcinfo = None
  26. self.destinfo = None
  27. @property
  28. def client(self):
  29. if self._using_client:
  30. raise Exception("Filter client is in use; make another")
  31. return self._client
  32. def setup_parser(self, description = "Filter data"):
  33. parser = argparse.ArgumentParser(
  34. formatter_class = argparse.RawDescriptionHelpFormatter,
  35. version = nilmtools.__version__,
  36. description = description)
  37. group = parser.add_argument_group("General filter arguments")
  38. group.add_argument("-u", "--url", action="store",
  39. default="http://localhost:12380/",
  40. help="Server URL (default: %(default)s)")
  41. group.add_argument("-D", "--dry-run", action="store_true",
  42. default = False,
  43. help="Just print intervals that would be "
  44. "processed")
  45. group.add_argument("-s", "--start",
  46. metavar="TIME", type=self.arg_time,
  47. help="Starting timestamp for intervals "
  48. "(free-form, inclusive)")
  49. group.add_argument("-e", "--end",
  50. metavar="TIME", type=self.arg_time,
  51. help="Ending timestamp for intervals "
  52. "(free-form, noninclusive)")
  53. group.add_argument("srcpath", action="store",
  54. help="Path of source stream, e.g. /foo/bar")
  55. group.add_argument("destpath", action="store",
  56. help="Path of destination stream, e.g. /foo/bar")
  57. self._parser = parser
  58. return parser
  59. def parse_args(self):
  60. args = self._parser.parse_args()
  61. self._args = args
  62. self._client = nilmdb.client.Client(args.url)
  63. if args.srcpath == args.destpath:
  64. raise Exception("source and destination path must be different")
  65. # Open and print info about the streams
  66. src = self._client.stream_list(args.srcpath, extended = True)
  67. if len(src) != 1:
  68. raise Exception("source path " + args.srcpath + " not found")
  69. self.srcinfo = src[0]
  70. dest = self._client.stream_list(args.destpath, extended = True)
  71. if len(dest) != 1:
  72. raise MissingDestination(self.srcinfo[0], self.srcinfo[1],
  73. args.destpath)
  74. self.destinfo = dest[0]
  75. print "Source:", self.stream_info_string(self.srcinfo)
  76. print " Dest:", self.stream_info_string(self.destinfo)
  77. if args.dry_run:
  78. for interval in self.intervals():
  79. print self.interval_string(interval)
  80. raise SystemExit(0)
  81. return args
  82. def intervals(self):
  83. """Generate all the intervals that this filter should process"""
  84. self._using_client = True
  85. for i in self._client.stream_intervals(
  86. self._args.srcpath, diffpath = self._args.destpath,
  87. start = self._args.start, end = self._args.end):
  88. yield i
  89. self._using_client = False
  90. # Misc helpers
  91. def arg_time(self, toparse):
  92. """Parse a time string argument"""
  93. try:
  94. return nilmdb.utils.time.parse_time(toparse).totimestamp()
  95. except ValueError as e:
  96. raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
  97. str(e), toparse))
  98. def stream_info_string(self, info):
  99. """Print stream info as a string"""
  100. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  101. info[0], info[1], info[4] / 1e6, info[5] / 3600)
  102. def interval_string(self, interval):
  103. """Print interval as a string"""
  104. return sprintf("[ %s -> %s ]", format_time(interval[0]),
  105. format_time(interval[1]))
  106. def check_dest_metadata(self, data):
  107. """See if the metadata jives, and complain if it doesn't. If
  108. there's no conflict, update the metadata to match 'data'."""
  109. metadata = self._client.stream_get_metadata(self._args.destpath)
  110. rows = self.destinfo[4]
  111. for key in data:
  112. wanted = str(data[key])
  113. val = metadata.get(key, wanted)
  114. if val != wanted and rows > 0:
  115. m = "Metadata in destination stream:\n"
  116. m += " %s = %s\n" % (key, val)
  117. m += "doesn't match desired data:\n"
  118. m += " %s = %s\n" % (key, wanted)
  119. m += "Refusing to change it. You can change the stream's "
  120. m += "metadata manually, or\n"
  121. m += "remove existing data from the stream, to prevent "
  122. m += "this error.\n"
  123. raise Exception(m)
  124. # All good -- write the metadata in case it's not already there
  125. self._client.stream_update_metadata(self._args.destpath, data)
  126. # Main processing helper
  127. def process(self, function, rows, partial = True, args = None):
  128. """Process data in chunks of 'rows' data at a time.
  129. function: function to process the data
  130. rows: maximum number of rows to pass to 'function' at once
  131. args: tuple containing extra arguments to pass to 'function'
  132. partial: if true, less than 'rows' may be passed to 'function'.
  133. if false, partial data at the end of an interval will
  134. be dropped.
  135. 'function' should be defined like:
  136. function(data, *args)
  137. It will be passed an array containing up to 'rows' rows of
  138. data from the source stream, and any arguments passed in
  139. 'args'. It should transform the data as desired, and return a
  140. new array of data, which will be inserted into the destination
  141. stream.
  142. """
  143. if args is None:
  144. args = []
  145. extractor = nilmdb.client.Client(self._args.url).stream_extract
  146. inserter = nilmdb.client.Client(self._args.url).stream_insert_context
  147. src = self._args.srcpath
  148. dest = self._args.destpath
  149. islice = itertools.islice
  150. # Figure out how to format output data
  151. dest_layout = self.destinfo[1].split('_')[1]
  152. def int_formatter(row):
  153. return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:])
  154. def float_formatter(row):
  155. return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:])
  156. if "int" in dest_layout:
  157. formatter = int_formatter
  158. else:
  159. formatter = float_formatter
  160. for (start, end) in self.intervals():
  161. print "Processing", self.interval_string((start, end))
  162. with inserter(dest, start, end) as insert_ctx:
  163. src_array = []
  164. for line in extractor(src, start, end):
  165. # Read in data
  166. src_array.append([ float(x) for x in line.split() ])
  167. if len(src_array) == rows:
  168. # Pass through filter function
  169. dest_array = function(src_array, *args)
  170. # Write result to destination
  171. out = [ formatter(row) for row in dest_array ]
  172. insert_ctx.insert("\n".join(out) + "\n")
  173. # Clear source array
  174. src_array = []
  175. # Take care of partial chunk
  176. if len(src_array) and partial:
  177. dest_array = function(src_array, *args)
  178. out = [ formatter(row) for row in dest_array ]
  179. insert_ctx.insert("\n".join(out) + "\n")
  180. def main():
  181. # This is just a dummy function; actual filters can use the other
  182. # functions to prepare stuff, and then do something with the data.
  183. f = Filter()
  184. parser = f.setup_parser()
  185. args = f.parse_args()
  186. for (start, end) in f.intervals():
  187. print "Generic filter: need to handle", start, " to ", end
  188. if __name__ == "__main__":
  189. main()