Skip to content

Commit

Permalink
improve:remove the heartbeat on server side
Browse files Browse the repository at this point in the history
  • Loading branch information
xujingfeng committed Jan 19, 2019
1 parent db8e58e commit aca12dd
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.remoting.exchange.support.header;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.Channel;

/**
* CloseTimerTask
*/
public class CloseTimerTask extends AbstractTimerTask {

private static final Logger logger = LoggerFactory.getLogger(CloseTimerTask.class);

private final int idleTimeout;

public CloseTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
super(channelProvider, heartbeatTimeoutTick);
this.idleTimeout = idleTimeout;
}

@Override
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
Long now = now();
// check ping & pong at server
if ((lastRead != null && now - lastRead > idleTimeout)
|| (lastWrite != null && now - lastWrite > idleTimeout)) {
logger.warn("Close channel " + channel + ", because idleCheck timeout: "
+ idleTimeout + "ms");
channel.close();
}
} catch (Throwable t) {
logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ public class HeaderExchangeClient implements ExchangeClient {

private final Client client;
private final ExchangeChannel channel;
// heartbeat(ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private int idleTimeout;

private HashedWheelTimer heartbeatTimer;
private HashedWheelTimer idleCheckTimer;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
Expand All @@ -55,16 +54,16 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) {

this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), tickDuration,
TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
startIdleCheckTimer();
}
}

Expand Down Expand Up @@ -178,28 +177,28 @@ public boolean hasAttribute(String key) {
return channel.hasAttribute(key);
}

private void startHeartbeatTimer() {
private void startIdleCheckTimer() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

private void stopHeartbeatTimer() {
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
private void stopIdleCheckTimer() {
if (idleCheckTimer != null) {
idleCheckTimer.stop();
idleCheckTimer = null;
}
}

private void doClose() {
stopHeartbeatTimer();
stopIdleCheckTimer();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,24 @@ public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());

private final Server server;
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private int idleTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private HashedWheelTimer heartbeatTimer;
private HashedWheelTimer idleCheckTimer;

public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

startHeartbeatTimer();
startIdleCheckTimer();
}

public Server getServer() {
Expand Down Expand Up @@ -148,7 +147,7 @@ private void doClose() {
if (!closed.compareAndSet(false, true)) {
return;
}
stopHeartbeatTimer();
stopIdleCheckTimer();
}

@Override
Expand Down Expand Up @@ -209,14 +208,14 @@ public void reset(URL url) {
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
if (t < h * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
if (h != heartbeat || t != heartbeatTimeout) {
if (h != heartbeat || t != idleTimeout) {
heartbeat = h;
heartbeatTimeout = t;
idleTimeout = t;

stopHeartbeatTimer();
startHeartbeatTimer();
stopIdleCheckTimer();
startIdleCheckTimer();
}
}
} catch (Throwable t) {
Expand Down Expand Up @@ -259,27 +258,23 @@ private long calculateLeastDuration(int time) {
}
}

private void startHeartbeatTimer() {
private void startIdleCheckTimer() {
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), tickDuration,
TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);

AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}

private void stopHeartbeatTimer() {
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
private void stopIdleCheckTimer() {
if (idleCheckTimer != null) {
idleCheckTimer.stop();
idleCheckTimer = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,26 @@ public class ReconnectTimerTask extends AbstractTimerTask {

private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);

private final int heartbeatTimeout;
private final int idleTimeout;

ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
super(channelProvider, heartbeatTimeoutTick);
this.heartbeatTimeout = heartbeatTimeout1;
this.idleTimeout = idleTimeout;
}

@Override
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();
if (lastRead != null && now - lastRead > heartbeatTimeout) {
// check pong at client
if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
+ heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
channel.close();
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
Expand Down

0 comments on commit aca12dd

Please sign in to comment.