WriteBatcher hangs in awaitCompletion() after forest failover #744

Closed
srinathgit opened this issue May 17, 2017 · 2 comments

srinathgit commented May 17, 2017

The following test is run with Gradle against a 3-node cluster. It exercises the forest failover scenario: one node is shut down during the run, and its forest fails over to another node. All documents are written successfully, but once the last document has been written, WriteBatcher hangs in awaitCompletion(). The thread dump follows the test.

@Test
	public void testStopOneNode() throws Exception{
		try{
			final String query1 = "fn:count(fn:doc())";
			
			final AtomicInteger successCount = new AtomicInteger(0);
			
			final AtomicBoolean failState = new AtomicBoolean(false);
			final AtomicInteger failCount = new AtomicInteger(0);
							
			WriteBatcher ihb2 =  dmManager.newWriteBatcher();
			ihb2.withBatchSize(10);
			ihb2.withThreadCount(10);
			
			WriteFailureListener[] failureListeners = ihb2.getBatchFailureListeners();
			List<WriteFailureListener> failure = Arrays.asList(failureListeners);
			
			failure = new ArrayList<WriteFailureListener>(failure);	
			failure.add( new HostAvailabilityListener(dmManager)
						.withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
						.withMinHosts(2));
					 
			ihb2.setBatchFailureListeners(failure.toArray(new WriteFailureListener[failure.size()]));
			ihb2.onBatchSuccess(
				   batch -> {

						successCount.addAndGet(batch.getItems().length);
						System.out.println("Success Host: "+ batch.getClient().getHost());
						System.out.println("Success batch number: "+ batch.getJobBatchNumber());
						System.out.println("Success Job writes so far: "+ batch.getJobWritesSoFar());
					  }
					)
					.onBatchFailure(
					  (batch, throwable) -> {
						  System.out.println("Failed batch number: "+ batch.getJobBatchNumber());
									 
						  throwable.printStackTrace();
						  failState.set(true);
						  failCount.addAndGet(batch.getItems().length);
					  });
			
			
			writeTicket = dmManager.startJob(ihb2);    
			boolean isRunning = true;
			for (int j =0 ;j < 20000; j++){
				String uri ="/local/ABC-"+ j;
				ihb2.add(uri, stringHandle);
				if (dmManager.getJobReport(writeTicket).getSuccessEventsCount() > 2000 && isRunning){
					isRunning = false;
					serverStartStop(hostNames[hostNames.length -1], "stop");
					Thread.sleep(40000L);
					
				}
				
			}
		
			
			ihb2.flushAndWait();
		   
			
			System.out.println("Fail : "+failCount.intValue());
			System.out.println("Success : "+successCount.intValue());
			System.out.println("Count : "+ dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
	  
			Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue()==20000);
			
		}
		catch(Exception e){
			e.printStackTrace();
		}
		
	}
	

Thread:

   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007213cbd78> (a java.util.concurrent.FutureTask)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl$CompletableThreadPoolExecutor.awaitCompletion(WriteBatcherImpl.java:1185)
        - locked <0x00000007213cbd78> (a java.util.concurrent.FutureTask)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.awaitCompletion(WriteBatcherImpl.java:654)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.awaitCompletion(WriteBatcherImpl.java:660)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:547)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAndWait(WriteBatcherImpl.java:518)
        at com.marklogic.client.datamovement.functionaltests.WBFailoverTest.testStopOneNode(WBFailoverTest.java:234)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
        at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
        at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
        at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
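
The dump shows the test thread parked under flushAndWait() with nothing on the test side limiting how long it waits. Until the hang itself is fixed, one way to keep the Gradle run from blocking is to bound the wait in the test. The following is a minimal sketch using only the JDK. `BoundedWait` and `runWithTimeout` are hypothetical names that are not part of the Java Client API, and the `Runnable` stands in for the blocking `ihb2.flushAndWait()` call.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the MarkLogic Java Client API.
public class BoundedWait {

    /**
     * Runs a blocking action on a helper thread.
     * Returns true if the action finished within the timeout, false if it did not.
     */
    public static boolean runWithTimeout(Runnable action, Duration timeout)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-wait");
            t.setDaemon(true); // a stuck action must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(action);
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the helper thread
            return false;
        } catch (ExecutionException e) {
            // Surface a failure of the action itself to the caller.
            throw new RuntimeException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // An action that returns immediately.
        boolean quick = runWithTimeout(() -> { }, Duration.ofSeconds(1));

        // An action that blocks far longer than the timeout, standing in for the hang.
        boolean stuck = runWithTimeout(() -> {
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }, Duration.ofMillis(200));

        System.out.println("quick=" + quick + ", stuck=" + stuck);
    }
}
```

In the test above, the call would look like `runWithTimeout(() -> ihb2.flushAndWait(), Duration.ofMinutes(5))` followed by an assertion on the result. The five-minute limit is an arbitrary assumption. This only turns the hang into a test failure; it does not address the cause.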


srinathgit added this to the java-client-api-4.0.2 milestone May 17, 2017
srinathgit changed the title from "WriteBatcher hangs in awaitCompletion()" to "WriteBatcher hangs in awaitCompletion() after forest failover" May 18, 2017
srinathgit commented

This issue still occurs. The thread dump and client log are attached.

Test:

@Test
	public void testStopOneNode() throws Exception{
		try{
			final String query1 = "fn:count(fn:doc())";
			
			final AtomicInteger successCount = new AtomicInteger(0);
			
			final AtomicBoolean failState = new AtomicBoolean(false);
			final AtomicInteger failCount = new AtomicInteger(0);
							
			WriteBatcher ihb2 =  dmManager.newWriteBatcher();
			ihb2.withBatchSize(10);
			ihb2.withThreadCount(10);
			
			WriteFailureListener[] failureListeners = ihb2.getBatchFailureListeners();
			List<WriteFailureListener> failure = Arrays.asList(failureListeners);
			
			failure = new ArrayList<WriteFailureListener>(failure);	

			for (Iterator<WriteFailureListener> iter = failure.listIterator(); iter.hasNext(); ) {
				WriteFailureListener objList = iter.next();
			    if (objList.toString().contains("com.marklogic.client.datamovement.HostAvailabilityListener")) {
			        iter.remove();
			    }
			}
			failure.add( new HostAvailabilityListener(dmManager)
						.withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
						.withMinHosts(2));
			for(WriteFailureListener w:failure){
				System.out.println(w.toString());
			}
						 
			ihb2.setBatchFailureListeners(failure.toArray(new WriteFailureListener[failure.size()]));
			ihb2.onBatchSuccess(
				   batch -> {
	
						successCount.addAndGet(batch.getItems().length);
						System.out.println("Success Host: "+ batch.getClient().getHost());
						System.out.println("Success batch number: "+ batch.getJobBatchNumber());
						System.out.println("Success Job writes so far: "+ batch.getJobWritesSoFar());
					  }
					)
					.onBatchFailure(
					  (batch, throwable) -> {
						  System.out.println("Failed batch number: "+ batch.getJobBatchNumber());
									 
						  throwable.printStackTrace();
						  failState.set(true);
						  failCount.addAndGet(batch.getItems().length);
					  });
			
			
			writeTicket = dmManager.startJob(ihb2);    
			AtomicBoolean isRunning = new AtomicBoolean(true);
			for (int j =0 ;j < 20000; j++){
				String uri ="/local/ABC-"+ j;
				ihb2.add(uri, stringHandle);
				if (dmManager.getJobReport(writeTicket).getSuccessEventsCount() > 2000 && isRunning.get()){
					isRunning.set(false);
					serverStartStop(hostNames[hostNames.length -1], "stop");
					Thread.sleep(40000L);
					
				}
				
			}
		
			
			ihb2.flushAndWait();
		   
			
			System.out.println("Fail : "+failCount.intValue());
			System.out.println("Success : "+successCount.intValue());
			System.out.println("Count : "+ dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
	  
			Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue()==20000);
			
		}
		catch(Exception e){
			e.printStackTrace();
		}
		
	}
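
The updated test removes the default HostAvailabilityListener by matching on `toString()`, which relies on the class not overriding `toString()`. A type check expresses the same intent more directly. The following is a minimal sketch with stand-in types so that it runs without the client library: `FailureListener`, `HostListener`, `LoggingListener`, and `replaceByType` are hypothetical names standing in for WriteFailureListener, HostAvailabilityListener, any other registered listener, and a test helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; the nested types are stand-ins, not MarkLogic classes.
public class ListenerFilter {

    /** Stand-in for WriteFailureListener. */
    public interface FailureListener { }

    /** Stand-in for HostAvailabilityListener. */
    public static class HostListener implements FailureListener { }

    /** Stand-in for any other default listener that should be kept. */
    public static class LoggingListener implements FailureListener { }

    /**
     * Copies the current listeners, drops every instance of the given type,
     * and appends the replacement at the end.
     */
    public static <T extends FailureListener> List<FailureListener> replaceByType(
            FailureListener[] current, Class<T> type, T replacement) {
        List<FailureListener> result = new ArrayList<>(Arrays.asList(current));
        result.removeIf(type::isInstance); // type check instead of toString() matching
        result.add(replacement);
        return result;
    }

    public static void main(String[] args) {
        FailureListener[] defaults = { new HostListener(), new LoggingListener() };
        HostListener custom = new HostListener();

        List<FailureListener> updated = replaceByType(defaults, HostListener.class, custom);

        System.out.println(updated.size() + " listeners, custom last: "
                + (updated.get(updated.size() - 1) == custom));
    }
}
```

Applied to the test, the same pattern would be `failure.removeIf(l -> l instanceof HostAvailabilityListener)` before adding the configured listener.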

vivekmuniyandi added a commit that referenced this issue Aug 24, 2017
vivekmuniyandi added the test label and removed the new label Aug 24, 2017
srinathgit commented

This issue has been resolved.
