path: root/src/rgw/rgw_s3select.cc
author    Gal Salomon <gal.salomon@gmail.com>  2024-03-19 12:45:58 +0100
committer Gal Salomon <gal.salomon@gmail.com>  2024-03-31 15:33:26 +0200
commit    06eadd66db4eb0a8688ee39ae13093a68f0868b5 (patch)
tree      227976477a138820fe36cca8b287e0d4fcfceb13 /src/rgw/rgw_s3select.cc
parent    Merge pull request #56190 from rhcs-dashboard/disable-multi-cluster-non-hub (diff)
download  ceph-06eadd66db4eb0a8688ee39ae13093a68f0868b5.tar.xz
          ceph-06eadd66db4eb0a8688ee39ae13093a68f0868b5.zip
refactor of the s3select response handler; add a continuation-response to the parquet flow; refactor the error response
bug fix (init_success_response). s3select submodule fix for the json-error-flow.

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
Diffstat (limited to 'src/rgw/rgw_s3select.cc')
-rw-r--r--  src/rgw/rgw_s3select.cc  160
1 file changed, 87 insertions(+), 73 deletions(-)
diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc
index c8e887d59b0..1b7dced2782 100644
--- a/src/rgw/rgw_s3select.cc
+++ b/src/rgw/rgw_s3select.cc
@@ -46,13 +46,13 @@ void aws_response_handler::push_header(const char* header_name, const char* head
char x;
short s;
x = char(strlen(header_name));
- m_buff_header.append(&x, sizeof(x));
- m_buff_header.append(header_name);
+ get_buffer()->append(&x, sizeof(x));
+ get_buffer()->append(header_name);
x = char(7);
- m_buff_header.append(&x, sizeof(x));
+ get_buffer()->append(&x, sizeof(x));
s = htons(uint16_t(strlen(header_value)));
- m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
- m_buff_header.append(header_value);
+ get_buffer()->append(reinterpret_cast<char*>(&s), sizeof(s));
+ get_buffer()->append(header_value);
}
#define IDX( x ) static_cast<int>( x )
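
For context: each header that push_header() emits follows the AWS event-stream header layout, a 1-byte name length, the name bytes, a 1-byte value type (7 = STRING), a 2-byte big-endian value length, then the value bytes. A minimal standalone sketch of that layout (encode_header is a hypothetical helper, not part of this patch):

// Hypothetical illustration of the header layout push_header() emits.
#include <arpa/inet.h>  // htons
#include <cstdint>
#include <cstring>
#include <string>

std::string encode_header(const char* name, const char* value)
{
  std::string out;
  char name_len = char(std::strlen(name));
  out.append(&name_len, sizeof(name_len));            // [name-length:1]
  out.append(name);                                   // [name]
  char value_type = char(7);                          // 7 == STRING value type
  out.append(&value_type, sizeof(value_type));        // [value-type:1]
  uint16_t value_len = htons(uint16_t(std::strlen(value)));
  out.append(reinterpret_cast<char*>(&value_len), sizeof(value_len)); // [value-length:2, big-endian]
  out.append(value);                                  // [value]
  return out;
}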
@@ -67,7 +67,7 @@ int aws_response_handler::create_header_records()
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_continuation()
@@ -77,7 +77,7 @@ int aws_response_handler::create_header_continuation()
push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
//2
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_progress()
@@ -89,7 +89,7 @@ int aws_response_handler::create_header_progress()
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_stats()
@@ -101,7 +101,7 @@ int aws_response_handler::create_header_stats()
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_end()
@@ -111,7 +111,7 @@ int aws_response_handler::create_header_end()
push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
//2
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_error_header_records(const char* error_message)
@@ -124,10 +124,10 @@ int aws_response_handler::create_error_header_records(const char* error_message)
push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
-int aws_response_handler::create_message(u_int32_t header_len)
+int aws_response_handler::create_message(u_int32_t header_len,std::string *msg_string = nullptr)
{
//message description(AWS):
//[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
@@ -135,24 +135,27 @@ int aws_response_handler::create_message(u_int32_t header_len)
//are created later to the produced SQL result, and actually wrapping the payload.
auto push_encode_int = [&](u_int32_t s, int pos) {
u_int32_t x = htonl(s);
- sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
+ msg_string->replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
};
+
+ msg_string = (msg_string == nullptr) ? &sql_result : msg_string;
+
u_int32_t total_byte_len = 0;
u_int32_t preload_crc = 0;
u_int32_t message_crc = 0;
- total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+ total_byte_len = msg_string->size() + 4; //the total is greater in 4 bytes than current size
push_encode_int(total_byte_len, 0);
push_encode_int(header_len, 4);
crc32.reset();
- crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+ crc32 = std::for_each(msg_string->data(), msg_string->data() + 8, crc32); //crc for starting 8 bytes
preload_crc = crc32();
push_encode_int(preload_crc, 8);
crc32.reset();
- crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+ crc32 = std::for_each(msg_string->begin(), msg_string->end(), crc32); //crc for payload + checksum
message_crc = crc32();
u_int32_t x = htonl(message_crc);
- sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
- return sql_result.size();
+ msg_string->append(reinterpret_cast<char*>(&x), sizeof(x));
+ return msg_string->size();
}
void aws_response_handler::init_response()
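
The framing that create_message() backfills is the AWS event-stream prelude described in the comment above: [total-byte-length:4][header-byte-length:4][prelude-crc:4][headers][payload][message-crc:4]. A condensed sketch of the same framing, assuming the buffer already starts with 12 reserved prelude bytes (as the resize(header_crc_size, '\0') calls in this file arrange) followed by headers and payload, and assuming boost::crc_32_type for the checksums (frame_message is a hypothetical helper):

// Sketch of the framing create_message() performs on an already-assembled buffer.
#include <arpa/inet.h>    // htonl
#include <boost/crc.hpp>  // boost::crc_32_type
#include <cstdint>
#include <string>

void frame_message(std::string& msg, uint32_t header_len)
{
  auto put_be32 = [&](uint32_t v, std::size_t pos) {
    uint32_t be = htonl(v);
    msg.replace(pos, sizeof(be), reinterpret_cast<char*>(&be), sizeof(be));
  };
  put_be32(msg.size() + 4, 0);   // total length includes the trailing message CRC
  put_be32(header_len, 4);       // header-section byte length
  boost::crc_32_type crc;
  crc.process_bytes(msg.data(), 8);            // CRC over the two length fields
  put_be32(crc.checksum(), 8);                 // prelude CRC
  crc.reset();
  crc.process_bytes(msg.data(), msg.size());   // CRC over prelude + headers + payload
  uint32_t be = htonl(crc.checksum());
  msg.append(reinterpret_cast<char*>(&be), sizeof(be)); // message CRC
}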
@@ -163,58 +166,63 @@ void aws_response_handler::init_response()
void aws_response_handler::init_success_response()
{
- m_buff_header.clear();
- header_size = create_header_records();
- sql_result.append(m_buff_header.c_str(), header_size);
-#ifdef PAYLOAD_TAG
- sql_result.append(PAYLOAD_LINE);
-#endif
+ get_buffer()->clear();
+ m_success_header_size = create_header_records();
+ sql_result.append(get_buffer()->c_str(), m_success_header_size);
}
void aws_response_handler::send_continuation_response()
{
- sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ set_continue_buffer();
+ continue_result.resize(header_crc_size, '\0');
+ get_buffer()->clear();
header_size = create_header_continuation();
- sql_result.append(m_buff_header.c_str(), header_size);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
+ continue_result.append(get_buffer()->c_str(), header_size);
+ int buff_len = create_message(header_size,&continue_result);
+ s->formatter->write_bin_data(continue_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
+ get_buffer()->clear();
+ set_main_buffer();
}
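
The continuation flow above assembles its event headers in a separate scratch buffer, so the main header buffer (which may still back an in-flight records event between init_success_response() and send_success_response()) is not clobbered. A minimal model of that buffer-selection pattern, with member names other than get_buffer()/set_main_buffer()/set_continue_buffer() hypothetical:

// Hypothetical minimal model of the header-buffer selection used above.
#include <string>

class response_buffers {
  std::string m_buff_header;       // headers for the main (records) event
  std::string m_buff_header_cont;  // headers for continuation events
  std::string* m_current = &m_buff_header;
public:
  std::string* get_buffer() { return m_current; }
  void set_main_buffer()     { m_current = &m_buff_header; }
  void set_continue_buffer() { m_current = &m_buff_header_cont; }
};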
void aws_response_handler::init_progress_response()
{
sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ get_buffer()->clear();
header_size = create_header_progress();
- sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(get_buffer()->c_str(), header_size);
}
void aws_response_handler::init_stats_response()
{
sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ get_buffer()->clear();
header_size = create_header_stats();
- sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(get_buffer()->c_str(), header_size);
}
void aws_response_handler::init_end_response()
{
sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ get_buffer()->clear();
header_size = create_header_end();
- sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(get_buffer()->c_str(), header_size);
int buff_len = create_message(header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
}
-void aws_response_handler::init_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_message)
{
- //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
- m_buff_header.clear();
+ //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+ error_result.resize(header_crc_size, '\0');
+ get_buffer()->clear();
header_size = create_error_header_records(error_message);
- sql_result.append(m_buff_header.c_str(), header_size);
+ error_result.append(get_buffer()->c_str(), header_size);
+
+ int buff_len = create_message(header_size,&error_result);
+ s->formatter->write_bin_data(error_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
}
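
An error event is a headers-only message: an error-code and error-message header plus a message-type of "error", framed like any other event. A hypothetical composition using the encode_header/frame_message helpers sketched earlier (the ":"-prefixed header names are assumed to follow the AWS event-stream convention):

// Hypothetical composition of a headers-only error event.
#include <string>

std::string build_error_event(const char* code, const char* message)
{
  std::string headers;
  headers += encode_header(":error-code", code);
  headers += encode_header(":error-message", message);
  headers += encode_header(":message-type", "error");

  std::string msg(12, '\0');           // reserve the 12-byte prelude
  msg += headers;                      // no payload for an error event
  frame_message(msg, headers.size());  // backfill lengths and CRCs
  return msg;
}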
void aws_response_handler::send_success_response()
@@ -222,12 +230,12 @@ void aws_response_handler::send_success_response()
#ifdef PAYLOAD_TAG
sql_result.append(END_PAYLOAD_LINE);
#endif
- int buff_len = create_message(header_size);
+ int buff_len = create_message(m_success_header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
}
-void aws_response_handler::send_error_response(const char* error_code,
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
const char* error_message,
const char* resource_id)
{
@@ -265,7 +273,6 @@ void aws_response_handler::send_stats_response()
}
RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
- m_buff_header(std::make_unique<char[]>(1000)),
m_scan_range_ind(false),
m_start_scan_sz(0),
m_end_scan_sz(0),
@@ -301,6 +308,11 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
m_aws_response_handler.send_success_response();
return 0;
};
+ fp_s3select_continue = [this](std::string& result) {
+ fp_chunked_transfer_encoding();
+ m_aws_response_handler.send_continuation_response();
+ return 0;
+ };
fp_debug_mesg = [&](const char* mesg){
ldpp_dout(this, 10) << mesg << dendl;
@@ -371,9 +383,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
int status = 0;
uint32_t length_before_processing, length_post_processing;
csv_object::csv_defintions csv;
- const char* s3select_syntax_error = "s3select-Syntax-Error";
- const char* s3select_resource_id = "resource-id";
- const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
s3select_syntax.parse_query(query);
if (m_row_delimiter.size()) {
@@ -410,14 +419,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
} else if(m_header_info.compare("USE")==0) {
csv.use_header_info=true;
}
- m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
- m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
+
m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
+
+ m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+ fp_s3select_result_format,
+ fp_result_header_format,
+ fp_debug_mesg);
+
if (s3select_syntax.get_error_description().empty() == false) {
//error-flow (syntax-error)
- m_aws_response_handler.send_error_response(s3select_syntax_error,
- s3select_syntax.get_error_description().c_str(),
- s3select_resource_id);
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
return -1;
@@ -434,9 +446,8 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
if (status < 0) {
//error flow(processing-time)
- m_aws_response_handler.send_error_response(s3select_processTime_error,
- m_s3_csv_object.get_error_description().c_str(),
- s3select_resource_id);
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
return -1;
}
@@ -444,8 +455,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
}
if ((length_post_processing-length_before_processing) != 0) {
ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
- } else {
- m_aws_response_handler.send_continuation_response();
}
ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
if (enable_progress == true) {
@@ -463,7 +472,12 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
if (!m_s3_parquet_object.is_set()) {
//parsing the SQL statement.
s3select_syntax.parse_query(m_sql_query.c_str());
- //m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
+
+ m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
+ fp_s3select_result_format,
+ fp_result_header_format,
+ fp_debug_mesg);
+
try {
//at this stage the Parquet-processing requires for the meta-data that reside on Parquet object
m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
@@ -477,19 +491,21 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
}
if (s3select_syntax.get_error_description().empty() == false) {
//the SQL statement failed the syntax parser
- fp_result_header_format(m_aws_response_handler.get_sql_result());
- m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
- fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+ fp_chunked_transfer_encoding();
+ m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
status = -1;
} else {
fp_result_header_format(m_aws_response_handler.get_sql_result());
//at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
- status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
+ status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
if (status < 0) {
- m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
- fp_s3select_result_format(m_aws_response_handler.get_sql_result());
- ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
+
+ fp_chunked_transfer_encoding();
+ m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
+ return -1;
}
}
#endif
@@ -500,17 +516,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
{
int status = 0;
- const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
- const char* s3select_syntax_error = "s3select-Syntax-Error";
- const char* s3select_resource_id = "resourcse-id";
- const char* s3select_json_error = "json-Format-Error";
+ m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+ fp_s3select_result_format,
+ fp_result_header_format,
+ fp_debug_mesg);
m_aws_response_handler.init_response();
//the JSON data-type should be(currently) only DOCUMENT
if (m_json_datatype.compare("DOCUMENT") != 0) {
const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
- m_aws_response_handler.send_error_response(s3select_json_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_json_error,
s3select_json_error_msg,
s3select_resource_id);
ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
@@ -521,7 +537,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
s3select_syntax.parse_query(m_sql_query.c_str());
if (s3select_syntax.get_error_description().empty() == false) {
//SQL statement is wrong(syntax).
- m_aws_response_handler.send_error_response(s3select_syntax_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
s3select_syntax.get_error_description().c_str(),
s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
@@ -543,7 +559,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
} catch(base_s3select_exception& e) {
ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
m_aws_response_handler.get_sql_result().append(e.what());
- m_aws_response_handler.send_error_response(s3select_processTime_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
e.what(),
s3select_resource_id);
return -EINVAL;
@@ -552,7 +568,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
if (status < 0) {
//error flow(processing-time)
- m_aws_response_handler.send_error_response(s3select_processTime_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
m_s3_json_object.get_error_description().c_str(),
s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
@@ -562,8 +578,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
if (length_post_processing-length_before_processing != 0) {
m_aws_response_handler.send_success_response();
- } else {
- m_aws_response_handler.send_continuation_response();
}
if (enable_progress == true) {
m_aws_response_handler.init_progress_response();