Skip to content

Commit

Permalink
GH-8609: Fix TcpConnectorInterceptor chain propagation
Browse files Browse the repository at this point in the history
Fixes #8609

* Changed to passed the self instance to `addNewConnection()` instead of argument's connection
in a `TcpConnectionInterceptorSupport` to compose a chain of interceptors from top to down.
This way the target `Sender` get the last interceptor in a chain as its connection.

**Cherry-pick to `6.0.x` & `5.5.x`**
  • Loading branch information
kazuki43zoo authored and artembilan committed May 5, 2023
1 parent 313c38c commit d38370e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@
* to the underlying {@link TcpConnection}.
*
* @author Gary Russell
* @author Kazuki Shimizu
*
* @since 2.0
*/
Expand Down Expand Up @@ -232,7 +233,7 @@ public TcpListener getListener() {
@Override
public void addNewConnection(TcpConnection connection) {
if (this.interceptedSenders != null) {
this.interceptedSenders.forEach(sender -> sender.addNewConnection(connection));
this.interceptedSenders.forEach(sender -> sender.addNewConnection(this));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,9 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,6 +32,7 @@

/**
* @author Gary Russell
* @author Kazuki Shimizu
* @since 5.3.10
*
*/
Expand Down Expand Up @@ -81,11 +84,14 @@ private void senderCalledForDeadConnectionClient(AbstractClientConnectionFactory
List<Integer> addOrder = Collections.synchronizedList(new ArrayList<>());
List<Integer> remOrder = Collections.synchronizedList(new ArrayList<>());
AtomicReference<Thread> thread = new AtomicReference<>();
Map<Integer, TcpConnection> interceptorsPerInstance = new HashMap<>();
List<TcpConnection> passedConnectionsToSenderViaAddNewConnection = new ArrayList<>();
class InterceptorFactory extends HelloWorldInterceptorFactory {

@Override
public TcpConnectionInterceptorSupport getInterceptor() {
return new TcpConnectionInterceptorSupport() {

TcpConnectionInterceptorSupport interceptor = new TcpConnectionInterceptorSupport() {

private final int instance = instances.incrementAndGet();

Expand All @@ -107,6 +113,8 @@ public synchronized void removeDeadConnection(TcpConnection connection) {
}

};
interceptorsPerInstance.put(instances.get(), interceptor);
return interceptor;
}

}
Expand All @@ -118,6 +126,7 @@ public synchronized void removeDeadConnection(TcpConnection connection) {
@Override
public void addNewConnection(TcpConnection connection) {
addOrder.add(99);
passedConnectionsToSenderViaAddNewConnection.add(connection);
adds.countDown();
}

Expand Down Expand Up @@ -146,6 +155,9 @@ public synchronized void removeDeadConnection(TcpConnection connection) {
assertThat(remOrder).containsExactly(1, 2, 99, 3, 4, 5, 99, 6);
assertThat(interceptorAddCalled.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(interceptorRemCalled.await(10, TimeUnit.SECONDS)).isTrue();
// should be passed the last interceptor to the real sender via addNewConnection method
assertThat(passedConnectionsToSenderViaAddNewConnection.get(0)).isSameAs(interceptorsPerInstance.get(3));
assertThat(passedConnectionsToSenderViaAddNewConnection.get(1)).isSameAs(interceptorsPerInstance.get(6));
}

}

0 comments on commit d38370e

Please sign in to comment.