Jim Paris
da72fc9777
We do this by creating a new requests.Session object for each request, sending a "Connection: close" request header, and then explicitly marking the connection for close after the response is read. This is to avoid a longstanding race condition with HTTP keepalive and server timeouts. Due to data processing, capture, etc., requests may be separated by an arbitrary delay. If this delay is shorter than the server's KeepAliveTimeout, the same connection is used. If the delay is longer, a new connection is used. If the delay is almost exactly equal to the timeout, however, the request may be sent on the old connection at the same moment that the server closes it. Typically, the client sees the connection as closing between the request and the response, which leads to "httplib.BadStatusLine" errors. This patch avoids the race condition entirely by not using persistent connections. Another solution may be to detect those errors and retry the connection, resending the request. However, the race condition could potentially show up in other places, such as a connection being closed during the request body rather than after it. Such an error could also be a legitimate network condition or problem. This solution should be more reliable, and the overhead of each new connection will hopefully be minimal for typical workloads.
729 lines
30 KiB
Python
729 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import nilmdb.server
|
||
import nilmdb.client
|
||
|
||
from nilmdb.utils.printf import *
|
||
from nilmdb.utils import timestamper
|
||
from nilmdb.client import ClientError, ServerError
|
||
from nilmdb.utils import datetime_tz
|
||
|
||
from nose.plugins.skip import SkipTest
|
||
from nose.tools import *
|
||
from nose.tools import assert_raises
|
||
import itertools
|
||
import distutils.version
|
||
import os
|
||
import sys
|
||
import threading
|
||
import cStringIO
|
||
import simplejson as json
|
||
import unittest
|
||
import warnings
|
||
import resource
|
||
import time
|
||
import re
|
||
import struct
|
||
|
||
from testutil.helpers import *
|
||
|
||
# Filesystem path of the database used by these tests; setup_module
# wipes it with recursive_unlink() before opening it.
testdb = "tests/client-testdb"
|
||
# Base URL that the tests hand to nilmdb.client.Client; the port matches
# the one setup_module passes to nilmdb.server.Server (32180).
testurl = "http://localhost:32180/"
|
||
|
||
def setup_module():
    """Module fixture: wipe the test database and start the test server.

    Publishes the opened database and the running server through the
    module globals ``test_db`` and ``test_server`` so that the tests and
    ``teardown_module`` can reach them.
    """
    global test_server, test_db

    # Clear out anything left in the DB path by an earlier run
    recursive_unlink(testdb)

    # Open the database through the serializer proxy
    proxied_db_class = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)
    test_db = proxied_db_class(testdb)

    # Start the web app on a custom port, without blocking the test runner
    server_options = {
        "host": "127.0.0.1",
        "port": 32180,
        "stoppable": False,
        "fast_shutdown": True,
        "force_traceback": True,
    }
    test_server = nilmdb.server.Server(test_db, **server_options)
    test_server.start(blocking=False)
|
||
|
||
def teardown_module():
    """Module fixture: stop the test server and close the test database.

    The database is closed in a ``finally`` clause so that it is released
    even if stopping the server raises; the original exception still
    propagates to the test runner.
    """
    global test_server, test_db
    try:
        # Close web app
        test_server.stop()
    finally:
        test_db.close()
|
||
|
||
class TestClient(object):
    """Exercise nilmdb.client.Client against the module's test server.

    The tests share one database and build on each other's state (for
    example, the streams created in test 02 are used by tests 03-07), so
    they are numbered and are meant to run in that order.

    Clients are closed through the context manager (or a ``finally``
    clause) so that a failing assertion does not leave a connection open
    and disturb the tests that follow.
    """

    def test_client_01_basic(self):
        # Test a fake host
        with nilmdb.client.Client(url="http://localhost:1/") as client:
            with assert_raises(nilmdb.client.ServerError):
                client.version()

        # Then a fake URL on a real host
        with nilmdb.client.Client(
                url="http://localhost:32180/fake/") as client:
            with assert_raises(nilmdb.client.ClientError):
                client.version()

        # Now a real URL with no http:// prefix; the call only has to
        # succeed, the returned value is not inspected here
        with nilmdb.client.Client(url="localhost:32180") as client:
            client.version()

        # Now use the real URL
        with nilmdb.client.Client(url=testurl) as client:
            version = client.version()
            eq_(distutils.version.LooseVersion(version),
                distutils.version.LooseVersion(test_server.version))

            # Bad URLs should give 404, not 500
            with assert_raises(ClientError):
                client.http.get("/stream/create")

    def test_client_02_createlist(self):
        # Basic stream tests, like those in test_nilmdb:test_stream
        with nilmdb.client.Client(url=testurl) as client:
            # Database starts empty
            eq_(client.stream_list(), [])

            # Bad path
            with assert_raises(ClientError):
                client.stream_create("foo/bar/baz", "float32_8")
            with assert_raises(ClientError):
                client.stream_create("/foo", "float32_8")
            # Bad layout type
            with assert_raises(ClientError):
                client.stream_create("/newton/prep", "NoSuchLayout")

            # Bad method types
            with assert_raises(ClientError):
                client.http.put("/stream/list", "")
            # Try a bunch of times to make sure the request body is
            # getting consumed
            for _ in range(10):
                with assert_raises(ClientError):
                    client.http.post("/stream/list")

        # Continue with a fresh client; the previous one is closed by
        # the "with" block above rather than being silently dropped.
        with nilmdb.client.Client(url=testurl) as client:
            # Create four streams
            client.stream_create("/newton/prep", "float32_8")
            client.stream_create("/newton/raw", "uint16_6")
            client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
            client.stream_create("/newton/zzz/rawnotch11", "uint16_9")

            # Verify we got 4 streams in the right order
            eq_(client.stream_list(),
                [["/newton/prep", "float32_8"],
                 ["/newton/raw", "uint16_6"],
                 ["/newton/zzz/rawnotch2", "uint16_9"],
                 ["/newton/zzz/rawnotch11", "uint16_9"]])

            # Match just one type or one path
            eq_(client.stream_list(layout="uint16_6"),
                [["/newton/raw", "uint16_6"]])
            eq_(client.stream_list(path="/newton/raw"),
                [["/newton/raw", "uint16_6"]])

            # Try messing with resource limits to trigger errors and get
            # more coverage.  Here, make it so we can only create files 1
            # byte in size, which will trigger an IOError in the server
            # when we create a table.
            limit = resource.getrlimit(resource.RLIMIT_FSIZE)
            resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
            try:
                with assert_raises(ServerError):
                    client.stream_create("/newton/hello", "uint16_6")
            finally:
                # Always put the limit back, even if the assertion above
                # fails; otherwise every later test would run with the
                # 1-byte file size limit still in force.
                resource.setrlimit(resource.RLIMIT_FSIZE, limit)

    def test_client_03_metadata(self):
        with nilmdb.client.Client(url=testurl) as client:
            # Set / get metadata
            eq_(client.stream_get_metadata("/newton/prep"), {})
            eq_(client.stream_get_metadata("/newton/raw"), {})
            meta1 = {"description": "The Data",
                     "v_scale": "1.234"}
            meta2 = {"description": "The Data"}
            meta3 = {"v_scale": "1.234"}
            client.stream_set_metadata("/newton/prep", meta1)
            client.stream_update_metadata("/newton/prep", {})
            client.stream_update_metadata("/newton/raw", meta2)
            client.stream_update_metadata("/newton/raw", meta3)
            eq_(client.stream_get_metadata("/newton/prep"), meta1)
            eq_(client.stream_get_metadata("/newton/raw"), meta1)
            eq_(client.stream_get_metadata("/newton/raw",
                                           ["description"]), meta2)
            eq_(client.stream_get_metadata("/newton/raw",
                                           ["description", "v_scale"]),
                meta1)

            # missing key
            eq_(client.stream_get_metadata("/newton/raw", "descr"),
                {"descr": None})
            eq_(client.stream_get_metadata("/newton/raw", ["descr"]),
                {"descr": None})

            # test wrong types (list instead of dict)
            with assert_raises(ClientError):
                client.stream_set_metadata("/newton/prep", [1, 2, 3])
            with assert_raises(ClientError):
                client.stream_update_metadata("/newton/prep", [1, 2, 3])

            # test wrong types (dict of non-strings)
            # numbers are OK; they'll get converted to strings
            client.stream_set_metadata("/newton/prep", {"hello": 1234})
            # anything else is not
            with assert_raises(ClientError):
                client.stream_set_metadata("/newton/prep",
                                           {"world": {1: 2}})
            with assert_raises(ClientError):
                client.stream_set_metadata("/newton/prep",
                                           {"world": [1, 2]})

    def test_client_04_insert(self):
        client = nilmdb.client.Client(url=testurl)

        # Limit _max_data to 1 MB, since our test file is 1.5 MB.  This
        # is a class-level override, so it is undone in the "finally"
        # clause below even when an assertion fails.
        inserter = nilmdb.client.client.StreamInserter
        old_max_data = inserter._max_data
        try:
            inserter._max_data = 1 * 1024 * 1024

            datetime_tz.localtz_set("America/New_York")

            testfile = "tests/data/prep-20120323T1000"
            start = nilmdb.utils.time.parse_time("20120323T1000")
            rate = 120

            # First try a nonexistent path
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                client.stream_insert("/newton/no-such-path", data)
            in_("404 Not Found", str(e.exception))

            # Now try reversed timestamps
            data = timestamper.TimestamperRate(testfile, start, rate)
            data = reversed(list(data))
            with assert_raises(ClientError) as e:
                client.stream_insert("/newton/prep", data)
            in_("400 Bad Request", str(e.exception))
            in2_("timestamp is not monotonically increasing",
                 "start must precede end", str(e.exception))

            # Now try empty data (no server request made)
            empty = cStringIO.StringIO("")
            data = timestamper.TimestamperRate(empty, start, rate)
            result = client.stream_insert("/newton/prep", data)
            eq_(result, None)

            # It's OK to insert an empty interval
            client.http.put("stream/insert", "",
                            {"path": "/newton/prep",
                             "start": 1, "end": 2})
            eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
            client.stream_remove("/newton/prep")
            eq_(list(client.stream_intervals("/newton/prep")), [])

            # Timestamps can be negative too
            client.http.put("stream/insert", "",
                            {"path": "/newton/prep",
                             "start": -2, "end": -1})
            eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
            client.stream_remove("/newton/prep")
            eq_(list(client.stream_intervals("/newton/prep")), [])

            # Intervals that end at zero shouldn't be any different
            client.http.put("stream/insert", "",
                            {"path": "/newton/prep",
                             "start": -1, "end": 0})
            eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
            client.stream_remove("/newton/prep")
            eq_(list(client.stream_intervals("/newton/prep")), [])

            # Try forcing a server request with equal start and end
            with assert_raises(ClientError) as e:
                client.http.put("stream/insert", "",
                                {"path": "/newton/prep",
                                 "start": 0, "end": 0})
            in_("400 Bad Request", str(e.exception))
            in_("start must precede end", str(e.exception))

            # Invalid times in HTTP request
            with assert_raises(ClientError) as e:
                client.http.put("stream/insert", "",
                                {"path": "/newton/prep",
                                 "start": "asdf", "end": 0})
            in_("400 Bad Request", str(e.exception))
            in_("invalid start", str(e.exception))

            with assert_raises(ClientError) as e:
                client.http.put("stream/insert", "",
                                {"path": "/newton/prep",
                                 "start": 0, "end": "asdf"})
            in_("400 Bad Request", str(e.exception))
            in_("invalid end", str(e.exception))

            # Good content type
            with assert_raises(ClientError) as e:
                client.http.put("stream/insert", "",
                                {"path": "xxxx", "start": 0, "end": 1,
                                 "binary": 1},
                                binary=True)
            in_("No such stream", str(e.exception))

            # Bad content type
            with assert_raises(ClientError) as e:
                client.http.put("stream/insert", "",
                                {"path": "xxxx", "start": 0, "end": 1,
                                 "binary": 1},
                                binary=False)
            in_("Content type must be application/octet-stream",
                str(e.exception))

            # Specify start/end (starts too late)
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                client.stream_insert("/newton/prep", data,
                                     start + 5000000, start + 120000000)
            in_("400 Bad Request", str(e.exception))
            in_("Data timestamp 1332511200000000 < start time 1332511205000000",
                str(e.exception))

            # Specify start/end (ends too early)
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                client.stream_insert("/newton/prep", data,
                                     start, start + 1000000)
            in_("400 Bad Request", str(e.exception))
            # Client chunks the input, so the exact timestamp here might
            # change if the chunk positions change.
            assert re.search("Data timestamp 13325[0-9]+ "
                             ">= end time 1332511201000000",
                             str(e.exception)) is not None

            # Now do the real load
            data = timestamper.TimestamperRate(testfile, start, rate)
            client.stream_insert("/newton/prep", data,
                                 start, start + 119999777)

            # Verify the intervals.  Should be just one, even if the data
            # was inserted in chunks, due to nilmdb interval concatenation.
            intervals = list(client.stream_intervals("/newton/prep"))
            eq_(intervals, [[start, start + 119999777]])

            # Try some overlapping data -- just insert it again
            data = timestamper.TimestamperRate(testfile, start, rate)
            with assert_raises(ClientError) as e:
                client.stream_insert("/newton/prep", data)
            in_("400 Bad Request", str(e.exception))
            in_("verlap", str(e.exception))
        finally:
            inserter._max_data = old_max_data
            client.close()

    def test_client_05_extractremove(self):
        # Misc tests for extract and remove.  Most of them are in
        # test_cmdline.
        with nilmdb.client.Client(url=testurl) as client:
            for _ in client.stream_extract("/newton/prep",
                                           999123000000, 999124000000):
                raise AssertionError("shouldn't be any data for this request")

            with assert_raises(ClientError):
                client.stream_remove("/newton/prep", 123000000, 120000000)

            # Test count
            nrows = 14400
            eq_(client.stream_count("/newton/prep"), nrows)

            # Test binary output
            with assert_raises(ClientError):
                list(client.stream_extract("/newton/prep",
                                           markup=True, binary=True))
            with assert_raises(ClientError):
                list(client.stream_extract("/newton/prep",
                                           count=True, binary=True))
            data = "".join(client.stream_extract("/newton/prep",
                                                 binary=True))
            # Quick check using struct: unpack every row (which also
            # fails if the data is shorter than nrows records), then
            # compare the first one.
            unpacker = struct.Struct("<qffffffff")
            out = [unpacker.unpack_from(data, i * unpacker.size)
                   for i in range(nrows)]
            eq_(out[0], (1332511200000000, 266568.0, 224029.0,
                         5161.39990234375, 2525.169921875, 8350.83984375,
                         3724.699951171875, 1355.3399658203125, 2039.0))

            # Just get some coverage
            with assert_raises(ClientError):
                client.http.post("/stream/remove", {"path": "/none"})

    def test_client_06_generators(self):
        # A lot of the client functionality is already tested by
        # test_cmdline, but this gets a bit more coverage that cmdline
        # misses.
        with nilmdb.client.Client(url=testurl) as client:
            generators = [client.stream_intervals, client.stream_extract]

            # Trigger a client error in generator
            start = nilmdb.utils.time.parse_time("20120323T2000")
            end = nilmdb.utils.time.parse_time("20120323T1000")
            for function in generators:
                with assert_raises(ClientError) as e:
                    next(function("/newton/prep", start, end))
                in_("400 Bad Request", str(e.exception))
                in_("start must precede end", str(e.exception))

            # Trigger a curl error in generator
            with assert_raises(ServerError):
                next(client.http.get_gen("http://nosuchurl.example.com./"))

            # Check 404 for missing streams
            for function in generators:
                with assert_raises(ClientError) as e:
                    next(function("/no/such/stream"))
                in_("404 Not Found", str(e.exception))
                in_("No such stream", str(e.exception))

    def test_client_07_headers(self):
        # Make sure that /stream/intervals and /stream/extract
        # properly return streaming, chunked, text/plain response.
        # Pokes around in client.http internals a bit to look at the
        # response headers.
        with nilmdb.client.Client(url=testurl) as client:
            http = client.http

            # Use a warning rather than returning a test failure for the
            # transfer-encoding, so that we can still disable chunked
            # responses for debugging.

            def headers():
                # Render the most recent response's headers as lowercase
                # "name: value" lines for simple substring checks.
                pairs = http._last_response.headers.items()
                return "".join(k + ": " + v + "\n"
                               for (k, v) in pairs).lower()

            # Intervals
            http.get("stream/intervals", {"path": "/newton/prep"})
            if "transfer-encoding: chunked" not in headers():
                warnings.warn("Non-chunked HTTP response for "
                              "/stream/intervals")
            if "content-type: application/x-json-stream" not in headers():
                raise AssertionError("/stream/intervals content type "
                                     "is not application/x-json-stream:\n" +
                                     headers())

            # Extract
            http.get("stream/extract", {"path": "/newton/prep",
                                        "start": "123", "end": "124"})
            if "transfer-encoding: chunked" not in headers():
                warnings.warn("Non-chunked HTTP response for /stream/extract")
            if "content-type: text/plain;charset=utf-8" not in headers():
                raise AssertionError("/stream/extract is not text/plain:\n" +
                                     headers())

            http.get("stream/extract", {"path": "/newton/prep",
                                        "start": "123", "end": "124",
                                        "binary": "1"})
            if "transfer-encoding: chunked" not in headers():
                warnings.warn("Non-chunked HTTP response for /stream/extract")
            if "content-type: application/octet-stream" not in headers():
                raise AssertionError("/stream/extract is not binary:\n" +
                                     headers())

            # Make sure a binary of "0" is really off
            http.get("stream/extract", {"path": "/newton/prep",
                                        "start": "123", "end": "124",
                                        "binary": "0"})
            if "content-type: application/octet-stream" in headers():
                raise AssertionError("/stream/extract is not text:\n" +
                                     headers())

            # Invalid parameters
            with assert_raises(ClientError) as e:
                http.get("stream/extract", {"path": "/newton/prep",
                                            "start": "123", "end": "124",
                                            "binary": "asdfasfd"})
            in_("can't parse parameter", str(e.exception))

    def test_client_08_unicode(self):
        # Try both with and without posting JSON
        for post_json in (False, True):
            # Basic Unicode tests
            with nilmdb.client.Client(url=testurl,
                                      post_json=post_json) as client:
                # Delete streams that exist
                for stream in client.stream_list():
                    client.stream_remove(stream[0])
                    client.stream_destroy(stream[0])

                # Database is empty
                eq_(client.stream_list(), [])

                # Create Unicode stream, match it
                raw = [u"/düsseldorf/raw", u"uint16_6"]
                prep = [u"/düsseldorf/prep", u"uint16_6"]
                client.stream_create(*raw)
                eq_(client.stream_list(), [raw])
                eq_(client.stream_list(layout=raw[1]), [raw])
                eq_(client.stream_list(path=raw[0]), [raw])
                client.stream_create(*prep)
                eq_(client.stream_list(), [prep, raw])

                # Set / get metadata with Unicode keys and values
                eq_(client.stream_get_metadata(raw[0]), {})
                eq_(client.stream_get_metadata(prep[0]), {})
                meta1 = {u"alpha": u"α",
                         u"β": u"beta"}
                meta2 = {u"alpha": u"α"}
                meta3 = {u"β": u"beta"}
                client.stream_set_metadata(prep[0], meta1)
                client.stream_update_metadata(prep[0], {})
                client.stream_update_metadata(raw[0], meta2)
                client.stream_update_metadata(raw[0], meta3)
                eq_(client.stream_get_metadata(prep[0]), meta1)
                eq_(client.stream_get_metadata(raw[0]), meta1)
                eq_(client.stream_get_metadata(raw[0], ["alpha"]), meta2)
                eq_(client.stream_get_metadata(raw[0], ["alpha", "β"]),
                    meta1)

    def test_client_09_closing(self):
        # Make sure we actually close sockets correctly.  New
        # connections will block for a while if they're not, since the
        # server will stop accepting new connections.
        for test in [1, 2]:
            start = time.time()
            for _ in range(50):
                if time.time() - start > 15:
                    raise AssertionError("Connections seem to be blocking... "
                                         "probably not closing properly.")
                if test == 1:
                    # explicit close (deliberately not a "with" block:
                    # this variant checks close() itself)
                    client = nilmdb.client.Client(url=testurl)
                    with assert_raises(ClientError):
                        client.stream_remove("/newton/prep", 123, 120)
                    client.close()  # remove this to see the failure
                elif test == 2:
                    # use the context manager
                    with nilmdb.client.Client(url=testurl) as c:
                        with assert_raises(ClientError):
                            c.stream_remove("/newton/prep", 123, 120)

    def test_client_10_context(self):
        # Test using the client's stream insertion context manager to
        # insert data.
        with nilmdb.client.Client(testurl) as client:
            client.stream_create("/context/test", "uint16_1")
            with client.stream_insert_context("/context/test") as ctx:
                # override _max_data to trigger frequent server updates
                ctx._max_data = 15

                ctx.insert("1000 1\n")

                ctx.insert("1010 ")
                ctx.insert("1\n1020 1")
                ctx.insert("")
                ctx.insert("\n1030 1\n")

                ctx.insert("1040 1\n")
                ctx.insert("# hello\n")
                ctx.insert(" # hello\n")
                ctx.insert(" 1050 1\n")
                ctx.finalize()

                ctx.insert("1070 1\n")
                ctx.update_end(1080)
                ctx.finalize()
                ctx.update_start(1090)
                ctx.insert("1100 1\n")
                ctx.insert("1110 1\n")
                ctx.send()
                ctx.insert("1120 1\n")
                ctx.insert("1130 1\n")
                ctx.insert("1140 1\n")
                ctx.update_end(1160)
                ctx.insert("1150 1\n")
                ctx.update_end(1170)
                ctx.insert("1160 1\n")
                ctx.update_end(1180)
                ctx.insert("1170 1" +
                           " # this is super long" * 100 +
                           "\n")
                ctx.finalize()
                ctx.insert("# this is super long" * 100)

            with assert_raises(ClientError):
                with client.stream_insert_context("/context/test",
                                                  1000, 2000) as ctx:
                    ctx.insert("1180 1\n")

            with assert_raises(ClientError):
                with client.stream_insert_context("/context/test",
                                                  2000, 3000) as ctx:
                    ctx.insert("1180 1\n")

            with assert_raises(ClientError):
                with client.stream_insert_context("/context/test") as ctx:
                    ctx.insert("bogus data\n")

            with client.stream_insert_context("/context/test",
                                              2000, 3000) as ctx:
                # make sure our override wasn't permanent
                ne_(ctx._max_data, 15)
                ctx.insert("2250 1\n")
                ctx.finalize()

            with assert_raises(ClientError):
                with client.stream_insert_context("/context/test",
                                                  3000, 4000) as ctx:
                    ctx.insert("3010 1\n")
                    ctx.insert("3020 2\n")
                    ctx.insert("3030 3\n")
                    ctx.insert("3040 4\n")
                    # non-monotonic after a few lines
                    ctx.insert("3040 4\n")
                    ctx.finalize()

            eq_(list(client.stream_intervals("/context/test")),
                [[1000, 1051],
                 [1070, 1080],
                 [1090, 1180],
                 [2000, 3000]])

            # destroy stream (try without removing data first)
            with assert_raises(ClientError):
                client.stream_destroy("/context/test")
            client.stream_remove("/context/test")
            client.stream_destroy("/context/test")

    def test_client_11_emptyintervals(self):
        # Empty intervals are ok!  If recording detection events
        # by inserting rows into the database, we want to be able to
        # have an interval where no events occurred.  Test them here.
        with nilmdb.client.Client(testurl) as client:
            client.stream_create("/empty/test", "uint16_1")

            def info():
                # Return a (row count, interval) pair for every interval
                # currently in the stream.  The interval list is fully
                # read before the per-interval count requests are made.
                intervals = list(client.stream_intervals("/empty/test"))
                return [(client.stream_count("/empty/test", *interval),
                         interval)
                        for interval in intervals]

            eq_(info(), [])

            # Insert a region with just a few points
            with client.stream_insert_context("/empty/test") as ctx:
                ctx.update_start(100)
                ctx.insert("140 1\n")
                ctx.insert("150 1\n")
                ctx.insert("160 1\n")
                ctx.update_end(200)
                ctx.finalize()

            eq_(info(), [(3, [100, 200])])

            # Delete chunk, which will leave one data point and two
            # intervals
            client.stream_remove("/empty/test", 145, 175)
            eq_(info(), [(1, [100, 145]),
                         (0, [175, 200])])

            # Try also creating a completely empty interval from scratch,
            # in a few different ways.
            client.stream_insert("/empty/test", "", 300, 350)
            client.stream_insert("/empty/test", [], 400, 450)
            with client.stream_insert_context("/empty/test", 500, 550):
                pass

            # If enough timestamps aren't provided, empty streams won't
            # be created.
            client.stream_insert("/empty/test", [])
            with client.stream_insert_context("/empty/test"):
                pass
            client.stream_insert("/empty/test", [], start=600)
            with client.stream_insert_context("/empty/test", start=700):
                pass
            client.stream_insert("/empty/test", [], end=850)
            with client.stream_insert_context("/empty/test", end=950):
                pass

            # Equal start and end is OK as long as there's no data
            with client.stream_insert_context("/empty/test",
                                              start=9, end=9):
                pass

            # Try various things that might cause problems.  The
            # "inserts" notes below correspond to the intervals expected
            # by the final check.
            with client.stream_insert_context("/empty/test",
                                              1000, 1050) as ctx:
                ctx.finalize()  # inserts [1000, 1050]
                ctx.finalize()  # nothing
                ctx.finalize()  # nothing
                ctx.insert("1100 1\n")
                ctx.finalize()  # inserts [1100, 1101]
                ctx.update_start(1199)
                ctx.insert("1200 1\n")
                ctx.update_end(1250)
                ctx.finalize()  # inserts [1199, 1250]
                ctx.update_start(1299)
                ctx.finalize()  # nothing
                ctx.update_end(1350)
                ctx.finalize()  # nothing
                ctx.update_start(1400)
                ctx.insert("# nothing!\n")
                ctx.update_end(1450)
                ctx.finalize()  # inserts empty [1400, 1450]
                ctx.update_start(1500)
                ctx.insert("# nothing!")
                ctx.update_end(1550)
                ctx.finalize()  # inserts empty [1500, 1550]
                ctx.insert("# nothing!\n" * 10)
                ctx.finalize()  # nothing
                # the implicit finalize on exit is expected to add
                # nothing further

            # Check everything
            eq_(info(), [(1, [100, 145]),
                         (0, [175, 200]),
                         (0, [300, 350]),
                         (0, [400, 450]),
                         (0, [500, 550]),
                         (0, [1000, 1050]),
                         (1, [1100, 1101]),
                         (1, [1199, 1250]),
                         (0, [1400, 1450]),
                         (0, [1500, 1550]),
                         ])

            # Clean up
            client.stream_remove("/empty/test")
            client.stream_destroy("/empty/test")

    def test_client_12_persistent(self):
        # Check that connections are NOT persistent.  Rather than trying
        # to verify this at the TCP level, just make sure that the
        # response contained a "Connection: close" header.
        with nilmdb.client.Client(url=testurl) as c:
            c.stream_create("/persist/test", "uint16_1")
            eq_(c.http._last_response.headers["Connection"], "close")

            c.stream_destroy("/persist/test")
            eq_(c.http._last_response.headers["Connection"], "close")

    def test_client_13_timestamp_rounding(self):
        # Test potentially bad timestamps (due to floating point
        # roundoff etc).  The server will round floating point values
        # to the nearest int.
        with nilmdb.client.Client(testurl) as client:
            client.stream_create("/rounding/test", "uint16_1")
            with client.stream_insert_context("/rounding/test",
                                              100000000, 200000000.1) as ctx:
                ctx.insert("100000000.1 1\n")
                ctx.insert("150000000.00003 1\n")
                ctx.insert("199999999.4 1\n")
            eq_(list(client.stream_intervals("/rounding/test")),
                [[100000000, 200000000]])

            with assert_raises(ClientError):
                with client.stream_insert_context("/rounding/test",
                                                  200000000,
                                                  300000000) as ctx:
                    ctx.insert("200000000 1\n")
                    ctx.insert("250000000 1\n")
                    # Server will round this and give an error on
                    # finalize()
                    ctx.insert("299999999.99 1\n")

            client.stream_remove("/rounding/test")
            client.stream_destroy("/rounding/test")
|