summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ceph.spec.in5
-rw-r--r--debian/control2
-rwxr-xr-xinstall-deps.sh7
-rw-r--r--src/rgw/CMakeLists.txt11
-rw-r--r--src/rgw/rgw_rest_s3.cc577
-rw-r--r--src/rgw/rgw_rest_s3.h155
-rw-r--r--src/rgw/rgw_s3select.cc669
-rw-r--r--src/rgw/rgw_s3select.h8
-rw-r--r--src/rgw/rgw_s3select_private.h230
m---------src/s3select0
10 files changed, 935 insertions, 729 deletions
diff --git a/ceph.spec.in b/ceph.spec.in
index 1ed34e3f1a8..82f7a6f72cd 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -24,6 +24,7 @@
%bcond_with zbd
%bcond_with cmake_verbose_logging
%bcond_without ceph_test_package
+%bcond_without arrow_parquet
%ifarch s390
%bcond_with tcmalloc
%else
@@ -232,6 +233,10 @@ BuildRequires: xfsprogs-devel
BuildRequires: xmlstarlet
BuildRequires: nasm
BuildRequires: lua-devel
+%if 0%{with arrow_parquet}
+BuildRequires: arrow-devel >= 4.0.0
+BuildRequires: parquet-devel
+%endif
%if 0%{with seastar} || 0%{with jaeger}
BuildRequires: yaml-cpp-devel >= 0.6
%endif
diff --git a/debian/control b/debian/control
index 0b89cc904a5..c252ca45fc9 100644
--- a/debian/control
+++ b/debian/control
@@ -107,6 +107,8 @@ Build-Depends: automake,
xmlstarlet <pkg.ceph.check>,
nasm [amd64],
zlib1g-dev,
+ libarrow-dev (>= 4.0.0),
+ libparquet-dev (>= 4.0.0),
Standards-Version: 4.4.0
Package: ceph
diff --git a/install-deps.sh b/install-deps.sh
index b07e53dd54e..0fa7675ea35 100755
--- a/install-deps.sh
+++ b/install-deps.sh
@@ -355,12 +355,19 @@ else
if $with_jaeger; then
build_profiles+=",pkg.ceph.jaeger"
fi
+
+ wget -qO - https://dist.apache.org/repos/dist/dev/arrow/KEYS | $SUDO apt-key add -
+ echo "deb [arch=amd64] https://apache.jfrog.io/artifactory/arrow/ubuntu $(lsb_release -sc) main" | $SUDO tee /etc/apt/sources.list.d/arrow.list
+ $SUDO apt update
+
$SUDO env DEBIAN_FRONTEND=noninteractive mk-build-deps \
--build-profiles "${build_profiles#,}" \
--install --remove \
--tool="apt-get -y --no-install-recommends $backports" $control || exit 1
$SUDO env DEBIAN_FRONTEND=noninteractive apt-get -y remove ceph-build-deps
if [ "$control" != "debian/control" ] ; then rm $control; fi
+ $SUDO rm -f /etc/apt/sources.list.d/arrow.list
+
;;
centos|fedora|rhel|ol|virtuozzo)
builddepcmd="dnf -y builddep --allowerasing"
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index d53dad434df..4104c5a7f24 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -2,6 +2,14 @@ find_program(GPERF gperf)
if(NOT GPERF)
message(FATAL_ERROR "Can't find gperf")
endif()
+
+find_package(Arrow QUIET)
+if(Arrow_FOUND)
+ set(ARROW_LIBRARIES "-larrow -lparquet")
+ add_definitions(-D_ARROW_EXIST)
+ message("-- arrow is installed, radosgw/s3select-op is able to process parquet objects")
+endif()
+
function(gperf_generate input output)
add_custom_command(
OUTPUT ${output}
@@ -122,6 +130,7 @@ set(librgw_common_srcs
rgw_rest_realm.cc
rgw_rest_role.cc
rgw_rest_s3.cc
+ rgw_s3select.cc
rgw_role.cc
rgw_sal.cc
rgw_sal_rados.cc
@@ -204,6 +213,7 @@ target_link_libraries(rgw_common
${CURL_LIBRARIES}
${EXPAT_LIBRARIES}
${LUA_LIBRARIES}
+ ${ARROW_LIBRARIES}
PUBLIC
spawn)
target_include_directories(rgw_common
@@ -302,6 +312,7 @@ target_link_libraries(rgw_a
common_utf8 global
${CRYPTO_LIBS}
${LUA_LIBRARIES}
+ ${ARROW_LIBRARIES}
OATH::OATH
PUBLIC
rgw_common
diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc
index 1f043f19a43..1cb855d09e3 100644
--- a/src/rgw/rgw_rest_s3.cc
+++ b/src/rgw/rgw_rest_s3.cc
@@ -22,7 +22,6 @@
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
#endif
-#include <s3select/include/s3select.h>
#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
#pragma clang diagnostic pop
#endif
@@ -71,6 +70,8 @@
#include "rgw_sts.h"
#include "rgw_sal_rados.h"
+#include "rgw_s3select.h"
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
@@ -4585,7 +4586,7 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()
return new RGWInitMultipart_ObjStore_S3;
if (is_select_op())
- return new RGWSelectObj_ObjStore_S3;
+ return rgw::s3select::create_s3select_op();
return new RGWPostObj_ObjStore_S3;
}
@@ -6126,575 +6127,3 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
return route == AwsRoute::QUERY_STRING && version == AwsVersion::UNKNOWN;
}
-
-using namespace s3selectEngine;
-
-std::string &aws_response_handler::get_sql_result()
-{
- return sql_result;
-}
-
-uint64_t aws_response_handler::get_processed_size()
-{
- return processed_size;
-}
-
-void aws_response_handler::update_processed_size(uint64_t value)
-{
- processed_size += value;
-}
-
-uint64_t aws_response_handler::get_total_bytes_returned()
-{
- return total_bytes_returned;
-}
-
-void aws_response_handler::update_total_bytes_returned(uint64_t value)
-{
- total_bytes_returned += value;
-}
-
-void aws_response_handler::push_header(const char *header_name, const char *header_value)
-{
- char x;
- short s;
-
- x = char(strlen(header_name));
- m_buff_header.append(&x, sizeof(x));
- m_buff_header.append(header_name);
-
- x = char(7);
- m_buff_header.append(&x, sizeof(x));
-
- s = htons(uint16_t(strlen(header_value)));
- m_buff_header.append(reinterpret_cast<char *>(&s), sizeof(s));
- m_buff_header.append(header_value);
-}
-
-#define IDX( x ) static_cast<int>( x )
-
-int aws_response_handler::create_header_records()
-{
- //headers description(AWS)
- //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-
- //1
- push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::RECORDS)]);
- //2
- push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
- //3
- push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
- return m_buff_header.size();
-}
-
-int aws_response_handler::create_header_continuation()
-{
- //headers description(AWS)
- //1
- push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
- //2
- push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
- return m_buff_header.size();
-}
-
-int aws_response_handler::create_header_progress()
-{
- //headers description(AWS)
- //1
- push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::PROGRESS)]);
- //2
- push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
- //3
- push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
- return m_buff_header.size();
-}
-
-int aws_response_handler::create_header_stats()
-{
- //headers description(AWS)
- //1
- push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::STATS)]);
- //2
- push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
- //3
- push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
- return m_buff_header.size();
-}
-
-int aws_response_handler::create_header_end()
-{
- //headers description(AWS)
- //1
- push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
- //2
- push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
- return m_buff_header.size();
-}
-
-int aws_response_handler::create_error_header_records(const char *error_message)
-{
- //headers description(AWS)
- //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-
- //1
- push_header(header_name_str[IDX(header_name_En::ERROR_CODE)], header_value_str[IDX(header_value_En::ENGINE_ERROR)]);
- //2
- push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
- //3
- push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
-
- return m_buff_header.size();
-}
-
-int aws_response_handler::create_message(u_int32_t header_len)
-{
- //message description(AWS):
- //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
- //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
- //are created later to the produced SQL result, and actually wrapping the payload.
-
- auto push_encode_int = [&](u_int32_t s, int pos) {
- u_int32_t x = htonl(s);
- sql_result.replace(pos, sizeof(x), reinterpret_cast<char *>(&x), sizeof(x));
- };
-
- u_int32_t total_byte_len = 0;
- u_int32_t preload_crc = 0;
- u_int32_t message_crc = 0;
-
- total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
-
- push_encode_int(total_byte_len, 0);
- push_encode_int(header_len, 4);
-
- crc32.reset();
- crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
- preload_crc = crc32();
- push_encode_int(preload_crc, 8);
-
- crc32.reset();
- crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
- message_crc = crc32();
-
- u_int32_t x = htonl(message_crc);
- sql_result.append(reinterpret_cast<char *>(&x), sizeof(x));
-
- return sql_result.size();
-}
-
-void aws_response_handler::init_response()
-{ //12 positions for header-crc
- sql_result.resize(header_crc_size,'\0');
-}
-
-void aws_response_handler::init_success_response()
-{
- m_buff_header.clear();
- header_size = create_header_records();
- sql_result.append(m_buff_header.c_str(), header_size);
- sql_result.append(PAYLOAD_LINE);
-}
-
-void aws_response_handler::send_continuation_response()
-{
- sql_result.resize(header_crc_size,'\0');
- m_buff_header.clear();
- header_size = create_header_continuation();
- sql_result.append(m_buff_header.c_str(), header_size);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
- rgw_flush_formatter_and_reset(s, s->formatter);
-}
-
-void aws_response_handler::init_progress_response()
-{
- sql_result.resize(header_crc_size,'\0');
- m_buff_header.clear();
- header_size = create_header_progress();
- sql_result.append(m_buff_header.c_str(), header_size);
-}
-
-void aws_response_handler::init_stats_response()
-{
- sql_result.resize(header_crc_size,'\0');
- m_buff_header.clear();
- header_size = create_header_stats();
- sql_result.append(m_buff_header.c_str(), header_size);
-}
-
-void aws_response_handler::init_end_response()
-{
- sql_result.resize(header_crc_size,'\0');
- m_buff_header.clear();
- header_size = create_header_end();
- sql_result.append(m_buff_header.c_str(), header_size);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
- rgw_flush_formatter_and_reset(s, s->formatter);
-}
-
-void aws_response_handler::init_error_response(const char *error_message)
-{ //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
- m_buff_header.clear();
- header_size = create_error_header_records(error_message);
- sql_result.append(m_buff_header.c_str(), header_size);
-}
-
-void aws_response_handler::send_success_response()
-{
- sql_result.append(END_PAYLOAD_LINE);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
- rgw_flush_formatter_and_reset(s, s->formatter);
-}
-
-void aws_response_handler::send_error_response(const char *error_code,
- const char *error_message,
- const char *resource_id)
-{
-
- set_req_state_err(s, 0);
- dump_errno(s, 400);
- end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
- dump_start(s);
-
- s->formatter->open_object_section("Error");
-
- s->formatter->dump_string("Code", error_code);
- s->formatter->dump_string("Message", error_message);
- s->formatter->dump_string("Resource", "#Resource#");
- s->formatter->dump_string("RequestId", resource_id);
-
- s->formatter->close_section();
-
- rgw_flush_formatter_and_reset(s, s->formatter);
-}
-
-void aws_response_handler::send_progress_response()
-{
- std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
- ,get_processed_size(),get_processed_size(),get_total_bytes_returned());
-
- sql_result.append(progress_payload);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
- rgw_flush_formatter_and_reset(s, s->formatter);
-}
-
-void aws_response_handler::send_stats_response()
-{
- std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
- ,get_processed_size(),get_processed_size(),get_total_bytes_returned());
-
- sql_result.append(stats_payload);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
- rgw_flush_formatter_and_reset(s, s->formatter);
-}
-
-RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
- m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
- chunk_number(0)
-{
- set_get_data(true);
-}
-
-RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
-{}
-
-int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
-{
- //retrieve s3-select query from payload
- bufferlist data;
- int ret;
- int max_size = 4096;
- std::tie(ret, data) = read_all_input(s, max_size, false);
- if (ret != 0) {
- ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
- return ret;
- }
-
- m_s3select_query = data.to_str();
- if (m_s3select_query.length() > 0) {
- ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
- }
- else {
- ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
- return -1;
- }
-
- int status = handle_aws_cli_parameters(m_sql_query);
- if (status<0) {
- return status;
- }
-
- return RGWGetObj_ObjStore_S3::get_params(y);
-}
-
-int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
-{
- int status = 0;
- uint32_t length_before_processing, length_post_processing;
- csv_object::csv_defintions csv;
- const char* s3select_syntax_error = "s3select-Syntax-Error";
- const char* s3select_resource_id = "resourcse-id";
- const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
-
- if (m_s3_csv_object==0) {
- s3select_syntax->parse_query(query);
-
- if (m_row_delimiter.size()) {
- csv.row_delimiter = *m_row_delimiter.c_str();
- }
-
- if (m_column_delimiter.size()) {
- csv.column_delimiter = *m_column_delimiter.c_str();
- }
-
- if (m_quot.size()) {
- csv.quot_char = *m_quot.c_str();
- }
-
- if (m_escape_char.size()) {
- csv.escape_char = *m_escape_char.c_str();
- }
-
- if (m_enable_progress.compare("true")==0) {
- enable_progress = true;
- } else {
- enable_progress = false;
- }
-
- if (output_row_delimiter.size()) {
- csv.output_row_delimiter = *output_row_delimiter.c_str();
- }
-
- if (output_column_delimiter.size()) {
- csv.output_column_delimiter = *output_column_delimiter.c_str();
- }
-
- if (output_quot.size()) {
- csv.output_quot_char = *output_quot.c_str();
- }
-
- if (output_escape_char.size()) {
- csv.output_escape_char = *output_escape_char.c_str();
- }
-
- if(output_quote_fields.compare("ALWAYS") == 0) {
- csv.quote_fields_always = true;
- }
- else if(output_quote_fields.compare("ASNEEDED") == 0) {
- csv.quote_fields_asneeded = true;
- }
-
- if(m_header_info.compare("IGNORE")==0) {
- csv.ignore_header_info=true;
- }
- else if(m_header_info.compare("USE")==0) {
- csv.use_header_info=true;
- }
- m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
- }
-
- m_aws_response_handler->init_response();
-
- if (s3select_syntax->get_error_description().empty() == false)
- { //error-flow (syntax-error)
- m_aws_response_handler->send_error_response(s3select_syntax_error,
- s3select_syntax->get_error_description().c_str(),
- s3select_resource_id);
-
- ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl;
- return -1;
- }
- else
- {
- if (input == nullptr) {
- input = "";
- }
- m_aws_response_handler->init_success_response();
- length_before_processing = (m_aws_response_handler->get_sql_result()).size();
-
- //query is correct(syntax), processing is starting.
- status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler->get_sql_result(), input, input_length, s->obj_size);
- length_post_processing = (m_aws_response_handler->get_sql_result()).size();
- m_aws_response_handler->update_total_bytes_returned(length_post_processing-length_before_processing);
- if (status < 0)
- { //error flow(processing-time)
- m_aws_response_handler->send_error_response(s3select_processTime_error,
- m_s3_csv_object->get_error_description().c_str(),
- s3select_resource_id);
-
- ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl;
- return -1;
- }
-
- if (chunk_number == 0)
- {//success flow
- if (op_ret < 0)
- {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- // Explicitly use chunked transfer encoding so that we can stream the result
- // to the user without having to wait for the full length of it.
- end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
- }
- chunk_number++;
- }
-
- if (length_post_processing-length_before_processing != 0) {
- m_aws_response_handler->send_success_response();
- } else {
- m_aws_response_handler->send_continuation_response();
- }
-
- if (enable_progress == true) {
- m_aws_response_handler->init_progress_response();
- m_aws_response_handler->send_progress_response();
- }
-
- return status;
-}
-
-int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
-{
- std::string input_tag{"InputSerialization"};
- std::string output_tag{"OutputSerialization"};
-
- if(chunk_number !=0) {
- return 0;
- }
-
-#define GT "&gt;"
-#define LT "&lt;"
- if (m_s3select_query.find(GT) != std::string::npos) {
- boost::replace_all(m_s3select_query, GT, ">");
- }
- if (m_s3select_query.find(LT) != std::string::npos) {
- boost::replace_all(m_s3select_query, LT, "<");
- }
-
- //AWS cli s3select parameters
- extract_by_tag(m_s3select_query, "Expression", sql_query);
- extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
-
- size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
- size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi);
- m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2));
-
- extract_by_tag(m_s3select_input,"FieldDelimiter", m_column_delimiter);
- extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
- extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
- extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
-
- if (m_row_delimiter.size()==0) {
- m_row_delimiter='\n';
- }
- else if(m_row_delimiter.compare("&#10;") == 0)
- {//presto change
- m_row_delimiter='\n';
- }
-
- extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char);
- extract_by_tag(m_s3select_input, "CompressionType", m_compression_type);
-
- size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0);
- size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi);
- m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2));
-
- extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter);
- extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot);
- extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char);
- extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields);
- extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
-
- if (output_row_delimiter.size()==0) {
- output_row_delimiter='\n';
- }
- else if(output_row_delimiter.compare("&#10;") == 0)
- {//presto change
- output_row_delimiter='\n';
- }
-
- if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
- ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
- return -1;
- }
-
- return 0;
-}
-
-int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result)
-{
- result = "";
- size_t _qs = input.find("<" + tag_name + ">", 0);
- size_t qs_input = _qs + tag_name.size() + 2;
- if (_qs == std::string::npos) {
- return -1;
- }
- size_t _qe = input.find("</" + tag_name + ">", qs_input);
- if (_qe == std::string::npos) {
- return -1;
- }
-
- result = input.substr(qs_input, _qe - qs_input);
-
- return 0;
-}
-
-int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
-{
- int status=0;
- if (m_aws_response_handler == nullptr) {
- m_aws_response_handler = std::make_unique<aws_response_handler>(s,this);
- }
-
- if(len == 0 && s->obj_size != 0) {
- return 0;
- }
-
- if (s->obj_size == 0) {
- status = run_s3select(m_sql_query.c_str(), nullptr, 0);
- } else {
-
- auto bl_len = bl.get_num_buffers();
-
- int i=0;
-
- for(auto& it : bl.buffers()) {
-
- ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
- << " len " << len << " obj-size " << s->obj_size << dendl;
-
- if(it.length() == 0) {
- ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
- << " obj-size " << s->obj_size << dendl;
- continue;
- }
-
- m_aws_response_handler->update_processed_size(it.length());
-
- status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
- if(status<0) {
- break;
- }
- i++;
- }
- }
-
- if (m_aws_response_handler->get_processed_size() == s->obj_size) {
- if (status >=0) {
- m_aws_response_handler->init_stats_response();
- m_aws_response_handler->send_stats_response();
- m_aws_response_handler->init_end_response();
- }
- }
- return status;
-}
diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h
index 7c534cbfffd..d9d60376d07 100644
--- a/src/rgw/rgw_rest_s3.h
+++ b/src/rgw/rgw_rest_s3.h
@@ -908,161 +908,6 @@ inline int valid_s3_bucket_name(const std::string& name, bool relaxed=false)
return 0;
}
-namespace s3selectEngine
-{
-class s3select;
-class csv_object;
-}
-
-
-class aws_response_handler
-{//TODO this class should reside on s3select submodule
-
-private:
- std::string sql_result;
- struct req_state *s;//TODO will be replace by callback
- uint32_t header_size;
- // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
- boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
- RGWOp *m_rgwop;
- std::string m_buff_header;
- uint64_t total_bytes_returned;
- uint64_t processed_size;
-
- enum class header_name_En
- {
- EVENT_TYPE,
- CONTENT_TYPE,
- MESSAGE_TYPE,
- ERROR_CODE,
- ERROR_MESSAGE
- };
-
- enum class header_value_En
- {
- RECORDS,
- OCTET_STREAM,
- EVENT,
- CONT,
- PROGRESS,
- END,
- XML,
- STATS,
- ENGINE_ERROR,
- ERROR_TYPE
- };
-
- const char *PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
- const char *END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
- const char *header_name_str[5] = {":event-type", ":content-type", ":message-type",":error-code",":error-message"};
- const char *header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error","error"};
- static constexpr size_t header_crc_size = 12;
-
- void push_header(const char * header_name,const char* header_value);
-
- int create_message(u_int32_t header_len);
-
-public:
- //12 positions for header-crc
- aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop),total_bytes_returned{0},processed_size{0}
- { }
-
- std::string &get_sql_result();
-
- uint64_t get_processed_size();
-
- void update_processed_size(uint64_t value);
-
- uint64_t get_total_bytes_returned();
-
- void update_total_bytes_returned(uint64_t value);
-
- int create_header_records();
-
- int create_header_continuation();
-
- int create_header_progress();
-
- int create_header_stats();
-
- int create_header_end();
-
- int create_error_header_records(const char* error_message);
-
- void init_response();
-
- void init_success_response();
-
- void send_continuation_response();
-
- void init_progress_response();
-
- void init_end_response();
-
- void init_stats_response();
-
- void init_error_response(const char* error_message);
-
- void send_success_response();
-
- void send_progress_response();
-
- void send_stats_response();
-
- void send_error_response(const char* error_code,
- const char* error_message,
- const char* resource_id);
-
-}; //end class aws_response_handler
-
-class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
-{
-
-private:
- std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
- std::string m_s3select_query;
- std::string m_s3select_input;
- std::string m_s3select_output;
- std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
- std::string m_column_delimiter;
- std::string m_quot;
- std::string m_row_delimiter;
- std::string m_compression_type;
- std::string m_escape_char;
- std::string m_header_info;
- std::string m_sql_query;
- std::string m_enable_progress;
- std::string output_column_delimiter;
- std::string output_quot;
- std::string output_escape_char;
- std::string output_quote_fields;
- std::string output_row_delimiter;
-
- std::unique_ptr<aws_response_handler> m_aws_response_handler;
- bool enable_progress;
-
-public:
- unsigned int chunk_number;
-
- RGWSelectObj_ObjStore_S3();
- virtual ~RGWSelectObj_ObjStore_S3();
-
- virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
-
- virtual int get_params(optional_yield y) override;
-
-private:
-
- int run_s3select(const char* query, const char* input, size_t input_length);
-
- int extract_by_tag(std::string input, std::string tag_name, std::string& result);
-
- void convert_escape_seq(std::string& esc);
-
- int handle_aws_cli_parameters(std::string& sql_query);
-};
-
-
namespace rgw::auth::s3 {
class AWSEngine : public rgw::auth::Engine {
diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc
new file mode 100644
index 00000000000..19c787303be
--- /dev/null
+++ b/src/rgw/rgw_s3select.cc
@@ -0,0 +1,669 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_s3select_private.h"
+
+namespace rgw::s3select {
+RGWOp* create_s3select_op()
+{
+ return new RGWSelectObj_ObjStore_S3();
+}
+};
+
+using namespace s3selectEngine;
+
+std::string& aws_response_handler::get_sql_result()
+{
+ return sql_result;
+}
+
+uint64_t aws_response_handler::get_processed_size()
+{
+ return processed_size;
+}
+
+void aws_response_handler::update_processed_size(uint64_t value)
+{
+ processed_size += value;
+}
+
+uint64_t aws_response_handler::get_total_bytes_returned()
+{
+ return total_bytes_returned;
+}
+
+void aws_response_handler::update_total_bytes_returned(uint64_t value)
+{
+ total_bytes_returned += value;
+}
+
+void aws_response_handler::push_header(const char* header_name, const char* header_value)
+{
+ char x;
+ short s;
+ x = char(strlen(header_name));
+ m_buff_header.append(&x, sizeof(x));
+ m_buff_header.append(header_name);
+ x = char(7);
+ m_buff_header.append(&x, sizeof(x));
+ s = htons(uint16_t(strlen(header_value)));
+ m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
+ m_buff_header.append(header_value);
+}
+
+#define IDX( x ) static_cast<int>( x )
+
+int aws_response_handler::create_header_records()
+{
+ //headers description(AWS)
+ //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+ //1
+ push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::RECORDS)]);
+ //2
+ push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
+ //3
+ push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_header_continuation()
+{
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
+ //2
+ push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_header_progress()
+{
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::PROGRESS)]);
+ //2
+ push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
+ //3
+ push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_header_stats()
+{
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::STATS)]);
+ //2
+ push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
+ //3
+ push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_header_end()
+{
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
+ //2
+ push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_error_header_records(const char* error_message)
+{
+ //headers description(AWS)
+ //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+ //1
+ push_header(header_name_str[IDX(header_name_En::ERROR_CODE)], header_value_str[IDX(header_value_En::ENGINE_ERROR)]);
+ //2
+ push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
+ //3
+ push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_message(u_int32_t header_len)
+{
+ //message description(AWS):
+ //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
+  //the s3select result is produced into sql_result, which is also the response-message; thus the headers and CRC
+  //are attached to the produced SQL result afterwards, actually wrapping the payload.
+ auto push_encode_int = [&](u_int32_t s, int pos) {
+ u_int32_t x = htonl(s);
+ sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
+ };
+ u_int32_t total_byte_len = 0;
+ u_int32_t preload_crc = 0;
+ u_int32_t message_crc = 0;
+ total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+ push_encode_int(total_byte_len, 0);
+ push_encode_int(header_len, 4);
+ crc32.reset();
+ crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+ preload_crc = crc32();
+ push_encode_int(preload_crc, 8);
+ crc32.reset();
+ crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+ message_crc = crc32();
+ u_int32_t x = htonl(message_crc);
+ sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
+ return sql_result.size();
+}
+
+void aws_response_handler::init_response()
+{
+ //12 positions for header-crc
+ sql_result.resize(header_crc_size, '\0');
+}
+
+void aws_response_handler::init_success_response()
+{
+ m_buff_header.clear();
+ header_size = create_header_records();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(PAYLOAD_LINE);
+}
+
+void aws_response_handler::send_continuation_response()
+{
+ sql_result.resize(header_crc_size, '\0');
+ m_buff_header.clear();
+ header_size = create_header_continuation();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::init_progress_response()
+{
+ sql_result.resize(header_crc_size, '\0');
+ m_buff_header.clear();
+ header_size = create_header_progress();
+ sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::init_stats_response()
+{
+ sql_result.resize(header_crc_size, '\0');
+ m_buff_header.clear();
+ header_size = create_header_stats();
+ sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::init_end_response()
+{
+ sql_result.resize(header_crc_size, '\0');
+ m_buff_header.clear();
+ header_size = create_header_end();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::init_error_response(const char* error_message)
+{
+  //currently not in use; in the error case, the headers are not extracted by AWS-cli.
+ m_buff_header.clear();
+ header_size = create_error_header_records(error_message);
+ sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::send_success_response()
+{
+ sql_result.append(END_PAYLOAD_LINE);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_error_response(const char* error_code,
+ const char* error_message,
+ const char* resource_id)
+{
+ set_req_state_err(s, 0);
+ dump_errno(s, 400);
+ end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ dump_start(s);
+ s->formatter->open_object_section("Error");
+ s->formatter->dump_string("Code", error_code);
+ s->formatter->dump_string("Message", error_message);
+ s->formatter->dump_string("Resource", "#Resource#");
+ s->formatter->dump_string("RequestId", resource_id);
+ s->formatter->close_section();
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_progress_response()
+{
+ std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
+ , get_processed_size(), get_processed_size(), get_total_bytes_returned());
+ sql_result.append(progress_payload);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_stats_response()
+{
+ std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
+ , get_processed_size(), get_processed_size(), get_total_bytes_returned());
+ sql_result.append(stats_payload);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
+ m_buff_header(std::make_unique<char[]>(1000)),
+ m_parquet_type(false),
+ chunk_number(0)
+{
+ set_get_data(true);
+ fp_get_obj_size = [&]() {
+ return get_obj_size();
+ };
+ fp_range_req = [&](int64_t start, int64_t len, void* buff, optional_yield* y) {
+ ldout(s->cct, 10) << "S3select: range-request start: " << start << " length: " << len << dendl;
+ auto status = range_request(start, len, buff, *y);
+ return status;
+ };
+#ifdef _ARROW_EXIST
+ m_rgw_api.set_get_size_api(fp_get_obj_size);
+ m_rgw_api.set_range_req_api(fp_range_req);
+#endif
+ fp_result_header_format = [this](std::string& result) {
+ m_aws_response_handler.init_response();
+ m_aws_response_handler.init_success_response();
+ return 0;
+ };
+ fp_s3select_result_format = [this](std::string& result) {
+ m_aws_response_handler.send_success_response();
+ return 0;
+ };
+}
+
+RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+{}
+
+int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
+{
+ if(m_s3select_query.empty() == false) {
+ return 0;
+ }
+ if(s->object->get_name().find(".parquet") != std::string::npos) { //aws cli is missing the parquet
+#ifdef _ARROW_EXIST
+ m_parquet_type = true;
+#else
+ ldpp_dout(this, 10) << "arrow library is not installed" << dendl;
+#endif
+ }
+ //retrieve s3-select query from payload
+ bufferlist data;
+ int ret;
+ int max_size = 4096;
+ std::tie(ret, data) = read_all_input(s, max_size, false);
+ if (ret != 0) {
+ ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
+ return ret;
+ }
+ m_s3select_query = data.to_str();
+ if (m_s3select_query.length() > 0) {
+ ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
+ } else {
+ ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
+ return -1;
+ }
+ int status = handle_aws_cli_parameters(m_sql_query);
+ if (status<0) {
+ return status;
+ }
+ return RGWGetObj_ObjStore_S3::get_params(y);
+}
+
+int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
+{
+ int status = 0;
+ uint32_t length_before_processing, length_post_processing;
+ csv_object::csv_defintions csv;
+ const char* s3select_syntax_error = "s3select-Syntax-Error";
+ const char* s3select_resource_id = "resourcse-id";
+ const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
+
+ s3select_syntax.parse_query(query);
+ if (m_row_delimiter.size()) {
+ csv.row_delimiter = *m_row_delimiter.c_str();
+ }
+ if (m_column_delimiter.size()) {
+ csv.column_delimiter = *m_column_delimiter.c_str();
+ }
+ if (m_quot.size()) {
+ csv.quot_char = *m_quot.c_str();
+ }
+ if (m_escape_char.size()) {
+ csv.escape_char = *m_escape_char.c_str();
+ }
+ if (m_enable_progress.compare("true")==0) {
+ enable_progress = true;
+ } else {
+ enable_progress = false;
+ }
+ if (output_row_delimiter.size()) {
+ csv.output_row_delimiter = *output_row_delimiter.c_str();
+ }
+ if (output_column_delimiter.size()) {
+ csv.output_column_delimiter = *output_column_delimiter.c_str();
+ }
+ if (output_quot.size()) {
+ csv.output_quot_char = *output_quot.c_str();
+ }
+ if (output_escape_char.size()) {
+ csv.output_escape_char = *output_escape_char.c_str();
+ }
+ if(output_quote_fields.compare("ALWAYS") == 0) {
+ csv.quote_fields_always = true;
+ } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+ csv.quote_fields_asneeded = true;
+ }
+ if(m_header_info.compare("IGNORE")==0) {
+ csv.ignore_header_info=true;
+ } else if(m_header_info.compare("USE")==0) {
+ csv.use_header_info=true;
+ }
+ m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
+ m_aws_response_handler.init_response();
+ if (s3select_syntax.get_error_description().empty() == false) {
+ //error-flow (syntax-error)
+ m_aws_response_handler.send_error_response(s3select_syntax_error,
+ s3select_syntax.get_error_description().c_str(),
+ s3select_resource_id);
+ ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+ return -1;
+ } else {
+ if (input == nullptr) {
+ input = "";
+ }
+ m_aws_response_handler.init_success_response();
+ length_before_processing = (m_aws_response_handler.get_sql_result()).size();
+ //query is correct(syntax), processing is starting.
+ status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, s->obj_size);
+ length_post_processing = (m_aws_response_handler.get_sql_result()).size();
+ m_aws_response_handler.update_total_bytes_returned(length_post_processing-length_before_processing);
+ if (status < 0) {
+ //error flow(processing-time)
+ m_aws_response_handler.send_error_response(s3select_processTime_error,
+ m_s3_csv_object.get_error_description().c_str(),
+ s3select_resource_id);
+ ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
+ return -1;
+ }
+ if (chunk_number == 0) {
+ //success flow
+ if (op_ret < 0) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ // Explicitly use chunked transfer encoding so that we can stream the result
+ // to the user without having to wait for the full length of it.
+ end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ }
+ chunk_number++;
+ }
+ if (length_post_processing-length_before_processing != 0) {
+ m_aws_response_handler.send_success_response();
+ } else {
+ m_aws_response_handler.send_continuation_response();
+ }
+ if (enable_progress == true) {
+ m_aws_response_handler.init_progress_response();
+ m_aws_response_handler.send_progress_response();
+ }
+ return status;
+}
+
+int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
+{
+ int status = 0;
+#ifdef _ARROW_EXIST
+ if (!m_s3_parquet_object.is_set()) {
+ s3select_syntax.parse_query(m_sql_query.c_str());
+ try {
+ m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+ } catch(base_s3select_exception& e) {
+ ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
+ fp_result_header_format(m_aws_response_handler.get_sql_result());
+ m_aws_response_handler.get_sql_result().append(e.what());
+ fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+ return -1;
+ }
+ }
+ if (s3select_syntax.get_error_description().empty() == false) {
+ fp_result_header_format(m_aws_response_handler.get_sql_result());
+ m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
+ fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+ ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+ status = -1;
+ } else {
+ fp_result_header_format(m_aws_response_handler.get_sql_result());
+ status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
+ if (status < 0) {
+ m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
+ fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+ ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
+ }
+ }
+#endif
+ return status;
+}
+
+int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
+{
+ std::string input_tag{"InputSerialization"};
+ std::string output_tag{"OutputSerialization"};
+ if(chunk_number !=0) {
+ return 0;
+ }
+#define GT "&gt;"
+#define LT "&lt;"
+ if (m_s3select_query.find(GT) != std::string::npos) {
+ boost::replace_all(m_s3select_query, GT, ">");
+ }
+ if (m_s3select_query.find(LT) != std::string::npos) {
+ boost::replace_all(m_s3select_query, LT, "<");
+ }
+ //AWS cli s3select parameters
+ extract_by_tag(m_s3select_query, "Expression", sql_query);
+ extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
+ size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
+ size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi);
+ m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2));
+ extract_by_tag(m_s3select_input, "FieldDelimiter", m_column_delimiter);
+ extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
+ extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
+ extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
+ if (m_row_delimiter.size()==0) {
+ m_row_delimiter='\n';
+ } else if(m_row_delimiter.compare("&#10;") == 0) {
+ //presto change
+ m_row_delimiter='\n';
+ }
+ extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char);
+ extract_by_tag(m_s3select_input, "CompressionType", m_compression_type);
+ size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0);
+ size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi);
+ m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2));
+ extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter);
+ extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot);
+ extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char);
+ extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields);
+ extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
+ if (output_row_delimiter.size()==0) {
+ output_row_delimiter='\n';
+ } else if(output_row_delimiter.compare("&#10;") == 0) {
+ //presto change
+ output_row_delimiter='\n';
+ }
+ if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
+ ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
+ return -1;
+ }
+ return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result)
+{
+ result = "";
+ size_t _qs = input.find("<" + tag_name + ">", 0);
+ size_t qs_input = _qs + tag_name.size() + 2;
+ if (_qs == std::string::npos) {
+ return -1;
+ }
+ size_t _qe = input.find("</" + tag_name + ">", qs_input);
+ if (_qe == std::string::npos) {
+ return -1;
+ }
+ result = input.substr(qs_input, _qe - qs_input);
+ return 0;
+}
+
+size_t RGWSelectObj_ObjStore_S3::get_obj_size()
+{
+ return s->obj_size;
+}
+
+int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff, optional_yield y)
+{
+  //purpose: implementation for arrow::ReadAt; this may take several async calls.
+  //send_response_data (the callback) accumulates the buffer; upon completion, control returns to ReadAt.
+ range_req_str = "bytes=" + std::to_string(ofs) + "-" + std::to_string(ofs+len-1);
+ range_str = range_req_str.c_str();
+ range_parsed = false;
+ RGWGetObj::parse_range();
+ requested_buffer.clear();
+ m_request_range = len;
+ ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
+ RGWGetObj::execute(y);
+ memcpy(buff, requested_buffer.data(), len);
+ ldout(s->cct, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer.size() << dendl;
+ return len;
+}
+
+void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
+{
+ int status = 0;
+ char parquet_magic[4];
+ static constexpr uint8_t parquet_magic1[4] = {'P', 'A', 'R', '1'};
+ static constexpr uint8_t parquet_magicE[4] = {'P', 'A', 'R', 'E'};
+ get_params(y);
+#ifdef _ARROW_EXIST
+ m_rgw_api.m_y = &y;
+#endif
+ if (m_parquet_type) {
+ //parquet processing
+ range_request(0, 4, parquet_magic, y);
+ if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) {
+ ldout(s->cct, 10) << s->object->get_name() << " does not contain parquet magic" << dendl;
+ op_ret = -ERR_INVALID_REQUEST;
+ return;
+ }
+ s3select_syntax.parse_query(m_sql_query.c_str());
+ status = run_s3select_on_parquet(m_sql_query.c_str());
+ if (status) {
+ ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
+ op_ret = -ERR_INVALID_REQUEST;
+ } else {
+ ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
+ }
+ } else {
+ //CSV processing
+ RGWGetObj::execute(y);
+ }
+}
+
+int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
+{
+ if (chunk_number == 0) {
+ if (op_ret < 0) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ }
+ // Explicitly use chunked transfer encoding so that we can stream the result
+ // to the user without having to wait for the full length of it.
+ if (chunk_number == 0) {
+ end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ }
+ chunk_number++;
+ size_t append_in_callback = 0;
+ int part_no = 1;
+ //concat the requested buffer
+ for (auto& it : bl.buffers()) {
+ if(it.length() == 0) {
+ ldout(s->cct, 10) << "S3select: get zero-buffer while appending request-buffer " << dendl;
+ }
+ append_in_callback += it.length();
+ ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
+ requested_buffer.append(&(it)[0]+ofs, len);
+ }
+ ldout(s->cct, 10) << "S3select:append_in_callback = " << append_in_callback << dendl;
+ if (requested_buffer.size() < m_request_range) {
+ ldout(s->cct, 10) << "S3select: need another round buffe-size: " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
+ return 0;
+ } else {//buffer is complete
+ ldout(s->cct, 10) << "S3select: buffer is complete " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
+ m_request_range = 0;
+ }
+ return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
+{
+ int status = 0;
+
+ if (s->obj_size == 0) {
+ status = run_s3select(m_sql_query.c_str(), nullptr, 0);
+ } else {
+ auto bl_len = bl.get_num_buffers();
+ int i=0;
+ for(auto& it : bl.buffers()) {
+ ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+ << " len " << len << " obj-size " << s->obj_size << dendl;
+ if(it.length() == 0) {
+ ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+ << " obj-size " << s->obj_size << dendl;
+ continue;
+ }
+ m_aws_response_handler.update_processed_size(it.length());
+ status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
+ if(status<0) {
+ break;
+ }
+ i++;
+ }
+ }
+ if (m_aws_response_handler.get_processed_size() == s->obj_size) {
+ if (status >=0) {
+ m_aws_response_handler.init_stats_response();
+ m_aws_response_handler.send_stats_response();
+ m_aws_response_handler.init_end_response();
+ }
+ }
+ return status;
+}
+
+int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
+{
+ if (!m_aws_response_handler.is_set()) {
+ m_aws_response_handler.set(s, this);
+ }
+ if(len == 0 && s->obj_size != 0) {
+ return 0;
+ }
+ if (m_parquet_type) {
+ return parquet_processing(bl,ofs,len);
+ }
+ return csv_processing(bl,ofs,len);
+}
+
diff --git a/src/rgw/rgw_s3select.h b/src/rgw/rgw_s3select.h
new file mode 100644
index 00000000000..00d55cf93ea
--- /dev/null
+++ b/src/rgw/rgw_s3select.h
@@ -0,0 +1,8 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+//
+
+namespace rgw::s3select {
+RGWOp* create_s3select_op();
+}
+
diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h
new file mode 100644
index 00000000000..9174c04908f
--- /dev/null
+++ b/src/rgw/rgw_s3select_private.h
@@ -0,0 +1,230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+//
+#pragma once
+
+#include <errno.h>
+#include <array>
+#include <string.h>
+#include <string_view>
+
+#include "common/ceph_crypto.h"
+#include "common/split.h"
+#include "common/Formatter.h"
+#include "common/utf8.h"
+#include "common/ceph_json.h"
+#include "common/safe_io.h"
+#include "common/errno.h"
+#include "auth/Crypto.h"
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/replace.hpp>
+#include <boost/tokenizer.hpp>
+#define BOOST_BIND_GLOBAL_PLACEHOLDERS
+#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
+#endif
+#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
+#pragma clang diagnostic pop
+#endif
+#undef BOOST_BIND_GLOBAL_PLACEHOLDERS
+
+#include <liboath/oath.h>
+
+
+#include <s3select/include/s3select.h>
+#include "rgw_rest_s3.h"
+#include "rgw_s3select.h"
+
+class aws_response_handler
+{
+
+private:
+ std::string sql_result;
+ struct req_state* s;
+ uint32_t header_size;
+ // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
+ boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
+ RGWOp* m_rgwop;
+ std::string m_buff_header;
+ uint64_t total_bytes_returned;
+ uint64_t processed_size;
+
+ enum class header_name_En {
+ EVENT_TYPE,
+ CONTENT_TYPE,
+ MESSAGE_TYPE,
+ ERROR_CODE,
+ ERROR_MESSAGE
+ };
+
+ enum class header_value_En {
+ RECORDS,
+ OCTET_STREAM,
+ EVENT,
+ CONT,
+ PROGRESS,
+ END,
+ XML,
+ STATS,
+ ENGINE_ERROR,
+ ERROR_TYPE
+ };
+
+ const char* PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
+ const char* END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
+ const char* header_name_str[5] = {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
+ const char* header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
+ static constexpr size_t header_crc_size = 12;
+
+ void push_header(const char* header_name, const char* header_value);
+
+ int create_message(u_int32_t header_len);
+
+public:
+ aws_response_handler(struct req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
+ {}
+
+ aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned{0}, processed_size{0}
+ {}
+
+ bool is_set()
+ {
+ if(s==nullptr || m_rgwop == nullptr){
+ return false;
+ }
+ return true;
+ }
+
+ void set(struct req_state* ps, RGWOp* rgwop)
+ {
+ s = ps;
+ m_rgwop = rgwop;
+ }
+
+ std::string& get_sql_result();
+
+ uint64_t get_processed_size();
+
+ void update_processed_size(uint64_t value);
+
+ uint64_t get_total_bytes_returned();
+
+ void update_total_bytes_returned(uint64_t value);
+
+ int create_header_records();
+
+ int create_header_continuation();
+
+ int create_header_progress();
+
+ int create_header_stats();
+
+ int create_header_end();
+
+ int create_error_header_records(const char* error_message);
+
+ void init_response();
+
+ void init_success_response();
+
+ void send_continuation_response();
+
+ void init_progress_response();
+
+ void init_end_response();
+
+ void init_stats_response();
+
+ void init_error_response(const char* error_message);
+
+ void send_success_response();
+
+ void send_progress_response();
+
+ void send_stats_response();
+
+ void send_error_response(const char* error_code,
+ const char* error_message,
+ const char* resource_id);
+
+}; //end class aws_response_handler
+
+class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
+{
+
+private:
+ s3selectEngine::s3select s3select_syntax;
+ std::string m_s3select_query;
+ std::string m_s3select_input;
+ std::string m_s3select_output;
+ s3selectEngine::csv_object m_s3_csv_object;
+#ifdef _ARROW_EXIST
+ s3selectEngine::parquet_object m_s3_parquet_object;
+#endif
+ std::string m_column_delimiter;
+ std::string m_quot;
+ std::string m_row_delimiter;
+ std::string m_compression_type;
+ std::string m_escape_char;
+ std::unique_ptr<char[]> m_buff_header;
+ std::string m_header_info;
+ std::string m_sql_query;
+ std::string m_enable_progress;
+ std::string output_column_delimiter;
+ std::string output_quot;
+ std::string output_escape_char;
+ std::string output_quote_fields;
+ std::string output_row_delimiter;
+ aws_response_handler m_aws_response_handler;
+ bool enable_progress;
+
+ //parquet request
+ bool m_parquet_type;
+#ifdef _ARROW_EXIST
+ s3selectEngine::rgw_s3select_api m_rgw_api;
+#endif
+  //a range request may be satisfied by several calls to send_response_data;
+ size_t m_request_range;
+ std::string requested_buffer;
+ std::string range_req_str;
+ std::function<int(std::string&)> fp_result_header_format;
+ std::function<int(std::string&)> fp_s3select_result_format;
+ int m_header_size;
+
+public:
+ unsigned int chunk_number;
+
+ RGWSelectObj_ObjStore_S3();
+ virtual ~RGWSelectObj_ObjStore_S3();
+
+ virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
+
+ virtual int get_params(optional_yield y) override;
+
+ virtual void execute(optional_yield) override;
+
+private:
+
+ int csv_processing(bufferlist& bl, off_t ofs, off_t len);
+
+ int parquet_processing(bufferlist& bl, off_t ofs, off_t len);
+
+ int run_s3select(const char* query, const char* input, size_t input_length);
+
+ int run_s3select_on_parquet(const char* query);
+
+ int extract_by_tag(std::string input, std::string tag_name, std::string& result);
+
+ void convert_escape_seq(std::string& esc);
+
+ int handle_aws_cli_parameters(std::string& sql_query);
+
+ int range_request(int64_t start, int64_t len, void*, optional_yield);
+
+ size_t get_obj_size();
+ std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
+ std::function<size_t(void)> fp_get_obj_size;
+
+};
+
diff --git a/src/s3select b/src/s3select
-Subproject f118a761300d7d10b910bc1ba24935e093c064e
+Subproject 1609bb2ab5441d2314f56858f4f98fc2be509f8