Skip to content

Commit

Permalink
Merge pull request sofastack#8 from atellwu/renew_expired_strategy
Browse files Browse the repository at this point in the history
renew_expired_strategy
  • Loading branch information
Synex-wh authored Jul 26, 2019
2 parents 79cd2ad + 6cd814f commit 44d93d7
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class DataServerConfig {

private int publishExecutorQueueSize = 10000;

private int datumTimeToLiveSec = 300;
private int datumTimeToLiveSec = 900;

private int datumLeaseManagerExecutorThreadSize = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ data.server.rpcTimeout=3000
data.server.metaServerPort=9611
data.server.storeNodes=3
data.server.numberOfReplicas=1000
data.server.datumTimeToLiveSec=900

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
*/
package com.alipay.sofa.registry.server.meta.resource;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.springframework.beans.factory.annotation.Autowired;

import com.alipay.sofa.registry.common.model.console.PersistenceData;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.DataOperator;
Expand All @@ -29,12 +36,6 @@
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import org.springframework.beans.factory.annotation.Autowired;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
*
Expand All @@ -58,6 +59,7 @@ public class BlacklistDataResource {

/**
* update blacklist
* e.g. curl -d '{"FORBIDDEN_PUB":{"IP_FULL":["1.1.1.1","10.15.233.150"]},"FORBIDDEN_SUB_BY_PREFIX":{"IP_FULL":["1.1.1.1"]}}' -H "Content-Type: application/json" -X POST http://localhost:9615/blacklist/update
*/
@POST
@Path("update")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public void process(WriteDataRequest request) {
RENEW_LOGGER.debug("process: connectId={}, requestType={}, requestBody={}", connectId,
request.getRequestType(), request.getRequestBody());
}

// record the last update time
// RefreshUpdateTime is at the top, otherwise multiple snapshot can be issued concurrently
if (isWriteRequest(request)) {
refreshUpdateTime();
}

if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
// snapshot has high priority, so handle directly
doHandle(request);
Expand All @@ -119,11 +126,6 @@ public void process(WriteDataRequest request) {
}
}

// record the last update time
if (isWriteRequest(request)) {
refreshUpdateTime();
}

}

private void addQueue(WriteDataRequest request) {
Expand All @@ -145,11 +147,7 @@ private void addQueue(WriteDataRequest request) {
* @return
*/
private boolean isWriteRequest(WriteDataRequest request) {
// UN_PUBLISHER is not guaranteed to find the corresponding connectId on the data side (because of expired cleaning or bugs, etc. ),
// so don't block the renew, don't consider it as a write request
return request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT
|| request.getRequestType() == WriteDataRequestType.PUBLISHER
|| request.getRequestType() == WriteDataRequestType.CLIENT_OFF;
return request.getRequestType() != WriteDataRequestType.RENEW_DATUM;
}

/**
Expand Down Expand Up @@ -201,8 +199,11 @@ private void doHandle(WriteDataRequest request) {
return;
}
halt();
doSnapshotAsync(request);
resume();
try {
doSnapshotAsync(request);
} finally {
resume();
}
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public interface SessionServerConfig {

int getRenewDatumWheelTicksDuration();

long getRenewDatumWheelTaskDelay();
int getRenewDatumWheelTaskDelaySec();

int getRenewDatumWheelTaskRandomFirstDelaySec();

String getBlacklistPubDataIdRegex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package com.alipay.sofa.registry.server.session.bootstrap;

import java.util.Collections;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -84,31 +81,31 @@ public class SessionServerConfigBean implements SessionServerConfig {

private int schedulerCleanInvalidClientBackOffBound = 5;

private int cancelDataTaskRetryTimes = 5;
private int cancelDataTaskRetryTimes = 2;

private long cancelDataTaskRetryFirstDelay = 500;

private long cancelDataTaskRetryIncrementDelay = 500;

private int publishDataTaskRetryTimes = 5;
private int publishDataTaskRetryTimes = 2;

private long publishDataTaskRetryFirstDelay = 500;
private long publishDataTaskRetryFirstDelay = 3000;

private long publishDataTaskRetryIncrementDelay = 500;
private long publishDataTaskRetryIncrementDelay = 5000;

private int unPublishDataTaskRetryTimes = 5;
private int unPublishDataTaskRetryTimes = 2;

private long unPublishDataTaskRetryFirstDelay = 500;
private long unPublishDataTaskRetryFirstDelay = 3000;

private long unPublishDataTaskRetryIncrementDelay = 500;
private long unPublishDataTaskRetryIncrementDelay = 5000;

private int datumSnapshotTaskRetryTimes = 5;
private int datumSnapshotTaskRetryTimes = 1;

private long datumSnapshotTaskRetryFirstDelay = 1000;
private long datumSnapshotTaskRetryFirstDelay = 5000;

private long datumSnapshotTaskRetryIncrementDelay = 1000;
private long datumSnapshotTaskRetryIncrementDelay = 5000;

private int renewDatumTaskRetryTimes = 3;
private int renewDatumTaskRetryTimes = 1;

private int dataChangeFetchTaskRetryTimes = 3;

Expand Down Expand Up @@ -178,7 +175,9 @@ public class SessionServerConfigBean implements SessionServerConfig {

private int renewDatumWheelTicksDuration = 500;

private long renewDatumWheelTaskDelay = 30;
private int renewDatumWheelTaskDelaySec = 180;

private int renewDatumWheelTaskRandomFirstDelaySec = 60;

private int pushDataTaskRetryFirstDelay = 500;

Expand Down Expand Up @@ -1692,21 +1691,39 @@ public int getRenewDatumWheelTicksDuration() {
}

/**
* Getter method for property <tt>renewDatumWheelTaskDelay</tt>.
* Getter method for property <tt>renewDatumWheelTaskDelaySec</tt>.
*
* @return property value of renewDatumWheelTaskDelaySec
*/
public int getRenewDatumWheelTaskDelaySec() {
return renewDatumWheelTaskDelaySec;
}

/**
* Setter method for property <tt>renewDatumWheelTaskDelaySec </tt>.
*
* @param renewDatumWheelTaskDelaySec value to be assigned to property renewDatumWheelTaskDelaySec
*/
public void setRenewDatumWheelTaskDelaySec(int renewDatumWheelTaskDelaySec) {
this.renewDatumWheelTaskDelaySec = renewDatumWheelTaskDelaySec;
}

/**
* Getter method for property <tt>renewDatumWheelTaskRandomFirstDelaySec</tt>.
*
* @return property value of renewDatumWheelTaskDelay
* @return property value of renewDatumWheelTaskRandomFirstDelaySec
*/
public long getRenewDatumWheelTaskDelay() {
return renewDatumWheelTaskDelay;
public int getRenewDatumWheelTaskRandomFirstDelaySec() {
return renewDatumWheelTaskRandomFirstDelaySec;
}

/**
* Setter method for property <tt>renewDatumWheelTaskDelay</tt>.
* Setter method for property <tt>renewDatumWheelTaskRandomFirstDelaySec </tt>.
*
* @param renewDatumWheelTaskDelay value to be assigned to property renewDatumWheelTaskDelay
* @param renewDatumWheelTaskRandomFirstDelaySec value to be assigned to property renewDatumWheelTaskRandomFirstDelaySec
*/
public void setRenewDatumWheelTaskDelay(long renewDatumWheelTaskDelay) {
this.renewDatumWheelTaskDelay = renewDatumWheelTaskDelay;
public void setRenewDatumWheelTaskRandomFirstDelaySec(int renewDatumWheelTaskRandomFirstDelaySec) {
this.renewDatumWheelTaskRandomFirstDelaySec = renewDatumWheelTaskRandomFirstDelaySec;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.math.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;

import com.alipay.sofa.registry.common.model.constants.ValueConstants;
Expand Down Expand Up @@ -132,7 +133,7 @@ private void fireRenewDatum(Channel channel) {
RENEW_LOGGER.info("Renew task is started: {}", connectId);
executorManager.getAsyncHashedWheelTimerTask()
.newTimeout(connectId, timerOut -> sessionRegistry.renewDatum(connectId),
sessionServerConfig.getRenewDatumWheelTaskDelay(), TimeUnit.SECONDS, () -> {
randomDelay() + sessionServerConfig.getRenewDatumWheelTaskDelaySec(), TimeUnit.SECONDS, () -> {
Server sessionServer = boltExchange.getServer(sessionServerConfig.getServerPort());
Channel channelClient = sessionServer.getChannel(URL.valueOf(connectId));
boolean shouldContinue = channelClient != null && channel.isConnected();
Expand All @@ -143,4 +144,9 @@ private void fireRenewDatum(Channel channel) {
});
});
}

private long randomDelay() {
return RandomUtils.nextInt(sessionServerConfig.getRenewDatumWheelTaskRandomFirstDelaySec());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ session.server.schedulerFetchDataFirstDelay=3
session.server.schedulerFetchDataExpBackOffBound=10
#session.server.invalidForeverZones=<zone1>;<zone2>
#session.server.invalidIgnoreDataidRegex=<Regex>
#session.server.pushEmptyDataDataIdPrefixes=<some prefix string>
#session.server.pushEmptyDataDataIdPrefixes=<some prefix string>
session.server.renewDatumWheelTaskDelaySec=180
session.server.renewDatumWheelTaskRandomFirstDelaySec=60

0 comments on commit 44d93d7

Please sign in to comment.