diff --git a/src/obproxy/cmd/ob_show_sqlaudit_handler.cpp b/src/obproxy/cmd/ob_show_sqlaudit_handler.cpp index 93d3f31a..d947e1b0 100644 --- a/src/obproxy/cmd/ob_show_sqlaudit_handler.cpp +++ b/src/obproxy/cmd/ob_show_sqlaudit_handler.cpp @@ -15,6 +15,7 @@ #include "lib/time/ob_hrtime.h" #include "iocore/eventsystem/ob_event_processor.h" #include "iocore/eventsystem/ob_task.h" +#include "proxy/mysqllib/ob_proxy_parser_utils.h" using namespace oceanbase::common; using namespace oceanbase::obmysql; @@ -251,7 +252,7 @@ void ObSqlauditRecordQueue::enqueue(const int64_t sm_id, const int64_t gmt_creat ip.to_string(sqlaudit_records_[index].ip_, ObSqlauditRecord::IP_LENGTH); sql.to_string(sqlaudit_records_[index].sql_, ObSqlauditRecord::SQL_LENGTH); - ObString cmd = ObString::make_string(get_mysql_cmd_str(sql_cmd)); + ObString cmd = ObString::make_string(ObProxyParserUtils::get_sql_cmd_name(sql_cmd)); cmd.to_string(sqlaudit_records_[index].sql_cmd_, ObSqlauditRecord::SQL_CMD_LENGTH); } } diff --git a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp index d0725579..f13bed92 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp @@ -4454,6 +4454,7 @@ inline int ObMysqlSM::handle_oceanbase_first_response_packet(ObMysqlAnalyzeStatu return ret; } +// must read first ob20/compressed packet completely inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus &state, const bool need_receive_completed, int64_t &first_pkt_len) { @@ -4519,7 +4520,7 @@ inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus if (client_session_->is_trans_internal_routing() && ObMysqlTransact::is_in_trans(trans_state_) && client_session_->get_trans_coordinator_ss_addr() != trans_state_.server_info_.addr_) { - resp_result.is_trans_completed_ = false; + resp_result.set_is_trans_completed(false); LOG_DEBUG("error packet from non-coordinator node, keep session in transaction", "coordinator_addr", client_session_->get_trans_coordinator_ss_addr(), "curr_addr", trans_state_.server_info_.addr_); @@ -4537,14 +4538,16 @@ inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus && !is_compressed_ob20 && !need_receive_completed && cmd != OB_MYSQL_COM_STMT_PREPARE - && cmd != OB_MYSQL_COM_STMT_PREPARE_EXECUTE) { + && cmd != OB_MYSQL_COM_STMT_PREPARE_EXECUTE + && first_pkt_len < server_buffer_reader_->read_avail()) { // make sure read whole first packet if (state == ANALYZE_CONT) { if (!resp_analyzer_.is_last_pkt(result)) { if (server_buffer_reader_->read_avail() >= ANALYZE_FIRST_OB20_RESP_MAX_LEN) { // use tunnel to process the response - resp_result.reset(); // make sure tunnel use a clean resp_result_ + resp_result.reset_transmit_control(); // make sure tunnel to transmit as normal state = ANALYZE_DONE; // finish analyzsis of first response } else { + // read more data then call this again first_pkt_len = ANALYZE_FIRST_OB20_RESP_MAX_LEN; // inc the water mark to read more data state = ANALYZE_CONT; // continue to read data from net } @@ -4553,8 +4556,8 @@ inline int ObMysqlSM::handle_first_compress_response_packet(ObMysqlAnalyzeStatus } else { /* do nothing */ } // save flt from response analyze result to sm - save_response_flt_result_to_sm(resp_result.flt_); - if (OB_FAIL(handle_feedback_proxy_info(resp_result.extra_info_))) { + save_response_flt_result_to_sm(resp_result.get_flt()); + if (OB_FAIL(handle_feedback_proxy_info(resp_result.get_extra_info()))) { LOG_WDIAG("fail to handle feedback proxy info", K_(sm_id), K(result), K(ret)); } } @@ -6280,7 +6283,7 @@ void ObMysqlSM::print_mysql_complete_log(ObMysqlTunnelProducer *p) K(cpu_flow_control_count), K(memory_flow_control_count), "sql", trans_state_.trans_info_.client_request_.get_print_sql(), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), K(result)); } else if (OB_UNLIKELY(print_info_log)) { LOG_INFO("finishing mysql tunnel", @@ -6298,7 +6301,7 @@ void ObMysqlSM::print_mysql_complete_log(ObMysqlTunnelProducer *p) K(cpu_flow_control_count), K(memory_flow_control_count), "sql", trans_state_.trans_info_.client_request_.get_print_sql(), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), K(result)); } else { LOG_DEBUG("finishing mysql tunnel", @@ -6316,7 +6319,7 @@ void ObMysqlSM::print_mysql_complete_log(ObMysqlTunnelProducer *p) K(cpu_flow_control_count), K(memory_flow_control_count), "sql", trans_state_.trans_info_.client_request_.get_print_sql(), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), K(result)); } } @@ -6402,7 +6405,7 @@ int ObMysqlSM::tunnel_handler_server_cmd_complete(ObMysqlTunnelProducer &p) if (OB_FAIL(trim_ok_packet(*(c->buffer_reader_)))) { LOG_WDIAG("fail to trim ok packet", K_(sm_id), K(ret)); } else { - resp_result.is_last_ok_handled_ = true; + resp_result.set_is_last_ok_handled(true); // we change the writer_ in analyze_extra_ok_packet/rebuild_ok_packet // so we should set bytes_read_ to the corrent value p.bytes_read_ += (resp_result.get_rewritten_last_ok_pkt_len() @@ -8741,7 +8744,7 @@ void ObMysqlSM::do_internal_request() default: { if (!is_supported_mysql_cmd(trans_state_.trans_info_.sql_cmd_)) { LOG_WDIAG("not supported mysql cmd", K_(sm_id), "sql_cmd", trans_state_.trans_info_.sql_cmd_, - "cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_)); + "cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_)); trans_state_.mysql_errcode_ = OB_NOT_SUPPORTED; if (OB_FAIL(ObMysqlTransact::encode_error_message(trans_state_))) { LOG_WDIAG("[ObMysqlSM::do_internal_request] fail to build not supported err resp", K_(sm_id), K(ret)); @@ -8829,8 +8832,8 @@ inline void ObMysqlSM::set_client_abort(const ObMysqlTransact::ObAbortStateType "client_ip", trans_state_.client_info_.addr_, "server_ip", trans_state_.server_info_.addr_, "event", ObMysqlDebugNames::get_event_name(event), - "request_cmd", get_mysql_cmd_str(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), "sql", trans_state_.trans_info_.get_print_sql()); } else if (client_session_->get_session_info().get_priv_info().user_name_ != ObProxyTableInfo::DETECT_USERNAME_USER) { LOG_WDIAG("client will abort soon", @@ -8846,8 +8849,8 @@ inline void ObMysqlSM::set_client_abort(const ObMysqlTransact::ObAbortStateType "user_name", client_session_->get_session_info().get_priv_info().user_name_, "db", client_session_->get_session_info().get_database_name(), "event", ObMysqlDebugNames::get_event_name(event), - "request_cmd", get_mysql_cmd_str(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.client_request_.get_packet_meta().cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), "sql", trans_state_.trans_info_.get_print_sql()); OBPROXY_XF_LOG(INFO, xflush_head, @@ -8858,7 +8861,7 @@ inline void ObMysqlSM::set_client_abort(const ObMysqlTransact::ObAbortStateType "user_name", client_session_->get_session_info().get_priv_info().user_name_, "db", client_session_->get_session_info().get_database_name(), "sql", trans_state_.trans_info_.client_request_.get_print_sql(), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), "event", ObMysqlDebugNames::get_event_name(event)); } } @@ -9878,8 +9881,8 @@ void ObMysqlSM::handle_obproxy_error_transfer() ObMySQLCmd request_cmd = trans_state_.trans_info_.client_request_.get_packet_meta().cmd_; if (OB_MYSQL_COM_QUIT == request_cmd || (NULL != client_session_ && client_session_->is_proxy_mysql_client())) { LOG_INFO("[setup_error_transfer] Now closing connection caused by OB_MYSQL_COM_QUIT", K_(sm_id), - "request_cmd", get_mysql_cmd_str(request_cmd), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(request_cmd), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), "sql", trans_state_.trans_info_.get_print_sql()); terminate_sm_ = true; } else { @@ -9888,8 +9891,8 @@ void ObMysqlSM::handle_obproxy_error_transfer() } LOG_WDIAG("[setup_error_transfer] Now closing connection", K_(sm_id), "client_ip", trans_state_.client_info_.addr_, - "request_cmd", get_mysql_cmd_str(request_cmd), - "sql_cmd", get_mysql_cmd_str(trans_state_.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(request_cmd), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(trans_state_.trans_info_.sql_cmd_), "sql", trans_state_.trans_info_.get_print_sql()); terminate_sm_ = true; } @@ -10712,7 +10715,7 @@ void ObMysqlSM::get_monitor_error_info(int32_t &error_code, ObString &error_msg, error_code = trans_state_.inner_errcode_; msg = trans_state_.inner_errmsg_ != NULL ? trans_state_.inner_errmsg_ : ob_strerror(trans_state_.inner_errcode_); error_msg.assign_ptr(msg, static_cast(STRLEN(msg))); - } else if (resp_result.is_resp_completed_) { + } else if (resp_result.is_resp_completed()) { is_error_resp = resp_result.is_error_resp(); if (is_error_resp) { error_code = resp_result.get_error_code(); diff --git a/src/obproxy/proxy/mysql/ob_mysql_sm.h b/src/obproxy/proxy/mysql/ob_mysql_sm.h index d823fc50..eba35347 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_sm.h +++ b/src/obproxy/proxy/mysql/ob_mysql_sm.h @@ -786,7 +786,7 @@ inline bool ObMysqlSM::need_print_trace_stat() const inline const common::ObString &ObMysqlSM::get_server_trace_id() { - return trans_state_.trans_info_.resp_result_.server_trace_id_; + return trans_state_.trans_info_.resp_result_.get_server_trace_id(); } struct ObMysqlSMListBucket diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp index e1000829..a610d575 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp @@ -4949,7 +4949,9 @@ void ObMysqlTransact::handle_error_resp(ObTransState &s, bool &is_user_request) ObServerStateType pre_state = s.current_.state_; s.current_.state_ = RESPONSE_ERROR; - s.trace_log_.log_it("[get_error]", "code", static_cast(resp.error_pkt_.get_err_code()), "trace_id", resp.server_trace_id_); + s.trace_log_.log_it("[get_error]", + "code", static_cast(resp.get_error_pkt().get_err_code()), + "trace_id", resp.get_server_trace_id()); if (resp.is_mysql_wrong_arguments_error() || resp.is_trans_free_route_not_supported_error() || resp.is_internal_error()) { @@ -4958,11 +4960,13 @@ void ObMysqlTransact::handle_error_resp(ObTransState &s, bool &is_user_request) if (resp.is_client_session_killed_error()) { COLLECT_INTERNAL_DIAGNOSIS( - s.sm_->connection_diagnosis_trace_, obutils::OB_SERVER_INTERNAL_TRACE, - resp.error_pkt_.get_err_code(), - "this session was killed by other user session, error_code: %d, " - "error_msg: %.*s", - resp.error_pkt_.get_err_code(), resp.error_pkt_.get_message().length(), resp.error_pkt_.get_message().ptr()); + s.sm_->connection_diagnosis_trace_, obutils::OB_SERVER_INTERNAL_TRACE, + resp.get_error_pkt().get_err_code(), + "this session was killed by other user session, error_code: %d, " + "error_msg: %.*s", + resp.get_error_pkt().get_err_code(), + resp.get_error_pkt().get_message().length(), + resp.get_error_pkt().get_message().ptr()); } switch(s.current_.send_action_) { @@ -6207,8 +6211,8 @@ void ObMysqlTransact::handle_oceanbase_server_resp_error(ObTransState &s, ObMySQ case ORA_FATAL_ERROR: LOG_EDIAG("ob ora fatal error", "sql", s.trans_info_.client_request_.get_print_sql(), - "origin_sql_cmd", get_mysql_cmd_str(request_cmd), - "current_sql_cmd", get_mysql_cmd_str(current_cmd)); + "origin_sql_cmd", ObProxyParserUtils::get_sql_cmd_name(request_cmd), + "current_sql_cmd", ObProxyParserUtils::get_sql_cmd_name(current_cmd)); if (obmysql::OB_MYSQL_COM_STMT_EXECUTE == request_cmd) { ObClientSessionInfo &cs_info = s.sm_->get_client_session()->get_session_info(); ObServerSessionInfo &ss_info = s.sm_->get_server_session()->get_session_info(); @@ -6369,14 +6373,14 @@ inline void ObMysqlTransact::handle_response_from_server(ObTransState &s) LOG_INFO("INACTIVE_TIMEOUT caused by OB_MYSQL_COM_QUIT, which is a normal condition", "server_state", ObMysqlTransact::get_server_state_name(s.current_.state_), "addr", ObIpEndpoint(ss->get_netvc()->get_remote_addr()), - "request_cmd", get_mysql_cmd_str(s.trans_info_.client_request_.get_packet_meta().cmd_), - "sql_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.client_request_.get_packet_meta().cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_), "sql", s.trans_info_.get_print_sql()); } else if (s.need_retry_) { LOG_WDIAG("connection error", "server_state", ObMysqlTransact::get_server_state_name(s.current_.state_), - "request_cmd", get_mysql_cmd_str(s.trans_info_.client_request_.get_packet_meta().cmd_), - "sql_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.client_request_.get_packet_meta().cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_), "sql", s.trans_info_.get_print_sql()); } } @@ -6901,8 +6905,8 @@ inline void ObMysqlTransact::handle_server_connection_break(ObTransState &s) "proxy_user_name", s.sm_->client_session_->get_session_info().get_priv_info().get_proxy_user_name(), "database_name", s.sm_->client_session_->get_session_info().get_database_name(), "server_state", ObMysqlTransact::get_server_state_name(s.current_.state_), - "request_cmd", get_mysql_cmd_str(s.trans_info_.client_request_.get_packet_meta().cmd_), - "sql_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.client_request_.get_packet_meta().cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_), "sql", s.trans_info_.get_print_sql()); if (obmysql::OB_MYSQL_COM_QUIT != s.trans_info_.sql_cmd_) { @@ -6915,7 +6919,7 @@ inline void ObMysqlTransact::handle_server_connection_break(ObTransState &s) "db", s.sm_->client_session_->get_session_info().get_database_name(), "server_state", ObMysqlTransact::get_server_state_name(s.current_.state_), "sql", s.trans_info_.client_request_.get_print_sql(), - "request_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_)); + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_)); s.trace_log_.log_it("[svr_connection_break]", "cli", s.client_info_.addr_, @@ -6926,7 +6930,7 @@ inline void ObMysqlTransact::handle_server_connection_break(ObTransState &s) "db", s.sm_->client_session_->get_session_info().get_database_name(), "svr_state", ObString(ObMysqlTransact::get_server_state_name(s.current_.state_)), "sql", s.trans_info_.client_request_.get_print_sql(), - "sql_cmd", ObString(get_mysql_cmd_str(s.trans_info_.sql_cmd_)), + "sql_cmd", ObString(ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_)), "coord", s.sm_->client_session_->get_trans_coordinator_ss_addr()); LOG_WDIAG("trace_log", K(s.trace_log_)); } @@ -6936,8 +6940,8 @@ inline void ObMysqlTransact::handle_server_connection_break(ObTransState &s) "client_ip", s.client_info_.addr_, "server_ip", s.server_info_.addr_, "server_state", ObMysqlTransact::get_server_state_name(s.current_.state_), - "request_cmd", get_mysql_cmd_str(s.trans_info_.client_request_.get_packet_meta().cmd_), - "sql_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_), + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.client_request_.get_packet_meta().cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_), "sql", s.trans_info_.get_print_sql()); if (obmysql::OB_MYSQL_COM_QUIT != s.trans_info_.sql_cmd_) { @@ -6946,14 +6950,14 @@ inline void ObMysqlTransact::handle_server_connection_break(ObTransState &s) "server_ip", s.server_info_.addr_, "server_state", ObMysqlTransact::get_server_state_name(s.current_.state_), "sql", s.trans_info_.client_request_.get_print_sql(), - "request_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_)); + "request_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_)); s.trace_log_.log_it("[svr_connection_break]", "cli", s.client_info_.addr_, "svr", s.server_info_.addr_, "svr_state", ObString(ObMysqlTransact::get_server_state_name(s.current_.state_)), "sql", s.trans_info_.client_request_.get_print_sql(), - "sql_cmd", ObString(get_mysql_cmd_str(s.trans_info_.sql_cmd_))); + "sql_cmd", ObString(ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_))); LOG_WDIAG("trace_log", K(s.trace_log_)); } } @@ -7527,14 +7531,14 @@ inline void ObMysqlTransact::handle_server_failed(ObTransState &s) ObRespAnalyzeResult &resp = s.trans_info_.resp_result_; if (resp.is_error_resp()) { - switch (resp.error_pkt_.get_err_code()) { + switch (resp.get_error_pkt().get_err_code()) { case -OB_SERVER_IS_INIT: case -OB_SERVER_IS_STOPPING: case -OB_PACKET_CHECKSUM_ERROR: case -OB_ALLOCATE_MEMORY_FAILED: // congestion control LOG_INFO("ObMysqlTransact::handle_server_failed", "err code", - resp.error_pkt_.get_err_code(), KPC(s.congestion_entry_)); + resp.get_error_pkt().get_err_code(), KPC(s.congestion_entry_)); if (s.sm_->client_session_->get_session_info().is_oceanbase_server()) { s.set_alive_failed(); } @@ -7558,14 +7562,14 @@ inline void ObMysqlTransact::handle_server_failed(ObTransState &s) "maybe this new server do not support old agreement, try next server", "zone_type", zone_type_to_str(s.pll_info_.route_.cur_chosen_server_.zone_type_), "origin_name", s.pll_info_.te_name_, - "sql_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_), "sql", s.trans_info_.client_request_.get_print_sql(), "route info", s.pll_info_.route_); } else { LOG_WDIAG("zone is readonly, proxy should not send request to it, " "maybe zone type has been changed, try next server", "origin_name", s.pll_info_.te_name_, - "sql_cmd", get_mysql_cmd_str(s.trans_info_.sql_cmd_), + "sql_cmd", ObProxyParserUtils::get_sql_cmd_name(s.trans_info_.sql_cmd_), "sql", s.trans_info_.client_request_.get_print_sql(), "route info", s.pll_info_.route_); } diff --git a/src/obproxy/proxy/mysqllib/ob_proxy_session_info_handler.cpp b/src/obproxy/proxy/mysqllib/ob_proxy_session_info_handler.cpp index 72ca10f5..c49cad54 100644 --- a/src/obproxy/proxy/mysqllib/ob_proxy_session_info_handler.cpp +++ b/src/obproxy/proxy/mysqllib/ob_proxy_session_info_handler.cpp @@ -91,7 +91,7 @@ int ObProxySessionInfoHandler::analyze_extra_ok_packet(ObIOBufferReader &reader, } // we will trim the extra ok packet anaway - resp_result.rewritten_last_ok_pkt_len_ = 0; + resp_result.set_rewritten_last_ok_pkt_len(0); ObMIOBuffer *writer = reader.writer(); if (OB_ISNULL(writer)) { ret = OB_ERR_UNEXPECTED; @@ -150,7 +150,7 @@ int ObProxySessionInfoHandler::rebuild_ok_packet(ObIOBufferReader &reader, K(des_ok), "dst_size", des_ok.get_serialize_size()); // 4. trim the orig ok packet - resp_result.rewritten_last_ok_pkt_len_ = des_ok.get_serialize_size() + MYSQL_NET_HEADER_LENGTH; + resp_result.set_rewritten_last_ok_pkt_len(des_ok.get_serialize_size() + MYSQL_NET_HEADER_LENGTH); ObMIOBuffer *writer = reader.writer(); if (OB_ISNULL(writer)) { ret = OB_ERR_UNEXPECTED; @@ -167,7 +167,7 @@ int ObProxySessionInfoHandler::rebuild_ok_packet(ObIOBufferReader &reader, } } else { // TODO: we may trim the last byte later - resp_result.rewritten_last_ok_pkt_len_ = pkt_len; + resp_result.set_rewritten_last_ok_pkt_len(pkt_len); } return ret; } @@ -924,9 +924,9 @@ inline int ObProxySessionInfoHandler::handle_partition_hit_var( need_save = false; } else { if (value == ObString::make_string("1")) { - resp_result.is_partition_hit_ = true; + resp_result.set_is_partition_hit(true); } else { - resp_result.is_partition_hit_ = false; + resp_result.set_is_partition_hit(false); } } @@ -941,9 +941,8 @@ inline int ObProxySessionInfoHandler::handle_last_insert_id_var( bool &need_save) { if (!is_auth_request) { - resp_result.is_last_insert_id_changed_ = true; + resp_result.set_is_last_insert_id_changed(true); } - return handle_common_var(client_info, str_kv, is_auth_request, resp_result, need_save); } @@ -1000,7 +999,7 @@ inline int ObProxySessionInfoHandler::handle_common_var( } else if (OB_FAIL(client_info.is_equal_with_snapshot(str_kv.key_, str_kv.value_, is_equal))) { // maybe observer has upgraded if (OB_UNLIKELY(OB_ERR_SYS_VARIABLE_UNKNOWN == ret)) { - resp_result.has_new_sys_var_ = true; + resp_result.set_has_new_sys_var(true); LOG_WDIAG("unknown system variable, maybe observer has upgrade", K(str_kv), K(ret)); ret = OB_SUCCESS; // do not save the new variable; @@ -1107,7 +1106,7 @@ inline int ObProxySessionInfoHandler::handle_sys_var(ObClientSessionInfo &client if (OB_FAIL(ret)) { if (OB_ERR_SYS_VARIABLE_UNKNOWN == ret) { // maybe observer has upgraded - resp_result.has_new_sys_var_ = true; + resp_result.set_has_new_sys_var(true); LOG_WDIAG("unknown system variable, maybe observer has upgrade", K(str_kv), K(ret)); ret = OB_SUCCESS; } else { @@ -1118,7 +1117,7 @@ inline int ObProxySessionInfoHandler::handle_sys_var(ObClientSessionInfo &client // if last_insert_id has changed, assign version // NOTE: do this after update sys variable if (OB_SUCC(ret)) { - if (resp_result.is_last_insert_id_changed_) { + if (resp_result.is_last_insert_id_changed()) { assign_last_insert_id_version(client_info, server_info); } } @@ -1168,7 +1167,7 @@ inline int ObProxySessionInfoHandler::handle_weak_read_replica_hit_var(const ObS { int ret = OB_SUCCESS; need_save = false; - resp_result.weak_read_hit_replica_ = get_weak_read_hit_replica_enum(value); + resp_result.set_weak_read_hit_replica(get_weak_read_hit_replica_enum(value)); return ret; } @@ -1192,7 +1191,7 @@ int ObProxySessionInfoHandler::save_changed_session_info(ObClientSessionInfo &cl } // 2. save sys var - resp_result.has_new_sys_var_ = false; + resp_result.set_has_new_sys_var(false); const ObIArray &sys_var = ok_pkt.get_system_vars(); if (!sys_var.empty()) { for (int64_t i = 0; i < sys_var.count() && OB_SUCC(ret); ++i) { @@ -1223,7 +1222,7 @@ int ObProxySessionInfoHandler::save_changed_session_info(ObClientSessionInfo &cl LOG_WDIAG("fail to set changed database name", K(db_name), K(ret)); } } else { - resp_result.is_server_db_reset_ = true; + resp_result.set_is_server_db_reset(true); LOG_DEBUG("db has been reset"); } } @@ -1239,7 +1238,7 @@ int ObProxySessionInfoHandler::save_changed_session_info(ObClientSessionInfo &cl } else { if (PROXY_IDC_NAME_USER_SESSION_VAR == str_kv.key_) { const ObString value = trim_quote(str_kv.value_); - resp_result.has_proxy_idc_name_user_var_ = true; + resp_result.set_has_proxy_idc_name_user_var(true); client_info.set_idc_name(value); LOG_INFO("succ to update user session variable proxy_idc_name", "idc_name", client_info.get_idc_name()); diff --git a/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.cpp b/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.cpp index 25236e62..b8249940 100644 --- a/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.cpp +++ b/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.cpp @@ -28,28 +28,35 @@ int64_t ObRespAnalyzeResult::to_string(char *buf, const int64_t buf_len) const { int64_t pos = 0; J_OBJ_START(); - J_KV(K_(is_decompressed), - K_(is_trans_completed), - K_(is_resp_completed), - K_(ending_type), - K_(is_partition_hit), - K_(has_new_sys_var), - K_(has_proxy_idc_name_user_var), - K_(is_server_db_reset), - K_(reserved_ok_len_of_mysql), - K_(reserved_ok_len_of_compressed), - K_(connection_id), - K_(scramble_buf), - K_(is_resultset_resp), - K_(server_capabilities_lower_.capability), - K_(ok_packet_action_type), - K_(last_ok_pkt_len), - K_(rewritten_last_ok_pkt_len), - K_(extra_info)); + J_KV(K(transmit_control_.is_decompressed_), + K(transmit_control_.is_trans_completed_), + K(transmit_control_.is_resp_completed_), + K(transmit_control_.reserved_ok_len_of_mysql_), + K(transmit_control_.reserved_ok_len_of_compressed_), + K(transmit_control_.is_last_ok_handled_), + K(transmit_control_.last_ok_pkt_len_), + K(transmit_control_.rewritten_last_ok_pkt_len_), + K(transmit_control_.ok_packet_action_type_), + K(format_.is_auth_switch_req_), + K(format_.is_resultset_resp_), + K(format_.is_server_db_reset_), + K(format_.ending_type_), + K(sysvar_.is_partition_hit_), + K(sysvar_.is_last_insert_id_changed_), + K(sysvar_.has_new_sys_var_), + K(sysvar_.has_proxy_idc_name_user_var_), + K(sysvar_.weak_read_hit_replica_), + K(sysvar_.server_trace_id_), + K(handshake_.connection_id_), + K(handshake_.server_capabilities_lower_.capability_), + K(handshake_.server_capabilities_upper_.capability_), + K(ob20_.is_server_trans_internal_routing_), + K(ob20_.extra_info_), + K(ob20_.flt_)); if (is_error_resp()) { J_COMMA(); - J_KV(K_(error_pkt)); + J_KV(K(error_.error_pkt_)); } J_OBJ_END(); return pos; diff --git a/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.h b/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.h index de3478f2..2387a07f 100644 --- a/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.h +++ b/src/obproxy/proxy/mysqllib/ob_resp_analyze_result.h @@ -45,259 +45,284 @@ class ObRespAnalyzeResult { public: ObRespAnalyzeResult() - : is_decompressed_(false), - is_trans_completed_(false), - is_resp_completed_(false), - ending_type_(ObMysqlRespEndingType::MAX_PACKET_ENDING_TYPE), - is_partition_hit_(false), - is_last_insert_id_changed_(false), - is_server_db_reset_(false), - has_new_sys_var_(false), - has_proxy_idc_name_user_var_(false), - connection_id_(0), - is_resultset_resp_(false), - ok_packet_action_type_(ObOKPacketActionType::OK_PACKET_ACTION_SEND), - reserved_ok_len_of_mysql_(0), - reserved_ok_len_of_compressed_(0), - last_ok_pkt_len_(0), - rewritten_last_ok_pkt_len_(0), - is_last_ok_handled_(false), - weak_read_hit_replica_(ObWeakReadHitReplica::MAX_REPLICA), - error_pkt_(), - error_pkt_buf_(), - server_trace_id_(), - extra_info_(), - flt_(), - is_server_trans_internal_routing_(false), - is_auth_switch_req_(false) - { - server_capabilities_upper_.capability_ = 0; - server_capabilities_lower_.capability_ = 0; - memset(scramble_buf_, 0, sizeof(scramble_buf_)); - memset(server_trace_id_buf_, 0, sizeof(server_trace_id_buf_)); - } + { + reset(); + memset(handshake_.scramble_buf_, 0, sizeof(handshake_.scramble_buf_)); + memset(sysvar_.server_trace_id_buf_, 0, sizeof(sysvar_.server_trace_id_buf_)); + } ~ObRespAnalyzeResult() { - error_pkt_buf_.reset(); - extra_info_.reset(); - flt_.reset(); + error_.error_pkt_buf_.reset(); + ob20_.extra_info_.reset(); + ob20_.flt_.reset(); } void reset(); - bool is_decompressed() const { return is_decompressed_; } - inline bool is_trans_completed() const { return is_trans_completed_; } - inline bool is_resp_completed() const { return is_resp_completed_; } - bool is_partition_hit() const { return is_partition_hit_; } - ObWeakReadHitReplica get_weak_read_hit_replica() const { return weak_read_hit_replica_; } - bool is_last_insert_id_changed() const { return is_last_insert_id_changed_; } - bool is_server_db_reset() const { return is_server_db_reset_; } - bool has_new_sys_var() const { return has_new_sys_var_; } - bool has_proxy_idc_name_user_var() const { return has_proxy_idc_name_user_var_; } - bool is_last_ok_handled() const { return is_last_ok_handled_; } + void reset_transmit_control(); + inline bool is_decompressed() const { return transmit_control_.is_decompressed_; } + inline void set_is_decompressed(bool v) { transmit_control_.is_decompressed_ = v; } + inline bool is_trans_completed() const { return transmit_control_.is_trans_completed_; } + inline void set_is_trans_completed(bool v) { transmit_control_.is_trans_completed_ = v; } + inline bool is_resp_completed() const { return transmit_control_.is_resp_completed_; } + inline void set_is_resp_completed(bool v) { transmit_control_.is_resp_completed_ = v; } + inline int64_t get_reserved_ok_len_of_mysql() const { return transmit_control_.reserved_ok_len_of_mysql_; } + inline void set_reserved_ok_len_of_mysql(int64_t len) { transmit_control_.reserved_ok_len_of_mysql_ = len; } + inline int64_t get_reserved_ok_len_of_compressed() const { return transmit_control_.reserved_ok_len_of_compressed_; } + inline void set_reserved_ok_len_of_compressed(int64_t len) { transmit_control_.reserved_ok_len_of_compressed_ = len; } + inline int64_t get_last_ok_pkt_len() const { return transmit_control_.last_ok_pkt_len_; } + inline void set_last_ok_pkt_len(int64_t len) { transmit_control_.last_ok_pkt_len_ = len; } + inline int64_t get_rewritten_last_ok_pkt_len() const { return transmit_control_.rewritten_last_ok_pkt_len_; } + inline void set_rewritten_last_ok_pkt_len(int64_t len) { transmit_control_.rewritten_last_ok_pkt_len_ = len; } + inline ObOKPacketActionType get_ok_packet_action_type() const { return transmit_control_.ok_packet_action_type_; } + inline void set_ok_packet_action_type(ObOKPacketActionType action) { transmit_control_.ok_packet_action_type_ = action; } + inline bool is_last_ok_handled() const { return transmit_control_.is_last_ok_handled_; } + inline void set_is_last_ok_handled(bool v) { transmit_control_.is_last_ok_handled_ = v; } + inline bool is_local_infile_0xfb_resp() const { return format_.ending_type_ == LOCAL_INFILE_ENDING_TYPE; } - bool is_error_resp() const { return ERROR_PACKET_ENDING_TYPE == ending_type_; } - bool is_ok_resp() const { return OK_PACKET_ENDING_TYPE == ending_type_; } - bool is_eof_resp() const { return EOF_PACKET_ENDING_TYPE == ending_type_; } - bool is_handshake_pkt() const { return HANDSHAKE_PACKET_ENDING_TYPE == ending_type_; } - bool is_auth_switch_req() const { return is_auth_switch_req_; } - obmysql::OMPKError &get_error_pkt() { return error_pkt_; } - const obmysql::OMPKError &get_error_pkt() const { return error_pkt_; } - uint16_t get_error_code() const { return error_pkt_.get_err_code(); } - common::ObString get_error_message() const { return error_pkt_.get_message(); } - int64_t get_reserved_ok_len_of_mysql() const { return reserved_ok_len_of_mysql_; } - int64_t get_reserved_ok_len_of_compressed() const { return reserved_ok_len_of_compressed_; } - int64_t get_last_ok_pkt_len() const { return last_ok_pkt_len_; } - int64_t get_rewritten_last_ok_pkt_len() const { return rewritten_last_ok_pkt_len_; } - uint32_t get_connection_id() const { return connection_id_; } - common::ObString get_scramble_string() const { return common::ObString::make_string(scramble_buf_); } - ObOKPacketActionType get_ok_packet_action_type() const { return ok_packet_action_type_; } - bool is_resultset_resp() const { return is_resultset_resp_; } - bool is_local_infile_0xfb_resp() const { return ending_type_ == LOCAL_INFILE_ENDING_TYPE; } - bool is_server_can_use_compress() const { return (1 == server_capabilities_lower_.capability_flag_.OB_SERVER_CAN_USE_COMPRESS); } - void set_server_trace_id(const common::ObString &trace_id); - bool support_ssl() const { return 1 == server_capabilities_lower_.capability_flag_.OB_SERVER_SSL; } - bool is_server_trans_internal_routing() const { return is_server_trans_internal_routing_; } - bool is_not_supported_error() const + inline bool is_error_resp() const { return ERROR_PACKET_ENDING_TYPE == format_.ending_type_; } + inline bool is_ok_resp() const { return OK_PACKET_ENDING_TYPE == format_.ending_type_; } + inline bool is_eof_resp() const { return EOF_PACKET_ENDING_TYPE == format_.ending_type_; } + inline bool is_handshake_pkt() const { return HANDSHAKE_PACKET_ENDING_TYPE == format_.ending_type_; } + inline void set_ending_type(ObMysqlRespEndingType type) { format_.ending_type_ = type; } + bool is_resultset_resp() const { return format_.is_resultset_resp_; } + inline void set_is_resultset_resp(bool v) { format_.is_resultset_resp_ = v; } + inline bool is_server_db_reset() const { return format_.is_server_db_reset_; } + inline void set_is_server_db_reset(bool v) { format_.is_server_db_reset_ = v; } + inline bool is_auth_switch_req() const { return format_.is_auth_switch_req_; } + inline void set_is_auth_switch_req(bool v) { format_.is_auth_switch_req_ = v; } + inline bool is_partition_hit() const { return sysvar_.is_partition_hit_; } + inline void set_is_partition_hit(bool v) { sysvar_.is_partition_hit_ = v; } + inline ObWeakReadHitReplica get_weak_read_hit_replica() const { return sysvar_.weak_read_hit_replica_; } + inline void set_weak_read_hit_replica(ObWeakReadHitReplica replica) { sysvar_.weak_read_hit_replica_ = replica; } + inline bool is_last_insert_id_changed() const { return sysvar_.is_last_insert_id_changed_; } + inline void set_is_last_insert_id_changed(bool v) { sysvar_.is_last_insert_id_changed_ = v; } + inline bool has_new_sys_var() const { return sysvar_.has_new_sys_var_; } + inline void set_has_new_sys_var(bool v) { sysvar_.has_new_sys_var_ = v; } + inline bool has_proxy_idc_name_user_var() const { return sysvar_.has_proxy_idc_name_user_var_; } + inline void set_has_proxy_idc_name_user_var(bool v) { sysvar_.has_proxy_idc_name_user_var_ = v; } + inline uint32_t get_connection_id() const { return handshake_.connection_id_; } + inline void set_connection_id(uint32_t id) { handshake_.connection_id_ = id; } + inline common::ObString get_scramble_string() const { return common::ObString::make_string(handshake_.scramble_buf_); } + inline char* get_scramble_buf() { return handshake_.scramble_buf_; } + inline const int64_t get_scramble_buf_len() const { return sizeof(handshake_.scramble_buf_); } + inline bool is_server_can_use_compress() const { return (1 == handshake_.server_capabilities_lower_.capability_flag_.OB_SERVER_CAN_USE_COMPRESS); } + inline bool support_ssl() const { return 1 == handshake_.server_capabilities_lower_.capability_flag_.OB_SERVER_SSL; } + inline void set_server_cap_lower(uint16_t cap_lower) { handshake_.server_capabilities_lower_.capability_ = cap_lower; } + inline void set_server_cap_upper(uint16_t cap_upper) { handshake_.server_capabilities_upper_.capability_ = cap_upper; } + inline uint32_t get_server_capability() const { return ((handshake_.server_capabilities_upper_.capability_ << 16) + | handshake_.server_capabilities_lower_.capability_); } + inline bool is_server_trans_internal_routing() const { return ob20_.is_server_trans_internal_routing_; } + inline void set_is_server_trans_internal_routing(bool v) { ob20_.is_server_trans_internal_routing_ = v; } + inline obmysql::OMPKError &get_error_pkt() { return error_.error_pkt_; } + inline const obmysql::OMPKError &get_error_pkt() const { return error_.error_pkt_; } + inline uint16_t get_error_code() const { return error_.error_pkt_.get_err_code(); } + inline common::ObString get_error_message() const { return error_.error_pkt_.get_message(); } + inline obutils::ObVariableLenBuffer &get_error_pkt_buf() { return error_.error_pkt_buf_; } + inline bool is_not_supported_error() const { - return (is_error_resp() && ER_NOT_SUPPORTED_YET == error_pkt_.get_err_code()); + return (is_error_resp() && ER_NOT_SUPPORTED_YET == error_.error_pkt_.get_err_code()); } - bool is_bad_db_error() const + inline bool is_bad_db_error() const { - return (is_error_resp() && ER_BAD_DB_ERROR == error_pkt_.get_err_code()); + return (is_error_resp() && ER_BAD_DB_ERROR == error_.error_pkt_.get_err_code()); } - bool is_unknown_tenant_error() const + inline bool is_unknown_tenant_error() const { - return (is_error_resp() && -common::OB_TENANT_NOT_EXIST == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_TENANT_NOT_EXIST == error_.error_pkt_.get_err_code()); } - bool is_tenant_not_in_server_error() const + inline bool is_tenant_not_in_server_error() const { - return (is_error_resp() && -common::OB_TENANT_NOT_IN_SERVER == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_TENANT_NOT_IN_SERVER == error_.error_pkt_.get_err_code()); } - bool is_cluster_not_match_error() const + inline bool is_cluster_not_match_error() const { - return (is_error_resp() && -common::OB_CLUSTER_NO_MATCH == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_CLUSTER_NO_MATCH == error_.error_pkt_.get_err_code()); } - bool is_server_init_error() const + inline bool is_server_init_error() const { - return (is_error_resp() && -common::OB_SERVER_IS_INIT == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_SERVER_IS_INIT == error_.error_pkt_.get_err_code()); } - bool is_server_stopping_error() const + inline bool is_server_stopping_error() const { - return (is_error_resp() && -common::OB_SERVER_IS_STOPPING == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_SERVER_IS_STOPPING == error_.error_pkt_.get_err_code()); } - bool is_session_entry_exist() const + inline bool is_session_entry_exist() const { - return (is_error_resp() && -common::OB_SESSION_ENTRY_EXIST == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_SESSION_ENTRY_EXIST == error_.error_pkt_.get_err_code()); } - bool is_net_packet_too_large_error() const + inline bool is_net_packet_too_large_error() const { - return (is_error_resp() && ER_NET_PACKET_TOO_LARGE == error_pkt_.get_err_code()); + return (is_error_resp() && ER_NET_PACKET_TOO_LARGE == error_.error_pkt_.get_err_code()); } - bool is_connect_error() const + inline bool is_connect_error() const { - return (is_error_resp() && -common::OB_CONNECT_ERROR == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_CONNECT_ERROR == error_.error_pkt_.get_err_code()); } - bool is_readonly_error() const + inline bool is_readonly_error() const { - return (is_error_resp() && -common::OB_ERR_READ_ONLY == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_ERR_READ_ONLY == error_.error_pkt_.get_err_code()); } - bool is_service_name_not_found_error() const + inline bool is_service_name_not_found_error() const { - return (is_error_resp() && -common::OB_SERVICE_NAME_NOT_FOUND == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_SERVICE_NAME_NOT_FOUND == error_.error_pkt_.get_err_code()); } - bool is_not_primary_tenant() const + inline bool is_not_primary_tenant() const { - return (is_error_resp() && -common::OB_NOT_PRIMARY_TENANT == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_NOT_PRIMARY_TENANT == error_.error_pkt_.get_err_code()); } - bool is_reroute_error() const + inline bool is_reroute_error() const { - return (is_error_resp() && -common::OB_ERR_REROUTE == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_ERR_REROUTE == error_.error_pkt_.get_err_code()); } - bool is_ora_fatal_error() const + inline bool is_ora_fatal_error() const { - return (is_error_resp() && -common::OB_ORA_FATAL_ERROR == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_ORA_FATAL_ERROR == error_.error_pkt_.get_err_code()); } - bool is_standby_weak_readonly_error() const + inline bool is_standby_weak_readonly_error() const { - return (is_error_resp() && -common::OB_STANDBY_WEAK_READ_ONLY == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_STANDBY_WEAK_READ_ONLY == error_.error_pkt_.get_err_code()); } - bool is_trans_free_route_not_supported_error() const + inline bool is_trans_free_route_not_supported_error() const { - return (is_error_resp() && -common::OB_TRANS_FREE_ROUTE_NOT_SUPPORTED == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_TRANS_FREE_ROUTE_NOT_SUPPORTED == error_.error_pkt_.get_err_code()); } - bool is_mysql_wrong_arguments_error() const + inline bool is_mysql_wrong_arguments_error() const { - return (is_error_resp() && ER_WRONG_ARGUMENTS == error_pkt_.get_err_code()); + return (is_error_resp() && ER_WRONG_ARGUMENTS == error_.error_pkt_.get_err_code()); } - bool is_internal_error() const + inline bool is_internal_error() const { - return (is_error_resp() && -common::OB_INTERNAL_ERROR == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_INTERNAL_ERROR == error_.error_pkt_.get_err_code()); } - bool is_client_session_killed_error() const + inline bool is_client_session_killed_error() const { - return (is_error_resp() && -common::OB_ERR_KILL_CLIENT_SESSION == error_pkt_.get_err_code()); + return (is_error_resp() && -common::OB_ERR_KILL_CLIENT_SESSION == error_.error_pkt_.get_err_code()); } - - inline uint32_t get_server_capability() const + inline Ob20ExtraInfo &get_extra_info() { return ob20_.extra_info_; } + inline const Ob20ExtraInfo &get_extra_info() const { return ob20_.extra_info_; } + inline common::FLTObjManage &get_flt() { return ob20_.flt_; } + inline const common::FLTObjManage &get_flt() const { return ob20_.flt_; } + inline const common::ObString& get_server_trace_id() const { return sysvar_.server_trace_id_; } + inline void set_server_trace_id(const common::ObString &trace_id) { - return ((server_capabilities_upper_.capability_ << 16) | server_capabilities_lower_.capability_); + if (trace_id.empty()) { + sysvar_.server_trace_id_.reset(); + } else { + common::ObString::obstr_size_t copy_len = std::min(trace_id.length(), + common::ObString::obstr_size_t(common::OB_MAX_TRACE_ID_LENGTH)); + if (copy_len >= 0) { // just for defense + MEMCPY(sysvar_.server_trace_id_buf_, trace_id.ptr(), copy_len); + sysvar_.server_trace_id_.assign(sysvar_.server_trace_id_buf_, copy_len); + } + } } + int64_t to_string(char *buf, const int64_t buf_len) const; +private: + struct { + /* control the flow of transmitting response used by tunnel and plugin */ + bool is_decompressed_: 1; // whether need tunnel or plugin to decompress + bool is_trans_completed_: 1; // whether transaction completed + bool is_resp_completed_: 1; // whether whole data of response be analyzed + bool is_last_ok_handled_: 1; // whether the last ok pkt be trimmed + int64_t reserved_ok_len_of_mysql_; // reserve the lastest one mysql pkt data in the tunnel for triming if neccessary + int64_t reserved_ok_len_of_compressed_; // reserve the lastest one mysql pkt data in the ObMysqlResponseCompressTransformPlugin for triming if neccessary + int64_t last_ok_pkt_len_; // the last ok pkt len including mysql header + int64_t rewritten_last_ok_pkt_len_; // the last ok pkt len including mysql header after rebuild it + ObOKPacketActionType ok_packet_action_type_; // rebuild or trim the last ok pkt + } transmit_control_; - Ob20ExtraInfo &get_extra_info() { return extra_info_; } - const Ob20ExtraInfo &get_extra_info() const { return extra_info_; } + /* format of the response */ + struct { + bool is_auth_switch_req_: 1; // if resp is auth switch request + bool is_resultset_resp_: 1; // if resultset then handle_resultset_resp() + bool is_server_db_reset_: 1; // if db reset(empty db) then disconnect all server session of current client session (ObMysqlTransact::handle_db_reset) + ObMysqlRespEndingType ending_type_; // if resp is eof/ok/err/handshake + } format_; - int64_t to_string(char *buf, const int64_t buf_len) const; + /* system variables in response */ + struct { + bool is_partition_hit_: 1; // if miss partition then update location cached and print log + bool is_last_insert_id_changed_: 1; // if changed then call set_lii_server_session() on client_session + bool has_new_sys_var_: 1; // if observer respond new sys var then add it to cluster resource (add_sys_var_renew_task) + bool has_proxy_idc_name_user_var_: 1; // if observer respond the sys var then update the client session ldc + ObWeakReadHitReplica weak_read_hit_replica_; // if observer respond the sys var then set + common::ObString server_trace_id_; // if observer respond the sys var then set + char server_trace_id_buf_[common::OB_MAX_TRACE_ID_LENGTH]; + } sysvar_; + + /* properties of handshake packet */ + struct { + uint32_t connection_id_; // for handshake pkt + obmysql::OMPKHandshake::ServerCapabilitiesLower server_capabilities_lower_; // for handshake pkt + obmysql::OMPKHandshake::ServerCapabilitiesUpper server_capabilities_upper_; // for handshake pkt + char scramble_buf_[obmysql::OMPKHandshake::SCRAMBLE_TOTAL_SIZE + 1]; // for handshake pkt + } handshake_; + + /* save whole error packet */ + struct { + obmysql::OMPKError error_pkt_; // for error pkt + obutils::ObVariableLenBuffer error_pkt_buf_; // for error pkt + + } error_; - /* define elements */ - bool is_decompressed_; - bool is_trans_completed_; - bool is_resp_completed_; - // indicate this mysql response ending pkt type(ok, error, eof, string eof,etc.) - ObMysqlRespEndingType ending_type_; - // indicate whether request data hit the target observer, - // if not, maybe need update location cache. - bool is_partition_hit_; - // indicate whether sql request make last_insert_id changed, - // used to record last_insert_id server session. - bool is_last_insert_id_changed_; - // indicate whether observer session level database was set to empty. - // when it was set to emtpy, all server session in conneciton pool shuold disconnect. - bool is_server_db_reset_; - // if observer add new sys var, this value will be set - bool has_new_sys_var_; - //extract from handshake pkt - // if observer add new sys var, this value will be set - bool has_proxy_idc_name_user_var_; - uint32_t connection_id_; - obmysql::OMPKHandshake::ServerCapabilitiesLower server_capabilities_lower_; - obmysql::OMPKHandshake::ServerCapabilitiesUpper server_capabilities_upper_; - bool is_resultset_resp_; - char scramble_buf_[obmysql::OMPKHandshake::SCRAMBLE_TOTAL_SIZE + 1]; - ObOKPacketActionType ok_packet_action_type_; - int64_t reserved_ok_len_of_mysql_; // Analyze MySQL Protocol response from server - int64_t reserved_ok_len_of_compressed_; // Analyze OB20/Compressed response from server - int64_t last_ok_pkt_len_; - int64_t rewritten_last_ok_pkt_len_; - bool is_last_ok_handled_; - ObWeakReadHitReplica weak_read_hit_replica_; + /* properties in oceanbase 2.0 */ + struct { + bool is_server_trans_internal_routing_: 1; // for oceanbase 2.0 + Ob20ExtraInfo extra_info_; // for oceanbase 2.0 extra info + common::FLTObjManage flt_; // for oceanbase 2.0 extra info's full-link trace info - // only one of structs below is valid, according to ending_type_ - obmysql::OMPKError error_pkt_; - obutils::ObVariableLenBuffer error_pkt_buf_; // only store error pkt - common::ObString server_trace_id_; - char server_trace_id_buf_[common::OB_MAX_TRACE_ID_LENGTH]; + } ob20_; - Ob20ExtraInfo extra_info_; - common::FLTObjManage flt_; - bool is_server_trans_internal_routing_; - bool is_auth_switch_req_; DISALLOW_COPY_AND_ASSIGN(ObRespAnalyzeResult); }; -inline void ObRespAnalyzeResult::reset() +// reset the members that control the flow of tunnel and plugin +inline void ObRespAnalyzeResult::reset_transmit_control() { - is_decompressed_ = false; - is_trans_completed_ = false; - is_resp_completed_ = false; - ending_type_ = MAX_PACKET_ENDING_TYPE; - is_partition_hit_ = true; - is_last_insert_id_changed_ = false; - is_server_db_reset_ = false; - has_new_sys_var_ = false; - has_proxy_idc_name_user_var_ = false; - connection_id_ = 0; - scramble_buf_[0] = '\0'; - ok_packet_action_type_ = OK_PACKET_ACTION_SEND; - reserved_ok_len_of_mysql_ = 0; - reserved_ok_len_of_compressed_ = 0; - last_ok_pkt_len_ = 0; - rewritten_last_ok_pkt_len_ = 0; - is_last_ok_handled_ = false; - is_server_trans_internal_routing_ = false; - is_auth_switch_req_ = false; - weak_read_hit_replica_ = MAX_REPLICA; - server_capabilities_lower_.capability_ = 0; - server_capabilities_upper_.capability_ = 0; - is_resultset_resp_ = false; - server_trace_id_.reset(); - error_pkt_buf_.reset(); - extra_info_.reset(); - flt_.reset(); + set_is_decompressed(false); + set_is_trans_completed(false); + set_is_resp_completed(false); + set_reserved_ok_len_of_mysql(0); + set_reserved_ok_len_of_compressed(0); + set_is_last_ok_handled(false); + set_last_ok_pkt_len(0); + set_rewritten_last_ok_pkt_len(0); + set_ok_packet_action_type(OK_PACKET_ACTION_SEND); } -inline void ObRespAnalyzeResult::set_server_trace_id(const common::ObString &trace_id) +inline void ObRespAnalyzeResult::reset() { - if (trace_id.empty()) { - server_trace_id_.reset(); - } else { - common::ObString::obstr_size_t copy_len = std::min(trace_id.length(), - common::ObString::obstr_size_t(common::OB_MAX_TRACE_ID_LENGTH)); - if (copy_len >= 0) { // just for defense - MEMCPY(server_trace_id_buf_, trace_id.ptr(), copy_len); - server_trace_id_.assign(server_trace_id_buf_, copy_len); - } - } -} + // transmit control + reset_transmit_control(); + // format + set_is_auth_switch_req(false); + set_is_resultset_resp(false); + set_is_server_db_reset(false); + set_ending_type(MAX_PACKET_ENDING_TYPE); + + // sys var + set_is_partition_hit(true); + set_is_last_insert_id_changed(false); + set_has_new_sys_var(false); + set_has_proxy_idc_name_user_var(false); + set_weak_read_hit_replica(MAX_REPLICA); + sysvar_.server_trace_id_.reset(); + sysvar_.server_trace_id_buf_[0] = '\0'; + + // handshake + set_connection_id(0); + set_server_cap_lower(0); + set_server_cap_upper(0); + handshake_.scramble_buf_[0] = '\0'; + + // error + error_.error_pkt_buf_.reset(); + + // ob20 + set_is_server_trans_internal_routing(false); + ob20_.extra_info_.reset(); + ob20_.flt_.reset(); +} } // end of namespace proxy } // end of namespace obproxy diff --git a/src/obproxy/proxy/mysqllib/ob_resp_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_resp_analyzer.cpp index f59c8d63..774e6f4c 100644 --- a/src/obproxy/proxy/mysqllib/ob_resp_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_resp_analyzer.cpp @@ -132,9 +132,9 @@ int ObRespAnalyzer::handle_analyze_last_mysql(ObRespAnalyzeResult &resp_result) } if (is_resp_completed) { - resp_result.is_trans_completed_ = is_trans_completed; - resp_result.is_resp_completed_ = is_resp_completed; - resp_result.ending_type_ = ending_type; + resp_result.set_is_trans_completed(is_trans_completed); + resp_result.set_is_resp_completed(is_resp_completed); + resp_result.set_ending_type(ending_type); is_mysql_stream_end_ = true; LOG_DEBUG("mark mysql stream end", K(is_mysql_stream_end_), K(is_trans_completed), K(is_resp_completed)); } @@ -234,7 +234,7 @@ int ObRespAnalyzer::handle_analyze_mysql_end(const char *pkt_end, ObRespAnalyzeR if (OK_PACKET_ACTION_CONSUME == ok_packet_action_type || OK_PACKET_ACTION_REWRITE == ok_packet_action_type) { if (OB_NOT_NULL(resp_result)) { - resp_result->last_ok_pkt_len_ = pkt_len + MYSQL_NET_HEADER_LENGTH; + resp_result->set_last_ok_pkt_len(pkt_len + MYSQL_NET_HEADER_LENGTH); } } else { // do nothing @@ -274,7 +274,7 @@ int ObRespAnalyzer::handle_analyze_mysql_end(const char *pkt_end, ObRespAnalyzeR is_last_eof_pkt = false; } } else if ((OB_MYSQL_COM_CHANGE_USER == req_cmd_ || OB_MYSQL_COM_LOGIN == req_cmd_) && resp_result != NULL) { - resp_result->is_auth_switch_req_ = true; + resp_result->set_is_auth_switch_req(true); } if (0 == eof_pkt_cnt) { @@ -376,7 +376,7 @@ int ObRespAnalyzer::handle_analyze_mysql_end(const char *pkt_end, ObRespAnalyzeR reserve_pkt_body_buf_.reset(); if (OB_LIKELY(NULL != resp_result)) { - resp_result->ok_packet_action_type_ = ok_packet_action_type; + resp_result->set_ok_packet_action_type(ok_packet_action_type); if (is_last_pkt() && OB_FAIL(handle_analyze_last_mysql(*resp_result))) { LOG_WDIAG("fail to handle_analyze_last_mysql", K(ret), K(*resp_result)); } @@ -524,7 +524,7 @@ void ObRespAnalyzer::handle_analyze_ob20_tailer(ObRespAnalyzeResult &resp_result LOG_DEBUG("analyze oceanbase 2.0 tailer", "tailer_crc", ob20_analyzer_.get_local_payload_checksum()); if (is_last_oceanbase_pkt()) { if (SIMPLE_MODE == analyze_mode_) { - resp_result.is_resp_completed_ = true; // 不设置会导致 tunnel 出现问题 + resp_result.set_is_resp_completed(true); // 不设置会导致 tunnel 出现问题 is_mysql_stream_end_ = true; } is_oceanbase_stream_end_ = true; @@ -620,7 +620,7 @@ int ObRespAnalyzer::handle_analyze_compressed_mysql_payload(const char *buf, con // the last compressed packet which contains the last mysql packet was analyzed if (is_last_compressed_pkt()) { if (SIMPLE_MODE == analyze_mode_) { - resp_result.is_resp_completed_ = true; // 不设置会导致 tunnel 出现问题 + resp_result.set_is_resp_completed(true);// 不设置会导致 tunnel 出现问题 is_mysql_stream_end_ = true; } is_compressed_stream_end_ = true; // mark compressed mysql packet stream end @@ -655,7 +655,7 @@ int ObRespAnalyzer::handle_analyze_mysql_payload(const char *buf, const int64_t } else if (OB_FAIL(stream_analyze_mysql(seg, resp_result))) { LOG_WDIAG("fail to stream_analyze_mysql", K(ret), K(seg), K(resp_result)); } else { - resp_result.reserved_ok_len_of_compressed_ = reserved_len_; + resp_result.set_reserved_ok_len_of_compressed(reserved_len_); } return ret; @@ -691,7 +691,7 @@ int ObRespAnalyzer::handle_analyze_compressed_payload(const char *buf, const int if (OB_FAIL(stream_analyze_mysql(seg, resp_result))) { LOG_WDIAG("fail to stream_analyze_mysql", K(ret), K(seg), K(resp_result)); } else { - resp_result.reserved_ok_len_of_compressed_ = reserved_len_; + resp_result.set_reserved_ok_len_of_compressed(reserved_len_); } } } @@ -703,7 +703,7 @@ int ObRespAnalyzer::handle_analyze_ob20_extra_info(const char *buf, const int64_ { int ret = OB_SUCCESS; const Ob20ProtocolFlags &flags = ob20_analyzer_.get_header().flag_; - FLTObjManage &flt = resp_result.flt_; + FLTObjManage &flt = resp_result.get_flt(); Ob20ExtraInfo &extra_info = resp_result.get_extra_info(); // only process extra info in the last oceanbase 2.0 packet if (!extra_info.extra_info_buf_.is_inited() && @@ -807,7 +807,7 @@ int ObRespAnalyzer::handle_analyze_ob20_header(ObRespAnalyzeResult &resp_result) } if (header.flag_.is_last_packet()) { - resp_result.is_server_trans_internal_routing_ = header.flag_.is_trans_internal_routing(); + resp_result.set_is_server_trans_internal_routing(header.flag_.is_trans_internal_routing()); } LOG_DEBUG("analyze oceanbase 2.0 header", "magic_num", header.magic_num_, @@ -859,10 +859,10 @@ int ObRespAnalyzer::analyze_one_packet_header( // compressed mysql or oceanabse 2.0 if (ObProxyProtocol::PROTOCOL_NORMAL != protocol_) { if (is_last_pkt(result)) { // only has one compressed packet - resp_result.is_resultset_resp_ = false; + resp_result.set_is_resultset_resp(false); analyze_mode_ = DECOMPRESS_MODE; } else { - resp_result.is_resultset_resp_ = true; + resp_result.set_is_resultset_resp(true); // only works for oceanbase 2.0 exclude prepare and prepare-execute if (ObProxyProtocol::PROTOCOL_OB20 == protocol_ && !params_.is_compressed_ @@ -919,7 +919,7 @@ int ObRespAnalyzer::analyze_one_packet_header( // if it is result + eof + error + ok, it may be not.... // treat multi stmt as result set protocol bool is_resultset_resp = ObRespAnalyzerUtil::is_resultset_resp(req_cmd_, result.mysql_header_.pkt_type_, server_status); - resp_result.is_resultset_resp_ = is_resultset_resp; + resp_result.set_is_resultset_resp(is_resultset_resp); LOG_DEBUG("after analyze one response packet", "packet_type", result.mysql_header_.pkt_type_, "cmd", req_cmd_, @@ -965,7 +965,7 @@ int ObRespAnalyzer::analyze_all_packets( ret = OB_ERR_UNEXPECTED; LOG_WDIAG("written_len is not expected", K(written_len), K(tmp_read_avail), K(ret)); } else { - resp_result.is_decompressed_ = true; + resp_result.set_is_decompressed(true); LOG_DEBUG("all mysql pkts decompressed", K(result)); } } @@ -984,12 +984,12 @@ int ObRespAnalyzer::analyze_all_packets( if (OB_SUCC(ret)) { if (ANALYZE_DONE == result.status_) { if (is_stream_end()) { - if (OB_MYSQL_COM_LOGIN == req_cmd_ || OB_MYSQL_COM_CHANGE_USER == req_cmd_) { - resp_result.is_resultset_resp_ = false; - } else if (resp_result.is_eof_resp() - || ((OB_MYSQL_COM_STMT_PREPARE == req_cmd_ || OB_MYSQL_COM_STMT_PREPARE_EXECUTE == req_cmd_) - && !resp_result.is_error_resp())) { - resp_result.is_resultset_resp_ = true; + if (OB_MYSQL_COM_LOGIN == req_cmd_ || OB_MYSQL_COM_CHANGE_USER == req_cmd_) { + resp_result.set_is_resultset_resp(false); + } else if (resp_result.is_eof_resp() + || ((OB_MYSQL_COM_STMT_PREPARE == req_cmd_ || OB_MYSQL_COM_STMT_PREPARE_EXECUTE == req_cmd_) + && !resp_result.is_error_resp())) { + resp_result.set_is_resultset_resp(true); } } else { if(OB_FAIL(ObProto20Utils::analyze_first_mysql_packet(reader, result))) { @@ -998,7 +998,7 @@ int ObRespAnalyzer::analyze_all_packets( // if it is result + eof + error + ok, it may be not.... // treat multi stmt as result set protocol uint8_t pkt_type = result.mysql_header_.pkt_type_; - resp_result.is_resultset_resp_ = + resp_result.set_is_resultset_resp( ((OB_MYSQL_COM_QUERY == req_cmd_ || OB_MYSQL_COM_STMT_EXECUTE == req_cmd_ || OB_MYSQL_COM_STMT_FETCH == req_cmd_) @@ -1009,10 +1009,10 @@ int ObRespAnalyzer::analyze_all_packets( && MYSQL_LOCAL_INFILE_TYPE != pkt_type) || OB_MYSQL_COM_STMT_PREPARE == req_cmd_ || OB_MYSQL_COM_STMT_PREPARE_EXECUTE == req_cmd_ - || OB_MYSQL_COM_FIELD_LIST == req_cmd_; + || OB_MYSQL_COM_FIELD_LIST == req_cmd_); } LOG_DEBUG("analyze OB20 first response finished", K(result), - "is_resultset_resp", resp_result.is_resultset_resp_); + "is_resultset_resp", resp_result.is_resultset_resp()); } } } @@ -1025,10 +1025,10 @@ int ObRespAnalyzer::analyze_all_packets( if (is_stream_end()) { result.status_ = ANALYZE_DONE; if (OB_MYSQL_COM_LOGIN == req_cmd_ || OB_MYSQL_COM_CHANGE_USER == req_cmd_) { - resp_result.is_resultset_resp_ = false; + resp_result.set_is_resultset_resp(false); } else if (resp_result.is_eof_resp() || ((OB_MYSQL_COM_STMT_PREPARE == req_cmd_ || OB_MYSQL_COM_STMT_PREPARE_EXECUTE == req_cmd_) && !resp_result.is_error_resp())) { - resp_result.is_resultset_resp_ = true; + resp_result.set_is_resultset_resp(true); } } else { result.status_ = ANALYZE_CONT; @@ -1556,7 +1556,7 @@ int ObRespAnalyzer::analyze_error_pkt(ObRespAnalyzeResult *resp_result) int ret = OB_SUCCESS; if (NULL != resp_result) { obutils::ObVariableLenBuffer &content_buf - = resp_result->error_pkt_buf_; + = resp_result->get_error_pkt_buf(); if (OB_FAIL(build_packet_content(content_buf))) { LOG_WDIAG("fail to build packet content", K(ret)); } else { @@ -1633,24 +1633,24 @@ int ObRespAnalyzer::analyze_hanshake_pkt(ObRespAnalyzeResult *resp_result) if (OB_FAIL(packet.decode())) { LOG_WDIAG("decode packet failed", K(ret)); } else { - resp_result->server_capabilities_lower_.capability_ = packet.get_server_capability_lower(); - resp_result->server_capabilities_upper_.capability_ = packet.get_server_capability_upper(); - resp_result->connection_id_ = packet.get_thread_id(); + resp_result->set_server_cap_lower(packet.get_server_capability_lower()); + resp_result->set_server_cap_upper(packet.get_server_capability_upper()); + resp_result->set_connection_id(packet.get_thread_id()); int64_t copy_len = 0; - if (OB_FAIL(packet.get_scramble(resp_result->scramble_buf_, - static_cast(sizeof(resp_result->scramble_buf_)), + if (OB_FAIL(packet.get_scramble(resp_result->get_scramble_buf(), + resp_result->get_scramble_buf_len(), copy_len))) { LOG_WDIAG("fail to get scramble", K(ret)); - } else if (OB_UNLIKELY(copy_len >= static_cast(sizeof(resp_result->scramble_buf_)))) { + } else if (OB_UNLIKELY(copy_len >= resp_result->get_scramble_buf_len())) { ret = OB_ERR_UNEXPECTED; LOG_WDIAG("copy_len is too bigger", K(copy_len), K(ret)); } else { - resp_result->scramble_buf_[copy_len] = '\0'; + resp_result->get_scramble_buf()[copy_len] = '\0'; + LOG_DEBUG("succ to get connection id and scramble ", + "connection_id", resp_result->get_connection_id(), + "scramble_buf", resp_result->get_scramble_buf()); } } - LOG_DEBUG("succ to get connection id and scramble ", - "connection_id", resp_result->connection_id_, - "scramble_buf", resp_result->scramble_buf_); } return ret; diff --git a/src/obproxy/proxy/mysqllib/ob_resp_analyzer.h b/src/obproxy/proxy/mysqllib/ob_resp_analyzer.h index 676bda49..50685f03 100644 --- a/src/obproxy/proxy/mysqllib/ob_resp_analyzer.h +++ b/src/obproxy/proxy/mysqllib/ob_resp_analyzer.h @@ -215,7 +215,7 @@ int ObRespAnalyzer::stream_analyze_packets(ObString data, ObRespAnalyzeResult &r ret = stream_analyze_compressed_mysql(data, resp_result); } else if (ObProxyProtocol::PROTOCOL_NORMAL == protocol_) { ret = stream_analyze_mysql(data, resp_result); - resp_result.reserved_ok_len_of_mysql_ = reserved_len_; + resp_result.set_reserved_ok_len_of_mysql(reserved_len_); } else { ret = OB_ERR_UNEXPECTED; OB_LOG(WDIAG, "unexpect analyze protocol", K(protocol_), K(ret)); @@ -341,7 +341,7 @@ void ObRespAnalyzer::dealloc_mysql_pkt_buf() void ObRespAnalyzer::handle_analyze_ob20_extra_info_header(ObRespAnalyzeResult &resp_result) { if (STREAM_OCEANBASE20_EXTRA_INFO == stream_ob20_state_) { - resp_result.extra_info_.extra_info_buf_.reset(); + resp_result.get_extra_info().extra_info_buf_.reset(); } } diff --git a/src/obproxy/proxy/plugins/ob_mysql_request_execute_transform_plugin.h b/src/obproxy/proxy/plugins/ob_mysql_request_execute_transform_plugin.h index 6994d604..c3c6630f 100644 --- a/src/obproxy/proxy/plugins/ob_mysql_request_execute_transform_plugin.h +++ b/src/obproxy/proxy/plugins/ob_mysql_request_execute_transform_plugin.h @@ -108,7 +108,7 @@ class ObMysqlRequestExecuteGlobalPlugin : public ObGlobalPlugin { PROXY_API_LOG(DEBUG, "need_enable_plugin", "request_content_length", sm->trans_state_.trans_info_.request_content_length_, - "mysql_cmd", get_mysql_cmd_str(sm->trans_state_.trans_info_.sql_cmd_)); + "mysql_cmd", ObProxyParserUtils::get_sql_cmd_name(sm->trans_state_.trans_info_.sql_cmd_)); bool bret = false; diff --git a/src/obproxy/proxy/plugins/ob_mysql_request_prepare_transform_plugin.h b/src/obproxy/proxy/plugins/ob_mysql_request_prepare_transform_plugin.h index 168c24dd..007b4f95 100644 --- a/src/obproxy/proxy/plugins/ob_mysql_request_prepare_transform_plugin.h +++ b/src/obproxy/proxy/plugins/ob_mysql_request_prepare_transform_plugin.h @@ -98,7 +98,7 @@ class ObMysqlRequestPrepareGlobalPlugin : public ObGlobalPlugin { PROXY_API_LOG(DEBUG, "need_enable_plugin", "request_content_length", sm->trans_state_.trans_info_.request_content_length_, - "mysql_cmd", get_mysql_cmd_str(sm->trans_state_.trans_info_.sql_cmd_)); + "mysql_cmd", ObProxyParserUtils::get_sql_cmd_name(sm->trans_state_.trans_info_.sql_cmd_)); return (!sm->trans_state_.trans_info_.client_request_.is_internal_cmd() && ObMysqlTransact::need_use_tunnel(sm->trans_state_) && (obmysql::OB_MYSQL_COM_STMT_PREPARE == sm->trans_state_.trans_info_.sql_cmd_ diff --git a/src/obproxy/proxy/plugins/ob_mysql_response_compress_transform_plugin.cpp b/src/obproxy/proxy/plugins/ob_mysql_response_compress_transform_plugin.cpp index 7c3773df..96c17f93 100644 --- a/src/obproxy/proxy/plugins/ob_mysql_response_compress_transform_plugin.cpp +++ b/src/obproxy/proxy/plugins/ob_mysql_response_compress_transform_plugin.cpp @@ -89,9 +89,7 @@ int ObMysqlResponseCompressTransformPlugin::consume(event::ObIOBufferReader *rea // the ObRespAnalyzeResult will be use both by tunnel and this class, and if tunnel analyze finished, // is_resp_completed_ will set true, here we set back to false to ensure compress_analyzer // work happy. - if (resp_result.is_resp_completed()) { - resp_result.is_resp_completed_ = false; - } + resp_result.set_is_resp_completed(false); int64_t plugin_decompress_response_begin = sm_->get_based_hrtime(); read_avail = local_reader_->read_avail(); @@ -108,8 +106,8 @@ int ObMysqlResponseCompressTransformPlugin::consume(event::ObIOBufferReader *rea milestone_diff(plugin_decompress_response_begin, plugin_decompress_response_end); // save flt from response analyze result to sm - sm_->save_response_flt_result_to_sm(resp_result.flt_); - if (OB_FAIL(sm_->handle_feedback_proxy_info(resp_result.extra_info_))) { + sm_->save_response_flt_result_to_sm(resp_result.get_flt()); + if (OB_FAIL(sm_->handle_feedback_proxy_info(resp_result.get_extra_info()))) { PROXY_API_LOG(WDIAG, "fail to handle feedback proxy info", "sm_id", sm_->sm_id_, "result", resp_result, K(ret)); } @@ -123,7 +121,7 @@ int ObMysqlResponseCompressTransformPlugin::consume(event::ObIOBufferReader *rea if (OB_FAIL(sm_->trim_ok_packet(*local_transfer_reader_))) { PROXY_API_LOG(WDIAG, "fail to trim last ok packet", K(ret)); } else { - resp_result.is_last_ok_handled_ = true; + resp_result.set_is_last_ok_handled(true); } sm_->print_mysql_complete_log(NULL); } diff --git a/src/obproxy/proxy/plugins/ob_mysql_response_prepare_transform_plugin.h b/src/obproxy/proxy/plugins/ob_mysql_response_prepare_transform_plugin.h index 4682c7bf..96c78bc9 100644 --- a/src/obproxy/proxy/plugins/ob_mysql_response_prepare_transform_plugin.h +++ b/src/obproxy/proxy/plugins/ob_mysql_response_prepare_transform_plugin.h @@ -106,7 +106,7 @@ class ObMysqlResponsePrepareGlobalPlugin : public ObGlobalPlugin { PROXY_API_LOG(DEBUG, "need_enable_plugin", "send action", sm->trans_state_.current_.send_action_, - "mysql_cmd", get_mysql_cmd_str(sm->trans_state_.trans_info_.sql_cmd_)); + "mysql_cmd", ObProxyParserUtils::get_sql_cmd_name(sm->trans_state_.trans_info_.sql_cmd_)); return (!sm->trans_state_.trans_info_.client_request_.is_internal_cmd() && ObMysqlTransact::SERVER_SEND_REQUEST == sm->trans_state_.current_.send_action_ diff --git a/src/rpc/obmysql/ob_mysql_packet.cpp b/src/rpc/obmysql/ob_mysql_packet.cpp index 1eb060b7..dcf00f00 100644 --- a/src/rpc/obmysql/ob_mysql_packet.cpp +++ b/src/rpc/obmysql/ob_mysql_packet.cpp @@ -180,80 +180,5 @@ char const *get_info_func_name(const ObInformationFunctions func) return str; } -char const *get_mysql_cmd_str(ObMySQLCmd mysql_cmd) -{ - const char *str = "invalid"; - static const char *mysql_cmd_array[OB_MYSQL_COM_MAX_NUM] = - { - "Sleep", // OB_MYSQL_COM_SLEEP, - "Quit", // OB_MYSQL_COM_QUIT, - "Init DB", // OB_MYSQL_COM_INIT_DB, - "Query", // OB_MYSQL_COM_QUERY, - "Field List", // OB_MYSQL_COM_FIELD_LIST, - - "Create DB", // OB_MYSQL_COM_CREATE_DB, - "Drop DB", // OB_MYSQL_COM_DROP_DB, - "Refresh", // OB_MYSQL_COM_REFRESH, - "Shutdown", // OB_MYSQL_COM_SHUTDOWN, - "Statistics", // OB_MYSQL_COM_STATISTICS, - - "Process info", // OB_MYSQL_COM_PROCESS_INFO, - "Connect", // OB_MYSQL_COM_CONNECT, - "Process kill", // OB_MYSQL_COM_PROCESS_KILL, - "Debug", // OB_MYSQL_COM_DEBUG, - "Ping", // OB_MYSQL_COM_PING, - - "Time", // OB_MYSQL_COM_TIME, - "Delayed insert", // OB_MYSQL_COM_DELAYED_INSERT, - "Change user", // OB_MYSQL_COM_CHANGE_USER, - "Binlog dump", // OB_MYSQL_COM_BINLOG_DUMP, - - "Table dump", // OB_MYSQL_COM_TABLE_DUMP, - "Connect out", // OB_MYSQL_COM_CONNECT_OUT, - "Register slave", // OB_MYSQL_COM_REGISTER_SLAVE, - - "Prepare", // OB_MYSQL_COM_STMT_PREPARE, - "Execute", // OB_MYSQL_COM_STMT_EXECUTE, - "Stmt send long data", // OB_MYSQL_COM_STMT_SEND_LONG_DATA, - "Close stmt", // OB_MYSQL_COM_STMT_CLOSE, - - "Stmt reset", // OB_MYSQL_COM_STMT_RESET, - "Set option", // OB_MYSQL_COM_SET_OPTION, - "Stmt fetch", // OB_MYSQL_COM_STMT_FETCH, - "Daemon", // OB_MYSQL_COM_DAEMON, - - "Binlog dump gtid", // OB_MYSQL_COM_BINLOG_DUMP_GTID, - "Reset connection", // OB_MYSQL_COM_RESET_CONNECTION, - "Clone", // OB_MYSQL_COM_CLONE - "Subscribe group replication stream", // OB_MYSQL_COM_SUBSCRIBE_GROUP_REPLICATION_STREAM - - "End", // OB_MYSQL_COM_END, - - "Prepare execute", // OB_MYSQL_COM_STMT_PREPARE_EXECUTE, - "Send piece data", - "Get piece data", - - "Delete session", // OB_MYSQL_COM_DELETE_SESSION - "Handshake", // OB_MYSQL_COM_HANDSHAKE, - "Login" // OB_MYSQL_COM_LOGIN, - "Transfer file content", // OB_MYSQL_COM_LOAD_DATA_TRANSFER_CONTENT - "Auth switch response", // OB_MYSQL_COM_AUTH_SWITCH_RESP - - "Max" // OB_MYSQL_COM_MAX_NUM - }; - - if (mysql_cmd >= OB_MYSQL_COM_SLEEP && mysql_cmd <= OB_MYSQL_COM_END) { - str = mysql_cmd_array[mysql_cmd]; - } else if (mysql_cmd >= OB_MYSQL_COM_STMT_PREPARE_EXECUTE && mysql_cmd <= OB_MYSQL_COM_STMT_GET_PIECE_DATA) { - int start = OB_MYSQL_COM_END + 1; - str = mysql_cmd_array[mysql_cmd - OBPROXY_NEW_MYSQL_CMD_START + start]; - } else if (mysql_cmd >= OB_MYSQL_COM_DELETE_SESSION && mysql_cmd <= OB_MYSQL_COM_MAX_NUM) { - int start = OB_MYSQL_COM_END + 4; - str = mysql_cmd_array[mysql_cmd - OBPROXY_MYSQL_CMD_START + start]; - } - return str; - -} - } // end of namespace obmysql } // end of namespace oceanbase diff --git a/src/rpc/obmysql/ob_mysql_packet.h b/src/rpc/obmysql/ob_mysql_packet.h index 80f1836e..8942ae2f 100644 --- a/src/rpc/obmysql/ob_mysql_packet.h +++ b/src/rpc/obmysql/ob_mysql_packet.h @@ -227,7 +227,6 @@ enum ObServerStatusFlagsPos OB_SERVER_SESSION_STATE_CHANGED_POS, }; -char const *get_mysql_cmd_str(ObMySQLCmd mysql_cmd); //http://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_current-user enum ObInformationFunctions