88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
import nilmdb
|
|
from nilmdb.utils.printf import *
|
|
from nilmdb.utils import datetime_tz
|
|
|
|
from nose.tools import *
|
|
from nose.tools import assert_raises
|
|
import os
|
|
import sys
|
|
import cStringIO
|
|
|
|
from testutil.helpers import *
|
|
|
|
from nilmdb.utils import timestamper
|
|
|
|
class TestTimestamper(object):
|
|
|
|
# Not a very comprehensive test, but it's good enough.
|
|
|
|
def test_timestamper(self):
|
|
def join(list):
|
|
return "\n".join(list) + "\n"
|
|
|
|
datetime_tz.localtz_set("America/New_York")
|
|
|
|
start = nilmdb.utils.time.parse_time("03/24/2012")
|
|
lines_in = [ "hello", "world", "hello world", "# commented out" ]
|
|
lines_out = [ "1332561600000000 hello",
|
|
"1332561600000125 world",
|
|
"1332561600000250 hello world" ]
|
|
|
|
# full
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000)
|
|
foo = ts.readlines()
|
|
eq_(foo, join(lines_out))
|
|
in_("TimestamperRate(..., start=", str(ts))
|
|
|
|
# first 30 or so bytes means the first 2 lines
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000)
|
|
foo = ts.readlines(30)
|
|
eq_(foo, join(lines_out[0:2]))
|
|
|
|
# stop iteration early
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000,
|
|
1332561600000200)
|
|
foo = ""
|
|
for line in ts:
|
|
foo += line
|
|
eq_(foo, join(lines_out[0:2]))
|
|
|
|
# stop iteration early (readlines)
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000,
|
|
1332561600000200)
|
|
foo = ts.readlines()
|
|
eq_(foo, join(lines_out[0:2]))
|
|
|
|
# stop iteration really early
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000,
|
|
1332561600000000)
|
|
foo = ts.readlines()
|
|
eq_(foo, "")
|
|
|
|
# use iterator
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000)
|
|
foo = ""
|
|
for line in ts:
|
|
foo += line
|
|
eq_(foo, join(lines_out))
|
|
|
|
# check that TimestamperNow gives similar result
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperNow(input)
|
|
foo = ts.readlines()
|
|
ne_(foo, join(lines_out))
|
|
eq_(len(foo), len(join(lines_out)))
|
|
eq_(str(ts), "TimestamperNow(...)")
|
|
|
|
# Test passing a file (should be empty)
|
|
ts = timestamper.TimestamperNow("/dev/null")
|
|
for line in ts:
|
|
raise AssertionError
|
|
ts.close()
|