Compare commits: nilmdb-1.1...nilmdb-1.9 (7 commits)
| SHA1       |
|------------|
| ab9a327130 |
| da72fc9777 |
| a01cb4132d |
| 7c3da2fe44 |
| f0e06dc436 |
| ddc0eb4264 |
| 0a22db3965 |
@@ -18,10 +18,8 @@ class HTTPClient(object):
         reparsed = urlparse.urlparse("http://" + baseurl).geturl()
         self.baseurl = reparsed.rstrip('/') + '/'

-        # Build Requests session object, enable SSL verification
+        # Note whether we want SSL verification
         self.verify_ssl = verify_ssl
-        self.session = requests.Session()
-        self.session.verify = True

         # Saved response, so that tests can verify a few things.
         self._last_response = {}
@@ -59,17 +57,34 @@ class HTTPClient(object):
             raise Error(**args)

     def close(self):
-        self.session.close()
+        pass

     def _do_req(self, method, url, query_data, body_data, stream, headers):
         url = urlparse.urljoin(self.baseurl, url)
         try:
-            response = self.session.request(method, url,
-                                            params = query_data,
-                                            data = body_data,
-                                            stream = stream,
-                                            headers = headers,
-                                            verify = self.verify_ssl)
+            # Create a new session, ensure we send "Connection: close",
+            # and explicitly close connection after the transfer.
+            # This is to avoid HTTP/1.1 persistent connections
+            # (keepalive), because they have fundamental race
+            # conditions when there are delays between requests:
+            # a new request may be sent at the same instant that the
+            # server decides to timeout the connection.
+            session = requests.Session()
+            if headers is None:
+                headers = {}
+            headers["Connection"] = "close"
+            response = session.request(method, url,
+                                       params = query_data,
+                                       data = body_data,
+                                       stream = stream,
+                                       headers = headers,
+                                       verify = self.verify_ssl)
+
+            # Close the connection.  If it's a generator (stream =
+            # True), the requests library shouldn't actually close the
+            # HTTP connection until all data has been read from the
+            # response.
+            session.close()
         except requests.RequestException as e:
             raise ServerError(status = "502 Error", url = url,
                               message = str(e.message))
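The rewritten `_do_req` trades connection reuse for robustness: each request gets a fresh session and an explicit "Connection: close", so there is never an idle keepalive connection for the server to time out mid-request. The same pattern in isolation, as a minimal sketch (the function name and URL are illustrative, not nilmdb's API):

    import requests

    def fetch_once(url, params=None):
        # One session per request; "Connection: close" sidesteps the race
        # where a request is sent just as the server times out the idle
        # keepalive connection.
        session = requests.Session()
        try:
            response = session.request("GET", url,
                                       params=params,
                                       headers={"Connection": "close"})
            response.raise_for_status()
            return response.text
        finally:
            # Closing the session releases the underlying connection pool.
            session.close()

    # e.g. fetch_once("http://localhost:32180/version")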
@@ -83,8 +83,11 @@ _sql_schema_updates = {
 class NilmDB(object):
     verbose = 0

-    def __init__(self, basepath, max_results=None,
-                 max_removals=None, bulkdata_args=None):
+    def __init__(self, basepath,
+                 max_results=None,
+                 max_removals=None,
+                 max_int_removals=None,
+                 bulkdata_args=None):
         """Initialize NilmDB at the given basepath.
         Other arguments are for debugging / testing:
@@ -92,7 +95,10 @@ class NilmDB(object):
           stream_intervals or stream_extract response.

         'max_removals' is the max rows to delete at once
-          in stream_move.
+          in stream_remove.
+
+        'max_int_removals' is the max intervals to delete
+          at once in stream_remove.

         'bulkdata_args' is kwargs for the bulkdata module.
         """
@@ -134,6 +140,9 @@ class NilmDB(object):
         # Remove up to this many rows per call to stream_remove.
         self.max_removals = max_removals or 1048576

+        # Remove up to this many intervals per call to stream_remove.
+        self.max_int_removals = max_int_removals or 4096
+
     def get_basepath(self):
         return self.basepath
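For context, a minimal usage sketch of the new constructor arguments; the path and limit values here are made up, and only the keyword names and defaults come from this diff:

    import nilmdb.server

    # Lower limits force stream_remove to work in several small batches
    # (defaults are 1048576 rows and 4096 intervals per call).
    db = nilmdb.server.NilmDB("/tmp/example-db",
                              max_removals = 1000,
                              max_int_removals = 16)
    try:
        pass  # ... use db ...
    finally:
        db.close()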
@@ -643,13 +652,22 @@ class NilmDB(object):
         to_remove = Interval(start, end)
         removed = 0
         remaining = self.max_removals
+        int_remaining = self.max_int_removals
         restart = None

         # Can't remove intervals from within the iterator, so we need to
         # remember what's currently in the intersection now.
         all_candidates = list(intervals.intersection(to_remove, orig = True))

+        remove_start = None
+        remove_end = None
+
         for (dbint, orig) in all_candidates:
+            # Stop if we've hit the max number of interval removals
+            if int_remaining <= 0:
+                restart = dbint.start
+                break
+
             # Find row start and end
             row_start = self._find_start(table, dbint)
             row_end = self._find_end(table, dbint)
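The new `int_remaining` counter gives removal a second stopping condition: when either the row budget or the interval budget runs out, the current interval's start is handed back as `restart`. A hedged sketch of a driver loop built on the `(removed, restart)` return value visible in the next hunk; the exact call signature is assumed for illustration, not copied from nilmdb:

    def remove_all(db, path, start, end):
        # Repeatedly remove in bounded batches until no restart
        # position is returned.
        total_removed = 0
        while True:
            (removed, restart) = db.stream_remove(path, start, end)
            total_removed += removed
            if restart is None:
                return total_removed
            start = restart  # resume where the previous call stopped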
@@ -670,14 +688,29 @@ class NilmDB(object):
             # Remove interval from the database
             self._remove_interval(stream_id, orig, dbint)

-            # Remove data from the underlying table storage
-            table.remove(row_start, row_end)
+            # Remove data from the underlying table storage,
+            # coalescing adjacent removals to reduce the number of calls
+            # to table.remove.
+            if remove_end == row_start:
+                # Extend our coalesced region
+                remove_end = row_end
+            else:
+                # Perform previous removal, then save this one
+                if remove_end is not None:
+                    table.remove(remove_start, remove_end)
+                remove_start = row_start
+                remove_end = row_end

             # Count how many were removed
             removed += row_end - row_start
             remaining -= row_end - row_start
+            int_remaining -= 1

             if restart is not None:
                 break

+        # Perform any final coalesced removal
+        if remove_end is not None:
+            table.remove(remove_start, remove_end)
+
         return (removed, restart)
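The coalescing above is run-length merging of adjacent [start, end) row ranges, so `table.remove` fires once per contiguous run rather than once per interval. The same logic in isolation (an illustrative helper, not nilmdb code):

    def coalesce_ranges(ranges):
        # Merge adjacent [start, end) ranges into contiguous runs.
        runs = []
        run_start = run_end = None
        for (start, end) in ranges:
            if run_end == start:
                run_end = end              # extend the current run
            else:
                if run_end is not None:    # flush the previous run
                    runs.append((run_start, run_end))
                run_start, run_end = start, end
        if run_end is not None:            # flush the final run
            runs.append((run_start, run_end))
        return runs

    # coalesce_ranges([(0, 10), (10, 20), (30, 40)]) == [(0, 20), (30, 40)]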
@@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):

         @wrap_class_method
         def __del__(orig, self, *args, **kwargs):
-            if "_must_close" in self.__dict__:
-                fprintf(errorfile, "error: %s.close() wasn't called!\n",
-                        self.__class__.__name__)
-            return orig(self, *args, **kwargs)
+            try:
+                if "_must_close" in self.__dict__:
+                    fprintf(errorfile, "error: %s.close() wasn't called!\n",
+                            self.__class__.__name__)
+                return orig(self, *args, **kwargs)
+            except: # pragma: no cover
+                pass

         @wrap_class_method
         def close(orig, self, *args, **kwargs):
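Wrapping the whole `__del__` body in try/except guards against interpreter shutdown, when module globals such as `fprintf` or `sys.stderr` may already be torn down and even simple attribute access can raise. A minimal illustration of the hazard (not nilmdb code):

    import sys

    class Noisy(object):
        def __del__(self):
            try:
                # At interpreter shutdown the names referenced here may
                # already be gone; the bare except (as in the diff above)
                # avoids an "Exception ignored in __del__" traceback.
                sys.stderr.write("%s was never closed\n"
                                 % self.__class__.__name__)
            except:
                pass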
@@ -120,7 +120,7 @@ def serializer_proxy(obj_or_type):
            try:
                self.__call_queue.put((None, None, None, None))
                self.__thread.join()
-           except TypeError: # pragma: no cover
+           except: # pragma: no cover
                pass

    return SerializerObjectProxy(obj_or_type)
setup.py (2 changes)
@@ -117,7 +117,7 @@ setup(name='nilmdb',
                            'python-dateutil',
                            'pytz',
                            'psutil >= 0.3.0',
-                           'requests >= 1.1.0, < 2.0.0',
+                           'requests >= 1.1.0',
                            'progressbar >= 2.2',
                            ],
      packages = [ 'nilmdb',
@@ -690,40 +690,15 @@ class TestClient(object):
         client.close()

     def test_client_12_persistent(self):
-        # Check that connections are persistent when they should be.
-        # This is pretty hard to test; we have to poke deep into
-        # the Requests library.
+        # Check that connections are NOT persistent.  Rather than trying
+        # to verify this at the TCP level, just make sure that the response
+        # contained a "Connection: close" header.
         with nilmdb.client.Client(url = testurl) as c:
-            def connections():
-                try:
-                    poolmanager = c.http._last_response.connection.poolmanager
-                    pool = poolmanager.pools[('http','localhost',32180)]
-                    return (pool.num_connections, pool.num_requests)
-                except Exception:
-                    raise SkipTest("can't get connection info")
-
-            # First request makes a connection
             c.stream_create("/persist/test", "uint16_1")
-            eq_(connections(), (1, 1))
+            eq_(c.http._last_response.headers["Connection"], "close")

-            # Non-generator
-            c.stream_list("/persist/test")
-            eq_(connections(), (1, 2))
-            c.stream_list("/persist/test")
-            eq_(connections(), (1, 3))
-
-            # Generators
-            for x in c.stream_intervals("/persist/test"):
-                pass
-            eq_(connections(), (1, 4))
-            for x in c.stream_intervals("/persist/test"):
-                pass
-            eq_(connections(), (1, 5))
-
-            # Clean up
-            c.stream_remove("/persist/test")
             c.stream_destroy("/persist/test")
-            eq_(connections(), (1, 7))
+            eq_(c.http._last_response.headers["Connection"], "close")

     def test_client_13_timestamp_rounding(self):
         # Test potentially bad timestamps (due to floating point
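Asserting on the response header is deliberately shallower than the old pool-inspection approach, so the test no longer breaks when Requests or urllib3 reorganize their internals. The same check against a bare endpoint, as a sketch (the URL is hypothetical; the port matches the test server in this diff):

    import requests

    r = requests.get("http://localhost:32180/version",
                     headers = {"Connection": "close"})
    # A server that honors the request header echoes it back.
    assert r.headers.get("Connection", "").lower() == "close"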
@@ -21,13 +21,17 @@ from testutil.helpers import *

 testdb = "tests/cmdline-testdb"

-def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
+def server_start(max_results = None,
+                 max_removals = None,
+                 max_int_removals = None,
+                 bulkdata_args = {}):
     global test_server, test_db
     # Start web app on a custom port
     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
         testdb,
         max_results = max_results,
         max_removals = max_removals,
+        max_int_removals = max_int_removals,
         bulkdata_args = bulkdata_args)
     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                        port = 32180, stoppable = False,
@@ -880,14 +884,26 @@ class TestCmdline(object):
         self.ok("destroy -R /newton/prep") # destroy again

     def test_14_remove_files(self):
-        # Test BulkData's ability to remove when data is split into
-        # multiple files.  Should be a fairly comprehensive test of
-        # remove functionality.
-        # Also limit max_removals, to cover more functionality.
+        # Limit max_removals, to cover more functionality.
         server_stop()
         server_start(max_removals = 4321,
                      bulkdata_args = { "file_size" : 920, # 23 rows per file
                                        "files_per_dir" : 3 })
+        self.do_remove_files()
+        self.ok("destroy -R /newton/prep") # destroy again
+
+    def test_14b_remove_files_maxint(self):
+        # Limit max_int_removals, to cover more functionality.
+        server_stop()
+        server_start(max_int_removals = 1,
+                     bulkdata_args = { "file_size" : 920, # 23 rows per file
+                                       "files_per_dir" : 3 })
+        self.do_remove_files()
+
+    def do_remove_files(self):
+        # Test BulkData's ability to remove when data is split into
+        # multiple files.  Should be a fairly comprehensive test of
+        # remove functionality.

         # Insert data.  Just for fun, insert out of order
         self.ok("create /newton/prep float32_8")