"""Client utilities for accessing NILM database via HTTP""" from __future__ import absolute_import import nilmdb from nilmdb.printf import * import time import sys import re import os import json import urlparse import urllib2 from urllib2 import urlopen, HTTPError import pycurl class ClientError(Exception): def __init__(self, code, message): Exception.__init__(self, message) self.code = code def __str__(self): return sprintf("[%03d] %s", self.code, self.message) class MyCurl(object): def __init__(self, baseurl = ""): """If baseurl is supplied, all other functions that take a URL can be given a relative URL instead.""" self.baseurl = baseurl self.curl = pycurl.Curl() self.curl.setopt(pycurl.SSL_VERIFYHOST, 2) self.curl.setopt(pycurl.FOLLOWLOCATION, 1) self.curl.setopt(pycurl.MAXREDIRS, 5) def _setup_url(self, url, params): url = urlparse.urljoin(self.baseurl, url) if params: url = urlparse.urljoin(url, "?" + urllib.urlencode(params, True)) self.curl.setopt(pycurl.URL, url) return url def getjson(self, url, params = None): """Simple GET that returns JSON string""" self._setup_url(url, params) self._body = "" def body_callback(x): self._body += x self.curl.setopt(pycurl.WRITEFUNCTION, body_callback) try: self.curl.perform() except pycurl.error as e: raise ClientError(500, e[1]) code = self.curl.getinfo(pycurl.RESPONSE_CODE) if code != 200: raise ClientError(code, "HTTP error") return json.loads(self._body) class Client(object): def __init__(self, url): self.curl = MyCurl(url) def version(self): return self.curl.getjson("version") def stream_list(self, path = None, layout = None): params = {} if path is not None: params["path"] = path if layout is not None: params["layout"] = layout return self.curl.getjson("stream/list", params) def stream_get_metadata(self, path, keys = None): params = { "path": path } if keys is not None: params["key"] = keys return self.curl.getjson("stream/get_metadata", params) def stream_set_metadata(self): raise NotImplementedError def stream_update_metadata(self): raise NotImplementedError def stream_create(self): raise NotImplementedError def stream_insert(self, path): params = { "path": path } return self.curl.getjson("stream/insert", params) # eq_(db.stream_list(), []) # # Bad path # with assert_raises(ValueError): # db.stream_create("foo/bar/baz", "PrepData") # with assert_raises(ValueError): # db.stream_create("/foo", "PrepData") # # Bad layout type # with assert_raises(KeyError): # db.stream_create("/newton/prep", "NoSuchLayout") # # Bad index columns # with assert_raises(KeyError): # db.stream_create("/newton/prep", "PrepData", ["nonexistant"]) # db.stream_create("/newton/prep", "PrepData") # db.stream_create("/newton/raw", "RawData") # db.stream_create("/newton/zzz/rawnotch", "RawNotchedData") # # Verify we got 3 streams # eq_(db.stream_list(), [ ("/newton/prep", "PrepData"), # ("/newton/raw", "RawData"), # ("/newton/zzz/rawnotch", "RawNotchedData") # ]) # # Match just one type # eq_(db.stream_list(layout="RawData"), [ ("/newton/raw", "RawData") ]) # # Verify that columns were made right # eq_(len(db.h5file.getNode("/newton/prep").cols), 9) # eq_(len(db.h5file.getNode("/newton/raw").cols), 7) # eq_(len(db.h5file.getNode("/newton/zzz/rawnotch").cols), 10) # assert(db.h5file.getNode("/newton/prep").colindexed["timestamp"]) # assert(not db.h5file.getNode("/newton/prep").colindexed["p1"]) # # Set / get metadata # eq_(db.stream_get_metadata("/newton/prep"), {}) # eq_(db.stream_get_metadata("/newton/raw"), {}) # meta1 = { "description": "The Data", # "v_scale": "1.234" } # meta2 = { "description": "The Data" } # meta3 = { "v_scale": "1.234" } # db.stream_set_metadata("/newton/prep", meta1) # db.stream_update_metadata("/newton/prep", {}) # db.stream_update_metadata("/newton/raw", meta2) # db.stream_update_metadata("/newton/raw", meta3) # eq_(db.stream_get_metadata("/newton/prep"), meta1) # eq_(db.stream_get_metadata("/newton/raw"), meta1) # streams = getjson("/stream/list") # eq_(streams, [ # ['/newton/prep', 'PrepData'], # ['/newton/raw', 'RawData'], # ['/newton/zzz/rawnotch', 'RawNotchedData'], # ]) # streams = getjson("/stream/list?layout=RawData") # eq_(streams, [['/newton/raw', 'RawData']]) # streams = getjson("/stream/list?layout=NoSuchLayout") # eq_(streams, []) # with assert_raises(HTTPError) as e: # getjson("/stream/get_metadata?path=foo") # eq_(e.exception.code, 404) # data = getjson("/stream/get_metadata?path=/newton/prep") # eq_(data, {'description': 'The Data', 'v_scale': '1.234'}) # data = getjson("/stream/get_metadata?path=/newton/prep" # "&key=v_scale") # eq_(data, {'v_scale': '1.234'}) # data = getjson("/stream/get_metadata?path=/newton/prep" # "&key=v_scale&key=description") # eq_(data, {'description': 'The Data', 'v_scale': '1.234'}) # data = getjson("/stream/get_metadata?path=/newton/prep" # "&key=v_scale&key=foo") # eq_(data, {'foo': None, 'v_scale': '1.234'}) # data = getjson("/stream/get_metadata?path=/newton/prep" # "&key=foo") # eq_(data, {'foo': None}) # def test_insert(self): # # invalid path first # with assert_raises(HTTPError) as e: # getjson("/stream/insert?path=/newton/") # eq_(e.exception.code, 404) # # XXX TODO # with assert_raises(HTTPError) as e: # getjson("/stream/insert?path=/newton/prep") # eq_(e.exception.code, 501)