Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve/heartbeat #3276

Merged
merged 8 commits into from
Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.remoting.exchange.support.header;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.Channel;

/**
* CloseTimerTask
*/
public class CloseTimerTask extends AbstractTimerTask {

private static final Logger logger = LoggerFactory.getLogger(CloseTimerTask.class);

private final int idleTimeout;

public CloseTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
super(channelProvider, heartbeatTimeoutTick);
this.idleTimeout = idleTimeout;
}

@Override
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
Long now = now();
// check ping & pong at server
if ((lastRead != null && now - lastRead > idleTimeout)
lexburner marked this conversation as resolved.
Show resolved Hide resolved
|| (lastWrite != null && now - lastWrite > idleTimeout)) {
logger.warn("Close channel " + channel + ", because idleCheck timeout: "
+ idleTimeout + "ms");
channel.close();
}
} catch (Throwable t) {
logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ public class HeaderExchangeClient implements ExchangeClient {

private final Client client;
private final ExchangeChannel channel;
// heartbeat(ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private int idleTimeout;

private HashedWheelTimer heartbeatTimer;
private HashedWheelTimer idleCheckTimer;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -55,16 +54,16 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) {

this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
lexburner marked this conversation as resolved.
Show resolved Hide resolved
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), tickDuration,
TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
startIdleCheckTimer();
}
}

Expand Down Expand Up @@ -178,28 +177,28 @@ public boolean hasAttribute(String key) {
return channel.hasAttribute(key);
}

private void startHeartbeatTimer() {
private void startIdleCheckTimer() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

private void stopHeartbeatTimer() {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
private void stopIdleCheckTimer() {
if (idleCheckTimer != null) {
idleCheckTimer.stop();
idleCheckTimer = null;
}
}

private void doClose() {
stopHeartbeatTimer();
stopIdleCheckTimer();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,24 @@ public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());

private final Server server;
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private int idleTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private HashedWheelTimer heartbeatTimer;
private HashedWheelTimer idleCheckTimer;

public HeaderExchangeServer(Server server) {
if (server == null) {
beiwei30 marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

startHeartbeatTimer();
startIdleCheckTimer();
}

public Server getServer() {
Expand Down Expand Up @@ -149,7 +148,7 @@ private void doClose() {
if (!closed.compareAndSet(false, true)) {
return;
}
stopHeartbeatTimer();
stopIdleCheckTimer();
}

@Override
Expand Down Expand Up @@ -210,14 +209,14 @@ public void reset(URL url) {
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

h can be rename to urlHeartBeatKey what do you say?

int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
lexburner marked this conversation as resolved.
Show resolved Hide resolved
if (t < h * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
if (h != heartbeat || t != heartbeatTimeout) {
if (h != heartbeat || t != idleTimeout) {
heartbeat = h;
heartbeatTimeout = t;
idleTimeout = t;

stopHeartbeatTimer();
startHeartbeatTimer();
stopIdleCheckTimer();
startIdleCheckTimer();
}
}
} catch (Throwable t) {
Expand Down Expand Up @@ -260,27 +259,23 @@ private long calculateLeastDuration(int time) {
}
}

private void startHeartbeatTimer() {
private void startIdleCheckTimer() {
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), tickDuration,
TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);

AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);

// init task and start timer.
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}

private void stopHeartbeatTimer() {
if (heartbeatTimer != null) {
heartbeatTimer.stop();
heartbeatTimer = null;
private void stopIdleCheckTimer() {
if (idleCheckTimer != null) {
idleCheckTimer.stop();
idleCheckTimer = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,30 @@ public class ReconnectTimerTask extends AbstractTimerTask {

private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);

private final int heartbeatTimeout;
private final int idleTimeout;

ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
super(channelProvider, heartbeatTimeoutTick);
this.heartbeatTimeout = heartbeatTimeout1;
this.idleTimeout = idleTimeout;
}

@Override
protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long now = now();
if (lastRead != null && now - lastRead > heartbeatTimeout) {
if (channel instanceof Client) {
try {
Long lastRead = lastRead(channel);
Long now = now();
// check pong at client
if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
logger.warn("Reconnect to remote channel " + channel.getRemoteAddress() + ", because heartbeat read idle time out: "
+ heartbeatTimeout + "ms");
((Client) channel).reconnect();
} catch (Throwable t) {
// do nothing
}
} else {
try {
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
+ heartbeatTimeout + "ms");
channel.close();
} catch (Throwable t) {
logger.warn("Exception when close channel " + channel, t);
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
}
}
}