Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rest data info id list #15

Merged
merged 10 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class DataServerConfig {

private int clientOffDelayMs;

private int notifyTempDataIntervalMs;

private int rpcTimeout;

private CommonConfig commonConfig;
Expand Down Expand Up @@ -233,6 +235,24 @@ public void setNotifyIntervalMs(int notifyIntervalMs) {
this.notifyIntervalMs = notifyIntervalMs;
}

/**
* Getter method for property <tt>notifyTempDataIntervalMs</tt>.
*
* @return property value of notifyTempDataIntervalMs
*/
public int getNotifyTempDataIntervalMs() {
return notifyTempDataIntervalMs;
}

/**
* Setter method for property <tt>notifyTempDataIntervalMs</tt>.
*
* @param notifyTempDataIntervalMs value to be assigned to property notifyTempDataIntervalMs
*/
public void setNotifyTempDataIntervalMs(int notifyTempDataIntervalMs) {
this.notifyTempDataIntervalMs = notifyTempDataIntervalMs;
}

/**
* Getter method for property <tt>rpcTimeout</tt>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.registry.server.data.change;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
Expand All @@ -31,6 +32,7 @@

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

/**
Expand Down Expand Up @@ -178,11 +180,19 @@ private void notifyTempPub(Datum datum, DataSourceTypeEnum sourceType,
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
long version = datum.getVersion();

Datum existDatum = DatumCache.get(dataCenter, dataInfoId);
if (existDatum != null) {
Map<String, Publisher> cachePubMap = existDatum.getPubMap();
if (cachePubMap != null && !cachePubMap.isEmpty()) {
datum.getPubMap().putAll(cachePubMap);
}
}

LOGGER
.info(
"[DataChangeHandler][{}] datum handle temp pub,datum={},dataCenter={}, dataInfoId={}, version={}, sourceType={}, changeType={},isContainsUnPub={}",
name, datum.hashCode(), dataCenter, dataInfoId, version, sourceType,
changeType, datum.isContainsUnPub());
"[DataChangeHandler][{}] datum handle temp pub,datum={},dataCenter={}, dataInfoId={}, version={}, sourceType={}, changeType={}",
name, datum.hashCode(), dataCenter, dataInfoId, version, sourceType, changeType);

for (IDataChangeNotifier notifier : dataChangeNotifiers) {
if (notifier.getSuitableSource().contains(sourceType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.registry.server.data.change.event;

import com.alipay.sofa.registry.common.model.PublishType;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class DataChangeEventQueue {

private final int notifyIntervalMs;

private final int notifyTempDataIntervalMs;

private final ReentrantLock lock = new ReentrantLock();

private DataServerConfig dataServerConfig;
Expand All @@ -92,6 +95,7 @@ public DataChangeEventQueue(int idx, DataServerConfig dataServerConfig) {
eventQueue = new LinkedBlockingDeque<>(queueSize);
}
this.notifyIntervalMs = dataServerConfig.getNotifyIntervalMs();
this.notifyTempDataIntervalMs = dataServerConfig.getNotifyTempDataIntervalMs();
}

/**
Expand Down Expand Up @@ -121,7 +125,9 @@ public ChangeData take() throws InterruptedException {
lock.lock();
try {
Datum datum = changeData.getDatum();
CHANGE_DATA_MAP.get(datum.getDataCenter()).remove(datum.getDataInfoId());
if (changeData.getSourceType() != DataSourceTypeEnum.PUB_TEMP) {
CHANGE_DATA_MAP.get(datum.getDataCenter()).remove(datum.getDataInfoId());
}
return changeData;
} finally {
lock.unlock();
Expand Down Expand Up @@ -174,8 +180,15 @@ public void start() {
DataChangeScopeEnum scope = event.getScope();
if (scope == DataChangeScopeEnum.DATUM) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
handleDatum(dataChangeEvent.getChangeType(),
dataChangeEvent.getSourceType(), dataChangeEvent.getDatum());
//Temporary push data will be notify as soon as,and not merge to normal pub data;
if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP){
addTempChangeData(dataChangeEvent.getDatum(),dataChangeEvent.getChangeType(),
dataChangeEvent.getSourceType());
}
else {
handleDatum(dataChangeEvent.getChangeType(),
dataChangeEvent.getSourceType(), dataChangeEvent.getDatum());
}
} else if (scope == DataChangeScopeEnum.CLIENT) {
handleHost((ClientChangeEvent) event);
}
Expand Down Expand Up @@ -255,4 +268,12 @@ private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourc
lock.unlock();
}
}

private void addTempChangeData(Datum targetDatum, DataChangeTypeEnum changeType,
DataSourceTypeEnum sourceType) {

ChangeData tempChangeData = new ChangeData(targetDatum, this.notifyTempDataIntervalMs,
sourceType, changeType);
CHANGE_QUEUE.put(tempChangeData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ data.server.httpServerPort=9622
data.server.queueCount=4
data.server.queueSize=10240
data.server.notifyIntervalMs=500
data.server.notifyTempDataIntervalMs=10
data.server.rpcTimeout=3000
data.server.metaServerPort=9611
data.server.storeNodes=3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ public Map<String, Object> getPushSwitch() {
return resultMap;
}

@GET
@Path("getDataInfoIdList")
@Produces(MediaType.APPLICATION_JSON)
public Collection<String> getDataInfoIdList() {
Collection<String> ret = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里建议使用HashSet去重

ret.addAll(sessionInterests.getInterestDataInfoIds());
ret.addAll(sessionDataStore.getStoreDataInfoIds());
return sessionInterests.getInterestDataInfoIds();
}

@GET
@Path("checkSumDataInfoIdList")
@Produces(MediaType.APPLICATION_JSON)
public int checkSumDataInfoIdList() {
return sessionInterests.getInterestDataInfoIds().hashCode()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以基于上面的方法,返回 getDataInfoIdList().hashcode()即可

+ sessionDataStore.getStoreDataInfoIds().hashCode();
}

private void fillServerList(String type,
Map<String, Collection<? extends StoreData>> serverList,
Collection<Publisher> publishers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ private void fireReceiveDataPushTask(Map<String, Datum> datums,
Map<String, Subscriber> subscriberMap,
PushTaskClosure pushTaskClosure) {
Collection<Subscriber> subscribers = new ArrayList<>(subscriberMap.values());
LOGGER.info("Datums push={}", datums);
ReceivedData receivedData = ReceivedDataConverter.getReceivedDataMulti(datums, scopeEnum,
subscriberRegisterIdList, subscriber);

Expand All @@ -248,8 +249,8 @@ private void fireReceiveDataPushTask(Map<String, Datum> datums,
TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK);
taskEvent.setTaskClosure(pushTaskClosure);
taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
taskLogger.info("send {} taskURL:{},taskScope:{}", taskEvent.getTaskType(),
subscriber.getSourceAddress(), scopeEnum);
taskLogger.info("send {} taskURL:{},taskScope:{},version:{}", taskEvent.getTaskType(),
subscriber.getSourceAddress(), scopeEnum, receivedData.getVersion());
taskListenerManager.sendTaskEvent(taskEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ public void execute() {

String dataInfoId = datum.getDataInfoId();

PushTaskClosure pushTaskClosure = getTaskClosure();

for (ScopeEnum scopeEnum : ScopeEnum.values()) {
Map<InetSocketAddress, Map<String, Subscriber>> map = getCache(scopeEnum,
dataInfoId);
Expand All @@ -113,29 +111,29 @@ public void execute() {
if (ifLocalDataCenter) {
if (isOldVersion) {
fireUserDataElementPushTask(entry.getKey(), datum,
subscribersSend, pushTaskClosure);
subscribersSend);
} else {
fireReceivedDataMultiPushTask(datum,
subscriberRegisterIdList, ScopeEnum.zone,
subscriber, pushTaskClosure, subscriberMap);
subscriber, subscriberMap);
}
}
break;
case dataCenter:
if (ifLocalDataCenter) {
if (isOldVersion) {
fireUserDataElementMultiPushTask(entry.getKey(), datum,
subscribersSend, pushTaskClosure);
subscribersSend);
} else {
fireReceivedDataMultiPushTask(datum,
subscriberRegisterIdList, scopeEnum, subscriber,
pushTaskClosure, subscriberMap);
subscriberMap);
}
}
break;
case global:
fireReceivedDataMultiPushTask(datum, subscriberRegisterIdList,
scopeEnum, subscriber, pushTaskClosure, subscriberMap);
scopeEnum, subscriber, subscriberMap);
break;
default:
LOGGER.warn("unknown scope, {}", subscriber);
Expand All @@ -144,38 +142,11 @@ public void execute() {
}
}
}
pushTaskClosure.start();
}
}

public PushTaskClosure getTaskClosure() {
PushTaskClosure pushTaskClosure = new PushTaskClosure(executorManager.getPushTaskClosureExecutor());
pushTaskClosure.setTaskClosure((status, task) -> {
if (status == ProcessingResult.Success) {
Datum datum = dataPushRequest.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
Long version = datum.getVersion();

if (sessionServerConfig.isStopPushSwitch()) {
LOGGER.info("Stop Push switch on,dataCenter {} dataInfoId {} version {} can not be update!",
dataCenter, dataInfoId, version);
return;
}

LOGGER.info("Push all temp data tasks success,dataCenter:{} dataInfoId:{} version:{} update!",
dataCenter, dataInfoId,
version);
} else {
LOGGER.warn("Push temp data tasks found error,subscribers version can not be update!");
}
});
return pushTaskClosure;
}

private void fireReceivedDataMultiPushTask(Datum datum, List<String> subscriberRegisterIdList,
ScopeEnum scopeEnum, Subscriber subscriber,
PushTaskClosure pushTaskClosure, Map<String, Subscriber> subscriberMap) {
ScopeEnum scopeEnum, Subscriber subscriber, Map<String, Subscriber> subscriberMap) {
Collection<Subscriber> subscribers = new ArrayList<>(subscriberMap.values());
String dataId = datum.getDataId();
Predicate<String> zonePredicate = (zone) -> {
Expand All @@ -194,17 +165,17 @@ private void fireReceivedDataMultiPushTask(Datum datum, List<String> subscriberR
}
return false;
};
LOGGER.info("Datum push={}",datum);
ReceivedData receivedData = ReceivedDataConverter.getReceivedDataMulti(datum, scopeEnum,
subscriberRegisterIdList, sessionServerConfig.getSessionServerRegion(), zonePredicate);

//trigger push to client node
Map<ReceivedData, URL> parameter = new HashMap<>();
parameter.put(receivedData, subscriber.getSourceAddress());
TaskEvent taskEvent = new TaskEvent(parameter, TaskType.RECEIVED_DATA_MULTI_PUSH_TASK);
taskEvent.setTaskClosure(pushTaskClosure);
taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
taskLogger.info("send {} taskURL:{},taskScope:{}", taskEvent.getTaskType(), subscriber.getSourceAddress(),
scopeEnum);
taskLogger.info("send {} taskURL:{},taskScope:{},version:{}", taskEvent.getTaskType(), subscriber.getSourceAddress(),
scopeEnum,receivedData.getVersion());
taskListenerManager.sendTaskEvent(taskEvent);
}

Expand All @@ -214,11 +185,9 @@ private Map<InetSocketAddress, Map<String, Subscriber>> getCache(ScopeEnum scope
}

private void fireUserDataElementPushTask(InetSocketAddress address, Datum datum,
Collection<Subscriber> subscribers,
PushTaskClosure pushTaskClosure) {
Collection<Subscriber> subscribers) {

TaskEvent taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_PUSH_TASK);
taskEvent.setTaskClosure(pushTaskClosure);

taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum);
Expand All @@ -233,11 +202,9 @@ private void fireUserDataElementPushTask(InetSocketAddress address, Datum datum,
}

private void fireUserDataElementMultiPushTask(InetSocketAddress address, Datum datum,
Collection<Subscriber> subscribers,
PushTaskClosure pushTaskClosure) {
Collection<Subscriber> subscribers) {

TaskEvent taskEvent = new TaskEvent(TaskType.USER_DATA_ELEMENT_MULTI_PUSH_TASK);
taskEvent.setTaskClosure(pushTaskClosure);

taskEvent.setAttribute(Constant.PUSH_CLIENT_SUBSCRIBERS, subscribers);
taskEvent.setAttribute(Constant.PUSH_CLIENT_DATUM, datum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,26 @@
*/
public interface DataStore extends DataManager<Publisher, String, String> {

/**
* get all publishers by dataInfoId
* @param dataInfoId
* @return
*/
Collection<Publisher> getStoreDataByDataInfoId(String dataInfoId);

/***
* get Publiser by registerId and dataInfoId
* @param registerId
* @param dataInfoId
* @return
*/
Publisher queryById(String registerId, String dataInfoId);

/**
* get all publisher dataInfoIds
*
* @return
*/
Collection<String> getStoreDataInfoIds();

}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ public Publisher queryById(String registerId, String dataInfoId) {
return publishers.get(registerId);
}

@Override
public Collection<String> getStoreDataInfoIds() {
return registry.keySet();
}

@Override
public long count() {
AtomicLong count = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private static void initRegistryClientAndChannel() {
public static class MySubscriberDataObserver implements SubscriberDataObserver {
@Override
public void handleData(String dataId, UserData data) {

BaseIntegrationTest.dataId = dataId;
BaseIntegrationTest.userData = data;
}
Expand Down
Loading