Skip to content

Commit

Permalink
Merge branch 'issue/2025010200106809238' into '4_3_3_release'
Browse files Browse the repository at this point in the history
Co-authored-by: northernboylv <510614976@qq.com>
  • Loading branch information
2 people authored and ob-robot committed Jan 16, 2025
1 parent cdc5ab8 commit 60ff900
Show file tree
Hide file tree
Showing 15 changed files with 358 additions and 397 deletions.
3 changes: 2 additions & 1 deletion src/obproxy/cmd/ob_show_sqlaudit_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "lib/time/ob_hrtime.h"
#include "iocore/eventsystem/ob_event_processor.h"
#include "iocore/eventsystem/ob_task.h"
#include "proxy/mysqllib/ob_proxy_parser_utils.h"

using namespace oceanbase::common;
using namespace oceanbase::obmysql;
Expand Down Expand Up @@ -251,7 +252,7 @@ void ObSqlauditRecordQueue::enqueue(const int64_t sm_id, const int64_t gmt_creat
ip.to_string(sqlaudit_records_[index].ip_, ObSqlauditRecord::IP_LENGTH);
sql.to_string(sqlaudit_records_[index].sql_, ObSqlauditRecord::SQL_LENGTH);

ObString cmd = ObString::make_string(get_mysql_cmd_str(sql_cmd));
ObString cmd = ObString::make_string(ObProxyParserUtils::get_sql_cmd_name(sql_cmd));
cmd.to_string(sqlaudit_records_[index].sql_cmd_, ObSqlauditRecord::SQL_CMD_LENGTH);
}
}
Expand Down
43 changes: 23 additions & 20 deletions src/obproxy/proxy/mysql/ob_mysql_sm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4454,6 +4454,7 @@ inline int ObMysqlSM::handle_oceanbase_first_response_packet(ObMysqlAnalyzeStatu
return ret;
}

// must read first ob20/compressed packet completely
inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus &state,
const bool need_receive_completed, int64_t &first_pkt_len)
{
Expand Down Expand Up @@ -4519,7 +4520,7 @@ inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus
if (client_session_->is_trans_internal_routing()
&& ObMysqlTransact::is_in_trans(trans_state_)
&& client_session_->get_trans_coordinator_ss_addr() != trans_state_.server_info_.addr_) {
resp_result.is_trans_completed_ = false;
resp_result.set_is_trans_completed(false);
LOG_DEBUG("error packet from non-coordinator node, keep session in transaction",
"coordinator_addr", client_session_->get_trans_coordinator_ss_addr(),
"curr_addr", trans_state_.server_info_.addr_);
Expand All @@ -4537,14 +4538,16 @@ inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus
&& !is_compressed_ob20
&& !need_receive_completed
&& cmd != OB_MYSQL_COM_STMT_PREPARE
&& cmd != OB_MYSQL_COM_STMT_PREPARE_EXECUTE) {
&& cmd != OB_MYSQL_COM_STMT_PREPARE_EXECUTE
&& first_pkt_len < server_buffer_reader_->read_avail()) { // make sure read whole first packet
if (state == ANALYZE_CONT) {
if (!resp_analyzer_.is_last_pkt(result)) {
if (server_buffer_reader_->read_avail() >= ANALYZE_FIRST_OB20_RESP_MAX_LEN) {
// use tunnel to process the response
resp_result.reset(); // make sure tunnel use a clean resp_result_
resp_result.reset_transmit_control(); // make sure tunnel to transmit as normal
state = ANALYZE_DONE; // finish analyzsis of first response
} else {
// read more data then call this again
first_pkt_len = ANALYZE_FIRST_OB20_RESP_MAX_LEN; // inc the water mark to read more data
state = ANALYZE_CONT; // continue to read data from net
}
Expand All @@ -4553,8 +4556,8 @@ inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus
} else { /* do nothing */ }

// save flt from response analyze result to sm
save_response_flt_result_to_sm(resp_result.flt_);
if (OB_FAIL(handle_feedback_proxy_info(resp_result.extra_info_))) {
save_response_flt_result_to_sm(resp_result.get_flt());
if (OB_FAIL(handle_feedback_proxy_info(resp_result.get_extra_info()))) {
LOG_WDIAG("fail to handle feedback proxy info", K_(sm_id), K(result), K(ret));
}
}
Expand Down Expand Up @@ -6280,7 +6283,7 @@ void ObMysqlSM::print_mysql_complete_log(ObMysqlTunnelProducer *p)
K(cpu_flow_control_count),
K(memory_flow_control_count),
"sql", trans_state_.trans_info_.client_request_.get_print_sql(),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
K(result));
} else if (OB_UNLIKELY(print_info_log)) {
LOG_INFO("finishing mysql tunnel",
Expand All @@ -6298,7 +6301,7 @@ void ObMysqlSM::print_mysql_complete_log(ObMysqlTunnelProducer *p)
K(cpu_flow_control_count),
K(memory_flow_control_count),
"sql", trans_state_.trans_info_.client_request_.get_print_sql(),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
K(result));
} else {
LOG_DEBUG("finishing mysql tunnel",
Expand All @@ -6316,7 +6319,7 @@ void ObMysqlSM::print_mysql_complete_log(ObMysqlTunnelProducer *p)
K(cpu_flow_control_count),
K(memory_flow_control_count),
"sql", trans_state_.trans_info_.client_request_.get_print_sql(),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
K(result));
}
}
Expand Down Expand Up @@ -6402,7 +6405,7 @@ int ObMysqlSM::tunnel_handler_server_cmd_complete(ObMysqlTunnelProducer &p)
if (OB_FAIL(trim_ok_packet(*(c->buffer_reader_)))) {
LOG_WDIAG("fail to trim ok packet", K_(sm_id), K(ret));
} else {
resp_result.is_last_ok_handled_ = true;
resp_result.set_is_last_ok_handled(true);
// we change the writer_ in analyze_extra_ok_packet/rebuild_ok_packet
// so we should set bytes_read_ to the corrent value
p.bytes_read_ += (resp_result.get_rewritten_last_ok_pkt_len()
Expand Down Expand Up @@ -8741,7 +8744,7 @@ void ObMysqlSM::do_internal_request()
default: {
if (!is_supported_mysql_cmd(trans_state_.trans_info_.sql_cmd_)) {
LOG_WDIAG("not supported mysql cmd", K_(sm_id), "sql_cmd", trans_state_.trans_info_.sql_cmd_,
"cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_));
"cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_));
trans_state_.mysql_errcode_ = OB_NOT_SUPPORTED;
if (OB_FAIL(ObMysqlTransact::encode_error_message(trans_state_))) {
LOG_WDIAG("[ObMysqlSM::do_internal_request] fail to build not supported err resp", K_(sm_id), K(ret));
Expand Down Expand Up @@ -8829,8 +8832,8 @@ inline void ObMysqlSM::set_client_abort(const ObMysqlTransact::ObAbortStateType
"client_ip", trans_state_.client_info_.addr_,
"server_ip", trans_state_.server_info_.addr_,
"event", ObMysqlDebugNames::get_event_name(event),
"request_cmd", get_mysql_cmd_str(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"request_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
"sql", trans_state_.trans_info_.get_print_sql());
} else if (client_session_->get_session_info().get_priv_info().user_name_ != ObProxyTableInfo::DETECT_USERNAME_USER) {
LOG_WDIAG("client will abort soon",
Expand All @@ -8846,8 +8849,8 @@ inline void ObMysqlSM::set_client_abort(const ObMysqlTransact::ObAbortStateType
"user_name", client_session_->get_session_info().get_priv_info().user_name_,
"db", client_session_->get_session_info().get_database_name(),
"event", ObMysqlDebugNames::get_event_name(event),
"request_cmd", get_mysql_cmd_str(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"request_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
"sql", trans_state_.trans_info_.get_print_sql());

OBPROXY_XF_LOG(INFO, xflush_head,
Expand All @@ -8858,7 +8861,7 @@ inline void ObMysqlSM::set_client_abort(const ObMysqlTransact::ObAbortStateType
"user_name", client_session_->get_session_info().get_priv_info().user_name_,
"db", client_session_->get_session_info().get_database_name(),
"sql", trans_state_.trans_info_.client_request_.get_print_sql(),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
"event", ObMysqlDebugNames::get_event_name(event));
}
}
Expand Down Expand Up @@ -9878,8 +9881,8 @@ void ObMysqlSM::handle_obproxy_error_transfer()
ObMySQLCmd request_cmd = trans_state_.trans_info_.client_request_.get_packet_meta().cmd_;
if (OB_MYSQL_COM_QUIT == request_cmd || (NULL != client_session_ && client_session_->is_proxy_mysql_client())) {
LOG_INFO("[setup_error_transfer] Now closing connection caused by OB_MYSQL_COM_QUIT", K_(sm_id),
"request_cmd", get_mysql_cmd_str(request_cmd),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"request_cmd", ObProxyParserUtils::get_sql_cmd_name(request_cmd),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
"sql", trans_state_.trans_info_.get_print_sql());
terminate_sm_ = true;
} else {
Expand All @@ -9888,8 +9891,8 @@ void ObMysqlSM::handle_obproxy_error_transfer()
}
LOG_WDIAG("[setup_error_transfer] Now closing connection", K_(sm_id),
"client_ip", trans_state_.client_info_.addr_,
"request_cmd", get_mysql_cmd_str(request_cmd),
"sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_),
"request_cmd", ObProxyParserUtils::get_sql_cmd_name(request_cmd),
"sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_),
"sql", trans_state_.trans_info_.get_print_sql());
terminate_sm_ = true;
}
Expand Down Expand Up @@ -10712,7 +10715,7 @@ void ObMysqlSM::get_monitor_error_info(int32_t &error_code, ObString &error_msg,
error_code = trans_state_.inner_errcode_;
msg = trans_state_.inner_errmsg_ != NULL ? trans_state_.inner_errmsg_ : ob_strerror(trans_state_.inner_errcode_);
error_msg.assign_ptr(msg, static_cast<int32_t>(STRLEN(msg)));
} else if (resp_result.is_resp_completed_) {
} else if (resp_result.is_resp_completed()) {
is_error_resp = resp_result.is_error_resp();
if (is_error_resp) {
error_code = resp_result.get_error_code();
Expand Down
2 changes: 1 addition & 1 deletion src/obproxy/proxy/mysql/ob_mysql_sm.h
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ inline bool ObMysqlSM::need_print_trace_stat() const

inline const common::ObString &ObMysqlSM::get_server_trace_id()
{
return trans_state_.trans_info_.resp_result_.server_trace_id_;
return trans_state_.trans_info_.resp_result_.get_server_trace_id();
}

struct ObMysqlSMListBucket
Expand Down
Loading

0 comments on commit 60ff900

Please sign in to comment.