diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java index 2ae6fe2ad36..7508136bdb6 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/EsTypeMappingSeaTunnelType.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; import java.util.HashMap; import java.util.Map; @@ -32,6 +34,7 @@ public class EsTypeMappingSeaTunnelType { put("string", BasicType.STRING_TYPE); put("keyword", BasicType.STRING_TYPE); put("text", BasicType.STRING_TYPE); + put("binary", BasicType.STRING_TYPE); put("boolean", BasicType.BOOLEAN_TYPE); put("byte", BasicType.BYTE_TYPE); put("short", BasicType.SHORT_TYPE); @@ -44,7 +47,19 @@ public class EsTypeMappingSeaTunnelType { } }; + /** + * if not find the mapping SeaTunnelDataType will throw runtime exception + * + * @param esType + * @return + */ public static SeaTunnelDataType getSeaTunnelDataType(String esType) { - return MAPPING.get(esType); + SeaTunnelDataType seaTunnelDataType = MAPPING.get(esType); + if (seaTunnelDataType == null) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.ES_FIELD_TYPE_NOT_SUPPORT, + String.format("elasticsearch type is %s", esType)); + } + return seaTunnelDataType; } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java index a08c6a3ffc1..67f01201dd6 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java @@ -28,6 +28,7 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode { LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"), DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"), CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index failed"), + ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"); ; private final String code;