Skip to content

Commit

Permalink
Merge pull request sofastack#14 from Synex-wh/fix_push_confirm_success
Browse files Browse the repository at this point in the history
Fix push confirm success
  • Loading branch information
atellwu authored Nov 21, 2019
2 parents 840d8c6 + 5b1f784 commit 1fbde8a
Show file tree
Hide file tree
Showing 24 changed files with 1,122 additions and 284 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<lookout.version>1.5.2</lookout.version>
<mockito.version>1.10.19</mockito.version>
<powermock.version>1.6.6</powermock.version>
<jraft.version>1.2.5</jraft.version>
<jraft.version>1.2.7.beta1</jraft.version>
<metrics.version>4.0.2</metrics.version>
<commons-io.version>2.4</commons-io.version>
<jetty.version>[9.4.17.v20190418,9.4.19.v20190610]</jetty.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public void onException(Throwable e) {

@Override
public Executor getExecutor() {
return null;
return callbackHandler.getExecutor();
}
}, timeoutMillis);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public void onException(Throwable e) {

@Override
public Executor getExecutor() {
return null;
return callbackHandler.getExecutor();
}
}, timeoutMillis);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface MetaServerConfig {

boolean isEnableMetrics();

int getRockDBCacheSize();

/**
* decision mode enum
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*/
package com.alipay.sofa.registry.server.meta.bootstrap;

import java.io.File;

import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.springframework.boot.context.properties.ConfigurationProperties;

import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import java.io.File;

/**
*
Expand Down Expand Up @@ -94,6 +93,8 @@ public class MetaServerConfigBean implements MetaServerConfig {
+ File.separator
+ "raftData";

private int rockDBCacheSize = 64; //64M

@Override
public int getSessionServerPort() {
return sessionServerPort;
Expand Down Expand Up @@ -603,6 +604,24 @@ public void setEnableMetrics(boolean enableMetrics) {
this.enableMetrics = enableMetrics;
}

/**
* Getter method for property <tt>RockDBCacheSize</tt>.
*
* @return property value of RockDBCacheSize
*/
public int getRockDBCacheSize() {
return rockDBCacheSize;
}

/**
* Setter method for property <tt>RockDBCacheSize</tt>.
*
* @param rockDBCacheSize value to be assigned to property RockDBCacheSize
*/
public void setRockDBCacheSize(int rockDBCacheSize) {
this.rockDBCacheSize = rockDBCacheSize;
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@
*/
package com.alipay.sofa.registry.server.meta.remoting;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;

import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
Expand All @@ -47,6 +38,14 @@
import com.alipay.sofa.registry.server.meta.bootstrap.NodeConfig;
import com.alipay.sofa.registry.server.meta.executor.ExecutorManager;
import com.alipay.sofa.registry.server.meta.registry.Registry;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
*
Expand Down Expand Up @@ -141,6 +140,9 @@ public void stopProcess(PeerId leader) {
RaftServerConfig raftServerConfig = new RaftServerConfig();
raftServerConfig.setMetricsLogger(METRICS_LOGGER);
raftServerConfig.setEnableMetrics(metaServerConfig.isEnableMetrics());
if (metaServerConfig.getRockDBCacheSize() > 0) {
raftServerConfig.setRockDBCacheSize(metaServerConfig.getRockDBCacheSize());
}

raftServer.start(raftServerConfig);
}
Expand Down Expand Up @@ -384,4 +386,13 @@ public AtomicBoolean getServerStart() {
public AtomicBoolean getClsStart() {
return clsStart;
}

/**
* Getter method for property <tt>raftServer</tt>.
*
* @return property value of raftServer
*/
public RaftServer getRaftServer() {
return raftServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
*/
package com.alipay.sofa.registry.server.meta.resource;

import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.jraft.bootstrap.ServiceStateMachine;
import com.alipay.sofa.registry.metrics.ReporterUtils;
import com.alipay.sofa.registry.server.meta.bootstrap.MetaServerBootstrap;
import com.alipay.sofa.registry.server.meta.remoting.RaftExchanger;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand All @@ -25,16 +34,6 @@
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

import org.springframework.beans.factory.annotation.Autowired;

import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.jraft.bootstrap.ServiceStateMachine;
import com.alipay.sofa.registry.metrics.ReporterUtils;
import com.alipay.sofa.registry.server.meta.bootstrap.MetaServerBootstrap;
import com.alipay.sofa.registry.server.meta.remoting.RaftExchanger;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

/**
*
* @author shangyu.wh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@
*/
package com.alipay.sofa.registry.server.session.bootstrap;

import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Resource;
import javax.ws.rs.Path;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;

import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.FetchProvideDataRequest;
Expand All @@ -51,11 +36,25 @@
import com.alipay.sofa.registry.server.session.node.NodeManagerFactory;
import com.alipay.sofa.registry.server.session.node.RaftClientManager;
import com.alipay.sofa.registry.server.session.node.SessionProcessIdGenerator;
import com.alipay.sofa.registry.server.session.provideData.ProvideDataProcessor;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.remoting.handler.AbstractClientHandler;
import com.alipay.sofa.registry.server.session.remoting.handler.AbstractServerHandler;
import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager;
import com.alipay.sofa.registry.task.batcher.TaskDispatchers;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import javax.ws.rs.Path;
import javax.ws.rs.ext.Provider;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The type Session server bootstrap.
Expand Down Expand Up @@ -109,6 +108,9 @@ public class SessionServerBootstrap {
@Autowired
private Registry sessionRegistry;

@Autowired
private ProvideDataProcessor provideDataProcessorManager;

private Server server;

private Server httpServer;
Expand Down Expand Up @@ -309,20 +311,7 @@ private void fetchStopPushSwitch(URL leaderUrl) {
Object ret = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
if (ret instanceof ProvideData) {
ProvideData provideData = (ProvideData) ret;
if (provideData.getProvideData() == null
|| provideData.getProvideData().getObject() == null) {
LOGGER.info("Fetch session stop push switch no data existed,config not change!");
return;
}
String data = (String) provideData.getProvideData().getObject();
sessionServerConfig.setStopPushSwitch(Boolean.valueOf(data));
if (data != null) {
if (!Boolean.valueOf(data)) {
//stop push init on,then begin fetch data schedule task
sessionServerConfig.setBeginDataFetchTask(true);
}
}
LOGGER.info("Fetch session stop push data switch {} success!", data);
provideDataProcessorManager.fetchDataProcess(provideData);
} else {
LOGGER.info("Fetch session stop push switch data null,config not change!");
}
Expand All @@ -334,16 +323,7 @@ private void fetchEnableDataRenewSnapshot(URL leaderUrl) {
Object data = sendMetaRequest(fetchProvideDataRequest, leaderUrl);
if (data instanceof ProvideData) {
ProvideData provideData = (ProvideData) data;
if (provideData == null || provideData.getProvideData() == null
|| provideData.getProvideData().getObject() == null) {
LOGGER
.info("Fetch enableDataRenewSnapshot but no data existed, current config not change!");
return;
}
boolean enableDataRenewSnapshot = Boolean.parseBoolean((String) provideData
.getProvideData().getObject());
LOGGER.info("Fetch enableDataRenewSnapshot {} success!", enableDataRenewSnapshot);
this.sessionRegistry.setEnableDataRenewSnapshot(enableDataRenewSnapshot);
provideDataProcessorManager.fetchDataProcess(provideData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
import com.alipay.sofa.registry.server.session.node.service.DataNodeServiceImpl;
import com.alipay.sofa.registry.server.session.node.service.MetaNodeService;
import com.alipay.sofa.registry.server.session.node.service.MetaNodeServiceImpl;
import com.alipay.sofa.registry.server.session.provideData.ProvideDataProcessor;
import com.alipay.sofa.registry.server.session.provideData.ProvideDataProcessorManager;
import com.alipay.sofa.registry.server.session.provideData.processor.BlackListProvideDataProcessor;
import com.alipay.sofa.registry.server.session.provideData.processor.RenewSnapshotProvideDataProcessor;
import com.alipay.sofa.registry.server.session.provideData.processor.StopPushProvideDataProcessor;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.registry.SessionRegistry;
import com.alipay.sofa.registry.server.session.remoting.ClientNodeExchanger;
Expand Down Expand Up @@ -739,4 +744,37 @@ public RenewService renewService() {
return new DefaultRenewService();
}
}

@Configuration
public static class SessionProvideDataConfiguration {

@Bean
public ProvideDataProcessor provideDataProcessorManager() {
return new ProvideDataProcessorManager();
}

@Bean
public ProvideDataProcessor blackListProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
ProvideDataProcessor blackListProvideDataProcessor = new BlackListProvideDataProcessor();
((ProvideDataProcessorManager) provideDataProcessorManager)
.addProvideDataProcessor(blackListProvideDataProcessor);
return blackListProvideDataProcessor;
}

@Bean
public ProvideDataProcessor renewSnapshotProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
ProvideDataProcessor renewSnapshotProvideDataProcessor = new RenewSnapshotProvideDataProcessor();
((ProvideDataProcessorManager) provideDataProcessorManager)
.addProvideDataProcessor(renewSnapshotProvideDataProcessor);
return renewSnapshotProvideDataProcessor;
}

@Bean
public ProvideDataProcessor stopPushProvideDataProcessor(ProvideDataProcessor provideDataProcessorManager) {
ProvideDataProcessor stopPushProvideDataProcessor = new StopPushProvideDataProcessor();
((ProvideDataProcessorManager) provideDataProcessorManager)
.addProvideDataProcessor(stopPushProvideDataProcessor);
return stopPushProvideDataProcessor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.filter.blacklist.BlacklistManager;
import com.alipay.sofa.registry.server.session.node.service.MetaNodeService;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.provideData.ProvideDataProcessor;
import com.alipay.sofa.registry.server.session.scheduler.task.ProvideDataChangeFetchTask;
import com.alipay.sofa.registry.server.session.scheduler.task.SessionTask;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.Watchers;
import com.alipay.sofa.registry.task.batcher.TaskDispatcher;
import com.alipay.sofa.registry.task.batcher.TaskDispatchers;
Expand Down Expand Up @@ -59,17 +57,11 @@ public class ProvideDataChangeFetchTaskListener implements TaskListener {
@Autowired
private Exchange boltExchange;

@Autowired
private Interests sessionInterests;

@Autowired
private Watchers sessionWatchers;

@Autowired
private Registry sessionRegistry;

@Autowired
private BlacklistManager blacklistManager;
private ProvideDataProcessor provideDataProcessorManager;

private TaskDispatcher<String, SessionTask> singleTaskDispatcher;

Expand Down Expand Up @@ -97,7 +89,7 @@ public void handleEvent(TaskEvent event) {

SessionTask provideDataChangeFetchTask = new ProvideDataChangeFetchTask(
sessionServerConfig, taskListenerManager, metaNodeService, sessionWatchers,
boltExchange, sessionInterests, sessionRegistry, blacklistManager);
boltExchange, provideDataProcessorManager);

provideDataChangeFetchTask.setTaskEvent(event);

Expand Down
Loading

0 comments on commit 1fbde8a

Please sign in to comment.