| author | Kefu Chai <kchai@redhat.com> | 2020-06-09 14:11:32 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-06-09 14:11:32 +0200 |
| commit | e13193fcaaa57101ca34c90b02d1920d4710eca0 (patch) | |
| tree | ce19877b3c72f6dc073ad057884000c73e00fa72 /src/kv/RocksDBStore.cc | |
| parent | Merge pull request #35186 from liewegas/wip-cas (diff) | |
| parent | kv/RocksDBStore: Added resharding control (diff) | |
Merge pull request #34667 from aclamk/wip-rocksdb-reshard
Resharding tool for sharded rocksdb
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
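For context, a minimal sketch of how the reshard entry point added below might be driven from application code. Everything here is an assumption for illustration: the store is constructed through KeyValueDB::create(), error handling is trimmed, and the target sharding specification string is a made-up example, not a recommended layout.

```cpp
#include "kv/KeyValueDB.h"
#include "kv/RocksDBStore.h"

// Hypothetical driver for RocksDBStore::reshard(); not part of this commit.
int reshard_example(CephContext* cct, const std::string& path)
{
  std::map<std::string, std::string> kv_options;
  std::unique_ptr<KeyValueDB> db(
    KeyValueDB::create(cct, "rocksdb", path, kv_options));
  auto* store = dynamic_cast<RocksDBStore*>(db.get());
  if (!store)
    return -EINVAL;
  // reshard() opens the DB itself (see prepare_for_reshard below), so the
  // store must not be open yet. The sharding spec is illustrative only.
  RocksDBStore::resharding_ctrl ctrl;  // default batch/iterator limits
  return store->reshard("m(3) p(3,0-12)", &ctrl);
}
```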
Diffstat (limited to 'src/kv/RocksDBStore.cc')
-rw-r--r-- | src/kv/RocksDBStore.cc | 431
1 file changed, 426 insertions, 5 deletions
diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc
index af42aae5ef7..31ce5d729c9 100644
--- a/src/kv/RocksDBStore.cc
+++ b/src/kv/RocksDBStore.cc
@@ -92,9 +92,6 @@ public:
     for (auto& p : store.merge_ops) {
       names[p.first] = p.second->name();
     }
-    for (auto& p : store.cf_handles) {
-      names.erase(p.first);
-    }
     for (auto& p : names) {
       store.assoc_name += '.';
       store.assoc_name += p.first;
@@ -296,7 +293,7 @@ int RocksDBStore::ParseOptionsFromStringStatic(
       return -EINVAL;
       }
     }
-    lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
+    lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
                          << " = " << it->second << dendl;
   }
   return 0;
@@ -414,6 +411,8 @@ int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options&
   if (priv) {
     dout(10) << __func__ << " using custom Env " << priv << dendl;
     opt.env = static_cast<rocksdb::Env*>(priv);
+  } else {
+    env = opt.env;
   }
 
   opt.env->SetAllowNonOwnerAccess(false);
@@ -985,7 +984,6 @@ int RocksDBStore::_test_init(const string& dir)
 RocksDBStore::~RocksDBStore()
 {
   close();
-
   if (priv) {
     delete static_cast<rocksdb::Env*>(priv);
   }
@@ -2635,3 +2633,426 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
   return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
     db->NewIterator(rocksdb::ReadOptions(), default_cf));
 }
+
+int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
+                                      std::vector<std::string>& to_process_columns,
+                                      std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles)
+{
+  //0. lock db from opening
+  //1. list existing columns
+  //2. apply merge operator to (main + columns) opts
+  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
+  //4. open db, acquire existing column handles
+  //5. calculate missing columns
+  //6. create missing columns
+  //7. construct cf_handles according to new sharding
+  //8. check if all cf_handles are filled
+
+  bool b;
+  std::vector<ColumnFamily> new_sharding_def;
+  char const* error_position;
+  std::string error_msg;
+  b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
+  if (!b) {
+    dout(1) << __func__ << " bad sharding: " << dendl;
+    dout(1) << __func__ << new_sharding << dendl;
+    dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
+    return -EINVAL;
+  }
+
+  //0. lock db from opening
+  std::string stored_sharding_text;
+  rocksdb::ReadFileToString(env,
+                            sharding_def_file,
+                            &stored_sharding_text);
+  if (stored_sharding_text.find("reshardingXcommencingXlocked") == string::npos) {
+    rocksdb::Status status;
+    if (stored_sharding_text.size() != 0)
+      stored_sharding_text += " ";
+    stored_sharding_text += "reshardingXcommencingXlocked";
+    env->CreateDir(sharding_def_dir);
+    status = rocksdb::WriteStringToFile(env, stored_sharding_text,
+                                        sharding_def_file, true);
+    if (!status.ok()) {
+      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+      return -EIO;
+    }
+  }
+
+  //1. list existing columns
+
+  rocksdb::Status status;
+  std::vector<std::string> existing_columns;
+  rocksdb::Options opt;
+  int r = load_rocksdb_options(false, opt);
+  if (r) {
+    dout(1) << __func__ << " load rocksdb options failed" << dendl;
+    return r;
+  }
+  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
+  if (!status.ok()) {
+    derr << "Unable to list column families: " << status.ToString() << dendl;
+    return -EINVAL;
+  }
+  dout(5) << "existing columns = " << existing_columns << dendl;
+
+  //2. apply merge operator to (main + columns) opts
+  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
+
+  std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
+  for (const auto& full_name : existing_columns) {
+    //split col_name to <prefix>-<number>
+    std::string base_name;
+    size_t pos = full_name.find('-');
+    if (std::string::npos == pos)
+      base_name = full_name;
+    else
+      base_name = full_name.substr(0,pos);
+
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    // search if we have options for this column
+    std::string options;
+    for (const auto& nsd : new_sharding_def) {
+      if (nsd.name == base_name) {
+        options = nsd.options;
+        break;
+      }
+    }
+    status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
+    if (!status.ok()) {
+      derr << __func__ << " failure parsing column options: " << options << dendl;
+      return -EINVAL;
+    }
+    if (base_name != rocksdb::kDefaultColumnFamilyName)
+      install_cf_mergeop(base_name, &cf_opt);
+    cfs_to_open.emplace_back(full_name, cf_opt);
+  }
+
+  //4. open db, acquire existing column handles
+  std::vector<rocksdb::ColumnFamilyHandle*> handles;
+  status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+                             path, cfs_to_open, &handles, &db);
+  if (!status.ok()) {
+    derr << status.ToString() << dendl;
+    return -EINVAL;
+  }
+  for (size_t i = 0; i < cfs_to_open.size(); i++) {
+    dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
+  }
+
+  //5. calculate missing columns
+  std::vector<std::string> new_sharding_columns;
+  std::vector<std::string> missing_columns;
+  sharding_def_to_columns(new_sharding_def,
+                          new_sharding_columns);
+  dout(5) << "target columns = " << new_sharding_columns << dendl;
+  for (const auto& n : new_sharding_columns) {
+    bool found = false;
+    for (const auto& e : existing_columns) {
+      if (n == e) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      missing_columns.push_back(n);
+    }
+  }
+  dout(5) << "missing columns = " << missing_columns << dendl;
+
+  //6. create missing columns
+  for (const auto& full_name : missing_columns) {
+    std::string base_name;
+    size_t pos = full_name.find('-');
+    if (std::string::npos == pos)
+      base_name = full_name;
+    else
+      base_name = full_name.substr(0,pos);
+
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    // search if we have options for this column
+    std::string options;
+    for (const auto& nsd : new_sharding_def) {
+      if (nsd.name == base_name) {
+        options = nsd.options;
+        break;
+      }
+    }
+    status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
+    if (!status.ok()) {
+      derr << __func__ << " failure parsing column options: " << options << dendl;
+      return -EINVAL;
+    }
+    install_cf_mergeop(base_name, &cf_opt);
+    rocksdb::ColumnFamilyHandle *cf;
+    status = db->CreateColumnFamily(cf_opt, full_name, &cf);
+    if (!status.ok()) {
+      derr << __func__ << " Failed to create rocksdb column family: "
+           << full_name << dendl;
+      return -EINVAL;
+    }
+    dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
+    existing_columns.push_back(full_name);
+    handles.push_back(cf);
+  }
+
+  //7. construct cf_handles according to new sharding
+  for (size_t i = 0; i < existing_columns.size(); i++) {
+    std::string full_name = existing_columns[i];
+    rocksdb::ColumnFamilyHandle *cf = handles[i];
+    std::string base_name;
+    size_t shard_idx = 0;
+    size_t pos = full_name.find('-');
+    dout(10) << "processing column " << full_name << dendl;
+    if (std::string::npos == pos) {
+      base_name = full_name;
+    } else {
+      base_name = full_name.substr(0,pos);
+      shard_idx = atoi(full_name.substr(pos+1).c_str());
+    }
+    if (rocksdb::kDefaultColumnFamilyName == base_name) {
+      default_cf = handles[i];
+      must_close_default_cf = true;
+    } else {
+      for (const auto& nsd : new_sharding_def) {
+        if (nsd.name == base_name) {
+          if (shard_idx < nsd.shard_cnt) {
+            add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
+          } else {
+            //ignore columns with index larger than shard count
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  //8. check if all cf_handles are filled
+  for (const auto& col : cf_handles) {
+    for (size_t i = 0; i < col.second.handles.size(); i++) {
+      if (col.second.handles[i] == nullptr) {
+        derr << "missing handle for column " << col.first << " shard " << i << dendl;
+        return -EIO;
+      }
+    }
+  }
+  to_process_columns = existing_columns;
+  to_process_handles = handles;
+  return 0;
+}
+
+int RocksDBStore::reshard_cleanup(const std::vector<std::string>& current_columns,
+                                  const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles)
+{
+  std::vector<std::string> new_sharding_columns;
+  for (const auto& col: cf_handles) {
+    if (col.second.handles.size() == 1) {
+      new_sharding_columns.push_back(col.first);
+    } else {
+      for (size_t i = 0; i < col.second.handles.size(); i++) {
+        new_sharding_columns.push_back(col.first + "-" + to_string(i));
+      }
+    }
+  }
+
+  for (size_t i = 0; i < current_columns.size(); i++) {
+    bool found = false;
+    for (size_t j = 0; j < new_sharding_columns.size(); j++) {
+      if (current_columns[i] == new_sharding_columns[j]) {
+        found = true;
+        break;
+      }
+    }
+    if (found || current_columns[i] == rocksdb::kDefaultColumnFamilyName) {
+      dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl;
+      continue;
+    }
+    dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl;
+
+    // verify that column is empty
+    rocksdb::Iterator* it;
+    it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]);
+    ceph_assert(it);
+    it->SeekToFirst();
+    ceph_assert(!it->Valid());
+    delete it;
+    rocksdb::Status status;
+    status = db->DropColumnFamily(current_handles[i]);
+    if (!status.ok()) {
+      derr << __func__ << " Failed to delete column: "
+           << current_columns[i] << dendl;
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
+{
+  rocksdb::Status status;
+  int r;
+  std::vector<std::string> to_process_columns;
+  std::vector<rocksdb::ColumnFamilyHandle*> to_process_handles;
+
+  resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
+  size_t bytes_in_batch = 0;
+  size_t keys_in_batch = 0;
+  size_t bytes_per_iterator = 0;
+  size_t keys_per_iterator = 0;
+  size_t keys_processed = 0;
+  size_t keys_moved = 0;
+
+  rocksdb::WriteBatch* bat = nullptr;
+
+  auto flush_batch = [&]() {
+    dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
+             << bytes_in_batch << " bytes" << dendl;
+    rocksdb::WriteOptions woptions;
+    woptions.sync = true;
+    rocksdb::Status s = db->Write(woptions, bat);
+    ceph_assert(s.ok());
+    bytes_in_batch = 0;
+    keys_in_batch = 0;
+    delete bat;
+    bat = new rocksdb::WriteBatch();
+    ceph_assert(bat);
+  };
+
+  auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
+                            const std::string& fixed_prefix)
+  {
+    int r = 0;
+    dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
+    rocksdb::Iterator* it;
+    it = db->NewIterator(rocksdb::ReadOptions(), handle);
+    ceph_assert(it);
+    bat = new rocksdb::WriteBatch();
+    ceph_assert(bat);
+
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      rocksdb::Slice raw_key = it->key();
+      dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
+      //check if need to refresh iterator
+      if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
+          keys_per_iterator >= ctrl.keys_per_iterator) {
+        dout(8) << "refreshing iterator" << dendl;
+        bytes_per_iterator = 0;
+        keys_per_iterator = 0;
+        std::string raw_key_str = raw_key.ToString();
+        delete it;
+        it = db->NewIterator(rocksdb::ReadOptions(), handle);
+        ceph_assert(it);
+        it->Seek(raw_key_str);
+        ceph_assert(it->Valid());
+        raw_key = it->key();
+      }
+      rocksdb::Slice value = it->value();
+      std::string prefix, key;
+      if (fixed_prefix.size() == 0) {
+        split_key(raw_key, &prefix, &key);
+      } else {
+        prefix = fixed_prefix;
+        key = raw_key.ToString();
+      }
+      keys_processed++;
+      if ((keys_processed % 10000) == 0) {
+        dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
+      }
+      std::string new_raw_key;
+      rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
+      if (new_handle == nullptr) {
+        new_handle = default_cf;
+      }
+      if (handle == new_handle) {
+        continue;
+      }
+      if (new_handle == default_cf) {
+        new_raw_key = combine_strings(prefix, key);
+      } else {
+        new_raw_key = key;
+      }
+      bat->Delete(handle, raw_key);
+      bat->Put(new_handle, new_raw_key, value);
+      dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
+        " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
+        " size " << value.size() << dendl;
+      keys_moved++;
+      bytes_in_batch += new_raw_key.size() * 2 + value.size();
+      keys_in_batch++;
+      bytes_per_iterator += new_raw_key.size() * 2 + value.size();
+      keys_per_iterator++;
+
+      //check if need to write batch
+      if (bytes_in_batch >= ctrl.bytes_per_batch ||
+          keys_in_batch >= ctrl.keys_per_batch) {
+        flush_batch();
+        if (ctrl.unittest_fail_after_first_batch) {
+          r = -1000;
+          goto out;
+        }
+      }
+    }
+    flush_batch();
+  out:
+    delete it;
+    delete bat;
+    return r;
+  };
+
+  r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);
+  if (r != 0) {
+    dout(1) << "failed to prepare db for reshard" << dendl;
+    goto cleanup;
+  }
+
+  ceph_assert(to_process_columns.size() == to_process_handles.size());
+  for (size_t idx = 0; idx < to_process_columns.size(); idx++) {
+    dout(5) << "Processing column=" << to_process_columns[idx] <<
+      " handle=" << to_process_handles[idx] << dendl;
+    if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) {
+      ceph_assert(to_process_handles[idx] == default_cf);
+      r = process_column(default_cf, std::string());
+    } else {
+      std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-'));
+      dout(10) << "Prefix: " << fixed_prefix << dendl;
+      r = process_column(to_process_handles[idx], fixed_prefix);
+    }
+    if (r != 0) {
+      derr << "Error processing column " << to_process_columns[idx] << dendl;
+      goto cleanup;
+    }
+    if (ctrl.unittest_fail_after_processing_column) {
+      r = -1001;
+      goto cleanup;
+    }
+  }
+
+  r = reshard_cleanup(to_process_columns, to_process_handles);
+  if (r != 0) {
+    dout(5) << "failed to cleanup after reshard" << dendl;
+    goto cleanup;
+  }
+
+  if (ctrl.unittest_fail_after_successful_processing) {
+    r = -1002;
+    goto cleanup;
+  }
+  env->CreateDir(sharding_def_dir);
+  status = rocksdb::WriteStringToFile(env, new_sharding,
+                                      sharding_def_file, true);
+  if (!status.ok()) {
+    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+    r = -EIO;
+  }
+
+cleanup:
+  //close column handles
+  for (const auto& col: cf_handles) {
+    for (size_t i = 0; i < col.second.handles.size(); i++) {
+      db->DestroyColumnFamilyHandle(col.second.handles[i]);
+    }
+  }
+  cf_handles.clear();
+  close();
+  return r;
+}
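The throttling knobs consulted above (ctrl.bytes_per_batch, ctrl.keys_per_batch, ctrl.bytes_per_iterator, ctrl.keys_per_iterator, and the three unittest_fail_* injection points) belong to RocksDBStore::resharding_ctrl, which is declared in RocksDBStore.h and not shown in this diff. A sketch of its shape, inferred solely from the fields reshard() reads; the default values shown are assumptions, not the actual declaration:

```cpp
// Inferred shape of RocksDBStore::resharding_ctrl; the real declaration is
// in RocksDBStore.h (outside this diff) and these defaults are assumptions.
struct resharding_ctrl {
  size_t bytes_per_iterator = 10000000; // re-open iterator after ~10 MB moved
  size_t keys_per_iterator  = 10000;    // ...or after this many keys
  size_t bytes_per_batch    = 1000000;  // flush the WriteBatch after ~1 MB
  size_t keys_per_batch     = 1000;     // ...or after this many keys
  bool unittest_fail_after_first_batch = false;           // forces r = -1000
  bool unittest_fail_after_processing_column = false;     // forces r = -1001
  bool unittest_fail_after_successful_processing = false; // forces r = -1002
};
```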