You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

279 lines
11 KiB

  1. #!/usr/bin/env python3
  2. import nilmdb.client
  3. from nilmdb.utils.printf import printf, sprintf
  4. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  5. timestamp_to_seconds, seconds_to_timestamp,
  6. rate_to_period, now as time_now)
  7. import os
  8. import nilmtools
  9. import sys
  10. import argparse
  11. import subprocess
  12. import textwrap
  13. class ParseError(Exception):
  14. def __init__(self, filename, error):
  15. msg = filename + ": " + error
  16. super(ParseError, self).__init__(msg)
  17. def parse_args(argv=None):
  18. parser = argparse.ArgumentParser(
  19. formatter_class=argparse.RawDescriptionHelpFormatter,
  20. description=textwrap.dedent("""\
  21. Insert large amount of data from an external source like ethstream.
  22. This code tracks two timestamps:
  23. (1) The 'data' timestamp is the precise timestamp corresponding to
  24. a particular row of data, and is the timestamp that gets
  25. inserted into the database. It increases by 'data_delta' for
  26. every row of input.
  27. 'data_delta' can come from one of two sources. If '--delta'
  28. is specified, it is pulled from the first column of data. If
  29. '--rate' is specified, 'data_delta' is set to a fixed value of
  30. (1 / rate).
  31. (2) The 'clock' timestamp is the less precise timestamp that gives
  32. the absolute time. It can come from two sources. If '--live'
  33. is specified, it is pulled directly from the system clock. If
  34. '--file' is specified, it is extracted from the input filename
  35. every time a new file is opened for read, and from comments
  36. that appear in the file.
  37. Small discrepencies between 'data' and 'clock' are ignored. If
  38. the 'data' timestamp ever differs from the 'clock' timestamp by
  39. more than 'max_gap' seconds:
  40. - If 'data' is running behind, there is a gap in the data, so it
  41. is stepped forward to match 'clock'.
  42. - If 'data' is running ahead, there is overlap in the data, and an
  43. error is raised. If '--skip' is specified, the current file
  44. is skipped instead of raising an error.
  45. """))
  46. def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
  47. parser.add_argument("-u", "--url", action="store", default=def_url,
  48. help="NilmDB server URL (default: %(default)s)")
  49. parser.add_argument("-v", "--version", action="version",
  50. version=nilmtools.__version__)
  51. group = parser.add_argument_group("Misc options")
  52. group.add_argument("-D", "--dry-run", action="store_true",
  53. help="Parse files, but don't insert any data")
  54. group.add_argument("-s", "--skip", action="store_true",
  55. help="Skip files if the data would overlap")
  56. group.add_argument("-m", "--max-gap", action="store", default=10.0,
  57. metavar="SEC", type=float,
  58. help="Max discrepency between clock and data "
  59. "timestamps (default: %(default)s)")
  60. group = parser.add_argument_group("Data timestamp delta")
  61. exc = group.add_mutually_exclusive_group()
  62. exc.add_argument("-r", "--rate", action="store", default=8000.0,
  63. type=float,
  64. help="Data_delta is constant 1/RATE "
  65. "(default: %(default)s Hz)")
  66. exc.add_argument("-d", "--delta", action="store_true",
  67. help="Data_delta is the first number in each line")
  68. group = parser.add_argument_group("Clock timestamp source")
  69. exc = group.add_mutually_exclusive_group()
  70. exc.add_argument("-l", "--live", action="store_true",
  71. help="Use live system time for clock timestamp")
  72. exc.add_argument("-f", "--file", action="store_true", default=True,
  73. help="Use filename or comments for clock timestamp")
  74. group.add_argument("-o", "--offset-filename", metavar="SEC",
  75. action="store", default=-3600.0, type=float,
  76. help="Offset to add to filename timestamps "
  77. "(default: %(default)s)")
  78. group.add_argument("-O", "--offset-comment", metavar="SEC",
  79. action="store", default=0.0, type=float,
  80. help="Offset to add to comment timestamps "
  81. "(default: %(default)s)")
  82. group = parser.add_argument_group("Database path")
  83. group.add_argument("path", action="store",
  84. help="Path of stream, e.g. /foo/bar")
  85. group = parser.add_argument_group("Input files")
  86. group.add_argument("infile", type=argparse.FileType('rb'), nargs='*',
  87. default=[sys.stdin],
  88. help="Input files (default: stdin)")
  89. args = parser.parse_args(argv)
  90. printf(" Stream path: %s\n", args.path)
  91. printf(" Data timestamp: ")
  92. if args.delta:
  93. printf("delta on each input line\n")
  94. else:
  95. printf("fixed rate %s Hz\n", repr(args.rate))
  96. printf(" Clock timestamp: ")
  97. if args.live:
  98. printf("live system clock\n")
  99. else:
  100. printf("from filenames and comments\n")
  101. printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
  102. printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
  103. printf(" Max gap: %s seconds\n", repr(args.max_gap))
  104. if args.dry_run:
  105. printf("Dry run (no data will be inserted)\n")
  106. return args
  107. def main(argv=None):
  108. args = parse_args(argv)
  109. client = nilmdb.client.Client(args.url)
  110. # data_ts is the timestamp that we'll use for the current line
  111. data_ts_base = 0
  112. data_ts_inc = 0
  113. data_ts_rate = args.rate
  114. data_ts_delta = 0
  115. def get_data_ts():
  116. if args.delta:
  117. return data_ts_base + data_ts_delta
  118. else:
  119. return data_ts_base + rate_to_period(data_ts_rate,
  120. data_ts_inc)
  121. # clock_ts is the imprecise "real" timestamp (from the filename,
  122. # comments, or system clock)
  123. clock_ts = None
  124. def print_clock_updated():
  125. printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
  126. if data_ts_base != 0:
  127. diff = get_data_ts() - clock_ts
  128. if diff >= 0:
  129. printf(" (data timestamp ahead by %.6f sec)\n",
  130. timestamp_to_seconds(diff))
  131. else:
  132. printf(" (data timestamp behind by %.6f sec)\n",
  133. timestamp_to_seconds(-diff))
  134. offset_filename = seconds_to_timestamp(args.offset_filename)
  135. offset_comment = seconds_to_timestamp(args.offset_comment)
  136. max_gap = seconds_to_timestamp(args.max_gap)
  137. with client.stream_insert_context(args.path) as stream:
  138. for f in args.infile:
  139. filename = f.name
  140. printf("Processing %s\n", filename)
  141. # If the filename ends in .gz, re-open it with gzip to
  142. # decompress.
  143. if filename.endswith(".gz"):
  144. p = subprocess.Popen(["gzip", "-dc"],
  145. stdin=f, stdout=subprocess.PIPE)
  146. f = p.stdout
  147. # Try to get a real timestamp from the filename
  148. try:
  149. # Subtract 1 hour because files are created at the end
  150. # of the hour. Hopefully, we'll be able to use
  151. # internal comments and this value won't matter anyway.
  152. clock_ts = parse_time(filename) + offset_filename
  153. print_clock_updated()
  154. except ValueError:
  155. pass
  156. # Read each line
  157. for line in f:
  158. # The last line in the file may be truncated.
  159. # Ignore it; we shouldn't ever see more than one at the end.
  160. if line[-1] != b'\n'[0]:
  161. printf("Ignoring short line in %s\n", filename)
  162. continue
  163. # If no content other than the newline, skip it
  164. if len(line) <= 1:
  165. continue
  166. # If line starts with a comment, look for a timestamp
  167. if line[0] == b'#'[0]:
  168. try:
  169. comment = line[1:].decode('utf-8', errors='ignore')
  170. clock_ts = parse_time(comment) + offset_comment
  171. print_clock_updated()
  172. except ValueError:
  173. pass
  174. # for some reason the following line doesn't show up as
  175. # being covered, even though it definitely runs
  176. continue # pragma: no cover
  177. # If --delta mode, increment data_ts_delta by the
  178. # delta from the file.
  179. if args.delta:
  180. try:
  181. (delta, line) = line.split(None, 1)
  182. data_ts_delta += float(delta)
  183. except ValueError:
  184. raise ParseError(filename, "can't parse delta")
  185. # Calculate data_ts for this row
  186. data_ts = get_data_ts()
  187. # If inserting live, use clock timestamp
  188. if args.live:
  189. clock_ts = time_now()
  190. # If we have a real timestamp, compare it to the data
  191. # timestamp, and make sure things match up.
  192. if clock_ts is not None:
  193. if (data_ts - max_gap) > clock_ts:
  194. # Accumulated line timestamps are in the future.
  195. # If we were to set data_ts=clock_ts, we'd create
  196. # an overlap, so we have to just bail out here.
  197. err = sprintf("Data is coming in too fast: data time "
  198. "is %s but clock time is only %s",
  199. timestamp_to_human(data_ts),
  200. timestamp_to_human(clock_ts))
  201. if args.skip:
  202. printf("%s\n", err)
  203. printf("Skipping the remainder of this file\n")
  204. break
  205. raise ParseError(filename, err)
  206. if (data_ts + max_gap) < clock_ts:
  207. # Accumulated line timetamps are in the past. We
  208. # can just skip some time and leave a gap in the
  209. # data.
  210. if data_ts_base != 0:
  211. printf("Skipping data timestamp forward from "
  212. "%s to %s to match clock time\n",
  213. timestamp_to_human(data_ts),
  214. timestamp_to_human(clock_ts))
  215. stream.finalize()
  216. data_ts_base = data_ts = clock_ts
  217. data_ts_inc = data_ts_delta = 0
  218. # Don't use this clock time anymore until we update it
  219. clock_ts = None
  220. if data_ts_base == 0:
  221. raise ParseError(filename, "No idea what timestamp to use")
  222. # This line is legit, so increment timestamp (for --rate)
  223. data_ts_inc += 1
  224. # Insert it
  225. if not args.dry_run:
  226. stream.insert(b"%d %s" % (data_ts, line))
  227. print("Done")
  228. if __name__ == "__main__":
  229. main()