author    Kefu Chai <kchai@redhat.com>    2020-06-09 14:11:32 +0200
committer GitHub <noreply@github.com>    2020-06-09 14:11:32 +0200
commit    e13193fcaaa57101ca34c90b02d1920d4710eca0 (patch)
tree      ce19877b3c72f6dc073ad057884000c73e00fa72 /src/kv/RocksDBStore.cc
parent    Merge pull request #35186 from liewegas/wip-cas (diff)
parent    kv/RocksDBStore: Added resharding control (diff)
Merge pull request #34667 from aclamk/wip-rocksdb-reshard
Resharding tool for sharded rocksdb

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
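The entry point this merge adds is RocksDBStore::reshard(), shown in the diff below. A minimal caller sketch follows (not code from this commit): the sharding specification string "P(4)" and the batch limits are illustrative values only; the resharding_ctrl field names are taken from the diff, and nullptr may be passed instead to use its defaults.

// Hypothetical caller sketch for the reshard() entry point added below.
// The sharding spec "P(4)" and the numeric limits are made-up examples.
int reshard_example(RocksDBStore* store)
{
  RocksDBStore::resharding_ctrl ctrl;  // default-constructed control knobs
  ctrl.bytes_per_batch = 1048576;      // flush the write batch after ~1 MiB...
  ctrl.keys_per_batch = 1000;          // ...or after 1000 keys, whichever first
  // "P(4)": illustrative definition splitting prefix "P" into 4 shards;
  // the actual grammar is handled by parse_sharding_def(), not in this diff.
  return store->reshard("P(4)", &ctrl);
}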
Diffstat (limited to 'src/kv/RocksDBStore.cc')
-rw-r--r--	src/kv/RocksDBStore.cc	| 431
1 file changed, 426 insertions(+), 5 deletions(-)
diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc
index af42aae5ef7..31ce5d729c9 100644
--- a/src/kv/RocksDBStore.cc
+++ b/src/kv/RocksDBStore.cc
@@ -92,9 +92,6 @@ public:
for (auto& p : store.merge_ops) {
names[p.first] = p.second->name();
}
- for (auto& p : store.cf_handles) {
- names.erase(p.first);
- }
for (auto& p : names) {
store.assoc_name += '.';
store.assoc_name += p.first;
@@ -296,7 +293,7 @@ int RocksDBStore::ParseOptionsFromStringStatic(
return -EINVAL;
}
}
- lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
+ lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
<< " = " << it->second << dendl;
}
return 0;
@@ -414,6 +411,8 @@ int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options&
if (priv) {
dout(10) << __func__ << " using custom Env " << priv << dendl;
opt.env = static_cast<rocksdb::Env*>(priv);
+ } else {
+ env = opt.env;
}
opt.env->SetAllowNonOwnerAccess(false);
@@ -985,7 +984,6 @@ int RocksDBStore::_test_init(const string& dir)
RocksDBStore::~RocksDBStore()
{
close();
-
if (priv) {
delete static_cast<rocksdb::Env*>(priv);
}
@@ -2635,3 +2633,426 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
db->NewIterator(rocksdb::ReadOptions(), default_cf));
}
+
+int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
+ std::vector<std::string>& to_process_columns,
+ std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles)
+{
+ //0. lock db from opening
+ //1. list existing columns
+ //2. apply merge operator to (main + columns) opts
+ //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
+ //4. open db, acquire existing column handles
+ //5. calculate missing columns
+ //6. create missing columns
+ //7. construct cf_handles according to new sharding
+ //8. check if all cf_handles are filled
+
+ bool b;
+ std::vector<ColumnFamily> new_sharding_def;
+ char const* error_position;
+ std::string error_msg;
+ b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
+ if (!b) {
+ dout(1) << __func__ << " bad sharding: " << dendl;
+ dout(1) << __func__ << new_sharding << dendl;
+ dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
+ return -EINVAL;
+ }
+
+ //0. lock db from opening
+ std::string stored_sharding_text;
+ rocksdb::ReadFileToString(env,
+ sharding_def_file,
+ &stored_sharding_text);
+ if (stored_sharding_text.find("reshardingXcommencingXlocked") == string::npos) {
+ rocksdb::Status status;
+ if (stored_sharding_text.size() != 0)
+ stored_sharding_text += " ";
+ stored_sharding_text += "reshardingXcommencingXlocked";
+ env->CreateDir(sharding_def_dir);
+ status = rocksdb::WriteStringToFile(env, stored_sharding_text,
+ sharding_def_file, true);
+ if (!status.ok()) {
+ derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+ return -EIO;
+ }
+ }
+
+ //1. list existing columns
+
+ rocksdb::Status status;
+ std::vector<std::string> existing_columns;
+ rocksdb::Options opt;
+ int r = load_rocksdb_options(false, opt);
+ if (r) {
+ dout(1) << __func__ << " load rocksdb options failed" << dendl;
+ return r;
+ }
+ status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
+ if (!status.ok()) {
+ derr << "Unable to list column families: " << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ dout(5) << "existing columns = " << existing_columns << dendl;
+
+ //2. apply merge operator to (main + columns) opts
+ //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
+
+ std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
+ for (const auto& full_name : existing_columns) {
+ //split col_name into <prefix>-<number>
+ std::string base_name;
+ size_t pos = full_name.find('-');
+ if (std::string::npos == pos)
+ base_name = full_name;
+ else
+ base_name = full_name.substr(0,pos);
+
+ rocksdb::ColumnFamilyOptions cf_opt(opt);
+ // search if we have options for this column
+ std::string options;
+ for (const auto& nsd : new_sharding_def) {
+ if (nsd.name == base_name) {
+ options = nsd.options;
+ break;
+ }
+ }
+ status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " failure parsing column options: " << options << dendl;
+ return -EINVAL;
+ }
+ if (base_name != rocksdb::kDefaultColumnFamilyName)
+ install_cf_mergeop(base_name, &cf_opt);
+ cfs_to_open.emplace_back(full_name, cf_opt);
+ }
+
+ //4. open db, acquire existing column handles
+ std::vector<rocksdb::ColumnFamilyHandle*> handles;
+ status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+ path, cfs_to_open, &handles, &db);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ for (size_t i = 0; i < cfs_to_open.size(); i++) {
+ dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
+ }
+
+ //5. calculate missing columns
+ std::vector<std::string> new_sharding_columns;
+ std::vector<std::string> missing_columns;
+ sharding_def_to_columns(new_sharding_def,
+ new_sharding_columns);
+ dout(5) << "target columns = " << new_sharding_columns << dendl;
+ for (const auto& n : new_sharding_columns) {
+ bool found = false;
+ for (const auto& e : existing_columns) {
+ if (n == e) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ missing_columns.push_back(n);
+ }
+ }
+ dout(5) << "missing columns = " << missing_columns << dendl;
+
+ //6. create missing columns
+ for (const auto& full_name : missing_columns) {
+ std::string base_name;
+ size_t pos = full_name.find('-');
+ if (std::string::npos == pos)
+ base_name = full_name;
+ else
+ base_name = full_name.substr(0,pos);
+
+ rocksdb::ColumnFamilyOptions cf_opt(opt);
+ // search if we have options for this column
+ std::string options;
+ for (const auto& nsd : new_sharding_def) {
+ if (nsd.name == base_name) {
+ options = nsd.options;
+ break;
+ }
+ }
+ status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " failure parsing column options: " << options << dendl;
+ return -EINVAL;
+ }
+ install_cf_mergeop(base_name, &cf_opt);
+ rocksdb::ColumnFamilyHandle *cf;
+ status = db->CreateColumnFamily(cf_opt, full_name, &cf);
+ if (!status.ok()) {
+ derr << __func__ << " Failed to create rocksdb column family: "
+ << full_name << dendl;
+ return -EINVAL;
+ }
+ dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
+ existing_columns.push_back(full_name);
+ handles.push_back(cf);
+ }
+
+ //7. construct cf_handles according to new sharding
+ for (size_t i = 0; i < existing_columns.size(); i++) {
+ std::string full_name = existing_columns[i];
+ rocksdb::ColumnFamilyHandle *cf = handles[i];
+ std::string base_name;
+ size_t shard_idx = 0;
+ size_t pos = full_name.find('-');
+ dout(10) << "processing column " << full_name << dendl;
+ if (std::string::npos == pos) {
+ base_name = full_name;
+ } else {
+ base_name = full_name.substr(0,pos);
+ shard_idx = atoi(full_name.substr(pos+1).c_str());
+ }
+ if (rocksdb::kDefaultColumnFamilyName == base_name) {
+ default_cf = handles[i];
+ must_close_default_cf = true;
+ } else {
+ for (const auto& nsd : new_sharding_def) {
+ if (nsd.name == base_name) {
+ if (shard_idx < nsd.shard_cnt) {
+ add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
+ } else {
+ //ignore columns with index larger than shard count
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ //8. check if all cf_handles are filled
+ for (const auto& col : cf_handles) {
+ for (size_t i = 0; i < col.second.handles.size(); i++) {
+ if (col.second.handles[i] == nullptr) {
+ derr << "missing handle for column " << col.first << " shard " << i << dendl;
+ return -EIO;
+ }
+ }
+ }
+ to_process_columns = existing_columns;
+ to_process_handles = handles;
+ return 0;
+}
+
+int RocksDBStore::reshard_cleanup(const std::vector<std::string>& current_columns,
+ const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles)
+{
+ std::vector<std::string> new_sharding_columns;
+ for (const auto& col: cf_handles) {
+ if (col.second.handles.size() == 1) {
+ new_sharding_columns.push_back(col.first);
+ } else {
+ for (size_t i = 0; i < col.second.handles.size(); i++) {
+ new_sharding_columns.push_back(col.first + "-" + to_string(i));
+ }
+ }
+ }
+
+ for (size_t i = 0; i < current_columns.size(); i++) {
+ bool found = false;
+ for (size_t j = 0; j < new_sharding_columns.size(); j++) {
+ if (current_columns[i] == new_sharding_columns[j]) {
+ found = true;
+ break;
+ }
+ }
+ if (found || current_columns[i] == rocksdb::kDefaultColumnFamilyName) {
+ dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl;
+ continue;
+ }
+ dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl;
+
+ // verify that column is empty
+ rocksdb::Iterator* it;
+ it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]);
+ ceph_assert(it);
+ it->SeekToFirst();
+ ceph_assert(!it->Valid());
+ delete it;
+ rocksdb::Status status;
+ status = db->DropColumnFamily(current_handles[i]);
+ if (!status.ok()) {
+ derr << __func__ << " Failed to delete column: "
+ << current_columns[i] << dendl;
+ return -EINVAL;
+ }
+ }
+ return 0;
+}
+
+int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
+{
+ rocksdb::Status status;
+ int r;
+ std::vector<std::string> to_process_columns;
+ std::vector<rocksdb::ColumnFamilyHandle*> to_process_handles;
+
+ resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
+ size_t bytes_in_batch = 0;
+ size_t keys_in_batch = 0;
+ size_t bytes_per_iterator = 0;
+ size_t keys_per_iterator = 0;
+ size_t keys_processed = 0;
+ size_t keys_moved = 0;
+
+ rocksdb::WriteBatch* bat = nullptr;
+
+ auto flush_batch = [&]() {
+ dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
+ << bytes_in_batch << " bytes" << dendl;
+ rocksdb::WriteOptions woptions;
+ woptions.sync = true;
+ rocksdb::Status s = db->Write(woptions, bat);
+ ceph_assert(s.ok());
+ bytes_in_batch = 0;
+ keys_in_batch = 0;
+ delete bat;
+ bat = new rocksdb::WriteBatch();
+ ceph_assert(bat);
+ };
+
+ auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
+ const std::string& fixed_prefix)
+ {
+ int r = 0;
+ dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
+ rocksdb::Iterator* it;
+ it = db->NewIterator(rocksdb::ReadOptions(), handle);
+ ceph_assert(it);
+ bat = new rocksdb::WriteBatch();
+ ceph_assert(bat);
+
+ for (it->SeekToFirst(); it->Valid(); it->Next()) {
+ rocksdb::Slice raw_key = it->key();
+ dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
+ //check if we need to refresh the iterator
+ if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
+ keys_per_iterator >= ctrl.keys_per_iterator) {
+ dout(8) << "refreshing iterator" << dendl;
+ bytes_per_iterator = 0;
+ keys_per_iterator = 0;
+ std::string raw_key_str = raw_key.ToString();
+ delete it;
+ it = db->NewIterator(rocksdb::ReadOptions(), handle);
+ ceph_assert(it);
+ it->Seek(raw_key_str);
+ ceph_assert(it->Valid());
+ raw_key = it->key();
+ }
+ rocksdb::Slice value = it->value();
+ std::string prefix, key;
+ if (fixed_prefix.size() == 0) {
+ split_key(raw_key, &prefix, &key);
+ } else {
+ prefix = fixed_prefix;
+ key = raw_key.ToString();
+ }
+ keys_processed++;
+ if ((keys_processed % 10000) == 0) {
+ dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
+ }
+ std::string new_raw_key;
+ rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
+ if (new_handle == nullptr) {
+ new_handle = default_cf;
+ }
+ if (handle == new_handle) {
+ continue;
+ }
+ if (new_handle == default_cf) {
+ new_raw_key = combine_strings(prefix, key);
+ } else {
+ new_raw_key = key;
+ }
+ bat->Delete(handle, raw_key);
+ bat->Put(new_handle, new_raw_key, value);
+ dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
+ " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
+ " size " << value.size() << dendl;
+ keys_moved++;
+ bytes_in_batch += new_raw_key.size() * 2 + value.size();
+ keys_in_batch++;
+ bytes_per_iterator += new_raw_key.size() * 2 + value.size();
+ keys_per_iterator++;
+
+ //check if we need to write the batch
+ if (bytes_in_batch >= ctrl.bytes_per_batch ||
+ keys_in_batch >= ctrl.keys_per_batch) {
+ flush_batch();
+ if (ctrl.unittest_fail_after_first_batch) {
+ r = -1000;
+ goto out;
+ }
+ }
+ }
+ flush_batch();
+ out:
+ delete it;
+ delete bat;
+ return r;
+ };
+
+ r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);
+ if (r != 0) {
+ dout(1) << "failed to prepare db for reshard" << dendl;
+ goto cleanup;
+ }
+
+ ceph_assert(to_process_columns.size() == to_process_handles.size());
+ for (size_t idx = 0; idx < to_process_columns.size(); idx++) {
+ dout(5) << "Processing column=" << to_process_columns[idx] <<
+ " handle=" << to_process_handles[idx] << dendl;
+ if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) {
+ ceph_assert(to_process_handles[idx] == default_cf);
+ r = process_column(default_cf, std::string());
+ } else {
+ std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-'));
+ dout(10) << "Prefix: " << fixed_prefix << dendl;
+ r = process_column(to_process_handles[idx], fixed_prefix);
+ }
+ if (r != 0) {
+ derr << "Error processing column " << to_process_columns[idx] << dendl;
+ goto cleanup;
+ }
+ if (ctrl.unittest_fail_after_processing_column) {
+ r = -1001;
+ goto cleanup;
+ }
+ }
+
+ r = reshard_cleanup(to_process_columns, to_process_handles);
+ if (r != 0) {
+ dout(5) << "failed to cleanup after reshard" << dendl;
+ goto cleanup;
+ }
+
+ if (ctrl.unittest_fail_after_successful_processing) {
+ r = -1002;
+ goto cleanup;
+ }
+ env->CreateDir(sharding_def_dir);
+ status = rocksdb::WriteStringToFile(env, new_sharding,
+ sharding_def_file, true);
+ if (!status.ok()) {
+ derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+ r = -EIO;
+ }
+
+ cleanup:
+ //close column handles
+ for (const auto& col: cf_handles) {
+ for (size_t i = 0; i < col.second.handles.size(); i++) {
+ db->DestroyColumnFamilyHandle(col.second.handles[i]);
+ }
+ }
+ cf_handles.clear();
+ close();
+ return r;
+}
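
For reference, a standalone sketch of the "<prefix>-<number>" column-name convention that steps 3, 6 and 7 above rely on; it mirrors the splitting logic in the diff and is an illustration, not code from the commit:

#include <cstdlib>
#include <string>
#include <utility>

// Split a full column family name into (base_name, shard_idx), mirroring
// prepare_for_reshard() above: "p-2" -> {"p", 2}, while a name without
// '-' (e.g. "p" or "default") is treated as shard 0 of itself.
static std::pair<std::string, size_t> split_column_name(const std::string& full_name)
{
  size_t pos = full_name.find('-');
  if (pos == std::string::npos)
    return {full_name, 0};
  return {full_name.substr(0, pos),
          static_cast<size_t>(atoi(full_name.substr(pos + 1).c_str()))};
}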