You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

348 lines
14 KiB

  1. #!/usr/bin/python
  2. from __future__ import absolute_import
  3. import nilmdb.client
  4. from nilmdb.client import Client
  5. from nilmdb.client.numpyclient import NumpyClient
  6. from nilmdb.utils.printf import *
  7. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  8. timestamp_to_seconds)
  9. from nilmdb.utils.interval import Interval
  10. import nilmtools
  11. import itertools
  12. import time
  13. import sys
  14. import re
  15. import argparse
  16. import numpy as np
  17. import cStringIO
  18. class MissingDestination(Exception):
  19. def __init__(self, args, src, dest):
  20. self.parsed_args = args
  21. self.src = src
  22. self.dest = dest
  23. Exception.__init__(self, "destination path " + dest.path + " not found")
  24. class StreamInfo(object):
  25. def __init__(self, url, info):
  26. self.url = url
  27. self.info = info
  28. try:
  29. self.path = info[0]
  30. self.layout = info[1]
  31. self.layout_type = self.layout.split('_')[0]
  32. self.layout_count = int(self.layout.split('_')[1])
  33. self.total_count = self.layout_count + 1
  34. self.timestamp_min = info[2]
  35. self.timestamp_max = info[3]
  36. self.rows = info[4]
  37. self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
  38. except IndexError, TypeError:
  39. pass
  40. def string(self, interhost):
  41. """Return stream info as a string. If interhost is true,
  42. include the host URL."""
  43. if interhost:
  44. return sprintf("[%s] ", self.url) + str(self)
  45. return str(self)
  46. def __str__(self):
  47. """Return stream info as a string."""
  48. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  49. self.path, self.layout, self.rows / 1e6,
  50. self.seconds / 3600.0)
  51. def get_stream_info(client, path):
  52. """Return a StreamInfo object about the given path, or None if it
  53. doesn't exist"""
  54. streams = client.stream_list(path, extended = True)
  55. if len(streams) != 1:
  56. return None
  57. return StreamInfo(client.geturl(), streams[0])
class Filter(object):
    """Framework for a filter script that reads one nilmdb stream and
    writes another.

    Handles argument parsing, source/destination stream lookup,
    computing which intervals still need processing, and (via
    process_numpy) feeding the data through a user callback in chunks.
    """

    def __init__(self, parser_description = None):
        # Parser and clients are created lazily by setup_parser() /
        # parse_args().
        self._parser = None
        self._client_src = None
        self._client_dest = None
        # Set while intervals() is running; guards the client_* properties.
        self._using_client = False
        # StreamInfo for source and destination, filled in by parse_args().
        self.src = None
        self.dest = None
        # Optional interval bounds from -s / -e.
        self.start = None
        self.end = None
        # True when source and destination live on different servers.
        self.interhost = False
        self.force_metadata = False
        # Convenience: passing a description runs the whole setup here.
        if parser_description is not None:
            self.setup_parser(parser_description)
            self.parse_args()

    @property
    def client_src(self):
        # Source client, refused while intervals() is mid-iteration so a
        # caller doesn't interleave requests on the same connection.
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        # Destination client; same in-use guard as client_src.
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_dest

    def setup_parser(self, description = "Filter data", skip_paths = False):
        """Build the argparse parser with the standard filter options.

        With skip_paths=True the positional srcpath/destpath arguments
        are omitted so subclasses can add their own -- but arguments
        named "srcpath" and "destpath" must still end up existing.
        """
        parser = argparse.ArgumentParser(
            formatter_class = argparse.RawDescriptionHelpFormatter,
            version = nilmtools.__version__,
            description = description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default="http://localhost/nilmdb/",
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("--force-metadata", action="store_true",
                           default = False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        if not skip_paths:
            # Individual filter scripts might want to add these arguments
            # themselves, to include multiple sources in a different order
            # (for example). "srcpath" and "destpath" arguments must exist,
            # though.
            group.add_argument("srcpath", action="store",
                               help="Path of source stream, e.g. /foo/bar")
            group.add_argument("destpath", action="store",
                               help="Path of destination stream, e.g. /foo/bar")
        self._parser = parser
        return parser

    def interval_string(self, interval):
        """Format an interval's endpoints as a human-readable string."""
        return sprintf("[ %s -> %s ]",
                       timestamp_to_human(interval.start),
                       timestamp_to_human(interval.end))

    def parse_args(self, argv = None):
        """Parse arguments, connect the clients, and look up both
        streams.

        Raises MissingDestination (with a placeholder StreamInfo built
        from just the path) when the destination doesn't exist, so the
        caller can create it.  A --dry-run prints pending intervals and
        exits.
        """
        args = self._parser.parse_args(argv)
        if args.dest_url is None:
            args.dest_url = args.url
        if args.url != args.dest_url:
            self.interhost = True
        self._client_src = Client(args.url)
        self._client_dest = Client(args.dest_url)
        # Same-server, same-path would mean reading and writing the
        # same stream.
        if (not self.interhost) and (args.srcpath == args.destpath):
            self._parser.error("source and destination path must be different")
        # Open and print info about the streams
        self.src = get_stream_info(self._client_src, args.srcpath)
        if not self.src:
            self._parser.error("source path " + args.srcpath + " not found")
        self.dest = get_stream_info(self._client_dest, args.destpath)
        if not self.dest:
            raise MissingDestination(args, self.src,
                                     StreamInfo(args.dest_url, [args.destpath]))
        print "Source:", self.src.string(self.interhost)
        print " Dest:", self.dest.string(self.interhost)
        if args.dry_run:
            for interval in self.intervals():
                print self.interval_string(interval)
            raise SystemExit(0)
        self.force_metadata = args.force_metadata
        self.start = args.start
        self.end = args.end
        return args

    def _optimize_int(self, it):
        """Join and yield adjacent intervals from the iterator 'it'"""
        saved_int = None
        for interval in it:
            if saved_int is not None:
                if saved_int.end == interval.start:
                    # Adjacent: extend the current interval backwards
                    # instead of yielding two pieces.
                    interval.start = saved_int.start
                else:
                    yield saved_int
            saved_int = interval
        if saved_int is not None:
            yield saved_int

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        # Mark the clients busy for the duration of this generator; the
        # client_src/client_dest properties refuse access meanwhile.
        self._using_client = True
        if self.interhost:
            # Do the difference ourselves
            s_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_src.stream_intervals(
                                self.src.path,
                                start = self.start, end = self.end) )
            d_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_dest.stream_intervals(
                                self.dest.path,
                                start = self.start, end = self.end) )
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = ( Interval(start, end)
                          for (start, end) in
                          self._client_src.stream_intervals(
                              self.src.path, diffpath = self.dest.path,
                              start = self.start, end = self.end) )
        # Optimize intervals: join intervals that are adjacent
        for interval in self._optimize_int(intervals):
            yield interval
        self._using_client = False

    # Misc helpers
    def arg_time(self, toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            # Re-raise as an argparse error so the parser reports it
            # with proper usage output.
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't.  If
        there's no conflict, update the metadata to match 'data'."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self.force_metadata:
            for key in data:
                wanted = str(data[key])
                val = metadata.get(key, wanted)
                # Only a non-empty stream with mismatched metadata is an
                # error; an empty stream can be relabeled safely.
                if val != wanted and self.dest.rows > 0:
                    m = "Metadata in destination stream:\n"
                    m += " %s = %s\n" % (key, val)
                    m += "doesn't match desired data:\n"
                    m += " %s = %s\n" % (key, wanted)
                    m += "Refusing to change it. To prevent this error, "
                    m += "change or delete the metadata with nilmtool,\n"
                    m += "remove existing data from the stream, or "
                    m += "retry with --force-metadata."
                    raise Exception(m)
        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)

    # The main filter processing method.
    def process_numpy(self, function, args = None, rows = 100000):
        """For all intervals that exist in self.src but don't exist in
        self.dest, call 'function' with a Numpy array corresponding to
        the data.  The data is converted to a Numpy array in chunks of
        'rows' rows at a time.

        'function' should be defined as:
        def function(data, interval, args, insert_func, final)

        'data': array of data to process -- may be empty

        'interval': overall interval we're processing (but not necessarily
        the interval of this particular chunk of data)

        'args': opaque arguments passed to process_numpy

        'insert_func': function to call in order to insert array of data.
        Should be passed a 2-dimensional array of data to insert.
        Data timestamps must be within the provided interval.

        'final': True if this is the last bit of data for this
        contiguous interval, False otherwise.

        Return value of 'function' is the number of data rows processed.
        Unprocessed data will be provided again in a subsequent call
        (unless 'final' is True).

        If unprocessed data remains after 'final' is True, the interval
        being inserted will be ended at the timestamp of the first
        unprocessed data point.
        """
        if args is None:
            args = []
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
        for interval in self.intervals():
            print "Processing", self.interval_string(interval)
            with inserter(self.dest.path,
                          interval.start, interval.end) as insert_ctx:
                insert_function = insert_ctx.insert
                # Carry-over buffer of rows the callback didn't consume.
                old_array = np.array([])
                for new_array in extractor(self.src.path,
                                           interval.start, interval.end,
                                           layout = self.src.layout,
                                           maxrows = rows):
                    # If we still had old data left, combine it
                    if old_array.shape[0] != 0:
                        array = np.vstack((old_array, new_array))
                    else:
                        array = new_array
                    # Pass it to the process function
                    processed = function(array, interval, args,
                                         insert_function, False)
                    # Send any pending data
                    insert_ctx.send()
                    # Save the unprocessed parts
                    if processed >= 0:
                        old_array = array[processed:]
                    else:
                        raise Exception(
                            sprintf("%s return value %s must be >= 0",
                                    str(function), str(processed)))
                    # Warn if there's too much data remaining
                    if old_array.shape[0] > 3 * rows:
                        printf("warning: %d unprocessed rows in buffer\n",
                               old_array.shape[0])
                # Last call for this contiguous interval
                if old_array.shape[0] != 0:
                    processed = function(old_array, interval, args,
                                         insert_function, True)
                    if processed != old_array.shape[0]:
                        # Truncate the interval we're inserting at the first
                        # unprocessed data point.  This ensures that
                        # we'll not miss any data when we run again later.
                        insert_ctx.update_end(old_array[processed][0])
  290. def main(argv = None):
  291. # This is just a dummy function; actual filters can use the other
  292. # functions to prepare stuff, and then do something with the data.
  293. f = Filter()
  294. parser = f.setup_parser()
  295. args = f.parse_args(argv)
  296. for i in f.intervals():
  297. print "Generic filter: need to handle", f.interval_string(i)
# Run the generic filter when executed directly as a script.
if __name__ == "__main__":
    main()