#!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
                               timestamp_to_seconds)
from nilmdb.utils.interval import Interval

import nilmtools

import itertools
import time
import sys
import re
import argparse
import numpy as np
import cStringIO
import functools

class ArgumentError(Exception):
    pass

class MissingDestination(Exception):
    def __init__(self, args, src, dest):
        self.parsed_args = args
        self.src = src
        self.dest = dest
        Exception.__init__(self, "destination path " + dest.path + " not found")

class StreamInfo(object):
    def __init__(self, url, info):
        self.url = url
        self.info = info
        try:
            self.path = info[0]
            self.layout = info[1]
            self.layout_type = self.layout.split('_')[0]
            self.layout_count = int(self.layout.split('_')[1])
            self.total_count = self.layout_count + 1
            self.timestamp_min = info[2]
            self.timestamp_max = info[3]
            self.rows = info[4]
            self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
        except (IndexError, TypeError):
            pass

    def string(self, interhost):
        """Return stream info as a string.  If interhost is true,
        include the host URL."""
        if interhost:
            return sprintf("[%s] ", self.url) + str(self)
        return str(self)

    def __str__(self):
        """Return stream info as a string."""
        return sprintf("%s (%s), %.2fM rows, %.2f hours",
                       self.path, self.layout, self.rows / 1e6,
                       self.seconds / 3600.0)

def get_stream_info(client, path):
    """Return a StreamInfo object about the given path, or None if it
    doesn't exist"""
    streams = client.stream_list(path, extended = True)
    if len(streams) != 1:
        return None
    return StreamInfo(client.geturl(), streams[0])
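
# Example usage (a hedged sketch; the URL and stream path below are
# hypothetical):
#   client = Client("http://localhost/nilmdb/")
#   info = get_stream_info(client, "/test/source")
#   if info is not None:
#       print info.string(interhost = False)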

# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
                           function, args = None):
  64. """For the given 'interval' of data, extract data, process it
  65. through 'function', and insert the result.
  66. 'extractor' should be a function like NumpyClient.stream_extract_numpy
  67. but with the the interval 'start' and 'end' as the only parameters,
  68. e.g.:
  69. extractor = functools.partial(NumpyClient.stream_extract_numpy,
  70. src_path, layout = l, maxrows = m)
  71. 'inserter' should be a function like NumpyClient.stream_insert_context
  72. but with the interval 'start' and 'end' as the only parameters, e.g.:
  73. inserter = functools.partial(NumpyClient.stream_insert_context,
  74. dest_path)
  75. If 'warn_rows' is not None, print a warning to stdout when the
  76. number of unprocessed rows exceeds this amount.
  77. See process_numpy for details on 'function' and 'args'.
  78. """
    if args is None:
        args = []

    with inserter(interval.start, interval.end) as insert_ctx:
        insert_func = insert_ctx.insert
        old_array = np.array([])
        for new_array in extractor(interval.start, interval.end):
            # If we still had old data left, combine it
            if old_array.shape[0] != 0:
                array = np.vstack((old_array, new_array))
            else:
                array = new_array

            # Pass the data to the user-provided function
            processed = function(array, interval, args, insert_func, False)

            # Send any pending data that the user function inserted
            insert_ctx.send()

            # Save the unprocessed parts
            if processed >= 0:
                old_array = array[processed:]
            else:
                raise Exception(
                    sprintf("%s return value %s must be >= 0",
                            str(function), str(processed)))

            # Warn if there's too much data remaining
            if warn_rows is not None and old_array.shape[0] > warn_rows:
                printf("warning: %d unprocessed rows in buffer\n",
                       old_array.shape[0])

        # Last call for this contiguous interval
        if old_array.shape[0] != 0:
            processed = function(old_array, interval, args,
                                 insert_func, True)
            if processed != old_array.shape[0]:
                # Truncate the interval we're inserting at the first
                # unprocessed data point.  This ensures that we'll not
                # miss any data when we run again later.
                insert_ctx.update_end(old_array[processed][0])
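
# Example of driving process_numpy_interval directly (a hedged sketch;
# the paths, layout, and interval bounds below are hypothetical):
#   client = NumpyClient("http://localhost/nilmdb/")
#   extractor = functools.partial(client.stream_extract_numpy,
#                                 "/test/source", layout = "float32_8",
#                                 maxrows = 100000)
#   inserter = functools.partial(client.stream_insert_numpy_context,
#                                "/test/dest")
#   def passthrough(data, interval, args, insert_func, final):
#       insert_func(data)           # copy the chunk unchanged
#       return data.shape[0]        # report every row as processed
#   process_numpy_interval(Interval(0, 1000000), extractor, inserter,
#                          300000, passthrough)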

class Filter(object):

    def __init__(self, parser_description = None):
        self._parser = None
        self._client_src = None
        self._client_dest = None
        self._using_client = False
        self.src = None
        self.dest = None
        self.start = None
        self.end = None
        self.interhost = False
        self.force_metadata = False
        if parser_description is not None:
            self.setup_parser(parser_description)
            self.parse_args()

    @property
    def client_src(self):
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_src

    @property
    def client_dest(self):
        if self._using_client:
            raise Exception("Filter client is in use; make another")
        return self._client_dest

    def setup_parser(self, description = "Filter data", skip_paths = False):
        parser = argparse.ArgumentParser(
            formatter_class = argparse.RawDescriptionHelpFormatter,
            version = nilmtools.__version__,
            description = description)
        group = parser.add_argument_group("General filter arguments")
        group.add_argument("-u", "--url", action="store",
                           default="http://localhost/nilmdb/",
                           help="Server URL (default: %(default)s)")
        group.add_argument("-U", "--dest-url", action="store",
                           help="Destination server URL "
                           "(default: same as source)")
        group.add_argument("-D", "--dry-run", action="store_true",
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("--force-metadata", action="store_true",
                           default = False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
        group.add_argument("-s", "--start",
                           metavar="TIME", type=self.arg_time,
                           help="Starting timestamp for intervals "
                           "(free-form, inclusive)")
        group.add_argument("-e", "--end",
                           metavar="TIME", type=self.arg_time,
                           help="Ending timestamp for intervals "
                           "(free-form, noninclusive)")
        if not skip_paths:
            # Individual filter scripts might want to add these arguments
            # themselves, to include multiple sources in a different order
            # (for example).  "srcpath" and "destpath" arguments must
            # exist, though.
            group.add_argument("srcpath", action="store",
                               help="Path of source stream, e.g. /foo/bar")
            group.add_argument("destpath", action="store",
                               help="Path of destination stream, "
                               "e.g. /foo/bar")
        self._parser = parser
        return parser
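
    # Example invocation of a script built on this parser (a hedged
    # sketch; the stream paths are hypothetical).  "-D" prints the
    # intervals that would be processed without touching any data:
    #   ./filter.py -u http://localhost/nilmdb/ -D /test/source /test/dest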

    def set_args(self, url, dest_url, srcpath, destpath, start, end,
                 parsed_args = None, quiet = True):
        """Set arguments directly from parameters"""
        if dest_url is None:
            dest_url = url
        if url != dest_url:
            self.interhost = True

        self._client_src = Client(url)
        self._client_dest = Client(dest_url)

        if (not self.interhost) and (srcpath == destpath):
            raise ArgumentError("source and destination path must be different")

        # Open the streams
        self.src = get_stream_info(self._client_src, srcpath)
        if not self.src:
            raise ArgumentError("source path " + srcpath + " not found")

        self.dest = get_stream_info(self._client_dest, destpath)
        if not self.dest:
            raise MissingDestination(parsed_args, self.src,
                                     StreamInfo(dest_url, [destpath]))
        self.start = start
        self.end = end

        # Print info
        if not quiet:
            print "Source:", self.src.string(self.interhost)
            print "  Dest:", self.dest.string(self.interhost)

    def parse_args(self, argv = None):
        """Parse arguments from a command line"""
        args = self._parser.parse_args(argv)
        self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
                      args.start, args.end, quiet = False, parsed_args = args)
        self.force_metadata = args.force_metadata
        if args.dry_run:
            for interval in self.intervals():
                print interval.human_string()
            raise SystemExit(0)
        return args

    def intervals(self):
        """Generate all the intervals that this filter should process"""
        self._using_client = True

        if self.interhost:
            # Do the difference ourselves
            s_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_src.stream_intervals(
                                self.src.path,
                                start = self.start, end = self.end) )
            d_intervals = ( Interval(start, end)
                            for (start, end) in
                            self._client_dest.stream_intervals(
                                self.dest.path,
                                start = self.start, end = self.end) )
            intervals = nilmdb.utils.interval.set_difference(s_intervals,
                                                             d_intervals)
        else:
            # Let the server do the difference for us
            intervals = ( Interval(start, end)
                          for (start, end) in
                          self._client_src.stream_intervals(
                              self.src.path, diffpath = self.dest.path,
                              start = self.start, end = self.end) )

        # Optimize intervals: join intervals that are adjacent
        for interval in nilmdb.utils.interval.optimize(intervals):
            yield interval
        self._using_client = False

    # Misc helpers
    @staticmethod
    def arg_time(toparse):
        """Parse a time string argument"""
        try:
            return nilmdb.utils.time.parse_time(toparse)
        except ValueError as e:
            raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
                                                     str(e), toparse))
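
    # Example (a hedged sketch; the accepted formats are whatever
    # nilmdb.utils.time.parse_time supports, such as the free-form
    # date string below):
    #   timestamp = Filter.arg_time("2012-03-01 12:00:00")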

    def check_dest_metadata(self, data):
        """See if the destination metadata matches what we expect, and
        complain if it doesn't.  If there's no conflict, update the
        metadata to match 'data'."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self.force_metadata:
            for key in data:
                wanted = data[key]
                if not isinstance(wanted, basestring):
                    wanted = str(wanted)
                val = metadata.get(key, wanted)
                # Force UTF-8 encoding for comparison and display
                wanted = wanted.encode('utf-8')
                val = val.encode('utf-8')
                key = key.encode('utf-8')
                if val != wanted and self.dest.rows > 0:
                    m = "Metadata in destination stream:\n"
                    m += "  %s = %s\n" % (key, val)
                    m += "doesn't match desired data:\n"
                    m += "  %s = %s\n" % (key, wanted)
                    m += "Refusing to change it.  To prevent this error, "
                    m += "change or delete the metadata with nilmtool,\n"
                    m += "remove existing data from the stream, or "
                    m += "retry with --force-metadata."
                    raise Exception(m)

        # All good -- write the metadata in case it's not already there
        self._client_dest.stream_update_metadata(self.dest.path, data)
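
    # Example from a hypothetical filter script (a hedged sketch; the
    # metadata keys are illustrative only):
    #   f = Filter("Scale data")
    #   f.check_dest_metadata({ "scale_source": f.src.path,
    #                           "scale_factor": 2.0 })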

    # The main filter processing method.
    def process_numpy(self, function, args = None, rows = 100000,
                      intervals = None):
        """Calls process_numpy_interval for each interval that currently
        exists in self.src, but doesn't exist in self.dest.  It will
        process the data in chunks as follows:

        For each chunk of data, call 'function' with a Numpy array
        corresponding to the data.  The data is converted to a Numpy
        array in chunks of 'rows' rows at a time.

        If 'intervals' is not None, process those intervals instead of
        the default list.

        'function' should be defined as:
          def function(data, interval, args, insert_func, final)

        'data': array of data to process -- may be empty

        'interval': overall interval we're processing (but not necessarily
        the interval of this particular chunk of data)

        'args': opaque arguments passed to process_numpy

        'insert_func': function to call in order to insert an array of
        data.  Should be passed a 2-dimensional array of data to insert.
        Data timestamps must be within the provided interval.

        'final': True if this is the last bit of data for this
        contiguous interval, False otherwise.

        Return value of 'function' is the number of data rows processed.
        Unprocessed data will be provided again in a subsequent call
        (unless 'final' is True).

        If unprocessed data remains after 'final' is True, the interval
        being inserted will be ended at the timestamp of the first
        unprocessed data point.
        """
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

        extractor_func = functools.partial(extractor, self.src.path,
                                           layout = self.src.layout,
                                           maxrows = rows)
        inserter_func = functools.partial(inserter, self.dest.path)

        for interval in (intervals or self.intervals()):
            print "Processing", interval.human_string()
            process_numpy_interval(interval, extractor_func, inserter_func,
                                   rows * 3, function, args)
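
# Example of a complete filter built on process_numpy (a hedged sketch;
# the scaling behavior and metadata key are illustrative, not part of
# this module):
#   def scale(data, interval, args, insert_func, final):
#       # Multiply every non-timestamp column by args[0], insert the
#       # result, and report that all rows were consumed.
#       data[:, 1:] *= args[0]
#       insert_func(data)
#       return data.shape[0]
#   f = Filter("Scale a stream")
#   f.check_dest_metadata({ "scale_factor": 2.0 })
#   f.process_numpy(scale, args = [2.0])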

def main(argv = None):
    # This is just a dummy function; actual filters can use the other
    # functions to prepare stuff, and then do something with the data.
    f = Filter()
    parser = f.setup_parser()
    args = f.parse_args(argv)
    for i in f.intervals():
        print "Generic filter: need to handle", i.human_string()

if __name__ == "__main__":
    main()