Skip to content

Commit

Permalink
Log throughput of file upload and download on elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
jcustovic committed May 2, 2019
1 parent 36d020d commit a6e0c5a
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 36 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The project **Data Exchange Client** allows you to connect to SFTP or FTP servers and configure upload and download pullers.

Current stable version: 1.0.1
Current stable version: 1.2.1

## How to use

Expand Down Expand Up @@ -192,6 +192,19 @@ app:
pattern: pictures-%s.tarz
```
#### log throughput:
Push throughput speeds for every file to ElasticSearch. Internally we are using RestHighLevelClient so it needs to be configured via Spring Boot.
```yaml
spring:
elasticsearch:
rest:
uris: localhost # Needed for RestHighLevelClient

app:
es:
index_pattern: "'data-exchange-client-' + T(java.time.LocalDate).now().format(T(java.time.format.DateTimeFormatter).ofPattern('YYYY-MM'))"
```
### Notes while doing upload / download
Expand Down
29 changes: 16 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.dataexchange</groupId>
<artifactId>data-exchange-client</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<packaging>jar</packaging>

<name>data-exchange-client</name>
Expand All @@ -14,21 +14,17 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<java.version>1.8</java.version>

<!-- Normally using >${spring.version} should work, but spring boot is a bit ahead of integration so we need to manually
specify lower version of spring
-->
<spring.sftp.version>5.1.1.RELEASE</spring.sftp.version>
<spring.ftp.version>5.1.1.RELEASE</spring.ftp.version>

<commons-lang3.version>3.8.1</commons-lang3.version>
<spring-integration-aws.version>2.0.1.RELEASE</spring-integration-aws.version>
<spring-integration-zip.version>1.0.2.RELEASE</spring-integration-zip.version>

<commons-lang3.version>3.8.1</commons-lang3.version>
<com.amazonaws.sdk>1.11.336</com.amazonaws.sdk>

<!-- Test -->
Expand All @@ -53,13 +49,11 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
<version>${spring.sftp.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>${spring.ftp.version}</version>
</dependency>

<dependency>
Expand All @@ -70,7 +64,16 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zip</artifactId>
<version>1.0.2.RELEASE</version>
<version>${spring-integration-zip.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -133,5 +136,5 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import com.dataexchange.client.config.model.MainConfiguration;
import com.dataexchange.client.config.model.SftpPollerConfiguration;
import com.dataexchange.client.domain.ConnectionMonitor;
import com.dataexchange.client.infrastructure.integration.file.LoggingSessionFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.stereotype.Component;
Expand All @@ -35,6 +39,10 @@ public class DynamicConfigurationCreator {
private SftpFlow sftpFlow;
@Autowired
private FtpFlow ftpFlow;
@Autowired(required = false)
private RestHighLevelClient restHighLevelClient;
@Value("${app.es.index_pattern:#{null}}")
private String indexPattern;

@PostConstruct
public void setup() {
Expand Down Expand Up @@ -101,7 +109,8 @@ private CachingSessionFactory sftpSessionFactory(SftpPollerConfiguration sftpPol
sftpSessionFactory.setTimeout(30_000);
sftpSessionFactory.setAllowUnknownKeys(true);

return new CachingSessionFactory(sftpSessionFactory, 5);
return createSessionFactory(sftpSessionFactory, 5, sftpPollerConfiguration.getUsername(),
sftpPollerConfiguration.getHost());
}

private CachingSessionFactory ftpSessionFactory(FtpPollerConfiguration ftpPollerConfiguration) {
Expand All @@ -121,7 +130,17 @@ private CachingSessionFactory ftpSessionFactory(FtpPollerConfiguration ftpPoller
ftpSessionFactory.setConfig(ftpClientConfig);
}

return new CachingSessionFactory(ftpSessionFactory, 5);
return createSessionFactory(ftpSessionFactory, 5, ftpPollerConfiguration.getUsername(),
ftpPollerConfiguration.getHost());
}

private CachingSessionFactory createSessionFactory(SessionFactory sessionFactory, int sessionCacheSize,
String connectionUsername, String connectionHost) {
if (indexPattern == null) {
return new CachingSessionFactory(sessionFactory, sessionCacheSize);
} else {
return new LoggingSessionFactory(restHighLevelClient, indexPattern, sessionFactory, sessionCacheSize,
connectionUsername, connectionHost);
}
}
}
13 changes: 9 additions & 4 deletions src/main/java/com/dataexchange/client/config/flows/FtpFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public void downloadSetup(CachingSessionFactory ftpSessionFactory, DownloadPolle
.maxMessagesPerPoll(100)
.errorHandler(e -> connectionMonitorHelper.handleConnectionError(name, e))
.advice(encrichLogsWithConnectionInfo(username, config.getOutputFolder()),
clearLogContext(), connectionMonitorHelper.connectionSuccessAdvice(name))
clearLogContext(),
connectionMonitorHelper.connectionSuccessAdvice(name)
)
))
.<File, Boolean>route(f -> hasSemaphoreSemantics(f, config), semaphoreRouterAndOutboundAdapter(config));

Expand All @@ -62,9 +64,12 @@ public void uploadSetup(CachingSessionFactory ftpSessionFactory, UploadPollerCon
conf -> conf.poller(Pollers.fixedRate(10000).maxMessagesPerPoll(100)))
.enrichHeaders(h -> h.header("destination_folder", config.getProcessedFolder()))
.handle(ftpOutboundAdapter(ftpSessionFactory, config),
c -> c.advice(encrichLogsWithConnectionInfo(username,
config.getRemoteOutputFolder()), enrichLogsContextWithFileInfo(), clearLogContext(),
RetryAdvice.retry(), moveFileAdvice, connectionMonitorHelper.connectionSuccessAdvice(name),
c -> c.advice(encrichLogsWithConnectionInfo(username, config.getRemoteOutputFolder()),
enrichLogsContextWithFileInfo(),
clearLogContext(),
RetryAdvice.retry(),
moveFileAdvice,
connectionMonitorHelper.connectionSuccessAdvice(name),
connectionMonitorHelper.connectionErrorAdvice(name)
)
).get();
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/com/dataexchange/client/config/flows/SftpFlow.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.dataexchange.client.config.flows;


import com.dataexchange.client.config.model.DownloadPollerConfiguration;
import com.dataexchange.client.config.model.UploadPollerConfiguration;
import com.dataexchange.client.domain.util.ConnectionMonitorHelper;
Expand Down Expand Up @@ -51,7 +50,10 @@ public void downloadSetup(CachingSessionFactory sftpSessionFactory, DownloadPoll
.maxMessagesPerPoll(100)
.errorHandler(e -> connectionMonitorHelper.handleConnectionError(name, e))
.advice(encrichLogsWithConnectionInfo(username, config.getOutputFolder()),
clearLogContext(), connectionMonitorHelper.connectionSuccessAdvice(name))))
clearLogContext(),
connectionMonitorHelper.connectionSuccessAdvice(name)
)
))
.<File, Boolean>route(f -> hasSemaphoreSemantics(f, config), semaphoreRouterAndOutboundAdapter(config));

String beanName = "sftpDownloadFlow-" + config.getName();
Expand All @@ -63,10 +65,14 @@ public void uploadSetup(CachingSessionFactory sftpSessionFactory, UploadPollerCo
.from(FileAdapterHelper.fileMessageSource(config.getInputFolder(), config.getRegexFilter()), secondsPoller(10, 100))
.enrichHeaders(h -> h.header("destination_folder", config.getProcessedFolder()))
.handle(sftpOutboundAdapter(sftpSessionFactory, config),
conf -> conf.advice(encrichLogsWithConnectionInfo(username,
config.getRemoteOutputFolder()), enrichLogsContextWithFileInfo(),
clearLogContext(), RetryAdvice.retry(), moveFileAdvice,
connectionMonitorHelper.connectionSuccessAdvice(name), connectionMonitorHelper.connectionErrorAdvice(name)
conf -> conf.advice(
encrichLogsWithConnectionInfo(username, config.getRemoteOutputFolder()),
enrichLogsContextWithFileInfo(),
clearLogContext(),
RetryAdvice.retry(),
moveFileAdvice,
connectionMonitorHelper.connectionSuccessAdvice(name),
connectionMonitorHelper.connectionErrorAdvice(name)
)
).get();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.dataexchange.client.domain.model;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -12,8 +9,6 @@

public class ConnectionStatus {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionStatus.class);

enum ConnectionAliveStatus {
UNKNOWN, UP, DOWN
}
Expand All @@ -37,9 +32,7 @@ public void up() {

public void down(String message) {
if (status != DOWN) {
LOGGER.error(message);
downSince = LocalDateTime.now();

}
status = DOWN;
lastCheck = LocalDateTime.now();
Expand Down
18 changes: 16 additions & 2 deletions src/main/java/com/dataexchange/client/domain/util/LogHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,31 @@
import org.slf4j.MDC;
import org.springframework.aop.AfterReturningAdvice;
import org.springframework.aop.MethodBeforeAdvice;
import org.springframework.aop.ThrowsAdvice;
import org.springframework.integration.handler.advice.AbstractHandleMessageAdvice;
import org.springframework.messaging.Message;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedHashSet;

public final class LogHelper {

public static AfterReturningAdvice clearLogContext() {
return (returnValue, method, args, target) -> MDC.clear();
public static class ClearLogContextAdvice implements AfterReturningAdvice, ThrowsAdvice {

@Override
public void afterReturning(Object o, Method method, Object[] objects, Object o1) {
MDC.clear();
}

public void afterThrowing(Exception ex) {
MDC.clear();
}
}

public static ClearLogContextAdvice clearLogContext() {
return new ClearLogContextAdvice();
}

public static MethodBeforeAdvice encrichLogsWithConnectionInfo(String username, String folder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.dataexchange.client.domain.model.exception.ConnectionAlreadyRegistered;
import com.dataexchange.client.domain.model.exception.NoConnection;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Collections;
Expand All @@ -14,6 +16,8 @@
@Service
public class ConnectionMonitorImpl implements ConnectionMonitor {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionMonitorImpl.class);

private final Map<String, ConnectionStatus> connections;

public ConnectionMonitorImpl() {
Expand All @@ -37,6 +41,7 @@ public void up(String connectionName) {
public void down(String connectionName, Throwable e) {
String message = "Connection down for " + connectionName + ". Cause: " + ExceptionUtils.getRootCauseMessage(e);
findConnectionStatus(connectionName).down(message);
LOGGER.error("Connection down", e);
}

@Override
Expand Down
Loading

0 comments on commit a6e0c5a

Please sign in to comment.