Skip to content

Commit

Permalink
add basic support for TLS and fix handling ksql urls
Browse files Browse the repository at this point in the history
  • Loading branch information
Rocco Schulz committed Jun 1, 2021
1 parent 8b0f6d5 commit 16d0881
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 44 deletions.
268 changes: 268 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
root = true

[*]
charset = utf-8
end_of_line = crlf
indent_size = 2
indent_style = space
insert_final_newline = false
max_line_length = 120
tab_width = 4
ij_continuation_indent_size = 4
ij_formatter_off_tag = @formatter:off
ij_formatter_on_tag = @formatter:on
ij_formatter_tags_enabled = false
ij_smart_tabs = false
ij_visual_guides = none
ij_wrap_on_typing = false

[*.java]
ij_java_align_consecutive_assignments = false
ij_java_align_consecutive_variable_declarations = false
ij_java_align_group_field_declarations = false
ij_java_align_multiline_annotation_parameters = false
ij_java_align_multiline_array_initializer_expression = false
ij_java_align_multiline_assignment = false
ij_java_align_multiline_binary_operation = false
ij_java_align_multiline_chained_methods = false
ij_java_align_multiline_extends_list = false
ij_java_align_multiline_for = true
ij_java_align_multiline_method_parentheses = false
ij_java_align_multiline_parameters = true
ij_java_align_multiline_parameters_in_calls = false
ij_java_align_multiline_parenthesized_expression = false
ij_java_align_multiline_records = true
ij_java_align_multiline_resources = true
ij_java_align_multiline_ternary_operation = false
ij_java_align_multiline_text_blocks = false
ij_java_align_multiline_throws_list = false
ij_java_align_subsequent_simple_methods = false
ij_java_align_throws_keyword = false
ij_java_annotation_parameter_wrap = off
ij_java_array_initializer_new_line_after_left_brace = false
ij_java_array_initializer_right_brace_on_new_line = false
ij_java_array_initializer_wrap = off
ij_java_assert_statement_colon_on_next_line = false
ij_java_assert_statement_wrap = off
ij_java_assignment_wrap = off
ij_java_binary_operation_sign_on_next_line = false
ij_java_binary_operation_wrap = off
ij_java_blank_lines_after_anonymous_class_header = 0
ij_java_blank_lines_after_class_header = 0
ij_java_blank_lines_after_imports = 1
ij_java_blank_lines_after_package = 1
ij_java_blank_lines_around_class = 1
ij_java_blank_lines_around_field = 0
ij_java_blank_lines_around_field_in_interface = 0
ij_java_blank_lines_around_initializer = 1
ij_java_blank_lines_around_method = 1
ij_java_blank_lines_around_method_in_interface = 1
ij_java_blank_lines_before_class_end = 0
ij_java_blank_lines_before_imports = 1
ij_java_blank_lines_before_method_body = 0
ij_java_blank_lines_before_package = 0
ij_java_block_brace_style = end_of_line
ij_java_block_comment_at_first_column = true
ij_java_builder_methods = none
ij_java_call_parameters_new_line_after_left_paren = false
ij_java_call_parameters_right_paren_on_new_line = false
ij_java_call_parameters_wrap = off
ij_java_case_statement_on_separate_line = true
ij_java_catch_on_new_line = false
ij_java_class_annotation_wrap = split_into_lines
ij_java_class_brace_style = end_of_line
ij_java_class_count_to_use_import_on_demand = 5
ij_java_class_names_in_javadoc = 1
ij_java_do_not_indent_top_level_class_members = false
ij_java_do_not_wrap_after_single_annotation = false
ij_java_do_while_brace_force = never
ij_java_doc_add_blank_line_after_description = true
ij_java_doc_add_blank_line_after_param_comments = false
ij_java_doc_add_blank_line_after_return = false
ij_java_doc_add_p_tag_on_empty_lines = true
ij_java_doc_align_exception_comments = true
ij_java_doc_align_param_comments = true
ij_java_doc_do_not_wrap_if_one_line = false
ij_java_doc_enable_formatting = true
ij_java_doc_enable_leading_asterisks = true
ij_java_doc_indent_on_continuation = false
ij_java_doc_keep_empty_lines = true
ij_java_doc_keep_empty_parameter_tag = true
ij_java_doc_keep_empty_return_tag = true
ij_java_doc_keep_empty_throws_tag = true
ij_java_doc_keep_invalid_tags = true
ij_java_doc_param_description_on_new_line = false
ij_java_doc_preserve_line_breaks = false
ij_java_doc_use_throws_not_exception_tag = true
ij_java_else_on_new_line = false
ij_java_entity_dd_suffix = EJB
ij_java_entity_eb_suffix = Bean
ij_java_entity_hi_suffix = Home
ij_java_entity_lhi_prefix = Local
ij_java_entity_lhi_suffix = Home
ij_java_entity_li_prefix = Local
ij_java_entity_pk_class = java.lang.String
ij_java_entity_vo_suffix = VO
ij_java_enum_constants_wrap = off
ij_java_extends_keyword_wrap = off
ij_java_extends_list_wrap = off
ij_java_field_annotation_wrap = split_into_lines
ij_java_finally_on_new_line = false
ij_java_for_brace_force = never
ij_java_for_statement_new_line_after_left_paren = false
ij_java_for_statement_right_paren_on_new_line = false
ij_java_for_statement_wrap = off
ij_java_generate_final_locals = false
ij_java_generate_final_parameters = false
ij_java_if_brace_force = never
ij_java_imports_layout = *,|,javax.**,java.**,|,$*
ij_java_indent_case_from_switch = true
ij_java_insert_inner_class_imports = false
ij_java_insert_override_annotation = true
ij_java_keep_blank_lines_before_right_brace = 2
ij_java_keep_blank_lines_between_package_declaration_and_header = 2
ij_java_keep_blank_lines_in_code = 2
ij_java_keep_blank_lines_in_declarations = 2
ij_java_keep_builder_methods_indents = false
ij_java_keep_control_statement_in_one_line = true
ij_java_keep_first_column_comment = true
ij_java_keep_indents_on_empty_lines = false
ij_java_keep_line_breaks = true
ij_java_keep_multiple_expressions_in_one_line = false
ij_java_keep_simple_blocks_in_one_line = false
ij_java_keep_simple_classes_in_one_line = false
ij_java_keep_simple_lambdas_in_one_line = false
ij_java_keep_simple_methods_in_one_line = false
ij_java_label_indent_absolute = false
ij_java_label_indent_size = 0
ij_java_lambda_brace_style = end_of_line
ij_java_layout_static_imports_separately = true
ij_java_line_comment_add_space = false
ij_java_line_comment_at_first_column = true
ij_java_message_dd_suffix = EJB
ij_java_message_eb_suffix = Bean
ij_java_method_annotation_wrap = split_into_lines
ij_java_method_brace_style = end_of_line
ij_java_method_call_chain_wrap = off
ij_java_method_parameters_new_line_after_left_paren = false
ij_java_method_parameters_right_paren_on_new_line = false
ij_java_method_parameters_wrap = off
ij_java_modifier_list_wrap = false
ij_java_names_count_to_use_import_on_demand = 3
ij_java_new_line_after_lparen_in_record_header = false
ij_java_packages_to_use_import_on_demand = java.awt.*,javax.swing.*
ij_java_parameter_annotation_wrap = off
ij_java_parentheses_expression_new_line_after_left_paren = false
ij_java_parentheses_expression_right_paren_on_new_line = false
ij_java_place_assignment_sign_on_next_line = false
ij_java_prefer_longer_names = true
ij_java_prefer_parameters_wrap = false
ij_java_record_components_wrap = normal
ij_java_repeat_synchronized = true
ij_java_replace_instanceof_and_cast = false
ij_java_replace_null_check = true
ij_java_replace_sum_lambda_with_method_ref = true
ij_java_resource_list_new_line_after_left_paren = false
ij_java_resource_list_right_paren_on_new_line = false
ij_java_resource_list_wrap = off
ij_java_rparen_on_new_line_in_record_header = false
ij_java_session_dd_suffix = EJB
ij_java_session_eb_suffix = Bean
ij_java_session_hi_suffix = Home
ij_java_session_lhi_prefix = Local
ij_java_session_lhi_suffix = Home
ij_java_session_li_prefix = Local
ij_java_session_si_suffix = Service
ij_java_space_after_closing_angle_bracket_in_type_argument = false
ij_java_space_after_colon = true
ij_java_space_after_comma = true
ij_java_space_after_comma_in_type_arguments = true
ij_java_space_after_for_semicolon = true
ij_java_space_after_quest = true
ij_java_space_after_type_cast = true
ij_java_space_before_annotation_array_initializer_left_brace = false
ij_java_space_before_annotation_parameter_list = false
ij_java_space_before_array_initializer_left_brace = false
ij_java_space_before_catch_keyword = true
ij_java_space_before_catch_left_brace = true
ij_java_space_before_catch_parentheses = true
ij_java_space_before_class_left_brace = true
ij_java_space_before_colon = true
ij_java_space_before_colon_in_foreach = true
ij_java_space_before_comma = false
ij_java_space_before_do_left_brace = true
ij_java_space_before_else_keyword = true
ij_java_space_before_else_left_brace = true
ij_java_space_before_finally_keyword = true
ij_java_space_before_finally_left_brace = true
ij_java_space_before_for_left_brace = true
ij_java_space_before_for_parentheses = true
ij_java_space_before_for_semicolon = false
ij_java_space_before_if_left_brace = true
ij_java_space_before_if_parentheses = true
ij_java_space_before_method_call_parentheses = false
ij_java_space_before_method_left_brace = true
ij_java_space_before_method_parentheses = false
ij_java_space_before_opening_angle_bracket_in_type_parameter = false
ij_java_space_before_quest = true
ij_java_space_before_switch_left_brace = true
ij_java_space_before_switch_parentheses = true
ij_java_space_before_synchronized_left_brace = true
ij_java_space_before_synchronized_parentheses = true
ij_java_space_before_try_left_brace = true
ij_java_space_before_try_parentheses = true
ij_java_space_before_type_parameter_list = false
ij_java_space_before_while_keyword = true
ij_java_space_before_while_left_brace = true
ij_java_space_before_while_parentheses = true
ij_java_space_inside_one_line_enum_braces = false
ij_java_space_within_empty_array_initializer_braces = false
ij_java_space_within_empty_method_call_parentheses = false
ij_java_space_within_empty_method_parentheses = false
ij_java_spaces_around_additive_operators = true
ij_java_spaces_around_assignment_operators = true
ij_java_spaces_around_bitwise_operators = true
ij_java_spaces_around_equality_operators = true
ij_java_spaces_around_lambda_arrow = true
ij_java_spaces_around_logical_operators = true
ij_java_spaces_around_method_ref_dbl_colon = false
ij_java_spaces_around_multiplicative_operators = true
ij_java_spaces_around_relational_operators = true
ij_java_spaces_around_shift_operators = true
ij_java_spaces_around_type_bounds_in_type_parameters = true
ij_java_spaces_around_unary_operator = false
ij_java_spaces_within_angle_brackets = false
ij_java_spaces_within_annotation_parentheses = false
ij_java_spaces_within_array_initializer_braces = false
ij_java_spaces_within_braces = false
ij_java_spaces_within_brackets = false
ij_java_spaces_within_cast_parentheses = false
ij_java_spaces_within_catch_parentheses = false
ij_java_spaces_within_for_parentheses = false
ij_java_spaces_within_if_parentheses = false
ij_java_spaces_within_method_call_parentheses = false
ij_java_spaces_within_method_parentheses = false
ij_java_spaces_within_parentheses = false
ij_java_spaces_within_record_header = false
ij_java_spaces_within_switch_parentheses = false
ij_java_spaces_within_synchronized_parentheses = false
ij_java_spaces_within_try_parentheses = false
ij_java_spaces_within_while_parentheses = false
ij_java_special_else_if_treatment = true
ij_java_subclass_name_suffix = Impl
ij_java_ternary_operation_signs_on_next_line = false
ij_java_ternary_operation_wrap = off
ij_java_test_name_suffix = Test
ij_java_throws_keyword_wrap = off
ij_java_throws_list_wrap = off
ij_java_use_external_annotations = false
ij_java_use_fq_class_names = false
ij_java_use_relative_indents = false
ij_java_use_single_class_imports = true
ij_java_variable_annotation_wrap = off
ij_java_visibility = public
ij_java_while_brace_force = never
ij_java_while_on_new_line = false
ij_java_wrap_comments = false
ij_java_wrap_first_method_in_call_chain = false
ij_java_wrap_long_lines = false
36 changes: 33 additions & 3 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
import com.purbon.kafka.topology.utils.Pair;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.AdminClientConfig;

public class Configuration {
Expand Down Expand Up @@ -374,11 +378,37 @@ public Pair<String, String> apply(String[] strings) {
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

public String getKSQLServer() {
return config.getString(PLATFORM_SERVER_KSQL);
public URL getKSQLServer() {
try {
return new URL(config.getString(PLATFORM_SERVER_KSQL_URL));
} catch (MalformedURLException e) {
throw new IllegalArgumentException(config.getString(PLATFORM_SERVER_KSQL_URL) + " is not a valid KSQL URL");
}
}

public BasicAuth getKSQLBasicAuth() {
return new BasicAuth(config.getString(PLATFORM_SERVER_KSQL_BASIC_AUTH_USER), config.getString(PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD));
}

public boolean hasKSQLServer() {
return config.hasPath(PLATFORM_SERVER_KSQL);
return config.hasPath(PLATFORM_SERVER_KSQL_URL);
}

public static class BasicAuth {
private final String user;
private final String password;

public BasicAuth(String user, String password) {
this.user = user;
this.password = password;
}

public String getUser() {
return user;
}

public String getPassword() {
return password;
}
}
}
4 changes: 3 additions & 1 deletion src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public class Constants {
public static final String GROUP_MANAGED_PREFIXES = "topology.group.managed.prefixes";

public static final String PLATFORM_SERVERS_CONNECT = "platform.servers.connect";
public static final String PLATFORM_SERVER_KSQL = "platform.server.ksql";
public static final String PLATFORM_SERVER_KSQL_URL = "platform.server.ksql.url";
public static final String PLATFORM_SERVER_KSQL_BASIC_AUTH_USER = "platform.server.ksql.user";
public static final String PLATFORM_SERVER_KSQL_BASIC_AUTH_PASSWORD = "platform.server.ksql.password";

public static final String TOPOLOGY_BUILDER_INTERNAL_PRINCIPAL =
"topology.builder.internal.principal";
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
Expand Down Expand Up @@ -192,10 +193,8 @@ private static KSqlArtefactManager configureKSqlArtefactManager(

Map<String, KsqlApiClient> clients = new HashMap<>();
if (config.hasKSQLServer()) {
String ksqlAddress = config.getKSQLServer();
String server = ksqlAddress.substring(0, ksqlAddress.lastIndexOf(":"));
Integer port = Integer.parseInt(ksqlAddress.substring(ksqlAddress.lastIndexOf(":") + 1));
KsqlApiClient client = new KsqlApiClient(server, port);
URL ksqlAddress = config.getKSQLServer();
KsqlApiClient client = new KsqlApiClient(ksqlAddress, config.getKSQLBasicAuth());
clients.put("default", client);
}

Expand Down
10 changes: 2 additions & 8 deletions src/main/java/com/purbon/kafka/topology/KSqlArtefactManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,9 @@ private Collection<? extends Artefact> getClustersState() throws IOException {
.map(
artefact -> {
if (artefact instanceof KsqlStreamArtefact) {
return new KsqlStreamArtefact(
artefact.getPath(),
null,
artefact.getName());
return new KsqlStreamArtefact(artefact.getPath(), null, artefact.getName());
} else if (artefact instanceof KsqlTableArtefact) {
return new KsqlTableArtefact(
artefact.getPath(),
null,
artefact.getName());
return new KsqlTableArtefact(artefact.getPath(), null, artefact.getName());
} else {
LOGGER.error("KSQL Artefact of wrong type " + artefact.getClass());
return null;
Expand Down
Loading

0 comments on commit 16d0881

Please sign in to comment.