Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve data search #552

Merged
merged 2 commits into from
Dec 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
367 changes: 232 additions & 135 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions client/src/containers/Topic/Topic/TopicData/styles.scss
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,18 @@ code{
.schema-value {
margin-left: 0 !important;
}
}

.search-input-fields {
display: flex;
justify-content: space-between;
width: 100%;

label {
min-width: 100px;
}

input {
max-width: 150px;
}
}
7 changes: 5 additions & 2 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ export const uriTopicData = (
return uri;
};

export const uriTopicDataSearch = (clusterId, topicId, search, filters) => {
let uri = `${apiUrl}/${clusterId}/topic/${topicId}/data/search/${search}`;
export const uriTopicDataSearch = (clusterId, topicId, filters, offsets) => {
let uri = `${apiUrl}/${clusterId}/topic/${topicId}/data/search`;
if(filters) {
uri = uri + `?${filters}`
}
if(offsets) {
uri = uri + `&after=${offsets}`
}
return uri;
};

Expand Down
6 changes: 5 additions & 1 deletion client/src/utils/functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export async function getClusterUIOptions(clusterId) {
}
}

export const capitalizeTxt = (text) => {
return text.charAt(0).toUpperCase() + text.slice(1);
}


export default { getSelectedTab, getClusterUIOptions };
export default { getSelectedTab, getClusterUIOptions, capitalizeTxt };

46 changes: 38 additions & 8 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,23 @@ public ResultNextList<Record> data(
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> search
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Optional<String> searchByHeaderValue
) throws ExecutionException, InterruptedException {
Topic topic = this.topicRepository.findByName(cluster, topicName);
RecordRepository.Options options = dataSearchOptions(cluster, topicName, after, partition, sort, timestamp, search);
RecordRepository.Options options =
dataSearchOptions(cluster,
topicName,
after,
partition,
sort,
timestamp,
searchByKey,
searchByValue,
searchByHeaderKey,
searchByHeaderValue);
URIBuilder uri = URIBuilder.fromURI(request.getUri());
List<Record> data = this.recordRepository.consume(cluster, options);

Expand Down Expand Up @@ -279,7 +292,7 @@ public HttpResponse<?> delete(String cluster, String topicName) throws Execution

@Secured(Role.ROLE_TOPIC_DATA_READ)
@ExecuteOn(TaskExecutors.IO)
@Get(value = "api/{cluster}/topic/{topicName}/data/search/{search}", produces = MediaType.TEXT_EVENT_STREAM)
@Get(value = "api/{cluster}/topic/{topicName}/data/search", produces = MediaType.TEXT_EVENT_STREAM)
@Operation(tags = {"topic data"}, summary = "Search for data for a topic")
public Publisher<Event<SearchRecord>> sse(
String cluster,
Expand All @@ -288,7 +301,10 @@ public Publisher<Event<SearchRecord>> sse(
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> search
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Optional<String> searchByHeaderValue
) {
RecordRepository.Options options = dataSearchOptions(
cluster,
Expand All @@ -297,7 +313,10 @@ public Publisher<Event<SearchRecord>> sse(
partition,
sort,
timestamp,
search
searchByKey,
searchByValue,
searchByHeaderKey,
searchByHeaderValue
);

return recordRepository
Expand Down Expand Up @@ -338,6 +357,9 @@ public ResultNextList<Record> record(
Optional.of(partition),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

Expand Down Expand Up @@ -401,6 +423,9 @@ public RecordRepository.CopyResult copy(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

Expand All @@ -421,7 +446,10 @@ private RecordRepository.Options dataSearchOptions(
Optional<Integer> partition,
Optional<RecordRepository.Options.Sort> sort,
Optional<String> timestamp,
Optional<String> search
Optional<String> searchByKey,
Optional<String> searchByValue,
Optional<String> searchByHeaderKey,
Optional<String> searchByHeaderValue
) {
RecordRepository.Options options = new RecordRepository.Options(environment, cluster, topicName);

Expand All @@ -430,8 +458,10 @@ private RecordRepository.Options dataSearchOptions(
sort.ifPresent(options::setSort);
timestamp.map(r -> Instant.parse(r).toEpochMilli()).ifPresent(options::setTimestamp);
after.ifPresent(options::setAfter);
search.ifPresent(options::setSearch);

searchByKey.ifPresent(options::setSearchByKey);
searchByValue.ifPresent(options::setSearchByValue);
searchByHeaderKey.ifPresent(options::setSearchByHeaderKey);
searchByHeaderValue.ifPresent(options::setSearchByHeaderValue);
return options;
}

Expand Down
168 changes: 155 additions & 13 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -626,28 +626,83 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
});
}

private boolean searchFilter(BaseOptions options, Record record) {
if (options.getSearch() == null) {
return true;
}
private static boolean searchFilter(BaseOptions options, Record record) {

if (record.getKey() != null && containsAll(options.getSearch(), record.getKey())) {
return true;
if (options.getSearch() != null) {
if(!search(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()))) return false;
} else {
if (options.getSearchByKey() != null) {
if (!search(options.getSearchByKey(), Collections.singletonList(record.getKey()))) return false;
}

if (options.getSearchByValue() != null) {
if (!search(options.getSearchByValue(), Collections.singletonList(record.getValue()))) return false;
}

if (options.getSearchByHeaderKey() != null) {
if (!search(options.getSearchByHeaderKey(), record.getHeaders().keySet())) return false;
}

if (options.getSearchByHeaderValue() != null) {
if (!search(options.getSearchByHeaderValue(), record.getHeaders().values())) return false;
}
}
return true;
}

return record.getValue() != null && containsAll(options.getSearch(), record.getValue());
private static boolean search(Search searchFilter, Collection<String> stringsToSearch) {
switch (searchFilter.searchMatchType) {
case EQUALS:
return equalsAll(searchFilter.getText(), stringsToSearch);
case NOT_CONTAINS:
return notContainsAll(searchFilter.getText(), stringsToSearch);
default:
return containsAll(searchFilter.getText(), stringsToSearch);
}
}

private static boolean containsAll(String search, String in) {
private static boolean containsAll(String search, Collection<String> in) {
String[] split = search.toLowerCase().split("\\s");
in = in.toLowerCase();
for (String s : in) {
if(s != null) {
s = s.toLowerCase();
for (String k : split) {
if (s.contains(k)) {
return true;
}
}
}
}
return false;
}

for (String k : split) {
if (!in.contains(k)) {
return false;
private static boolean equalsAll(String search, Collection<String> in) {
String[] split = search.toLowerCase().split("\\s");
for (String s : in) {
if(s != null) {
s = s.toLowerCase();
for (String k : split) {
if (s.equals(k)) {
return true;
}
}
}
}
return false;
}

private static boolean notContainsAll(String search, Collection<String> in) {
String[] split = search.toLowerCase().split("\\s");
for (String s : in) {
if(s != null) {
s = s.toLowerCase();
for (String k : split) {
if (s.contains(k)) {
return false;
}
}
}
}
return true;
}

Expand Down Expand Up @@ -875,13 +930,99 @@ public static class TailEvent {
private final Map<Map<String, Integer>, Long> offsets = new HashMap<>();
}

@ToString
@EqualsAndHashCode
@Getter
public static class Search {

public enum SearchMatchType {
EQUALS("E"),
CONTAINS("C"),
NOT_CONTAINS("N");

private final String code;

SearchMatchType(String code) {
this.code = code;
}

public static SearchMatchType valueOfCode(String code) {
for (SearchMatchType e : values()) {
if (e.code.equals(code)) {
return e;
}
}
return null;
}
}

protected String text;
protected SearchMatchType searchMatchType;

public Search(String text) {
this.setText(text);
this.searchMatchType = SearchMatchType.CONTAINS;
}

public Search(String text, String searchMatchType) {
this.setText(text);
this.setSearchMatchType(searchMatchType);
}

public void setText(String text) {
this.text = text;
}

public void setSearchMatchType(String type) {
this.searchMatchType = SearchMatchType.valueOfCode(type);
}
}

@ToString
@EqualsAndHashCode
@Getter
@Setter
abstract public static class BaseOptions {

protected String clusterId;
protected String search;
protected Search search;
protected Search searchByKey;
protected Search searchByValue;
protected Search searchByHeaderKey;
protected Search searchByHeaderValue;

public BaseOptions() {
}

public void setSearchByKey(String searchByKey) {
this.searchByKey = this.buildSearch(searchByKey);
}

public void setSearchByValue(String searchByValue) {
this.searchByValue = this.buildSearch(searchByValue);
}

public void setSearchByHeaderKey(String searchByHeaderKey) {
this.searchByHeaderKey = this.buildSearch(searchByHeaderKey);
}

public void setSearchByHeaderValue(String searchByHeaderValue) {
this.searchByHeaderValue = this.buildSearch(searchByHeaderValue);
}

public void setSearch(String search) {
this.search = new Search(search);
}

private Search buildSearch(String searchByKey) {
int sepPos = searchByKey.lastIndexOf('_');
if(sepPos > 0) {
return new Search(searchByKey.substring(0, sepPos), searchByKey.substring(sepPos + 1));
} else {
return new Search(searchByKey);
}
}

}

@ToString
Expand Down Expand Up @@ -985,6 +1126,7 @@ public static class TailOptions extends BaseOptions {
private List<String> topics;
protected List<String> after;


public TailOptions(String clusterId, List<String> topics) {
this.clusterId = clusterId;
this.topics = topics;
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/akhq/controllers/SseControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void searchApi() {
RxSseClient sseClient = embeddedServer.getApplicationContext().createBean(RxSseClient.class, embeddedServer.getURL());

List<Record> results = sseClient
.eventStream(BASE_URL + "/" + KafkaTestCluster.TOPIC_HUGE + "/data/search/key_100", TopicController.SearchRecord.class)
.eventStream(BASE_URL + "/" + KafkaTestCluster.TOPIC_HUGE + "/data/search?searchByKey=key_100_C", TopicController.SearchRecord.class)
.toList()
.blockingGet()
.stream()
Expand Down
Loading