Skip to content

Commit

Permalink
INT-4388: Add UdpServerListeningEvent
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4388

Publish an event when the UDP server socket is established.

* Polishing - remove unreachable assertions
  • Loading branch information
garyrussell authored and artembilan committed Jan 30, 2018
1 parent 23687e4 commit 30fac62
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
Expand All @@ -33,27 +35,30 @@
* @since 2.0
*/
public abstract class AbstractInternetProtocolReceivingChannelAdapter
extends MessageProducerSupport implements SchedulingAwareRunnable, CommonSocketOptions {
extends MessageProducerSupport
implements ApplicationEventPublisherAware, SchedulingAwareRunnable, CommonSocketOptions {

private final int port;

private volatile int soTimeout = 0;
private ApplicationEventPublisher applicationEventPublisher;

private volatile int soReceiveBufferSize = -1;
private int soTimeout = 0;

private volatile int receiveBufferSize = 2048;
private int soReceiveBufferSize = -1;

private volatile boolean active;
private int receiveBufferSize = 2048;

private volatile boolean listening;
private String localAddress;

private volatile String localAddress;
private Executor taskExecutor;

private volatile Executor taskExecutor;
private boolean taskExecutorSet;

private volatile boolean taskExecutorSet;
private int poolSize = 5;

private volatile int poolSize = 5;
private volatile boolean active;

private volatile boolean listening;


public AbstractInternetProtocolReceivingChannelAdapter(int port) {
Expand Down Expand Up @@ -182,6 +187,15 @@ public Executor getTaskExecutor() {
return this.taskExecutor;
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/**
* @return the active
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,6 @@ public class TcpConnectionServerExceptionEvent extends IpIntegrationEvent {
public TcpConnectionServerExceptionEvent(Object connectionFactory, Throwable cause) {
super(connectionFactory, cause);
Assert.notNull(cause, "'cause' cannot be null");
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package org.springframework.integration.ip.tcp.connection;

import org.springframework.integration.ip.event.IpIntegrationEvent;
import org.springframework.util.Assert;

/**
* {@link IpIntegrationEvent} emitted when a server begins listening. Useful
Expand All @@ -35,7 +34,6 @@ public class TcpConnectionServerListeningEvent extends IpIntegrationEvent {

public TcpConnectionServerListeningEvent(TcpServerConnectionFactory connectionFactory, int port) {
super(connectionFactory);
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
this.port = port;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.ip.udp;

import org.springframework.integration.ip.event.IpIntegrationEvent;

/**
* {@link IpIntegrationEvent} emitted when a server begins listening. Useful
* when the configured port is zero and the operating system chooses the port.
* Also useful to avoid polling the {@code isListening()} if you need to wait
* before starting some other process to connect to the socket.
*
* @author Gary Russell
* @since 5.0.2
*/
@SuppressWarnings("serial")
public class UdpServerListeningEvent extends IpIntegrationEvent {

private final int port;

public UdpServerListeningEvent(UnicastReceivingChannelAdapter adapter, int port) {
super(adapter);
this.port = port;
}

public int getPort() {
return this.port;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.ip.AbstractInternetProtocolReceivingChannelAdapter;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -108,6 +109,11 @@ protected void onInit() {
public void run() {
getSocket();

ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new UdpServerListeningEvent(this, getPort()));
}

if (logger.isDebugEnabled()) {
logger.debug("UDP Receiver running on port:" + this.getPort());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,13 +21,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
Expand All @@ -43,6 +48,7 @@
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;
import org.springframework.integration.ip.udp.MulticastSendingMessageHandler;
import org.springframework.integration.ip.udp.UdpServerListeningEvent;
import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter;
import org.springframework.integration.ip.util.TestingUtilities;
import org.springframework.integration.support.MessageBuilder;
Expand Down Expand Up @@ -77,6 +83,9 @@ public class IpIntegrationTests {
@Autowired
private QueueChannel udpIn;

@Autowired
private Config config;

@Test
public void testTcpAdapters() throws Exception {
ApplicationEventPublisher publisher = e -> { };
Expand Down Expand Up @@ -119,8 +128,9 @@ public void testTcpGateways() {
}

@Test
public void testUdp() {
TestingUtilities.waitListening(this.udpInbound, null);
public void testUdp() throws Exception {
assertTrue(this.config.listeningLatch.await(10, TimeUnit.SECONDS));
assertEquals(this.udpInbound.getPort(), this.config.serverPort);
Message<String> outMessage = MessageBuilder.withPayload("foo")
.setHeader("udp_dest", "udp://localhost:" + this.udpInbound.getPort())
.build();
Expand Down Expand Up @@ -148,6 +158,10 @@ public void testUdpInheritance() {
@EnableIntegration
public static class Config {

private final CountDownLatch listeningLatch = new CountDownLatch(1);

private volatile int serverPort;

@Bean
public AbstractServerConnectionFactory server1() {
return Tcp.netServer(0)
Expand Down Expand Up @@ -181,6 +195,14 @@ public IntegrationFlow outUdpAdapter() {
return f -> f.handle(Udp.outboundAdapter(m -> m.getHeaders().get("udp_dest")));
}

@Bean
public ApplicationListener<UdpServerListeningEvent> events() {
return (ApplicationListener<UdpServerListeningEvent>) event -> {
this.serverPort = event.getPort();
this.listeningLatch.countDown();
};
}

}

}
7 changes: 7 additions & 0 deletions src/reference/asciidoc/ip.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ public IntegrationFlow udpIn() {
}
----

==== Server Listening Events

Starting with _version 5.0.2_, a `UdpServerListeningEvent` is emitted when an inbound adapter is started and has begun listening.
This is useful when the adapter is configured to listen on port 0, meaning that the operating system chooses the port.
It can also be used instead of polling `isListening()`, if you need to wait before starting some other process that will
connect to the socket.

==== Advanced Outbound Configuration

The `destination-expression` and `socket-expression` options are available
Expand Down

0 comments on commit 30fac62

Please sign in to comment.