You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

370 lines
15 KiB

  1. #!/usr/bin/python
  2. from __future__ import absolute_import
  3. import nilmdb.client
  4. from nilmdb.client import Client
  5. from nilmdb.client.numpyclient import NumpyClient
  6. from nilmdb.utils.printf import *
  7. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  8. timestamp_to_seconds)
  9. from nilmdb.utils.interval import Interval
  10. import nilmtools
  11. import itertools
  12. import time
  13. import sys
  14. import re
  15. import argparse
  16. import numpy as np
  17. import cStringIO
  18. class ArgumentError(Exception):
  19. pass
  20. class MissingDestination(Exception):
  21. def __init__(self, args, src, dest):
  22. self.parsed_args = args
  23. self.src = src
  24. self.dest = dest
  25. Exception.__init__(self, "destination path " + dest.path + " not found")
  26. class StreamInfo(object):
  27. def __init__(self, url, info):
  28. self.url = url
  29. self.info = info
  30. try:
  31. self.path = info[0]
  32. self.layout = info[1]
  33. self.layout_type = self.layout.split('_')[0]
  34. self.layout_count = int(self.layout.split('_')[1])
  35. self.total_count = self.layout_count + 1
  36. self.timestamp_min = info[2]
  37. self.timestamp_max = info[3]
  38. self.rows = info[4]
  39. self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
  40. except IndexError, TypeError:
  41. pass
  42. def string(self, interhost):
  43. """Return stream info as a string. If interhost is true,
  44. include the host URL."""
  45. if interhost:
  46. return sprintf("[%s] ", self.url) + str(self)
  47. return str(self)
  48. def __str__(self):
  49. """Return stream info as a string."""
  50. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  51. self.path, self.layout, self.rows / 1e6,
  52. self.seconds / 3600.0)
  53. def get_stream_info(client, path):
  54. """Return a StreamInfo object about the given path, or None if it
  55. doesn't exist"""
  56. streams = client.stream_list(path, extended = True)
  57. if len(streams) != 1:
  58. return None
  59. return StreamInfo(client.geturl(), streams[0])
class Filter(object):
    """Base class for a nilmdb filter: reads intervals of data from a
    source stream, processes them, and inserts results into a
    destination stream.  Only intervals present in the source but
    missing from the destination are processed, so reruns resume
    where the last run stopped."""

    def __init__(self, parser_description = None):
        # Internal state; the actual clients are created in set_args().
        self._parser = None
        self._client_src = None
        self._client_dest = None
        # True while intervals() is iterating; guards client reuse.
        self._using_client = False
        self.src = None          # StreamInfo for the source stream
        self.dest = None         # StreamInfo for the destination stream
        self.start = None        # starting timestamp, or None
        self.end = None          # ending timestamp, or None
        self.interhost = False   # True if src/dest are different servers
        self.force_metadata = False
        if parser_description is not None:
            # Convenience mode: build the parser and parse sys.argv now.
            self.setup_parser(parser_description)
            self.parse_args()

    @property
    def client_src(self):
        # Refuse to hand out the client while intervals() is using it;
        # callers should create their own Client instead.
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        # Same guard as client_src, for the destination side.
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_dest

    def setup_parser(self, description = "Filter data", skip_paths = False):
        """Build and return the ArgumentParser with the standard filter
        options.  If 'skip_paths' is True, the srcpath/destpath
        positional arguments are omitted so callers can add their own."""
        parser = argparse.ArgumentParser(
            formatter_class = argparse.RawDescriptionHelpFormatter,
            version = nilmtools.__version__,
            description = description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default="http://localhost/nilmdb/",
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("--force-metadata", action="store_true",
                           default = False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        if not skip_paths:
            # Individual filter scripts might want to add these arguments
            # themselves, to include multiple sources in a different order
            # (for example). "srcpath" and "destpath" arguments must exist,
            # though.
            group.add_argument("srcpath", action="store",
                               help="Path of source stream, e.g. /foo/bar")
            group.add_argument("destpath", action="store",
                               help="Path of destination stream, e.g. /foo/bar")
        self._parser = parser
        return parser

    def interval_string(self, interval):
        """Return a human-readable "[ start -> end ]" string for an
        interval."""
        return sprintf("[ %s -> %s ]",
                       timestamp_to_human(interval.start),
                       timestamp_to_human(interval.end))

    def set_args(self, url, dest_url, srcpath, destpath, start, end,
                 parsed_args = None, quiet = True):
        """Set arguments directly from parameters"""
        if dest_url is None:
            dest_url = url
        if url != dest_url:
            self.interhost = True

        self._client_src = Client(url)
        self._client_dest = Client(dest_url)

        # Same-server, same-path filtering would read and write the
        # same stream, so reject it.
        if (not self.interhost) and (srcpath == destpath):
            raise ArgumentError("source and destination path must be different")

        # Open the streams
        self.src = get_stream_info(self._client_src, srcpath)
        if not self.src:
            raise ArgumentError("source path " + srcpath + " not found")

        self.dest = get_stream_info(self._client_dest, destpath)
        if not self.dest:
            # Give the caller enough context to create the destination.
            raise MissingDestination(parsed_args, self.src,
                                     StreamInfo(dest_url, [destpath]))
        self.start = start
        self.end = end

        # Print info
        if not quiet:
            print "Source:", self.src.string(self.interhost)
            print " Dest:", self.dest.string(self.interhost)

    def parse_args(self, argv = None):
        """Parse arguments from a command line"""
        args = self._parser.parse_args(argv)
        self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
                      args.start, args.end, quiet = False, parsed_args = args)
        self.force_metadata = args.force_metadata
        if args.dry_run:
            # Dry run: just list the pending intervals and exit.
            for interval in self.intervals():
                print self.interval_string(interval)
            raise SystemExit(0)
        return args

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        self._using_client = True
        if self.interhost:
            # Do the difference ourselves
            s_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_src.stream_intervals(
                                self.src.path,
                                start = self.start, end = self.end) )
            d_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_dest.stream_intervals(
                                self.dest.path,
                                start = self.start, end = self.end) )
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = ( Interval(start, end)
                          for (start, end) in
                          self._client_src.stream_intervals(
                              self.src.path, diffpath = self.dest.path,
                              start = self.start, end = self.end) )
        # Optimize intervals: join intervals that are adjacent
        for interval in nilmdb.utils.interval.optimize(intervals):
            yield interval
        self._using_client = False

    # Misc helpers
    def arg_time(self, toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            # Re-raise as an argparse error so the CLI prints a usage
            # message instead of a traceback.
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't. If
        there's no conflict, update the metadata to match 'data'."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self.force_metadata:
            for key in data:
                wanted = data[key]
                if not isinstance(wanted, basestring):
                    wanted = str(wanted)
                # Existing value defaults to 'wanted' so a missing key
                # never triggers the mismatch error below.
                val = metadata.get(key, wanted)
                # Force UTF-8 encoding for comparison and display
                wanted = wanted.encode('utf-8')
                val = val.encode('utf-8')
                key = key.encode('utf-8')
                # Only refuse when the destination already holds data;
                # an empty stream's metadata is safe to overwrite.
                if val != wanted and self.dest.rows > 0:
                    m = "Metadata in destination stream:\n"
                    m += " %s = %s\n" % (key, val)
                    m += "doesn't match desired data:\n"
                    m += " %s = %s\n" % (key, wanted)
                    m += "Refusing to change it. To prevent this error, "
                    m += "change or delete the metadata with nilmtool,\n"
                    m += "remove existing data from the stream, or "
                    m += "retry with --force-metadata."
                    raise Exception(m)
        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)

    # Filter processing for a single interval of data.
    def process_numpy_interval(self, interval, extractor, insert_ctx,
                               function, args = None, rows = 100000):
        """For the given 'interval' of data, extract data, process it
        through 'function', and insert the result.

        'extractor' should be a function like NumpyClient.stream_extract_numpy

        'insert_ctx' should be a class like StreamInserterNumpy, with member
        functions 'insert', 'send', and 'update_end'.

        See process_numpy for details on 'function', 'args', and 'rows'.
        """
        if args is None:
            args = []

        insert_function = insert_ctx.insert
        old_array = np.array([])
        for new_array in extractor(self.src.path,
                                   interval.start, interval.end,
                                   layout = self.src.layout,
                                   maxrows = rows):
            # If we still had old data left, combine it
            if old_array.shape[0] != 0:
                array = np.vstack((old_array, new_array))
            else:
                array = new_array

            # Pass it to the process function
            processed = function(array, interval, args,
                                 insert_function, False)

            # Send any pending data
            insert_ctx.send()

            # Save the unprocessed parts
            if processed >= 0:
                old_array = array[processed:]
            else:
                raise Exception(
                    sprintf("%s return value %s must be >= 0",
                            str(function), str(processed)))

            # Warn if there's too much data remaining
            if old_array.shape[0] > 3 * rows:
                printf("warning: %d unprocessed rows in buffer\n",
                       old_array.shape[0])

        # Last call for this contiguous interval
        if old_array.shape[0] != 0:
            processed = function(old_array, interval, args,
                                 insert_function, True)
            if processed != old_array.shape[0]:
                # Truncate the interval we're inserting at the first
                # unprocessed data point. This ensures that
                # we'll not miss any data when we run again later.
                insert_ctx.update_end(old_array[processed][0])

    # The main filter processing method.
    def process_numpy(self, function, args = None, rows = 100000):
        """Calls process_numpy_interval for each interval that currently
        exists in self.src, but doesn't exist in self.dest.  It will
        process the data in chunks as follows:

        For each chunk of data, call 'function' with a Numpy array
        corresponding to the data.  The data is converted to a Numpy
        array in chunks of 'rows' rows at a time.

        'function' should be defined as:
        # def function(data, interval, args, insert_func, final)

        'data': array of data to process -- may be empty

        'interval': overall interval we're processing (but not necessarily
        the interval of this particular chunk of data)

        'args': opaque arguments passed to process_numpy

        'insert_func': function to call in order to insert array of data.
        Should be passed a 2-dimensional array of data to insert.
        Data timestamps must be within the provided interval.

        'final': True if this is the last bit of data for this
        contiguous interval, False otherwise.

        Return value of 'function' is the number of data rows processed.
        Unprocessed data will be provided again in a subsequent call
        (unless 'final' is True).

        If unprocessed data remains after 'final' is True, the interval
        being inserted will be ended at the timestamp of the first
        unprocessed data point.
        """
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

        for interval in self.intervals():
            print "Processing", self.interval_string(interval)
            with inserter(self.dest.path,
                          interval.start, interval.end) as insert_ctx:
                self.process_numpy_interval(interval, extractor, insert_ctx,
                                            function, args, rows)
  308. def main(argv = None):
  309. # This is just a dummy function; actual filters can use the other
  310. # functions to prepare stuff, and then do something with the data.
  311. f = Filter()
  312. parser = f.setup_parser()
  313. args = f.parse_args(argv)
  314. for i in f.intervals():
  315. print "Generic filter: need to handle", f.interval_string(i)
  316. if __name__ == "__main__":
  317. main()