Compare commits

..

9 Commits

Author SHA1 Message Date
ab9a327130 Remove upper limit on requests library version 2014-02-18 16:36:34 -05:00
da72fc9777 Explicitly avoid HTTP/1.1 persistent connections (keep-alive)
We do this by creating a new requests.Session object for each request,
sending a "Connection: close" request header, and then explicitly
marking the connection for close after the response is read.

This is to avoid a longstanding race condition with HTTP keepalive
and server timeouts.  Due to data processing, capture, etc, requests
may be separated by an arbitrary delay.  If this delay is shorter
than the server's KeepAliveTimeout, the same connection is used.
If the delay is longer, a new connection is used.  If the delay is
the same, however, the request may be sent on the old connection at
the exact same time that the server closes it.  Typically, the
client sees the connection as closing between the request and the
response, which leads to "httplib.BadStatusLine" errors.

This patch avoids the race condition entirely by not using persistent
connections.

Another solution may be to detect those errors and retry the
connection, resending the request.  However, the race condition could
potentially show up in other places, like a closed connection during
the request body, not after.  Such an error could also be a legitimate
network condition or problem.  This solution should be more reliable,
and the overhead of each new connection will hopefully be minimal for
typical workloads.
2014-02-18 14:36:58 -05:00
a01cb4132d Add test for limited interval removal 2014-02-14 15:53:02 -05:00
7c3da2fe44 Limit the max number of intervals we remove in one stream_remove call 2014-02-14 15:52:53 -05:00
f0e06dc436 Allow newer versions of Requests library 2014-02-14 15:13:34 -05:00
ddc0eb4264 Coalesce calls to table.remove during stream_remove; significant speedup for degenerate cases 2014-02-14 15:13:17 -05:00
0a22db3965 Ignore exceptions during __del__ handlers, which may get called during shutdown 2014-02-14 15:07:30 -05:00
8bb8f068de Catch harmless error seen in apache logs during shutdown 2014-02-04 19:50:46 -05:00
416902097d Fix crash in nilmdb-fsck if there are zero intervals, etc. 2014-02-04 19:38:01 -05:00
8 changed files with 104 additions and 57 deletions

View File

@@ -18,10 +18,8 @@ class HTTPClient(object):
reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed.rstrip('/') + '/'
# Build Requests session object, enable SSL verification
# Note whether we want SSL verification
self.verify_ssl = verify_ssl
self.session = requests.Session()
self.session.verify = True
# Saved response, so that tests can verify a few things.
self._last_response = {}
@@ -59,17 +57,34 @@ class HTTPClient(object):
raise Error(**args)
def close(self):
self.session.close()
pass
def _do_req(self, method, url, query_data, body_data, stream, headers):
url = urlparse.urljoin(self.baseurl, url)
try:
response = self.session.request(method, url,
params = query_data,
data = body_data,
stream = stream,
headers = headers,
verify = self.verify_ssl)
# Create a new session, ensure we send "Connection: close",
# and explicitly close connection after the transfer.
# This is to avoid HTTP/1.1 persistent connections
# (keepalive), because they have fundamental race
# conditions when there are delays between requests:
# a new request may be sent at the same instant that the
# server decides to timeout the connection.
session = requests.Session()
if headers is None:
headers = {}
headers["Connection"] = "close"
response = session.request(method, url,
params = query_data,
data = body_data,
stream = stream,
headers = headers,
verify = self.verify_ssl)
# Close the connection. If it's a generator (stream =
# True), the requests library shouldn't actually close the
# HTTP connection until all data has been read from the
# response.
session.close()
except requests.RequestException as e:
raise ServerError(status = "502 Error", url = url,
message = str(e.message))

View File

@@ -59,6 +59,8 @@ def retry_if_raised(exc, message = None, max_retries = 100):
class Progress(object):
def __init__(self, maxval):
if maxval == 0:
maxval = 1
self.bar = progressbar.ProgressBar(
maxval = maxval,
widgets = [ progressbar.Percentage(), ' ',

View File

@@ -83,8 +83,11 @@ _sql_schema_updates = {
class NilmDB(object):
verbose = 0
def __init__(self, basepath, max_results=None,
max_removals=None, bulkdata_args=None):
def __init__(self, basepath,
max_results=None,
max_removals=None,
max_int_removals=None,
bulkdata_args=None):
"""Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing:
@@ -92,7 +95,10 @@ class NilmDB(object):
stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once
in stream_move.
in stream_remove.
'max_int_removals' is the max intervals to delete
at once in stream_remove.
'bulkdata_args' is kwargs for the bulkdata module.
"""
@@ -134,6 +140,9 @@ class NilmDB(object):
# Remove up to this many rows per call to stream_remove.
self.max_removals = max_removals or 1048576
# Remove up to this many intervals per call to stream_remove.
self.max_int_removals = max_int_removals or 4096
def get_basepath(self):
return self.basepath
@@ -643,13 +652,22 @@ class NilmDB(object):
to_remove = Interval(start, end)
removed = 0
remaining = self.max_removals
int_remaining = self.max_int_removals
restart = None
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True))
remove_start = None
remove_end = None
for (dbint, orig) in all_candidates:
# Stop if we've hit the max number of interval removals
if int_remaining <= 0:
restart = dbint.start
break
# Find row start and end
row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint)
@@ -670,14 +688,29 @@ class NilmDB(object):
# Remove interval from the database
self._remove_interval(stream_id, orig, dbint)
# Remove data from the underlying table storage
table.remove(row_start, row_end)
# Remove data from the underlying table storage,
# coalescing adjacent removals to reduce the number of calls
# to table.remove.
if remove_end == row_start:
# Extend our coalesced region
remove_end = row_end
else:
# Perform previous removal, then save this one
if remove_end is not None:
table.remove(remove_start, remove_end)
remove_start = row_start
remove_end = row_end
# Count how many were removed
removed += row_end - row_start
remaining -= row_end - row_start
int_remaining -= 1
if restart is not None:
break
# Perform any final coalesced removal
if remove_end is not None:
table.remove(remove_start, remove_end)
return (removed, restart)

View File

@@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
@wrap_class_method
def __del__(orig, self, *args, **kwargs):
if "_must_close" in self.__dict__:
fprintf(errorfile, "error: %s.close() wasn't called!\n",
self.__class__.__name__)
return orig(self, *args, **kwargs)
try:
if "_must_close" in self.__dict__:
fprintf(errorfile, "error: %s.close() wasn't called!\n",
self.__class__.__name__)
return orig(self, *args, **kwargs)
except: # pragma: no cover
pass
@wrap_class_method
def close(orig, self, *args, **kwargs):

View File

@@ -117,7 +117,10 @@ def serializer_proxy(obj_or_type):
return ret
def __del__(self):
self.__call_queue.put((None, None, None, None))
self.__thread.join()
try:
self.__call_queue.put((None, None, None, None))
self.__thread.join()
except: # pragma: no cover
pass
return SerializerObjectProxy(obj_or_type)

View File

@@ -117,7 +117,7 @@ setup(name='nilmdb',
'python-dateutil',
'pytz',
'psutil >= 0.3.0',
'requests >= 1.1.0, < 2.0.0',
'requests >= 1.1.0',
'progressbar >= 2.2',
],
packages = [ 'nilmdb',

View File

@@ -690,40 +690,15 @@ class TestClient(object):
client.close()
def test_client_12_persistent(self):
# Check that connections are persistent when they should be.
# This is pretty hard to test; we have to poke deep into
# the Requests library.
# Check that connections are NOT persistent. Rather than trying
# to verify this at the TCP level, just make sure that the response
# contained a "Connection: close" header.
with nilmdb.client.Client(url = testurl) as c:
def connections():
try:
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests)
except Exception:
raise SkipTest("can't get connection info")
# First request makes a connection
c.stream_create("/persist/test", "uint16_1")
eq_(connections(), (1, 1))
eq_(c.http._last_response.headers["Connection"], "close")
# Non-generator
c.stream_list("/persist/test")
eq_(connections(), (1, 2))
c.stream_list("/persist/test")
eq_(connections(), (1, 3))
# Generators
for x in c.stream_intervals("/persist/test"):
pass
eq_(connections(), (1, 4))
for x in c.stream_intervals("/persist/test"):
pass
eq_(connections(), (1, 5))
# Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test")
eq_(connections(), (1, 7))
eq_(c.http._last_response.headers["Connection"], "close")
def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point

View File

@@ -21,13 +21,17 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb"
def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
def server_start(max_results = None,
max_removals = None,
max_int_removals = None,
bulkdata_args = {}):
global test_server, test_db
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb,
max_results = max_results,
max_removals = max_removals,
max_int_removals = max_int_removals,
bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
@@ -880,14 +884,26 @@ class TestCmdline(object):
self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Also limit max_removals, to cover more functionality.
# Limit max_removals, to cover more functionality.
server_stop()
server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
self.do_remove_files()
self.ok("destroy -R /newton/prep") # destroy again
def test_14b_remove_files_maxint(self):
# Limit max_int_removals, to cover more functionality.
server_stop()
server_start(max_int_removals = 1,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
self.do_remove_files()
def do_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Insert data. Just for fun, insert out of order
self.ok("create /newton/prep float32_8")