You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

insert.py 11 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils.time import (parse_time, timestamp_to_human,
  5. timestamp_to_seconds, seconds_to_timestamp,
  6. rate_to_period, now as time_now)
  7. import nilmtools
  8. import time
  9. import sys
  10. import re
  11. import argparse
  12. import subprocess
  13. import textwrap
  14. class ParseError(Exception):
  15. def __init__(self, filename, error):
  16. msg = filename + ": " + error
  17. super(ParseError, self).__init__(msg)
  18. def parse_args(argv = None):
  19. parser = argparse.ArgumentParser(
  20. formatter_class = argparse.RawDescriptionHelpFormatter,
  21. version = nilmtools.__version__,
  22. description = textwrap.dedent("""\
  23. Insert large amount of data from an external source like ethstream.
  24. This code tracks two timestamps:
  25. (1) The 'data' timestamp is the precise timestamp corresponding to
  26. a particular row of data, and is the timestamp that gets
  27. inserted into the database. It increases by 'data_delta' for
  28. every row of input.
  29. 'data_delta' can come from one of two sources. If '--delta'
  30. is specified, it is pulled from the first column of data. If
  31. '--rate' is specified, 'data_delta' is set to a fixed value of
  32. (1 / rate).
  33. (2) The 'clock' timestamp is the less precise timestamp that gives
  34. the absolute time. It can come from two sources. If '--live'
  35. is specified, it is pulled directly from the system clock. If
  36. '--file' is specified, it is extracted from the input filename
  37. every time a new file is opened for read, and from comments
  38. that appear in the file.
  39. Small discrepencies between 'data' and 'clock' are ignored. If
  40. the 'data' timestamp ever differs from the 'clock' timestamp by
  41. more than 'max_gap' seconds:
  42. - If 'data' is running behind, there is a gap in the data, so it
  43. is stepped forward to match 'clock'.
  44. - If 'data' is running ahead, there is overlap in the data, and an
  45. error is raised. If '--skip' is specified, the current file
  46. is skipped instead of raising an error.
  47. """))
  48. parser.add_argument("-u", "--url", action="store",
  49. default="http://localhost/nilmdb/",
  50. help="NilmDB server URL (default: %(default)s)")
  51. group = parser.add_argument_group("Misc options")
  52. group.add_argument("-D", "--dry-run", action="store_true",
  53. help="Parse files, but don't insert any data")
  54. group.add_argument("-s", "--skip", action="store_true",
  55. help="Skip files if the data would overlap")
  56. group.add_argument("-m", "--max-gap", action="store", default=10.0,
  57. metavar="SEC", type=float,
  58. help="Max discrepency between clock and data "
  59. "timestamps (default: %(default)s)")
  60. group = parser.add_argument_group("Data timestamp delta")
  61. exc = group.add_mutually_exclusive_group()
  62. exc.add_argument("-r", "--rate", action="store", default=8000.0,
  63. type=float,
  64. help="Data_delta is constant 1/RATE "
  65. "(default: %(default)s Hz)")
  66. exc.add_argument("-d", "--delta", action="store_true",
  67. help="Data_delta is the first number in each line")
  68. group = parser.add_argument_group("Clock timestamp source")
  69. exc = group.add_mutually_exclusive_group()
  70. exc.add_argument("-l", "--live", action="store_true",
  71. help="Use live system time for clock timestamp")
  72. exc.add_argument("-f", "--file", action="store_true", default=True,
  73. help="Use filename or comments for clock timestamp")
  74. group.add_argument("-o", "--offset-filename", metavar="SEC",
  75. action="store", default=-3600.0, type=float,
  76. help="Offset to add to filename timestamps "
  77. "(default: %(default)s)")
  78. group.add_argument("-O", "--offset-comment", metavar="SEC",
  79. action="store", default=0.0, type=float,
  80. help="Offset to add to comment timestamps "
  81. "(default: %(default)s)")
  82. group = parser.add_argument_group("Database path")
  83. group.add_argument("path", action="store",
  84. help="Path of stream, e.g. /foo/bar")
  85. group = parser.add_argument_group("Input files")
  86. group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
  87. default=[sys.stdin],
  88. help="Input files (default: stdin)")
  89. args = parser.parse_args(argv)
  90. printf(" Stream path: %s\n", args.path)
  91. printf(" Data timestamp: ")
  92. if args.delta:
  93. printf("delta on each input line\n")
  94. else:
  95. printf("fixed rate %s Hz\n", repr(args.rate))
  96. printf(" Clock timestamp: ")
  97. if args.live:
  98. printf("live system clock\n")
  99. else:
  100. printf("from filenames and comments\n")
  101. printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
  102. printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
  103. printf(" Max gap: %s seconds\n", repr(args.max_gap))
  104. if args.dry_run:
  105. printf("Dry run (no data will be inserted)\n")
  106. return args
  107. def main(argv = None):
  108. args = parse_args(argv)
  109. client = nilmdb.client.Client(args.url)
  110. # data_ts is the timestamp that we'll use for the current line
  111. data_ts_base = 0
  112. data_ts_inc = 0
  113. data_ts_rate = args.rate
  114. data_ts_delta = 0
  115. def get_data_ts():
  116. if args.delta:
  117. return data_ts_base + data_ts_delta
  118. else:
  119. return data_ts_base + rate_to_period(data_ts_rate,
  120. data_ts_inc)
  121. # clock_ts is the imprecise "real" timestamp (from the filename,
  122. # comments, or system clock)
  123. clock_ts = None
  124. def print_clock_updated():
  125. printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
  126. if data_ts_base != 0:
  127. diff = get_data_ts() - clock_ts
  128. if diff >= 0:
  129. printf(" (data timestamp ahead by %.6f sec)\n",
  130. timestamp_to_seconds(diff))
  131. else:
  132. printf(" (data timestamp behind by %.6f sec)\n",
  133. timestamp_to_seconds(-diff))
  134. offset_filename = seconds_to_timestamp(args.offset_filename)
  135. offset_comment = seconds_to_timestamp(args.offset_comment)
  136. max_gap = seconds_to_timestamp(args.max_gap)
  137. with client.stream_insert_context(args.path) as stream:
  138. for f in args.infile:
  139. filename = f.name
  140. printf("Processing %s\n", filename)
  141. # If the filename ends in .gz, re-open it with gzip to
  142. # decompress.
  143. if filename.endswith(".gz"):
  144. p = subprocess.Popen(["gzip", "-dc"],
  145. stdin = f, stdout = subprocess.PIPE)
  146. f = p.stdout
  147. # Try to get a real timestamp from the filename
  148. try:
  149. # Subtract 1 hour because files are created at the end
  150. # of the hour. Hopefully, we'll be able to use
  151. # internal comments and this value won't matter anyway.
  152. clock_ts = parse_time(filename) + offset_filename
  153. print_clock_updated()
  154. except ValueError:
  155. pass
  156. truncated_lines = 0
  157. # Read each line
  158. for line in f:
  159. # Once in a while a line might be truncated, if we're
  160. # at the end of a file. Ignore it, but if we ignore
  161. # too many, bail out.
  162. if line[-1] != '\n':
  163. truncated_lines += 1
  164. if truncated_lines > 3:
  165. raise ParseError(filename, "too many short lines")
  166. printf("Ignoring short line in %s\n", filename)
  167. continue
  168. # If no content other than the newline, skip it
  169. if len(line) <= 1:
  170. continue
  171. # If line starts with a comment, look for a timestamp
  172. if line[0] == '#':
  173. try:
  174. clock_ts = parse_time(line[1:]) + offset_comment
  175. print_clock_updated()
  176. except ValueError:
  177. pass
  178. continue
  179. # If --delta mode, increment data_ts_delta by the
  180. # delta from the file.
  181. if args.delta:
  182. try:
  183. (delta, line) = line.split(None, 1)
  184. data_ts_delta += float(delta)
  185. except ValueError:
  186. raise ParseError(filename, "can't parse delta")
  187. # Calculate data_ts for this row
  188. data_ts = get_data_ts()
  189. # If inserting live, use clock timestamp
  190. if args.live:
  191. clock_ts = time_now()
  192. # If we have a real timestamp, compare it to the data
  193. # timestamp, and make sure things match up.
  194. if clock_ts is not None:
  195. if (data_ts - max_gap) > clock_ts:
  196. # Accumulated line timestamps are in the future.
  197. # If we were to set data_ts=clock_ts, we'd create
  198. # an overlap, so we have to just bail out here.
  199. err = sprintf("Data is coming in too fast: data time "
  200. "is %s but clock time is only %s",
  201. timestamp_to_human(data_ts),
  202. timestamp_to_human(clock_ts))
  203. if args.skip:
  204. printf("%s\n", err)
  205. printf("Skipping the remainder of this file\n")
  206. break
  207. raise ParseError(filename, err)
  208. if (data_ts + max_gap) < clock_ts:
  209. # Accumulated line timetamps are in the past. We
  210. # can just skip some time and leave a gap in the
  211. # data.
  212. if data_ts_base != 0:
  213. printf("Skipping data timestamp forward from "
  214. "%s to %s to match clock time\n",
  215. timestamp_to_human(data_ts),
  216. timestamp_to_human(clock_ts))
  217. stream.finalize()
  218. data_ts_base = data_ts = clock_ts
  219. data_ts_inc = data_ts_delta = 0
  220. # Don't use this clock time anymore until we update it
  221. clock_ts = None
  222. if data_ts_base == 0:
  223. raise ParseError(filename, "No idea what timestamp to use")
  224. # This line is legit, so increment timestamp (for --rate)
  225. data_ts_inc += 1
  226. # Insert it
  227. if not args.dry_run:
  228. stream.insert("%d %s" % (data_ts, line))
  229. print("Done")
# Run the inserter only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()