diff --git a/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json b/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json index f9d029ecb6ac..39f062677a15 100755 --- a/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json +++ b/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json @@ -13,5 +13,10 @@ "Class org.apache.geode.management.DiskStoreMXBean": "Added new methods.", "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryCreates()": "Added new stat", "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryDestroys()": "Added new stat", - "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryUpdates()": "Added new stat" + "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryUpdates()": "Added new stat", + "Class org.apache.geode.cache.server.CacheServer": "Added two methods", + "Method org.apache.geode.cache.server.CacheServer.getRejectedProxyRequests()": "added new counter", + "Method org.apache.geode.cache.server.CacheServer.incRejectedProxyRequests()": "added new counter", + "Class org.apache.geode.management.CacheServerMXBean":"Added new counter", + "Method org.apache.geode.management.CacheServerMXBean.getRejectedProxyRequests()": "added new counter" } diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt index 966298fe1abc..d419ece9f4ba 100644 --- a/geode-assembly/src/integrationTest/resources/assembly_content.txt +++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt @@ -223,6 +223,9 @@ javadoc/org/apache/geode/cache/PartitionAttributesFactory.html javadoc/org/apache/geode/cache/PartitionResolver.html javadoc/org/apache/geode/cache/PartitionedRegionDistributionException.html javadoc/org/apache/geode/cache/PartitionedRegionStorageException.html +javadoc/org/apache/geode/cache/ProxyRequestObserver.html +javadoc/org/apache/geode/cache/ProxyRequestObserverAdapter.html +javadoc/org/apache/geode/cache/ProxyRequestObserverHolder.html javadoc/org/apache/geode/cache/Region.Entry.html javadoc/org/apache/geode/cache/Region.html javadoc/org/apache/geode/cache/RegionAccessException.html @@ -248,6 +251,7 @@ javadoc/org/apache/geode/cache/SerializedCacheValue.html javadoc/org/apache/geode/cache/StatisticsDisabledException.html javadoc/org/apache/geode/cache/SubscriptionAttributes.html javadoc/org/apache/geode/cache/SynchronizationCommitConflictException.html +javadoc/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.html javadoc/org/apache/geode/cache/TimeoutException.html javadoc/org/apache/geode/cache/TransactionDataNodeHasDepartedException.html javadoc/org/apache/geode/cache/TransactionDataNotColocatedException.html diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java new file mode 100644 index 000000000000..1d15380d4f80 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.execute; + +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.client.ClientRegionShortcut.CACHING_PROXY; +import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.IntStream; + +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.CacheTransactionManager; +import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.ProxyRequestObserverHolder; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.ThreadLimitingProxyRequestObserver; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.cache.execute.RegionFunctionContext; +import org.apache.geode.cache.execute.ResultCollector; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.WanTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; +import org.apache.geode.test.junit.runners.GeodeParamsRunner; + + +@Category({WanTest.class}) +@RunWith(GeodeParamsRunner.class) +public class GenericDUnitTest implements Serializable { + + private static final int MAX_THREADS = 4; + + private static final String regionName = "GenericDUnitTest"; + + @ClassRule + public static final ClusterStartupRule clusterStartupRule = new 
ClusterStartupRule(5); + + @Rule + public DistributedRule distributedRule = new DistributedRule(6); + MemberVM locator; + MemberVM server1; + MemberVM server2; + MemberVM server3; + MemberVM server4; + ClientVM client; + + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Before + public void setUp() throws Exception { + locator = clusterStartupRule.startLocatorVM(0); + } + + @After + public void tearDown() throws Exception { + IgnoredException.removeAllExpectedExceptions(); + } + + enum Operation { + GET, PUT, TXPUT, GETFUNCTION, TXGETFUNCTION, PUTFUNCTION, TXPUTFUNCTION + } + + @Test + @Parameters({"GET", "PUT", "TXPUT", "GETFUNCTION", "TXGETFUNCTION", "PUTFUNCTION", + "TXPUTFUNCTION"}) + @TestCaseName("{method}(operation:{params})") + public void testSuccessfulExecution(Operation operation) throws Exception { + IgnoredException.addIgnoredException(java.lang.IllegalStateException.class); + + server1 = startServer(1, MAX_THREADS); + server2 = startServer(2, MAX_THREADS); + server3 = startServer(3, MAX_THREADS); + server4 = startServer(4, MAX_THREADS); + + List serversInA = + Arrays.asList(server1, server2, server3, server4); + + // Set client connect-timeout to a very high value so that if there are no ServerConnection + // threads available the test will time-out before the client times-out. + int connectTimeout = (int) GeodeAwaitility.getTimeout().toMillis() * 2; + + client = startClient(5, 0, connectTimeout); + + Function testFunction = new TestFunction(); + + for (MemberVM memberVM : serversInA) { + createServerRegionAndRegisterFunction(memberVM, 1, testFunction); + } + + client.invoke(() -> createClientRegion()); + + client.invoke(() -> executeQuery(regionName)); + + Object key = "key"; + Object value = "value"; + + int invocationsNo = 50; + AsyncInvocation[] invocations = new AsyncInvocation[invocationsNo]; + + IntStream.range(0, invocationsNo).forEach(i -> invocations[i] = client.invokeAsync(() -> { + String finalKey = key + "" + i; + String finalValue = finalKey; + try { + switch (operation) { + case GET: { + doGet(finalKey, regionName); + break; + } + case PUT: { + doPut(finalKey, finalValue, regionName); + break; + } + case TXPUT: { + doTxPut(finalKey, finalValue, regionName); + break; + } + case GETFUNCTION: { + executeGetFunction(testFunction, regionName, finalKey, i, false); + break; + } + case TXGETFUNCTION: { + executeGetFunction(testFunction, regionName, finalKey, i, true); + break; + } + case PUTFUNCTION: { + executePutFunction(testFunction, regionName, finalKey, finalValue, i, false); + break; + } + case TXPUTFUNCTION: { + executePutFunction(testFunction, regionName, finalKey, finalValue, i, true); + break; + } + } + } catch (Exception e) { + System.out.println( + "toberal exception calling " + operation + " operation with key: " + finalKey + ", " + + e); + } + })); + + // Sleep a bit to make sure that entries are replicated + Thread.sleep(5000); + + // Run several times to make sure that the results are the same in all servers. 
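+    // (Each run may be served by a different member from the client pool, so identical
+    // result sizes across runs suggest no proxied operation was partially applied.)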
+ IntStream.range(0, 10).forEach(x -> { + client.invoke(() -> executeQuery(regionName)); + }); + } + + void doPut(Object key, Object value, String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.getRegion(regionName); + System.out.println("toberal before put"); + region.put(key, value); + System.out.println("toberal after put"); + } + + void doTxPut(Object key, Object value, String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.getRegion(regionName); + CacheTransactionManager txManager = + cache.getCacheTransactionManager(); + txManager.begin(); + try { + System.out.println("toberal before put"); + region.put(key, value); + System.out.println("toberal after put"); + System.out.println("toberal before committing: " + key); + txManager.commit(); + System.out.println("toberal after committing: " + key); + } catch (CommitConflictException conflict) { + System.out.println("toberal CommitConflictException"); + // ... do necessary work for a transaction that failed on commit + } catch (Exception e) { + System.out.println("toberal Exception putting: " + e); + // ... do necessary work for a transaction that failed on commit + } finally { + if (txManager.exists()) { + txManager.rollback(); + } + } + } + + Object doGet(Object key, String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.getRegion(regionName); + System.out.println("toberal before get"); + Object value = region.get(key); + System.out.println("toberal after get: " + value); + return value; + } + + void executeQuery(String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + String queryString = "select * from /" + regionName; + + // Get QueryService from Cache. + QueryService queryService = cache.getQueryService(); + + // Create the Query Object. + Query query = queryService.newQuery(queryString); + + // Execute Query locally. Returns results set. + Object[] params = new Object[0]; + SelectResults results = null; + try { + results = (SelectResults) query.execute(params); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("toberal query results: (" + results.size() + ") - " + results); + } + + private Object executePutFunction(Function function, String regionName, Object key, Object value, + int invocation, boolean useTransaction) { + FunctionService.registerFunction(function); + final Region region = + ClusterStartupRule.getClientCache().getRegion(regionName); + + Execution execution = FunctionService.onRegion(region); + + ResultCollector resultCollector; + Object[] args = {useTransaction, invocation, key, value}; + Set filter = new HashSet(); + filter.add(key); + // without filter, results are weird. Check, toberal. + // I have retested without filter and have not seen anything strange. 
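+    // Note: withFilter(singleton(key)) presumably routes this onRegion execution only to
+    // the member hosting that key's bucket, so each call exercises a single proxied
+    // destination; without a filter it would fan out to all members hosting the region.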
+ resultCollector = + // execution.setArguments(args).execute(function.getId()); + execution.setArguments(args).withFilter(filter).execute(function.getId()); + Object result = resultCollector.getResult(); + return result; + } + + private Object executeGetFunction(Function function, String regionName, Object key, + int invocation, boolean useTransaction) { + FunctionService.registerFunction(function); + final Region region = + ClusterStartupRule.getClientCache().getRegion(regionName); + + Execution execution = FunctionService.onRegion(region); + + ResultCollector resultCollector; + Object[] args = {useTransaction, invocation, key}; + Set filter = new HashSet(); + filter.add(key); + System.out.println("toberal before executeGetFunction. key: " + key); + resultCollector = + execution.setArguments(args).withFilter(filter).execute(function.getId()); + Object result = resultCollector.getResult(); + System.out.println("toberal after executeGetFunction. key: " + key); + return result; + } + + private void createServerRegion(int redundantCopies) { + final PartitionAttributesFactory paf = new PartitionAttributesFactory<>(); + paf.setRedundantCopies(redundantCopies); + ClusterStartupRule.getCache().createRegionFactory(PARTITION) + .setPartitionAttributes(paf.create()) + .create(regionName); + } + + private void createServerRegionAndRegisterFunction(MemberVM server, int redundantCopies, + final Function function) { + server.invoke(() -> { + createServerRegion(redundantCopies); + FunctionService.registerFunction(function); + // DistributionMessageObserver.setInstance(new MyMessageObserver()); + int maxThreadsPerDestination = MAX_THREADS / 4; + ProxyRequestObserverHolder + .setInstance(new ThreadLimitingProxyRequestObserver(maxThreadsPerDestination)); + }); + } + + private ClientVM startClient(final int vmIndex, final int retryAttempts, final int connectTimeout) + throws Exception { + return clusterStartupRule.startClientVM( + vmIndex, + cacheRule -> cacheRule + .withLocatorConnection(locator.getPort()) + .withCacheSetup(cf -> cf.setPoolRetryAttempts(retryAttempts) + .setPoolPRSingleHopEnabled(false) + .setPoolSocketConnectTimeout(connectTimeout))); + } + + private MemberVM startServer(final int vmIndex, int maxThreads) { + return clusterStartupRule.startServerVM( + vmIndex, + cacheRule -> cacheRule + .withProperty(SERIALIZABLE_OBJECT_FILTER, + "org.apache.geode.internal.cache.execute.GenericDUnitTest*") + .withMaxThreads(maxThreads) + .withConnectionToLocator(locator.getPort())); + } + + private void createClientRegion() { + ClusterStartupRule.getClientCache().createClientRegionFactory(CACHING_PROXY).create(regionName); + } + + static class TestFunction implements Function, Serializable { + public TestFunction() { + super(); + } + + @Override + public void execute(FunctionContext context) { + final Object[] args = context.getArguments(); + if (args.length < 2) { + throw new IllegalStateException( + "Arguments length does not match required length."); + } + Object value = null; + if (args.length == 4) { + value = args[3]; + } + boolean useTransaction = (boolean) args[0]; + Integer invocation = (Integer) args[1]; + Object key = args[2]; + + RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context; + Region region = regionFunctionContext.getDataSet(); + Object result = null; + try { + CacheTransactionManager txManager = + context.getCache().getCacheTransactionManager(); + if (useTransaction) { + txManager.begin(); + } + try { + logger.info("toberal inv: {} before getting/putting: {}", 
invocation, key);
+          if (value != null) {
+            result = region.put(key, value);
+          } else {
+            result = region.get(key);
+            // Thread.sleep(100);
+          }
+          logger.info("toberal inv: {} after getting/putting: {}", invocation, key);
+          if (useTransaction) {
+            logger.info("toberal inv: {} before committing: {}", invocation, key);
+            txManager.commit();
+            logger.info("toberal inv: {} after committing: {}", invocation, key);
+          }
+        } catch (CommitConflictException conflict) {
+          logger.info("toberal inv: {} CommitConflictException: {}, key: {}",
+              invocation, conflict, key);
+          // ... do necessary work for a transaction that failed on commit
+        } catch (Exception e) {
+          logger.info("toberal inv: {} Exception: {}, key: {}", invocation, e, key, e);
+          // ... do necessary work for a transaction that failed on commit
+        } finally {
+          if (txManager.exists()) {
+            logger.info("toberal inv: {} rolling-back key: {}", invocation, key);
+            txManager.rollback();
+          } else {
+            logger.info("toberal inv: {} not rolling-back key: {}", invocation,
+                key);
+          }
+        }
+      } catch (Exception e) {
+        context.getResultSender().lastResult(e);
+        logger.info("toberal inv: {} after returning last result with exception: {}", invocation,
+            e);
+        return;
+      }
+      context.getResultSender().lastResult(result);
+      logger.info("toberal inv: {} after returning last result: {}", invocation, result);
+    }
+
+    public static final String ID = TestFunction.class.getName();
+
+    private static final Logger logger = LogService.getLogger();
+
+    @Override
+    public String getId() {
+      return ID;
+    }
+
+    @Override
+    public boolean hasResult() {
+      return true;
+    }
+
+    @Override
+    public boolean isHA() {
+      return false;
+    }
+
+    @Override
+    public boolean optimizeForWrite() {
+      return true;
+    }
+  }
+
+
+  public static class MyMessageObserver extends DistributionMessageObserver {
+    private static final Logger logger = LogService.getLogger();
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      logger.info("toberal beforeProcessMessage dm: {}, message: {}, sender: {}", dm, message,
+          message.getSender());
+    }
+
+    /**
+     * Called after the process method of the DistributionMessage is called
+     *
+     * @param dm the distribution manager that received the message
+     * @param message The message itself
+     */
+    @Override
+    public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      logger.info("toberal afterProcessMessage dm: {}, message: {}", dm, message);
+    }
+
+    /**
+     * Called just before a message is distributed.
+     *
+     * @param dm the distribution manager that's sending the message
+     * @param message the message itself
+     */
+    @Override
+    public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      logger.info("toberal beforeSendMessage dm: {}, message: {}", dm, message, new Exception());
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserver.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserver.java
new file mode 100644
index 000000000000..72e1827c43f5
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+import java.util.Set;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * Receives callbacks immediately before a request is proxied to other members and immediately
+ * after their responses are received. Install an implementation with
+ * {@link ProxyRequestObserverHolder#setInstance(ProxyRequestObserver)}.
+ */
+public interface ProxyRequestObserver {
+  void beforeSendRequest(Set<InternalDistributedMember> members);
+
+  void afterReceiveResponse(Set<InternalDistributedMember> members);
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverAdapter.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverAdapter.java
new file mode 100644
index 000000000000..b6274abba3eb
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverAdapter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+import java.util.Set;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * A {@link ProxyRequestObserver} whose callbacks do nothing; extend it and override only the
+ * methods of interest.
+ */
+public class ProxyRequestObserverAdapter implements ProxyRequestObserver {
+  @Override
+  public void beforeSendRequest(Set<InternalDistributedMember> members) {}
+
+  @Override
+  public void afterReceiveResponse(Set<InternalDistributedMember> members) {}
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java
new file mode 100644
index 000000000000..1430a1d3f017
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.internal.MakeNotStatic;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * This class is intended to hold a single 'observer' which will receive callbacks from the region
+ * and transaction code when a request is about to be proxied to another member. There can be only
+ * one such observer at a time. If no observer is needed, this member variable should point to an
+ * object with 'do-nothing' methods, such as ProxyRequestObserverAdapter.
+ *
+ * Code which wishes to observe proxy requests should do so using the following technique:
+ *
+ * class MyProxyRequestObserver extends ProxyRequestObserverAdapter { // ... override methods of
+ * interest ... }
+ *
+ * ProxyRequestObserver old = ProxyRequestObserverHolder.setInstance(new
+ * MyProxyRequestObserver());
+ *
+ * try { Call region methods here } finally { // reset to the original ProxyRequestObserver.
+ * ProxyRequestObserverHolder.setInstance(old); }
+ *
+ * The region code will call methods on this static member using the following technique:
+ *
+ * ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); try {
+ * observer.beforeSendRequest(members); doSomething(); } finally {
+ * observer.afterReceiveResponse(members); }
+ *
+ */
+public class ProxyRequestObserverHolder {
+  private static final Logger logger = LogService.getLogger();
+
+  @Immutable
+  private static final ProxyRequestObserver NO_OBSERVER =
+      new ProxyRequestObserverAdapter();
+  /**
+   * The current observer which will be notified of all proxy requests.
+   */
+  @MakeNotStatic
+  private static final AtomicReference<ProxyRequestObserver> _instance =
+      new AtomicReference<>(NO_OBSERVER);
+
+  /**
+   * Set the given observer to be notified of proxy requests. Returns the previous observer.
+   *
+   * @param observer the observer to be set
+   *
+   * @return the previous observer instance
+   */
+  public static ProxyRequestObserver setInstance(
+      @NotNull ProxyRequestObserver observer) {
+    logger.info("Setting ProxyRequestObserver with: {}", observer);
+    return _instance.getAndSet(observer);
+  }
+
+  /**
+   * Return the current observer instance
+   *
+   * @return the current observer instance
+   */
+  public static ProxyRequestObserver getInstance() {
+    return _instance.get();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java b/geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java
new file mode 100644
index 000000000000..bb88056daae3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * A {@link ProxyRequestObserver} that limits, per destination member, how many server threads
+ * may concurrently proxy requests; once the limit is reached further requests fail with an
+ * {@link IllegalStateException}.
+ */
+public class ThreadLimitingProxyRequestObserver
+    implements ProxyRequestObserver {
+  private final int maxThreadsToDestination;
+
+  public ThreadLimitingProxyRequestObserver(int maxThreadsToDestination) {
+    this.maxThreadsToDestination = maxThreadsToDestination;
+  }
+
+  private static final Logger logger = LogService.getLogger();
+  private final Map<InternalDistributedMember, Integer> threadsToDestination =
+      new ConcurrentHashMap<>();
+
+  @Override
+  public void beforeSendRequest(Set<InternalDistributedMember> members) {
+    if (Thread.currentThread().getName().startsWith("ServerConnection on")
+        || Thread.currentThread().getName().startsWith("Function Execution Processor")) {
+      for (InternalDistributedMember member : members) {
+        if (threadsToDestination.getOrDefault(member, 0) >= maxThreadsToDestination) {
+          List<CacheServer> cacheServers = CacheFactory.getAnyInstance().getCacheServers();
+          if (!cacheServers.isEmpty()) {
+            cacheServers.get(0).incRejectedProxyRequests();
+          }
+          logger.info("Max number of threads reached for {}", member);
+          throw new IllegalStateException("Max number of threads reached");
+        }
+      }
+      for (InternalDistributedMember member : members) {
+        threadsToDestination.merge(member, 1, Integer::sum);
+      }
+    }
+  }
+
+  @Override
+  public void afterReceiveResponse(Set<InternalDistributedMember> members) {
+    for (InternalDistributedMember member : members) {
+      if (Thread.currentThread().getName().startsWith("ServerConnection on")
+          || Thread.currentThread().getName().startsWith("Function Execution Processor")) {
+        threadsToDestination.merge(member, -1, Integer::sum);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getName() + ", maxThreadsToDestination: " + maxThreadsToDestination;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java b/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java
index 945e018ded7b..cb8b0e0d5802 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java
@@ -519,4 +519,15 @@ public interface CacheServer {
    */
  Set<InterestRegistrationListener> getInterestRegistrationListeners();
 
+  /**
+   * Increments the count of proxy requests rejected by this cache server.
+   */
+  default void incRejectedProxyRequests() {}
+
+  /**
+   * Returns the count of proxy requests rejected by this cache server.
+   */
+  default long getRejectedProxyRequests() {
+    return 0;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index a921f543d706..e88f9a937302 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -24,6 +24,7 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -135,6 +136,7
@@ public class CacheServerImpl extends AbstractCacheServer implements Distribution private final ClientHealthMonitorProvider clientHealthMonitorProvider; private final Function cacheServerAdvisorProvider; + private final AtomicLong rejectedProxyRequests = new AtomicLong(); public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE = Boolean.getBoolean( GeodeGlossary.GEMFIRE_PREFIX + "cache-server.enable-notify-by-subscription-false"); @@ -839,4 +841,14 @@ public CacheClientNotifierProvider getCacheClientNotifierProvider() { public ClientHealthMonitorProvider getClientHealthMonitorProvider() { return clientHealthMonitorProvider; } + + @Override + public long getRejectedProxyRequests() { + return rejectedProxyRequests.get(); + } + + @Override + public void incRejectedProxyRequests() { + rejectedProxyRequests.incrementAndGet(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index e14a159fc26c..fdce3137b3e5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -83,6 +83,8 @@ import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.PartitionedRegionDistributionException; import org.apache.geode.cache.PartitionedRegionStorageException; +import org.apache.geode.cache.ProxyRequestObserver; +import org.apache.geode.cache.ProxyRequestObserverHolder; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; @@ -2979,201 +2981,215 @@ private boolean putInBucket(final InternalDistributedMember targetNode, final In logger.debug("putInBucket: {} ({}) to {} to bucketId={} retry={} ms", event.getKey(), event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId), retryTimeout); } - // retry the put remotely until it finds the right node managing the bucket - RetryTimeKeeper retryTime = null; - boolean result = false; - InternalDistributedMember currentTarget = targetNode; - long timeOut = 0; - int count = 0; - for (;;) { - switch (count) { - case 0: - // Note we don't check for DM cancellation in common case. - // First time. Assume success, keep going. - break; - case 1: - cache.getCancelCriterion().checkCancelInProgress(null); - // Second time (first failure). Calculate timeout and keep going. - timeOut = System.currentTimeMillis() + retryTimeout; - break; - default: - cache.getCancelCriterion().checkCancelInProgress(null); - // test for timeout - long timeLeft = timeOut - System.currentTimeMillis(); - if (timeLeft < 0) { - PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); - // NOTREACHED + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); + Set bucketOwners = null; + try { + if (observer != null) { + bucketOwners = getRegionAdvisor().getBucketOwners(bucketId); + observer.beforeSendRequest(bucketOwners); + } + + // retry the put remotely until it finds the right node managing the bucket + + RetryTimeKeeper retryTime = null; + boolean result = false; + InternalDistributedMember currentTarget = targetNode; + long timeOut = 0; + int count = 0; + for (;;) { + switch (count) { + case 0: + // Note we don't check for DM cancellation in common case. + // First time. Assume success, keep going. 
+ break; + case 1: + cache.getCancelCriterion().checkCancelInProgress(null); + // Second time (first failure). Calculate timeout and keep going. + timeOut = System.currentTimeMillis() + retryTimeout; + break; + default: + cache.getCancelCriterion().checkCancelInProgress(null); + // test for timeout + long timeLeft = timeOut - System.currentTimeMillis(); + if (timeLeft < 0) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); + // NOTREACHED + } + + // Didn't time out. Sleep a bit and then continue + boolean interrupted = Thread.interrupted(); + try { + Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); + } catch (InterruptedException ignore) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + break; + } // switch + count++; + + if (currentTarget == null) { // pick target + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(retryTimeout); } + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); - // Didn't time out. Sleep a bit and then continue - boolean interrupted = Thread.interrupted(); - try { - Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); - } catch (InterruptedException ignore) { - interrupted = true; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + checkShutdown(); + continue; + } // pick target + + try { + final boolean isLocal = (localMaxMemory > 0) && currentTarget.equals(getMyId()); + if (logger.isDebugEnabled()) { + logger.debug("putInBucket: currentTarget = {}; ifNew = {}; ifOld = {}; isLocal = {}", + currentTarget, ifNew, ifOld, isLocal); + } + checkIfAboveThreshold(event); + if (isLocal) { + event.setInvokePRCallbacks(true); + long start = prStats.startPutLocal(); + try { + final BucketRegion br = + dataStore.getInitializedBucketForId(event.getKey(), bucketId); + // Local updates should insert a serialized (aka CacheDeserializable) object + // given that most manipulation of values is remote (requiring serialization to send). + // But... function execution always implies local manipulation of + // values so keeping locally updated values in Object form should be more efficient. + if (!FunctionExecutionPooledExecutor.isFunctionExecutionThread()) { + // TODO: this condition may not help since BucketRegion.virtualPut calls + // forceSerialized + br.forceSerialized(event); + } + if (ifNew) { + result = dataStore.createLocally(br, event, ifNew, ifOld, requireOldValue, + lastModified); + } else { + result = dataStore.putLocally(br, event, ifNew, ifOld, expectedOldValue, + requireOldValue, lastModified); + } + } finally { + prStats.endPutLocal(start); + } + } // local + else { // remote + // no need to perform early serialization (and create an un-necessary byte array) + // sending the message performs that work. 
+ long start = prStats.startPutRemote(); + try { + if (ifNew) { + result = createRemotely(currentTarget, bucketId, event, requireOldValue); + } else { + result = putRemotely(currentTarget, event, ifNew, ifOld, expectedOldValue, + requireOldValue); + if (!requireOldValue) { + // make sure old value is set to NOT_AVAILABLE token + event.oldValueNotAvailable(); + } + } + } finally { + prStats.endPutRemote(start); } + } // remote + + if (!result && !ifOld && !ifNew) { + Assert.assertTrue(!isLocal); + ForceReattemptException fre = new ForceReattemptException( + "false result when !ifNew and !ifOld is unacceptable - retrying"); + fre.setHash(event.getKey().hashCode()); + throw fre; } - break; - } // switch - count++; - if (currentTarget == null) { // pick target - checkReadiness(); - if (retryTime == null) { - retryTime = new RetryTimeKeeper(retryTimeout); + return result; + } catch (ConcurrentCacheModificationException e) { + if (logger.isDebugEnabled()) { + logger.debug("putInBucket: caught concurrent cache modification exception", e); + } + event.isConcurrencyConflict(true); + + if (logger.isTraceEnabled()) { + logger.trace( + "ConcurrentCacheModificationException received for putInBucket for bucketId: {}{}{} for event: {} No reattampt is done, returning from here", + getPRId(), BUCKET_ID_SEPARATOR, bucketId, event); + } + return result; + } catch (ForceReattemptException prce) { + prce.checkKey(event.getKey()); + if (logger.isDebugEnabled()) { + logger.debug( + "putInBucket: Got ForceReattemptException for {} on VM {} for node {}{}{} for bucket = {}", + this, getMyId(), currentTarget, getPRId(), BUCKET_ID_SEPARATOR, bucketId, prce); + logger.debug("putInBucket: count={}", count); + } + checkReadiness(); + InternalDistributedMember lastTarget = currentTarget; + if (retryTime == null) { + retryTime = new RetryTimeKeeper(retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + if (lastTarget.equals(currentTarget)) { + if (retryTime.overMaximum()) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); + // NOTREACHED + } + retryTime.waitToRetryNode(); + } + event.setPossibleDuplicate(true); + } catch (PrimaryBucketException notPrimary) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), + currentTarget); + } + getRegionAdvisor().notPrimary(bucketId, currentTarget); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); // It's possible this is a GemFire thread e.g. ServerConnection // which got to this point because of a distributed system shutdown or - // region closure which uses interrupt to break any sleep() or wait() calls - // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. 
waitForPrimary or waitForBucketRecovery in which case throw + // exception checkShutdown(); - continue; - } // pick target - try { - final boolean isLocal = (localMaxMemory > 0) && currentTarget.equals(getMyId()); - if (logger.isDebugEnabled()) { - logger.debug("putInBucket: currentTarget = {}; ifNew = {}; ifOld = {}; isLocal = {}", - currentTarget, ifNew, ifOld, isLocal); - } - checkIfAboveThreshold(event); - if (isLocal) { - event.setInvokePRCallbacks(true); - long start = prStats.startPutLocal(); - try { - final BucketRegion br = - dataStore.getInitializedBucketForId(event.getKey(), bucketId); - // Local updates should insert a serialized (aka CacheDeserializable) object - // given that most manipulation of values is remote (requiring serialization to send). - // But... function execution always implies local manipulation of - // values so keeping locally updated values in Object form should be more efficient. - if (!FunctionExecutionPooledExecutor.isFunctionExecutionThread()) { - // TODO: this condition may not help since BucketRegion.virtualPut calls - // forceSerialized - br.forceSerialized(event); - } - if (ifNew) { - result = dataStore.createLocally(br, event, ifNew, ifOld, requireOldValue, - lastModified); - } else { - result = dataStore.putLocally(br, event, ifNew, ifOld, expectedOldValue, - requireOldValue, lastModified); - } - } finally { - prStats.endPutLocal(start); - } - } // local - else { // remote - // no need to perform early serialization (and create an un-necessary byte array) - // sending the message performs that work. - long start = prStats.startPutRemote(); - try { - if (ifNew) { - result = createRemotely(currentTarget, bucketId, event, requireOldValue); - } else { - result = putRemotely(currentTarget, event, ifNew, ifOld, expectedOldValue, - requireOldValue); - if (!requireOldValue) { - // make sure old value is set to NOT_AVAILABLE token - event.oldValueNotAvailable(); - } - } - } finally { - prStats.endPutRemote(start); + // If we get here, the attempt failed... 
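+        // (count == 1 means the very first attempt failed; the retried-operation stats
+        // below are only incremented once per logical operation.)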
+ if (count == 1) { + if (ifNew) { + prStats.incCreateOpsRetried(); + } else { + prStats.incPutOpsRetried(); } - } // remote - - if (!result && !ifOld && !ifNew) { - Assert.assertTrue(!isLocal); - ForceReattemptException fre = new ForceReattemptException( - "false result when !ifNew and !ifOld is unacceptable - retrying"); - fre.setHash(event.getKey().hashCode()); - throw fre; } - - return result; - } catch (ConcurrentCacheModificationException e) { - if (logger.isDebugEnabled()) { - logger.debug("putInBucket: caught concurrent cache modification exception", e); + if (event.getOperation().isCreate()) { + prStats.incCreateRetries(); + } else { + prStats.incPutRetries(); } - event.isConcurrencyConflict(true); - if (logger.isTraceEnabled()) { - logger.trace( - "ConcurrentCacheModificationException received for putInBucket for bucketId: {}{}{} for event: {} No reattampt is done, returning from here", - getPRId(), BUCKET_ID_SEPARATOR, bucketId, event); - } - return result; - } catch (ForceReattemptException prce) { - prce.checkKey(event.getKey()); if (logger.isDebugEnabled()) { logger.debug( - "putInBucket: Got ForceReattemptException for {} on VM {} for node {}{}{} for bucket = {}", - this, getMyId(), currentTarget, getPRId(), BUCKET_ID_SEPARATOR, bucketId, prce); - logger.debug("putInBucket: count={}", count); - } - checkReadiness(); - InternalDistributedMember lastTarget = currentTarget; - if (retryTime == null) { - retryTime = new RetryTimeKeeper(retryTimeout); - } - currentTarget = getNodeForBucketWrite(bucketId, retryTime); - if (lastTarget.equals(currentTarget)) { - if (retryTime.overMaximum()) { - PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); - // NOTREACHED - } - retryTime.waitToRetryNode(); - } - event.setPossibleDuplicate(true); - } catch (PrimaryBucketException notPrimary) { - if (logger.isDebugEnabled()) { - logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), + "putInBucket for bucketId = {} failed (attempt # {} ({} ms left), retrying with node {}", + bucketStringForLogs(bucketId), count, (timeOut - System.currentTimeMillis()), currentTarget); } - getRegionAdvisor().notPrimary(bucketId, currentTarget); - if (retryTime == null) { - retryTime = new RetryTimeKeeper(retryTimeout); - } - currentTarget = getNodeForBucketWrite(bucketId, retryTime); - } - - // It's possible this is a GemFire thread e.g. ServerConnection - // which got to this point because of a distributed system shutdown or - // region closure which uses interrupt to break any sleep() or wait() - // calls - // e.g. waitForPrimary or waitForBucketRecovery in which case throw - // exception - checkShutdown(); - - // If we get here, the attempt failed... 
- if (count == 1) { - if (ifNew) { - prStats.incCreateOpsRetried(); - } else { - prStats.incPutOpsRetried(); - } - } - if (event.getOperation().isCreate()) { - prStats.incCreateRetries(); - } else { - prStats.incPutRetries(); - } + } // for - if (logger.isDebugEnabled()) { - logger.debug( - "putInBucket for bucketId = {} failed (attempt # {} ({} ms left), retrying with node {}", - bucketStringForLogs(bucketId), count, (timeOut - System.currentTimeMillis()), - currentTarget); + } finally { + if (observer != null) { + observer.afterReceiveResponse(bucketOwners); } - } // for - + } // NOTREACHED } @@ -4173,8 +4189,19 @@ private Object getFromBucket(final InternalDistributedMember targetNode, int buc } } - obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, - clientEvent, returnTombstones); + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); + try { + if (observer != null) { + observer.beforeSendRequest(Collections.singleton(retryNode)); + } + obj = + getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, + clientEvent, returnTombstones); + } finally { + if (observer != null) { + observer.afterReceiveResponse(Collections.singleton(retryNode)); + } + } // TODO: there should be better way than this one String name = Thread.currentThread().getName(); @@ -4886,23 +4913,35 @@ public Object getRemotely(InternalDistributedMember targetNode, int bucketId, fi private ResultCollector executeFunctionOnRemoteNode(InternalDistributedMember targetNode, final Function function, final Object object, final Set routingKeys, ResultCollector rc, int[] bucketArray, ServerToClientFunctionResultSender sender, AbstractExecution execution) { - PartitionedRegionFunctionResultSender resultSender = - new PartitionedRegionFunctionResultSender(null, this, 0, rc, sender, false, true, - execution.isForwardExceptions(), function, bucketArray); - PartitionedRegionFunctionResultWaiter resultReceiver = - new PartitionedRegionFunctionResultWaiter(getSystem(), getPRId(), rc, function, - resultSender); + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); + if (observer != null) { + observer.beforeSendRequest(Collections.singleton(targetNode)); + } - FunctionRemoteContext context = new FunctionRemoteContext(function, object, routingKeys, - bucketArray, execution.isReExecute(), execution.isFnSerializationReqd(), getPrincipal()); + try { + PartitionedRegionFunctionResultSender resultSender = + new PartitionedRegionFunctionResultSender(null, this, 0, rc, sender, false, true, + execution.isForwardExceptions(), function, bucketArray); - HashMap recipMap = - new HashMap<>(); + PartitionedRegionFunctionResultWaiter resultReceiver = + new PartitionedRegionFunctionResultWaiter(getSystem(), getPRId(), rc, function, + resultSender); - recipMap.put(targetNode, context); + FunctionRemoteContext context = new FunctionRemoteContext(function, object, routingKeys, + bucketArray, execution.isReExecute(), execution.isFnSerializationReqd(), getPrincipal()); - return resultReceiver.getPartitionedDataFrom(recipMap, this, execution); + HashMap recipMap = + new HashMap<>(); + + recipMap.put(targetNode, context); + + return resultReceiver.getPartitionedDataFrom(recipMap, this, execution); + } finally { + if (observer != null) { + observer.afterReceiveResponse(Collections.singleton(targetNode)); + } + } } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 7bfb4f17f199..dc091ab63e34 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2952,6 +2952,7 @@ public void executeOnDataStore(final Set localKeys, final Function function, fin
     FunctionStats stats = FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem());
     long start = stats.startFunctionExecution(function.hasResult());
     try {
+
       if (logger.isDebugEnabled()) {
         logger.debug("Executing Function:{} on Remote Node with context:{}", function.getId(),
             prContext);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 2d1b1a864623..e7f091cb3696 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -327,6 +327,7 @@ void send(TXLockId lockId) {
       }
       return;
     }
+    Assert.assertTrue(txState != null, "Send must have transaction state.");
     this.lockId = (TXLockIdImpl) lockId;
     updateLockMembers();
 
@@ -335,18 +336,7 @@ void send(TXLockId lockId) {
     IdentityHashMap<RegionCommitList, Set<InternalDistributedMember>> distMap =
         new IdentityHashMap<>();
     HashSet<InternalDistributedMember> ackReceivers = null;
-    for (final Map.Entry<InternalDistributedMember, RegionCommitList> entry : msgMap.entrySet()) {
-      final RegionCommitList rcl = entry.getValue();
-      if (rcl.getNeedsAck()) {
-        if (ackReceivers == null) {
-          ackReceivers = new HashSet<>();
-        }
-        ackReceivers.add(entry.getKey());
-      }
-      final Set<InternalDistributedMember> receivers =
-          distMap.computeIfAbsent(rcl, k -> new HashSet<>());
-      receivers.add(entry.getKey());
-    }
+    ackReceivers = getAckReceivers(distMap, ackReceivers);
 
     CommitReplyProcessor processor = null;
     {
@@ -422,6 +412,41 @@ void send(TXLockId lockId) {
     }
   }
 
+  Set<InternalDistributedMember> getTxCommitRecipients() {
+    IdentityHashMap<RegionCommitList, Set<InternalDistributedMember>> distMap =
+        new IdentityHashMap<>();
+
+    getAckReceivers(distMap, null);
+    Set<InternalDistributedMember> recipients = new HashSet<>();
+    for (final Map.Entry<RegionCommitList, Set<InternalDistributedMember>> me : distMap
+        .entrySet()) {
+      recipients.addAll(me.getValue());
+    }
+    recipients.retainAll(dm.getDistributionManagerIds());
+    return recipients;
+  }
+
+  private HashSet<InternalDistributedMember> getAckReceivers(
+      IdentityHashMap<RegionCommitList, Set<InternalDistributedMember>> distMap,
+      HashSet<InternalDistributedMember> ackReceivers) {
+    if (msgMap == null) {
+      return null;
+    }
+    for (final Map.Entry<InternalDistributedMember, RegionCommitList> entry : msgMap.entrySet()) {
+      final RegionCommitList rcl = entry.getValue();
+      if (rcl.getNeedsAck()) {
+        if (ackReceivers == null) {
+          ackReceivers = new HashSet<>();
+        }
+        ackReceivers.add(entry.getKey());
+      }
+      final Set<InternalDistributedMember> receivers =
+          distMap.computeIfAbsent(rcl, k -> new HashSet<>());
+      receivers.add(entry.getKey());
+    }
+    return ackReceivers;
+  }
+
   @Override
   public boolean containsRegionContentChange() {
     return true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 58828b4b6550..4e79c72073f0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -42,6 +42,8 @@
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.ProxyRequestObserver;
+import org.apache.geode.cache.ProxyRequestObserverHolder;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -512,7 +514,20 @@ public void commit() throws CommitConflictException {
 
     lockTXRegions(regions);
 
+    ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance();
+    msg = buildMessage();
+    Set<InternalDistributedMember> txCommitRecipients = msg.getTxCommitRecipients();
     try {
+      if (observer != null) {
+        try {
+          observer.beforeSendRequest(txCommitRecipients);
+        } catch (Exception e) {
+          SystemFailure.checkFailure();
+          logger.error("throwing exception {} before committing for tx: {}", e, this);
+          throw e;
+        }
+      }
+
       // apply changes to the cache
       applyChanges(entries);
       // For internal testing
@@ -523,7 +538,6 @@
       attachFilterProfileInformation(entries);
 
       // build and send the message
-      msg = buildMessage();
       commitMessage = msg;
       if (internalBeforeSend != null) {
         internalBeforeSend.run();
@@ -542,6 +556,9 @@
        */
       commitMessage = buildCompleteMessage();
     } finally {
+      if (observer != null) {
+        observer.afterReceiveResponse(txCommitRecipients);
+      }
       unlockTXRegions(regions);
     }
   } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java b/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java
index d3b567650090..7191a6c416ca 100644
--- a/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java
@@ -498,4 +498,8 @@ public interface CacheServerMXBean {
    */
   ClientQueueDetail showClientQueueDetails(String clientId) throws Exception;
 
+  /**
+   * Returns the number of proxy requests rejected by this cache server.
+   */
+  long getRejectedProxyRequests();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
index 281114ddb708..a5aaac808088 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
@@ -740,6 +740,9 @@ public ClientQueueDetail getClientQueueDetail(String clientId) throws Exception
     return null;
   }
 
+  public long getRejectedProxyRequests() {
+    return cacheServer.getRejectedProxyRequests();
+  }
 
   private static String getClientIdFromCacheClientProxy(CacheClientProxy p) {
     if (p == null) {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java
index e372ecfe2bc7..1abc6c73fd2b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java
@@ -311,4 +311,9 @@ public ClientQueueDetail[] showClientQueueDetails() throws Exception {
   public ClientQueueDetail showClientQueueDetails(String clientId) throws Exception {
     return bridge.getClientQueueDetail(clientId);
   }
+
+  @Override
+  public long getRejectedProxyRequests() {
+    return bridge.getRejectedProxyRequests();
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 8ea970d3ed8a..186b29a1e3e6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -110,9 +110,9 @@ public void attachFilterProfileAfterApplyingChanges() {
     txState.commit();
 
     InOrder inOrder = inOrder(txState, txCommitMessage);
+    inOrder.verify(txState).buildMessage();
     inOrder.verify(txState).applyChanges(any());
     inOrder.verify(txState).attachFilterProfileInformation(any());
-    inOrder.verify(txState).buildMessage();
     inOrder.verify(txCommitMessage).send(any());
     inOrder.verify(txState).firePendingCallbacks();
   }
diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java
new file mode 100644
index 000000000000..a1900efa4fc6
--- /dev/null
+++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.GfshCommand;
+import org.apache.geode.management.internal.cli.functions.SetThreadLimitingProxyRequestFunction;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.cli.result.model.TabularResultModel;
+import org.apache.geode.management.internal.security.ResourceOperation;
+import org.apache.geode.security.ResourcePermission;
+
+/**
+ * gfsh command that installs (or, with a limit of 0, removes) the thread-limiting proxy request
+ * observer on all normal members, for example:
+ *
+ * <pre>
+ * gfsh&gt; set-thread-limit-proxy --max-threads-to-destination=4
+ * </pre>
+ */
+public class SetThreadLimitingProxyRequestCommand extends GfshCommand {
+
+  private static final Logger logger = LogService.getLogger();
+
+  @CliAvailabilityIndicator({"set-thread-limit-proxy"})
+  public boolean commandAvailable() {
+    return isOnlineCommandAvailable();
+  }
+
+  @CliCommand(value = "set-thread-limit-proxy",
+      help = "Set a maximum number of threads to be used to proxy operations to another server")
+  @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
+      operation = ResourcePermission.Operation.MANAGE)
+  public ResultModel setThreadLimitingProxyRequest(
+      @CliOption(key = "max-threads-to-destination",
"max-threads-to-destination", + optionContext = ConverterHint.LOG_LEVEL, mandatory = true, unspecifiedDefaultValue = "0", + help = "The maximum number of threads to a specific destination") int maxThreads) { + + + Set dsMembers = getAllNormalMembers(); + + SetThreadLimitingProxyRequestFunction logFunction = new SetThreadLimitingProxyRequestFunction(); + FunctionService.registerFunction(logFunction); + Object[] functionArgs = new Object[1]; + functionArgs[0] = maxThreads; + + ResultModel result = new ResultModel(); + + @SuppressWarnings("unchecked") + Execution execution = FunctionService.onMembers(dsMembers).setArguments(functionArgs); + if (execution == null) { + return ResultModel.createError(CliStrings.CHANGE_LOGLEVEL__MSG__CANNOT_EXECUTE); + } + List resultList = + (List) executeFunction(logFunction, functionArgs, dsMembers).getResult(); + + TabularResultModel tableInfo = result.addTable("result"); + tableInfo.setColumnHeader("Member", + "thread limited"); + for (Object object : resultList) { + try { + if (object instanceof Throwable) { + logger.warn("Exception in set thread limit for proxy requests " + + ((Throwable) object).getMessage(), + ((Throwable) object)); + continue; + } + + if (object != null) { + @SuppressWarnings("unchecked") + Map resultMap = (Map) object; + Map.Entry entry = resultMap.entrySet().iterator().next(); + + if (entry.getValue().contains("Exception")) { + tableInfo.addRow(entry.getKey(), "false"); + } else { + tableInfo.addRow(entry.getKey(), "true"); + } + + } + } catch (Exception ex) { + logger.warn("command exception " + ex); + } + } + + logger.info("command result=" + result); + return result; + + } +} diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java new file mode 100644 index 000000000000..1428e18d6c1b --- /dev/null +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java
new file mode 100644
index 000000000000..1428e18d6c1b
--- /dev/null
+++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.ProxyRequestObserver;
+import org.apache.geode.cache.ProxyRequestObserverHolder;
+import org.apache.geode.cache.ThreadLimitingProxyRequestObserver;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Function that installs or clears the ThreadLimitingProxyRequestObserver on a member.
+ */
+public class SetThreadLimitingProxyRequestFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  private static final String ID =
+      "org.apache.geode.management.internal.cli.functions.SetThreadLimitingProxyRequestFunction";
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public void execute(FunctionContext<Object[]> context) {
+    InternalCache cache = (InternalCache) context.getCache();
+    Map<String, String> result = new HashMap<>();
+    try {
+      Object[] args = context.getArguments();
+      Integer maxThreads = (Integer) args[0];
+
+      ProxyRequestObserver observer = null;
+
+      if (maxThreads > 0) {
+        observer = new ThreadLimitingProxyRequestObserver(maxThreads);
+      }
+
+      // A null observer clears any previously installed instance.
+      ProxyRequestObserverHolder.setInstance(observer);
+
+      result.put(cache.getDistributedSystem().getDistributedMember().getId(),
+          "Set thread limiting proxy request to: " + maxThreads);
+      context.getResultSender().lastResult(result);
+    } catch (Exception ex) {
+      logger.info(LogMarker.CONFIG_MARKER, "GFSH exception {}", ex.getMessage(), ex);
+      result.put(cache.getDistributedSystem().getDistributedMember().getId(),
+          "Exception " + ex.getMessage());
+      context.getResultSender().lastResult(result);
+    }
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+}
diff --git a/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker b/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
index 00ae9b2e0341..0fe8f2ab187d 100644
--- a/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
+++ b/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
@@ -86,6 +86,7 @@ org.apache.geode.management.internal.cli.commands.RestoreRedundancyCommand
 org.apache.geode.management.internal.cli.commands.ResumeAsyncEventQueueDispatcherCommand
 org.apache.geode.management.internal.cli.commands.ResumeGatewaySenderCommand
 org.apache.geode.management.internal.cli.commands.RevokeMissingDiskStoreCommand
+org.apache.geode.management.internal.cli.commands.SetThreadLimitingProxyRequestCommand
 org.apache.geode.management.internal.cli.commands.SetVariableCommand
 org.apache.geode.management.internal.cli.commands.ShCommand
 org.apache.geode.management.internal.cli.commands.ShowDeadlockCommand
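ThreadLimitingProxyRequestObserver's implementation is not part of this diff. One subtlety any implementation has to handle: afterReceiveResponse runs in commit()'s finally block even when beforeSendRequest rejected the request, so a permit must only be released if one was actually acquired. A purely illustrative sketch of that shape (the class and its counter are hypothetical; only ProxyRequestObserver and the two hook signatures come from this patch):

import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.geode.cache.ProxyRequestObserver;

// Illustrative only: limits concurrent proxied requests with a semaphore and
// counts rejections, roughly the behavior the rejected-proxy-requests stat implies.
class SemaphoreLimitingObserver implements ProxyRequestObserver {
  private final Semaphore permits;
  private final AtomicLong rejected = new AtomicLong();
  private final ThreadLocal<Boolean> acquired = ThreadLocal.withInitial(() -> false);

  SemaphoreLimitingObserver(int maxThreads) {
    permits = new Semaphore(maxThreads);
  }

  @Override
  public void beforeSendRequest(Set recipients) {
    if (permits.tryAcquire()) {
      acquired.set(true);
    } else {
      rejected.incrementAndGet();
      // TXState.commit() logs and rethrows, aborting the commit.
      throw new IllegalStateException("proxy request rejected: thread limit reached");
    }
  }

  @Override
  public void afterReceiveResponse(Set recipients) {
    // Runs in a finally block even after a rejection; release only what was acquired.
    if (acquired.get()) {
      acquired.set(false);
      permits.release();
    }
  }

  long rejectedCount() {
    return rejected.get();
  }
}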
diff --git a/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt b/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt
index e9a15586e71a..0c39fc335899 100644
--- a/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt
+++ b/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt
@@ -94,6 +94,7 @@ org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$EvictionAt
 org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$ExpirationAttrs,true,1474255033398008063,action:org/apache/geode/cache/ExpirationAction,time:java/lang/Integer
 org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$PartitionArgs,true,5907052187323280919,partitionResolver:java/lang/String,prColocatedWith:java/lang/String,prLocalMaxMemory:java/lang/Integer,prRecoveryDelay:java/lang/Long,prRedundantCopies:java/lang/Integer,prStartupRecoveryDelay:java/lang/Long,prTotalMaxMemory:java/lang/Long,prTotalNumBuckets:java/lang/Integer
 org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction,false
+org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction,true,1
 org/apache/geode/management/internal/cli/functions/ShowMissingDiskStoresFunction,false
 org/apache/geode/management/internal/cli/functions/ShutDownFunction,true,1
 org/apache/geode/management/internal/cli/functions/SizeExportLogsFunction,true,1
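Once a server starts rejecting proxied requests, the new counter is visible through the CacheServerMXBean method added above. A read-side sketch, assuming the standard ManagementService lookup applies here and that getLocalCacheServerMXBean returns null unless called on the member hosting that cache server:

import org.apache.geode.cache.Cache;
import org.apache.geode.management.CacheServerMXBean;
import org.apache.geode.management.ManagementService;

// Hypothetical reader; cacheServerPort identifies the local CacheServer instance.
class RejectedProxyRequestsReader {
  static long read(Cache cache, int cacheServerPort) {
    ManagementService service = ManagementService.getManagementService(cache);
    CacheServerMXBean mxBean = service.getLocalCacheServerMXBean(cacheServerPort);
    return mxBean == null ? 0L : mxBean.getRejectedProxyRequests();
  }
}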