Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/260 #272

Merged
merged 5 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public class CamelAws2s3SinkConnector extends CamelSinkConnector {
public ConfigDef config() {
return CamelAws2s3SinkConnectorConfig.conf();
}

@Override
public Class<? extends Task> taskClass() {
return CamelAws2s3SinkTask.class;
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
public class CamelAws2s3SinkTask extends CamelSinkTask {

@Override
protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(Map<String, String> props) {
protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(
Map<String, String> props) {
return new CamelAws2s3SinkConnectorConfig(props);
}

@Override
protected Map<String, String> getDefaultConfig() {
return new HashMap<String, String>() {
{
put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-s3");
}
};
return new HashMap<String, String>() {{
put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-s3");
}};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public class CamelAws2s3SourceConnector extends CamelSourceConnector {
public ConfigDef config() {
return CamelAws2s3SourceConnectorConfig.conf();
}

@Override
public Class<? extends Task> taskClass() {
return CamelAws2s3SourceTask.class;
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
public class CamelAws2s3SourceTask extends CamelSourceTask {

@Override
protected CamelSourceConnectorConfig getCamelSourceConnectorConfig(Map<String, String> props) {
protected CamelSourceConnectorConfig getCamelSourceConnectorConfig(
Map<String, String> props) {
return new CamelAws2s3SourceConnectorConfig(props);
}

@Override
protected Map<String, String> getDefaultConfig() {
return new HashMap<String, String>() {
{
put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "aws2-s3");
}
};
return new HashMap<String, String>() {{
put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "aws2-s3");
}};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

package org.apache.camel.kafkaconnector.aws.common;

import java.util.Iterator;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;

public final class AWSCommon {
/**
* The default SQS queue name used during the tests
Expand Down Expand Up @@ -46,4 +55,54 @@ public final class AWSCommon {
private AWSCommon() {

}

/**
* Delete an S3 bucket using the provided client. Coming from AWS documentation:
* https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
* @param s3Client the AmazonS3 client instance used to delete the bucket
* @param bucketName a String containing the bucket name
*/
public static void deleteBucket(AmazonS3 s3Client, String bucketName) {
// Delete all objects from the bucket. This is sufficient
// for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
// delete markers for all objects, but doesn't delete the object versions.
// To delete objects from versioned buckets, delete all of the object versions before deleting
// the bucket (see below for an example).
ObjectListing objectListing = s3Client.listObjects(bucketName);
while (true) {
Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator();
while (objIter.hasNext()) {
s3Client.deleteObject(bucketName, objIter.next().getKey());
}

// If the bucket contains many objects, the listObjects() call
// might not return all of the objects in the first listing. Check to
// see whether the listing was truncated. If so, retrieve the next page of objects
// and delete them.
if (objectListing.isTruncated()) {
objectListing = s3Client.listNextBatchOfObjects(objectListing);
} else {
break;
}
}

// Delete all object versions (required for versioned buckets).
VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName));
while (true) {
Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator();
while (versionIter.hasNext()) {
S3VersionSummary vs = versionIter.next();
s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId());
}

if (versionList.isTruncated()) {
versionList = s3Client.listNextBatchOfVersions(versionList);
} else {
break;
}
}

// After all objects and object versions are deleted, delete the bucket.
s3Client.deleteBucket(bucketName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public void tearDown() {
}

awsKinesisClient.shutdown();

deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private boolean checkRecord(ConsumerRecord<String, String> record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -47,9 +46,9 @@

@Testcontainers
public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {

@RegisterExtension
public static AWSService<AmazonS3> service = AWSServiceFactory.createS3Service();

private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);

private AmazonS3 awsS3Client;
Expand All @@ -75,14 +74,15 @@ public void setUp() {
}
}


@AfterEach
public void tearDown() {
try {
awsS3Client.deleteBucket(AWSCommon.DEFAULT_S3_BUCKET);
AWSCommon.deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET);
} catch (Exception e) {
LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
}

deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private boolean checkRecord(ConsumerRecord<String, String> record) {
Expand Down Expand Up @@ -158,8 +158,6 @@ public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, Inte
runTest(connectorPropertyFactory);
}


@Disabled("Disabled due to issue #260")
@Test
@Timeout(180)
public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
Expand All @@ -168,13 +166,15 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withConfiguration(TestS3Configuration.class.getName())
.withUrl(AWSCommon.DEFAULT_S3_BUCKET)
.append("configuration", CamelAWSS3PropertyFactory.classRef(TestS3Configuration.class.getName()))
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();
.buildUrl();

runTest(connectorPropertyFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -70,6 +71,11 @@ public void setUp() {
received = 0;
}

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private boolean checkMessages(List<Message> messages) {
for (Message message : messages) {
LOG.info("Received: {}", message.getBody());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void setUp() {

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
fail("Failed to delete queue");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public void setUp() {

@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));

if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
fail("Failed to delete queue");
}
Expand Down Expand Up @@ -142,12 +144,12 @@ public void testBasicSendReceiveUsingUrl() throws ExecutionException, Interrupte
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();

runTest(connectorPropertyFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public void tearDown() {
if (testDataDao != null) {
testDataDao.dropTable();
}

deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}

private void putRecords(CountDownLatch latch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

package org.apache.camel.kafkaconnector.common;

import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public abstract class AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTest.class);

@RegisterExtension
public final KafkaService kafkaService;
Expand Down Expand Up @@ -54,8 +58,16 @@ public KafkaService getKafkaService() {
return kafkaService;
}


public KafkaConnectService getKafkaConnectService() {
return kafkaConnectService;
}

protected void deleteKafkaTopic(String topic) {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.deleteTopic(topic);
} catch (Throwable t) {
LOG.warn("Topic not deleted (probably the Kafka test cluster was already shutting down?).", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -37,6 +40,8 @@
* @param <V> Value type
*/
public class KafkaClient<K, V> {
private final ConsumerPropertyFactory consumerPropertyFactory;
private final ProducerPropertyFactory producerPropertyFactory;
private KafkaProducer<K, V> producer;
private KafkaConsumer<K, V> consumer;

Expand All @@ -48,8 +53,8 @@ public class KafkaClient<K, V> {
* PLAINTEXT://${address}:${port}
*/
public KafkaClient(String bootstrapServer) {
ConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
ProducerPropertyFactory producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);
consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);

producer = new KafkaProducer<>(producerPropertyFactory.getProperties());
consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties());
Expand Down Expand Up @@ -93,5 +98,14 @@ public void produce(String topic, V message) throws ExecutionException, Interrup
future.get();
}


/**
* Delete a topic
*
* @param topic the topic to be deleted
*/
public void deleteTopic(String topic) {
Properties props = producerPropertyFactory.getProperties();
AdminClient admClient = AdminClient.create(props);
admClient.deleteTopics(Collections.singleton(topic));
}
}
Loading