You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

168 lines
6.5 KiB

  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils.time import parse_time, format_time
  5. import time
  6. import sys
  7. import re
  8. import argparse
  9. import subprocess
  10. class ParseError(Exception):
  11. def __init__(self, filename, error):
  12. msg = filename + ": " + error
  13. super(ParseError, self).__init__(msg)
  14. def parse_args():
  15. parser = argparse.ArgumentParser(description = """\
  16. Insert data from ethstream, either live (using the system time as a
  17. reference) or prerecorded (using comments in the file as a reference).
  18. The data is assumed to have been recorded at the specified rate.
  19. Small discrepencies between the accumulated timestamps and the
  20. reference time are ignored; larger discrepencies cause gaps to be
  21. created in the stream. Overlapping data returns an error.
  22. """, formatter_class = argparse.RawDescriptionHelpFormatter)
  23. parser.add_argument("-u", "--url", action="store",
  24. default="http://localhost:12380/",
  25. help="NilmDB server URL (default: %(default)s)")
  26. parser.add_argument("-r", "--rate", action="store", default=8000, type=float,
  27. help="Data rate in Hz (default: %(default)s)")
  28. parser.add_argument("-l", "--live", action="store_true",
  29. help="Live capture; use system time to verify rate")
  30. parser.add_argument("path", action="store",
  31. help="Path of stream, e.g. /foo/bar")
  32. parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
  33. default=[sys.stdin], help="Input files (default: stdin)")
  34. args = parser.parse_args()
  35. printf("Stream path: %s\n", args.path)
  36. printf(" Data rate: %s Hz\n", repr(args.rate))
  37. return args
  38. def main(args = None):
  39. if args is None:
  40. args = parse_args()
  41. client = nilmdb.client.Client(args.url)
  42. # Local copies to save dictionary lookups
  43. live = args.live
  44. # data_ts is the timestamp that we'll use for the current line
  45. data_ts_base = 0
  46. data_ts_inc = 0
  47. data_ts_step = 1.0 / args.rate
  48. # clock_ts is the imprecise "real" timestamp (from the filename,
  49. # comments, or or system clock)
  50. clock_ts = None
  51. def print_clock_updated():
  52. printf("Clock time updated to %s\n", format_time(clock_ts))
  53. if data_ts_base != 0:
  54. diff = data_ts - clock_ts
  55. if diff >= 0:
  56. printf(" (data timestamp ahead by %.6f sec)\n", diff)
  57. else:
  58. printf(" (data timestamp behind by %.6f sec)\n", -diff)
  59. with client.stream_insert_context(args.path) as stream:
  60. for f in args.infile:
  61. filename = f.name
  62. printf("Processing %s\n", filename)
  63. # If the filename ends in .gz, open it with gzcat instead.
  64. if filename.endswith(".gz"):
  65. p = subprocess.Popen(["gzip", "-dc"],
  66. stdin = f, stdout = subprocess.PIPE)
  67. f = p.stdout
  68. # Try to get a real timestamp from the filename
  69. try:
  70. # Subtract 1 hour because files are created at the end
  71. # of the hour. Hopefully, we'll be able to use
  72. # internal comments and this value won't matter anyway.
  73. clock_ts = parse_time(filename).totimestamp() - 3600
  74. print_clock_updated()
  75. except ValueError:
  76. pass
  77. truncated_lines = 0
  78. # Read each line
  79. for line in f:
  80. data_ts = data_ts_base + data_ts_inc * data_ts_step
  81. # If no content other than the newline, skip it
  82. if len(line) <= 1:
  83. continue
  84. # If line starts with a comment, look for a timestamp
  85. if line[0] == '#':
  86. try:
  87. clock_ts = parse_time(line[1:]).totimestamp()
  88. print_clock_updated()
  89. except ValueError:
  90. pass
  91. continue
  92. # If inserting live, use clock timestamp
  93. if live:
  94. clock_ts = time.time()
  95. # If we have a real timestamp, compare it to the data
  96. # timestamp, and make sure things match up.
  97. if clock_ts is not None:
  98. if (data_ts - 10) > clock_ts:
  99. # Accumulated line timestamps are in the future.
  100. # If we were to set data_ts=clock_ts, we'd create
  101. # an overlap, so we have to just bail out here.
  102. err = sprintf("Data is coming in too fast: data time "
  103. "is %s but clock time is only %s",
  104. format_time(data_ts),
  105. format_time(clock_ts))
  106. raise ParseError(filename, err)
  107. if (data_ts + 10) < clock_ts:
  108. # Accumulated line timetamps are in the past. We
  109. # can just skip some time and leave a gap in the
  110. # data.
  111. if data_ts_base != 0:
  112. printf("Skipping data timestamp forward from "
  113. "%s to %s to match clock time\n",
  114. format_time(data_ts),
  115. format_time(clock_ts))
  116. stream.finalize()
  117. data_ts_base = data_ts = clock_ts
  118. data_ts_inc = 0
  119. # Don't use this clock time anymore until we update it
  120. clock_ts = None
  121. if data_ts_base == 0:
  122. raise ParseError(filename, "No idea what timestamp to use")
  123. # This line is legit, so increment timestamp
  124. data_ts_inc += 1
  125. # Once in a while a line might be truncated, if we're at
  126. # the end of a file. Ignore it, but if we ignore too many,
  127. # bail out.
  128. if line[-1] != '\n':
  129. truncated_lines += 1
  130. if truncated_lines > 3:
  131. raise ParseError(filename, "too many short lines")
  132. printf("Ignoring short line in %s\n", filename)
  133. continue
  134. # Insert it
  135. stream.insert("%.6f %s" % (data_ts, line))
  136. print "Done"
  137. if __name__ == "__main__":
  138. main()