161 lines
5.5 KiB
Python
161 lines
5.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import nilmdb
|
|
from nilmdb.utils.printf import *
|
|
from nose.tools import *
|
|
from nose.tools import assert_raises
|
|
import itertools
|
|
import errno
|
|
import pickle
|
|
|
|
from testutil.helpers import *
|
|
|
|
testdb = b"tests/bulkdata-testdb"
|
|
|
|
import nilmdb.server.bulkdata
|
|
from nilmdb.server.bulkdata import BulkData
|
|
|
|
class TestBulkData(object):
|
|
|
|
def test_bulkdata(self):
|
|
for (size, files, db) in [ ( None, None, testdb ),
|
|
( 25, 1000, testdb ),
|
|
( 1000, 3, testdb.decode("utf-8") ) ]:
|
|
recursive_unlink(db)
|
|
os.mkdir(db)
|
|
self.do_basic(db, size, files)
|
|
|
|
def test_corruption(self):
|
|
db = testdb
|
|
recursive_unlink(db)
|
|
os.mkdir(db)
|
|
|
|
# Remove lock before close
|
|
data = BulkData(db)
|
|
os.unlink(data.lock)
|
|
data.close()
|
|
|
|
def do_basic(self, db, size, files):
|
|
"""Do the basic test with variable file_size and files_per_dir"""
|
|
data = BulkData(db, file_size = size, files_per_dir = files)
|
|
|
|
# Try opening it again (should result in locking error)
|
|
with assert_raises(IOError) as e:
|
|
data2 = BulkData(db)
|
|
in_("already locked by another process", str(e.exception))
|
|
|
|
# create empty
|
|
with assert_raises(ValueError):
|
|
data.create("/foo", "uint16_8")
|
|
with assert_raises(ValueError):
|
|
data.create("foo/bar", "uint16_8")
|
|
data.create("/foo/bar", "uint16_8")
|
|
data.create("/foo/baz/quux", "float64_16")
|
|
with assert_raises(ValueError) as e:
|
|
data.create("/foo/bar/baz", "uint16_8")
|
|
in_("path is subdir of existing node", str(e.exception))
|
|
with assert_raises(ValueError):
|
|
data.create("/foo/baz", "float64_16")
|
|
|
|
# filename too long (tests error paths in _create_parents)
|
|
with assert_raises(OSError) as e:
|
|
data.create("/test/long/" + "a"*10000 + "/foo", "int32_1")
|
|
eq_(e.exception.errno, errno.ENAMETOOLONG)
|
|
|
|
|
|
# get node -- see if caching works
|
|
nodes = []
|
|
for i in range(5000):
|
|
nodes.append(data.getnode("/foo/bar"))
|
|
nodes.append(data.getnode("/foo/baz/quux"))
|
|
del nodes
|
|
|
|
def get_node_slice(key):
|
|
if isinstance(key, slice):
|
|
return [ node.get_data(x, x+1) for x in
|
|
range(*key.indices(node.nrows)) ]
|
|
return node.get_data(key, key+1)
|
|
|
|
# Test node
|
|
node = data.getnode("/foo/bar")
|
|
with assert_raises(IndexError):
|
|
x = get_node_slice(0)
|
|
with assert_raises(IndexError):
|
|
x = node[0] # timestamp
|
|
raw = []
|
|
for i in range(1000):
|
|
raw.append(b"%d 1 2 3 4 5 6 7 8\n" % (10000 + i))
|
|
node.append_data(b"".join(raw[0:1]), 0, 50000)
|
|
node.append_data(b"".join(raw[1:100]), 0, 50000)
|
|
node.append_data(b"".join(raw[100:]), 0, 50000)
|
|
|
|
misc_slices = [ 0, 100, slice(None), slice(0), slice(10),
|
|
slice(5,10), slice(3,None), slice(3,-3),
|
|
slice(20,10), slice(200,100,-1), slice(None,0,-1),
|
|
slice(100,500,5) ]
|
|
|
|
# Extract slices
|
|
for s in misc_slices:
|
|
eq_(get_node_slice(s), raw[s])
|
|
|
|
# Extract misc slices while appending, to make sure the
|
|
# data isn't being added in the middle of the file
|
|
for s in [2, slice(1,5), 2, slice(1,5)]:
|
|
node.append_data(b"0 0 0 0 0 0 0 0 0\n", 0, 50000)
|
|
raw.append(b"0 0 0 0 0 0 0 0 0\n")
|
|
eq_(get_node_slice(s), raw[s])
|
|
|
|
# Get some coverage of remove; remove is more fully tested
|
|
# in cmdline
|
|
with assert_raises(IndexError):
|
|
node.remove(9999,9998)
|
|
|
|
# close, reopen
|
|
data.close()
|
|
data = BulkData(db, file_size = size, files_per_dir = files)
|
|
node = data.getnode("/foo/bar")
|
|
|
|
# make an empty dir that will get ignored by _get_nrows
|
|
data.close()
|
|
os.mkdir(os.path.join(testdb, b"data/foo/bar/0123"))
|
|
data = BulkData(db, file_size = size, files_per_dir = files)
|
|
node = data.getnode("/foo/bar")
|
|
|
|
# make a corrupted file that's the wrong size
|
|
data.close()
|
|
with open(os.path.join(testdb, b"data/foo/bar/0123/0123"), "wb") as f:
|
|
f.write(b"x"*17)
|
|
data = BulkData(db, file_size = size, files_per_dir = files)
|
|
with assert_raises(ValueError) as e:
|
|
node = data.getnode("/foo/bar")
|
|
in_("file offset is not a multiple of data size", str(e.exception))
|
|
|
|
# mess with format
|
|
data.close()
|
|
with open(os.path.join(testdb, b"data/foo/bar/_format"), "rb") as f:
|
|
fmt = pickle.load(f)
|
|
fmt["version"] = 2
|
|
with open(os.path.join(testdb, b"data/foo/bar/_format"), "wb") as f:
|
|
pickle.dump(fmt, f, 2)
|
|
data = BulkData(db, file_size = size, files_per_dir = files)
|
|
with assert_raises(NotImplementedError) as e:
|
|
node = data.getnode("/foo/bar")
|
|
in_("old version 2 bulk data store is not supported", str(e.exception))
|
|
|
|
# Extract slices
|
|
for s in misc_slices:
|
|
eq_(get_node_slice(s), raw[s])
|
|
|
|
# destroy
|
|
with assert_raises(ValueError):
|
|
data.destroy("/foo")
|
|
with assert_raises(ValueError):
|
|
data.destroy("/foo/baz")
|
|
with assert_raises(ValueError):
|
|
data.destroy("/foo/qwerty")
|
|
data.destroy("/foo/baz/quux")
|
|
data.destroy("/foo/bar")
|
|
|
|
# close
|
|
data.close()
|