Skip to content

Commit

Permalink
SMT InsertBack: Max retries added.
Browse files Browse the repository at this point in the history
Signed-off-by: Lalith Kota <kotalalith@gmail.com>
  • Loading branch information
lalithkota committed Oct 24, 2024
1 parent db4fcc4 commit d49c62e
Showing 1 changed file with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ESQueryConfig extends Config{
String esUsername;
String esPassword;
boolean esUpsert;
int esMaxRetries;
int esRetryOnConflict;

ESQueryConfig(
Expand All @@ -82,6 +83,7 @@ public class ESQueryConfig extends Config{
String esUsername,
String esPassword,
boolean esUpsert,
int esMaxRetries,
int esRetryOnConflict
) {
super(type,idExpr,conditionExpr,valueExpr);
Expand All @@ -92,6 +94,7 @@ public class ESQueryConfig extends Config{
this.esUsername = esUsername;
this.esPassword = esPassword;
this.esUpsert = esUpsert;
this.esMaxRetries = esMaxRetries;
this.esRetryOnConflict = esRetryOnConflict;

HttpClientBuilder hClientBuilder = HttpClients.custom();
Expand Down Expand Up @@ -153,13 +156,24 @@ void insertBack(Object input){
requestJson.set("doc_as_upsert", BooleanNode.TRUE);
}
hPost.setEntity(new StringEntity(requestJson.toString()));
try(CloseableHttpResponse hResponse = hClient.execute(hPost)){
HttpEntity hResponseEntity = hResponse.getEntity();
if(hResponse.getCode() < 200 || hResponse.getCode() >= 400){
logger.error("Error occured while ES query. " + EntityUtils.toString(hResponseEntity));
boolean isSuccess = false;
for(int i=1; i <= esMaxRetries; i++){
try(CloseableHttpResponse hResponse = hClient.execute(hPost)){
HttpEntity hResponseEntity = hResponse.getEntity();
if(hResponse.getCode() < 200 || hResponse.getCode() >= 400){
logger.error("Error occured while ES query. " + EntityUtils.toString(hResponseEntity) + ". Retrying...");
} else {
isSuccess = true;
}
} catch(Exception e) {
logger.error("ES connection issues. Retrying...", e);
}
} catch(Exception e) {
logger.error("ES connection issues.", e);
if(isSuccess){
break;
}
}
if (!isSuccess){
logger.error("Max retries exceeded.");
}
}
}
Expand All @@ -185,8 +199,9 @@ void close(){
public static final String ES_SECURITY_ENABLED_CONFIG = "es.security.enabled";
public static final String ES_USERNAME_CONFIG = "es.username";
public static final String ES_PASSWORD_CONFIG = "es.password";
public static final String ES_RETRY_ON_CONFLICT_CONFIG = "es.retry.on.conflict";
public static final String ES_UPSERT = "es.upsert";
public static final String ES_UPSERT_CONFIG = "es.upsert";
public static final String ES_RETRY_ON_CONFLICT_CONFIG = "es.max.retries";
public static final String ES_MAX_RETRIES_CONFIG = "es.retry.on.conflict";

private Config config;
private boolean stopRecordAfterInsert;
Expand All @@ -203,8 +218,9 @@ void close(){
.define(ES_SECURITY_ENABLED_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Is Elasticsearch security enabled?")
.define(ES_USERNAME_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Username")
.define(ES_PASSWORD_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Elasticsearch Password")
.define(ES_UPSERT, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, "Elasticsearch Update API doc_as_upsert parameter.")
.define(ES_RETRY_ON_CONFLICT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API retry_on_conflict parameter.");
.define(ES_UPSERT_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.HIGH, "Elasticsearch Update API doc_as_upsert param.")
.define(ES_MAX_RETRIES_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API no of retries before fail.")
.define(ES_RETRY_ON_CONFLICT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.HIGH, "Elasticsearch Update API retry_on_conflict param.");


@Override
Expand Down Expand Up @@ -241,8 +257,9 @@ public void configure(Map<String, ?> configs) {
String esSecurity = absconf.getString(ES_SECURITY_ENABLED_CONFIG);
String esUsername = absconf.getString(ES_USERNAME_CONFIG);
String esPassword = absconf.getString(ES_PASSWORD_CONFIG);
boolean esUpsert = absconf.getBoolean(ES_UPSERT_CONFIG);
int esMaxRetries = absconf.getInt(ES_MAX_RETRIES_CONFIG);
int esRetryOnConflict = absconf.getInt(ES_RETRY_ON_CONFLICT_CONFIG);
boolean esUpsert = absconf.getBoolean(ES_UPSERT);

if(esUrl.isEmpty() || esIndex.isEmpty()){
throw new ConfigException("One of required transform Elasticsearch config fields not set. Required Elasticsearch fields in tranform: " + ES_URL_CONFIG + " ," + ES_INDEX_CONFIG);
Expand All @@ -260,6 +277,7 @@ public void configure(Map<String, ?> configs) {
esUsername,
esPassword,
esUpsert,
esMaxRetries,
esRetryOnConflict
);
}
Expand Down

0 comments on commit d49c62e

Please sign in to comment.