You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

156 lines
5.8 KiB

  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils.time import parse_time, format_time
  5. import time
  6. import sys
  7. import re
  8. import argparse
  9. import subprocess
  10. class ParseError(Exception):
  11. def __init__(self, filename, error):
  12. msg = filename + ": " + error
  13. super(ParseError, self).__init__(msg)
  14. parser = argparse.ArgumentParser(description = """\
  15. Insert data from ethstream, either live (using the system time as a
  16. reference) or prerecorded (using comments in the file as a reference).
  17. The data is assumed to have been recorded at the specified rate.
  18. Small discrepencies between the accumulated timestamps and the
  19. reference time are ignored; larger discrepencies cause gaps to be
  20. created in the stream. Overlapping data returns an error.
  21. """, formatter_class = argparse.RawDescriptionHelpFormatter)
  22. parser.add_argument("-u", "--url", action="store",
  23. default="http://localhost:12380/",
  24. help="NilmDB server URL (default: %(default)s)")
  25. parser.add_argument("-r", "--rate", action="store", default=8000, type=float,
  26. help="Data rate in Hz (default: %(default)s)")
  27. parser.add_argument("-l", "--live", action="store_true",
  28. help="Live capture; use system time to verify rate")
  29. parser.add_argument("path", action="store",
  30. help="Path of stream, e.g. /foo/bar")
  31. parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
  32. default=[sys.stdin], help="Input files (default: stdin)")
  33. args = parser.parse_args()
  34. printf("Stream path: %s\n", args.path)
  35. printf(" Data rate: %s Hz\n", repr(args.rate))
  36. client = nilmdb.client.Client(args.url)
  37. # Local copies to save dictionary lookups
  38. live = args.live
  39. # data_ts is the timestamp that we'll use for the current line
  40. data_ts_base = 0
  41. data_ts_inc = 0
  42. data_ts_step = 1.0 / args.rate
  43. # clock_ts is the imprecise "real" timestamp (from the filename,
  44. # comments, or or system clock)
  45. clock_ts = None
  46. def print_clock_updated():
  47. printf("Clock time updated to %s\n", format_time(clock_ts))
  48. if data_ts_base != 0:
  49. diff = data_ts - clock_ts
  50. if diff >= 0:
  51. printf(" (data timestamp ahead by %.6f sec)\n", diff)
  52. else:
  53. printf(" (data timestamp behind by %.6f sec)\n", -diff)
  54. with client.stream_insert_context(args.path) as stream:
  55. for f in args.infile:
  56. filename = f.name
  57. printf("Processing %s\n", filename)
  58. # If the filename ends in .gz, open it with gzcat instead.
  59. if filename.endswith(".gz"):
  60. p = subprocess.Popen(["gzip", "-dc"],
  61. stdin = f, stdout = subprocess.PIPE)
  62. f = p.stdout
  63. # Try to get a real timestamp from the filename
  64. try:
  65. # Subtract 1 hour because files are created at the end of the hour.
  66. # Hopefully, we'll be able to use internal comments and this value
  67. # won't matter anyway.
  68. clock_ts = parse_time(filename).totimestamp() - 3600
  69. print_clock_updated()
  70. except ValueError:
  71. pass
  72. truncated_lines = 0
  73. # Read each line
  74. for line in f:
  75. data_ts = data_ts_base + data_ts_inc * data_ts_step
  76. # If no content other than the newline, skip it
  77. if len(line) <= 1:
  78. continue
  79. # If line starts with a comment, look for a timestamp
  80. if line[0] == '#':
  81. try:
  82. clock_ts = parse_time(line[1:]).totimestamp()
  83. print_clock_updated()
  84. except ValueError:
  85. pass
  86. continue
  87. # If inserting live, use clock timestamp
  88. if live:
  89. clock_ts = time.time()
  90. # If we have a real timestamp, compare it to the data
  91. # timestamp, and make sure things match up.
  92. if clock_ts is not None:
  93. if (data_ts - 10) > clock_ts:
  94. # Accumulated line timestamps are in the future.
  95. # If we were to set data_ts=clock_ts, we'd create
  96. # an overlap, so we have to just bail out here.
  97. err = sprintf("Data is coming in too fast: data time is %s "
  98. "but clock time is only %s",
  99. format_time(data_ts), format_time(clock_ts))
  100. raise ParseError(filename, err)
  101. if (data_ts + 10) < clock_ts:
  102. # Accumulated line timetamps are in the past. We
  103. # can just skip some time and leave a gap in the
  104. # data.
  105. if data_ts_base != 0:
  106. printf("Skipping data timestamp forward from %s to %s "
  107. "to match clock time\n",
  108. format_time(data_ts), format_time(clock_ts))
  109. stream.finalize()
  110. data_ts_base = data_ts = clock_ts
  111. data_ts_inc = 0
  112. # Don't use this clock time anymore until we update it
  113. clock_ts = None
  114. if data_ts_base == 0:
  115. raise ParseError(filename, "No idea what timestamp to use")
  116. # This line is legit, so increment timestamp
  117. data_ts_inc += 1
  118. # Once in a while a line might be truncated, if we're at
  119. # the end of a file. Ignore it, but if we ignore too many,
  120. # bail out.
  121. if line[-1] != '\n':
  122. truncated_lines += 1
  123. if truncated_lines > 3:
  124. raise ParseError(filename, "too many short lines")
  125. printf("Ignoring short line in %s\n", filename)
  126. continue
  127. # Insert it
  128. stream.insert("%.6f %s" % (data_ts, line))
  129. print "Done"