You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

335 lines
13 KiB

  1. #!/usr/bin/python
  2. from __future__ import absolute_import
  3. import nilmdb.client
  4. from nilmdb.client import Client
  5. from nilmdb.client.numpyclient import NumpyClient
  6. from nilmdb.utils.printf import *
  7. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  8. timestamp_to_seconds)
  9. from nilmdb.utils.interval import Interval
  10. import nilmtools
  11. import itertools
  12. import time
  13. import sys
  14. import re
  15. import argparse
  16. import numpy as np
  17. import cStringIO
  18. class MissingDestination(Exception):
  19. def __init__(self, args, src, dest):
  20. self.parsed_args = args
  21. self.src = src
  22. self.dest = dest
  23. Exception.__init__(self, "destination path " + dest.path + " not found")
  24. class StreamInfo(object):
  25. def __init__(self, url, info):
  26. self.url = url
  27. self.info = info
  28. try:
  29. self.path = info[0]
  30. self.layout = info[1]
  31. self.layout_type = self.layout.split('_')[0]
  32. self.layout_count = int(self.layout.split('_')[1])
  33. self.total_count = self.layout_count + 1
  34. self.timestamp_min = info[2]
  35. self.timestamp_max = info[3]
  36. self.rows = info[4]
  37. self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
  38. except IndexError, TypeError:
  39. pass
  40. def string(self, interhost):
  41. """Return stream info as a string. If interhost is true,
  42. include the host URL."""
  43. if interhost:
  44. return sprintf("[%s] ", self.url) + str(self)
  45. return str(self)
  46. def __str__(self):
  47. """Return stream info as a string."""
  48. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  49. self.path, self.layout, self.rows / 1e6,
  50. self.seconds / 3600.0)
  51. def get_stream_info(client, path):
  52. """Return a StreamInfo object about the given path, or None if it
  53. doesn't exist"""
  54. streams = client.stream_list(path, extended = True)
  55. if len(streams) != 1:
  56. return None
  57. return StreamInfo(client.geturl(), streams[0])
class Filter(object):
    """Base class for NilmDB filter scripts.

    Wires up argument parsing, source/destination stream lookup, and the
    main chunked processing loop (process_numpy) that runs a user
    function over every interval present in the source stream but
    missing from the destination stream."""

    def __init__(self):
        # Parser created by setup_parser(); clients created by parse_args().
        self._parser = None
        self._client_src = None
        self._client_dest = None
        # Set True while intervals() is iterating, since the underlying
        # HTTP clients can't be shared mid-request.
        self._using_client = False
        # StreamInfo for source/destination, filled in by parse_args().
        self.src = None
        self.dest = None
        # Optional time bounds from -s/-e.
        self.start = None
        self.end = None
        # True when source and destination are on different servers.
        self.interhost = False
        self.force_metadata = False

    @property
    def client_src(self):
        """Source-server Client; refuses access while intervals() holds it."""
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        """Destination-server Client; refuses access while intervals() holds it."""
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_dest

    def setup_parser(self, description = "Filter data", skip_paths = False):
        """Build and return the ArgumentParser shared by all filters.

        'skip_paths' lets a subclass add its own srcpath/destpath
        arguments (they must still exist by those names)."""
        parser = argparse.ArgumentParser(
            formatter_class = argparse.RawDescriptionHelpFormatter,
            version = nilmtools.__version__,
            description = description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default="http://localhost/nilmdb/",
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("--force-metadata", action="store_true",
                           default = False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        if not skip_paths:
            # Individual filter scripts might want to add these arguments
            # themselves, to include multiple sources in a different order
            # (for example). "srcpath" and "destpath" arguments must exist,
            # though.
            group.add_argument("srcpath", action="store",
                               help="Path of source stream, e.g. /foo/bar")
            group.add_argument("destpath", action="store",
                               help="Path of destination stream, e.g. /foo/bar")
        self._parser = parser
        return parser

    def interval_string(self, interval):
        """Format an interval's endpoints as a human-readable string."""
        return sprintf("[ %s -> %s ]",
                       timestamp_to_human(interval.start),
                       timestamp_to_human(interval.end))

    def parse_args(self, argv = None):
        """Parse arguments, connect clients, and validate both streams.

        Exits via parser.error() on a bad source; raises
        MissingDestination when the destination stream doesn't exist,
        and SystemExit(0) after printing intervals for --dry-run."""
        args = self._parser.parse_args(argv)
        if args.dest_url is None:
            args.dest_url = args.url
        if args.url != args.dest_url:
            self.interhost = True

        self._client_src = Client(args.url)
        self._client_dest = Client(args.dest_url)

        if (not self.interhost) and (args.srcpath == args.destpath):
            self._parser.error("source and destination path must be different")

        # Open and print info about the streams
        self.src = get_stream_info(self._client_src, args.srcpath)
        if not self.src:
            self._parser.error("source path " + args.srcpath + " not found")
        self.dest = get_stream_info(self._client_dest, args.destpath)
        if not self.dest:
            # Hand the caller a stub StreamInfo so it can create the stream.
            raise MissingDestination(args, self.src,
                                     StreamInfo(args.dest_url, [args.destpath]))
        print "Source:", self.src.string(self.interhost)
        print " Dest:", self.dest.string(self.interhost)

        if args.dry_run:
            for interval in self.intervals():
                print self.interval_string(interval)
            raise SystemExit(0)

        self.force_metadata = args.force_metadata
        self.start = args.start
        self.end = args.end
        return args

    def _optimize_int(self, it):
        """Join and yield adjacent intervals from the iterator 'it'"""
        saved_int = None
        for interval in it:
            if saved_int is not None:
                if saved_int.end == interval.start:
                    # Adjacent: extend the current interval backwards
                    # instead of yielding two pieces.
                    interval.start = saved_int.start
                else:
                    yield saved_int
            saved_int = interval
        if saved_int is not None:
            yield saved_int

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        # Mark the clients busy until the generator is exhausted.
        self._using_client = True
        if self.interhost:
            # Do the difference ourselves
            s_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_src.stream_intervals(
                                self.src.path,
                                start = self.start, end = self.end) )
            d_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_dest.stream_intervals(
                                self.dest.path,
                                start = self.start, end = self.end) )
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = ( Interval(start, end)
                          for (start, end) in
                          self._client_src.stream_intervals(
                              self.src.path, diffpath = self.dest.path,
                              start = self.start, end = self.end) )
        # Optimize intervals: join intervals that are adjacent
        for interval in self._optimize_int(intervals):
            yield interval
        self._using_client = False

    # Misc helpers
    def arg_time(self, toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            # Re-raise as the argparse-specific type so the parser
            # prints a proper usage error.
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't.  If
        there's no conflict, update the metadata to match 'data'."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self.force_metadata:
            for key in data:
                wanted = str(data[key])
                # A key absent from the stream counts as matching.
                val = metadata.get(key, wanted)
                # Only a non-empty destination makes a mismatch fatal.
                if val != wanted and self.dest.rows > 0:
                    m = "Metadata in destination stream:\n"
                    m += " %s = %s\n" % (key, val)
                    m += "doesn't match desired data:\n"
                    m += " %s = %s\n" % (key, wanted)
                    m += "Refusing to change it. To prevent this error, "
                    m += "change or delete the metadata with nilmtool,\n"
                    m += "remove existing data from the stream, or "
                    m += "retry with --force-metadata."
                    raise Exception(m)
        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)

    # The main filter processing method.
    def process_numpy(self, function, args = None, rows = 100000):
        """For all intervals that exist in self.src but don't exist in
        self.dest, call 'function' with a Numpy array corresponding to
        the data.  The data is converted to a Numpy array in chunks of
        'rows' rows at a time.

        'function' should be defined as:
            def function(data, interval, args, insert_func, final)

        'data': array of data to process -- may be empty

        'interval': overall interval we're processing (but not necessarily
        the interval of this particular chunk of data)

        'args': opaque arguments passed to process_numpy

        'insert_func': function to call in order to insert array of data.
        Should be passed a 2-dimensional array of data to insert.
        Data timestamps must be within the provided interval.

        'final': True if this is the last bit of data for this
        contiguous interval, False otherwise.

        Return value of 'function' is the number of data rows processed.
        Unprocessed data will be provided again in a subsequent call
        (unless 'final' is True).
        """
        if args is None:
            args = []
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

        for interval in self.intervals():
            print "Processing", self.interval_string(interval)
            with inserter(self.dest.path,
                          interval.start, interval.end) as insert_ctx:
                insert_function = insert_ctx.insert
                # Rows the user function didn't consume last chunk.
                old_array = np.array([])
                for new_array in extractor(self.src.path,
                                           interval.start, interval.end,
                                           layout = self.src.layout,
                                           maxrows = rows):
                    # If we still had old data left, combine it
                    if old_array.shape[0] != 0:
                        array = np.vstack((old_array, new_array))
                    else:
                        array = new_array

                    # Pass it to the process function
                    processed = function(array, interval, args,
                                         insert_function, False)

                    # Send any pending data
                    insert_ctx.send()

                    # Save the unprocessed parts
                    if processed >= 0:
                        old_array = array[processed:]
                    else:
                        raise Exception(
                            sprintf("%s return value %s must be >= 0",
                                    str(function), str(processed)))

                    # Warn if there's too much data remaining
                    if old_array.shape[0] > 3 * rows:
                        printf("warning: %d unprocessed rows in buffer\n",
                               old_array.shape[0])

                # Last call for this contiguous interval
                if old_array.shape[0] != 0:
                    function(old_array, interval, args, insert_function, True)
  278. def main(argv = None):
  279. # This is just a dummy function; actual filters can use the other
  280. # functions to prepare stuff, and then do something with the data.
  281. f = Filter()
  282. parser = f.setup_parser()
  283. args = f.parse_args(argv)
  284. for i in f.intervals():
  285. print "Generic filter: need to handle", f.interval_string(i)
  286. if __name__ == "__main__":
  287. main()