You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

392 lines
16 KiB

  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.client import Client
  4. from nilmdb.client.numpyclient import NumpyClient
  5. from nilmdb.utils.printf import *
  6. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  7. timestamp_to_seconds)
  8. from nilmdb.utils.interval import Interval
  9. import nilmtools
  10. import itertools
  11. import time
  12. import sys
  13. import os
  14. import re
  15. import argparse
  16. import numpy as np
  17. import io
  18. import functools
  19. class ArgumentError(Exception):
  20. pass
  21. class MissingDestination(Exception):
  22. def __init__(self, args, src, dest):
  23. self.parsed_args = args
  24. self.src = src
  25. self.dest = dest
  26. Exception.__init__(self, "destination path " + dest.path + " not found")
  27. class StreamInfo(object):
  28. def __init__(self, url, info):
  29. self.url = url
  30. self.info = info
  31. try:
  32. self.path = info[0]
  33. self.layout = info[1]
  34. self.layout_type = self.layout.split('_')[0]
  35. self.layout_count = int(self.layout.split('_')[1])
  36. self.total_count = self.layout_count + 1
  37. self.timestamp_min = info[2]
  38. self.timestamp_max = info[3]
  39. self.rows = info[4]
  40. self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
  41. except IndexError as TypeError:
  42. pass
  43. def string(self, interhost):
  44. """Return stream info as a string. If interhost is true,
  45. include the host URL."""
  46. if interhost:
  47. return sprintf("[%s] ", self.url) + str(self)
  48. return str(self)
  49. def __str__(self):
  50. """Return stream info as a string."""
  51. return sprintf("%s (%s), %.2fM rows, %.2f hours",
  52. self.path, self.layout, self.rows / 1e6,
  53. self.seconds / 3600.0)
  54. def get_stream_info(client, path):
  55. """Return a StreamInfo object about the given path, or None if it
  56. doesn't exist"""
  57. streams = client.stream_list(path, extended = True)
  58. if len(streams) != 1:
  59. return None
  60. return StreamInfo(client.geturl(), streams[0])
  61. # Filter processing for a single interval of data.
  62. def process_numpy_interval(interval, extractor, inserter, warn_rows,
  63. function, args = None):
  64. """For the given 'interval' of data, extract data, process it
  65. through 'function', and insert the result.
  66. 'extractor' should be a function like NumpyClient.stream_extract_numpy
  67. but with the the interval 'start' and 'end' as the only parameters,
  68. e.g.:
  69. extractor = functools.partial(NumpyClient.stream_extract_numpy,
  70. src_path, layout = l, maxrows = m)
  71. 'inserter' should be a function like NumpyClient.stream_insert_context
  72. but with the interval 'start' and 'end' as the only parameters, e.g.:
  73. inserter = functools.partial(NumpyClient.stream_insert_context,
  74. dest_path)
  75. If 'warn_rows' is not None, print a warning to stdout when the
  76. number of unprocessed rows exceeds this amount.
  77. See process_numpy for details on 'function' and 'args'.
  78. """
  79. if args is None:
  80. args = []
  81. with inserter(interval.start, interval.end) as insert_ctx:
  82. insert_func = insert_ctx.insert
  83. old_array = np.array([])
  84. for new_array in extractor(interval.start, interval.end):
  85. # If we still had old data left, combine it
  86. if old_array.shape[0] != 0:
  87. array = np.vstack((old_array, new_array))
  88. else:
  89. array = new_array
  90. # Pass the data to the user provided function
  91. processed = function(array, interval, args, insert_func, False)
  92. # Send any pending data that the user function inserted
  93. insert_ctx.send()
  94. # Save the unprocessed parts
  95. if processed >= 0:
  96. old_array = array[processed:]
  97. else:
  98. raise Exception(
  99. sprintf("%s return value %s must be >= 0",
  100. str(function), str(processed)))
  101. # Warn if there's too much data remaining
  102. if warn_rows is not None and old_array.shape[0] > warn_rows:
  103. printf("warning: %d unprocessed rows in buffer\n",
  104. old_array.shape[0])
  105. # Last call for this contiguous interval
  106. if old_array.shape[0] != 0:
  107. processed = function(old_array, interval, args,
  108. insert_func, True)
  109. if processed != old_array.shape[0]:
  110. # Truncate the interval we're inserting at the first
  111. # unprocessed data point. This ensures that
  112. # we'll not miss any data when we run again later.
  113. insert_ctx.update_end(old_array[processed][0])
  114. def example_callback_function(data, interval, args, insert_func, final):
  115. """Example of the signature for the function that gets passed
  116. to process_numpy_interval.
  117. 'data': array of data to process -- may be empty
  118. 'interval': overall interval we're processing (but not necessarily
  119. the interval of this particular chunk of data)
  120. 'args': opaque arguments passed to process_numpy
  121. 'insert_func': function to call in order to insert array of data.
  122. Should be passed a 2-dimensional array of data to insert.
  123. Data timestamps must be within the provided interval.
  124. 'final': True if this is the last bit of data for this
  125. contiguous interval, False otherwise.
  126. Return value of 'function' is the number of data rows processed.
  127. Unprocessed data will be provided again in a subsequent call
  128. (unless 'final' is True).
  129. If unprocessed data remains after 'final' is True, the interval
  130. being inserted will be ended at the timestamp of the first
  131. unprocessed data point.
  132. """
  133. raise NotImplementedError("example_callback_function does nothing")
  134. class Filter(object):
  135. def __init__(self, parser_description = None):
  136. self._parser = None
  137. self._client_src = None
  138. self._client_dest = None
  139. self._using_client = False
  140. self.src = None
  141. self.dest = None
  142. self.start = None
  143. self.end = None
  144. self._interhost = False
  145. self._force_metadata = False
  146. if parser_description is not None:
  147. self.setup_parser(parser_description)
  148. self.parse_args()
  149. self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
  150. @property
  151. def client_src(self):
  152. if self._using_client:
  153. raise Exception("Filter client is in use; make another")
  154. return self._client_src
  155. @property
  156. def client_dest(self):
  157. if self._using_client:
  158. raise Exception("Filter client is in use; make another")
  159. return self._client_dest
  160. def setup_parser(self, description = "Filter data", skip_paths = False):
  161. parser = argparse.ArgumentParser(
  162. formatter_class = argparse.RawDescriptionHelpFormatter,
  163. description = description)
  164. group = parser.add_argument_group("General filter arguments")
  165. group.add_argument("-u", "--url", action="store",
  166. default=self.def_url,
  167. help="Server URL (default: %(default)s)")
  168. group.add_argument("-U", "--dest-url", action="store",
  169. help="Destination server URL "
  170. "(default: same as source)")
  171. group.add_argument("-D", "--dry-run", action="store_true",
  172. default = False,
  173. help="Just print intervals that would be "
  174. "processed")
  175. group.add_argument("-F", "--force-metadata", action="store_true",
  176. default = False,
  177. help="Force metadata changes if the dest "
  178. "doesn't match")
  179. group.add_argument("-s", "--start",
  180. metavar="TIME", type=self.arg_time,
  181. help="Starting timestamp for intervals "
  182. "(free-form, inclusive)")
  183. group.add_argument("-e", "--end",
  184. metavar="TIME", type=self.arg_time,
  185. help="Ending timestamp for intervals "
  186. "(free-form, noninclusive)")
  187. group.add_argument("-v", "--version", action="version",
  188. version = nilmtools.__version__)
  189. if not skip_paths:
  190. # Individual filter scripts might want to add these arguments
  191. # themselves, to include multiple sources in a different order
  192. # (for example). "srcpath" and "destpath" arguments must exist,
  193. # though.
  194. group.add_argument("srcpath", action="store",
  195. help="Path of source stream, e.g. /foo/bar")
  196. group.add_argument("destpath", action="store",
  197. help="Path of destination stream, e.g. /foo/bar")
  198. self._parser = parser
  199. return parser
  200. def set_args(self, url, dest_url, srcpath, destpath, start, end,
  201. parsed_args = None, quiet = True):
  202. """Set arguments directly from parameters"""
  203. if dest_url is None:
  204. dest_url = url
  205. if url != dest_url:
  206. self._interhost = True
  207. self._client_src = Client(url)
  208. self._client_dest = Client(dest_url)
  209. if (not self._interhost) and (srcpath == destpath):
  210. raise ArgumentError("source and destination path must be different")
  211. # Open the streams
  212. self.src = get_stream_info(self._client_src, srcpath)
  213. if not self.src:
  214. raise ArgumentError("source path " + srcpath + " not found")
  215. self.dest = get_stream_info(self._client_dest, destpath)
  216. if not self.dest:
  217. raise MissingDestination(parsed_args, self.src,
  218. StreamInfo(dest_url, [destpath]))
  219. self.start = start
  220. self.end = end
  221. # Print info
  222. if not quiet:
  223. print("Source:", self.src.string(self._interhost))
  224. print(" Dest:", self.dest.string(self._interhost))
  225. def parse_args(self, argv = None):
  226. """Parse arguments from a command line"""
  227. args = self._parser.parse_args(argv)
  228. self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
  229. args.start, args.end, quiet = False, parsed_args = args)
  230. self._force_metadata = args.force_metadata
  231. if args.dry_run:
  232. for interval in self.intervals():
  233. print(interval.human_string())
  234. raise SystemExit(0)
  235. return args
  236. def intervals(self):
  237. """Generate all the intervals that this filter should process"""
  238. self._using_client = True
  239. if self._interhost:
  240. # Do the difference ourselves
  241. s_intervals = ( Interval(start, end)
  242. for (start, end) in
  243. self._client_src.stream_intervals(
  244. self.src.path,
  245. start = self.start, end = self.end) )
  246. d_intervals = ( Interval(start, end)
  247. for (start, end) in
  248. self._client_dest.stream_intervals(
  249. self.dest.path,
  250. start = self.start, end = self.end) )
  251. intervals = nilmdb.utils.interval.set_difference(s_intervals,
  252. d_intervals)
  253. else:
  254. # Let the server do the difference for us
  255. intervals = ( Interval(start, end)
  256. for (start, end) in
  257. self._client_src.stream_intervals(
  258. self.src.path, diffpath = self.dest.path,
  259. start = self.start, end = self.end) )
  260. # Optimize intervals: join intervals that are adjacent
  261. for interval in nilmdb.utils.interval.optimize(intervals):
  262. yield interval
  263. self._using_client = False
  264. # Misc helpers
  265. @staticmethod
  266. def arg_time(toparse):
  267. """Parse a time string argument"""
  268. try:
  269. return nilmdb.utils.time.parse_time(toparse)
  270. except ValueError as e:
  271. raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
  272. str(e), toparse))
  273. def check_dest_metadata(self, data):
  274. """See if the metadata jives, and complain if it doesn't. For
  275. each key in data, if the stream contains the key, it must match
  276. values. If the stream does not contain the key, it is created."""
  277. metadata = self._client_dest.stream_get_metadata(self.dest.path)
  278. if not self._force_metadata:
  279. for key in data:
  280. wanted = data[key]
  281. if not isinstance(wanted, str):
  282. wanted = str(wanted)
  283. val = metadata.get(key, wanted)
  284. # Force UTF-8 encoding for comparison and display
  285. wanted = wanted.encode('utf-8')
  286. val = val.encode('utf-8')
  287. key = key.encode('utf-8')
  288. if val != wanted and self.dest.rows > 0:
  289. m = "Metadata in destination stream:\n"
  290. m += " %s = %s\n" % (key, val)
  291. m += "doesn't match desired data:\n"
  292. m += " %s = %s\n" % (key, wanted)
  293. m += "Refusing to change it. To prevent this error, "
  294. m += "change or delete the metadata with nilmtool,\n"
  295. m += "remove existing data from the stream, or "
  296. m += "retry with --force-metadata."
  297. raise Exception(m)
  298. # All good -- write the metadata in case it's not already there
  299. self._client_dest.stream_update_metadata(self.dest.path, data)
  300. # The main filter processing method.
  301. def process_numpy(self, function, args = None, rows = 100000,
  302. intervals = None):
  303. """Calls process_numpy_interval for each interval that currently
  304. exists in self.src, but doesn't exist in self.dest. It will
  305. process the data in chunks as follows:
  306. For each chunk of data, call 'function' with a Numpy array
  307. corresponding to the data. The data is converted to a Numpy
  308. array in chunks of 'rows' rows at a time.
  309. If 'intervals' is not None, process those intervals instead of
  310. the default list.
  311. 'function' should be defined with the same interface as
  312. nilmtools.filter.example_callback_function. See the
  313. documentation of that for details. 'args' are passed to
  314. 'function'.
  315. """
  316. extractor = NumpyClient(self.src.url).stream_extract_numpy
  317. inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
  318. extractor_func = functools.partial(extractor, self.src.path,
  319. layout = self.src.layout,
  320. maxrows = rows)
  321. inserter_func = functools.partial(inserter, self.dest.path)
  322. for interval in (intervals or self.intervals()):
  323. print("Processing", interval.human_string())
  324. process_numpy_interval(interval, extractor_func, inserter_func,
  325. rows * 3, function, args)
  326. def main(argv = None):
  327. # This is just a dummy function; actual filters can use the other
  328. # functions to prepare stuff, and then do something with the data.
  329. f = Filter()
  330. parser = f.setup_parser()
  331. args = f.parse_args(argv)
  332. for i in f.intervals():
  333. print("Generic filter: need to handle", i.human_string())
  334. if __name__ == "__main__":
  335. main()