-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expect regex extracted tokens in database bloom filters #103
Changes from 3 commits
b9d2731
465cf83
7d4559d
945e839
2a8d97c
b3ac348
0d72406
a682789
b354614
5f907c8
0bf7410
31e0bf9
f370356
eda9878
aefb66a
73efeb9
441e40b
4844ebd
145fcb6
40c2165
6513861
adc7faf
2c01634
34efb98
7412ec2
7dea6f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
/* | ||
* Teragrep Archive Datasource (pth_06) | ||
* Copyright (C) 2021-2024 Suomen Kanuuna Oy | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
* | ||
* | ||
* Additional permission under GNU Affero General Public License version 3 | ||
* section 7 | ||
* | ||
* If you modify this Program, or any covered work, by linking or combining it | ||
* with other code, such other code is not for that reason alone subject to any | ||
* of the requirements of the GNU Affero GPL version 3 as long as this Program | ||
* is the same Program as licensed from Suomen Kanuuna Oy without any additional | ||
* modifications. | ||
* | ||
* Supplemented terms under GNU Affero General Public License version 3 | ||
* section 7 | ||
* | ||
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified | ||
* versions must be marked as "Modified version of" The Program. | ||
* | ||
* Names of the licensors and authors may not be used for publicity purposes. | ||
* | ||
* No rights are granted for use of trade names, trademarks, or service marks | ||
* which are in The Program if any. | ||
* | ||
* Licensee must indemnify licensors and authors for any liability that these | ||
* contractual assumptions impose on licensors and authors. | ||
* | ||
* To the extent this program is licensed as part of the Commercial versions of | ||
* Teragrep, the applicable Commercial License may apply to this file if you as | ||
* a licensee so wish it. | ||
*/ | ||
package com.teragrep.pth_06.planner.bloomfilter; | ||
|
||
import org.apache.spark.util.sketch.BloomFilter; | ||
import org.jooq.Record; | ||
import org.jooq.Table; | ||
import org.jooq.impl.DSL; | ||
import org.jooq.types.ULong; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
|
||
import static com.teragrep.pth_06.jooq.generated.bloomdb.Bloomdb.BLOOMDB; | ||
|
||
/** | ||
* Extracts filter type from record, creates a bloom filter and returns the filters byte array | ||
*/ | ||
public final class BloomFilterFromRecord { | ||
|
||
private final Logger LOGGER = LoggerFactory.getLogger(BloomFilterFromRecord.class); | ||
private final Long expected; | ||
private final Double fpp; | ||
private final String pattern; | ||
private final String searchTerm; | ||
|
||
private BloomFilter create() { | ||
if (expected == null || fpp == null) { | ||
LOGGER | ||
.error( | ||
"Null field while creating bloom filter expected <{}>, fpp <{}>, pattern <{}>, search term <{}>", | ||
expected, fpp, pattern, searchTerm | ||
); | ||
throw new RuntimeException("Object field was null"); | ||
} | ||
final BloomFilter filter = BloomFilter.create(expected, fpp); | ||
// if no pattern use to tokenized value (currently BLOOMDB.FILTERTYPE.PATTERN is NOT NULL) | ||
if (pattern == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. object is configurable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactored object to be not configurable |
||
LOGGER.info("Table pattern was null using tokenizer to generate tokens"); | ||
new TokenizedValue(searchTerm).stringTokens().forEach(filter::put); | ||
} | ||
else { // get tokens using regex | ||
final Set<String> tokens = new RegexExtractedValue(searchTerm, pattern).tokens(); | ||
LOGGER.info("Insert pattern <{}> tokens to temp table filter <{}>", pattern, tokens); | ||
if (tokens.isEmpty()) { | ||
throw new IllegalStateException( | ||
"Trying to insert empty filter, pattern match joined table should always have tokens" | ||
); | ||
} | ||
tokens.forEach(filter::put); | ||
} | ||
return filter; | ||
} | ||
|
||
public BloomFilterFromRecord(Record record, Table<?> table, String searchTerm) { | ||
this( | ||
record.getValue(DSL.field(DSL.name(table.getName(), "expectedElements"), ULong.class)).longValue(), | ||
record.getValue(DSL.field(DSL.name(table.getName(), "targetFpp"), Double.class)), | ||
record.getValue(BLOOMDB.FILTERTYPE.PATTERN, String.class), | ||
searchTerm | ||
); | ||
} | ||
|
||
public BloomFilterFromRecord(Long expected, Double fpp, String pattern, String searchTerm) { | ||
this.expected = expected; | ||
this.fpp = fpp; | ||
this.pattern = pattern; | ||
this.searchTerm = searchTerm; | ||
} | ||
|
||
public byte[] bytes() { | ||
final BloomFilter filter = create(); | ||
final ByteArrayOutputStream filterBAOS = new ByteArrayOutputStream(); | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not try-with-resources? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored to use try-with-resources |
||
filter.writeTo(filterBAOS); | ||
filterBAOS.close(); | ||
} | ||
catch (IOException e) { | ||
throw new UncheckedIOException(new IOException("Error writing filter bytes: " + e.getMessage())); | ||
} | ||
return filterBAOS.toByteArray(); | ||
} | ||
|
||
@Override | ||
public boolean equals(final Object object) { | ||
if (this == object) | ||
return true; | ||
if (object == null || getClass() != object.getClass()) | ||
return false; | ||
final BloomFilterFromRecord cast = (BloomFilterFromRecord) object; | ||
return expected.equals(cast.expected) && fpp.equals(cast.fpp) && Objects.equals(pattern, cast.pattern) | ||
&& searchTerm.equals(cast.searchTerm); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(expected, fpp, pattern, searchTerm); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,126 +43,70 @@ | |
* Teragrep, the applicable Commercial License may apply to this file if you as | ||
* a licensee so wish it. | ||
*/ | ||
package com.teragrep.pth_06.planner; | ||
package com.teragrep.pth_06.planner.bloomfilter; | ||
|
||
import com.teragrep.blf_01.Token; | ||
import org.apache.spark.util.sketch.BloomFilter; | ||
import org.jooq.*; | ||
import org.jooq.DSLContext; | ||
import org.jooq.Field; | ||
import org.jooq.Record; | ||
import org.jooq.Table; | ||
import org.jooq.impl.DSL; | ||
import org.jooq.types.ULong; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.regex.Pattern; | ||
import java.util.Objects; | ||
import java.util.function.Consumer; | ||
|
||
import static com.teragrep.pth_06.jooq.generated.bloomdb.Bloomdb.BLOOMDB; | ||
import static org.jooq.impl.SQLDataType.BIGINTUNSIGNED; | ||
|
||
/** | ||
* Filter types of a table that can be inserted into the tables category table | ||
*/ | ||
public final class TableFilters { | ||
public final class FilterFromRecordToCategoryTableConsumer implements Consumer<Record> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use object way instead of functional way for producing an iterator meaning for loop instead of a consumer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored to use for loop |
||
|
||
private final DSLContext ctx; | ||
private final Table<?> table; | ||
private final long bloomTermId; | ||
private final TokenizedValue value; | ||
private final TableRecords recordsInMetadata; | ||
|
||
public TableFilters(DSLContext ctx, Table<?> table, long bloomTermId, String input) { | ||
this( | ||
ctx, | ||
table, | ||
bloomTermId, | ||
new TokenizedValue(input), | ||
new TableFilterTypesFromMetadata(ctx, table, bloomTermId) | ||
); | ||
} | ||
private final String searchTerm; | ||
|
||
public TableFilters( | ||
public FilterFromRecordToCategoryTableConsumer( | ||
DSLContext ctx, | ||
Table<?> table, | ||
long bloomTermId, | ||
TokenizedValue value, | ||
TableFilterTypesFromMetadata recordsInMetadata | ||
String searchTerm | ||
) { | ||
this.ctx = ctx; | ||
this.table = table; | ||
this.bloomTermId = bloomTermId; | ||
this.value = value; | ||
this.recordsInMetadata = recordsInMetadata; | ||
this.searchTerm = searchTerm; | ||
} | ||
|
||
/** | ||
* Extracts filter type from record, creates a bloom filter and returns the filters byte array | ||
* | ||
* @param record record with filter info | ||
* @return byte[] of the created filter | ||
*/ | ||
private byte[] filterBytesFromRecord(final Record record) { | ||
final ULong expected = record.getValue(DSL.field(DSL.name(table.getName(), "expectedElements"), ULong.class)); | ||
final Double fpp = record.getValue(DSL.field(DSL.name(table.getName(), "targetFpp"), Double.class)); | ||
final String pattern = record.getValue(BLOOMDB.FILTERTYPE.PATTERN, String.class); | ||
final BloomFilter filter = BloomFilter.create(expected.longValue(), fpp); | ||
final Pattern compiled = Pattern.compile(pattern); | ||
boolean isEmpty = true; | ||
for (final Token token : value.tokens()) { | ||
final String tokenString = token.toString(); | ||
if (compiled.matcher(tokenString).matches()) { | ||
isEmpty = false; | ||
filter.put(tokenString); | ||
} | ||
} | ||
if (isEmpty) { | ||
throw new IllegalStateException("Trying to insert empty filter"); | ||
} | ||
final ByteArrayOutputStream filterBAOS = new ByteArrayOutputStream(); | ||
try { | ||
filter.writeTo(filterBAOS); | ||
filterBAOS.close(); | ||
} | ||
catch (IOException e) { | ||
throw new UncheckedIOException(new IOException("Error writing filter bytes: " + e.getMessage())); | ||
} | ||
return filterBAOS.toByteArray(); | ||
} | ||
|
||
private void insertFilterRecordToCategoryTable(final Record record) { | ||
@Override | ||
public void accept(final Record record) { | ||
final Table<Record> categoryTable = DSL.table(DSL.name(("term_" + bloomTermId + "_" + this.table.getName()))); | ||
final Field<?>[] insertFields = { | ||
DSL.field("term_id", BIGINTUNSIGNED.nullable(false)), | ||
DSL.field("type_id", BIGINTUNSIGNED.nullable(false)), | ||
DSL.field(DSL.name(categoryTable.getName(), "filter"), byte[].class) | ||
}; | ||
final BloomFilterFromRecord filterFromRecord = new BloomFilterFromRecord(record, table, searchTerm); | ||
final Field<?>[] valueFields = { | ||
DSL.val(bloomTermId, ULong.class), | ||
DSL.val(record.getValue(BLOOMDB.FILTERTYPE.ID), ULong.class), | ||
DSL.val(filterBytesFromRecord(record), byte[].class) | ||
DSL.val(filterFromRecord.bytes(), byte[].class) | ||
}; | ||
ctx.insertInto(categoryTable).columns(insertFields).values(valueFields).execute(); | ||
} | ||
|
||
public void insertFiltersIntoCategoryTable() { | ||
recordsInMetadata.toResult().forEach(this::insertFilterRecordToCategoryTable); | ||
} | ||
|
||
/** | ||
* Expects DSLContext values to be the same instance | ||
* | ||
* @param object object compared | ||
* @returs true if object is equal | ||
*/ | ||
@Override | ||
public boolean equals(final Object object) { | ||
if (this == object) | ||
return true; | ||
if (object == null) | ||
return false; | ||
if (object.getClass() != this.getClass()) | ||
if (object == null || this.getClass() != object.getClass()) | ||
return false; | ||
final TableFilters cast = (TableFilters) object; | ||
return this.ctx == cast.ctx && this.value.equals(cast.value) && this.table.equals(cast.table) | ||
&& this.bloomTermId == cast.bloomTermId; | ||
final FilterFromRecordToCategoryTableConsumer cast = (FilterFromRecordToCategoryTableConsumer) object; | ||
return bloomTermId == cast.bloomTermId && ctx == cast.ctx && table.equals(cast.table) | ||
&& searchTerm.equals(cast.searchTerm); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(ctx, table, bloomTermId, searchTerm); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this exception message could be a bit clearer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified the exception messages, added tests for excetpions, removed use of
.longValue()
method in constructor which would lead to NPE.