Jim Paris
f316026592
datetime_tz isn't readily available, so it's a lot easier to just package it within the nilmdb tree.
93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
import nilmdb
|
|
from nilmdb.utils.printf import *
|
|
from nilmdb.utils import datetime_tz
|
|
|
|
from nose.tools import *
|
|
from nose.tools import assert_raises
|
|
import os
|
|
import sys
|
|
import cStringIO
|
|
|
|
from testutil.helpers import *
|
|
|
|
from nilmdb.utils import timestamper
|
|
|
|
class TestTimestamper(object):
|
|
|
|
# Not a very comprehensive test, but it's good enough.
|
|
|
|
def test_timestamper(self):
|
|
def join(list):
|
|
return "\n".join(list) + "\n"
|
|
|
|
start = datetime_tz.datetime_tz.smartparse("03/24/2012").totimestamp()
|
|
lines_in = [ "hello", "world", "hello world", "# commented out" ]
|
|
lines_out = [ "1332561600.000000 hello",
|
|
"1332561600.000125 world",
|
|
"1332561600.000250 hello world" ]
|
|
|
|
# full
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000)
|
|
foo = ts.readlines()
|
|
eq_(foo, join(lines_out))
|
|
in_("TimestamperRate(..., start=", str(ts))
|
|
|
|
# first 30 or so bytes means the first 2 lines
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000)
|
|
foo = ts.readlines(30)
|
|
eq_(foo, join(lines_out[0:2]))
|
|
|
|
# stop iteration early
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000,
|
|
1332561600.000200)
|
|
foo = ""
|
|
for line in ts:
|
|
foo += line
|
|
eq_(foo, join(lines_out[0:2]))
|
|
|
|
# stop iteration early (readlines)
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000,
|
|
1332561600.000200)
|
|
foo = ts.readlines()
|
|
eq_(foo, join(lines_out[0:2]))
|
|
|
|
# stop iteration really early
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000,
|
|
1332561600.000000)
|
|
foo = ts.readlines()
|
|
eq_(foo, "")
|
|
|
|
# use iterator
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperRate(input, start, 8000)
|
|
foo = ""
|
|
for line in ts:
|
|
foo += line
|
|
eq_(foo, join(lines_out))
|
|
|
|
# check that TimestamperNow gives similar result
|
|
input = cStringIO.StringIO(join(lines_in))
|
|
ts = timestamper.TimestamperNow(input)
|
|
foo = ts.readlines()
|
|
ne_(foo, join(lines_out))
|
|
eq_(len(foo), len(join(lines_out)))
|
|
eq_(str(ts), "TimestamperNow(...)")
|
|
|
|
# Test passing a file (should be empty)
|
|
ts = timestamper.TimestamperNow("/dev/null")
|
|
for line in ts:
|
|
raise AssertionError
|
|
ts.close()
|
|
|
|
# Test the null timestamper
|
|
input = cStringIO.StringIO(join(lines_out)) # note: lines_out
|
|
ts = timestamper.TimestamperNull(input)
|
|
foo = ts.readlines()
|
|
eq_(foo, join(lines_out))
|
|
eq_(str(ts), "TimestamperNull(...)")
|