author | Gal Salomon <gal.salomon@gmail.com> | 2024-03-19 12:45:58 +0100
committer | Gal Salomon <gal.salomon@gmail.com> | 2024-03-31 15:33:26 +0200
commit | 06eadd66db4eb0a8688ee39ae13093a68f0868b5 (patch)
tree | 227976477a138820fe36cca8b287e0d4fcfceb13 /src/rgw/rgw_s3select.cc
parent | Merge pull request #56190 from rhcs-dashboard/disable-multi-cluster-non-hub (diff)
download | ceph-06eadd66db4eb0a8688ee39ae13093a68f0868b5.tar.xz, ceph-06eadd66db4eb0a8688ee39ae13093a68f0868b5.zip
Refactor of the s3select response handler: add a continuation-response to the Parquet flow; refactor the error response.
Bug fix (init_success_response).
Update the s3select submodule.
Fix for the JSON error flow.
Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
Diffstat (limited to 'src/rgw/rgw_s3select.cc')
-rw-r--r-- | src/rgw/rgw_s3select.cc | 160
1 file changed, 87 insertions, 73 deletions
```diff
diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc
index c8e887d59b0..1b7dced2782 100644
--- a/src/rgw/rgw_s3select.cc
+++ b/src/rgw/rgw_s3select.cc
@@ -46,13 +46,13 @@ void aws_response_handler::push_header(const char* header_name, const char* head
   char x;
   short s;
   x = char(strlen(header_name));
-  m_buff_header.append(&x, sizeof(x));
-  m_buff_header.append(header_name);
+  get_buffer()->append(&x, sizeof(x));
+  get_buffer()->append(header_name);
   x = char(7);
-  m_buff_header.append(&x, sizeof(x));
+  get_buffer()->append(&x, sizeof(x));
   s = htons(uint16_t(strlen(header_value)));
-  m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
-  m_buff_header.append(header_value);
+  get_buffer()->append(reinterpret_cast<char*>(&s), sizeof(s));
+  get_buffer()->append(header_value);
 }
 
 #define IDX( x ) static_cast<int>( x )
@@ -67,7 +67,7 @@ int aws_response_handler::create_header_records()
   push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_continuation()
@@ -77,7 +77,7 @@ int aws_response_handler::create_header_continuation()
   push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
   //2
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_progress()
@@ -89,7 +89,7 @@ int aws_response_handler::create_header_progress()
   push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_stats()
@@ -101,7 +101,7 @@ int aws_response_handler::create_header_stats()
   push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_end()
@@ -111,7 +111,7 @@ int aws_response_handler::create_header_end()
   push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
   //2
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_error_header_records(const char* error_message)
@@ -124,10 +124,10 @@ int aws_response_handler::create_error_header_records(const char* error_message)
   push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
-int aws_response_handler::create_message(u_int32_t header_len)
+int aws_response_handler::create_message(u_int32_t header_len,std::string *msg_string = nullptr)
 {
   //message description(AWS):
   //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
@@ -135,24 +135,27 @@ int aws_response_handler::create_message(u_int32_t header_len)
   //are created later to the produced SQL result, and actually wrapping the payload.
   auto push_encode_int = [&](u_int32_t s, int pos) {
     u_int32_t x = htonl(s);
-    sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
+    msg_string->replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
   };
+
+  msg_string = (msg_string == nullptr) ? &sql_result : msg_string;
+
   u_int32_t total_byte_len = 0;
   u_int32_t preload_crc = 0;
   u_int32_t message_crc = 0;
-  total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+  total_byte_len = msg_string->size() + 4; //the total is greater in 4 bytes than current size
   push_encode_int(total_byte_len, 0);
   push_encode_int(header_len, 4);
   crc32.reset();
-  crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+  crc32 = std::for_each(msg_string->data(), msg_string->data() + 8, crc32); //crc for starting 8 bytes
   preload_crc = crc32();
   push_encode_int(preload_crc, 8);
   crc32.reset();
-  crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+  crc32 = std::for_each(msg_string->begin(), msg_string->end(), crc32); //crc for payload + checksum
   message_crc = crc32();
   u_int32_t x = htonl(message_crc);
-  sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
-  return sql_result.size();
+  msg_string->append(reinterpret_cast<char*>(&x), sizeof(x));
+  return msg_string->size();
 }
 
 void aws_response_handler::init_response()
@@ -163,58 +166,63 @@ void aws_response_handler::init_response()
 
 void aws_response_handler::init_success_response()
 {
-  m_buff_header.clear();
-  header_size = create_header_records();
-  sql_result.append(m_buff_header.c_str(), header_size);
-#ifdef PAYLOAD_TAG
-  sql_result.append(PAYLOAD_LINE);
-#endif
+  get_buffer()->clear();
+  m_success_header_size = create_header_records();
+  sql_result.append(get_buffer()->c_str(), m_success_header_size);
 }
 
 void aws_response_handler::send_continuation_response()
 {
-  sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  set_continue_buffer();
+  continue_result.resize(header_crc_size, '\0');
+  get_buffer()->clear();
   header_size = create_header_continuation();
-  sql_result.append(m_buff_header.c_str(), header_size);
-  int buff_len = create_message(header_size);
-  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  continue_result.append(get_buffer()->c_str(), header_size);
+  int buff_len = create_message(header_size,&continue_result);
+  s->formatter->write_bin_data(continue_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
+  get_buffer()->clear();
+  set_main_buffer();
 }
 
 void aws_response_handler::init_progress_response()
 {
   sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  get_buffer()->clear();
   header_size = create_header_progress();
-  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(get_buffer()->c_str(), header_size);
 }
 
 void aws_response_handler::init_stats_response()
 {
   sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  get_buffer()->clear();
   header_size = create_header_stats();
-  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(get_buffer()->c_str(), header_size);
 }
 
 void aws_response_handler::init_end_response()
 {
   sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  get_buffer()->clear();
   header_size = create_header_end();
-  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(get_buffer()->c_str(), header_size);
   int buff_len = create_message(header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::init_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_message)
 {
-  //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
-  m_buff_header.clear();
+  //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+  error_result.resize(header_crc_size, '\0');
+  get_buffer()->clear();
   header_size = create_error_header_records(error_message);
-  sql_result.append(m_buff_header.c_str(), header_size);
+  error_result.append(get_buffer()->c_str(), header_size);
+
+  int buff_len = create_message(header_size,&error_result);
+  s->formatter->write_bin_data(error_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
 void aws_response_handler::send_success_response()
@@ -222,12 +230,12 @@ void aws_response_handler::send_success_response()
 #ifdef PAYLOAD_TAG
   sql_result.append(END_PAYLOAD_LINE);
 #endif
-  int buff_len = create_message(header_size);
+  int buff_len = create_message(m_success_header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::send_error_response(const char* error_code,
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
     const char* error_message,
     const char* resource_id)
 {
@@ -265,7 +273,6 @@ void aws_response_handler::send_stats_response()
 }
 
 RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
-  m_buff_header(std::make_unique<char[]>(1000)),
   m_scan_range_ind(false),
   m_start_scan_sz(0),
   m_end_scan_sz(0),
@@ -301,6 +308,11 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     m_aws_response_handler.send_success_response();
     return 0;
   };
+  fp_s3select_continue = [this](std::string& result) {
+    fp_chunked_transfer_encoding();
+    m_aws_response_handler.send_continuation_response();
+    return 0;
+  };
 
   fp_debug_mesg = [&](const char* mesg){
     ldpp_dout(this, 10) << mesg << dendl;
@@ -371,9 +383,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   int status = 0;
   uint32_t length_before_processing, length_post_processing;
   csv_object::csv_defintions csv;
-  const char* s3select_syntax_error = "s3select-Syntax-Error";
-  const char* s3select_resource_id = "resource-id";
-  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
 
   s3select_syntax.parse_query(query);
   if (m_row_delimiter.size()) {
@@ -410,14 +419,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   } else if(m_header_info.compare("USE")==0) {
     csv.use_header_info=true;
   }
-  m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
-  m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
   m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
+
+  m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+      fp_s3select_result_format,
+      fp_result_header_format,
+      fp_debug_mesg);
+
   if (s3select_syntax.get_error_description().empty() == false) {
     //error-flow (syntax-error)
-    m_aws_response_handler.send_error_response(s3select_syntax_error,
-        s3select_syntax.get_error_description().c_str(),
-        s3select_resource_id);
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
     ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
     return -1;
@@ -434,9 +446,8 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
 
     if (status < 0) {
       //error flow(processing-time)
-      m_aws_response_handler.send_error_response(s3select_processTime_error,
-          m_s3_csv_object.get_error_description().c_str(),
-          s3select_resource_id);
+      m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+
       ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
       return -1;
     }
@@ -444,8 +455,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   }
   if ((length_post_processing-length_before_processing) != 0) {
     ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
-  } else {
-    m_aws_response_handler.send_continuation_response();
   }
   ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
   if (enable_progress == true) {
@@ -463,7 +472,12 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   if (!m_s3_parquet_object.is_set()) {
     //parsing the SQL statement.
     s3select_syntax.parse_query(m_sql_query.c_str());
-    //m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
+
+    m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
+        fp_s3select_result_format,
+        fp_result_header_format,
+        fp_debug_mesg);
+
     try {
       //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object
      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
@@ -477,19 +491,21 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
  }
  if (s3select_syntax.get_error_description().empty() == false) {
    //the SQL statement failed the syntax parser
-    fp_result_header_format(m_aws_response_handler.get_sql_result());
-    m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
-    fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+    fp_chunked_transfer_encoding();
+    m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
    status = -1;
  } else {
    fp_result_header_format(m_aws_response_handler.get_sql_result());
    //at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
-    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
+    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
    if (status < 0) {
-      m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
-      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
-      ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
+
+      fp_chunked_transfer_encoding();
+      m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
+      return -1;
    }
  }
 #endif
@@ -500,17 +516,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char*
 {
   int status = 0;
 
-  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
-  const char* s3select_syntax_error = "s3select-Syntax-Error";
-  const char* s3select_resource_id = "resourcse-id";
-  const char* s3select_json_error = "json-Format-Error";
+  m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+      fp_s3select_result_format,
+      fp_result_header_format,
+      fp_debug_mesg);
 
   m_aws_response_handler.init_response();
   //the JSON data-type should be(currently) only DOCUMENT
   if (m_json_datatype.compare("DOCUMENT") != 0) {
     const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
-    m_aws_response_handler.send_error_response(s3select_json_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_json_error,
         s3select_json_error_msg,
         s3select_resource_id);
     ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
@@ -521,7 +537,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char*
   s3select_syntax.parse_query(m_sql_query.c_str());
   if (s3select_syntax.get_error_description().empty() == false) {
     //SQL statement is wrong(syntax).
-    m_aws_response_handler.send_error_response(s3select_syntax_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
         s3select_syntax.get_error_description().c_str(),
         s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
@@ -543,7 +559,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char*
   } catch(base_s3select_exception& e) {
     ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
     m_aws_response_handler.get_sql_result().append(e.what());
-    m_aws_response_handler.send_error_response(s3select_processTime_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
         e.what(),
         s3select_resource_id);
     return -EINVAL;
@@ -552,7 +568,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char*
   m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
   if (status < 0) {
     //error flow(processing-time)
-    m_aws_response_handler.send_error_response(s3select_processTime_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
         m_s3_json_object.get_error_description().c_str(),
         s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
@@ -562,8 +578,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char*
 
   if (length_post_processing-length_before_processing != 0) {
     m_aws_response_handler.send_success_response();
-  } else {
-    m_aws_response_handler.send_continuation_response();
  }
  if (enable_progress == true) {
    m_aws_response_handler.init_progress_response();
```
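For reference, the headers that `push_header()` emits follow the AWS event-stream header encoding: a one-byte name length, the name bytes, a one-byte value type (7 denotes a string value), a two-byte big-endian value length, and the value bytes. The sketch below is a minimal standalone illustration of that layout; `encode_string_header` is a hypothetical helper, not a function from `rgw_s3select.cc` (the real code appends to the handler's internal header buffer instead).

```cpp
#include <arpa/inet.h>  // htons
#include <cstdint>
#include <cstring>      // strlen
#include <string>

// Hypothetical helper mirroring the byte layout written by push_header():
// [name-length:1][name][value-type:1][value-length:2, big-endian][value]
void encode_string_header(std::string& buf, const char* name, const char* value)
{
  char name_len = static_cast<char>(strlen(name));
  buf.append(&name_len, sizeof(name_len));   // [name-length:1]
  buf.append(name);                          // [name]
  char value_type = 7;                       // [value-type:1], 7 = string
  buf.append(&value_type, sizeof(value_type));
  uint16_t value_len = htons(static_cast<uint16_t>(strlen(value)));
  buf.append(reinterpret_cast<char*>(&value_len), sizeof(value_len)); // [value-length:2]
  buf.append(value);                         // [value]
}
```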
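Likewise, `create_message()` wraps the headers and payload in the event-stream frame described in the diff's comment: `[total-byte-length:4][header-byte-length:4][crc:4][headers][payload][crc:4]`, with big-endian integers and CRC32 checksums (the handler's `crc32` member is a `boost::crc_32_type`). Below is a minimal sketch under those assumptions; `build_event_stream_message` is illustrative and builds a fresh buffer, whereas the real code patches the placeholder prelude of `sql_result` (or, after this commit, of the buffer passed via `msg_string`) in place.

```cpp
#include <arpa/inet.h>   // htonl
#include <boost/crc.hpp> // boost::crc_32_type
#include <cstdint>
#include <string>

// Hypothetical helper illustrating the frame built by create_message().
std::string build_event_stream_message(const std::string& headers, const std::string& payload)
{
  std::string msg;
  auto append_be32 = [&msg](uint32_t v) {
    uint32_t be = htonl(v);
    msg.append(reinterpret_cast<const char*>(&be), sizeof(be));
  };
  // Total length covers the 12-byte prelude, headers, payload, and the final 4-byte CRC.
  append_be32(static_cast<uint32_t>(12 + headers.size() + payload.size() + 4));
  append_be32(static_cast<uint32_t>(headers.size()));
  boost::crc_32_type crc;
  crc.process_bytes(msg.data(), 8);           // prelude CRC: first 8 bytes only
  append_be32(crc.checksum());
  msg.append(headers);
  msg.append(payload);
  crc.reset();
  crc.process_bytes(msg.data(), msg.size());  // message CRC: everything before it
  append_be32(crc.checksum());
  return msg;
}
```

The optional `msg_string` parameter added in this commit serves exactly this separation of buffers: continuation and error messages are framed in their own buffers (`continue_result`, `error_result`) and flushed immediately, without disturbing the partially built `sql_result` that still accumulates the SQL payload.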