diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
index 439a9f3d78..0564e58734 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -120,9 +120,10 @@ public synchronized int size() {
*/
public synchronized List fetchRange(int start, int end, boolean removed) {
- if (start < 0 || end > this.size() || start > end) {
+ if (start < 0 || start > end) {
throw new IllegalArgumentException("Invalid range");
}
+ end = Math.min(end, this.size());
Iterator iterator = this.iterator();
List items = new ArrayList<>(end - start);
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 8a14756372..9b6038bdea 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -20,10 +20,10 @@
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
-import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler;
-import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler;
-import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler;
-import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.impl.CommonHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.impl.HttpSinkHandlerRetryWrapper;
+import org.apache.eventmesh.connector.http.sink.handler.impl.WebhookHttpSinkHandler;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
@@ -86,7 +86,7 @@ private void doInit() {
this.sinkHandler = nonRetryHandler;
} else if (maxRetries > 0) {
// Wrap the sink handler with a retry handler
- this.sinkHandler = new RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
+ this.sinkHandler = new HttpSinkHandlerRetryWrapper(this.httpSinkConfig.connectorConfig, nonRetryHandler);
} else {
throw new IllegalArgumentException("Max retries must be greater than or equal to 0.");
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
index 0bceac7d47..08c3a323e7 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
@@ -24,8 +24,8 @@ public class HttpRetryConfig {
// maximum number of retries, default 2, minimum 0
private int maxRetries = 2;
- // retry interval, default 2000ms
- private int interval = 2000;
+ // retry interval, default 1000ms
+ private int interval = 1000;
// Default value is false, indicating that only requests with network-level errors will be retried.
// If set to true, all failed requests will be retried, including network-level errors and non-2xx responses.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index a258c6ab53..95b40afe9e 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -20,31 +20,60 @@
import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.Builder;
-import lombok.Data;
+import lombok.Getter;
/**
* a special ConnectRecord for HttpSinkConnector
*/
-@Data
+@Getter
@Builder
-public class HttpConnectRecord {
+public class HttpConnectRecord implements Serializable {
- private String type;
+ private static final long serialVersionUID = 5271462532332251473L;
+
+ /**
+ * The unique identifier for the HttpConnectRecord
+ */
+ private final String httpRecordId = UUID.randomUUID().toString();
- private String time;
+ /**
+ * The time when the HttpConnectRecord was created
+ */
+ private LocalDateTime createTime;
- private String uuid;
+ /**
+ * The type of the HttpConnectRecord
+ */
+ private String type;
+ /**
+ * The event id of the HttpConnectRecord
+ */
private String eventId;
+ /**
+ * The ConnectRecord to be sent
+ */
private ConnectRecord data;
+ @Override
+ public String toString() {
+ return "HttpConnectRecord{"
+ + "createTime=" + createTime
+ + ", httpRecordId='" + httpRecordId
+ + ", type='" + type
+ + ", eventId='" + eventId
+ + ", data=" + data
+ + '}';
+ }
+
/**
* Convert ConnectRecord to HttpConnectRecord
*
@@ -62,11 +91,8 @@ public static HttpConnectRecord convertConnectRecord(ConnectRecord record, Strin
}
return HttpConnectRecord.builder()
.type(type)
- .time(LocalDateTime.now().toString())
- .uuid(UUID.randomUUID().toString())
.eventId(type + "-" + offset)
.data(record)
.build();
}
-
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
index 848012f152..41a5087870 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.connector.http.sink.data;
+import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Builder;
@@ -27,7 +28,10 @@
*/
@Data
@Builder
-public class HttpExportMetadata {
+public class HttpExportMetadata implements Serializable {
+
+ private static final long serialVersionUID = 1121010466793041920L;
+
private String url;
private int code;
@@ -36,7 +40,9 @@ public class HttpExportMetadata {
private LocalDateTime receivedTime;
- private String uuid;
+ private String httpRecordId;
+
+ private String recordId;
private String retriedBy;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
index b6382aee7a..c6bdb02884 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.connector.http.sink.data;
+import java.io.Serializable;
+
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -25,7 +27,9 @@
*/
@Data
@AllArgsConstructor
-public class HttpExportRecord {
+public class HttpExportRecord implements Serializable {
+
+ private static final long serialVersionUID = 6010283911452947157L;
private HttpExportMetadata metadata;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
index 5c44eb3b7f..81e582c33a 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.connector.http.sink.data;
+import java.io.Serializable;
import java.util.List;
import lombok.AllArgsConstructor;
@@ -27,7 +28,9 @@
*/
@Data
@AllArgsConstructor
-public class HttpExportRecordPage {
+public class HttpExportRecordPage implements Serializable {
+
+ private static final long serialVersionUID = 1143791658357035990L;
private int pageNum;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
new file mode 100644
index 0000000000..4b229f9839
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.data;
+
+import lombok.Data;
+
+/**
+ * Single HTTP retry event
+ */
+@Data
+public class HttpRetryEvent {
+
+ public static final String PREFIX = "http-retry-event-";
+
+ private String parentId;
+
+ private int maxRetries;
+
+ private int currentRetries;
+
+ private Throwable lastException;
+
+ /**
+ * Increase the current retries by 1
+ */
+ public void increaseCurrentRetries() {
+ this.currentRetries++;
+ }
+
+ /**
+ * Check if the current retries is greater than or equal to the max retries
+ * @return true if the current retries is greater than or equal to the max retries
+ */
+ public boolean isMaxRetriesReached() {
+ return this.currentRetries >= this.maxRetries;
+ }
+
+ /**
+ * Get the limited exception message with the default limit of 256
+ * @return the limited exception message
+ */
+ public String getLimitedExceptionMessage() {
+ return getLimitedExceptionMessage(256);
+ }
+
+ /**
+ * Get the limited exception message with the specified limit
+ * @param maxLimit the maximum limit of the exception message
+ * @return the limited exception message
+ */
+ public String getLimitedExceptionMessage(int maxLimit) {
+ if (lastException == null) {
+ return "";
+ }
+ String message = lastException.getMessage();
+ if (message == null) {
+ return "";
+ }
+ if (message.length() > maxLimit) {
+ return message.substring(0, maxLimit);
+ }
+ return message;
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
new file mode 100644
index 0000000000..67ab943818
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Multi HTTP request context
+ */
+public class MultiHttpRequestContext {
+
+ public static final String NAME = "multi-http-request-context";
+
+ /**
+ * The remaining requests to be processed.
+ */
+ private final AtomicInteger remainingRequests;
+
+ /**
+ * The last failed event.
+ * If there are no retries or retries are not enabled, it will be null.
+ * If retries occur but still fail, it will be logged, and only the last one will be retained.
+ */
+ private HttpRetryEvent lastFailedEvent;
+
+ public MultiHttpRequestContext(int remainingEvents) {
+ this.remainingRequests = new AtomicInteger(remainingEvents);
+ }
+
+ /**
+ * Decrement the remaining requests by 1.
+ */
+ public void decrementRemainingRequests() {
+ remainingRequests.decrementAndGet();
+ }
+
+ public int getRemainingRequests() {
+ return remainingRequests.get();
+ }
+
+ public HttpRetryEvent getLastFailedEvent() {
+ return lastFailedEvent;
+ }
+
+ public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
+ this.lastFailedEvent = lastFailedEvent;
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
deleted file mode 100644
index bc2a536107..0000000000
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.http.sink.handle;
-
-import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
-import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
-import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
-import org.apache.eventmesh.connector.http.util.HttpUtils;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-
-import java.net.ConnectException;
-import java.net.URI;
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import io.vertx.core.Future;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpResponse;
-
-import lombok.extern.slf4j.Slf4j;
-
-import dev.failsafe.Failsafe;
-import dev.failsafe.RetryPolicy;
-import dev.failsafe.RetryPolicyBuilder;
-import dev.failsafe.event.ExecutionEvent;
-
-
-@Slf4j
-public class RetryHttpSinkHandler implements HttpSinkHandler {
-
- private final SinkConnectorConfig connectorConfig;
-
- // Retry policy builder
- private RetryPolicyBuilder> retryPolicyBuilder;
-
- private final List urls;
-
- private final HttpSinkHandler sinkHandler;
-
-
- public RetryHttpSinkHandler(SinkConnectorConfig connectorConfig, HttpSinkHandler sinkHandler) {
- this.connectorConfig = connectorConfig;
- this.sinkHandler = sinkHandler;
-
- // Initialize retry
- initRetry();
-
- // Initialize URLs
- String[] urlStrings = connectorConfig.getUrls();
- this.urls = Arrays.stream(urlStrings)
- .map(URI::create)
- .collect(Collectors.toList());
- }
-
- private void initRetry() {
- HttpRetryConfig httpRetryConfig = this.connectorConfig.getRetryConfig();
-
- this.retryPolicyBuilder = RetryPolicy.>builder()
- .handleIf(e -> e instanceof ConnectException)
- .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
- .withMaxRetries(httpRetryConfig.getMaxRetries())
- .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()));
- }
-
-
- /**
- * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
- */
- @Override
- public void start() {
- sinkHandler.start();
- }
-
-
- /**
- * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
- *
- * @param record the ConnectRecord to process
- */
- @Override
- public void handle(ConnectRecord record) {
- for (URI url : this.urls) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s",
- this.connectorConfig.getConnectorName(), url.getScheme(),
- this.connectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
- HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
- // handle the HttpConnectRecord
- deliver(url, httpConnectRecord);
- }
- }
-
-
- /**
- * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
- * HttpConnectRecord
- *
- * @param url URI to which the HttpConnectRecord should be sent
- * @param httpConnectRecord HttpConnectRecord to process
- * @return processing chain
- */
- @Override
- public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
- // Only webhook mode needs to use the UUID to identify the request
- String id = httpConnectRecord.getUuid();
-
- // Build the retry policy
- RetryPolicy> retryPolicy = retryPolicyBuilder
- .onSuccess(event -> {
- if (connectorConfig.getWebhookConfig().isActivate()) {
- // convert the result to an HttpExportRecord
- HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
- // add the data to the queue
- ((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
- }
- })
- .onRetry(event -> {
- if (log.isDebugEnabled()) {
- log.warn("Retrying the request to {} for the {} time. HttpConnectRecord= {}", url, event.getAttemptCount(), httpConnectRecord);
- } else {
- log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
- }
- if (connectorConfig.getWebhookConfig().isActivate()) {
- HttpExportRecord exportRecord =
- covertToExportRecord(httpConnectRecord, event, event.getLastResult(), event.getLastException(), url, id);
- ((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
- }
- // update the HttpConnectRecord
- httpConnectRecord.setTime(LocalDateTime.now().toString());
- httpConnectRecord.setUuid(UUID.randomUUID().toString());
- })
- .onFailure(event -> {
- if (log.isDebugEnabled()) {
- log.error("Failed to send the request to {} after {} attempts. HttpConnectRecord= {}", url, event.getAttemptCount(),
- httpConnectRecord, event.getException());
- } else {
- log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
- }
- if (connectorConfig.getWebhookConfig().isActivate()) {
- HttpExportRecord exportRecord = covertToExportRecord(httpConnectRecord, event, event.getResult(), event.getException(), url, id);
- ((WebhookHttpSinkHandler) sinkHandler).getReceivedDataQueue().offer(exportRecord);
- }
- }).build();
-
- // Handle the HttpConnectRecord with retry
- Failsafe.with(retryPolicy)
- .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord).toCompletionStage());
-
- return null;
- }
-
- /**
- * Converts the ExecutionCompletedEvent to an HttpExportRecord.
- *
- * @param httpConnectRecord HttpConnectRecord
- * @param event ExecutionEvent
- * @param response the response of the request, may be null
- * @param e the exception thrown during the request, may be null
- * @param url the URL the request was sent to
- * @param id UUID
- * @return the converted HttpExportRecord
- */
- private HttpExportRecord covertToExportRecord(HttpConnectRecord httpConnectRecord, ExecutionEvent event, HttpResponse response,
- Throwable e, URI url, String id) {
-
- HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
- .url(url.toString())
- .code(response != null ? response.statusCode() : -1)
- .message(response != null ? response.statusMessage() : e.getMessage())
- .receivedTime(LocalDateTime.now())
- .uuid(httpConnectRecord.getUuid())
- .retriedBy(event.getAttemptCount() > 1 ? id : null)
- .retryNum(event.getAttemptCount() - 1).build();
-
- return new HttpExportRecord(httpExportMetadata, response == null ? null : response.bodyAsString());
- }
-
- /**
- * Cleans up and releases resources used by the HTTP/HTTPS handler.
- */
- @Override
- public void stop() {
- sinkHandler.stop();
- }
-}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
new file mode 100644
index 0000000000..36d01115bb
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * AbstractHttpSinkHandler is an abstract class that provides a base implementation for HttpSinkHandler.
+ */
+public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {
+
+ private final SinkConnectorConfig sinkConnectorConfig;
+
+ private final List urls;
+
+ protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+ this.sinkConnectorConfig = sinkConnectorConfig;
+ // Initialize URLs
+ String[] urlStrings = sinkConnectorConfig.getUrls();
+ this.urls = Arrays.stream(urlStrings)
+ .map(URI::create)
+ .collect(Collectors.toList());
+ }
+
+ public SinkConnectorConfig getSinkConnectorConfig() {
+ return sinkConnectorConfig;
+ }
+
+ public List getUrls() {
+ return urls;
+ }
+
+ /**
+ * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
+ *
+ * @param record the ConnectRecord to process
+ */
+ @Override
+ public void handle(ConnectRecord record) {
+ // build attributes
+ Map attributes = new ConcurrentHashMap<>();
+ attributes.put(MultiHttpRequestContext.NAME, new MultiHttpRequestContext(urls.size()));
+
+ // send the record to all URLs
+ for (URI url : urls) {
+ // convert ConnectRecord to HttpConnectRecord
+ String type = String.format("%s.%s.%s",
+ this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
+ this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
+ HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
+
+ // add retry event to attributes
+ HttpRetryEvent retryEvent = new HttpRetryEvent();
+ retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
+ attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent);
+
+ // deliver the record
+ deliver(url, httpConnectRecord, attributes);
+ }
+ }
+
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
similarity index 83%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
index 09fd66a762..1731809ab9 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.net.URI;
+import java.util.Map;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
@@ -32,14 +33,14 @@
*
* Any class that needs to process ConnectRecords via HTTP or HTTPS should implement this interface.
* Implementing classes must provide implementations for the {@link #start()}, {@link #handle(ConnectRecord)},
- * {@link #deliver(URI, HttpConnectRecord)}, and {@link #stop()} methods.
+ * {@link #deliver(URI, HttpConnectRecord, Map)}, and {@link #stop()} methods.
*
* Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently.
* The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
- * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL
- * while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord, Map)} method processes HttpConnectRecord on specified
+ * URL while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
*
- * It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method
+ *
It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord, Map)} method
* to prevent message loss or processing interruptions.
*/
public interface HttpSinkHandler {
@@ -62,9 +63,10 @@ public interface HttpSinkHandler {
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
* @return processing chain
*/
- Future> deliver(URI url, HttpConnectRecord httpConnectRecord);
+ Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes);
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
similarity index 57%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index 4bc365a139..0907847455 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
+import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.Future;
@@ -60,22 +60,13 @@
*/
@Slf4j
@Getter
-public class CommonHttpSinkHandler implements HttpSinkHandler {
-
- private final SinkConnectorConfig connectorConfig;
-
- private final List urls;
+public class CommonHttpSinkHandler extends AbstractHttpSinkHandler {
private WebClient webClient;
public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
- this.connectorConfig = sinkConnectorConfig;
- // Initialize URLs
- String[] urlStrings = sinkConnectorConfig.getUrls();
- this.urls = Arrays.stream(urlStrings)
- .map(URI::create)
- .collect(Collectors.toList());
+ super(sinkConnectorConfig);
}
/**
@@ -91,41 +82,57 @@ public void start() {
* Initializes the WebClient with the provided configuration options.
*/
private void doInitWebClient() {
+ SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig();
final Vertx vertx = Vertx.vertx();
WebClientOptions options = new WebClientOptions()
- .setKeepAlive(this.connectorConfig.isKeepAlive())
- .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000)
- .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+ .setKeepAlive(sinkConnectorConfig.isKeepAlive())
+ .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000)
+ .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
- .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
- .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
+ .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
+ .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
this.webClient = WebClient.create(vertx, options);
}
/**
- * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
+ * URL using the WebClient.
*
- * @param record the ConnectRecord to process
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
+ * @return processing chain
*/
@Override
- public void handle(ConnectRecord record) {
- for (URI url : this.urls) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
- HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
- // get timestamp and offset
- Long timestamp = httpConnectRecord.getData().getTimestamp();
- Map offset = null;
- try {
- // May throw NullPointerException.
- offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
- } catch (NullPointerException e) {
- // ignore null pointer exception
- }
- final Map finalOffset = offset;
- Future> responseFuture = deliver(url, httpConnectRecord);
- responseFuture.onSuccess(res -> {
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) {
+ // create headers
+ MultiMap headers = HttpHeaders.headers()
+ .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
+ .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
+
+ // get timestamp and offset
+ Long timestamp = httpConnectRecord.getData().getTimestamp();
+ Map offset = null;
+ try {
+ // May throw NullPointerException.
+ offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+ } catch (NullPointerException e) {
+ // ignore null pointer exception
+ }
+ final Map finalOffset = offset;
+
+ // send the request
+ return this.webClient.post(url.getPath())
+ .host(url.getHost())
+ .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
+ .putHeaders(headers)
+ .ssl(Objects.equals(url.getScheme(), "https"))
+ .sendJson(httpConnectRecord)
+ .onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
+
+ Exception e = null;
+
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
@@ -135,7 +142,6 @@ public void handle(ConnectRecord record) {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
- record.getCallback().onSuccess(convertToSendResult(record));
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
@@ -144,14 +150,96 @@ public void handle(ConnectRecord record) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
- record.getCallback()
- .onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode())));
+
+ e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode());
}
+
+ // try callback
+ tryCallback(httpConnectRecord, e, attributes);
}).onFailure(err -> {
log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err);
- record.getCallback().onException(buildSendExceptionContext(record, err));
+
+ // try callback
+ tryCallback(httpConnectRecord, err, attributes);
});
+ }
+
+ /**
+ * Tries to call the callback based on the result of the request.
+ *
+ * @param httpConnectRecord the HttpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @param attributes additional attributes to be used in processing
+ */
+ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes) {
+ // get the retry event
+ HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e);
+
+ // get the multi http request context
+ MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
+
+ if (multiHttpRequestContext.getRemainingRequests() == 0) {
+ // do callback
+ ConnectRecord record = httpConnectRecord.getData();
+ if (record.getCallback() == null) {
+ if (log.isDebugEnabled()) {
+ log.warn("ConnectRecord callback is null. Ignoring callback. {}", record);
+ } else {
+ log.warn("ConnectRecord callback is null. Ignoring callback.");
+ }
+ return;
+ }
+
+ HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
+ if (lastFailedEvent == null) {
+ // success
+ record.getCallback().onSuccess(convertToSendResult(record));
+ } else {
+ // failure
+ record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
+ }
+ }
+ }
+
+ /**
+ * Gets and updates the retry event based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param httpConnectRecord the HttpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @return the updated retry event
+ */
+ private HttpRetryEvent getAndUpdateRetryEvent(Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
+ // get the retry event
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ // update the retry event
+ retryEvent.setLastException(e);
+ return retryEvent;
+ }
+
+
+ /**
+ * Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param retryEvent the retry event to use
+ * @return the updated multi http request context
+ */
+ private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpRetryEvent retryEvent) {
+ // get the multi http request context
+ MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
+
+ if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) {
+ // decrement the counter
+ multiHttpRequestContext.decrementRemainingRequests();
+
+ // try set failed event
+ if (retryEvent.getLastException() != null) {
+ multiHttpRequestContext.setLastFailedEvent(retryEvent);
+ }
}
+
+ return multiHttpRequestContext;
}
private SendResult convertToSendResult(ConnectRecord record) {
@@ -174,30 +262,6 @@ private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Thr
}
- /**
- * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
- * URL using the WebClient.
- *
- * @param url URI to which the HttpConnectRecord should be sent
- * @param httpConnectRecord HttpConnectRecord to process
- * @return processing chain
- */
- @Override
- public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
- // create headers
- MultiMap headers = HttpHeaders.headers()
- .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
- .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
- // send the request
- return this.webClient.post(url.getPath())
- .host(url.getHost())
- .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
- .putHeaders(headers)
- .ssl(Objects.equals(url.getScheme(), "https"))
- .sendJson(httpConnectRecord);
- }
-
-
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
*/
@@ -209,6 +273,4 @@ public void stop() {
log.warn("WebClient is null, ignore.");
}
}
-
-
}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
new file mode 100644
index 0000000000..268d0a0d6d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler.impl;
+
+import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+
+
+/**
+ * HttpSinkHandlerRetryWrapper is a wrapper class for the HttpSinkHandler that provides retry functionality for failed HTTP requests.
+ */
+@Slf4j
+public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {
+
+ private final HttpRetryConfig httpRetryConfig;
+
+ private final HttpSinkHandler sinkHandler;
+
+ public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
+ super(sinkConnectorConfig);
+ this.sinkHandler = sinkHandler;
+ this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+ }
+
+ /**
+ * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ */
+ @Override
+ public void start() {
+ sinkHandler.start();
+ }
+
+
+ /**
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
+ * HttpConnectRecord
+ *
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to pass to the processing chain
+ * @return processing chain
+ */
+ @Override
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) {
+
+ // Build the retry policy
+ RetryPolicy> retryPolicy = RetryPolicy.>builder()
+ .handleIf(e -> e instanceof ConnectException)
+ .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+ .withMaxRetries(httpRetryConfig.getMaxRetries())
+ .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
+ .onRetry(event -> {
+ if (log.isDebugEnabled()) {
+ log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
+ } else {
+ log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
+ }
+ // update the retry event
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ retryEvent.increaseCurrentRetries();
+ })
+ .onFailure(event -> {
+ if (log.isDebugEnabled()) {
+ log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
+ httpConnectRecord, event.getException());
+ } else {
+ log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
+ }
+ }).build();
+
+ // Handle the ConnectRecord with retry policy
+ Failsafe.with(retryPolicy)
+ .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes).toCompletionStage());
+
+ return null;
+ }
+
+
+ /**
+ * Cleans up and releases resources used by the HTTP/HTTPS handler.
+ */
+ @Override
+ public void stop() {
+ sinkHandler.stop();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
similarity index 82%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index 4e64126a9d..ff8f69d45a 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
@@ -25,13 +25,14 @@
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.commons.lang3.StringUtils;
import java.net.URI;
import java.time.LocalDateTime;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -61,8 +62,6 @@
@Slf4j
public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
- private final SinkConnectorConfig sinkConnectorConfig;
-
// the configuration for webhook
private final HttpWebhookConfig webhookConfig;
@@ -86,7 +85,7 @@ public boolean isExportDestroyed() {
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
- this.sinkConnectorConfig = sinkConnectorConfig;
+
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
int maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.receivedDataQueue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
@@ -94,9 +93,6 @@ public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
doInitExportServer();
}
- public SynchronizedCircularFifoQueue getReceivedDataQueue() {
- return receivedDataQueue;
- }
/**
* Initialize the server for exporting the received data
@@ -202,22 +198,6 @@ public void start() {
});
}
- /**
- * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
- *
- * @param record the ConnectRecord to process
- */
- @Override
- public void handle(ConnectRecord record) {
- for (URI url : super.getUrls()) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
- HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
- // handle the HttpConnectRecord
- deliver(url, httpConnectRecord);
- }
- }
-
/**
* Processes HttpConnectRecord on specified URL while returning its own processing logic This method sends the HttpConnectRecord to the specified
@@ -225,30 +205,27 @@ public void handle(ConnectRecord record) {
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
* @return processing chain
*/
@Override
- public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) {
// send the request
- Future> responseFuture = super.deliver(url, httpConnectRecord);
+ Future> responseFuture = super.deliver(url, httpConnectRecord, attributes);
// store the received data
return responseFuture.onComplete(arr -> {
- // If open retry, return directly and handled by RetryHttpSinkHandler
- if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) {
- return;
+ // get tryEvent from attributes
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+
+ HttpResponse response = null;
+ if (arr.succeeded()) {
+ response = arr.result();
+ } else {
+ retryEvent.setLastException(arr.cause());
}
- // create ExportMetadataBuilder
- HttpResponse response = arr.succeeded() ? arr.result() : null;
-
- HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
- .url(url.toString())
- .code(response != null ? response.statusCode() : -1)
- .message(response != null ? response.statusMessage() : arr.cause().getMessage())
- .receivedTime(LocalDateTime.now())
- .retriedBy(null)
- .uuid(httpConnectRecord.getUuid())
- .retryNum(0)
- .build();
+
+ // create ExportMetadata
+ HttpExportMetadata httpExportMetadata = buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent);
// create ExportRecord
HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
@@ -257,6 +234,38 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
});
}
+ /**
+ * Builds the HttpExportMetadata object based on the response, HttpConnectRecord, and HttpRetryEvent.
+ *
+ * @param url the URI to which the HttpConnectRecord was sent
+ * @param response the response received from the URI
+ * @param httpConnectRecord the HttpConnectRecord that was sent
+ * @param retryEvent the SingleHttpRetryEvent that was used for retries
+ * @return the HttpExportMetadata object
+ */
+ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord,
+ HttpRetryEvent retryEvent) {
+
+ String msg = null;
+ // order of precedence: lastException > response > null
+ if (retryEvent.getLastException() != null) {
+ msg = retryEvent.getLimitedExceptionMessage();
+ retryEvent.setLastException(null);
+ } else if (response != null) {
+ msg = response.statusMessage();
+ }
+
+ return HttpExportMetadata.builder()
+ .url(url.toString())
+ .code(response != null ? response.statusCode() : -1)
+ .message(msg)
+ .receivedTime(LocalDateTime.now())
+ .httpRecordId(httpConnectRecord.getHttpRecordId())
+ .recordId(httpConnectRecord.getData().getRecordId())
+ .retryNum(retryEvent.getCurrentRetries())
+ .build();
+ }
+
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 3e724627c0..7ddba511c4 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -86,7 +86,7 @@ void before() throws Exception {
JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
- .withStatusCode(200)
+ .withStatusCode(HttpStatus.SC_OK)
.withBody(new JSONObject()
.fluentPut("code", 0)
.fluentPut("message", "success")