From 7987c48b5f54efdd792a88eb745d5c2346d61b53 Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Tue, 5 Nov 2019 11:28:00 +0800 Subject: [PATCH 1/4] Add distributed lock center module --- sharding-orchestration/pom.xml | 1 + .../pom.xml | 37 +++ .../pom.xml | 39 +++ .../lock/api/DistributedLockCenter.java | 78 ++++++ .../api/DistributedLockConfiguration.java | 77 ++++++ .../exception/DistributedLockException.java | 36 +++ .../pom.xml | 53 +++++ ...CuratorZookeeperDistributedLockCenter.java | 223 ++++++++++++++++++ .../CuratorZookeeperExceptionHandler.java | 59 +++++ ...distributed.lock.api.DistributedLockCenter | 18 ++ ...torZookeeperDistributedLockCenterTest.java | 80 +++++++ .../curator/util/EmbedTestingServer.java | 67 ++++++ 12 files changed, 768 insertions(+) create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java create mode 100644 sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java diff --git a/sharding-orchestration/pom.xml b/sharding-orchestration/pom.xml index 72cb520510550..7bed9e04474f3 100644 --- a/sharding-orchestration/pom.xml +++ b/sharding-orchestration/pom.xml @@ -32,5 +32,6 @@ sharding-orchestration-reg sharding-orchestration-zookeeper-curator-integration-test sharding-orchestration-config + sharding-orchestration-distributed-lock diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml new file mode 100644 index 0000000000000..24ff606e40dca --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + sharding-orchestration + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + sharding-orchestration-distributed-lock + ${project.artifactId} + pom + + + sharding-orchestration-distributed-lock-api + sharding-orchestration-distributed-lock-zookeeper-curator + + + + \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml new file mode 100644 index 0000000000000..da27e9a1b21e2 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + sharding-orchestration-distributed-lock + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + + sharding-orchestration-distributed-lock-api + ${project.artifactId} + + + + org.apache.shardingsphere + sharding-core-api + ${project.version} + + + + \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java new file mode 100644 index 0000000000000..6a393478f4832 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockCenter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.api; + +import org.apache.shardingsphere.spi.TypeBasedSPI; + +/** + * Distributed Lock center. + * + * @author wangguangyuan + */ +public interface DistributedLockCenter extends TypeBasedSPI { + + /** + * Initialize distributed lock center. + * + * @param config distributed lock center configuration + */ + void init(DistributedLockConfiguration config); + + /** + * Get data from distributed lock center. + * + *

Maybe use cache if existed.

+ * + * @param key key of data + * @return value of data + */ + String get(String key); + + /** + * Persist data. + * + * @param key key of data + * @param value value of data + */ + void persist(String key, String value); + + /** + * Close. + */ + void close(); + + /** + * Initialize the lock of the key. + * + * @param key key of data + */ + void initLock(String key); + + /** + * Try to get the lock of the key. + * + * @return get the lock or not + */ + boolean tryLock(); + + /** + * Try to release the lock of the key. + * + */ + void tryRelease(); +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java new file mode 100644 index 0000000000000..bd54ba5b5831c --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.api; + +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.api.config.TypeBasedSPIConfiguration; + +import java.util.Properties; + +/** + * Distributed Lock configuration. + * + * @author wangguangyuan + */ +@Getter +@Setter +public class DistributedLockConfiguration extends TypeBasedSPIConfiguration { + + /** + * Server list of distributed lock center. + */ + private String serverLists; + + /** + * Namespace of distributed lock center. + */ + private String namespace; + + /** + * Digest of distributed lock center. + */ + private String digest; + + /** + * Operation timeout time in milliseconds. + */ + private int operationTimeoutMilliseconds = 500; + + /** + * Max number of times to retry. + */ + private int maxRetries = 3; + + /** + * Time interval in milliseconds on each retry. + */ + private int retryIntervalMilliseconds = 500; + + /** + * Time to live in seconds of ephemeral keys. + */ + private int timeToLiveSeconds = 60; + + public DistributedLockConfiguration(final String type) { + super(type); + } + + public DistributedLockConfiguration(final String type, final Properties properties) { + super(type, properties); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java new file mode 100644 index 0000000000000..df2e43c06197e --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.exception; + +/** + * Distributed Lock exception. + * + * @author wangguangyuan + */ +public class DistributedLockException extends RuntimeException { + + private static final long serialVersionUID = -6417179023552012152L; + + public DistributedLockException(final String errorMessage, final Object... args) { + super(String.format(errorMessage, args)); + } + + public DistributedLockException(final Exception cause) { + super(cause); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml new file mode 100644 index 0000000000000..17402f8b5ef3d --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/pom.xml @@ -0,0 +1,53 @@ + + + + 4.0.0 + + sharding-orchestration-distributed-lock + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + sharding-orchestration-distributed-lock-zookeeper-curator + ${project.artifactId} + + + + org.apache.shardingsphere + sharding-orchestration-distributed-lock-api + ${project.version} + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-test + + + diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java new file mode 100644 index 0000000000000..aa8e7d1d43a4e --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator; + +import com.google.common.base.Charsets; +import com.google.common.base.Strings; +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter; +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockConfiguration; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.OperationTimeoutException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * Distributed lock center for zookeeper with curator. + * + * @author wangguangyuan + */ +public final class CuratorZookeeperDistributedLockCenter implements DistributedLockCenter { + + private final Map caches = new HashMap<>(); + + private CuratorFramework client; + + private InterProcessMutex leafLock; + + @Getter + @Setter + private Properties properties = new Properties(); + + @Override + public void init(final DistributedLockConfiguration config) { + client = buildCuratorClient(config); + initCuratorClient(config); + } + + private CuratorFramework buildCuratorClient(final DistributedLockConfiguration config) { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(config.getServerLists()) + .retryPolicy(new ExponentialBackoffRetry(config.getRetryIntervalMilliseconds(), config.getMaxRetries(), config.getRetryIntervalMilliseconds() * config.getMaxRetries())) + .namespace(config.getNamespace()); + if (0 != config.getTimeToLiveSeconds()) { + builder.sessionTimeoutMs(config.getTimeToLiveSeconds() * 1000); + } + if (0 != config.getOperationTimeoutMilliseconds()) { + builder.connectionTimeoutMs(config.getOperationTimeoutMilliseconds()); + } + if (!Strings.isNullOrEmpty(config.getDigest())) { + builder.authorization("digest", config.getDigest().getBytes(Charsets.UTF_8)) + .aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + return builder.build(); + } + + private void initCuratorClient(final DistributedLockConfiguration config) { + client.start(); + try { + if (!client.blockUntilConnected(config.getRetryIntervalMilliseconds() * config.getMaxRetries(), TimeUnit.MILLISECONDS)) { + client.close(); + throw new OperationTimeoutException(); + } + } catch (final InterruptedException | OperationTimeoutException ex) { + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + @Override + public String get(final String key) { + TreeCache cache = findTreeCache(key); + if (null == cache) { + return getDirectly(key); + } + ChildData resultInCache = cache.getCurrentData(key); + if (null != resultInCache) { + return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8); + } + return getDirectly(key); + } + + private TreeCache findTreeCache(final String key) { + for (Entry entry : caches.entrySet()) { + if (key.startsWith(entry.getKey())) { + return entry.getValue(); + } + } + return null; + } + + @Override + public void persist(final String key, final String value) { + try { + if (!isExisted(key)) { + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8)); + } else { + update(key, value); + } + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + private void update(final String key, final String value) { + try { + client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + private String getDirectly(final String key) { + try { + return new String(client.getData().forPath(key), Charsets.UTF_8); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return null; + } + } + + private boolean isExisted(final String key) { + try { + return null != client.checkExists().forPath(key); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return false; + } + } + + @Override + public void close() { + for (Entry each : caches.entrySet()) { + each.getValue().close(); + } + waitForCacheClose(); + CloseableUtils.closeQuietly(client); + } + + /* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常 + * 因为异步处理, 可能会导致client先关闭而cache还未关闭结束. + * 等待Curator新版本解决这个bug. + * BUG地址:https://issues.apache.org/jira/browse/CURATOR-157 + */ + private void waitForCacheClose() { + try { + Thread.sleep(500L); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public String getType() { + return "zookeeper"; + } + + @Override + public void initLock(final String key) { + leafLock = new InterProcessMutex(client, key); + } + + @Override + @SneakyThrows + public boolean tryLock() { + return leafLock.acquire(5, TimeUnit.SECONDS); + } + + @Override + @SneakyThrows + public void tryRelease() { + leafLock.release(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java new file mode 100644 index 0000000000000..dce5432dc92d7 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.orchestration.distributed.lock.exception.DistributedLockException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; + +/** + * Curator zookeeper exception handler. + * + * @author wangguangyuan + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public final class CuratorZookeeperExceptionHandler { + /** + * Handle exception. + * + *

Ignore interrupt and connection invalid exception.

+ * + * @param cause to be handled exception + */ + public static void handleException(final Exception cause) { + if (null == cause) { + return; + } + if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) { + log.debug("Ignored exception for: {}", cause.getMessage()); + } else if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } else { + throw new DistributedLockException(cause); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException; + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter new file mode 100644 index 0000000000000..2ac4e65473e88 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.CuratorZookeeperDistributedLockCenter diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java new file mode 100644 index 0000000000000..67e66bc62209d --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.test; + +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockCenter; +import org.apache.shardingsphere.orchestration.distributed.lock.api.DistributedLockConfiguration; +import org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.CuratorZookeeperDistributedLockCenter; +import org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.util.EmbedTestingServer; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Properties; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class CuratorZookeeperDistributedLockCenterTest { + + private static DistributedLockCenter curatorZookeeperRegistryCenter = new CuratorZookeeperDistributedLockCenter(); + + @BeforeClass + public static void init() { + EmbedTestingServer.start(); + DistributedLockConfiguration configuration = new DistributedLockConfiguration(curatorZookeeperRegistryCenter.getType(), new Properties()); + configuration.setServerLists("127.0.0.1:3181"); + curatorZookeeperRegistryCenter.init(configuration); + } + + @Test + public void assertPersist() { + curatorZookeeperRegistryCenter.persist("/test", "value1"); + assertThat(curatorZookeeperRegistryCenter.get("/test"), is("value1")); + } + + @Test + public void assertUpdate() { + curatorZookeeperRegistryCenter.persist("/test", "value2"); + assertThat(curatorZookeeperRegistryCenter.get("/test"), is("value2")); + } + + @Test + public void assertPersistEphemeral() { + curatorZookeeperRegistryCenter.persist("/test/ephemeral", "value3"); + assertThat(curatorZookeeperRegistryCenter.get("/test/ephemeral"), is("value3")); + } + + @Test + public void assertLock() { + curatorZookeeperRegistryCenter.initLock("/test/lock1"); + assertThat(curatorZookeeperRegistryCenter.tryLock(), is(true)); + } + + @Test + public void assertRelease() { + curatorZookeeperRegistryCenter.initLock("/test/lock2"); + curatorZookeeperRegistryCenter.tryLock(); + curatorZookeeperRegistryCenter.tryRelease(); + } + + @Test(expected = IllegalMonitorStateException.class) + public void assertReleaseWithoutLock() { + curatorZookeeperRegistryCenter.initLock("/test/lock3"); + curatorZookeeperRegistryCenter.tryRelease(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java new file mode 100644 index 0000000000000..50cb185988e26 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/util/EmbedTestingServer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.distributed.lock.zookeeper.curator.util; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.KeeperException; + +import java.io.File; +import java.io.IOException; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class EmbedTestingServer { + + private static final int PORT = 3181; + + private static volatile TestingServer testingServer; + + /** + * Start embed zookeeper server. + */ + public static void start() { + if (null != testingServer) { + return; + } + try { + testingServer = new TestingServer(PORT, new File(String.format("target/test_zk_data/%s/", System.nanoTime()))); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + if (!isIgnoredException(ex)) { + throw new RuntimeException(ex); + } + } finally { + Runtime.getRuntime().addShutdownHook(new Thread() { + + @Override + public void run() { + try { + testingServer.close(); + } catch (final IOException ignored) { + } + } + }); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof KeeperException.ConnectionLossException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException; + } +} From 5720519377f195243daaac48725bea4d5004717a Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Tue, 5 Nov 2019 14:43:41 +0800 Subject: [PATCH 2/4] In distributed lock center module, modify code style. --- .../sharding-orchestration-distributed-lock/pom.xml | 4 +--- .../sharding-orchestration-distributed-lock-api/pom.xml | 3 +-- .../lock/api/DistributedLockConfiguration.java | 2 +- .../lock/exception/DistributedLockException.java | 2 +- .../curator/CuratorZookeeperDistributedLockCenter.java | 9 +++++---- .../curator/CuratorZookeeperExceptionHandler.java | 1 + .../test/CuratorZookeeperDistributedLockCenterTest.java | 2 +- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml index 24ff606e40dca..b995bb89a7f7d 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/pom.xml @@ -32,6 +32,4 @@ sharding-orchestration-distributed-lock-api sharding-orchestration-distributed-lock-zookeeper-curator - - - \ No newline at end of file + diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml index da27e9a1b21e2..af6fc4d5509ea 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/pom.xml @@ -35,5 +35,4 @@ ${project.version} - - \ No newline at end of file + diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java index bd54ba5b5831c..67d0ad401e7f7 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/api/DistributedLockConfiguration.java @@ -30,7 +30,7 @@ */ @Getter @Setter -public class DistributedLockConfiguration extends TypeBasedSPIConfiguration { +public final class DistributedLockConfiguration extends TypeBasedSPIConfiguration { /** * Server list of distributed lock center. diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java index df2e43c06197e..43abf94ee9dac 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-api/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/exception/DistributedLockException.java @@ -22,7 +22,7 @@ * * @author wangguangyuan */ -public class DistributedLockException extends RuntimeException { +public final class DistributedLockException extends RuntimeException { private static final long serialVersionUID = -6417179023552012152L; diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java index aa8e7d1d43a4e..04bf9fbaa1906 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperDistributedLockCenter.java @@ -186,10 +186,11 @@ public void close() { CloseableUtils.closeQuietly(client); } - /* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常 - * 因为异步处理, 可能会导致client先关闭而cache还未关闭结束. - * 等待Curator新版本解决这个bug. - * BUG地址:https://issues.apache.org/jira/browse/CURATOR-157 + /* TODO wait 500ms, close cache before close client, or will throw exception + * Because of asynchronous processing, may cause client to close + * first and cache has not yet closed the end. + * Wait for new version of Curator to fix this. + * BUG address:https://issues.apache.org/jira/browse/CURATOR-157 */ private void waitForCacheClose() { try { diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java index dce5432dc92d7..64d173d6388b3 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/CuratorZookeeperExceptionHandler.java @@ -33,6 +33,7 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) @Slf4j public final class CuratorZookeeperExceptionHandler { + /** * Handle exception. * diff --git a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java index 67e66bc62209d..749ed4caacfbd 100644 --- a/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java +++ b/sharding-orchestration/sharding-orchestration-distributed-lock/sharding-orchestration-distributed-lock-zookeeper-curator/src/test/java/org/apache/shardingsphere/orchestration/distributed/lock/zookeeper/curator/test/CuratorZookeeperDistributedLockCenterTest.java @@ -29,7 +29,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -public class CuratorZookeeperDistributedLockCenterTest { +public final class CuratorZookeeperDistributedLockCenterTest { private static DistributedLockCenter curatorZookeeperRegistryCenter = new CuratorZookeeperDistributedLockCenter(); From dcd23388ad6076832f5f3fdfdf1aaf270981405c Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Thu, 7 Nov 2019 12:24:45 +0800 Subject: [PATCH 3/4] Add sharding-orchestration-center. --- sharding-orchestration/pom.xml | 1 + .../sharding-orchestration-center/pom.xml | 59 ++++ .../center/api/ConfigCenter.java | 81 +++++ .../center/api/DistributedLockManagement.java | 82 +++++ .../center/api/RegistryCenter.java | 90 +++++ .../OrchestrationConfiguration.java | 80 +++++ .../exception/OrchestrationException.java | 39 +++ .../instance/CuratorZookeeperInstance.java | 317 ++++++++++++++++++ .../CuratorZookeeperExceptionHandler.java | 63 ++++ .../center/listener/DataChangedEvent.java | 48 +++ .../listener/DataChangedEventListener.java | 36 ++ ...here.orchestration.center.api.ConfigCenter | 18 + ...ation.center.api.DistributedLockManagement | 18 + ...re.orchestration.center.api.RegistryCenter | 18 + .../CuratorZookeeperInstanceTest.java | 89 +++++ .../center/util/EmbedTestingServer.java | 67 ++++ 16 files changed, 1106 insertions(+) create mode 100644 sharding-orchestration/sharding-orchestration-center/pom.xml create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/ConfigCenter.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/DistributedLockManagement.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/RegistryCenter.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/configuration/OrchestrationConfiguration.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/exception/OrchestrationException.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstance.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/handler/CuratorZookeeperExceptionHandler.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEvent.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEventListener.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement create mode 100644 sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter create mode 100644 sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstanceTest.java create mode 100644 sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/util/EmbedTestingServer.java diff --git a/sharding-orchestration/pom.xml b/sharding-orchestration/pom.xml index 7bed9e04474f3..27efe7737ecec 100644 --- a/sharding-orchestration/pom.xml +++ b/sharding-orchestration/pom.xml @@ -33,5 +33,6 @@ sharding-orchestration-zookeeper-curator-integration-test sharding-orchestration-config sharding-orchestration-distributed-lock + sharding-orchestration-center diff --git a/sharding-orchestration/sharding-orchestration-center/pom.xml b/sharding-orchestration/sharding-orchestration-center/pom.xml new file mode 100644 index 0000000000000..257b429cd59cd --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/pom.xml @@ -0,0 +1,59 @@ + + + + 4.0.0 + + sharding-orchestration + org.apache.shardingsphere + 4.0.0-RC3-SNAPSHOT + + + sharding-orchestration-center + ${project.artifactId} + + + + org.apache.shardingsphere + sharding-core-api + ${project.version} + + + org.apache.shardingsphere + sharding-orchestration-reg-api + ${project.version} + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-test + + + diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/ConfigCenter.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/ConfigCenter.java new file mode 100644 index 0000000000000..79f385f94585f --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/ConfigCenter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.api; + +import org.apache.shardingsphere.orchestration.center.configuration.OrchestrationConfiguration; +import org.apache.shardingsphere.orchestration.center.listener.DataChangedEventListener; +import org.apache.shardingsphere.spi.TypeBasedSPI; + +import java.util.List; + +/** + * Config center. + * + * @author zhangliang + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +public interface ConfigCenter extends TypeBasedSPI { + + /** + * Initialize config center. + * + * @param config config center configuration + */ + void init(OrchestrationConfiguration config); + + /** + * Get data from config center. + * + *

Maybe use cache if existed.

+ * + * @param key key of data + * @return value of data + */ + String get(String key); + + /** + * Get node's sub-nodes list. + * + * @param key key of data + * @return sub-nodes name list + */ + List getChildrenKeys(String key); + + /** + * Persist data. + * + * @param key key of data + * @param value value of data + */ + void persist(String key, String value); + + /** + * Watch key or path of the config server. + * + * @param key key of data + * @param dataChangedEventListener data changed event listener + */ + void watch(String key, DataChangedEventListener dataChangedEventListener); + + /** + * Close. + */ + void close(); +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/DistributedLockManagement.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/DistributedLockManagement.java new file mode 100644 index 0000000000000..542c0e6fae07c --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/DistributedLockManagement.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.api; + +import org.apache.shardingsphere.orchestration.center.configuration.OrchestrationConfiguration; +import org.apache.shardingsphere.spi.TypeBasedSPI; + +/** + * Distributed Lock center. + * + * @author zhangliang + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +public interface DistributedLockManagement extends TypeBasedSPI { + + /** + * Initialize distributed lock center. + * + * @param config distributed lock center configuration + */ + void init(OrchestrationConfiguration config); + + /** + * Get data from distributed lock center. + * + *

Maybe use cache if existed.

+ * + * @param key key of data + * @return value of data + */ + String get(String key); + + /** + * Persist data. + * + * @param key key of data + * @param value value of data + */ + void persist(String key, String value); + + /** + * Close. + */ + void close(); + + /** + * Initialize the lock of the key. + * + * @param key key of data + */ + void initLock(String key); + + /** + * Try to get the lock of the key. + * + * @return get the lock or not + */ + boolean tryLock(); + + /** + * Try to release the lock of the key. + * + */ + void tryRelease(); +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/RegistryCenter.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/RegistryCenter.java new file mode 100644 index 0000000000000..8441795ada5a4 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/api/RegistryCenter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.api; + +import org.apache.shardingsphere.orchestration.center.configuration.OrchestrationConfiguration; +import org.apache.shardingsphere.orchestration.center.listener.DataChangedEventListener; +import org.apache.shardingsphere.spi.TypeBasedSPI; + +import java.util.List; + +/** + * Registry center. + * + * @author zhangliang + * @author zhaojun + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +public interface RegistryCenter extends TypeBasedSPI { + + /** + * Initialize registry center. + * + * @param config registry center configuration + */ + void init(OrchestrationConfiguration config); + + /** + * Get data from registry center. + * + *

Maybe use cache if existed.

+ * + * @param key key of data + * @return value of data + */ + String get(String key); + + /** + * Get node's sub-nodes list. + * + * @param key key of data + * @return sub-nodes name list + */ + List getChildrenKeys(String key); + + /** + * Persist data. + * + * @param key key of data + * @param value value of data + */ + void persist(String key, String value); + + /** + * Persist ephemeral data. + * + * @param key key of data + * @param value value of data + */ + void persistEphemeral(String key, String value); + + /** + * Watch key or path of the registry. + * + * @param key key of data + * @param dataChangedEventListener data changed event listener + */ + void watch(String key, DataChangedEventListener dataChangedEventListener); + + /** + * Close. + */ + void close(); +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/configuration/OrchestrationConfiguration.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/configuration/OrchestrationConfiguration.java new file mode 100644 index 0000000000000..898901c11e4c8 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/configuration/OrchestrationConfiguration.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.configuration; + +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.api.config.TypeBasedSPIConfiguration; + +import java.util.Properties; + +/** + * Config center configuration. + * + * @author zhangliang + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +@Getter +@Setter +public final class OrchestrationConfiguration extends TypeBasedSPIConfiguration { + + /** + * Server list of config center. + */ + private String serverLists; + + /** + * Namespace of config center. + */ + private String namespace; + + /** + * Digest of config center. + */ + private String digest; + + /** + * Operation timeout time in milliseconds. + */ + private int operationTimeoutMilliseconds = 500; + + /** + * Max number of times to retry. + */ + private int maxRetries = 3; + + /** + * Time interval in milliseconds on each retry. + */ + private int retryIntervalMilliseconds = 500; + + /** + * Time to live in seconds of ephemeral keys. + */ + private int timeToLiveSeconds = 60; + + public OrchestrationConfiguration(final String type) { + super(type); + } + + public OrchestrationConfiguration(final String type, final Properties properties) { + super(type, properties); + } +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/exception/OrchestrationException.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/exception/OrchestrationException.java new file mode 100644 index 0000000000000..5f304aa58f1a1 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/exception/OrchestrationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.exception; + +/** + * Config center exception. + * + * @author zhangliang + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +public final class OrchestrationException extends RuntimeException { + + private static final long serialVersionUID = -6417179023552012152L; + + public OrchestrationException(final String errorMessage, final Object... args) { + super(String.format(errorMessage, args)); + } + + public OrchestrationException(final Exception cause) { + super(cause); + } +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstance.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstance.java new file mode 100644 index 0000000000000..5f66ec44d8480 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstance.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.instance; + +import com.google.common.base.Charsets; +import com.google.common.base.Strings; +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.shardingsphere.orchestration.center.api.ConfigCenter; +import org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement; +import org.apache.shardingsphere.orchestration.center.api.RegistryCenter; +import org.apache.shardingsphere.orchestration.center.configuration.OrchestrationConfiguration; +import org.apache.shardingsphere.orchestration.center.instance.handler.CuratorZookeeperExceptionHandler; +import org.apache.shardingsphere.orchestration.center.listener.DataChangedEvent; +import org.apache.shardingsphere.orchestration.center.listener.DataChangedEventListener; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.OperationTimeoutException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * Distributed lock center for zookeeper with curator. + * + * @author zhangliang + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +public final class CuratorZookeeperInstance implements ConfigCenter, DistributedLockManagement, RegistryCenter { + + private final Map caches = new HashMap<>(); + + private CuratorFramework client; + + private InterProcessMutex leafLock; + + @Getter + @Setter + private Properties properties = new Properties(); + + @Override + public void init(final OrchestrationConfiguration config) { + client = buildCuratorClient(config); + initCuratorClient(config); + } + + private CuratorFramework buildCuratorClient(final OrchestrationConfiguration config) { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(config.getServerLists()) + .retryPolicy(new ExponentialBackoffRetry(config.getRetryIntervalMilliseconds(), config.getMaxRetries(), config.getRetryIntervalMilliseconds() * config.getMaxRetries())) + .namespace(config.getNamespace()); + if (0 != config.getTimeToLiveSeconds()) { + builder.sessionTimeoutMs(config.getTimeToLiveSeconds() * 1000); + } + if (0 != config.getOperationTimeoutMilliseconds()) { + builder.connectionTimeoutMs(config.getOperationTimeoutMilliseconds()); + } + if (!Strings.isNullOrEmpty(config.getDigest())) { + builder.authorization("digest", config.getDigest().getBytes(Charsets.UTF_8)) + .aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + return builder.build(); + } + + private void initCuratorClient(final OrchestrationConfiguration config) { + client.start(); + try { + if (!client.blockUntilConnected(config.getRetryIntervalMilliseconds() * config.getMaxRetries(), TimeUnit.MILLISECONDS)) { + client.close(); + throw new OperationTimeoutException(); + } + } catch (final InterruptedException | OperationTimeoutException ex) { + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + @Override + public String get(final String key) { + TreeCache cache = findTreeCache(key); + if (null == cache) { + return getDirectly(key); + } + ChildData resultInCache = cache.getCurrentData(key); + if (null != resultInCache) { + return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8); + } + return getDirectly(key); + } + + private TreeCache findTreeCache(final String key) { + for (Entry entry : caches.entrySet()) { + if (key.startsWith(entry.getKey())) { + return entry.getValue(); + } + } + return null; + } + + @Override + public void persist(final String key, final String value) { + try { + if (!isExisted(key)) { + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8)); + } else { + update(key, value); + } + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + private void update(final String key, final String value) { + try { + client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + private String getDirectly(final String key) { + try { + return new String(client.getData().forPath(key), Charsets.UTF_8); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return null; + } + } + + private boolean isExisted(final String key) { + try { + return null != client.checkExists().forPath(key); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return false; + } + } + + @Override + public void persistEphemeral(final String key, final String value) { + try { + if (isExisted(key)) { + client.delete().deletingChildrenIfNeeded().forPath(key); + } + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8)); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + + @Override + public List getChildrenKeys(final String key) { + try { + List result = client.getChildren().forPath(key); + Collections.sort(result, new Comparator() { + + @Override + public int compare(final String o1, final String o2) { + return o2.compareTo(o1); + } + }); + return result; + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + return Collections.emptyList(); + } + } + + @Override + public void watch(final String key, final DataChangedEventListener dataChangedEventListener) { + final String path = key + "/"; + if (!caches.containsKey(path)) { + addCacheData(key); + } + TreeCache cache = caches.get(path); + cache.getListenable().addListener(new TreeCacheListener() { + + @Override + public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws UnsupportedEncodingException { + ChildData data = event.getData(); + if (null == data || null == data.getPath()) { + return; + } + DataChangedEvent.ChangedType changedType = getChangedType(event); + if (DataChangedEvent.ChangedType.IGNORED != changedType) { + dataChangedEventListener.onChange(new DataChangedEvent(data.getPath(), null == data.getData() ? null : new String(data.getData(), "UTF-8"), changedType)); + } + } + }); + } + + private DataChangedEvent.ChangedType getChangedType(final TreeCacheEvent event) { + switch (event.getType()) { + case NODE_UPDATED: + return DataChangedEvent.ChangedType.UPDATED; + case NODE_REMOVED: + return DataChangedEvent.ChangedType.DELETED; + default: + return DataChangedEvent.ChangedType.IGNORED; + } + } + + private void addCacheData(final String cachePath) { + TreeCache cache = new TreeCache(client, cachePath); + try { + cache.start(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + CuratorZookeeperExceptionHandler.handleException(ex); + } + caches.put(cachePath + "/", cache); + } + + @Override + public void close() { + for (Entry each : caches.entrySet()) { + each.getValue().close(); + } + waitForCacheClose(); + CloseableUtils.closeQuietly(client); + } + + /* TODO wait 500ms, close cache before close client, or will throw exception + * Because of asynchronous processing, may cause client to close + * first and cache has not yet closed the end. + * Wait for new version of Curator to fix this. + * BUG address:https://issues.apache.org/jira/browse/CURATOR-157 + */ + private void waitForCacheClose() { + try { + Thread.sleep(500L); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public String getType() { + return "zookeeper"; + } + + @Override + public void initLock(final String key) { + leafLock = new InterProcessMutex(client, key); + } + + @Override + @SneakyThrows + public boolean tryLock() { + return leafLock.acquire(5, TimeUnit.SECONDS); + } + + @Override + @SneakyThrows + public void tryRelease() { + leafLock.release(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/handler/CuratorZookeeperExceptionHandler.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/handler/CuratorZookeeperExceptionHandler.java new file mode 100644 index 0000000000000..3bb0b91147e82 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/instance/handler/CuratorZookeeperExceptionHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.instance.handler; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.orchestration.center.exception.OrchestrationException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; + +/** + * Curator zookeeper exception handler. + * + * @author zhangliang + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public final class CuratorZookeeperExceptionHandler { + + /** + * Handle exception. + * + *

Ignore interrupt and connection invalid exception.

+ * + * @param cause to be handled exception + */ + public static void handleException(final Exception cause) { + if (null == cause) { + return; + } + if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) { + log.debug("Ignored exception for: {}", cause.getMessage()); + } else if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } else { + throw new OrchestrationException(cause); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException; + } +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEvent.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEvent.java new file mode 100644 index 0000000000000..21507d5dda24c --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEvent.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.listener; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Data changed event. + * + * @author junxiong + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +@RequiredArgsConstructor +@Getter +public final class DataChangedEvent { + + private final String key; + + private final String value; + + private final ChangedType changedType; + + /** + * Data changed type. + */ + public enum ChangedType { + + UPDATED, DELETED, IGNORED + } +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEventListener.java b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEventListener.java new file mode 100644 index 0000000000000..8f0a4e6ee954f --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/java/org/apache/shardingsphere/orchestration/center/listener/DataChangedEventListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.listener; + +/** + * Listener for data changed event. + * + * @author junxiong + * @author sunbufu + * @author dongzonglei + * @author wangguangyuan + */ +public interface DataChangedEventListener { + + /** + * Fire when data changed. + * + * @param dataChangedEvent data changed event + */ + void onChange(DataChangedEvent dataChangedEvent); +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter new file mode 100644 index 0000000000000..2943ee6e8c599 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement new file mode 100644 index 0000000000000..2943ee6e8c599 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter new file mode 100644 index 0000000000000..2943ee6e8c599 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstanceTest.java b/sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstanceTest.java new file mode 100644 index 0000000000000..c46e316cba51d --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/instance/CuratorZookeeperInstanceTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.instance; + +import org.apache.shardingsphere.orchestration.center.configuration.OrchestrationConfiguration; +import org.apache.shardingsphere.orchestration.center.util.EmbedTestingServer; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; +import java.util.Properties; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class CuratorZookeeperInstanceTest { + + private static CuratorZookeeperInstance curatorZookeeperInstance = new CuratorZookeeperInstance(); + + @BeforeClass + public static void init() { + EmbedTestingServer.start(); + OrchestrationConfiguration configuration = new OrchestrationConfiguration(curatorZookeeperInstance.getType(), new Properties()); + configuration.setServerLists("127.0.0.1:3181"); + curatorZookeeperInstance.init(configuration); + } + + @Test + public void assertPersist() { + curatorZookeeperInstance.persist("/test", "value1"); + assertThat(curatorZookeeperInstance.get("/test"), is("value1")); + } + + @Test + public void assertUpdate() { + curatorZookeeperInstance.persist("/test", "value2"); + assertThat(curatorZookeeperInstance.get("/test"), is("value2")); + } + + @Test + public void assertPersistEphemeral() { + curatorZookeeperInstance.persistEphemeral("/test/ephemeral", "value3"); + assertThat(curatorZookeeperInstance.get("/test/ephemeral"), is("value3")); + } + + @Test + public void assertGetChildrenKeys() { + curatorZookeeperInstance.persist("/test/children/1", "value11"); + curatorZookeeperInstance.persist("/test/children/2", "value12"); + curatorZookeeperInstance.persist("/test/children/3", "value13"); + List childrenKeys = curatorZookeeperInstance.getChildrenKeys("/test/children"); + assertThat(childrenKeys.size(), is(3)); + } + + @Test + public void assertLock() { + curatorZookeeperInstance.initLock("/test/lock1"); + assertThat(curatorZookeeperInstance.tryLock(), is(true)); + } + + @Test + public void assertRelease() { + curatorZookeeperInstance.initLock("/test/lock2"); + curatorZookeeperInstance.tryLock(); + curatorZookeeperInstance.tryRelease(); + } + + @Test(expected = IllegalMonitorStateException.class) + public void assertReleaseWithoutLock() { + curatorZookeeperInstance.initLock("/test/lock3"); + curatorZookeeperInstance.tryRelease(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/util/EmbedTestingServer.java b/sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/util/EmbedTestingServer.java new file mode 100644 index 0000000000000..a395454f50039 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-center/src/test/java/org/apache/shardingsphere/orchestration/center/util/EmbedTestingServer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.center.util; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.curator.test.TestingServer; +import org.apache.zookeeper.KeeperException; + +import java.io.File; +import java.io.IOException; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class EmbedTestingServer { + + private static final int PORT = 3181; + + private static volatile TestingServer testingServer; + + /** + * Start embed zookeeper server. + */ + public static void start() { + if (null != testingServer) { + return; + } + try { + testingServer = new TestingServer(PORT, new File(String.format("target/test_zk_data/%s/", System.nanoTime()))); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + if (!isIgnoredException(ex)) { + throw new RuntimeException(ex); + } + } finally { + Runtime.getRuntime().addShutdownHook(new Thread() { + + @Override + public void run() { + try { + testingServer.close(); + } catch (final IOException ignored) { + } + } + }); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof KeeperException.ConnectionLossException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException; + } +} From 44ec5301040f57cf4bf0f389160477f97bf0fa30 Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Thu, 7 Nov 2019 15:49:44 +0800 Subject: [PATCH 4/4] modify META-INF.services files in sharding-orchestration-center. --- ....apache.shardingsphere.orchestration.center.api.ConfigCenter | 2 +- ...ingsphere.orchestration.center.api.DistributedLockManagement | 2 +- ...pache.shardingsphere.orchestration.center.api.RegistryCenter | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter index 2943ee6e8c599..63641fa641621 100644 --- a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter +++ b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.ConfigCenter @@ -15,4 +15,4 @@ # limitations under the License. # -org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance \ No newline at end of file +org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement index 2943ee6e8c599..63641fa641621 100644 --- a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement +++ b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.DistributedLockManagement @@ -15,4 +15,4 @@ # limitations under the License. # -org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance \ No newline at end of file +org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance diff --git a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter index 2943ee6e8c599..63641fa641621 100644 --- a/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter +++ b/sharding-orchestration/sharding-orchestration-center/src/main/resources/META-INF.services/org.apache.shardingsphere.orchestration.center.api.RegistryCenter @@ -15,4 +15,4 @@ # limitations under the License. # -org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance \ No newline at end of file +org.apache.shardingsphere.orchestration.center.instance.CuratorZookeeperInstance