From 75606bb94db205f6fc5ea776f41969a61fb924b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 21:20:15 +0200 Subject: [PATCH 1/6] Simplifying MySQL_Thread::run() phase 1 --- include/MySQL_Session.h | 1 + include/MySQL_Thread.h | 8 +++ lib/MySQL_Session.cpp | 10 +++ lib/MySQL_Thread.cpp | 154 +++++++++++++++++++++++----------------- 4 files changed, 109 insertions(+), 64 deletions(-) diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index ecd2f8ffdf..6c0b057062 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -263,6 +263,7 @@ class MySQL_Session void generate_proxysql_internal_session_json(json &); bool known_query_for_locked_on_hostgroup(uint64_t); void unable_to_parse_set_statement(bool *); + bool has_any_backend(); }; #define KILL_QUERY 1 diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index c69fb53f19..5ccf4be108 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -98,6 +98,14 @@ class MySQL_Thread Session_Regex **match_regexes; +#ifdef IDLE_THREADS + void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr); + void worker_threads_get_sessions_from_idle_threads(); +#endif // IDLE_THREADS + + unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); + bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); + protected: int nfds; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 4426979278..785554f65e 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -7226,3 +7226,13 @@ void MySQL_Session::unable_to_parse_set_statement(bool *lock_hostgroup) { } } +bool MySQL_Session::has_any_backend() { + for (unsigned int j=0;j < mybes->len;j++) { + MySQL_Backend *tmp_mybe=(MySQL_Backend *)mybes->index(j); + MySQL_Data_Stream *__myds=tmp_mybe->server_myds; + if (__myds->myconn) { + return true; + } + } + return false; +} diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index e97132a0a7..fec7aa8667 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3879,39 +3879,9 @@ void MySQL_Thread::run() { // here we try to move it to the maintenance thread if (myds->myds_type==MYDS_FRONTEND && myds->sess) { if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { - unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; - if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { - // make sure data stream has no pending data out and session is not throttled (#1939) - // because epoll thread does not handle data stream with data out - if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { - unsigned int j; - int conns=0; - for (j=0;jsess->mybes->len;j++) { - MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); - MySQL_Data_Stream *__myds=tmp_mybe->server_myds; - if (__myds->myconn) { - conns++; - } - } - unsigned long long idle_since = curtime - myds->sess->IdleTime(); - if (conns==0) { - mypolls.remove_index_fast(n); - myds->mypolls=NULL; - unsigned int i; - for (i=0;ilen;i++) { - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); - if (mysess==myds->sess) { - mysess->thread=NULL; - unregister_session(i); - mysess->idle_since = idle_since; - idle_mysql_sessions->add(mysess); - break; - } - } - n--; // compensate mypolls.remove_index_fast(n) and n++ of loop - continue; - } - } + if (move_session_to_idle_mysql_sessions(myds, n)) { + n--; // compensate mypolls.remove_index_fast(n) and n++ of loop + continue; } } } @@ -3979,37 +3949,8 @@ void MySQL_Thread::run() { if (idle_maintenance_thread==false) { int r=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; - if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { - pthread_mutex_lock(&thr->myexchange.mutex_idles); - bool empty_queue=true; - if (thr->myexchange.idle_mysql_sessions->len) { - // there are already sessions in the queues. We assume someone already notified worker 0 - empty_queue=false; - } - while (idle_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); - thr->myexchange.idle_mysql_sessions->add(mysess); - } - pthread_mutex_unlock(&thr->myexchange.mutex_idles); - if (empty_queue==true) { - unsigned char c=1; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); - } - } - } - pthread_mutex_lock(&myexchange.mutex_resumes); - if (myexchange.resume_mysql_sessions->len) { - //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; - while (myexchange.resume_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); - } - } - pthread_mutex_unlock(&myexchange.mutex_resumes); + worker_thread_assigns_sessions_to_idle_thread(thr); + worker_threads_get_sessions_from_idle_threads(); } } @@ -4342,6 +4283,58 @@ void MySQL_Thread::run() { } } +unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) { + int i=0; + for (i=0;ilen;i++) { + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); + if (mysess==sess) { + return i; + } + } + return i; +} + +#ifdef IDLE_THREADS +void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr) { + if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { + pthread_mutex_lock(&thr->myexchange.mutex_idles); + bool empty_queue=true; + if (thr->myexchange.idle_mysql_sessions->len) { + // there are already sessions in the queues. We assume someone already notified worker 0 + empty_queue=false; + } + while (idle_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); + thr->myexchange.idle_mysql_sessions->add(mysess); + } + pthread_mutex_unlock(&thr->myexchange.mutex_idles); + if (empty_queue==true) { + unsigned char c=1; + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } + } + } +} + +void MySQL_Thread::worker_threads_get_sessions_from_idle_threads() { + worker_threads_get_sessions_from_idle_threads(); + pthread_mutex_lock(&myexchange.mutex_resumes); + if (myexchange.resume_mysql_sessions->len) { + //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; + while (myexchange.resume_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + } + } + pthread_mutex_unlock(&myexchange.mutex_resumes); +} +#endif // IDLE_THREADS + + bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { if (mypolls.fds[n].revents) { #ifdef IDLE_THREADS @@ -6748,3 +6741,36 @@ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) { } } } + +bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n) { + unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; + if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { + // make sure data stream has no pending data out and session is not throttled (#1939) + // because epoll thread does not handle data stream with data out + if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { + //unsigned int j; + bool has_backends = myds->sess->has_any_backend(); +/* + for (j=0;jsess->mybes->len;j++) { + MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); + MySQL_Data_Stream *__myds=tmp_mybe->server_myds; + if (__myds->myconn) { + conns++; + } + } +*/ + if (has_backends==false) { + unsigned long long idle_since = curtime - myds->sess->IdleTime(); + mypolls.remove_index_fast(n); + myds->mypolls=NULL; + unsigned int i = find_session_idx_in_mysql_sessions(myds->sess); + myds->sess->thread=NULL; + unregister_session(i); + myds->sess->idle_since = idle_since; + idle_mysql_sessions->add(myds->sess); + return true; + } + } + } + return false; +} From 64bc42a1228dfc8ac5e95dab83569223ba720a39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 21:41:00 +0200 Subject: [PATCH 2/6] Simplifying MySQL_Thread::run() phase 2 --- include/MySQL_Thread.h | 2 ++ lib/MySQL_Thread.cpp | 61 +++++++++++++++++++++++------------------- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 5ccf4be108..e02cdacc7a 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -101,6 +101,8 @@ class MySQL_Thread #ifdef IDLE_THREADS void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr); void worker_threads_get_sessions_from_idle_threads(); + void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr); + void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); #endif // IDLE_THREADS unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index fec7aa8667..06546f44c4 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -4241,35 +4241,9 @@ void MySQL_Thread::run() { unsigned int w=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; if (resume_mysql_sessions->len) { - pthread_mutex_lock(&thr->myexchange.mutex_resumes); - if (shutdown==0 && thr->shutdown==0) - while (resume_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0); - thr->myexchange.resume_mysql_sessions->add(mysess); - } - pthread_mutex_unlock(&thr->myexchange.mutex_resumes); - { - unsigned char c=0; - //MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); - } - } + idle_thread_assigns_sessions_to_worker_thread(thr); } else { - //VALGRIND_DISABLE_ERROR_REPORTING; - pthread_mutex_lock(&thr->myexchange.mutex_resumes); - //VALGRIND_ENABLE_ERROR_REPORTING; - if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { - unsigned char c=0; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); - } - } - //VALGRIND_DISABLE_ERROR_REPORTING; - pthread_mutex_unlock(&thr->myexchange.mutex_resumes); - //VALGRIND_ENABLE_ERROR_REPORTING; + idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(thr); } } else { #endif // IDLE_THREADS @@ -4295,6 +4269,37 @@ unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *ses } #ifdef IDLE_THREADS +void MySQL_Thread::idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr) { + pthread_mutex_lock(&thr->myexchange.mutex_resumes); + if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { + unsigned char c=0; + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } + } + pthread_mutex_unlock(&thr->myexchange.mutex_resumes); +} + +void MySQL_Thread::idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr) { + pthread_mutex_lock(&thr->myexchange.mutex_resumes); + if (shutdown==0 && thr->shutdown==0) + while (resume_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0); + thr->myexchange.resume_mysql_sessions->add(mysess); + } + pthread_mutex_unlock(&thr->myexchange.mutex_resumes); + { + unsigned char c=0; + //MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; + // we signal the thread to inform there are sessions + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } + } +} + void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr) { if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { pthread_mutex_lock(&thr->myexchange.mutex_idles); From 72912f0b88254ada55df35b3abc3ab742b5b6b95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 22:05:39 +0200 Subject: [PATCH 3/6] Simplifying MySQL_Thread::run() phase 3 --- include/MySQL_Thread.h | 4 ++- lib/MySQL_Thread.cpp | 73 ++++++++++++++++++++++++------------------ 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index e02cdacc7a..49f6c7e909 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -100,13 +100,15 @@ class MySQL_Thread #ifdef IDLE_THREADS void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr); - void worker_threads_get_sessions_from_idle_threads(); + void worker_thread_gets_sessions_from_idle_thread(); + void idle_thread_gets_sessions_from_worker_thread(); void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr); void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); #endif // IDLE_THREADS unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); + bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n); protected: int nfds; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 06546f44c4..9a3ab56262 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3814,23 +3814,7 @@ void MySQL_Thread::run() { __run_skip_1: if (idle_maintenance_thread) { - pthread_mutex_lock(&myexchange.mutex_idles); - while (myexchange.idle_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); - // add in epoll() - struct epoll_event event; - memset(&event,0,sizeof(event)); // let's make valgrind happy - event.data.u32=mysess->thread_session_id; - event.events = EPOLLIN; - epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); - // we map thread_id -> position in mysql_session (end of the list) - sessmap[mysess->thread_session_id]=mysql_sessions->len-1; - //fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); - } - pthread_mutex_unlock(&myexchange.mutex_idles); + idle_thread_gets_sessions_from_worker_thread(); goto __run_skip_1a; } #endif // IDLE_THREADS @@ -3927,17 +3911,7 @@ void MySQL_Thread::run() { } } if (myds->myds_type==MYDS_BACKEND) { - if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { - unsigned int buffered_data=0; - buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; - // we pause receiving from backend at mysql_thread___threshold_resultset_size * 8 - // but assuming that client isn't completely blocked, we will stop checking for data - // only at mysql_thread___threshold_resultset_size * 4 - if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) { - mypolls.fds[n].events = 0; - } - } + set_backend_to_be_skipped_if_frontend_is_slow(myds, n); } } } @@ -3950,7 +3924,7 @@ void MySQL_Thread::run() { int r=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; worker_thread_assigns_sessions_to_idle_thread(thr); - worker_threads_get_sessions_from_idle_threads(); + worker_thread_gets_sessions_from_idle_thread(); } } @@ -4238,6 +4212,7 @@ void MySQL_Thread::run() { #ifdef IDLE_THREADS __run_skip_2: if (GloVars.global.idle_threads && idle_maintenance_thread) { + // this is an idle thread unsigned int w=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; if (resume_mysql_sessions->len) { @@ -4256,6 +4231,7 @@ void MySQL_Thread::run() { #endif // IDLE_THREADS } } +// end of ::run() unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) { int i=0; @@ -4323,8 +4299,7 @@ void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *t } } -void MySQL_Thread::worker_threads_get_sessions_from_idle_threads() { - worker_threads_get_sessions_from_idle_threads(); +void MySQL_Thread::worker_thread_gets_sessions_from_idle_thread() { pthread_mutex_lock(&myexchange.mutex_resumes); if (myexchange.resume_mysql_sessions->len) { //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; @@ -6779,3 +6754,39 @@ bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, } return false; } + +bool MySQL_Thread::set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n) { + if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { + unsigned int buffered_data=0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + // we pause receiving from backend at mysql_thread___threshold_resultset_size * 8 + // but assuming that client isn't completely blocked, we will stop checking for data + // only at mysql_thread___threshold_resultset_size * 4 + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) { + mypolls.fds[n].events = 0; + return true; + } + } + return false; +} + +void MySQL_Thread::idle_thread_gets_sessions_from_worker_thread() { + pthread_mutex_lock(&myexchange.mutex_idles); + while (myexchange.idle_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + // add in epoll() + struct epoll_event event; + memset(&event,0,sizeof(event)); // let's make valgrind happy + event.data.u32=mysess->thread_session_id; + event.events = EPOLLIN; + epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); + // we map thread_id -> position in mysql_session (end of the list) + sessmap[mysess->thread_session_id]=mysql_sessions->len-1; + //fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); + } + pthread_mutex_unlock(&myexchange.mutex_idles); +} From da21ca1b24bd56c5a2e83490a998992296fa1eb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 22:20:54 +0200 Subject: [PATCH 4/6] Simplifying MySQL_Thread::run() phase 4 --- include/MySQL_Thread.h | 1 + lib/MySQL_Thread.cpp | 77 +++++++++++++++++++++++------------------- 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 49f6c7e909..0726079737 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -109,6 +109,7 @@ class MySQL_Thread unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n); + void handle_mirror_queue_mysql_sessions(); protected: int nfds; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 9a3ab56262..fd83afad02 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3741,6 +3741,7 @@ void MySQL_Thread::unregister_session(int idx) { } + // main loop void MySQL_Thread::run() { unsigned int n; @@ -3818,41 +3819,9 @@ void MySQL_Thread::run() { goto __run_skip_1a; } #endif // IDLE_THREADS - while (mirror_queue_mysql_sessions->len) { - if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) { - __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); - goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime - } else { - int idx; - idx=fastrand()%(mirror_queue_mysql_sessions->len); - MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx); - register_session(newsess); - newsess->handler(); // execute immediately - if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed - unregister_session(mysql_sessions->len-1); - unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; - if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3; - if (mirror_queue_mysql_sessions_cache->len <= l) { - bool to_cache=true; - if (newsess->mybe) { - if (newsess->mybe->server_myds) { - to_cache=false; - } - } - if (to_cache) { - __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); - mirror_queue_mysql_sessions_cache->add(newsess); - } else { - delete newsess; - } - } else { - delete newsess; - } - } - //newsess->to_process=0; - } - } -__mysql_thread_exit_add_mirror: + + handle_mirror_queue_mysql_sessions(); + for (n = 0; n < mypolls.len; n++) { MySQL_Data_Stream *myds=NULL; myds=mypolls.myds[n]; @@ -6790,3 +6759,41 @@ void MySQL_Thread::idle_thread_gets_sessions_from_worker_thread() { } pthread_mutex_unlock(&myexchange.mutex_idles); } + +void MySQL_Thread::handle_mirror_queue_mysql_sessions() { + while (mirror_queue_mysql_sessions->len) { + if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) { + __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); + //goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime + return; + } else { + int idx; + idx=fastrand()%(mirror_queue_mysql_sessions->len); + MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx); + register_session(newsess); + newsess->handler(); // execute immediately + if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed + unregister_session(mysql_sessions->len-1); + unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; + if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3; + if (mirror_queue_mysql_sessions_cache->len <= l) { + bool to_cache=true; + if (newsess->mybe) { + if (newsess->mybe->server_myds) { + to_cache=false; + } + } + if (to_cache) { + __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); + mirror_queue_mysql_sessions_cache->add(newsess); + } else { + delete newsess; + } + } else { + delete newsess; + } + } + //newsess->to_process=0; + } + } +} From 9c2750027d1d1005d5a544304c0041eb5d7c1fca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 23:41:53 +0200 Subject: [PATCH 5/6] Simplifying MySQL_Thread::run() phase 5 --- include/MySQL_Thread.h | 8 ++ lib/MySQL_Thread.cpp | 257 +++++++++++++++++++++++------------------ 2 files changed, 153 insertions(+), 112 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 0726079737..d73ac709ed 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -104,12 +104,20 @@ class MySQL_Thread void idle_thread_gets_sessions_from_worker_thread(); void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr); void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); + void idle_thread_prepares_session_to_send_to_worker_thread(int i); #endif // IDLE_THREADS unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n); void handle_mirror_queue_mysql_sessions(); + void handle_kill_queues(); + void check_timing_out_session(unsigned int n); + void check_for_invalid_fd(unsigned int n); + void read_one_byte_from_pipe(unsigned int n); + void tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *myds); + void tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds); + void configure_pollout(MySQL_Data_Stream *myds, unsigned int n); protected: int nfds; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index fd83afad02..31ddc97d8c 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3841,47 +3841,16 @@ void MySQL_Thread::run() { } #endif // IDLE_THREADS if (unlikely(myds->wait_until)) { - if (myds->wait_until > curtime) { - if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { - mypolls.poll_timeout= myds->wait_until - curtime; - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , wait_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->wait_until, curtime); - } - } + tune_timeout_for_myds_needs_pause(myds); } if (myds->sess) { if (unlikely(myds->sess->pause_until > 0)) { - if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) { - mypolls.poll_timeout= myds->sess->pause_until - curtime; - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , pause_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->pause_until, curtime); - } + tune_timeout_for_session_needs_pause(myds); } } myds->revents=0; if (myds->myds_type!=MYDS_LISTENER) { - if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { - myds->set_pollout(); - } else { - if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) { - mypolls.fds[n].events = POLLIN; - if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE) - mypolls.fds[n].events |= POLLOUT; - } else { - myds->set_pollout(); - } - } - if (unlikely(myds->sess->pause_until > curtime)) { - if (myds->myds_type==MYDS_FRONTEND) { - myds->remove_pollout(); - } - if (myds->myds_type==MYDS_BACKEND) { - if (mysql_thread___throttle_ratio_server_to_client) { - mypolls.fds[n].events = 0; - } - } - } - if (myds->myds_type==MYDS_BACKEND) { - set_backend_to_be_skipped_if_frontend_is_slow(myds, n); - } + configure_pollout(myds, n); } } proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); @@ -3972,12 +3941,7 @@ void MySQL_Thread::run() { maintenance_loop=false; } - pthread_mutex_lock(&kq.m); - if (kq.conn_ids.size() + kq.query_ids.size()) { - Scan_Sessions_to_Kill_All(); - maintenance_loop=true; - } - pthread_mutex_unlock(&kq.m); + handle_kill_queues(); // update polls statistics mypolls.loops++; @@ -4027,31 +3991,10 @@ void MySQL_Thread::run() { int i; for (i=0; iindex(sess_pos); - MySQL_Data_Stream *tmp_myds=mysess->client_myds; - int dsidx=tmp_myds->poll_fds_idx; - //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); - mypolls.remove_index_fast(dsidx); - tmp_myds->mypolls=NULL; - mysess->thread=NULL; - // we first delete the association in sessmap - sessmap.erase(mysess->thread_session_id); - if (mysql_sessions->len > 1) { - // take the last element and adjust the map - MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); - if (mysess->thread_session_id != mysess_last->thread_session_id) - sessmap[mysess_last->thread_session_id]=sess_pos; - } - unregister_session(sess_pos); - resume_mysql_sessions->add(mysess); - epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); - } + idle_thread_prepares_session_to_send_to_worker_thread(i); } } + // FIXME: this loop seems suboptimal, it can be combined with the previous one for (i=0; isess) { - if (_myds->wait_until && curtime > _myds->wait_until) { - // timeout - _myds->sess->to_process=1; - } else { - if (_myds->sess->pause_until && curtime > _myds->sess->pause_until) { - // timeout - _myds->sess->to_process=1; - } - } - } + check_timing_out_session(n); } } else { - // check if the FD is valid - if (mypolls.fds[n].revents==POLLNVAL) { - // debugging output before assert - MySQL_Data_Stream *_myds=mypolls.myds[n]; - if (_myds) { - if (_myds->myconn) { - proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, myds->fd, myds->myconn->fd); - assert(mypolls.fds[n].revents!=POLLNVAL); - } - } - // if we reached her, we didn't assert() yet - proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, myds->fd); - assert(mypolls.fds[n].revents!=POLLNVAL); - } + check_for_invalid_fd(n); // this is designed to assert in case of failure switch(myds->myds_type) { - // Note: this logic that was here was removed completely because we added mariadb client library. + // Note: this logic that was here was removed completely because we added mariadb client library. case MYDS_LISTENER: // we got a new connection! listener_handle_new_connection(myds,n); @@ -4175,7 +4075,7 @@ void MySQL_Thread::run() { if (rc==false) { n--; } - } + } } #ifdef IDLE_THREADS @@ -4214,6 +4114,32 @@ unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *ses } #ifdef IDLE_THREADS +void MySQL_Thread::idle_thread_prepares_session_to_send_to_worker_thread(int i) { + // NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it + if (events[i].events) { + uint32_t sess_thr_id=events[i].data.u32; + uint32_t sess_pos=sessmap[sess_thr_id]; + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); + MySQL_Data_Stream *tmp_myds=mysess->client_myds; + int dsidx=tmp_myds->poll_fds_idx; + //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); + mypolls.remove_index_fast(dsidx); + tmp_myds->mypolls=NULL; + mysess->thread=NULL; + // we first delete the association in sessmap + sessmap.erase(mysess->thread_session_id); + if (mysql_sessions->len > 1) { + // take the last element and adjust the map + MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); + if (mysess->thread_session_id != mysess_last->thread_session_id) + sessmap[mysess_last->thread_session_id]=sess_pos; + } + unregister_session(sess_pos); + resume_mysql_sessions->add(mysess); + epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); + } +} + void MySQL_Thread::idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr) { pthread_mutex_lock(&thr->myexchange.mutex_resumes); if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { @@ -6797,3 +6723,110 @@ void MySQL_Thread::handle_mirror_queue_mysql_sessions() { } } } + +void MySQL_Thread::handle_kill_queues() { + pthread_mutex_lock(&kq.m); + if (kq.conn_ids.size() + kq.query_ids.size()) { + Scan_Sessions_to_Kill_All(); + maintenance_loop=true; + } + pthread_mutex_unlock(&kq.m); +} + +void MySQL_Thread::check_timing_out_session(unsigned int n) { + // FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout + // check for timeout + // no events. This section is copied from process_data_on_data_stream() + MySQL_Data_Stream *_myds=mypolls.myds[n]; + if (_myds && _myds->sess) { + if (_myds->wait_until && curtime > _myds->wait_until) { + // timeout + _myds->sess->to_process=1; + } else { + if (_myds->sess->pause_until && curtime > _myds->sess->pause_until) { + // timeout + _myds->sess->to_process=1; + } + } + } +} + +void MySQL_Thread::check_for_invalid_fd(unsigned int n) { + // check if the FD is valid + if (mypolls.fds[n].revents==POLLNVAL) { + // debugging output before assert + MySQL_Data_Stream *_myds=mypolls.myds[n]; + if (_myds) { + if (_myds->myconn) { + proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd, _myds->myconn->fd); + assert(mypolls.fds[n].revents!=POLLNVAL); + } + } + // if we reached her, we didn't assert() yet + proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd); + assert(mypolls.fds[n].revents!=POLLNVAL); + } +} + +void MySQL_Thread::read_one_byte_from_pipe(unsigned int n) { + if (mypolls.fds[n].revents) { + unsigned char c; + if (read(mypolls.fds[n].fd, &c, 1)==-1) {// read just one byte + proxy_error("Error during read from signal_all_threads()\n"); + } + proxy_debug(PROXY_DEBUG_GENERIC,3, "Got signal from admin , done nothing\n"); + //fprintf(stderr,"Got signal from admin , done nothing\n"); // FIXME: this is just the skeleton for issue #253 + if (c) { + // we are being signaled to sleep for some ms. Before going to sleep we also release the mutex + pthread_mutex_unlock(&thread_mutex); + usleep(c*1000); + pthread_mutex_lock(&thread_mutex); + // we enter in maintenance loop only if c is set + // when threads are signaling each other, there is no need to set maintenance_loop + maintenance_loop=true; + } + } +} + +void MySQL_Thread::tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *myds) { + if (myds->wait_until > curtime) { + if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { + mypolls.poll_timeout= myds->wait_until - curtime; + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , wait_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->wait_until, curtime); + } + } +} + +void MySQL_Thread::tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds) { + if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) { + mypolls.poll_timeout= myds->sess->pause_until - curtime; + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , pause_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->sess->pause_until, curtime); + } +} + +void MySQL_Thread::configure_pollout(MySQL_Data_Stream *myds, unsigned int n) { + if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { + myds->set_pollout(); + } else { + if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) { + mypolls.fds[n].events = POLLIN; + if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE) + mypolls.fds[n].events |= POLLOUT; + } else { + myds->set_pollout(); + } + } + if (unlikely(myds->sess->pause_until > curtime)) { + if (myds->myds_type==MYDS_FRONTEND) { + myds->remove_pollout(); + } + if (myds->myds_type==MYDS_BACKEND) { + if (mysql_thread___throttle_ratio_server_to_client) { + mypolls.fds[n].events = 0; + } + } + } + if (myds->myds_type==MYDS_BACKEND) { + set_backend_to_be_skipped_if_frontend_is_slow(myds, n); + } +} From 8a0e1c17482adb615ea30d137b4d8a48b45cf077 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 7 Apr 2020 00:17:22 +0200 Subject: [PATCH 6/6] Simplifying MySQL_Thread::run() phase 6 --- include/MySQL_Thread.h | 1 + lib/MySQL_Thread.cpp | 80 ++++++++++++++++++++++-------------------- 2 files changed, 43 insertions(+), 38 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index d73ac709ed..827a5ac8cd 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -105,6 +105,7 @@ class MySQL_Thread void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr); void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); void idle_thread_prepares_session_to_send_to_worker_thread(int i); + void idle_thread_to_kill_idle_sessions(); #endif // IDLE_THREADS unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 31ddc97d8c..ebb2efe72e 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3848,10 +3848,10 @@ void MySQL_Thread::run() { tune_timeout_for_session_needs_pause(myds); } } - myds->revents=0; - if (myds->myds_type!=MYDS_LISTENER) { - configure_pollout(myds, n); - } + myds->revents=0; + if (myds->myds_type!=MYDS_LISTENER) { + configure_pollout(myds, n); + } } proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); } @@ -4007,40 +4007,7 @@ void MySQL_Thread::run() { } } if (mysql_sessions->len && maintenance_loop) { -#define SESS_TO_SCAN 128 - if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) { - mysess_idx=0; - } - unsigned int i; - unsigned long long min_idle = 0; - if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) { - min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000; - } - for (i=0;ilen; i++) { - uint32_t sess_pos=mysess_idx; - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); - if (mysess->idle_since < min_idle) { - mysess->killed=true; - MySQL_Data_Stream *tmp_myds=mysess->client_myds; - int dsidx=tmp_myds->poll_fds_idx; - //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); - mypolls.remove_index_fast(dsidx); - tmp_myds->mypolls=NULL; - mysess->thread=NULL; - // we first delete the association in sessmap - sessmap.erase(mysess->thread_session_id); - if (mysql_sessions->len > 1) { - // take the last element and adjust the map - MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); - if (mysess->thread_session_id != mysess_last->thread_session_id) - sessmap[mysess_last->thread_session_id]=sess_pos; - } - unregister_session(sess_pos); - resume_mysql_sessions->add(mysess); - epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); - } - mysess_idx++; - } + idle_thread_to_kill_idle_sessions(); } goto __run_skip_2; } @@ -4114,6 +4081,43 @@ unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *ses } #ifdef IDLE_THREADS +void MySQL_Thread::idle_thread_to_kill_idle_sessions() { +#define SESS_TO_SCAN 128 + if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) { + mysess_idx=0; + } + unsigned int i; + unsigned long long min_idle = 0; + if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) { + min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000; + } + for (i=0;ilen; i++) { + uint32_t sess_pos=mysess_idx; + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); + if (mysess->idle_since < min_idle) { + mysess->killed=true; + MySQL_Data_Stream *tmp_myds=mysess->client_myds; + int dsidx=tmp_myds->poll_fds_idx; + //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); + mypolls.remove_index_fast(dsidx); + tmp_myds->mypolls=NULL; + mysess->thread=NULL; + // we first delete the association in sessmap + sessmap.erase(mysess->thread_session_id); + if (mysql_sessions->len > 1) { + // take the last element and adjust the map + MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); + if (mysess->thread_session_id != mysess_last->thread_session_id) + sessmap[mysess_last->thread_session_id]=sess_pos; + } + unregister_session(sess_pos); + resume_mysql_sessions->add(mysess); + epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); + } + mysess_idx++; + } +} + void MySQL_Thread::idle_thread_prepares_session_to_send_to_worker_thread(int i) { // NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it if (events[i].events) {