import nilmdb from nilmdb.utils.printf import * from nilmdb.utils import datetime_tz from nose.tools import * from nose.tools import assert_raises import os import sys import cStringIO from testutil.helpers import * from nilmdb.utils import timestamper class TestTimestamper(object): # Not a very comprehensive test, but it's good enough. def test_timestamper(self): def join(list): return "\n".join(list) + "\n" start = datetime_tz.datetime_tz.smartparse("03/24/2012").totimestamp() lines_in = [ "hello", "world", "hello world", "# commented out" ] lines_out = [ "1332561600.000000 hello", "1332561600.000125 world", "1332561600.000250 hello world" ] # full input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperRate(input, start, 8000) foo = ts.readlines() eq_(foo, join(lines_out)) in_("TimestamperRate(..., start=", str(ts)) # first 30 or so bytes means the first 2 lines input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperRate(input, start, 8000) foo = ts.readlines(30) eq_(foo, join(lines_out[0:2])) # stop iteration early input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperRate(input, start, 8000, 1332561600.000200) foo = "" for line in ts: foo += line eq_(foo, join(lines_out[0:2])) # stop iteration early (readlines) input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperRate(input, start, 8000, 1332561600.000200) foo = ts.readlines() eq_(foo, join(lines_out[0:2])) # stop iteration really early input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperRate(input, start, 8000, 1332561600.000000) foo = ts.readlines() eq_(foo, "") # use iterator input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperRate(input, start, 8000) foo = "" for line in ts: foo += line eq_(foo, join(lines_out)) # check that TimestamperNow gives similar result input = cStringIO.StringIO(join(lines_in)) ts = timestamper.TimestamperNow(input) foo = ts.readlines() ne_(foo, join(lines_out)) eq_(len(foo), len(join(lines_out))) eq_(str(ts), "TimestamperNow(...)") # Test passing a file (should be empty) ts = timestamper.TimestamperNow("/dev/null") for line in ts: raise AssertionError ts.close()