Browse Source

Implement Rocket.append_string() in C; misc cleanups along the way

This should more or less complete the rocket interface.
tags/nilmdb-1.3.0
Jim Paris 10 years ago
parent
commit
0047e0360a
4 changed files with 354 additions and 99 deletions
  1. +3
    -0
      Makefile
  2. +3
    -3
      nilmdb/server/bulkdata.py
  3. +2
    -3
      nilmdb/server/pyrocket.py
  4. +346
    -93
      nilmdb/server/rocket.c

+ 3
- 0
Makefile View File

@@ -14,6 +14,9 @@ sdist:
install:
python setup.py install

develop:
python setup.py develop

docs:
make -C docs



+ 3
- 3
nilmdb/server/bulkdata.py View File

@@ -13,8 +13,8 @@ import cPickle as pickle
import re
import sys

from . import pyrocket as rocket
#from . import rocket
#from . import pyrocket as rocket
from . import rocket

# Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments.
@@ -381,7 +381,7 @@ class Table(object):
"end time %s", linenum,
ftts(obj), ftts(end))
else:
err = str(obj)
err = sprintf("line %d: %s", linenum, str(obj))
raise ValueError("error parsing input data: " + err)
tot_rows += added_rows
except Exception:


+ 2
- 3
nilmdb/server/pyrocket.py View File

@@ -91,9 +91,8 @@ class Rocket(object):
end: end timestamp for interval
last_timestamp: last timestamp that was previously parsed

Raises ParseError((linenum, timestamp, ERR_TYPE)) if
timestamps are non-monotonic, outside the start/end interval,
etc.
Raises ParseError if timestamps are non-monotonic, outside the
start/end interval, etc.

On success, return a tuple with three values:
added_rows: how many rows were added from the file


+ 346
- 93
nilmdb/server/rocket.c View File

@@ -2,24 +2,81 @@
#include <structmember.h>
#include <endian.h>

// PyErr_NewExceptionWithDoc, PyModule_AddIntConstant
#include <stdint.h>

/* Values missing from stdint.h */
#define UINT8_MIN 0
#define UINT16_MIN 0
#define UINT32_MIN 0
#define UINT64_MIN 0

/* Marker values (if min == max, skip range check) */
#define FLOAT32_MIN 0
#define FLOAT32_MAX 0
#define FLOAT64_MIN 0
#define FLOAT64_MAX 0

/* Somewhat arbitrary, just so we can use fixed sizes for strings
etc. */
static const int MAX_LAYOUT_COUNT = 64;

/* Error object and constants */
static PyObject *ParseError;
typedef enum {
ERR_OTHER,
ERR_NON_MONOTONIC,
ERR_OUT_OF_INTERVAL,
} parseerror_code_t;
static void add_parseerror_codes(PyObject *module)
{
PyModule_AddIntMacro(module, ERR_OTHER);
PyModule_AddIntMacro(module, ERR_NON_MONOTONIC);
PyModule_AddIntMacro(module, ERR_OUT_OF_INTERVAL);
}

/* Helpers to raise ParseErrors. Use "return raise_str(...)" etc. */
static PyObject *raise_str(int linenum, int code, const char *string)
{
PyObject *o;
o = Py_BuildValue("(iis)", linenum, code, string);
if (o != NULL) {
PyErr_SetObject(ParseError, o);
Py_DECREF(o);
}
return NULL;
}
static PyObject *raise_num(int linenum, int code, double num)
{
PyObject *o;
o = Py_BuildValue("(iid)", linenum, code, num);
if (o != NULL) {
PyErr_SetObject(ParseError, o);
Py_DECREF(o);
}
return NULL;
}

/****
* Layout and type helpers
*/
typedef union {
int8_t i;
uint8_t u;
} union8_t;
typedef union {
int16_t i;
uint16_t u;
} union16_t;
typedef union {
int32_t i;
uint32_t u;
float f;
} union32_t;
typedef union {
int8_t i8[8];
uint8_t u8[8];
int16_t i16[4];
uint16_t u16[4];
int32_t i32[2];
uint32_t u32[2];
int64_t i64[1];
uint64_t u64[1];
float f[2];
double d[1];
} union_t;
int64_t i;
uint64_t u;
double d;
} union64_t;

typedef enum {
LAYOUT_TYPE_NONE,
@@ -53,6 +110,11 @@ struct {
{ NULL }
};

/****
* Object definition, init, etc
*/

/* Rocket object */
typedef struct {
PyObject_HEAD
layout_type_t layout_type;
@@ -62,6 +124,7 @@ typedef struct {
int file_size;
} Rocket;

/* Dealloc / new */
static void Rocket_dealloc(Rocket *self)
{
if (self->file) {
@@ -87,6 +150,7 @@ static PyObject *Rocket_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
return (PyObject *)self;
}

/* .__init__(layout, file) */
static int Rocket_init(Rocket *self, PyObject *args, PyObject *kwds)
{
const char *layout, *path;
@@ -141,6 +205,7 @@ static int Rocket_init(Rocket *self, PyObject *args, PyObject *kwds)
return 0;
}

/* .close() */
static PyObject *Rocket_close(Rocket *self)
{
if (self->file) {
@@ -151,6 +216,7 @@ static PyObject *Rocket_close(Rocket *self)
return Py_None;
}

/* .file_size property */
static PyObject *Rocket_get_file_size(Rocket *self)
{
if (!self->file) {
@@ -170,30 +236,38 @@ static PyObject *Rocket_get_file_size(Rocket *self)
return PyInt_FromLong(self->file_size);
}

static inline void write_pyobject(FILE *out, PyObject *val, layout_type_t type)
/****
* Append from iterator
*/

/* Helper for writing Python objects to the file */
static inline void append_pyobject(FILE *out, PyObject *val, layout_type_t type)
{
union_t u;
union8_t t8;
union16_t t16;
union32_t t32;
union64_t t64;
int ret = 0;

switch (type) {
#define CASE(type, pyconvert, pytype, disktype, htole, bytes) \
case LAYOUT_TYPE_##type: \
u.pytype[0] = pyconvert(val); \
pytype = pyconvert(val); \
if (PyErr_Occurred()) \
return; \
u.disktype[0] = htole(u.disktype[0]); \
ret = fwrite(&u.disktype[0], bytes, 1, out); \
disktype = htole(disktype); \
ret = fwrite(&disktype, bytes, 1, out); \
break
CASE(INT8, PyInt_AsLong, i8, u8, , 1);
CASE(UINT8, PyInt_AsLong, u8, u8, , 1);
CASE(INT16, PyInt_AsLong, i16, u16, htole16, 2);
CASE(UINT16, PyInt_AsLong, u16, u16, htole16, 2);
CASE(INT32, PyInt_AsLong, i32, u32, htole32, 4);
CASE(UINT32, PyInt_AsLong, u32, u32, htole32, 4);
CASE(INT64, PyInt_AsLong, i64, u64, htole64, 8);
CASE(UINT64, PyInt_AsLong, u64, u64, htole64, 8);
CASE(FLOAT32, PyFloat_AsDouble, f, u32, htole32, 4);
CASE(FLOAT64, PyFloat_AsDouble, d, u64, htole64, 8);
CASE(INT8, PyInt_AsLong, t8.i, t8.u, , 1);
CASE(UINT8, PyInt_AsLong, t8.u, t8.u, , 1);
CASE(INT16, PyInt_AsLong, t16.i, t16.u, htole16, 2);
CASE(UINT16, PyInt_AsLong, t16.u, t16.u, htole16, 2);
CASE(INT32, PyInt_AsLong, t32.i, t32.u, htole32, 4);
CASE(UINT32, PyInt_AsLong, t32.u, t32.u, htole32, 4);
CASE(INT64, PyInt_AsLong, t64.i, t64.u, htole64, 8);
CASE(UINT64, PyInt_AsLong, t64.u, t64.u, htole64, 8);
CASE(FLOAT32, PyFloat_AsDouble, t32.f, t32.u, htole32, 4);
CASE(FLOAT64, PyFloat_AsDouble, t64.d, t64.u, htole64, 8);
#undef CASE
default:
PyErr_SetString(PyExc_TypeError, "unknown type");
@@ -203,38 +277,7 @@ static inline void write_pyobject(FILE *out, PyObject *val, layout_type_t type)
PyErr_SetFromErrno(PyExc_OSError);
}
}

static inline void *read_pyobject(FILE *in, layout_type_t type)
{
union_t u;

switch (type) {
#define CASE(type, pyconvert, pytype, disktype, letoh, bytes) \
case LAYOUT_TYPE_##type: \
if (fread(&u.disktype[0], bytes, 1, in) <= 0) \
break; \
u.disktype[0] = letoh(u.disktype[0]); \
return pyconvert(u.pytype[0]); \
break
CASE(INT8, PyInt_FromLong, i8, u8, , 1);
CASE(UINT8, PyInt_FromLong, u8, u8, , 1);
CASE(INT16, PyInt_FromLong, i16, u16, le16toh, 2);
CASE(UINT16, PyInt_FromLong, u16, u16, le16toh, 2);
CASE(INT32, PyInt_FromLong, i32, u32, le32toh, 4);
CASE(UINT32, PyInt_FromLong, u32, u32, le32toh, 4);
CASE(INT64, PyInt_FromLong, i64, u64, le64toh, 8);
CASE(UINT64, PyInt_FromLong, u64, u64, le64toh, 8);
CASE(FLOAT32, PyFloat_FromDouble, f, u32, le32toh, 4);
CASE(FLOAT64, PyFloat_FromDouble, d, u64, le64toh, 8);
#undef CASE
default:
PyErr_SetString(PyExc_TypeError, "unknown type");
return NULL;
}
PyErr_SetString(PyExc_OSError, "failed to read from file");
return NULL;
}

/* .append_iter(maxrows, dataiter) */
static PyObject *Rocket_append_iter(Rocket *self, PyObject *args)
{
int maxrows;
@@ -269,17 +312,17 @@ static PyObject *Rocket_append_iter(Rocket *self, PyObject *args)
}

/* Extract and write timestamp */
write_pyobject(self->file, PyList_GetItem(rowlist, 0),
LAYOUT_TYPE_FLOAT64);
append_pyobject(self->file, PyList_GetItem(rowlist, 0),
LAYOUT_TYPE_FLOAT64);
if (PyErr_Occurred())
goto row_err;

/* Extract and write values */
int i;
for (i = 0; i < self->layout_count; i++) {
write_pyobject(self->file,
PyList_GetItem(rowlist, i+1),
self->layout_type);
append_pyobject(self->file,
PyList_GetItem(rowlist, i+1),
self->layout_type);
if (PyErr_Occurred())
goto row_err;
}
@@ -293,6 +336,133 @@ row_err:
return NULL;
}

/****
* Append from string
*/
static inline long int strtol10(const char *nptr, char **endptr) {
return strtol(nptr, endptr, 10);
}
static inline long int strtoul10(const char *nptr, char **endptr) {
return strtoul(nptr, endptr, 10);
}

/* .append_string(count, data, offset, linenum, start, end, last_timestamp) */
static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
{
int count;
char *data;
int offset;
int linenum;
double start;
double end;
double last_timestamp;

int written = 0;
char *endptr;
union8_t t8;
union16_t t16;
union32_t t32;
union64_t t64;
int i;

/* It would be nice to use 't#' instead of 's' for data,
but we need the null termination for strto*. If we had
strnto* that took a length, we could use t# and not require
a copy. */
if (!PyArg_ParseTuple(args, "isiiddd:append_string", &count,
&data, &offset, &linenum,
&start, &end, &last_timestamp))
return NULL;

char *buf = &data[offset];
while (written < count && *buf)
{
linenum++;

/* Extract timestamp */
t64.d = strtod(buf, &endptr);
if (endptr == buf)
return raise_str(linenum, ERR_OTHER, "bad timestamp");
if (t64.d <= last_timestamp)
return raise_num(linenum, ERR_NON_MONOTONIC, t64.d);
last_timestamp = t64.d;
if (t64.d < start || t64.d >= end)
return raise_num(linenum, ERR_OUT_OF_INTERVAL, t64.d);
t64.u = le64toh(t64.u);
if (fwrite(&t64.u, 8, 1, self->file) != 1)
goto err;
buf = endptr;

/* Parse all values in the line */
switch (self->layout_type) {
#define CS(type, parsefunc, parsetype, realtype, disktype, letoh, bytes) \
case LAYOUT_TYPE_##type: \
/* parse and write in a loop */ \
for (i = 0; i < self->layout_count; i++) { \
parsetype = parsefunc(buf, &endptr); \
if (endptr == buf) \
goto wrong_number_of_values; \
if (type##_MIN != type##_MAX && \
(parsetype < type##_MIN || \
parsetype > type##_MAX)) \
goto value_out_of_range; \
realtype = parsetype; \
disktype = letoh(disktype); \
if (fwrite(&disktype, bytes, \
1, self->file) != 1) \
goto err; \
buf = endptr; \
} \
while (*buf == ' ' || *buf == '\t') \
buf++; \
if (*buf == '\n') \
buf++; \
else if (*buf != '\0') \
goto extra_data_on_line; \
break

CS(INT8, strtol10, t64.i, t8.i, t8.u, , 1);
CS(UINT8, strtoul10, t64.u, t8.u, t8.u, , 1);
CS(INT16, strtol10, t64.i, t16.i, t16.u, le16toh, 2);
CS(UINT16, strtoul10, t64.u, t16.u, t16.u, le16toh, 2);
CS(INT32, strtol10, t64.i, t32.i, t32.u, le32toh, 4);
CS(UINT32, strtoul10, t64.u, t32.u, t32.u, le32toh, 4);
CS(INT64, strtol10, t64.i, t64.i, t64.u, le64toh, 8);
CS(UINT64, strtoul10, t64.u, t64.u, t64.u, le64toh, 8);
CS(FLOAT32, strtod, t64.d, t32.f, t32.u, le32toh, 4);
CS(FLOAT64, strtod, t64.d, t64.d, t64.u, le64toh, 8);
#undef CS
default:
PyErr_SetString(PyExc_TypeError, "unknown type");
return NULL;
}

/* Done this line */
written++;
}

fflush(self->file);

/* Build return value and return*/
offset = buf - data;
PyObject *o;
o = Py_BuildValue("(iidi)", written, offset, last_timestamp, linenum);
return o;
err:
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
wrong_number_of_values:
return raise_str(linenum, ERR_OTHER, "wrong number of values");
value_out_of_range:
return raise_str(linenum, ERR_OTHER, "value out of range");
extra_data_on_line:
return raise_str(linenum, ERR_OTHER, "extra data on line");
}

/****
* Extract to Python list
*/

static int _extract_handle_params(Rocket *self, PyObject *args, long *count)
{
long offset;
@@ -310,6 +480,41 @@ static int _extract_handle_params(Rocket *self, PyObject *args, long *count)
return 0;
}

/* Helper for extracting data from a file as a Python object */
static inline void *extract_pyobject(FILE *in, layout_type_t type)
{
union8_t t8;
union16_t t16;
union32_t t32;
union64_t t64;

switch (type) {
#define CASE(type, pyconvert, pytype, disktype, letoh, bytes) \
case LAYOUT_TYPE_##type: \
if (fread(&disktype, bytes, 1, in) <= 0) \
break; \
disktype = letoh(disktype); \
return pyconvert(pytype); \
break
CASE(INT8, PyInt_FromLong, t8.i, t8.u, , 1);
CASE(UINT8, PyInt_FromLong, t8.u, t8.u, , 1);
CASE(INT16, PyInt_FromLong, t16.i, t16.u, le16toh, 2);
CASE(UINT16, PyInt_FromLong, t16.u, t16.u, le16toh, 2);
CASE(INT32, PyInt_FromLong, t32.i, t32.u, le32toh, 4);
CASE(UINT32, PyInt_FromLong, t32.u, t32.u, le32toh, 4);
CASE(INT64, PyInt_FromLong, t64.i, t64.u, le64toh, 8);
CASE(UINT64, PyInt_FromLong, t64.u, t64.u, le64toh, 8);
CASE(FLOAT32, PyFloat_FromDouble, t32.f, t32.u, le32toh, 4);
CASE(FLOAT64, PyFloat_FromDouble, t64.d, t64.u, le64toh, 8);
#undef CASE
default:
PyErr_SetString(PyExc_TypeError, "unknown type");
return NULL;
}
PyErr_SetString(PyExc_OSError, "failed to read from file");
return NULL;
}

static PyObject *Rocket_extract_list(Rocket *self, PyObject *args)
{
long count;
@@ -332,8 +537,8 @@ static PyObject *Rocket_extract_list(Rocket *self, PyObject *args)
}

/* Timestamp */
PyObject *entry = read_pyobject(self->file,
LAYOUT_TYPE_FLOAT64);
PyObject *entry = extract_pyobject(self->file,
LAYOUT_TYPE_FLOAT64);
if (!entry || (PyList_SetItem(rowlist, 0, entry) < 0)) {
Py_DECREF(rowlist);
Py_DECREF(retlist);
@@ -343,8 +548,8 @@ static PyObject *Rocket_extract_list(Rocket *self, PyObject *args)
/* Data */
int i;
for (i = 0; i < self->layout_count; i++) {
PyObject *ent = read_pyobject(self->file,
self->layout_type);
PyObject *ent = extract_pyobject(self->file,
self->layout_type);
if (!ent || (PyList_SetItem(rowlist, i+1, ent) < 0)) {
Py_DECREF(rowlist);
Py_DECREF(retlist);
@@ -364,6 +569,10 @@ static PyObject *Rocket_extract_list(Rocket *self, PyObject *args)
return retlist;
}

/****
* Extract to string
*/

static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
{
long count;
@@ -383,7 +592,10 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
const int alloc_size = 1048576;

int row, i;
union_t u;
union8_t t8;
union16_t t16;
union32_t t32;
union64_t t64;
for (row = 0; row < count; row++) {
/* Make sure there's space for a line */
if ((len_alloc - len) < min_free) {
@@ -396,10 +608,11 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
}

/* Read and print timestamp */
if (fread(&u.u64[0], 8, 1, self->file) != 1)
if (fread(&t64.u, 8, 1, self->file) != 1)
goto err;
t64.u = le64toh(t64.u);
/* Timestamps are always printed to the microsecond */
ret = sprintf(&str[len], "%.6f", u.d[0]);
ret = sprintf(&str[len], "%.6f", t64.d);
if (ret <= 0)
goto err;
len += ret;
@@ -410,31 +623,32 @@ static PyObject *Rocket_extract_string(Rocket *self, PyObject *args)
case LAYOUT_TYPE_##type: \
/* read and format in a loop */ \
for (i = 0; i < self->layout_count; i++) { \
if (fread(&u.disktype[0], bytes, \
if (fread(&disktype, bytes, \
1, self->file) < 0) \
goto err; \
u.disktype[0] = letoh(u.disktype[0]); \
disktype = letoh(disktype); \
ret = sprintf(&str[len], " " fmt, \
u.fmttype[0]); \
fmttype); \
if (ret <= 0) \
goto err; \
len += ret; \
} \
break
CASE(INT8, "%hhd", i8, u8, , 1);
CASE(UINT8, "%hhu", u8, u8, , 1);
CASE(INT16, "%hd", i16, u16, le16toh, 2);
CASE(UINT16, "%hu", u16, u16, le16toh, 2);
CASE(INT32, "%d", i32, u32, le32toh, 4);
CASE(UINT32, "%u", u32, u32, le32toh, 4);
CASE(INT64, "%ld", i64, u64, le64toh, 8);
CASE(UINT64, "%lu", u64, u64, le64toh, 8);
CASE(INT8, "%hhd", t8.i, t8.u, , 1);
CASE(UINT8, "%hhu", t8.u, t8.u, , 1);
CASE(INT16, "%hd", t16.i, t16.u, le16toh, 2);
CASE(UINT16, "%hu", t16.u, t16.u, le16toh, 2);
CASE(INT32, "%d", t32.i, t32.u, le32toh, 4);
CASE(UINT32, "%u", t32.u, t32.u, le32toh, 4);
CASE(INT64, "%ld", t64.i, t64.u, le64toh, 8);
CASE(UINT64, "%lu", t64.u, t64.u, le64toh, 8);
/* These next two are a bit debatable. floats
are 6-9 significant figures, doubles are
15-19. This matches the old prep format,
for float32. */
CASE(FLOAT32, "%.6e", f, u32, le32toh, 4);
CASE(FLOAT64, "%.16e", d, u64, le64toh, 8);
are 6-9 significant figures, so we print 7.
Doubles are 15-19, so we print 17. This is
similar to the old prep format for float32.
*/
CASE(FLOAT32, "%.6e", t32.f, t32.u, le32toh, 4);
CASE(FLOAT64, "%.16e", t64.d, t64.u, le64toh, 8);
#undef CASE
default:
PyErr_SetString(PyExc_TypeError, "unknown type");
@@ -453,6 +667,10 @@ err:
return NULL;
}

/****
* Module and type setup
*/

static PyGetSetDef Rocket_getsetters[] = {
{ "file_size", (getter)Rocket_get_file_size, NULL,
"file size in bytes", NULL },
@@ -467,15 +685,43 @@ static PyMemberDef Rocket_members[] = {

static PyMethodDef Rocket_methods[] = {
{ "close", (PyCFunction)Rocket_close, METH_NOARGS,
"close(self)\n\n"
"Close file handle" },

{ "append_iter", (PyCFunction)Rocket_append_iter, METH_VARARGS,
"append_iter(self, maxrows, iterable)\n\n"
"Append up to maxrows of data from iter to the file" },

{ "append_string", (PyCFunction)Rocket_append_string, METH_VARARGS,
"append_string(self, count, data, offset, line, start, end, ts)\n\n"
"Parse string and append data.\n"
"\n"
" count: maximum number of rows to add\n"
" data: string data\n"
" offset: byte offset into data to start parsing\n"
" line: current line number of data\n"
" start: starting timestamp for interval\n"
" end: end timestamp for interval\n"
" ts: last timestamp that was previously parsed\n"
"\n"
"Raises ParseError if timestamps are non-monotonic, outside\n"
"the start/end interval etc.\n"
"\n"
"On success, return a tuple with three values:\n"
" added_rows: how many rows were added from the file\n"
" data_offset: current offset into the data string\n"
" last_timestamp: last timestamp we parsed" },

{ "extract_list", (PyCFunction)Rocket_extract_list, METH_VARARGS,
"Extract count rows of data from the file at offset offset. "
"extract_list(self, offset, count)\n\n"
"Extract count rows of data from the file at offset offset.\n"
"Return a list of lists [[row],[row],...]" },

{ "extract_string", (PyCFunction)Rocket_extract_string, METH_VARARGS,
"Extract count rows of data from the file at offset offset. "
"extract_string(self, offset, count)\n\n"
"Extract count rows of data from the file at offset offset.\n"
"Return an ascii formatted string according to the layout" },

{ NULL },
};

@@ -493,10 +739,11 @@ static PyTypeObject RocketType = {
.tp_members = Rocket_members,
.tp_getset = Rocket_getsetters,

.tp_doc = ("C implementation of the \"rocket\" data parsing "
"interface, which translates between the binary "
"format on disk and the ASCII or Python list "
"format used when communicating with the rest of "
.tp_doc = ("rocket.Rocket(layout, file)\n\n"
"C implementation of the \"rocket\" data parsing\n"
"interface, which translates between the binary\n"
"format on disk and the ASCII or Python list\n"
"format used when communicating with the rest of\n"
"the system.")
};

@@ -517,5 +764,11 @@ initrocket(void)
"Rocket data parsing and formatting module");
Py_INCREF(&RocketType);
PyModule_AddObject(module, "Rocket", (PyObject *)&RocketType);

ParseError = PyErr_NewException("rocket.ParseError", NULL, NULL);
Py_INCREF(ParseError);
PyModule_AddObject(module, "ParseError", ParseError);
add_parseerror_codes(module);

return;
}

Loading…
Cancel
Save