diff options
Diffstat (limited to 'src/rgw/rgw_s3select.cc')
-rw-r--r-- | src/rgw/rgw_s3select.cc | 67 |
1 files changed, 43 insertions, 24 deletions
diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc index 800d276a6aa..d8be76a6b1c 100644 --- a/src/rgw/rgw_s3select.cc +++ b/src/rgw/rgw_s3select.cc @@ -344,7 +344,7 @@ RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3() int RGWSelectObj_ObjStore_S3::get_params(optional_yield y) { - if(m_s3select_query.empty() == false) { + if (m_s3select_query.empty() == false) { return 0; } #ifndef _ARROW_EXIST @@ -416,14 +416,14 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char* if (output_escape_char.size()) { csv.output_escape_char = *output_escape_char.c_str(); } - if(output_quote_fields.compare("ALWAYS") == 0) { + if (output_quote_fields.compare("ALWAYS") == 0) { csv.quote_fields_always = true; - } else if(output_quote_fields.compare("ASNEEDED") == 0) { + } else if (output_quote_fields.compare("ASNEEDED") == 0) { csv.quote_fields_asneeded = true; } - if(m_header_info.compare("IGNORE")==0) { + if (m_header_info.compare("IGNORE")==0) { csv.ignore_header_info=true; - } else if(m_header_info.compare("USE")==0) { + } else if (m_header_info.compare("USE")==0) { csv.use_header_info=true; } @@ -478,6 +478,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query) if (!m_s3_parquet_object.is_set()) { //parsing the SQL statement. s3select_syntax.parse_query(m_sql_query.c_str()); + parquet_object::csv_definitions parquet; m_s3_parquet_object.set_external_system_functions(fp_s3select_continue, fp_s3select_result_format, @@ -485,8 +486,10 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query) fp_debug_mesg); try { + //setting the Parquet-reader properties. i.e. the buffer-size for the Parquet-reader + parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size); //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object - m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api); + m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet); } catch(base_s3select_exception& e) { ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl; fp_result_header_format(m_aws_response_handler.get_sql_result()); @@ -524,6 +527,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char fp_s3select_result_format, fp_result_header_format, fp_debug_mesg); + json_object::csv_definitions json; m_aws_response_handler.init_response(); @@ -547,8 +551,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char } //initializing json processor - json_object::csv_definitions output_definition; - m_s3_json_object.set_json_query(&s3select_syntax,output_definition); + m_s3_json_object.set_json_query(&s3select_syntax, json); if (input == nullptr) { input = ""; @@ -706,6 +709,7 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff RGWGetObj::parse_range(); requested_buffer.clear(); m_request_range = len; + m_aws_response_handler.update_processed_size(len); ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl; RGWGetObj::execute(y); if (buff) { @@ -730,7 +734,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding); } - if(s->cct->_conf->rgw_disable_s3select == true) + if (s->cct->_conf->rgw_disable_s3select == true) { std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter"; ldpp_dout(this, 10) << error_msg << dendl; @@ -749,12 +753,26 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) return; } s3select_syntax.parse_query(m_sql_query.c_str()); + //the run_s3select_on_parquet() calling the s3select-query-engine, that read and process the parquet object with RGW::range_request, + //upon query-engine finish the processing, the control is back to execute() + //the parquet-reader indicates the end of the parquet object. status = run_s3select_on_parquet(m_sql_query.c_str()); if (status) { ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl; op_ret = -ERR_INVALID_REQUEST; } else { - ldout(s->cct, 10) << "S3select: complete query with success " << dendl; + //status per amount of processed data +#ifdef _ARROW_EXIST + m_aws_response_handler.update_total_bytes_returned(m_s3_parquet_object.get_return_result_size()); +#endif + m_aws_response_handler.init_stats_response(); + m_aws_response_handler.send_stats_response(); + m_aws_response_handler.init_end_response(); + ldpp_dout(this, 10) << "s3select : reached the end of parquet query request : aws_response_handler.get_processed_size() " + << m_aws_response_handler.get_processed_size() + << "m_object_size_for_processing : " << m_object_size_for_processing << dendl; + + ldout(s->cct, 10) << "S3select: complete parquet query with success " << dendl; } } else { //CSV or JSON processing @@ -762,7 +780,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) m_requested_range = (m_end_scan_sz - m_start_scan_sz); - if(m_is_trino_request){ + if (m_is_trino_request){ // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row, // thus the additional length will be processed, and no broken row for Trino. // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range) @@ -778,7 +796,8 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) } int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len) -{ +{//purpose: to process the returned buffer from range-request, and to send it to the Parquet-reader. + //range_request() is called by arrow::ReadAt, and upon request completion the control is back to RGWSelectObj_ObjStore_S3::execute() fp_chunked_transfer_encoding(); size_t append_in_callback = 0; int part_no = 1; @@ -809,7 +828,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, //the purpose is to return "perfect" results, with no broken or missing lines. off_t new_offset = 0; - if(m_scan_range_ind){//only upon range-scan + if (m_scan_range_ind){//only upon range-scan int64_t sc=0; int64_t start =0; const char* row_delimiter = m_row_delimiter.c_str(); @@ -817,10 +836,10 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl; //chop the head of the first chunk and only upon the slice does not include the head of the object. - if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){ + if (m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){ char* p = const_cast<char*>(it_cp+ofs); while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++; - if(!strncmp(row_delimiter,p,1)){ + if (!strncmp(row_delimiter,p,1)){ new_offset += (p - (it_cp+ofs))+1; } } @@ -831,14 +850,14 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, //chop the end of the last chunk for this request //if it's the last chunk, search for first row-delimiter for the following different use-cases - if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ + if ((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ //had pass the requested range, start to search for first delimiter - if(m_aws_response_handler.get_processed_size()>m_requested_range){ + if (m_aws_response_handler.get_processed_size()>m_requested_range){ //the previous chunk contain the complete request(all data) and an extra bytes. //thus, search for the first row-delimiter //[:previous (RR) ... ][:current (RD) ] start = 0; - } else if(m_aws_response_handler.get_processed_size()){ + } else if (m_aws_response_handler.get_processed_size()){ //the *current* chunk contain the complete request in the middle of the chunk. //thus, search for the first row-delimiter after the complete request position //[:current (RR) .... (RD) ] @@ -852,7 +871,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or its end ebject { char* p = const_cast<char*>(it_cp) + ofs + sc; - if(!strncmp(row_delimiter,p,1)){ + if (!strncmp(row_delimiter,p,1)){ ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl; len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len) //the end of row exist in current chunk. @@ -872,7 +891,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len) { int status = 0; - if(m_skip_next_chunk == true){ + if (m_skip_next_chunk == true){ return status; } @@ -894,13 +913,13 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le } - if(ofs > it.length()){ + if (ofs > it.length()){ //safety check ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl; ofs = 0; } - if(m_is_trino_request){ + if (m_is_trino_request){ //TODO replace len with it.length() ? ; test Trino flow with compressed objects. //is it possible to send get-by-ranges? in parallel? shape_chunk_per_trino_requests(&(it)[0], ofs, len); @@ -964,7 +983,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l continue; } - if((ofs + len) > it.length()){ + if ((ofs + len) > it.length()){ ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl; ofs = 0; len = it.length(); @@ -1025,7 +1044,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ if (len == 0 && s->obj_size != 0) { return 0; } - if (m_parquet_type) { + if (m_parquet_type) {//bufferlist sendback upon range-request return parquet_processing(bl,ofs,len); } if (m_json_type) { |