Compare commits

...

7 Commits

Author SHA1 Message Date
ab9a327130 Remove upper limit on requests library version 2014-02-18 16:36:34 -05:00
da72fc9777 Explicitly avoid HTTP/1.1 persistent connections (keep-alive)
We do this by creating a new requests.Session object for each request,
sending a "Connection: close" request header, and then explicitly
marking the connection for close after the response is read.

This is to avoid a longstanding race condition with HTTP keepalive
and server timeouts.  Due to data processing, capture, etc, requests
may be separated by an arbitrary delay.  If this delay is shorter
than the server's KeepAliveTimeout, the same connection is used.
If the delay is longer, a new connection is used.  If the delay is
the same, however, the request may be sent on the old connection at
the exact same time that the server closes it.  Typically, the
client sees the connection as closing between the request and the
response, which leads to "httplib.BadStatusLine" errors.

This patch avoids the race condition entirely by not using persistent
connections.

Another solution may be to detect those errors and retry the
connection, resending the request.  However, the race condition could
potentially show up in other places, like a closed connection during
the request body, not after.  Such an error could also be a legitimate
network condition or problem.  This solution should be more reliable,
and the overhead of each new connection will hopefully be minimal for
typical workloads.
2014-02-18 14:36:58 -05:00
a01cb4132d Add test for limited interval removal 2014-02-14 15:53:02 -05:00
7c3da2fe44 Limit the max number of intervals we remove in one stream_remove call 2014-02-14 15:52:53 -05:00
f0e06dc436 Allow newer versions of Requests library 2014-02-14 15:13:34 -05:00
ddc0eb4264 Coalesce calls to table.remove during stream_remove; significant speedup for degenerate cases 2014-02-14 15:13:17 -05:00
0a22db3965 Ignore exceptions during __del__ handlers, which may get called during shutdown 2014-02-14 15:07:30 -05:00
7 changed files with 98 additions and 56 deletions

View File

@@ -18,10 +18,8 @@ class HTTPClient(object):
reparsed = urlparse.urlparse("http://" + baseurl).geturl() reparsed = urlparse.urlparse("http://" + baseurl).geturl()
self.baseurl = reparsed.rstrip('/') + '/' self.baseurl = reparsed.rstrip('/') + '/'
# Build Requests session object, enable SSL verification # Note whether we want SSL verification
self.verify_ssl = verify_ssl self.verify_ssl = verify_ssl
self.session = requests.Session()
self.session.verify = True
# Saved response, so that tests can verify a few things. # Saved response, so that tests can verify a few things.
self._last_response = {} self._last_response = {}
@@ -59,17 +57,34 @@ class HTTPClient(object):
raise Error(**args) raise Error(**args)
def close(self): def close(self):
self.session.close() pass
def _do_req(self, method, url, query_data, body_data, stream, headers): def _do_req(self, method, url, query_data, body_data, stream, headers):
url = urlparse.urljoin(self.baseurl, url) url = urlparse.urljoin(self.baseurl, url)
try: try:
response = self.session.request(method, url, # Create a new session, ensure we send "Connection: close",
params = query_data, # and explicitly close connection after the transfer.
data = body_data, # This is to avoid HTTP/1.1 persistent connections
stream = stream, # (keepalive), because they have fundamental race
headers = headers, # conditions when there are delays between requests:
verify = self.verify_ssl) # a new request may be sent at the same instant that the
# server decides to timeout the connection.
session = requests.Session()
if headers is None:
headers = {}
headers["Connection"] = "close"
response = session.request(method, url,
params = query_data,
data = body_data,
stream = stream,
headers = headers,
verify = self.verify_ssl)
# Close the connection. If it's a generator (stream =
# True), the requests library shouldn't actually close the
# HTTP connection until all data has been read from the
# response.
session.close()
except requests.RequestException as e: except requests.RequestException as e:
raise ServerError(status = "502 Error", url = url, raise ServerError(status = "502 Error", url = url,
message = str(e.message)) message = str(e.message))

View File

@@ -83,8 +83,11 @@ _sql_schema_updates = {
class NilmDB(object): class NilmDB(object):
verbose = 0 verbose = 0
def __init__(self, basepath, max_results=None, def __init__(self, basepath,
max_removals=None, bulkdata_args=None): max_results=None,
max_removals=None,
max_int_removals=None,
bulkdata_args=None):
"""Initialize NilmDB at the given basepath. """Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing: Other arguments are for debugging / testing:
@@ -92,7 +95,10 @@ class NilmDB(object):
stream_intervals or stream_extract response. stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once 'max_removals' is the max rows to delete at once
in stream_move. in stream_remove.
'max_int_removals' is the max intervals to delete
at once in stream_remove.
'bulkdata_args' is kwargs for the bulkdata module. 'bulkdata_args' is kwargs for the bulkdata module.
""" """
@@ -134,6 +140,9 @@ class NilmDB(object):
# Remove up to this many rows per call to stream_remove. # Remove up to this many rows per call to stream_remove.
self.max_removals = max_removals or 1048576 self.max_removals = max_removals or 1048576
# Remove up to this many intervals per call to stream_remove.
self.max_int_removals = max_int_removals or 4096
def get_basepath(self): def get_basepath(self):
return self.basepath return self.basepath
@@ -643,13 +652,22 @@ class NilmDB(object):
to_remove = Interval(start, end) to_remove = Interval(start, end)
removed = 0 removed = 0
remaining = self.max_removals remaining = self.max_removals
int_remaining = self.max_int_removals
restart = None restart = None
# Can't remove intervals from within the iterator, so we need to # Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now. # remember what's currently in the intersection now.
all_candidates = list(intervals.intersection(to_remove, orig = True)) all_candidates = list(intervals.intersection(to_remove, orig = True))
remove_start = None
remove_end = None
for (dbint, orig) in all_candidates: for (dbint, orig) in all_candidates:
# Stop if we've hit the max number of interval removals
if int_remaining <= 0:
restart = dbint.start
break
# Find row start and end # Find row start and end
row_start = self._find_start(table, dbint) row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint) row_end = self._find_end(table, dbint)
@@ -670,14 +688,29 @@ class NilmDB(object):
# Remove interval from the database # Remove interval from the database
self._remove_interval(stream_id, orig, dbint) self._remove_interval(stream_id, orig, dbint)
# Remove data from the underlying table storage # Remove data from the underlying table storage,
table.remove(row_start, row_end) # coalescing adjacent removals to reduce the number of calls
# to table.remove.
if remove_end == row_start:
# Extend our coalesced region
remove_end = row_end
else:
# Perform previous removal, then save this one
if remove_end is not None:
table.remove(remove_start, remove_end)
remove_start = row_start
remove_end = row_end
# Count how many were removed # Count how many were removed
removed += row_end - row_start removed += row_end - row_start
remaining -= row_end - row_start remaining -= row_end - row_start
int_remaining -= 1
if restart is not None: if restart is not None:
break break
# Perform any final coalesced removal
if remove_end is not None:
table.remove(remove_start, remove_end)
return (removed, restart) return (removed, restart)

View File

@@ -28,10 +28,13 @@ def must_close(errorfile = sys.stderr, wrap_verify = False):
@wrap_class_method @wrap_class_method
def __del__(orig, self, *args, **kwargs): def __del__(orig, self, *args, **kwargs):
if "_must_close" in self.__dict__: try:
fprintf(errorfile, "error: %s.close() wasn't called!\n", if "_must_close" in self.__dict__:
self.__class__.__name__) fprintf(errorfile, "error: %s.close() wasn't called!\n",
return orig(self, *args, **kwargs) self.__class__.__name__)
return orig(self, *args, **kwargs)
except: # pragma: no cover
pass
@wrap_class_method @wrap_class_method
def close(orig, self, *args, **kwargs): def close(orig, self, *args, **kwargs):

View File

@@ -120,7 +120,7 @@ def serializer_proxy(obj_or_type):
try: try:
self.__call_queue.put((None, None, None, None)) self.__call_queue.put((None, None, None, None))
self.__thread.join() self.__thread.join()
except TypeError: # pragma: no cover except: # pragma: no cover
pass pass
return SerializerObjectProxy(obj_or_type) return SerializerObjectProxy(obj_or_type)

View File

@@ -117,7 +117,7 @@ setup(name='nilmdb',
'python-dateutil', 'python-dateutil',
'pytz', 'pytz',
'psutil >= 0.3.0', 'psutil >= 0.3.0',
'requests >= 1.1.0, < 2.0.0', 'requests >= 1.1.0',
'progressbar >= 2.2', 'progressbar >= 2.2',
], ],
packages = [ 'nilmdb', packages = [ 'nilmdb',

View File

@@ -690,40 +690,15 @@ class TestClient(object):
client.close() client.close()
def test_client_12_persistent(self): def test_client_12_persistent(self):
# Check that connections are persistent when they should be. # Check that connections are NOT persistent. Rather than trying
# This is pretty hard to test; we have to poke deep into # to verify this at the TCP level, just make sure that the response
# the Requests library. # contained a "Connection: close" header.
with nilmdb.client.Client(url = testurl) as c: with nilmdb.client.Client(url = testurl) as c:
def connections():
try:
poolmanager = c.http._last_response.connection.poolmanager
pool = poolmanager.pools[('http','localhost',32180)]
return (pool.num_connections, pool.num_requests)
except Exception:
raise SkipTest("can't get connection info")
# First request makes a connection
c.stream_create("/persist/test", "uint16_1") c.stream_create("/persist/test", "uint16_1")
eq_(connections(), (1, 1)) eq_(c.http._last_response.headers["Connection"], "close")
# Non-generator
c.stream_list("/persist/test")
eq_(connections(), (1, 2))
c.stream_list("/persist/test")
eq_(connections(), (1, 3))
# Generators
for x in c.stream_intervals("/persist/test"):
pass
eq_(connections(), (1, 4))
for x in c.stream_intervals("/persist/test"):
pass
eq_(connections(), (1, 5))
# Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test") c.stream_destroy("/persist/test")
eq_(connections(), (1, 7)) eq_(c.http._last_response.headers["Connection"], "close")
def test_client_13_timestamp_rounding(self): def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point # Test potentially bad timestamps (due to floating point

View File

@@ -21,13 +21,17 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb" testdb = "tests/cmdline-testdb"
def server_start(max_results = None, max_removals = None, bulkdata_args = {}): def server_start(max_results = None,
max_removals = None,
max_int_removals = None,
bulkdata_args = {}):
global test_server, test_db global test_server, test_db
# Start web app on a custom port # Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb, testdb,
max_results = max_results, max_results = max_results,
max_removals = max_removals, max_removals = max_removals,
max_int_removals = max_int_removals,
bulkdata_args = bulkdata_args) bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False, port = 32180, stoppable = False,
@@ -880,14 +884,26 @@ class TestCmdline(object):
self.ok("destroy -R /newton/prep") # destroy again self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self): def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into # Limit max_removals, to cover more functionality.
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Also limit max_removals, to cover more functionality.
server_stop() server_stop()
server_start(max_removals = 4321, server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 }) "files_per_dir" : 3 })
self.do_remove_files()
self.ok("destroy -R /newton/prep") # destroy again
def test_14b_remove_files_maxint(self):
# Limit max_int_removals, to cover more functionality.
server_stop()
server_start(max_int_removals = 1,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
self.do_remove_files()
def do_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Insert data. Just for fun, insert out of order # Insert data. Just for fun, insert out of order
self.ok("create /newton/prep float32_8") self.ok("create /newton/prep float32_8")