Implemented in command line, client, server, nilmdb, bulkdatatags/nilmdb-1.4.0
@@ -101,6 +101,12 @@ class Client(object): | |||
params = { "path": path } | |||
return self.http.post("stream/destroy", params) | |||
def stream_rename(self, oldpath, newpath): | |||
"""Rename a stream.""" | |||
params = { "oldpath": oldpath, | |||
"newpath": newpath } | |||
return self.http.post("stream/rename", params) | |||
def stream_remove(self, path, start = None, end = None): | |||
"""Remove data from the specified time range""" | |||
params = { | |||
@@ -15,7 +15,7 @@ from argparse import ArgumentDefaultsHelpFormatter as def_form | |||
# things up -- they're still called with Cmdline as self. | |||
subcommands = [ "help", "info", "create", "list", "metadata", | |||
"insert", "extract", "remove", "destroy", | |||
"intervals" ] | |||
"intervals", "rename" ] | |||
# Import the subcommand modules | |||
subcmd_mods = {} | |||
@@ -0,0 +1,28 @@ | |||
from nilmdb.utils.printf import * | |||
import nilmdb.client | |||
from argparse import ArgumentDefaultsHelpFormatter as def_form | |||
def setup(self, sub): | |||
cmd = sub.add_parser("rename", help="Rename a stream", | |||
formatter_class = def_form, | |||
description=""" | |||
Rename a stream. | |||
Only the stream's path is renamed; no | |||
metadata is changed. | |||
""") | |||
cmd.set_defaults(handler = cmd_rename) | |||
group = cmd.add_argument_group("Required arguments") | |||
group.add_argument("oldpath", | |||
help="Old path, e.g. /foo/old") | |||
group.add_argument("newpath", | |||
help="New path, e.g. /foo/bar/new") | |||
return cmd | |||
def cmd_rename(self): | |||
"""Rename a stream""" | |||
try: | |||
self.client.stream_rename(self.args.oldpath, self.args.newpath) | |||
except nilmdb.client.ClientError as e: | |||
self.die("error renaming stream: %s", str(e)) |
@@ -12,6 +12,7 @@ import os | |||
import cPickle as pickle | |||
import re | |||
import sys | |||
import tempfile | |||
#from . import pyrocket as rocket | |||
from . import rocket | |||
@@ -56,6 +57,14 @@ class BulkData(object): | |||
return path.encode('utf-8') | |||
return path | |||
def _create_check_ospath(self, ospath): | |||
if ospath[-1] == '/': | |||
raise ValueError("invalid path; should not end with a /") | |||
if Table.exists(ospath): | |||
raise ValueError("stream already exists at this path") | |||
if os.path.isdir(ospath): | |||
raise ValueError("subdirs of this path already exist") | |||
def _create_parents(self, unicodepath): | |||
"""Verify the path name, and create parent directories if they | |||
don't exist. Returns a list of elements that got created.""" | |||
@@ -72,9 +81,10 @@ class BulkData(object): | |||
if not Table.valid_path(path): | |||
raise ValueError("path name is invalid or contains reserved words") | |||
# Create the table. Note that we make a distinction here | |||
# between NilmDB paths (always Unix style, split apart | |||
# manually) and OS paths (built up with os.path.join) | |||
# Create the table's base dir. Note that we make a | |||
# distinction here between NilmDB paths (always Unix style, | |||
# split apart manually) and OS paths (built up with | |||
# os.path.join) | |||
# Make directories leading up to this one | |||
elements = path.lstrip('/').split('/') | |||
@@ -91,7 +101,7 @@ class BulkData(object): | |||
except Exception as e: | |||
# Try to remove paths that we created; ignore errors | |||
exc_info = sys.exc_info() | |||
for ospath in reversed(made_dirs): | |||
for ospath in reversed(made_dirs): # pragma: no cover (hard to hit) | |||
try: | |||
os.rmdir(ospath) | |||
except OSError: | |||
@@ -115,10 +125,7 @@ class BulkData(object): | |||
# Make the final dir | |||
ospath = os.path.join(self.root, *elements) | |||
if Table.exists(ospath): | |||
raise ValueError("stream already exists at this path") | |||
if os.path.isdir(ospath): | |||
raise ValueError("subdirs of this path already exist") | |||
self._create_check_ospath(ospath) | |||
os.mkdir(ospath) | |||
try: | |||
@@ -139,6 +146,55 @@ class BulkData(object): | |||
# Success | |||
return | |||
def _remove_leaves(self, unicodepath): | |||
"""Remove empty directories starting at the leaves of unicodepath""" | |||
path = self._encode_filename(unicodepath) | |||
elements = path.lstrip('/').split('/') | |||
for i in reversed(range(len(elements))): | |||
ospath = os.path.join(self.root, *elements[0:i+1]) | |||
try: | |||
os.rmdir(ospath) | |||
except OSError: | |||
pass | |||
def rename(self, oldunicodepath, newunicodepath): | |||
"""Move entire tree from 'oldunicodepath' to | |||
'newunicodepath'""" | |||
oldpath = self._encode_filename(oldunicodepath) | |||
newpath = self._encode_filename(newunicodepath) | |||
# Get OS paths | |||
oldelements = oldpath.lstrip('/').split('/') | |||
oldospath = os.path.join(self.root, *oldelements) | |||
newelements = newpath.lstrip('/').split('/') | |||
newospath = os.path.join(self.root, *newelements) | |||
# Basic checks | |||
if oldospath == newospath: | |||
raise ValueError("old and new paths are the same") | |||
self._create_check_ospath(newospath) | |||
# Move the table to a temporary location | |||
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) | |||
tmppath = os.path.join(tmpdir, "table") | |||
os.rename(oldospath, tmppath) | |||
try: | |||
# Create parent dirs for new location | |||
self._create_parents(newunicodepath) | |||
# Move table into new location | |||
os.rename(tmppath, newospath) | |||
except Exception: | |||
# On failure, move the table back to original path | |||
os.rename(tmppath, oldospath) | |||
os.rmdir(tmpdir) | |||
raise | |||
# Prune old dirs | |||
self._remove_leaves(oldunicodepath) | |||
os.rmdir(tmpdir) | |||
def destroy(self, unicodepath): | |||
"""Fully remove all data at a particular path. No way to undo | |||
it! The group/path structure is removed, too.""" | |||
@@ -160,13 +216,8 @@ class BulkData(object): | |||
for name in dirs: | |||
os.rmdir(os.path.join(root, name)) | |||
# Remove empty parent directories | |||
for i in reversed(range(len(elements))): | |||
ospath = os.path.join(self.root, *elements[0:i+1]) | |||
try: | |||
os.rmdir(ospath) | |||
except OSError: | |||
break | |||
# Remove leftover empty directories | |||
self._remove_leaves(unicodepath) | |||
# Cache open tables | |||
@nilmdb.utils.lru_cache(size = table_cache_size, | |||
@@ -405,6 +405,18 @@ class NilmDB(object): | |||
data.update(newdata) | |||
self.stream_set_metadata(path, data) | |||
def stream_rename(self, oldpath, newpath): | |||
"""Rename a stream.""" | |||
stream_id = self._stream_id(oldpath) | |||
# Rename the data | |||
self.data.rename(oldpath, newpath) | |||
# Rename the stream in the database | |||
with self.con as con: | |||
con.execute("UPDATE streams SET path=? WHERE id=?", | |||
(newpath, stream_id)) | |||
def stream_destroy(self, path): | |||
"""Fully remove a table and all of its data from the database. | |||
No way to undo it! Metadata is removed.""" | |||
@@ -212,6 +212,16 @@ class Stream(NilmApp): | |||
"""Delete a stream and its associated data.""" | |||
return self.db.stream_destroy(path) | |||
# /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1 | |||
@cherrypy.expose | |||
@cherrypy.tools.json_in() | |||
@cherrypy.tools.json_out() | |||
@exception_to_httperror(NilmDBError, ValueError) | |||
@cherrypy.tools.CORS_allow(methods = ["POST"]) | |||
def rename(self, oldpath, newpath): | |||
"""Rename a stream.""" | |||
return self.db.stream_rename(oldpath, newpath) | |||
# /stream/get_metadata?path=/newton/prep | |||
# /stream/get_metadata?path=/newton/prep&key=foo&key=bar | |||
@cherrypy.expose | |||
@@ -968,3 +968,85 @@ class TestCmdline(object): | |||
self.ok("destroy /diff/1") | |||
self.ok("destroy /diff/2") | |||
def test_16_rename(self): | |||
# Test renaming. Force file size smaller so we get more files | |||
server_stop() | |||
recursive_unlink(testdb) | |||
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file | |||
"files_per_dir" : 3 }) | |||
# Fill data | |||
self.ok("create /newton/prep float32_8") | |||
os.environ['TZ'] = "UTC" | |||
with open("tests/data/prep-20120323T1004-timestamped") as input: | |||
self.ok("insert -s 20120323T1004 -e 20120323T1006 /newton/prep", | |||
input) | |||
# Extract it | |||
self.ok("extract /newton/prep --start '2000-01-01' " + | |||
"--end '2012-03-23 10:04:01'") | |||
extract_before = self.captured | |||
def check_path(*components): | |||
# Verify the paths look right on disk | |||
seek = os.path.join(testdb, "data", *components) | |||
for (dirpath, dirnames, filenames) in os.walk(testdb): | |||
if "_format" in filenames: | |||
if dirpath == seek: | |||
break | |||
raise AssertionError("data also found at " + dirpath) | |||
else: | |||
raise AssertionError("data not found at " + seek) | |||
# Verify "list" output | |||
self.ok("list") | |||
self.match("/" + "/".join(components) + " float32_8\n") | |||
# Lots of renames | |||
check_path("newton", "prep") | |||
self.fail("rename /newton/prep /newton/prep") | |||
self.contain("old and new paths are the same") | |||
check_path("newton", "prep") | |||
self.fail("rename /newton/prep /newton") | |||
self.contain("subdirs of this path already exist") | |||
self.fail("rename /newton/prep /newton/prep/") | |||
self.contain("invalid path") | |||
self.ok("rename /newton/prep /newton/foo") | |||
check_path("newton", "foo") | |||
self.ok("rename /newton/foo /totally/different/thing") | |||
check_path("totally", "different", "thing") | |||
self.ok("rename /totally/different/thing /totally/something") | |||
check_path("totally", "something") | |||
self.ok("rename /totally/something /totally/something/cool") | |||
check_path("totally", "something", "cool") | |||
self.ok("rename /totally/something/cool /foo/bar") | |||
check_path("foo", "bar") | |||
self.ok("create /xxx/yyy/zzz float32_8") | |||
self.fail("rename /foo/bar /xxx/yyy") | |||
self.contain("subdirs of this path already exist") | |||
self.fail("rename /foo/bar /xxx/yyy/zzz") | |||
self.contain("stream already exists at this path") | |||
self.fail("rename /foo/bar /xxx/yyy/zzz/www") | |||
self.contain("path is subdir of existing node") | |||
self.ok("rename /foo/bar /xxx/yyy/mmm") | |||
self.ok("destroy /xxx/yyy/zzz") | |||
check_path("xxx", "yyy", "mmm") | |||
# Extract it at the final path | |||
self.ok("extract /xxx/yyy/mmm --start '2000-01-01' " + | |||
"--end '2012-03-23 10:04:01'") | |||
eq_(self.captured, extract_before) | |||
self.ok("destroy /xxx/yyy/mmm") | |||
# Make sure temporary rename dirs weren't left around | |||
for (dirpath, dirnames, filenames) in os.walk(testdb): | |||
if "rename-" in dirpath: | |||
raise AssertionError("temporary directories not cleaned up") | |||
if "totally" in dirpath or "newton" in dirpath: | |||
raise AssertionError("old directories not cleaned up") | |||
server_stop() | |||
server_start() |