# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the table storage backend.
"""

import os
import errno
import sqlite3

import nilmdb.utils
from nilmdb.utils.printf import printf
from nilmdb.utils.time import timestamp_to_bytes

from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet

from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125 msec.  sqlite3 will commit transactions at three points:
#   1: explicit con.commit()
#   2: between a series of DML commands and non-DML commands, e.g.
#      after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
#   3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed things up, we can set 'PRAGMA synchronous=OFF'.  Or, it
# seems that 'PRAGMA synchronous=NORMAL' and 'PRAGMA journal_mode=WAL'
# give an equivalent speedup more safely.  That is what is used here.
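#
# A hedged illustration of case 2 above (the implicit commit between DML
# and non-DML statements); these two calls are examples only, not a
# sequence this module performs:
#
#   con.execute("INSERT INTO metadata VALUES (?, ?, ?)", params)  # DML
#   con.execute("PRAGMA user_version = 3")  # non-DML: commits the INSERT first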

_sql_schema_updates = {
    0: {"next": 1, "sql": """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,       -- stream ID
            path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL          -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """},

    1: {"next": 3, "sql": """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
        """},

    2: {"error": "old format with floating-point timestamps requires "
                 "nilmdb 1.3.1 or older"},

    3: {"next": None},
}
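
# How the chain above is walked (a sketch, mirroring _sql_schema_update
# below): starting from PRAGMA user_version, each entry's "sql" script is
# run and "next" names the following version, so a new database goes
# 0 -> 1 -> 3; version 3 has "next": None and is current.  An entry with
# an "error" key (version 2 here) marks a format this code refuses to open.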

@nilmdb.utils.must_close()
class NilmDB():
    verbose = 0

    def __init__(self, basepath,
                 max_results=None,
                 max_removals=None,
                 max_int_removals=None,
                 bulkdata_args=None):
        """Initialize NilmDB at the given basepath.
        Other arguments are for debugging / testing:

        'max_results' is the max rows to send in a single
        stream_intervals or stream_extract response.

        'max_removals' is the max rows to delete at once
        in stream_remove.

        'max_int_removals' is the max intervals to delete
        at once in stream_remove.

        'bulkdata_args' is kwargs for the bulkdata module.
        """
        if bulkdata_args is None:
            bulkdata_args = {}

        # Set up path
        self.basepath = os.path.abspath(basepath)

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
        self.con = sqlite3.connect(sqlfilename, check_same_thread=True)
        try:
            self._sql_schema_update()
        except Exception:
            self.data.close()
            raise

        # See big comment at top about the performance implications of this
        self.con.execute("PRAGMA synchronous=NORMAL")
        self.con.execute("PRAGMA journal_mode=WAL")

        # Approximate largest number of elements that we want to send
        # in a single reply (for stream_intervals, stream_extract).
        self.max_results = max_results or 16384

        # Remove up to this many rows per call to stream_remove.
        self.max_removals = max_removals or 1048576

        # Remove up to this many intervals per call to stream_remove.
        self.max_int_removals = max_int_removals or 4096

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
            self.con = None
        self.data.close()
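
    # A minimal usage sketch (the stream path and layout are illustrative,
    # not fixed by this module):
    #
    #   db = NilmDB("/tmp/nilmdb-example")
    #   db.stream_create("/newton/prep", "float32_8")
    #   db.stream_set_metadata("/newton/prep", {"description": "example"})
    #   db.close()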

    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while True:
            if version not in _sql_schema_updates:
                raise Exception(self.basepath + ": unknown database version "
                                + str(version))
            update = _sql_schema_updates[version]
            if "error" in update:
                raise Exception(self.basepath + ": can't use database version "
                                + str(version) + ": " + update["error"])
            if update["next"] is None:
                break
            cur.executescript(update["sql"])
            version = update["next"]
            if self.verbose:
                printf("Database schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def _check_user_times(self, start, end):
        if start is None:
            start = nilmdb.utils.time.min_timestamp
        if end is None:
            end = nilmdb.utils.time.max_timestamp
        if start >= end:
            raise NilmDBError("start must precede end")
        return (start, end)

    @nilmdb.utils.lru_cache(size=64)
    def _get_intervals(self, stream_id):
        """
        Return a mutable IntervalSet corresponding to the given stream ID.
        """
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time, "
                                  "start_pos, end_pos "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start_time, end_time, start_pos, end_pos) in result:
                iset += DBInterval(start_time, end_time,
                                   start_time, end_time,
                                   start_pos, end_pos)
        except IntervalError:
            raise NilmDBError("unexpected overlap in ranges table!")

        return iset

    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc)
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Check for adjacency.  If there's a stream in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
        max_merged_rows = 8000 * 60 * 60 * 1.05  # 1.05 hours at 8 KHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
                start_pos == adjacent.db_endpos and
                (end_pos - adjacent.db_startpos) < max_merged_rows):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos, adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()
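
    # Merge sketch: if an existing range covers rows [0, 1000) over times
    # [100, 200), and new data arrives as rows [1000, 2000) over [200, 300),
    # the adjacency check above coalesces them into a single ranges entry
    # spanning rows [0, 2000) and times [100, 300).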

    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream
        original: original DBInterval; must be already present in DB
        remove: DBInterval to remove; must be subset of 'original'
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)

        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)

        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()

        return

    def stream_list(self, path=None, layout=None, extended=False):
        """Return list of lists of all streams in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.

        If extended=False, returns a list of lists containing
        the path and layout: [ path, layout ]

        If extended=True, returns a list of lists containing
        more information:
          path
          layout
          interval_min (earliest interval start)
          interval_max (latest interval end)
          rows (total number of rows of data)
          time (total time covered by this stream, in timestamp units)
        """
        params = ()
        query = "SELECT streams.path, streams.layout"
        if extended:
            query += ", min(ranges.start_time), max(ranges.end_time) "
            query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
            query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
        query += " FROM streams"
        if extended:
            query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
        query += " WHERE 1=1"
        if layout is not None:
            query += " AND streams.layout=?"
            params += (layout,)
        if path is not None:
            query += " AND streams.path=?"
            params += (path,)
        query += " GROUP BY streams.id ORDER BY streams.path"
        result = self.con.execute(query, params).fetchall()
        return [list(x) for x in result]

    def stream_intervals(self, path, start=None, end=None, diffpath=None):
        """
        List all intervals in 'path' between 'start' and 'end'.  If
        'diffpath' is not None, list instead the set-difference
        between the intervals in the two streams; i.e. all interval
        ranges that are present in 'path' but not 'diffpath'.

        Returns (intervals, restart) tuple.

        'intervals' is a list of [start,end] timestamps of all intervals
        that exist for path, between start and end.

        'restart', if not None, means that there were too many results
        to return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated, and
        a new request with a start time of 'restart' will fetch the
        next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        if diffpath:
            diffstream_id = self._stream_id(diffpath)
            diffintervals = self._get_intervals(diffstream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        if diffpath:
            getter = nilmdb.utils.interval.set_difference(
                intervals.intersection(requested),
                diffintervals.intersection(requested))
        else:
            getter = intervals.intersection(requested)
        for n, i in enumerate(getter):
            if n >= self.max_results:
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            restart = None
        return (result, restart)
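
    # Paging sketch for callers (illustrative, not part of this module):
    #
    #   start = None
    #   while True:
    #       (ivals, restart) = db.stream_intervals("/newton/prep", start)
    #       ...consume ivals...
    #       if restart is None:
    #           break
    #       start = restart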

    def stream_create(self, path, layout_name):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
          Paths must contain at least two elements, e.g.:
            /newton/prep
            /newton/raw
            /newton/upstairs/prep
            /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        # Create the bulk storage.  Raises ValueError on error, which we
        # pass along.
        self.data.create(path, layout_name)

        # Insert into SQL database once the bulk storage is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))

    def _stream_id(self, path):
        """Return unique stream ID"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
           { description: 'Downstairs lighting',
             v_scaling: 123.45 }
        This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary"""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)

    def stream_rename(self, oldpath, newpath):
        """Rename a stream."""
        stream_id = self._stream_id(oldpath)

        # Rename the data
        self.data.rename(oldpath, newpath)

        # Rename the stream in the database
        with self.con as con:
            con.execute("UPDATE streams SET path=? WHERE id=?",
                        (newpath, stream_id))

    def stream_destroy(self, path):
        """Fully remove a table from the database.  Fails if there is
        any interval data present; remove it first.  Metadata is
        also removed."""
        stream_id = self._stream_id(path)

        # Verify that no intervals are present, and clear the cache
        iset = self._get_intervals(stream_id)
        if iset:
            raise NilmDBError("all intervals must be removed before "
                              "destroying a stream")
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the bulkdata storage
        self.data.destroy(path)

        # Delete metadata, stream, intervals (should be none)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

    def stream_insert(self, path, start, end, data, binary=False):
        """Insert new data into the database.
        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Textual data, formatted according to the layout of path

        'binary', if True, means that 'data' is raw binary:
        little-endian, matching the current table's layout,
        including the int64 timestamp.
        """
        # First check for basic overlap using timestamp info given.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(start, end)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data at range: "
                               + str(iset & interval))

        # Tentatively append the data.  This will raise a ValueError if
        # there are any parse errors.
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append_data(data, start, end, binary)
        row_end = table.nrows

        # Insert the record into the sql database.
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return

    def _bisect_left(self, a, x, lo, hi):
        # Like bisect.bisect_left, but doesn't choke on large indices on
        # 32-bit systems, like bisect's fast C implementation does.
        while lo < hi:
            mid = (lo + hi) // 2
            if a[mid] < x:
                lo = mid + 1
            else:
                hi = mid
        return lo
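    # e.g. self._bisect_left([10, 20, 20, 30], 20, 0, 4) == 1: the leftmost
    # index at which 20 could be inserted while keeping the list sorted.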

    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Return the first database
        position with a timestamp (first element) greater than or
        equal to 'start'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.start == dbinterval.db_start:
            return dbinterval.db_startpos
        return self._bisect_left(table,
                                 dbinterval.start,
                                 dbinterval.db_startpos,
                                 dbinterval.db_endpos)

    def _find_end(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that follows
        the end time.  Return the first database position after the
        row with timestamp (first element) greater than or equal
        to 'end'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.end == dbinterval.db_end:
            return dbinterval.db_endpos
        # Note that we still use bisect_left here, because we don't
        # want to include the given timestamp in the results.  This is
        # so that queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
        # non-overlapping data.
        return self._bisect_left(table,
                                 dbinterval.end,
                                 dbinterval.db_startpos,
                                 dbinterval.db_endpos)

    def stream_extract(self, path, start=None, end=None,
                       count=False, markup=False, binary=False):
        """
        Returns (data, restart) tuple.

        'data' is ASCII-formatted data from the database, formatted
        according to the layout of the stream.

        'restart', if not None, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.

        'count', if true, means to not return raw data, but just the count
        of rows that would have been returned (a plain integer, not a
        tuple).  This is much faster than actually fetching the data.
        It is not limited by max_results.

        'markup', if true, indicates that returned data should be
        marked with a comment denoting when a particular interval
        starts, and another comment when an interval ends.

        'binary', if true, means to return raw binary rather than
        ASCII-formatted data.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        matched = 0
        remaining = self.max_results
        restart = None
        if binary and (markup or count):
            raise NilmDBError("binary mode can't be used with markup or count")
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and
            # ending row for this particular interval, then
            # read the entire range as one slice.
            row_start = self._find_start(table, interval)
            row_end = self._find_end(table, interval)

            if count:
                matched += row_end - row_start
                continue

            # Shorten it if we'll hit the maximum number of results
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                restart = table[row_max]

            # Add markup
            if markup:
                result.append(b"# interval-start " +
                              timestamp_to_bytes(interval.start) + b"\n")

            # Gather these results up
            result.append(table.get_data(row_start, row_end, binary))

            # Count them
            remaining -= row_end - row_start

            # Add markup, and exit if restart is set.
            if restart is not None:
                if markup:
                    result.append(b"# interval-end " +
                                  timestamp_to_bytes(restart) + b"\n")
                break
            if markup:
                result.append(b"# interval-end " +
                              timestamp_to_bytes(interval.end) + b"\n")

        if count:
            return matched
        full_result = b"".join(result)
        return (full_result, restart)
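
    # Extraction paging sketch (illustrative, not part of this module):
    #
    #   start = None
    #   while True:
    #       (data, restart) = db.stream_extract("/newton/prep", start, end)
    #       ...write out data...
    #       if restart is None:
    #           break
    #       start = restart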

    def stream_remove(self, path, start=None, end=None):
        """
        Remove data from the specified time interval within a stream.

        Removes data in the interval [start, end), and intervals are
        truncated or split appropriately.

        Returns a (removed, restart) tuple.

        'removed' is the number of data points that were removed.

        'restart', if not None, means there were too many rows to
        remove in a single request.  This function should be called
        again with a start time of 'restart' to complete the removal.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        to_remove = Interval(start, end)
        removed = 0
        remaining = self.max_removals
        int_remaining = self.max_int_removals
        restart = None

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection now.
        all_candidates = list(intervals.intersection(to_remove, orig=True))

        remove_start = None
        remove_end = None

        for (dbint, orig) in all_candidates:
            # Stop if we've hit the max number of interval removals
            if int_remaining <= 0:
                restart = dbint.start
                break

            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Shorten it if we'll hit the maximum number of removals
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                dbint.end = table[row_max]
                restart = dbint.end

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage,
            # coalescing adjacent removals to reduce the number of calls
            # to table.remove.
            if remove_end == row_start:
                # Extend our coalesced region
                remove_end = row_end
            else:
                # Perform previous removal, then save this one
                if remove_end is not None:
                    table.remove(remove_start, remove_end)
                remove_start = row_start
                remove_end = row_end

            # Count how many were removed
            removed += row_end - row_start
            remaining -= row_end - row_start
            int_remaining -= 1

            if restart is not None:
                break

        # Perform any final coalesced removal
        if remove_end is not None:
            table.remove(remove_start, remove_end)

        return (removed, restart)
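
    # Removal paging sketch (illustrative, not part of this module):
    #
    #   total = 0
    #   start = None
    #   while True:
    #       (removed, restart) = db.stream_remove("/newton/prep", start, end)
    #       total += removed
    #       if restart is None:
    #           break
    #       start = restart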