
Explicitly avoid HTTP/1.1 persistent connections (keep-alive)

We do this by creating a new requests.Session object for each request,
sending a "Connection: close" request header, and then explicitly
marking the connection for close after the response is read.

This is to avoid a longstanding race condition with HTTP keepalive
and server timeouts.  Due to data processing, capture, etc., requests
may be separated by an arbitrary delay.  If this delay is shorter
than the server's KeepAliveTimeout, the same connection is used.
If the delay is longer, a new connection is used.  If the delay is
the same, however, the request may be sent on the old connection at
the exact same time that the server closes it.  Typically, the
client sees the connection as closing between the request and the
response, which leads to "httplib.BadStatusLine" errors.
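The per-request approach described above can be sketched with the requests library as follows.  This is an illustrative sketch, not the actual nilmdb code; the helper names (`close_headers`, `do_request`) are made up for the example:

```python
import requests

def close_headers(headers=None):
    # Merge "Connection: close" into the request headers, creating the
    # dict if necessary.  (Illustrative helper, not nilmdb's API.)
    headers = dict(headers or {})
    headers["Connection"] = "close"
    return headers

def do_request(method, url, **kwargs):
    # One throwaway Session per request: a connection can never sit
    # idle long enough to race against the server's KeepAliveTimeout,
    # because it is never reused.
    session = requests.Session()
    kwargs["headers"] = close_headers(kwargs.get("headers"))
    try:
        return session.request(method, url, **kwargs)
    finally:
        # Explicitly release the connection once the transfer is done.
        session.close()
```

Note that with `stream=True` the real code must defer the close until the response body has been consumed; this sketch omits that detail.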

This patch avoids the race condition entirely by not using persistent
connections.

Another solution may be to detect those errors and retry the
connection, resending the request.  However, the race condition could
potentially show up in other places, like a closed connection during
the request body, not after.  Such an error could also be a legitimate
network condition or problem.  This solution should be more reliable,
and the overhead of each new connection will hopefully be minimal for
typical workloads.
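The rejected retry alternative might have looked something like this (a hypothetical sketch, not anything nilmdb adopted; requests surfaces the underlying httplib.BadStatusLine as a connection error, though the exact wrapping varies by version):

```python
import requests

def request_with_retry(session, method, url, retries=1, **kwargs):
    # Hypothetical sketch of the rejected alternative: retry when the
    # connection appears to have died between our request and the
    # server's response.  The catch, as noted above, is that the same
    # exception can also signal a genuine network problem, so a retry
    # may mask real errors or resend a non-idempotent request.
    for attempt in range(retries + 1):
        try:
            return session.request(method, url, **kwargs)
        except requests.exceptions.ConnectionError:
            if attempt == retries:
                raise
```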
Jim Paris, 8 years ago

2 changed files with 31 additions and 41 deletions

nilmdb/client/ (+25, -10)

@@ -18,10 +18,8 @@ class HTTPClient(object):
         reparsed = urlparse.urlparse("http://" + baseurl).geturl()
         self.baseurl = reparsed.rstrip('/') + '/'
 
-        # Build Requests session object, enable SSL verification
+        # Note whether we want SSL verification
         self.verify_ssl = verify_ssl
-        self.session = requests.Session()
-        self.session.verify = True
 
         # Saved response, so that tests can verify a few things.
         self._last_response = {}
@@ -59,17 +57,34 @@ class HTTPClient(object):
             raise Error(**args)
 
     def close(self):
 
     def _do_req(self, method, url, query_data, body_data, stream, headers):
         url = urlparse.urljoin(self.baseurl, url)
-        response = self.session.request(method, url,
-                                        params = query_data,
-                                        data = body_data,
-                                        stream = stream,
-                                        headers = headers,
-                                        verify = self.verify_ssl)
+        # Create a new session, ensure we send "Connection: close",
+        # and explicitly close connection after the transfer.
+        # This is to avoid HTTP/1.1 persistent connections
+        # (keepalive), because they have fundamental race
+        # conditions when there are delays between requests:
+        # a new request may be sent at the same instant that the
+        # server decides to timeout the connection.
+        session = requests.Session()
+        if headers is None:
+            headers = {}
+        headers["Connection"] = "close"
+        response = session.request(method, url,
+                                   params = query_data,
+                                   data = body_data,
+                                   stream = stream,
+                                   headers = headers,
+                                   verify = self.verify_ssl)
+
+        # Close the connection.  If it's a generator (stream =
+        # True), the requests library shouldn't actually close the
+        # HTTP connection until all data has been read from the
+        # response.
         except requests.RequestException as e:
             raise ServerError(status = "502 Error", url = url,
                               message = str(e.message))

tests/ (+6, -31)

@@ -690,40 +690,15 @@ class TestClient(object):
 
     def test_client_12_persistent(self):
-        # Check that connections are persistent when they should be.
-        # This is pretty hard to test; we have to poke deep into
-        # the Requests library.
+        # Check that connections are NOT persistent.  Rather than trying
+        # to verify this at the TCP level, just make sure that the response
+        # contained a "Connection: close" header.
         with nilmdb.client.Client(url = testurl) as c:
-            def connections():
-                poolmanager = c.http._last_response.connection.poolmanager
-                pool = poolmanager.pools[('http','localhost',32180)]
-                return (pool.num_connections, pool.num_requests)
-            except Exception:
-                raise SkipTest("can't get connection info")
-
-            # First request makes a connection
             c.stream_create("/persist/test", "uint16_1")
-            eq_(connections(), (1, 1))
-
-            # Non-generator
-            eq_(connections(), (1, 2))
-            eq_(connections(), (1, 3))
-
-            # Generators
             for x in c.stream_intervals("/persist/test"):
-                eq_(connections(), (1, 4))
-            for x in c.stream_intervals("/persist/test"):
-                eq_(connections(), (1, 5))
-
-            # Clean up
+                eq_(c.http._last_response.headers["Connection"], "close")
 
-            eq_(connections(), (1, 7))
+            eq_(c.http._last_response.headers["Connection"], "close")

def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point