summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/common/options.cc7
-rw-r--r--src/kv/RocksDBStore.cc102
-rw-r--r--src/kv/RocksDBStore.h4
3 files changed, 102 insertions, 11 deletions
diff --git a/src/common/options.cc b/src/common/options.cc
index 268f496d8f2..d5ecc21c160 100644
--- a/src/common/options.cc
+++ b/src/common/options.cc
@@ -3789,7 +3789,12 @@ std::vector<Option> get_global_options() {
Option("rocksdb_enable_rmrange", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(false)
- .set_description(""),
+ .set_description("Refer to github.com/facebook/rocksdb/wiki/DeleteRange-Implementation"),
+
+ Option("rocksdb_max_items_rmrange", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+ .set_default(1024)
+ .set_description("Delete Range will be called if number of keys exceeded, must enable rocksdb_enable_rmrange first")
+ .add_see_also("rocksdb_enable_rmrange"),
Option("rocksdb_bloom_bits_per_key", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(20)
diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc
index f056d8c2abc..4756697f270 100644
--- a/src/kv/RocksDBStore.cc
+++ b/src/kv/RocksDBStore.cc
@@ -946,7 +946,25 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
if (cf) {
if (db->enable_rmrange) {
string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating...
- bat.DeleteRange(cf, string(), endprefix);
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(cf, string(), endprefix);
+ return;
+ }
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(cf, string(), endprefix);
+ }
} else {
auto it = db->get_iterator(prefix);
for (it->seek_to_first();
@@ -959,9 +977,29 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
if (db->enable_rmrange) {
string endprefix = prefix;
endprefix.push_back('\x01');
- bat.DeleteRange(db->default_cf,
- combine_strings(prefix, string()),
- combine_strings(endprefix, string()));
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(db->default_cf,
+ combine_strings(prefix, string()),
+ combine_strings(endprefix, string()));
+ return;
+ }
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(db->default_cf,
+ combine_strings(prefix, string()),
+ combine_strings(endprefix, string()));
+ }
} else {
auto it = db->get_iterator(prefix);
for (it->seek_to_first();
@@ -980,7 +1018,28 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
auto cf = db->get_cf_handle(prefix);
if (cf) {
if (db->enable_rmrange) {
- bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ auto it = db->get_iterator(prefix);
+ bat.SetSavePoint();
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ return;
+ }
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ it->next();
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ }
} else {
auto it = db->get_iterator(prefix);
it->lower_bound(start);
@@ -994,10 +1053,35 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
}
} else {
if (db->enable_rmrange) {
- bat.DeleteRange(
- db->default_cf,
- rocksdb::Slice(combine_strings(prefix, start)),
- rocksdb::Slice(combine_strings(prefix, end)));
+ if (db->max_items_rmrange) {
+ uint64_t cnt = db->max_items_rmrange;
+ auto it = db->get_iterator(prefix);
+ bat.SetSavePoint();
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ if (!cnt) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(
+ db->default_cf,
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ return;
+ }
+ bat.Delete(db->default_cf,
+ combine_strings(prefix, it->key()));
+ it->next();
+ --cnt;
+ }
+ bat.PopSavePoint();
+ } else {
+ bat.DeleteRange(
+ db->default_cf,
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ }
} else {
auto it = db->get_iterator(prefix);
it->lower_bound(start);
diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h
index 393853d16c0..ea7c77d3c76 100644
--- a/src/kv/RocksDBStore.h
+++ b/src/kv/RocksDBStore.h
@@ -120,6 +120,7 @@ public:
bool compact_on_mount;
bool disableWAL;
bool enable_rmrange;
+ const uint64_t max_items_rmrange;
void compact() override;
void compact_async() override {
@@ -159,7 +160,8 @@ public:
compact_thread(this),
compact_on_mount(false),
disableWAL(false),
- enable_rmrange(cct->_conf->rocksdb_enable_rmrange)
+ enable_rmrange(cct->_conf->rocksdb_enable_rmrange),
+ max_items_rmrange(cct->_conf.get_val<uint64_t>("rocksdb_max_items_rmrange"))
{}
~RocksDBStore() override;