- # -*- coding: utf-8 -*-
-
- import nilmdb
- from nilmdb.utils.printf import *
- from nilmdb.utils import timestamper
- from nilmdb.client import ClientError, ServerError
- from nilmdb.utils import datetime_tz
-
- from nose.tools import *
- from nose.tools import assert_raises
- import itertools
- import distutils.version
- import os
- import sys
- import threading
- import cStringIO
- import simplejson as json
- import unittest
- import warnings
- import resource
- import time
-
- from testutil.helpers import *
-
# Path of the on-disk test database; wiped and recreated in setup_module().
testdb = "tests/client-testdb"
-
def setup_module():
    """Create a fresh test database and start the web server for it."""
    global test_server, test_db

    # Remove any database left over from a previous run.
    recursive_unlink(testdb)

    # Bring up the web app on a fixed local port, non-blocking so the
    # tests in this module can run against it.
    test_db = nilmdb.NilmDB(testdb, sync = False)
    test_server = nilmdb.Server(test_db,
                                host = "127.0.0.1",
                                port = 12380,
                                stoppable = False,
                                fast_shutdown = True,
                                force_traceback = False)
    test_server.start(blocking = False)
-
def teardown_module():
    """Stop the web server and close the database opened in setup_module()."""
    global test_server, test_db
    test_server.stop()
    test_db.close()
-
- class TestClient(object):
-
- def test_client_01_basic(self):
- # Test a fake host
- client = nilmdb.Client(url = "http://localhost:1/")
- with assert_raises(nilmdb.client.ServerError):
- client.version()
- client.close()
-
- # Trigger same error with a PUT request
- client = nilmdb.Client(url = "http://localhost:1/")
- with assert_raises(nilmdb.client.ServerError):
- client.version()
- client.close()
-
- # Then a fake URL on a real host
- client = nilmdb.Client(url = "http://localhost:12380/fake/")
- with assert_raises(nilmdb.client.ClientError):
- client.version()
- client.close()
-
- # Now a real URL with no http:// prefix
- client = nilmdb.Client(url = "localhost:12380")
- version = client.version()
- client.close()
-
- # Now use the real URL
- client = nilmdb.Client(url = "http://localhost:12380/")
- version = client.version()
- eq_(distutils.version.LooseVersion(version),
- distutils.version.LooseVersion(test_server.version))
-
- # Bad URLs should give 404, not 500
- with assert_raises(ClientError):
- client.http.get("/stream/create")
- client.close()
-
- def test_client_02_createlist(self):
- # Basic stream tests, like those in test_nilmdb:test_stream
- client = nilmdb.Client(url = "http://localhost:12380/")
-
- # Database starts empty
- eq_(client.stream_list(), [])
-
- # Bad path
- with assert_raises(ClientError):
- client.stream_create("foo/bar/baz", "PrepData")
- with assert_raises(ClientError):
- client.stream_create("/foo", "PrepData")
- # Bad layout type
- with assert_raises(ClientError):
- client.stream_create("/newton/prep", "NoSuchLayout")
-
- # Create three streams
- client.stream_create("/newton/prep", "PrepData")
- client.stream_create("/newton/raw", "RawData")
- client.stream_create("/newton/zzz/rawnotch", "RawNotchedData")
-
- # Verify we got 3 streams
- eq_(client.stream_list(), [ ["/newton/prep", "PrepData"],
- ["/newton/raw", "RawData"],
- ["/newton/zzz/rawnotch", "RawNotchedData"]
- ])
- # Match just one type or one path
- eq_(client.stream_list(layout="RawData"),
- [ ["/newton/raw", "RawData"] ])
- eq_(client.stream_list(path="/newton/raw"),
- [ ["/newton/raw", "RawData"] ])
-
- # Try messing with resource limits to trigger errors and get
- # more coverage. Here, make it so we can only create files 1
- # byte in size, which will trigger an IOError in the server when
- # we create a table.
- limit = resource.getrlimit(resource.RLIMIT_FSIZE)
- resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
- with assert_raises(ServerError) as e:
- client.stream_create("/newton/hello", "RawData")
- resource.setrlimit(resource.RLIMIT_FSIZE, limit)
-
- client.close()
-
- def test_client_03_metadata(self):
- client = nilmdb.Client(url = "http://localhost:12380/")
-
- # Set / get metadata
- eq_(client.stream_get_metadata("/newton/prep"), {})
- eq_(client.stream_get_metadata("/newton/raw"), {})
- meta1 = { "description": "The Data",
- "v_scale": "1.234" }
- meta2 = { "description": "The Data" }
- meta3 = { "v_scale": "1.234" }
- client.stream_set_metadata("/newton/prep", meta1)
- client.stream_update_metadata("/newton/prep", {})
- client.stream_update_metadata("/newton/raw", meta2)
- client.stream_update_metadata("/newton/raw", meta3)
- eq_(client.stream_get_metadata("/newton/prep"), meta1)
- eq_(client.stream_get_metadata("/newton/raw"), meta1)
- eq_(client.stream_get_metadata("/newton/raw",
- [ "description" ] ), meta2)
- eq_(client.stream_get_metadata("/newton/raw",
- [ "description", "v_scale" ] ), meta1)
-
- # missing key
- eq_(client.stream_get_metadata("/newton/raw", "descr"),
- { "descr": None })
- eq_(client.stream_get_metadata("/newton/raw", [ "descr" ]),
- { "descr": None })
-
- # test wrong types (list instead of dict)
- with assert_raises(ClientError):
- client.stream_set_metadata("/newton/prep", [1,2,3])
- with assert_raises(ClientError):
- client.stream_update_metadata("/newton/prep", [1,2,3])
- client.close()
-
    def test_client_04_insert(self):
        """Insert timestamped data and exercise the server's validation
        of paths, timestamp ordering, and start/end bounds."""
        client = nilmdb.Client(url = "http://localhost:12380/")

        # Pin the local timezone so smartparse() below is deterministic.
        datetime_tz.localtz_set("America/New_York")

        testfile = "tests/data/prep-20120323T1000"
        start = datetime_tz.datetime_tz.smartparse("20120323T1000")
        start = start.totimestamp()
        # NOTE(review): 'rate' is unused; the literal 120 is passed to
        # TimestamperRate below instead.
        rate = 120

        # First try a nonexistent path
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/no-such-path", data)
        in_("404 Not Found", str(e.exception))

        # Now try reversed timestamps; the server must reject
        # non-monotonic input.
        data = timestamper.TimestamperRate(testfile, start, 120)
        data = reversed(list(data))
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data)
        in_("400 Bad Request", str(e.exception))
        in_("timestamp is not monotonically increasing", str(e.exception))

        # Now try empty data (no server request made); the client
        # short-circuits and returns None.
        empty = cStringIO.StringIO("")
        data = timestamper.TimestamperRate(empty, start, 120)
        result = client.stream_insert("/newton/prep", data)
        eq_(result, None)

        # Try forcing a server request with empty data, bypassing the
        # client's short-circuit by talking to http directly.
        with assert_raises(ClientError) as e:
            client.http.put("stream/insert", "", { "path": "/newton/prep",
                                                   "start": 0, "end": 0 })
        in_("400 Bad Request", str(e.exception))
        in_("no data provided", str(e.exception))

        # Specify start/end (starts too late): first data point falls
        # before the declared interval start.
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data,
                                          start + 5, start + 120)
        in_("400 Bad Request", str(e.exception))
        in_("Data timestamp 1332511200.0 < start time 1332511205.0",
            str(e.exception))

        # Specify start/end (ends too early): some data point lands at
        # or past the declared interval end.
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data,
                                          start, start + 1)
        in_("400 Bad Request", str(e.exception))
        # Client chunks the input, so the exact timestamp here might change
        # if the chunk positions change.
        in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
            str(e.exception))

        # Now do the real load
        data = timestamper.TimestamperRate(testfile, start, 120)
        result = client.stream_insert("/newton/prep", data,
                                      start, start + 119.999777)
        eq_(result, "ok")

        # Verify the intervals.  Should be just one, even if the data
        # was inserted in chunks, due to nilmdb interval concatenation.
        intervals = list(client.stream_intervals("/newton/prep"))
        eq_(intervals, [[start, start + 119.999777]])

        # Try some overlapping data -- just insert it again; the
        # server rejects overlaps with a 400.
        data = timestamper.TimestamperRate(testfile, start, 120)
        with assert_raises(ClientError) as e:
            result = client.stream_insert("/newton/prep", data)
        in_("400 Bad Request", str(e.exception))
        in_("verlap", str(e.exception))

        client.close()
-
- def test_client_05_extractremove(self):
- # Misc tests for extract and remove. Most of them are in test_cmdline.
- client = nilmdb.Client(url = "http://localhost:12380/")
-
- for x in client.stream_extract("/newton/prep", 123, 123):
- raise AssertionError("shouldn't be any data for this request")
-
- with assert_raises(ClientError) as e:
- client.stream_remove("/newton/prep", 123, 120)
-
- client.close()
-
    def test_client_06_generators(self):
        """Exercise the generator-based client interfaces (intervals,
        extract, raw HTTP get/put generators) for error handling and
        json / non-json output equivalence."""
        # A lot of the client functionality is already tested by test_cmdline,
        # but this gets a bit more coverage that cmdline misses.
        client = nilmdb.Client(url = "http://localhost:12380/")

        # Trigger a client error in generator: end precedes start, so
        # the first .next() call should raise.
        start = datetime_tz.datetime_tz.smartparse("20120323T2000")
        end = datetime_tz.datetime_tz.smartparse("20120323T1000")
        for function in [ client.stream_intervals, client.stream_extract ]:
            with assert_raises(ClientError) as e:
                function("/newton/prep",
                         start.totimestamp(),
                         end.totimestamp()).next()
            in_("400 Bad Request", str(e.exception))
            in_("end before start", str(e.exception))

        # Trigger a curl error in generator
        with assert_raises(ServerError) as e:
            client.http.get_gen("http://nosuchurl/").next()

        # Trigger a curl error in generator
        # NOTE(review): intentionally repeated -- presumably to check
        # that the error path also works on a subsequent request after
        # a failure; confirm before deduplicating.
        with assert_raises(ServerError) as e:
            client.http.get_gen("http://nosuchurl/").next()

        # Check non-json version of string output: parsing the plain
        # text must give the same object as the json version.
        eq_(json.loads(client.http.get("/stream/list",retjson=False)),
            client.http.get("/stream/list",retjson=True))

        # Check non-json version of generator output, item by item.
        for (a, b) in itertools.izip(
            client.http.get_gen("/stream/list",retjson=False),
            client.http.get_gen("/stream/list",retjson=True)):
            eq_(json.loads(a), b)

        # Check PUT with generator out
        with assert_raises(ClientError) as e:
            client.http.put_gen("stream/insert", "",
                                { "path": "/newton/prep",
                                  "start": 0, "end": 0 }).next()
        in_("400 Bad Request", str(e.exception))
        in_("no data provided", str(e.exception))

        # Check 404 for missing streams
        for function in [ client.stream_intervals, client.stream_extract ]:
            with assert_raises(ClientError) as e:
                function("/no/such/stream").next()
            in_("404 Not Found", str(e.exception))
            in_("No such stream", str(e.exception))

        client.close()
-
    def test_client_07_headers(self):
        # Make sure that /stream/intervals and /stream/extract
        # properly return streaming, chunked, text/plain response.
        # Pokes around in client.http internals a bit to look at the
        # response headers.

        client = nilmdb.Client(url = "http://localhost:12380/")
        http = client.http

        # Use a warning rather than returning a test failure, so that we can
        # still disable chunked responses for debugging.

        # Intervals: expect exactly one interval line (see lines_),
        # a chunked transfer, and a text/plain content type.
        x = http.get("stream/intervals", { "path": "/newton/prep" },
                     retjson=False)
        lines_(x, 1)
        if "Transfer-Encoding: chunked" not in http._headers:
            warnings.warn("Non-chunked HTTP response for /stream/intervals")
        if "Content-Type: text/plain;charset=utf-8" not in http._headers:
            raise AssertionError("/stream/intervals is not text/plain:\n" +
                                 http._headers)

        # Extract: same header checks on an (empty) extract request.
        x = http.get("stream/extract",
                     { "path": "/newton/prep",
                       "start": "123",
                       "end": "123" }, retjson=False)
        if "Transfer-Encoding: chunked" not in http._headers:
            warnings.warn("Non-chunked HTTP response for /stream/extract")
        if "Content-Type: text/plain;charset=utf-8" not in http._headers:
            raise AssertionError("/stream/extract is not text/plain:\n" +
                                 http._headers)

        # Make sure Access-Control-Allow-Origin gets set
        if "Access-Control-Allow-Origin: " not in http._headers:
            raise AssertionError("No Access-Control-Allow-Origin (CORS) "
                                 "header in /stream/extract response:\n" +
                                 http._headers)

        client.close()
-
    def test_client_08_unicode(self):
        """Verify that stream paths and metadata keys/values survive a
        round trip through the API when they contain non-ASCII text."""
        client = nilmdb.Client(url = "http://localhost:12380/")

        # Delete streams that exist, so this test starts from a
        # known-empty database.
        for stream in client.stream_list():
            client.stream_destroy(stream[0])

        # Database is empty
        eq_(client.stream_list(), [])

        # Create Unicode stream, match it by layout and by path.
        raw = [ u"/düsseldorf/raw", u"uint16_6" ]
        prep = [ u"/düsseldorf/prep", u"uint16_6" ]
        client.stream_create(*raw)
        eq_(client.stream_list(), [raw])
        eq_(client.stream_list(layout=raw[1]), [raw])
        eq_(client.stream_list(path=raw[0]), [raw])
        client.stream_create(*prep)
        # NOTE(review): listing order here implies stream_list returns
        # paths sorted ("prep" before "raw") -- confirm against server.
        eq_(client.stream_list(), [prep, raw])

        # Set / get metadata with Unicode keys and values, mirroring
        # the structure of test_client_03_metadata.
        eq_(client.stream_get_metadata(raw[0]), {})
        eq_(client.stream_get_metadata(prep[0]), {})
        meta1 = { u"alpha": u"α",
                  u"β": u"beta" }
        meta2 = { u"alpha": u"α" }
        meta3 = { u"β": u"beta" }
        client.stream_set_metadata(prep[0], meta1)
        client.stream_update_metadata(prep[0], {})
        client.stream_update_metadata(raw[0], meta2)
        client.stream_update_metadata(raw[0], meta3)
        eq_(client.stream_get_metadata(prep[0]), meta1)
        eq_(client.stream_get_metadata(raw[0]), meta1)
        eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
        eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)

        client.close()
-
    def test_client_09_closing(self):
        # Make sure we actually close sockets correctly.  New
        # connections will block for a while if they're not, since the
        # server will stop accepting new connections.
        for test in [1, 2]:
            start = time.time()
            # Open 50 connections; if closing is broken, the server's
            # accept queue fills and later connects stall, tripping
            # the 15-second watchdog below.
            for i in range(50):
                if time.time() - start > 15:
                    raise AssertionError("Connections seem to be blocking... "
                                         "probably not closing properly.")
                if test == 1:
                    # explicit close
                    client = nilmdb.Client(url = "http://localhost:12380/")
                    with assert_raises(ClientError) as e:
                        client.stream_remove("/newton/prep", 123, 120)
                    client.close() # remove this to see the failure
                elif test == 2:
                    # use the context manager
                    with nilmdb.Client(url = "http://localhost:12380/") as c:
                        with assert_raises(ClientError) as e:
                            c.stream_remove("/newton/prep", 123, 120)
|