nilmdb/tests/test_bulkdata.py

108 lines
3.4 KiB
Python
Raw Permalink Normal View History

2013-01-05 14:55:22 -05:00
# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.utils.printf import *
from nose.tools import *
from nose.tools import assert_raises
import itertools
2013-01-05 15:00:34 -05:00
from testutil.helpers import *
2013-01-05 14:55:22 -05:00
testdb = "tests/bulkdata-testdb"
import nilmdb.server.bulkdata
from nilmdb.server.bulkdata import BulkData
2013-01-05 14:55:22 -05:00
class TestBulkData(object):
def test_bulkdata(self):
for (size, files, db) in [ ( 0, 0, testdb ),
( 25, 1000, testdb ),
( 1000, 3, testdb.decode("utf-8") ) ]:
recursive_unlink(db)
os.mkdir(db)
self.do_basic(db, size, files)
def do_basic(self, db, size, files):
"""Do the basic test with variable file_size and files_per_dir"""
if not size or not files:
data = BulkData(db)
else:
data = BulkData(db, file_size = size, files_per_dir = files)
# create empty
with assert_raises(ValueError):
data.create("/foo", "uint16_8")
with assert_raises(ValueError):
data.create("foo/bar", "uint16_8")
data.create("/foo/bar", "uint16_8")
data.create(u"/foo/baz/quux", "float64_16")
with assert_raises(ValueError):
data.create("/foo/bar/baz", "uint16_8")
with assert_raises(ValueError):
data.create("/foo/baz", "float64_16")
# get node -- see if caching works
nodes = []
for i in range(5000):
nodes.append(data.getnode("/foo/bar"))
nodes.append(data.getnode("/foo/baz/quux"))
del nodes
# Test node
node = data.getnode("/foo/bar")
with assert_raises(IndexError):
x = node[0]
raw = []
for i in range(1000):
raw.append([10000+i, 1, 2, 3, 4, 5, 6, 7, 8 ])
node.append(raw[0:1])
node.append(raw[1:100])
node.append(raw[100:])
misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
slice(5,10), slice(3,None), slice(3,-3),
slice(20,10), slice(200,100,-1), slice(None,0,-1),
slice(100,500,5) ]
# Extract slices
for s in misc_slices:
eq_(node[s], raw[s])
2013-03-03 14:00:00 -05:00
# Extract misc slices while appending, to make sure the
# data isn't being added in the middle of the file
for s in [2, slice(1,5), 2, slice(1,5)]:
node.append([[0,0,0,0,0,0,0,0,0]])
raw.append([0,0,0,0,0,0,0,0,0])
eq_(node[s], raw[s])
# Get some coverage of remove; remove is more fully tested
# in cmdline
with assert_raises(IndexError):
node.remove(9999,9998)
2013-01-05 14:55:22 -05:00
# close, reopen
# reopen
data.close()
if not size or not files:
data = BulkData(db)
else:
data = BulkData(db, file_size = size, files_per_dir = files)
node = data.getnode("/foo/bar")
# Extract slices
for s in misc_slices:
eq_(node[s], raw[s])
# destroy
with assert_raises(ValueError):
data.destroy("/foo")
with assert_raises(ValueError):
data.destroy("/foo/baz")
with assert_raises(ValueError):
data.destroy("/foo/qwerty")
data.destroy("/foo/baz/quux")
data.destroy("/foo/bar")
# close
data.close()