|
- import nilmdb
- from nilmdb.utils.printf import *
- import datetime_tz
-
- from nose.tools import *
- from nose.tools import assert_raises
- import os
- import sys
- import io
-
- from testutil.helpers import *
-
- from nilmdb.utils import timestamper
-
class TestTimestamper(object):
    """Tests for the line timestampers in ``nilmdb.utils.timestamper``."""

    # Not a very comprehensive test, but it's good enough.

    def test_timestamper(self):
        """Run TimestamperRate and TimestamperNow over small inputs.

        Covers a full read, a size-limited read, reads bounded by an end
        timestamp (via both iteration and ``readlines``), plain iteration,
        and TimestamperNow on both a stream and a filename.
        """

        def join(lines):
            # Newline-terminate every line, including the final one.
            return "\n".join(lines) + "\n"

        datetime_tz.localtz_set("America/New_York")

        start = nilmdb.utils.time.parse_time("03/24/2012")
        lines_in = ["hello", "world", "hello world", "# commented out"]
        # Expected stamps step by 125 per line at a rate of 8000; the
        # "# commented out" input line has no counterpart in the output.
        lines_out = [
            "1332561600000000 hello",
            "1332561600000125 world",
            "1332561600000250 hello world",
        ]
        text_in = join(lines_in)
        expected_all = join(lines_out)
        expected_first_two = join(lines_out[0:2])

        def rate_timestamper(*end):
            # Build a fresh input stream per case so every case starts
            # reading from the first line.  ``end`` is the optional fourth
            # positional argument of TimestamperRate.
            stream = io.StringIO(text_in)
            return timestamper.TimestamperRate(stream, start, 8000, *end)

        # full
        ts = rate_timestamper()
        eq_(ts.readlines(), expected_all)
        in_("TimestamperRate(..., start=", str(ts))

        # first 30 or so bytes means the first 2 lines
        ts = rate_timestamper()
        eq_(ts.readlines(30), expected_first_two)

        # stop iteration early: the end value falls between the second
        # (…125) and third (…250) expected stamps
        ts = rate_timestamper(1332561600000200)
        eq_("".join(ts), expected_first_two)

        # stop iteration early (readlines)
        ts = rate_timestamper(1332561600000200)
        eq_(ts.readlines(), expected_first_two)

        # stop iteration really early: end equals the first expected stamp,
        # and no output at all is expected
        ts = rate_timestamper(1332561600000000)
        eq_(ts.readlines(), "")

        # use iterator
        ts = rate_timestamper()
        eq_("".join(ts), expected_all)

        # check that TimestamperNow gives similar result: different text
        # (the stamps differ) but the same overall length
        ts = timestamper.TimestamperNow(io.StringIO(text_in))
        stamped = ts.readlines()
        ne_(stamped, expected_all)
        eq_(len(stamped), len(expected_all))
        eq_(str(ts), "TimestamperNow(...)")

        # Test passing a file (should be empty)
        ts = timestamper.TimestamperNow("/dev/null")
        try:
            for line in ts:
                raise AssertionError(
                    "unexpected line from /dev/null: %r" % (line,))
        finally:
            # Close even when the assertion above fires, so a failing run
            # does not leave the timestamper open.
            ts.close()
|