diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..ba6a3d629 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,268 @@ +root = true + +[*] +charset = utf-8 +end_of_line = crlf +indent_size = 2 +indent_style = space +insert_final_newline = false +max_line_length = 120 +tab_width = 4 +ij_continuation_indent_size = 4 +ij_formatter_off_tag = @formatter:off +ij_formatter_on_tag = @formatter:on +ij_formatter_tags_enabled = false +ij_smart_tabs = false +ij_visual_guides = none +ij_wrap_on_typing = false + +[*.java] +ij_java_align_consecutive_assignments = false +ij_java_align_consecutive_variable_declarations = false +ij_java_align_group_field_declarations = false +ij_java_align_multiline_annotation_parameters = false +ij_java_align_multiline_array_initializer_expression = false +ij_java_align_multiline_assignment = false +ij_java_align_multiline_binary_operation = false +ij_java_align_multiline_chained_methods = false +ij_java_align_multiline_extends_list = false +ij_java_align_multiline_for = true +ij_java_align_multiline_method_parentheses = false +ij_java_align_multiline_parameters = true +ij_java_align_multiline_parameters_in_calls = false +ij_java_align_multiline_parenthesized_expression = false +ij_java_align_multiline_records = true +ij_java_align_multiline_resources = true +ij_java_align_multiline_ternary_operation = false +ij_java_align_multiline_text_blocks = false +ij_java_align_multiline_throws_list = false +ij_java_align_subsequent_simple_methods = false +ij_java_align_throws_keyword = false +ij_java_annotation_parameter_wrap = off +ij_java_array_initializer_new_line_after_left_brace = false +ij_java_array_initializer_right_brace_on_new_line = false +ij_java_array_initializer_wrap = off +ij_java_assert_statement_colon_on_next_line = false +ij_java_assert_statement_wrap = off +ij_java_assignment_wrap = off +ij_java_binary_operation_sign_on_next_line = false +ij_java_binary_operation_wrap = off +ij_java_blank_lines_after_anonymous_class_header = 0 +ij_java_blank_lines_after_class_header = 0 +ij_java_blank_lines_after_imports = 1 +ij_java_blank_lines_after_package = 1 +ij_java_blank_lines_around_class = 1 +ij_java_blank_lines_around_field = 0 +ij_java_blank_lines_around_field_in_interface = 0 +ij_java_blank_lines_around_initializer = 1 +ij_java_blank_lines_around_method = 1 +ij_java_blank_lines_around_method_in_interface = 1 +ij_java_blank_lines_before_class_end = 0 +ij_java_blank_lines_before_imports = 1 +ij_java_blank_lines_before_method_body = 0 +ij_java_blank_lines_before_package = 0 +ij_java_block_brace_style = end_of_line +ij_java_block_comment_at_first_column = true +ij_java_builder_methods = none +ij_java_call_parameters_new_line_after_left_paren = false +ij_java_call_parameters_right_paren_on_new_line = false +ij_java_call_parameters_wrap = off +ij_java_case_statement_on_separate_line = true +ij_java_catch_on_new_line = false +ij_java_class_annotation_wrap = split_into_lines +ij_java_class_brace_style = end_of_line +ij_java_class_count_to_use_import_on_demand = 5 +ij_java_class_names_in_javadoc = 1 +ij_java_do_not_indent_top_level_class_members = false +ij_java_do_not_wrap_after_single_annotation = false +ij_java_do_while_brace_force = never +ij_java_doc_add_blank_line_after_description = true +ij_java_doc_add_blank_line_after_param_comments = false +ij_java_doc_add_blank_line_after_return = false +ij_java_doc_add_p_tag_on_empty_lines = true +ij_java_doc_align_exception_comments = true +ij_java_doc_align_param_comments = true +ij_java_doc_do_not_wrap_if_one_line = false +ij_java_doc_enable_formatting = true +ij_java_doc_enable_leading_asterisks = true +ij_java_doc_indent_on_continuation = false +ij_java_doc_keep_empty_lines = true +ij_java_doc_keep_empty_parameter_tag = true +ij_java_doc_keep_empty_return_tag = true +ij_java_doc_keep_empty_throws_tag = true +ij_java_doc_keep_invalid_tags = true +ij_java_doc_param_description_on_new_line = false +ij_java_doc_preserve_line_breaks = false +ij_java_doc_use_throws_not_exception_tag = true +ij_java_else_on_new_line = false +ij_java_entity_dd_suffix = EJB +ij_java_entity_eb_suffix = Bean +ij_java_entity_hi_suffix = Home +ij_java_entity_lhi_prefix = Local +ij_java_entity_lhi_suffix = Home +ij_java_entity_li_prefix = Local +ij_java_entity_pk_class = java.lang.String +ij_java_entity_vo_suffix = VO +ij_java_enum_constants_wrap = off +ij_java_extends_keyword_wrap = off +ij_java_extends_list_wrap = off +ij_java_field_annotation_wrap = split_into_lines +ij_java_finally_on_new_line = false +ij_java_for_brace_force = never +ij_java_for_statement_new_line_after_left_paren = false +ij_java_for_statement_right_paren_on_new_line = false +ij_java_for_statement_wrap = off +ij_java_generate_final_locals = false +ij_java_generate_final_parameters = false +ij_java_if_brace_force = never +ij_java_imports_layout = *,|,javax.**,java.**,|,$* +ij_java_indent_case_from_switch = true +ij_java_insert_inner_class_imports = false +ij_java_insert_override_annotation = true +ij_java_keep_blank_lines_before_right_brace = 2 +ij_java_keep_blank_lines_between_package_declaration_and_header = 2 +ij_java_keep_blank_lines_in_code = 2 +ij_java_keep_blank_lines_in_declarations = 2 +ij_java_keep_builder_methods_indents = false +ij_java_keep_control_statement_in_one_line = true +ij_java_keep_first_column_comment = true +ij_java_keep_indents_on_empty_lines = false +ij_java_keep_line_breaks = true +ij_java_keep_multiple_expressions_in_one_line = false +ij_java_keep_simple_blocks_in_one_line = false +ij_java_keep_simple_classes_in_one_line = false +ij_java_keep_simple_lambdas_in_one_line = false +ij_java_keep_simple_methods_in_one_line = false +ij_java_label_indent_absolute = false +ij_java_label_indent_size = 0 +ij_java_lambda_brace_style = end_of_line +ij_java_layout_static_imports_separately = true +ij_java_line_comment_add_space = false +ij_java_line_comment_at_first_column = true +ij_java_message_dd_suffix = EJB +ij_java_message_eb_suffix = Bean +ij_java_method_annotation_wrap = split_into_lines +ij_java_method_brace_style = end_of_line +ij_java_method_call_chain_wrap = off +ij_java_method_parameters_new_line_after_left_paren = false +ij_java_method_parameters_right_paren_on_new_line = false +ij_java_method_parameters_wrap = off +ij_java_modifier_list_wrap = false +ij_java_names_count_to_use_import_on_demand = 3 +ij_java_new_line_after_lparen_in_record_header = false +ij_java_packages_to_use_import_on_demand = java.awt.*,javax.swing.* +ij_java_parameter_annotation_wrap = off +ij_java_parentheses_expression_new_line_after_left_paren = false +ij_java_parentheses_expression_right_paren_on_new_line = false +ij_java_place_assignment_sign_on_next_line = false +ij_java_prefer_longer_names = true +ij_java_prefer_parameters_wrap = false +ij_java_record_components_wrap = normal +ij_java_repeat_synchronized = true +ij_java_replace_instanceof_and_cast = false +ij_java_replace_null_check = true +ij_java_replace_sum_lambda_with_method_ref = true +ij_java_resource_list_new_line_after_left_paren = false +ij_java_resource_list_right_paren_on_new_line = false +ij_java_resource_list_wrap = off +ij_java_rparen_on_new_line_in_record_header = false +ij_java_session_dd_suffix = EJB +ij_java_session_eb_suffix = Bean +ij_java_session_hi_suffix = Home +ij_java_session_lhi_prefix = Local +ij_java_session_lhi_suffix = Home +ij_java_session_li_prefix = Local +ij_java_session_si_suffix = Service +ij_java_space_after_closing_angle_bracket_in_type_argument = false +ij_java_space_after_colon = true +ij_java_space_after_comma = true +ij_java_space_after_comma_in_type_arguments = true +ij_java_space_after_for_semicolon = true +ij_java_space_after_quest = true +ij_java_space_after_type_cast = true +ij_java_space_before_annotation_array_initializer_left_brace = false +ij_java_space_before_annotation_parameter_list = false +ij_java_space_before_array_initializer_left_brace = false +ij_java_space_before_catch_keyword = true +ij_java_space_before_catch_left_brace = true +ij_java_space_before_catch_parentheses = true +ij_java_space_before_class_left_brace = true +ij_java_space_before_colon = true +ij_java_space_before_colon_in_foreach = true +ij_java_space_before_comma = false +ij_java_space_before_do_left_brace = true +ij_java_space_before_else_keyword = true +ij_java_space_before_else_left_brace = true +ij_java_space_before_finally_keyword = true +ij_java_space_before_finally_left_brace = true +ij_java_space_before_for_left_brace = true +ij_java_space_before_for_parentheses = true +ij_java_space_before_for_semicolon = false +ij_java_space_before_if_left_brace = true +ij_java_space_before_if_parentheses = true +ij_java_space_before_method_call_parentheses = false +ij_java_space_before_method_left_brace = true +ij_java_space_before_method_parentheses = false +ij_java_space_before_opening_angle_bracket_in_type_parameter = false +ij_java_space_before_quest = true +ij_java_space_before_switch_left_brace = true +ij_java_space_before_switch_parentheses = true +ij_java_space_before_synchronized_left_brace = true +ij_java_space_before_synchronized_parentheses = true +ij_java_space_before_try_left_brace = true +ij_java_space_before_try_parentheses = true +ij_java_space_before_type_parameter_list = false +ij_java_space_before_while_keyword = true +ij_java_space_before_while_left_brace = true +ij_java_space_before_while_parentheses = true +ij_java_space_inside_one_line_enum_braces = false +ij_java_space_within_empty_array_initializer_braces = false +ij_java_space_within_empty_method_call_parentheses = false +ij_java_space_within_empty_method_parentheses = false +ij_java_spaces_around_additive_operators = true +ij_java_spaces_around_assignment_operators = true +ij_java_spaces_around_bitwise_operators = true +ij_java_spaces_around_equality_operators = true +ij_java_spaces_around_lambda_arrow = true +ij_java_spaces_around_logical_operators = true +ij_java_spaces_around_method_ref_dbl_colon = false +ij_java_spaces_around_multiplicative_operators = true +ij_java_spaces_around_relational_operators = true +ij_java_spaces_around_shift_operators = true +ij_java_spaces_around_type_bounds_in_type_parameters = true +ij_java_spaces_around_unary_operator = false +ij_java_spaces_within_angle_brackets = false +ij_java_spaces_within_annotation_parentheses = false +ij_java_spaces_within_array_initializer_braces = false +ij_java_spaces_within_braces = false +ij_java_spaces_within_brackets = false +ij_java_spaces_within_cast_parentheses = false +ij_java_spaces_within_catch_parentheses = false +ij_java_spaces_within_for_parentheses = false +ij_java_spaces_within_if_parentheses = false +ij_java_spaces_within_method_call_parentheses = false +ij_java_spaces_within_method_parentheses = false +ij_java_spaces_within_parentheses = false +ij_java_spaces_within_record_header = false +ij_java_spaces_within_switch_parentheses = false +ij_java_spaces_within_synchronized_parentheses = false +ij_java_spaces_within_try_parentheses = false +ij_java_spaces_within_while_parentheses = false +ij_java_special_else_if_treatment = true +ij_java_subclass_name_suffix = Impl +ij_java_ternary_operation_signs_on_next_line = false +ij_java_ternary_operation_wrap = off +ij_java_test_name_suffix = Test +ij_java_throws_keyword_wrap = off +ij_java_throws_list_wrap = off +ij_java_use_external_annotations = false +ij_java_use_fq_class_names = false +ij_java_use_relative_indents = false +ij_java_use_single_class_imports = true +ij_java_variable_annotation_wrap = off +ij_java_visibility = public +ij_java_while_brace_force = never +ij_java_while_on_new_line = false +ij_java_wrap_comments = false +ij_java_wrap_first_method_in_call_chain = false +ij_java_wrap_long_lines = false diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 2398c721f..1bf137773 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -11,11 +11,15 @@ import com.purbon.kafka.topology.utils.Pair; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; + import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.kafka.clients.admin.AdminClientConfig; public class Configuration { @@ -374,11 +378,37 @@ public Pair apply(String[] strings) { .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } - public String getKSQLServer() { - return config.getString(PLATFORM_SERVER_KSQL); + public URL getKSQLServer() { + try { + return new URL(config.getString(PLATFORM_SERVER_KSQL_URL)); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(config.getString(PLATFORM_SERVER_KSQL_URL) + " is not a valid KSQL URL"); + } + } + + public BasicAuth getKSQLBasicAuth() { + return new BasicAuth(config.getString(PLATFORM_SERVER_KSQL_BASIC_AUTH_USER), config.getString(PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD)); } public boolean hasKSQLServer() { - return config.hasPath(PLATFORM_SERVER_KSQL); + return config.hasPath(PLATFORM_SERVER_KSQL_URL); + } + + public static class BasicAuth { + private final String user; + private final String password; + + public BasicAuth(String user, String password) { + this.user = user; + this.password = password; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } } } diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index b5811949e..e303ff142 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -82,7 +82,9 @@ public class Constants { public static final String GROUP_MANAGED_PREFIXES = "topology.group.managed.prefixes"; public static final String PLATFORM_SERVERS_CONNECT = "platform.servers.connect"; - public static final String PLATFORM_SERVER_KSQL = "platform.server.ksql"; + public static final String PLATFORM_SERVER_KSQL_URL = "platform.server.ksql.url"; + public static final String PLATFORM_SERVER_KSQL_BASIC_AUTH_USER = "platform.server.ksql.user"; + public static final String PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD = "platform.server.ksql.password"; public static final String TOPOLOGY_BUILDER_INTERNAL_PRINCIPAL = "topology.builder.internal.principal"; diff --git a/src/main/java/com/purbon/kafka/topology/JulieOps.java b/src/main/java/com/purbon/kafka/topology/JulieOps.java index a57c02a6a..378a9a775 100644 --- a/src/main/java/com/purbon/kafka/topology/JulieOps.java +++ b/src/main/java/com/purbon/kafka/topology/JulieOps.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.util.*; @@ -192,10 +193,8 @@ private static KSqlArtefactManager configureKSqlArtefactManager( Map clients = new HashMap<>(); if (config.hasKSQLServer()) { - String ksqlAddress = config.getKSQLServer(); - String server = ksqlAddress.substring(0, ksqlAddress.lastIndexOf(":")); - Integer port = Integer.parseInt(ksqlAddress.substring(ksqlAddress.lastIndexOf(":") + 1)); - KsqlApiClient client = new KsqlApiClient(server, port); + URL ksqlAddress = config.getKSQLServer(); + KsqlApiClient client = new KsqlApiClient(ksqlAddress, config.getKSQLBasicAuth()); clients.put("default", client); } diff --git a/src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java b/src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java index 991d74ef2..b14aa60dc 100644 --- a/src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java +++ b/src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java @@ -84,15 +84,9 @@ private Collection getClustersState() throws IOException { .map( artefact -> { if (artefact instanceof KsqlStreamArtefact) { - return new KsqlStreamArtefact( - artefact.getPath(), - null, - artefact.getName()); + return new KsqlStreamArtefact(artefact.getPath(), null, artefact.getName()); } else if (artefact instanceof KsqlTableArtefact) { - return new KsqlTableArtefact( - artefact.getPath(), - null, - artefact.getName()); + return new KsqlTableArtefact(artefact.getPath(), null, artefact.getName()); } else { LOGGER.error("KSQL Artefact of wrong type " + artefact.getClass()); return null; diff --git a/src/main/java/com/purbon/kafka/topology/actions/CreateArtefactAction.java b/src/main/java/com/purbon/kafka/topology/actions/CreateArtefactAction.java index 76c07984e..288fa7c75 100644 --- a/src/main/java/com/purbon/kafka/topology/actions/CreateArtefactAction.java +++ b/src/main/java/com/purbon/kafka/topology/actions/CreateArtefactAction.java @@ -5,10 +5,12 @@ import com.purbon.kafka.topology.clients.ArtefactClient; import com.purbon.kafka.topology.model.Artefact; import com.purbon.kafka.topology.utils.Utils; + import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Map; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -16,10 +18,10 @@ public class CreateArtefactAction extends BaseAction { private static final Logger LOGGER = LogManager.getLogger(CreateArtefactAction.class); - private ArtefactClient client; - private Artefact artefact; - private String rootPath; - private Collection artefacts; + private final ArtefactClient client; + private final Artefact artefact; + private final String rootPath; + private final Collection artefacts; public CreateArtefactAction( ArtefactClient client, @@ -36,8 +38,7 @@ public CreateArtefactAction( public void run() throws IOException { if (!artefacts.contains(artefact)) { LOGGER.info( - String.format( - "Creating artefact %s for client %s", artefact.getName(), client.getClass())); + String.format("Creating artefact %s for client %s", artefact.getName(), client.getClass())); client.add(content()); } } @@ -47,9 +48,7 @@ public Artefact getArtefact() { } private String content() throws IOException { - LOGGER.debug( - String.format( - "Reading artefact content from %s with rootPath %s", artefact.getPath(), rootPath)); + LOGGER.debug("Reading artefact content from " + artefact.getPath() + " with rootPath " + rootPath); return Utils.readFullFile(filePath(artefact.getPath(), rootPath)); } diff --git a/src/main/java/com/purbon/kafka/topology/api/ksql/KsqlApiClient.java b/src/main/java/com/purbon/kafka/topology/api/ksql/KsqlApiClient.java index 5bc9d6e7d..85c792d85 100644 --- a/src/main/java/com/purbon/kafka/topology/api/ksql/KsqlApiClient.java +++ b/src/main/java/com/purbon/kafka/topology/api/ksql/KsqlApiClient.java @@ -1,6 +1,7 @@ package com.purbon.kafka.topology.api.ksql; import com.fasterxml.jackson.core.JsonProcessingException; +import com.purbon.kafka.topology.Configuration; import com.purbon.kafka.topology.clients.ArtefactClient; import com.purbon.kafka.topology.model.Artefact; import com.purbon.kafka.topology.model.artefact.KsqlArtefact; @@ -11,12 +12,11 @@ import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.StreamInfo; import io.confluent.ksql.api.client.TableInfo; + +import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.net.URL; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -24,25 +24,35 @@ public class KsqlApiClient implements ArtefactClient { - private String server; - private Integer port; + private final URL server; private Client client; public static String QUERY_TYPE = "query"; public static String STREAM_TYPE = "stream"; public static String TABLE_TYPE = "table"; - public KsqlApiClient(String server, Integer port) { + public KsqlApiClient(URL server, @Nullable Configuration.BasicAuth basicAuth) { this.server = server; - this.port = port; - ClientOptions options = - ClientOptions.create().setHost(server.split(":")[0].strip()).setPort(port); + ClientOptions options = ClientOptions.create() + .setHost(server.getHost()) + .setPort(server.getPort()); + if (server.getProtocol() != null && server.getProtocol().equals("https")) { + options.setUseTls(true); + options.setUseAlpn(true); + } + if (basicAuth != null) { + options.setBasicAuthCredentials(basicAuth.getUser(), basicAuth.getPassword()); + } client = Client.create(options); } + public KsqlApiClient(URL server) { + this(server, null); + } + @Override public String getServer() { - return server + ":" + port; + return server.toString(); } @Override @@ -93,7 +103,7 @@ public List listTables() throws IOException { } return infos.stream() - .map(tableInfo -> new KsqlTableArtefact("", server, tableInfo.getName())) + .map(tableInfo -> new KsqlTableArtefact("", server.getHost(), tableInfo.getName())) .map(artefactToString()) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); @@ -121,7 +131,7 @@ public List listStreams() throws IOException { return infos.stream() .filter(e -> !"KSQL_PROCESSING_LOG".equalsIgnoreCase(e.getName())) - .map(queryInfo -> new KsqlStreamArtefact("", server, queryInfo.getName())) + .map(queryInfo -> new KsqlStreamArtefact("", server.getHost(), queryInfo.getName())) .map(artefactToString()) .filter(s -> !s.isEmpty()) .collect(Collectors.toList()); diff --git a/src/main/java/com/purbon/kafka/topology/model/Artefact.java b/src/main/java/com/purbon/kafka/topology/model/Artefact.java index 2c97c1a13..ac38433cb 100644 --- a/src/main/java/com/purbon/kafka/topology/model/Artefact.java +++ b/src/main/java/com/purbon/kafka/topology/model/Artefact.java @@ -1,8 +1,6 @@ package com.purbon.kafka.topology.model; import com.fasterxml.jackson.annotation.JsonInclude; - -import java.util.Locale; import java.util.Objects; @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/src/test/java/com/purbon/kafka/topology/integration/KsqlClientIT.java b/src/test/java/com/purbon/kafka/topology/integration/KsqlClientIT.java index 88f157b3c..bb7a2f977 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/KsqlClientIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/KsqlClientIT.java @@ -9,6 +9,7 @@ import com.purbon.kafka.topology.integration.containerutils.KsqlContainer; import com.purbon.kafka.topology.integration.containerutils.SaslPlaintextKafkaContainer; import java.io.IOException; +import java.net.URL; import java.util.List; import org.junit.After; import org.junit.Before; @@ -37,7 +38,7 @@ public void configure() throws InterruptedException { @Test public void testStreamTableCreateAndDelete() throws IOException { - KsqlApiClient client = new KsqlApiClient(ksqlContainer.getHost(), ksqlContainer.getPort()); + KsqlApiClient client = new KsqlApiClient(new URL("http", ksqlContainer.getHost(), ksqlContainer.getPort(), "")); String streamName = "riderLocations"; diff --git a/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java b/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java index 94114818d..74b8f6d7e 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/KsqlManagerIT.java @@ -17,6 +17,7 @@ import com.purbon.kafka.topology.utils.TestUtils; import java.io.File; import java.io.IOException; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; @@ -50,7 +51,8 @@ public void configure() throws InterruptedException, IOException { Files.deleteIfExists(Paths.get(".cluster-state")); - client = new KsqlApiClient(ksqlContainer.getHost(), ksqlContainer.getPort()); + URL containerUrl = new URL("http", ksqlContainer.getHost(), ksqlContainer.getPort(), ""); + client = new KsqlApiClient(containerUrl); parser = new TopologySerdes(); this.plan = ExecutionPlan.init(new BackendController(), System.out); @@ -73,7 +75,7 @@ public void testCreateAndUpdatePathWithRemoveClusterState() throws IOException { props.put(TOPOLOGY_STATE_FROM_CLUSTER, "true"); props.put(TOPOLOGY_TOPIC_STATE_FROM_CLUSTER, "false"); props.put(ALLOW_DELETE_KSQL_ARTEFACTS, "true"); - props.put(PLATFORM_SERVER_KSQL, "http://"+client.getServer()); + props.put(PLATFORM_SERVER_KSQL_URL, "http://" + client.getServer()); File file = TestUtils.getResourceFile("/descriptor-ksql.yaml");