Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up of record extraction and creation Error handler is more comp… #22

Merged
merged 1 commit into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aerospike-crud-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>crud-store</artifactId>
<groupId>com.livetheoogway.crudstore</groupId>
<version>1.2.5</version>
<version>1.2.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ protected int expiration(T item) {
*/
@Override
public void create(final T item) {
write(item, null, createPolicy);
write(item.id(), () -> recordDetails(item, null), createPolicy);
}

@Override
public void create(final T item, List<String> refIds) {
write(item, refIds, createPolicy);
write(item.id(), () -> recordDetails(item, refIds), createPolicy);
}

/**
Expand All @@ -155,7 +155,7 @@ public void create(final T item, List<String> refIds) {
*/
@Override
public void update(final T item) {
write(item, null, updateOnly);
write(item.id(), () -> recordDetails(item, null), updateOnly);
}


Expand All @@ -178,15 +178,11 @@ public void delete(final String id) {
*/
@Override
public Optional<T> get(final String id) {
final T data = exec("get", id, () -> {
return exec("get", id, () -> {
final var requestIdKey = new Key(namespaceSet.namespace(), namespaceSet.set(), id);
final var asRecord = client.get(client.getReadPolicyDefault(), requestIdKey);
return extractItem(id, asRecord);
return extractItemFromRecord(id, asRecord);
}, errorHandler);
if (isValidDataItem(data)) {
return Optional.ofNullable(data);
}
return Optional.empty();
}

/**
Expand All @@ -205,7 +201,7 @@ public Map<String, T> get(final List<String> ids) {
final Record[] records = client.get(client.getBatchPolicyDefault(), batchReads);
return IntStream
.range(0, ids.size())
.mapToObj(index -> extractItemForBulkOperations(batchReads[index].userKey.toString(), records[index]))
.mapToObj(index -> extractItemFromRecord(batchReads[index].userKey.toString(), records[index]))
.filter(Optional::isPresent)
.map(Optional::get)
.filter(this::isValidDataItem)
Expand All @@ -226,7 +222,7 @@ public List<T> list() {
client.getScanPolicyDefault(),
namespaceSet.namespace(), namespaceSet.set(),
(key, asRecord) ->
extractItemForBulkOperations(key.userKey.toString(), asRecord)
extractItemFromRecord(key.userKey.toString(), asRecord)
.ifPresent(items::add));
return items;
}
Expand All @@ -237,16 +233,13 @@ public List<T> getByRefId(final String refId) {
statement.setNamespace(namespaceSet.namespace());
statement.setSetName(namespaceSet.set());
statement.setIndexName(storeSetting.refIdIndex());
statement.setBinNames(storeSetting.dataBin());
statement.setFilter(Filter.contains(storeSetting.refIdBin(), IndexCollectionType.LIST, refId));
final List<T> results = new ArrayList<>();
try (final RecordSet recordSet = client.query(client.getQueryPolicyDefault(), statement)) {
while (recordSet.next()) {
final T t = extractItem(recordSet.getKey().userKey.toString(), recordSet.getRecord());
if (isValidDataItem(t)) {
results.add(t);
} else {
log.warn("Invalid item found for id:{}", t.id());
}
extractItemFromRecord(recordSet.getKey().userKey.toString(), recordSet.getRecord())
.ifPresent(results::add);
}
} catch (AerospikeException e) {
log.error("[{}] Aerospike Error {} item for id:{}", typeReference.getType().getTypeName(), "getByRefId", refId, e);
Expand All @@ -255,37 +248,33 @@ public List<T> getByRefId(final String refId) {
return results;
}

public T extractItem(String id, Record asRecord) {
/**
* given an Aerospike Record, extract the data out of it
* @param id key
* @param asRecord how the record is extracted
* @return optionally return data if available
*/
protected Optional<T> extractItemFromRecord(final String id, final Record asRecord) {
if (asRecord == null) {
return errorHandler.onNoRecordFound(id);
}
final var data = asRecord.getString(storeSetting.dataBin());
try {
return mapper.readValue(data, typeReference);
final T value = mapper.readValue(data, typeReference);
if (isValidDataItem(value)) {
return Optional.of(value);
}
log.warn("Invalid item found for id:{}", id);
return Optional.empty();
} catch (JsonProcessingException e) {
log.error("[{}] Deserialization error id:{} record:{}", typeReference.getType().getTypeName(), id, asRecord,
e);
log.error("[{}] Deserialization error id:{} record:{}", typeReference.getType().getTypeName(), id, asRecord, e);
return errorHandler.onDeSerializationError(id, e);
}
}

public Optional<T> extractItemForBulkOperations(String id, Record asRecord) {
if (asRecord == null) {
return Optional.ofNullable(errorHandler.onNoRecordFound(id));
}
final var data = asRecord.getString(storeSetting.dataBin());
try {
return Optional.of(mapper.readValue(data, typeReference));
} catch (JsonProcessingException e) {
log.error("[{}] Deserialization error id:{} record:{}", typeReference.getType().getTypeName(), id, asRecord,
e);
return Optional.ofNullable(errorHandler.onDeSerializationError(id, e));
}
}

protected RecordDetails recordDetails(T item, List<String> refIds) throws JsonProcessingException {
protected RecordDetails recordDetails(final T item, final List<String> refIds) throws JsonProcessingException {
final var dataBin = new Bin(storeSetting.dataBin(), mapper.writeValueAsString(item));
if (refIds!=null && !refIds.isEmpty()) {
if (refIds != null && !refIds.isEmpty()) {
final var refIdBin = new Bin(storeSetting.refIdBin(), refIds);
return new RecordDetails(expiration(item), dataBin, refIdBin);
}
Expand All @@ -301,7 +290,10 @@ protected RecordDetails recordDetails(T item, List<String> refIds) throws JsonPr
* @param <R> result type
* @return result
*/
protected <R> R exec(final String operation, final String id, ESupplier<R> response, ErrorHandler<R> errorHandler) {
protected <R> Optional<R> exec(final String operation,
final String id,
final ESupplier<Optional<R>> response,
final ErrorHandler<R> errorHandler) {
try {
log.info("[{}] {} item for id:{}", typeReference.getType().getTypeName(), operation, id);
return response.get();
Expand All @@ -317,10 +309,11 @@ protected <R> R exec(final String operation, final String id, ESupplier<R> respo
}
}

private void write(final T item, List<String> refIds, WritePolicy defaultWritePolicy) {
final var id = item.id();
protected void write(final String id,
final ESupplier<RecordDetails> recordDetailsSupplier,
final WritePolicy defaultWritePolicy) {
exec("operation:" + defaultWritePolicy.recordExistsAction, id, () -> {
final var recordDetails = recordDetails(item, refIds);
final var recordDetails = recordDetailsSupplier.get();
var writePolicy = defaultWritePolicy;
if (recordDetails.expiration() > 0) {
writePolicy = new WritePolicy(writePolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@ public void onDeleteUnsuccessful() {
}

@Override
public T onNoRecordFound(final String id) {
return null;
public Optional<T> onNoRecordFound(final String id) {
return Optional.empty();
}

@Override
public T onDeSerializationError(final String id, final JsonProcessingException e) {
public Optional<T> onDeSerializationError(final String id, final JsonProcessingException e) {
throw new RuntimeException(id + " not deserializable", e);
}

@Override
public T onAerospikeError(final String id, final AerospikeException e) {
public Optional<T> onAerospikeError(final String id, final AerospikeException e) {
throw new RuntimeException(e);
}

@Override
public T onSerializationError(final String id, final JsonProcessingException e) {
public Optional<T> onSerializationError(final String id, final JsonProcessingException e) {
throw new RuntimeException(id + " not serializable", e);
}

@Override
public T onExecutionError(final String id, final Exception e) {
public Optional<T> onExecutionError(final String id, final Exception e) {
throw new RuntimeException(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@
import com.fasterxml.jackson.core.JsonProcessingException;

import java.util.List;
import java.util.Optional;

@SuppressWarnings("java:S112")
public interface ErrorHandler<T> {
void onDeleteUnsuccessful();

T onNoRecordFound(String id);
Optional<T> onNoRecordFound(String id);

T onDeSerializationError(String id, final JsonProcessingException e);
Optional<T> onDeSerializationError(String id, final JsonProcessingException e);

T onAerospikeError(String id, AerospikeException e);
Optional<T> onAerospikeError(String id, AerospikeException e);

default List<T> onAerospikeErrorForRefId(String id, AerospikeException e) {
return List.of();
}

T onSerializationError(final String id, JsonProcessingException e);
Optional<T> onSerializationError(final String id, JsonProcessingException e);

T onExecutionError(final String id, Exception e);
Optional<T> onExecutionError(final String id, Exception e);
}
2 changes: 1 addition & 1 deletion crud-store-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>crud-store</artifactId>
<groupId>com.livetheoogway.crudstore</groupId>
<version>1.2.5</version>
<version>1.2.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.livetheoogway.crudstore</groupId>
<artifactId>crud-store</artifactId>
<packaging>pom</packaging>
<version>1.2.5</version>
<version>1.2.6</version>

<name>crud-store</name>
<description>
Expand Down
Loading