You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

146 lines
5.5 KiB

  1. #!/usr/bin/python
  2. import nilmdb
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils import datetime_tz
  5. from nilmdb.utils.time import parse_time, format_time
  6. import sys
  7. import re
  8. import argparse
  9. import subprocess
  10. class ParseError(Exception):
  11. def __init__(self, filename, linenum, error):
  12. msg = filename + ":" + str(linenum+1) + ": " + error
  13. super(ParseError, self).__init__(msg)
  14. parser = argparse.ArgumentParser(description = """\
  15. Insert data from ethstream, either live (using the system time as a
  16. reference) or prerecorded (using comments in the file as a reference).
  17. The data is assumed to have been recorded at the specified rate.
  18. Small discrepencies between the accumulated timestamps and the
  19. reference time are ignored; larger discrepencies cause gaps to be
  20. created in the stream. Overlapping data returns an error.
  21. """, formatter_class = argparse.RawDescriptionHelpFormatter)
  22. parser.add_argument("-u", "--url", action="store",
  23. default="http://localhost:12380/",
  24. help="NilmDB server URL (default: %(default)s)")
  25. parser.add_argument("-r", "--rate", action="store", default=8000, type=float,
  26. help="Data rate in Hz (default: %(default)s)")
  27. parser.add_argument("-l", "--live", action="store_true",
  28. help="Live capture; use system time to verify rate")
  29. parser.add_argument("path", action="store",
  30. help="Path of stream, e.g. /foo/bar")
  31. parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
  32. default=[sys.stdin], help="Input files (default: stdin)")
  33. args = parser.parse_args()
  34. printf("Stream path: %s\n", args.path)
  35. printf(" Data rate: %s Hz\n", repr(args.rate))
  36. client = nilmdb.Client(args.url)
  37. # data_ts is the timestamp that we'll use for the current line
  38. # (increased by 1 / rate every line)
  39. data_ts = None
  40. # clock_ts is the imprecise "real" timestamp (from the filename,
  41. # comments, or or system clock)
  42. clock_ts = None
  43. lines_ok = 0
  44. with client.stream_insert_context(args.path) as stream:
  45. for f in args.infile:
  46. filename = f.name
  47. printf("Processing %s\n", filename)
  48. # If the filename ends in .gz, open it with gzcat just to be
  49. # nice.
  50. if filename.endswith(".gz"):
  51. p = subprocess.Popen(["gzip", "-dc"],
  52. stdin = f, stdout = subprocess.PIPE)
  53. f = p.stdout
  54. # Try to get a real timestamp from the filename
  55. try:
  56. clock_ts = parse_time(filename).totimestamp()
  57. printf("Clock time updated to %s\n", format_time(clock_ts))
  58. except ValueError:
  59. pass
  60. # Read each line
  61. for (linenum, line) in enumerate(f):
  62. # Any comments?
  63. if line.find('#') >= 0:
  64. (line, comment) = line.split('#', 1)
  65. line += '\n'
  66. else:
  67. comment = None
  68. # Figure out our best guess about the real time
  69. if args.live:
  70. clock_ts = datetime_tz.datetime_tz.now().totimestamp()
  71. elif comment:
  72. try:
  73. clock_ts = parse_time(comment).totimestamp()
  74. printf("Clock time updated to %s\n", format_time(clock_ts))
  75. except ValueError:
  76. pass
  77. # If no content other than the newline, skip this line
  78. if len(line) <= 1:
  79. continue
  80. # Make a timestamp for this line
  81. if data_ts is None:
  82. if clock_ts is None:
  83. err = "No idea what timestamp to use"
  84. raise ParseError(filename, linenum, err)
  85. data_ts = clock_ts
  86. else:
  87. data_ts += 1.0 / args.rate
  88. # If we have a real timestamp, compare it to the line
  89. # timestamp, and make sure things are working out.
  90. if clock_ts is not None:
  91. if (data_ts - 10) > clock_ts:
  92. # Accumulated line timestamps are in the future.
  93. # If we were to set data_ts=clock_ts, we'd create
  94. # an overlap, so we have to just bail out here.
  95. err = sprintf("Data is coming in too fast: data time is %s "
  96. "but clock time is only %s",
  97. format_time(data_ts), format_time(clock_ts))
  98. raise ParserError(filename, linenum, err)
  99. if (data_ts + 10) < clock_ts:
  100. # Accumulated line timetamps are in the past. We
  101. # can just skip some time and leave a gap in the
  102. # data.
  103. printf("Skipping data timestamp forward from %s to %s "
  104. "to match clock time\n",
  105. format_time(data_ts), format_time(clock_ts))
  106. stream.finalize()
  107. data_ts = clock_ts
  108. # Don't use this clock time anymore until we update it
  109. clock_ts = None
  110. # Once in a while a line might be truncated, if we're at
  111. # the end of a file. Accept it, but only if we've been
  112. # getting good lines too
  113. if line[-1] != '\n':
  114. if lines_ok > 10:
  115. printf("Ignoring short line at %s:%d\n", filename, linenum)
  116. continue
  117. else:
  118. raise ParserError(filename, linenum, "short line")
  119. lines_ok = 0
  120. stream.insert_line(repr(data_ts) + " " + line)
  121. lines_ok += 1
  122. print "Done"