Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support min_gtid annotation #2469

Merged
merged 3 commits into from
Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions include/query_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class Query_Processor_Output {
int log;
int firewall_whitelist_mode;
char *comment; // #643
char *min_gtid;
std::string *new_query;
void * operator new(size_t size) {
return l_alloc(size);
Expand Down Expand Up @@ -188,6 +189,7 @@ class Query_Processor_Output {
error_msg=NULL;
OK_msg=NULL;
comment=NULL; // #643
min_gtid=NULL;
firewall_whitelist_mode = WUS_NOT_FOUND;
}
void destroy() {
Expand All @@ -199,6 +201,10 @@ class Query_Processor_Output {
free(OK_msg);
OK_msg=NULL;
}
if (min_gtid) {
free(min_gtid);
min_gtid = NULL;
}
if (comment) { // #643
free(comment);
}
Expand Down Expand Up @@ -305,7 +311,7 @@ class Query_Processor {
~Query_Processor();
void print_version();
void reset_all(bool lock=true);
void wrlock(); // explicit write lock, to be used in multi-isert
void wrlock(); // explicit write lock, to be used in multi-insert
void wrunlock(); // explicit write unlock
bool insert(QP_rule_t *qr, bool lock=true); // insert a new rule. Uses a generic void pointer to a structure that may vary depending from the Query Processor
QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *client_addr, char *proxy_addr, int proxy_port, char *digest, char *match_digest, char *match_pattern, bool negate_match_pattern, char *re_modifiers, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int cache_empty_result, int cache_timeout, int reconnect, int timeout, int retries, int delay, int next_query_flagIN, int mirror_hostgroup, int mirror_flagOUT, char *error_msg, char *OK_msg, int sticky_conn, int multiplex, int gtid_from_hostgroup, int log, bool apply, char *comment); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer
Expand All @@ -319,7 +325,7 @@ class Query_Processor {
void end_thread();
void commit(); // this applies all the changes in memory
SQLite3_result * get_current_query_rules();
SQLite3_result * get_stats_query_rules();
SQLite3_result * get_stats_query_rules();

void update_query_processor_stats();

Expand All @@ -329,6 +335,7 @@ class Query_Processor {
void query_parser_free(SQP_par_t *qp);
char * get_digest_text(SQP_par_t *qp);
uint64_t get_digest(SQP_par_t *qp);
bool is_valid_gtid(char *gtid, size_t gtid_len);

void update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info, MySQL_Session *sess);

Expand Down
29 changes: 20 additions & 9 deletions lib/MySQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6390,11 +6390,11 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C

void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection() {
// Get a MySQL Connection

with_gtid = false;

MySQL_Connection *mc=NULL;
MySQL_Backend * _gtid_from_backend = NULL;
char uuid[64];
char * gtid_uuid=NULL;
uint64_t trxid = 0;
unsigned long long now_us = 0;
if (qpo->max_lag_ms >= 0) {
Expand All @@ -6410,22 +6410,33 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
}
}
if (session_fast_forward == false) {
if (qpo->gtid_from_hostgroup >= 0) {
if (qpo->min_gtid) {
gtid_uuid = qpo->min_gtid;
} else if (qpo->gtid_from_hostgroup >= 0) {
_gtid_from_backend = find_backend(qpo->gtid_from_hostgroup);
if (_gtid_from_backend) {
if (_gtid_from_backend->gtid_uuid[0]) {
with_gtid = true;
gtid_uuid = _gtid_from_backend->gtid_uuid;
}
}
}
if (with_gtid) {
int l = index(_gtid_from_backend->gtid_uuid,':') - _gtid_from_backend->gtid_uuid;
trxid = strtoull(index(_gtid_from_backend->gtid_uuid,':')+1, NULL, 10);

char *sep_pos = NULL;
if (gtid_uuid != NULL) {
sep_pos = index(gtid_uuid,':');
if (sep_pos == NULL) {
gtid_uuid = NULL; // gtid is invalid
}
}

if (gtid_uuid != NULL) {
int l = sep_pos - gtid_uuid;
trxid = strtoull(sep_pos+1, NULL, 10);
int m;
int n=0;
for (m=0; m<l; m++) {
if (_gtid_from_backend->gtid_uuid[m] != '-') {
uuid[n]=_gtid_from_backend->gtid_uuid[m];
if (gtid_uuid[m] != '-') {
uuid[n]=gtid_uuid[m];
n++;
}
}
Expand Down
29 changes: 29 additions & 0 deletions lib/Query_Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2779,6 +2779,17 @@ bool Query_Processor::query_parser_first_comment(Query_Processor_Output *qpo, ch
}
}
}
if (!strcasecmp(key,"min_gtid")) {
size_t l = strlen(value);
if (is_valid_gtid(value, l)) {
char *buf=(char*)malloc(l+1);
strncpy(buf, value, l);
buf[l+1] = '\0';
qpo->min_gtid = buf;
} else {
proxy_warning("Invalid gtid value=%s\n", value);
}
}
}

proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Variables in comment %s , key=%s , value=%s\n", token, key, value);
Expand All @@ -2789,6 +2800,24 @@ bool Query_Processor::query_parser_first_comment(Query_Processor_Output *qpo, ch
return ret;
}

bool Query_Processor::is_valid_gtid(char *gtid, size_t gtid_len) {
if (gtid_len < 3) {
return false;
}
char *sep_pos = index(gtid, ':');
if (sep_pos == NULL) {
return false;
}
size_t uuid_len = sep_pos - gtid;
if (uuid_len < 1) {
return false;
}
if (gtid_len < uuid_len + 2) {
return false;
}
return true;
}

void Query_Processor::query_parser_free(SQP_par_t *qp) {
if (qp->digest_text) {
if (qp->digest_text != qp->buf) {
Expand Down