2012-03-24 13:32:11 -04:00
|
|
|
import nilmdb
|
2012-12-31 15:52:28 -05:00
|
|
|
from nilmdb.utils.printf import *
|
2015-01-18 17:44:51 -05:00
|
|
|
from nilmdb.utils import datetime_tz
|
2012-03-24 13:32:11 -04:00
|
|
|
|
|
|
|
from nose.tools import *
|
|
|
|
from nose.tools import assert_raises
|
|
|
|
import os
|
|
|
|
import sys
|
2012-03-24 17:15:24 -04:00
|
|
|
import cStringIO
|
2012-03-24 13:32:11 -04:00
|
|
|
|
2013-01-05 15:00:34 -05:00
|
|
|
from testutil.helpers import *
|
2012-03-24 13:32:11 -04:00
|
|
|
|
2013-01-28 19:04:52 -05:00
|
|
|
from nilmdb.utils import timestamper
|
|
|
|
|
2012-03-24 13:32:11 -04:00
|
|
|
class TestTimestamper(object):
    """Tests for the line timestampers in ``nilmdb.utils.timestamper``."""

    # Not a very comprehensive test, but it's good enough.

    def test_timestamper(self):
        """Check readlines(), iteration and early-stop behavior.

        Feeds a small fixed set of text lines through ``TimestamperRate``
        and ``TimestamperNow`` and compares the produced text against the
        expected timestamp-prefixed output.
        """

        def join(lines):
            # Newline-terminate every line, including the last one.
            return "\n".join(lines) + "\n"

        # Pin the local timezone so parsing "03/24/2012" below yields the
        # fixed start value that the expected timestamps are based on.
        datetime_tz.localtz_set("America/New_York")

        start = nilmdb.utils.time.parse_time("03/24/2012")
        lines_in = ["hello", "world", "hello world", "# commented out"]
        # The "# commented out" input line is expected to be absent from
        # the output; expected timestamps advance by 125 per line at the
        # rate of 8000 used below.
        lines_out = ["1332561600000000 hello",
                     "1332561600000125 world",
                     "1332561600000250 hello world"]

        def make_stream():
            # Fresh in-memory input for each scenario.
            return cStringIO.StringIO(join(lines_in))

        def make_rate_ts(*end):
            # ``end`` is the optional fourth positional argument of
            # TimestamperRate (used by the early-stop scenarios below).
            return timestamper.TimestamperRate(make_stream(), start,
                                               8000, *end)

        # full
        ts = make_rate_ts()
        eq_(ts.readlines(), join(lines_out))
        in_("TimestamperRate(..., start=", str(ts))

        # first 30 or so bytes means the first 2 lines
        ts = make_rate_ts()
        eq_(ts.readlines(30), join(lines_out[0:2]))

        # stop iteration early
        ts = make_rate_ts(1332561600000200)
        # str.join consumes ``ts`` through the iterator protocol.
        eq_("".join(ts), join(lines_out[0:2]))

        # stop iteration early (readlines)
        ts = make_rate_ts(1332561600000200)
        eq_(ts.readlines(), join(lines_out[0:2]))

        # stop iteration really early
        ts = make_rate_ts(1332561600000000)
        eq_(ts.readlines(), "")

        # use iterator
        ts = make_rate_ts()
        eq_("".join(ts), join(lines_out))

        # check that TimestamperNow gives similar result
        ts = timestamper.TimestamperNow(make_stream())
        now_text = ts.readlines()
        expected_text = join(lines_out)
        # Different timestamps, but the same overall length.
        ne_(now_text, expected_text)
        eq_(len(now_text), len(expected_text))
        eq_(str(ts), "TimestamperNow(...)")

        # Test passing a file (should be empty)
        ts = timestamper.TimestamperNow("/dev/null")
        try:
            for line in ts:
                raise AssertionError(
                    "unexpected line from /dev/null: %r" % (line,))
        finally:
            # Close even when the assertion above fires, so a failing
            # test does not leak the handle.
            ts.close()
|