# nilmdb/tests/test_bulkdata.py
# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.utils.printf import *
from nose.tools import *
from nose.tools import assert_raises
import os
import itertools
import errno
import pickle

from testutil.helpers import *

testdb = b"tests/bulkdata-testdb"

import nilmdb.server.bulkdata
from nilmdb.server.bulkdata import BulkData


class TestBulkData(object):

    def test_bulkdata(self):
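        # Sweep a few file_size / files_per_dir combinations, including
        # the defaults, using both bytes and str forms of the database path.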
        for (size, files, db) in [ ( None, None, testdb ),
                                   ( 25, 1000, testdb ),
                                   ( 1000, 3, testdb.decode("utf-8") ) ]:
            recursive_unlink(db)
            os.mkdir(db)
            self.do_basic(db, size, files)

    def test_corruption(self):
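        # close() should still shut down cleanly even if the lock file
        # has already been removed out from under the store.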
        db = testdb
        recursive_unlink(db)
        os.mkdir(db)
        # Remove lock before close
        data = BulkData(db)
        os.unlink(data.lock)
        data.close()

    def do_basic(self, db, size, files):
        """Do the basic test with variable file_size and files_per_dir"""
        data = BulkData(db, file_size = size, files_per_dir = files)

        # Try opening it again (should result in locking error)
        with assert_raises(IOError) as e:
            data2 = BulkData(db)
        in_("already locked by another process", str(e.exception))

        # create empty
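        # "/foo" and "foo/bar" should be rejected as invalid paths
        # (too few components / missing leading slash).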
        with assert_raises(ValueError):
            data.create("/foo", "uint16_8")
        with assert_raises(ValueError):
            data.create("foo/bar", "uint16_8")
        data.create("/foo/bar", "uint16_8")
        data.create("/foo/baz/quux", "float64_16")
        with assert_raises(ValueError) as e:
            data.create("/foo/bar/baz", "uint16_8")
        in_("path is subdir of existing node", str(e.exception))
        with assert_raises(ValueError):
            data.create("/foo/baz", "float64_16")

        # filename too long (tests error paths in _create_parents)
        with assert_raises(OSError) as e:
            data.create("/test/long/" + "a"*10000 + "/foo", "int32_1")
        eq_(e.exception.errno, errno.ENAMETOOLONG)

        # get node -- see if caching works
        nodes = []
        for i in range(5000):
            nodes.append(data.getnode("/foo/bar"))
            nodes.append(data.getnode("/foo/baz/quux"))
        del nodes
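
        # Helper that mimics list-style indexing on top of get_data(),
        # so results can be compared directly against the `raw` list below.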
        def get_node_slice(key):
            if isinstance(key, slice):
                return [ node.get_data(x, x+1) for x in
                         range(*key.indices(node.nrows)) ]
            return node.get_data(key, key+1)

        # Test node
        node = data.getnode("/foo/bar")
        with assert_raises(IndexError):
            x = get_node_slice(0)
        with assert_raises(IndexError):
            x = node[0] # timestamp
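
        # Build 1000 rows of ASCII data (timestamp plus eight values) and
        # append them in uneven chunks, so writes land both mid-file and
        # across file boundaries for the small file_size runs.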
        raw = []
        for i in range(1000):
            raw.append(b"%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
        node.append_data(b"".join(raw[0:1]), 0, 50000)
        node.append_data(b"".join(raw[1:100]), 0, 50000)
        node.append_data(b"".join(raw[100:]), 0, 50000)
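
        # A mix of scalar indexes and forward, empty, negative-step, and
        # strided slices, checked against plain Python list slicing.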
        misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
                        slice(5,10), slice(3,None), slice(3,-3),
                        slice(20,10), slice(200,100,-1), slice(None,0,-1),
                        slice(100,500,5) ]

        # Extract slices
        for s in misc_slices:
            eq_(get_node_slice(s), raw[s])

        # Extract misc slices while appending, to make sure the
        # data isn't being added in the middle of the file
        for s in [2, slice(1,5), 2, slice(1,5)]:
            node.append_data(b"0 0 0 0 0 0 0 0 0\n", 0, 50000)
            raw.append(b"0 0 0 0 0 0 0 0 0\n")
            eq_(get_node_slice(s), raw[s])

        # Get some coverage of remove; remove is more fully tested
        # in cmdline
        with assert_raises(IndexError):
            node.remove(9999,9998)

        # close, reopen
        data.close()
        data = BulkData(db, file_size = size, files_per_dir = files)
        node = data.getnode("/foo/bar")

        # make an empty dir that will get ignored by _get_nrows
        data.close()
        os.mkdir(os.path.join(testdb, b"data/foo/bar/0123"))
        data = BulkData(db, file_size = size, files_per_dir = files)
        node = data.getnode("/foo/bar")

        # make a corrupted file that's the wrong size
        data.close()
        with open(os.path.join(testdb, b"data/foo/bar/0123/0123"), "wb") as f:
            f.write(b"x"*17)
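
        # 17 bytes can't be a whole number of packed rows, so reopening
        # the node should fail with the "not a multiple" error below.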
        data = BulkData(db, file_size = size, files_per_dir = files)
        with assert_raises(ValueError) as e:
            node = data.getnode("/foo/bar")
        in_("file offset is not a multiple of data size", str(e.exception))

        # mess with format
        data.close()
        with open(os.path.join(testdb, b"data/foo/bar/_format"), "rb") as f:
            fmt = pickle.load(f)
        fmt["version"] = 2
        with open(os.path.join(testdb, b"data/foo/bar/_format"), "wb") as f:
            pickle.dump(fmt, f, 2)
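
        # A store whose on-disk _format claims an unsupported version
        # should be refused when the node is loaded.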
        data = BulkData(db, file_size = size, files_per_dir = files)
        with assert_raises(NotImplementedError) as e:
            node = data.getnode("/foo/bar")
        in_("old version 2 bulk data store is not supported", str(e.exception))

        # Extract slices
        for s in misc_slices:
            eq_(get_node_slice(s), raw[s])

        # destroy
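        # Only the leaf data nodes can be destroyed; parent directories
        # ("/foo", "/foo/baz") and nonexistent paths are rejected.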
        with assert_raises(ValueError):
            data.destroy("/foo")
        with assert_raises(ValueError):
            data.destroy("/foo/baz")
        with assert_raises(ValueError):
            data.destroy("/foo/qwerty")
        data.destroy("/foo/baz/quux")
        data.destroy("/foo/bar")

        # close
        data.close()