You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

390 lines
16 KiB

  1. #!/usr/bin/python
  2. from __future__ import absolute_import
  3. import nilmdb.client
  4. from nilmdb.client import Client
  5. from nilmdb.client.numpyclient import NumpyClient
  6. from nilmdb.utils.printf import *
  7. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  8. timestamp_to_seconds)
  9. from nilmdb.utils.interval import Interval
  10. import nilmtools
  11. import itertools
  12. import time
  13. import sys
  14. import re
  15. import argparse
  16. import numpy as np
  17. import cStringIO
  18. import functools
  19. class ArgumentError(Exception):
  20. pass
  21. class MissingDestination(Exception):
  22. def __init__(self, args, src, dest):
  23. self.parsed_args = args
  24. self.src = src
  25. self.dest = dest
  26. Exception.__init__(self, "destination path " + dest.path + " not found")
  27. class StreamInfo(object):
  28. def __init__(self, url, info):
  29. self.url = url
  30. self.info = info
  31. try:
  32. self.path = info[0]
  33. self.layout = info[1]
  34. self.layout_type = self.layout.split('_')[0]
  35. self.layout_count = int(self.layout.split('_')[1])
  36. self.total_count = self.layout_count + 1
  37. self.timestamp_min = info[2]
  38. self.timestamp_max = info[3]
  39. self.rows = info[4]
  40. self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
  41. except IndexError, TypeError:
  42. pass
  43. def string(self, interhost):
  44. """Return stream info as a string. If interhost is true,
  45. include the host URL."""
  46. if interhost:
  47. return sprintf("[%s] ", self.url) + str(self)
  48. return str(self)
  49. def __str__(self):
  50. """Return stream info as a string."""
  51. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  52. self.path, self.layout, self.rows / 1e6,
  53. self.seconds / 3600.0)
  54. def get_stream_info(client, path):
  55. """Return a StreamInfo object about the given path, or None if it
  56. doesn't exist"""
  57. streams = client.stream_list(path, extended = True)
  58. if len(streams) != 1:
  59. return None
  60. return StreamInfo(client.geturl(), streams[0])
  61. # Filter processing for a single interval of data.
  62. def process_numpy_interval(interval, extractor, inserter, warn_rows,
  63. function, args = None):
  64. """For the given 'interval' of data, extract data, process it
  65. through 'function', and insert the result.
  66. 'extractor' should be a function like NumpyClient.stream_extract_numpy
  67. but with the the interval 'start' and 'end' as the only parameters,
  68. e.g.:
  69. extractor = functools.partial(NumpyClient.stream_extract_numpy,
  70. src_path, layout = l, maxrows = m)
  71. 'inserter' should be a function like NumpyClient.stream_insert_context
  72. but with the interval 'start' and 'end' as the only parameters, e.g.:
  73. inserter = functools.partial(NumpyClient.stream_insert_context,
  74. dest_path)
  75. If 'warn_rows' is not None, print a warning to stdout when the
  76. number of unprocessed rows exceeds this amount.
  77. See process_numpy for details on 'function' and 'args'.
  78. """
  79. if args is None:
  80. args = []
  81. with inserter(interval.start, interval.end) as insert_ctx:
  82. insert_func = insert_ctx.insert
  83. old_array = np.array([])
  84. for new_array in extractor(interval.start, interval.end):
  85. # If we still had old data left, combine it
  86. if old_array.shape[0] != 0:
  87. array = np.vstack((old_array, new_array))
  88. else:
  89. array = new_array
  90. # Pass the data to the user provided function
  91. processed = function(array, interval, args, insert_func, False)
  92. # Send any pending data that the user function inserted
  93. insert_ctx.send()
  94. # Save the unprocessed parts
  95. if processed >= 0:
  96. old_array = array[processed:]
  97. else:
  98. raise Exception(
  99. sprintf("%s return value %s must be >= 0",
  100. str(function), str(processed)))
  101. # Warn if there's too much data remaining
  102. if warn_rows is not None and old_array.shape[0] > warn_rows:
  103. printf("warning: %d unprocessed rows in buffer\n",
  104. old_array.shape[0])
  105. # Last call for this contiguous interval
  106. if old_array.shape[0] != 0:
  107. processed = function(old_array, interval, args,
  108. insert_func, True)
  109. if processed != old_array.shape[0]:
  110. # Truncate the interval we're inserting at the first
  111. # unprocessed data point. This ensures that
  112. # we'll not miss any data when we run again later.
  113. insert_ctx.update_end(old_array[processed][0])
  114. def example_callback_function(data, interval, args, insert_func, final):
  115. """Example of the signature for the function that gets passed
  116. to process_numpy_interval.
  117. 'data': array of data to process -- may be empty
  118. 'interval': overall interval we're processing (but not necessarily
  119. the interval of this particular chunk of data)
  120. 'args': opaque arguments passed to process_numpy
  121. 'insert_func': function to call in order to insert array of data.
  122. Should be passed a 2-dimensional array of data to insert.
  123. Data timestamps must be within the provided interval.
  124. 'final': True if this is the last bit of data for this
  125. contiguous interval, False otherwise.
  126. Return value of 'function' is the number of data rows processed.
  127. Unprocessed data will be provided again in a subsequent call
  128. (unless 'final' is True).
  129. If unprocessed data remains after 'final' is True, the interval
  130. being inserted will be ended at the timestamp of the first
  131. unprocessed data point.
  132. """
  133. raise NotImplementedError("example_callback_function does nothing")
  134. class Filter(object):
  135. def __init__(self, parser_description = None):
  136. self._parser = None
  137. self._client_src = None
  138. self._client_dest = None
  139. self._using_client = False
  140. self.src = None
  141. self.dest = None
  142. self.start = None
  143. self.end = None
  144. self._interhost = False
  145. self._force_metadata = False
  146. if parser_description is not None:
  147. self.setup_parser(parser_description)
  148. self.parse_args()
  149. @property
  150. def client_src(self):
  151. if self._using_client:
  152. raise Exception("Filter client is in use; make another")
  153. return self._client_src
  154. @property
  155. def client_dest(self):
  156. if self._using_client:
  157. raise Exception("Filter client is in use; make another")
  158. return self._client_dest
  159. def setup_parser(self, description = "Filter data", skip_paths = False):
  160. parser = argparse.ArgumentParser(
  161. formatter_class = argparse.RawDescriptionHelpFormatter,
  162. version = nilmtools.__version__,
  163. description = description)
  164. group = parser.add_argument_group("General filter arguments")
  165. group.add_argument("-u", "--url", action="store",
  166. default="http://localhost/nilmdb/",
  167. help="Server URL (default: %(default)s)")
  168. group.add_argument("-U", "--dest-url", action="store",
  169. help="Destination server URL "
  170. "(default: same as source)")
  171. group.add_argument("-D", "--dry-run", action="store_true",
  172. default = False,
  173. help="Just print intervals that would be "
  174. "processed")
  175. group.add_argument("-F", "--force-metadata", action="store_true",
  176. default = False,
  177. help="Force metadata changes if the dest "
  178. "doesn't match")
  179. group.add_argument("-s", "--start",
  180. metavar="TIME", type=self.arg_time,
  181. help="Starting timestamp for intervals "
  182. "(free-form, inclusive)")
  183. group.add_argument("-e", "--end",
  184. metavar="TIME", type=self.arg_time,
  185. help="Ending timestamp for intervals "
  186. "(free-form, noninclusive)")
  187. if not skip_paths:
  188. # Individual filter scripts might want to add these arguments
  189. # themselves, to include multiple sources in a different order
  190. # (for example). "srcpath" and "destpath" arguments must exist,
  191. # though.
  192. group.add_argument("srcpath", action="store",
  193. help="Path of source stream, e.g. /foo/bar")
  194. group.add_argument("destpath", action="store",
  195. help="Path of destination stream, e.g. /foo/bar")
  196. self._parser = parser
  197. return parser
  198. def set_args(self, url, dest_url, srcpath, destpath, start, end,
  199. parsed_args = None, quiet = True):
  200. """Set arguments directly from parameters"""
  201. if dest_url is None:
  202. dest_url = url
  203. if url != dest_url:
  204. self._interhost = True
  205. self._client_src = Client(url)
  206. self._client_dest = Client(dest_url)
  207. if (not self._interhost) and (srcpath == destpath):
  208. raise ArgumentError("source and destination path must be different")
  209. # Open the streams
  210. self.src = get_stream_info(self._client_src, srcpath)
  211. if not self.src:
  212. raise ArgumentError("source path " + srcpath + " not found")
  213. self.dest = get_stream_info(self._client_dest, destpath)
  214. if not self.dest:
  215. raise MissingDestination(parsed_args, self.src,
  216. StreamInfo(dest_url, [destpath]))
  217. self.start = start
  218. self.end = end
  219. # Print info
  220. if not quiet:
  221. print "Source:", self.src.string(self._interhost)
  222. print " Dest:", self.dest.string(self._interhost)
  223. def parse_args(self, argv = None):
  224. """Parse arguments from a command line"""
  225. args = self._parser.parse_args(argv)
  226. self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
  227. args.start, args.end, quiet = False, parsed_args = args)
  228. self._force_metadata = args.force_metadata
  229. if args.dry_run:
  230. for interval in self.intervals():
  231. print interval.human_string()
  232. raise SystemExit(0)
  233. return args
  234. def intervals(self):
  235. """Generate all the intervals that this filter should process"""
  236. self._using_client = True
  237. if self._interhost:
  238. # Do the difference ourselves
  239. s_intervals = ( Interval(start, end)
  240. for (start, end) in
  241. self._client_src.stream_intervals(
  242. self.src.path,
  243. start = self.start, end = self.end) )
  244. d_intervals = ( Interval(start, end)
  245. for (start, end) in
  246. self._client_dest.stream_intervals(
  247. self.dest.path,
  248. start = self.start, end = self.end) )
  249. intervals = nilmdb.utils.interval.set_difference(s_intervals,
  250. d_intervals)
  251. else:
  252. # Let the server do the difference for us
  253. intervals = ( Interval(start, end)
  254. for (start, end) in
  255. self._client_src.stream_intervals(
  256. self.src.path, diffpath = self.dest.path,
  257. start = self.start, end = self.end) )
  258. # Optimize intervals: join intervals that are adjacent
  259. for interval in nilmdb.utils.interval.optimize(intervals):
  260. yield interval
  261. self._using_client = False
  262. # Misc helpers
  263. @staticmethod
  264. def arg_time(toparse):
  265. """Parse a time string argument"""
  266. try:
  267. return nilmdb.utils.time.parse_time(toparse)
  268. except ValueError as e:
  269. raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
  270. str(e), toparse))
  271. def check_dest_metadata(self, data):
  272. """See if the metadata jives, and complain if it doesn't. For
  273. each key in data, if the stream contains the key, it must match
  274. values. If the stream does not contain the key, it is created."""
  275. metadata = self._client_dest.stream_get_metadata(self.dest.path)
  276. if not self._force_metadata:
  277. for key in data:
  278. wanted = data[key]
  279. if not isinstance(wanted, basestring):
  280. wanted = str(wanted)
  281. val = metadata.get(key, wanted)
  282. # Force UTF-8 encoding for comparison and display
  283. wanted = wanted.encode('utf-8')
  284. val = val.encode('utf-8')
  285. key = key.encode('utf-8')
  286. if val != wanted and self.dest.rows > 0:
  287. m = "Metadata in destination stream:\n"
  288. m += " %s = %s\n" % (key, val)
  289. m += "doesn't match desired data:\n"
  290. m += " %s = %s\n" % (key, wanted)
  291. m += "Refusing to change it. To prevent this error, "
  292. m += "change or delete the metadata with nilmtool,\n"
  293. m += "remove existing data from the stream, or "
  294. m += "retry with --force-metadata."
  295. raise Exception(m)
  296. # All good -- write the metadata in case it's not already there
  297. self._client_dest.stream_update_metadata(self.dest.path, data)
  298. # The main filter processing method.
  299. def process_numpy(self, function, args = None, rows = 100000,
  300. intervals = None):
  301. """Calls process_numpy_interval for each interval that currently
  302. exists in self.src, but doesn't exist in self.dest. It will
  303. process the data in chunks as follows:
  304. For each chunk of data, call 'function' with a Numpy array
  305. corresponding to the data. The data is converted to a Numpy
  306. array in chunks of 'rows' rows at a time.
  307. If 'intervals' is not None, process those intervals instead of
  308. the default list.
  309. 'function' should be defined with the same interface as
  310. nilmtools.filter.example_callback_function. See the
  311. documentation of that for details. 'args' are passed to
  312. 'function'.
  313. """
  314. extractor = NumpyClient(self.src.url).stream_extract_numpy
  315. inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
  316. extractor_func = functools.partial(extractor, self.src.path,
  317. layout = self.src.layout,
  318. maxrows = rows)
  319. inserter_func = functools.partial(inserter, self.dest.path)
  320. for interval in (intervals or self.intervals()):
  321. print "Processing", interval.human_string()
  322. process_numpy_interval(interval, extractor_func, inserter_func,
  323. rows * 3, function, args)
  324. def main(argv = None):
  325. # This is just a dummy function; actual filters can use the other
  326. # functions to prepare stuff, and then do something with the data.
  327. f = Filter()
  328. parser = f.setup_parser()
  329. args = f.parse_args(argv)
  330. for i in f.intervals():
  331. print "Generic filter: need to handle", i.human_string()
  332. if __name__ == "__main__":
  333. main()