# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the table storage backend.
"""

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
import nilmdb.utils

from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

import sqlite3
import os
import errno
# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125 msec.  sqlite3 will commit transactions at three times:
#    1: explicit con.commit()
#    2: between a series of DML commands and non-DML commands, e.g.
#       after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
#    3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed things up, we can set 'PRAGMA synchronous=OFF'.  Or, it
# seems that 'PRAGMA synchronous=NORMAL' and 'PRAGMA journal_mode=WAL'
# give an equivalent speedup more safely.  That is what is used here.
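#
# Since journal_mode=WAL is persistent, it can be confirmed from a
# separate connection (illustrative sketch; "data.sql" is the filename
# used in __init__ below):
#
#   import sqlite3
#   con = sqlite3.connect("data.sql")
#   print(con.execute("PRAGMA journal_mode").fetchone())  # -> ('wal',)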
_sql_schema_updates = {
    0: { "next": 1, "sql": """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,       -- stream ID
            path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL          -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """ },

    1: { "next": 3, "sql": """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
        """ },

    2: { "error": "old format with floating-point timestamps requires "
         "nilmdb 1.3.1 or older" },

    3: { "next": None },
}
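
# Illustrative upgrade path: a fresh database has user_version 0; the
# version-0 SQL brings it to 1, the version-1 SQL brings it to 3, and
# version 3 (the current schema) has no further "sql" to run.  A
# database at version 2 can only be opened by nilmdb 1.3.1 or older.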

@nilmdb.utils.must_close()
class NilmDB(object):
    verbose = 0

    def __init__(self, basepath,
                 max_results=None,
                 max_removals=None,
                 max_int_removals=None,
                 bulkdata_args=None):
        """Initialize NilmDB at the given basepath.
        Other arguments are for debugging / testing:

        'max_results' is the max rows to send in a single
        stream_intervals or stream_extract response.

        'max_removals' is the max rows to delete at once
        in stream_remove.

        'max_int_removals' is the max intervals to delete
        at once in stream_remove.

        'bulkdata_args' is kwargs for the bulkdata module.
        """
        if bulkdata_args is None:
            bulkdata_args = {}

        # set up path
        self.basepath = os.path.abspath(basepath)

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:  # pragma: no cover
                # (no coverage, because it's hard to trigger this case
                # if tests are run as root)
                raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
        self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
        try:
            self._sql_schema_update()
        except Exception:  # pragma: no cover
            self.data.close()
            raise

        # See big comment at top about the performance implications of this
        self.con.execute("PRAGMA synchronous=NORMAL")
        self.con.execute("PRAGMA journal_mode=WAL")

        # Approximate largest number of elements that we want to send
        # in a single reply (for stream_intervals, stream_extract).
        self.max_results = max_results or 16384

        # Remove up to this many rows per call to stream_remove.
        self.max_removals = max_removals or 1048576

        # Remove up to this many intervals per call to stream_remove.
        self.max_int_removals = max_int_removals or 4096

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
        self.data.close()
    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while True:
            if version not in _sql_schema_updates:  # pragma: no cover
                raise Exception(self.basepath + ": unknown database version "
                                + str(version))
            update = _sql_schema_updates[version]
            if "error" in update:  # pragma: no cover
                raise Exception(self.basepath + ": can't use database version "
                                + str(version) + ": " + update["error"])
            if update["next"] is None:
                break
            cur.executescript(update["sql"])
            version = update["next"]
            if self.verbose:  # pragma: no cover
                printf("Database schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))
    def _check_user_times(self, start, end):
        if start is None:
            start = nilmdb.utils.time.min_timestamp
        if end is None:
            end = nilmdb.utils.time.max_timestamp
        if start >= end:
            raise NilmDBError("start must precede end")
        return (start, end)
    @nilmdb.utils.lru_cache(size = 64)
    def _get_intervals(self, stream_id):
        """
        Return a mutable IntervalSet corresponding to the given stream ID.
        """
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time, "
                                  "start_pos, end_pos "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start_time, end_time, start_pos, end_pos) in result:
                iset += DBInterval(start_time, end_time,
                                   start_time, end_time,
                                   start_pos, end_pos)
        except IntervalError:  # pragma: no cover
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset
    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds an interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes an interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))
    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc)
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Check for overlap
        if iset.intersects(interval):  # pragma: no cover (gets caught earlier)
            raise NilmDBError("new interval overlaps existing data")

        # Check for adjacency.  If there's an interval in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
        max_merged_rows = 8000 * 60 * 60 * 1.05  # 1.05 hours at 8 KHz
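        # Illustrative example (row numbers are hypothetical): if an
        # existing interval covers rows [0, 100) and the new data starts
        # at that interval's end time with rows [100, 150), the two are
        # merged into a single interval covering rows [0, 150).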
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
                start_pos == adjacent.db_endpos and
                (end_pos - adjacent.db_startpos) < max_merged_rows):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos,
                                      adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()
    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream

        original: original DBInterval; must be already present in DB

        remove: DBInterval to remove; must be a subset of 'original'
        """
        # Just return if we have nothing to remove
        if remove.start == remove.end:  # pragma: no cover
            return

        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
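        # For example (hypothetical values): removing [2, 3) from an
        # original interval [1, 4) leaves two pieces, [1, 2) and [3, 4);
        # removing [1, 2) from [1, 4) leaves just [2, 4).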
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)
        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)
        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()
        return
    def stream_list(self, path = None, layout = None, extended = False):
        """Return list of lists of all streams in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.

        If extended = False, returns a list of lists containing
        the path and layout: [ path, layout ]

        If extended = True, returns a list of lists containing
        more information:
            path
            layout
            interval_min (earliest interval start)
            interval_max (latest interval end)
            rows (total number of rows of data)
            time (total time covered by this stream, in timestamp units)
        """
        params = ()
        query = "SELECT streams.path, streams.layout"
        if extended:
            query += ", min(ranges.start_time), max(ranges.end_time) "
            query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
            query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
        query += " FROM streams"
        if extended:
            query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
        query += " WHERE 1=1"
        if layout is not None:
            query += " AND streams.layout=?"
            params += (layout,)
        if path is not None:
            query += " AND streams.path=?"
            params += (path,)
        query += " GROUP BY streams.id ORDER BY streams.path"
        result = self.con.execute(query, params).fetchall()
        return [ list(x) for x in result ]
    def stream_intervals(self, path, start = None, end = None,
                         diffpath = None):
        """
        List all intervals in 'path' between 'start' and 'end'.  If
        'diffpath' is not None, list instead the set-difference
        between the intervals in the two streams; i.e. all interval
        ranges that are present in 'path' but not 'diffpath'.

        Returns (intervals, restart) tuple.

        'intervals' is a list of [start,end] timestamps of all intervals
        that exist for path, between start and end.

        'restart', if not None, means that there were too many results
        to return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated, and
        a new request with a start time of 'restart' will fetch the
        next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        if diffpath:
            diffstream_id = self._stream_id(diffpath)
            diffintervals = self._get_intervals(diffstream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        if diffpath:
            getter = nilmdb.utils.interval.set_difference(
                intervals.intersection(requested),
                diffintervals.intersection(requested))
        else:
            getter = intervals.intersection(requested)
        for n, i in enumerate(getter):
            if n >= self.max_results:
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            restart = None
        return (result, restart)
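
    # A minimal client-side pattern for consuming the (intervals, restart)
    # tuple (illustrative sketch; 'db', 'process', and the path are
    # hypothetical):
    #
    #   start = None
    #   while True:
    #       (ivals, start) = db.stream_intervals("/newton/prep", start=start)
    #       process(ivals)
    #       if start is None:
    #           break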
    def stream_create(self, path, layout_name):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
              Paths must contain at least two elements, e.g.:
                /newton/prep
                /newton/raw
                /newton/upstairs/prep
                /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        # Create the bulk storage.  Raises ValueError on error, which we
        # pass along.
        self.data.create(path, layout_name)

        # Insert into SQL database once the bulk storage is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))
    def _stream_id(self, path):
        """Return the unique stream ID for the given path"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]
    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
           { 'description': 'Downstairs lighting',
             'v_scaling': 123.45 }
        This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?",
                        (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary"""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)
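
    # Illustrative metadata round-trip (values are hypothetical):
    #
    #   db.stream_set_metadata("/newton/prep", {"v_scaling": "123.45"})
    #   db.stream_update_metadata("/newton/prep", {"description": "lights"})
    #   db.stream_get_metadata("/newton/prep")
    #   # -> {'v_scaling': '123.45', 'description': 'lights'}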
    def stream_rename(self, oldpath, newpath):
        """Rename a stream."""
        stream_id = self._stream_id(oldpath)

        # Rename the data
        self.data.rename(oldpath, newpath)

        # Rename the stream in the database
        with self.con as con:
            con.execute("UPDATE streams SET path=? WHERE id=?",
                        (newpath, stream_id))
    def stream_destroy(self, path):
        """Fully remove a table from the database.  Fails if any
        interval data is still present; remove it first.  Metadata is
        also removed."""
        stream_id = self._stream_id(path)

        # Verify that no intervals are present, and clear the cache
        iset = self._get_intervals(stream_id)
        if len(iset):
            raise NilmDBError("all intervals must be removed before "
                              "destroying a stream")
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the bulkdata storage
        self.data.destroy(path)

        # Delete metadata, stream, intervals (should be none)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?",
                        (stream_id,))
            con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM streams WHERE id=?", (stream_id,))
    def stream_insert(self, path, start, end, data, binary = False):
        """Insert new data into the database.

        path: Path at which to add the data

        start: Starting timestamp

        end: Ending timestamp

        data: Textual data, formatted according to the layout of path

        'binary', if True, means that 'data' is raw binary:
        little-endian, matching the current table's layout,
        including the int64 timestamp.
        """
        # First check for basic overlap using timestamp info given.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(start, end)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data at range: "
                               + str(iset & interval))

        # Tentatively append the data.  This will raise a ValueError if
        # there are any parse errors.
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append_data(data, start, end, binary)
        row_end = table.nrows

        # Insert the record into the sql database.
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return
    def _bisect_left(self, a, x, lo, hi):
        # Like bisect.bisect_left, but doesn't choke on large indices on
        # 32-bit systems, like bisect's fast C implementation does.
        while lo < hi:
            mid = (lo + hi) // 2
            if a[mid] < x:
                lo = mid + 1
            else:
                hi = mid
        return lo
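
    # For instance, _bisect_left([10, 20, 20, 30], 20, 0, 4) returns 1:
    # the first index whose element is >= 20.  In practice 'a' is a
    # bulkdata table, whose indexing yields row timestamps.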
    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Return the first database
        position with a timestamp (first element) greater than or
        equal to 'start'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.start == dbinterval.db_start:
            return dbinterval.db_startpos
        return self._bisect_left(table,
                                 dbinterval.start,
                                 dbinterval.db_startpos,
                                 dbinterval.db_endpos)

    def _find_end(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that follows
        the end time.  Return the first database position after the
        row with timestamp (first element) greater than or equal
        to 'end'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.end == dbinterval.db_end:
            return dbinterval.db_endpos

        # Note that we still use bisect_left here, because we don't
        # want to include the given timestamp in the results.  This is
        # so that queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
        # non-overlapping data.
        return self._bisect_left(table,
                                 dbinterval.end,
                                 dbinterval.db_startpos,
                                 dbinterval.db_endpos)
    def stream_extract(self, path, start = None, end = None,
                       count = False, markup = False, binary = False):
        """
        Returns (data, restart) tuple.

        'data' is ASCII-formatted data from the database, formatted
        according to the layout of the stream.

        'restart', if not None, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.

        'count', if true, means to not return raw data, but just the
        count of rows that would have been returned.  This is much
        faster than actually fetching the data.  It is not limited by
        max_results, and the return value is just the count rather
        than a (data, restart) tuple.

        'markup', if true, indicates that returned data should be
        marked with a comment denoting when a particular interval
        starts, and another comment when an interval ends.

        'binary', if true, means to return raw binary rather than
        ASCII-formatted data.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        matched = 0
        remaining = self.max_results
        restart = None
        if binary and (markup or count):
            raise NilmDBError("binary mode can't be used with markup or count")
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and
            # ending row for this particular interval, then
            # read the entire range as one slice.
            row_start = self._find_start(table, interval)
            row_end = self._find_end(table, interval)

            if count:
                matched += row_end - row_start
                continue

            # Shorten it if we'll hit the maximum number of results
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                restart = table[row_max]

            # Add markup
            if markup:
                result.append("# interval-start " +
                              timestamp_to_string(interval.start) + "\n")

            # Gather these results up
            result.append(table.get_data(row_start, row_end, binary))

            # Count them
            remaining -= row_end - row_start

            # Add markup, and exit if restart is set.
            if restart is not None:
                if markup:
                    result.append("# interval-end " +
                                  timestamp_to_string(restart) + "\n")
                break
            if markup:
                result.append("# interval-end " +
                              timestamp_to_string(interval.end) + "\n")

        if count:
            return matched
        return ("".join(result), restart)
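
    # Sketch of a full extraction loop using the restart mechanism
    # (illustrative; 'db' and the path are hypothetical):
    #
    #   start = None
    #   while True:
    #       (data, start) = db.stream_extract("/newton/prep", start=start)
    #       sys.stdout.write(data)
    #       if start is None:
    #           break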
    def stream_remove(self, path, start = None, end = None):
        """
        Remove data from the specified time interval within a stream.

        Removes data in the interval [start, end), and intervals are
        truncated or split appropriately.

        Returns a (removed, restart) tuple.

        'removed' is the number of data points that were removed.

        'restart', if not None, means there were too many rows to
        remove in a single request.  This function should be called
        again with a start time of 'restart' to complete the removal.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        to_remove = Interval(start, end)
        removed = 0
        remaining = self.max_removals
        int_remaining = self.max_int_removals
        restart = None

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection now.
        all_candidates = list(intervals.intersection(to_remove, orig = True))

        remove_start = None
        remove_end = None

        for (dbint, orig) in all_candidates:
            # Stop if we've hit the max number of interval removals
            if int_remaining <= 0:
                restart = dbint.start
                break

            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Shorten it if we'll hit the maximum number of removals
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                dbint.end = table[row_max]
                restart = dbint.end

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage,
            # coalescing adjacent removals to reduce the number of
            # calls to table.remove.
            if remove_end == row_start:
                # Extend our coalesced region
                remove_end = row_end
            else:
                # Perform previous removal, then save this one
                if remove_end is not None:
                    table.remove(remove_start, remove_end)
                remove_start = row_start
                remove_end = row_end

            # Count how many were removed
            removed += row_end - row_start
            remaining -= row_end - row_start
            int_remaining -= 1

            if restart is not None:
                break

        # Perform any final coalesced removal
        if remove_end is not None:
            table.remove(remove_start, remove_end)

        return (removed, restart)
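
# Example session (a minimal sketch; the basepath, stream path, layout,
# timestamps, and data below are hypothetical, not part of this module):
#
#   db = NilmDB("/tmp/nilmdb-test")
#   db.stream_create("/newton/prep", "float32_8")
#   db.stream_insert("/newton/prep", 100, 200, text_rows)
#   (ivals, restart) = db.stream_intervals("/newton/prep")
#   (removed, restart) = db.stream_remove("/newton/prep", 100, 200)
#   db.close()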