|
- import nilmdb
- from nilmdb.printf import *
-
- import datetime_tz
-
- from nose.tools import *
- from nose.tools import assert_raises
- import os
- import sys
- import cStringIO
-
def eq_(a, b):
    """Check that ``a == b``.

    Returns None when the comparison holds; otherwise raises
    AssertionError with a message showing the repr of both operands.
    """
    if a == b:
        return
    message = "%r != %r" % (a, b)
    raise AssertionError(message)
-
def ne_(a, b):
    """Check that ``a != b``.

    Returns None when the operands differ; otherwise raises
    AssertionError with a message showing the repr of both operands.
    """
    if a != b:
        return
    message = "unexpected %r == %r" % (a, b)
    raise AssertionError(message)
-
class TestTimestamper(object):
    """Checks for nilmdb.timestamper's TimestamperRate and TimestamperNow."""

    # Not a very comprehensive test, but it's good enough.

    def test_timestamper(self):
        """Feed three short lines through the timestampers and compare
        the text they produce against hand-written expected output."""

        def as_text(rows):
            # Newline-terminate every row and glue them into one string.
            return "\n".join(rows) + "\n"

        def drain(stamper):
            # Collect everything the iterator yields into one string.
            pieces = []
            for chunk in stamper:
                pieces.append(chunk)
            return "".join(pieces)

        start = datetime_tz.datetime_tz.smartparse("03/24/2012").totimestamp()
        lines_in = ["hello", "world", "hello world"]
        lines_out = ["1332561600.000000 hello",
                     "1332561600.000125 world",
                     "1332561600.000250 hello world"]

        def fresh_source():
            # Each scenario gets its own unread in-memory input stream.
            return cStringIO.StringIO(as_text(lines_in))

        def rate_stamper(*end):
            # TimestamperRate over a fresh copy of the input at rate 8000;
            # an optional fourth positional argument is forwarded as-is.
            source = fresh_source()
            return nilmdb.timestamper.TimestamperRate(source, start, 8000,
                                                      *end)

        # full
        ts = rate_stamper()
        result = ts.readlines()
        eq_(result, as_text(lines_out))

        # first 30 or so bytes means the first 2 lines
        ts = rate_stamper()
        result = ts.readlines(30)
        eq_(result, as_text(lines_out[0:2]))

        # stop iteration early
        ts = rate_stamper(1332561600.000200)
        result = drain(ts)
        eq_(result, as_text(lines_out[0:2]))

        # stop iteration early (readlines)
        ts = rate_stamper(1332561600.000200)
        result = ts.readlines()
        eq_(result, as_text(lines_out[0:2]))

        # stop iteration really early
        ts = rate_stamper(1332561600.000000)
        result = ts.readlines()
        eq_(result, "")

        # use iterator
        ts = rate_stamper()
        result = drain(ts)
        eq_(result, as_text(lines_out))

        # check that TimestamperNow gives similar result:
        # different text, but the same overall length
        source = fresh_source()
        ts = nilmdb.timestamper.TimestamperNow(source)
        result = ts.readlines()
        ne_(result, as_text(lines_out))
        eq_(len(result), len(as_text(lines_out)))

        # Test passing a file (should be empty)
        ts = nilmdb.timestamper.TimestamperNow("/dev/null")
        for _unexpected in ts:
            # Any yielded line at all is a failure.
            raise AssertionError
        ts.close()
|