diff --git a/aerospike-crud-store/pom.xml b/aerospike-crud-store/pom.xml index 1088b7b..c6d423d 100644 --- a/aerospike-crud-store/pom.xml +++ b/aerospike-crud-store/pom.xml @@ -5,7 +5,7 @@ crud-store com.livetheoogway.crudstore - 1.2.5 + 1.2.6 4.0.0 diff --git a/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/AerospikeStore.java b/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/AerospikeStore.java index e1c43b1..5a3ade7 100644 --- a/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/AerospikeStore.java +++ b/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/AerospikeStore.java @@ -140,12 +140,12 @@ protected int expiration(T item) { */ @Override public void create(final T item) { - write(item, null, createPolicy); + write(item.id(), () -> recordDetails(item, null), createPolicy); } @Override public void create(final T item, List refIds) { - write(item, refIds, createPolicy); + write(item.id(), () -> recordDetails(item, refIds), createPolicy); } /** @@ -155,7 +155,7 @@ public void create(final T item, List refIds) { */ @Override public void update(final T item) { - write(item, null, updateOnly); + write(item.id(), () -> recordDetails(item, null), updateOnly); } @@ -178,15 +178,11 @@ public void delete(final String id) { */ @Override public Optional get(final String id) { - final T data = exec("get", id, () -> { + return exec("get", id, () -> { final var requestIdKey = new Key(namespaceSet.namespace(), namespaceSet.set(), id); final var asRecord = client.get(client.getReadPolicyDefault(), requestIdKey); - return extractItem(id, asRecord); + return extractItemFromRecord(id, asRecord); }, errorHandler); - if (isValidDataItem(data)) { - return Optional.ofNullable(data); - } - return Optional.empty(); } /** @@ -205,7 +201,7 @@ public Map get(final List ids) { final Record[] records = client.get(client.getBatchPolicyDefault(), batchReads); return IntStream .range(0, ids.size()) - .mapToObj(index -> extractItemForBulkOperations(batchReads[index].userKey.toString(), records[index])) + .mapToObj(index -> extractItemFromRecord(batchReads[index].userKey.toString(), records[index])) .filter(Optional::isPresent) .map(Optional::get) .filter(this::isValidDataItem) @@ -226,7 +222,7 @@ public List list() { client.getScanPolicyDefault(), namespaceSet.namespace(), namespaceSet.set(), (key, asRecord) -> - extractItemForBulkOperations(key.userKey.toString(), asRecord) + extractItemFromRecord(key.userKey.toString(), asRecord) .ifPresent(items::add)); return items; } @@ -237,16 +233,13 @@ public List getByRefId(final String refId) { statement.setNamespace(namespaceSet.namespace()); statement.setSetName(namespaceSet.set()); statement.setIndexName(storeSetting.refIdIndex()); + statement.setBinNames(storeSetting.dataBin()); statement.setFilter(Filter.contains(storeSetting.refIdBin(), IndexCollectionType.LIST, refId)); final List results = new ArrayList<>(); try (final RecordSet recordSet = client.query(client.getQueryPolicyDefault(), statement)) { while (recordSet.next()) { - final T t = extractItem(recordSet.getKey().userKey.toString(), recordSet.getRecord()); - if (isValidDataItem(t)) { - results.add(t); - } else { - log.warn("Invalid item found for id:{}", t.id()); - } + extractItemFromRecord(recordSet.getKey().userKey.toString(), recordSet.getRecord()) + .ifPresent(results::add); } } catch (AerospikeException e) { log.error("[{}] Aerospike Error {} item for id:{}", typeReference.getType().getTypeName(), "getByRefId", refId, e); @@ -255,37 +248,33 @@ public List getByRefId(final String refId) { return results; } - public T extractItem(String id, Record asRecord) { + /** + * given an Aerospike Record, extract the data out of it + * @param id key + * @param asRecord how the record is extracted + * @return optionally return data if available + */ + protected Optional extractItemFromRecord(final String id, final Record asRecord) { if (asRecord == null) { return errorHandler.onNoRecordFound(id); } final var data = asRecord.getString(storeSetting.dataBin()); try { - return mapper.readValue(data, typeReference); + final T value = mapper.readValue(data, typeReference); + if (isValidDataItem(value)) { + return Optional.of(value); + } + log.warn("Invalid item found for id:{}", id); + return Optional.empty(); } catch (JsonProcessingException e) { - log.error("[{}] Deserialization error id:{} record:{}", typeReference.getType().getTypeName(), id, asRecord, - e); + log.error("[{}] Deserialization error id:{} record:{}", typeReference.getType().getTypeName(), id, asRecord, e); return errorHandler.onDeSerializationError(id, e); } } - public Optional extractItemForBulkOperations(String id, Record asRecord) { - if (asRecord == null) { - return Optional.ofNullable(errorHandler.onNoRecordFound(id)); - } - final var data = asRecord.getString(storeSetting.dataBin()); - try { - return Optional.of(mapper.readValue(data, typeReference)); - } catch (JsonProcessingException e) { - log.error("[{}] Deserialization error id:{} record:{}", typeReference.getType().getTypeName(), id, asRecord, - e); - return Optional.ofNullable(errorHandler.onDeSerializationError(id, e)); - } - } - - protected RecordDetails recordDetails(T item, List refIds) throws JsonProcessingException { + protected RecordDetails recordDetails(final T item, final List refIds) throws JsonProcessingException { final var dataBin = new Bin(storeSetting.dataBin(), mapper.writeValueAsString(item)); - if (refIds!=null && !refIds.isEmpty()) { + if (refIds != null && !refIds.isEmpty()) { final var refIdBin = new Bin(storeSetting.refIdBin(), refIds); return new RecordDetails(expiration(item), dataBin, refIdBin); } @@ -301,7 +290,10 @@ protected RecordDetails recordDetails(T item, List refIds) throws JsonPr * @param result type * @return result */ - protected R exec(final String operation, final String id, ESupplier response, ErrorHandler errorHandler) { + protected Optional exec(final String operation, + final String id, + final ESupplier> response, + final ErrorHandler errorHandler) { try { log.info("[{}] {} item for id:{}", typeReference.getType().getTypeName(), operation, id); return response.get(); @@ -317,10 +309,11 @@ protected R exec(final String operation, final String id, ESupplier respo } } - private void write(final T item, List refIds, WritePolicy defaultWritePolicy) { - final var id = item.id(); + protected void write(final String id, + final ESupplier recordDetailsSupplier, + final WritePolicy defaultWritePolicy) { exec("operation:" + defaultWritePolicy.recordExistsAction, id, () -> { - final var recordDetails = recordDetails(item, refIds); + final var recordDetails = recordDetailsSupplier.get(); var writePolicy = defaultWritePolicy; if (recordDetails.expiration() > 0) { writePolicy = new WritePolicy(writePolicy); diff --git a/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/DefaultErrorHandler.java b/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/DefaultErrorHandler.java index 8648c4d..43135d4 100644 --- a/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/DefaultErrorHandler.java +++ b/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/DefaultErrorHandler.java @@ -26,27 +26,27 @@ public void onDeleteUnsuccessful() { } @Override - public T onNoRecordFound(final String id) { - return null; + public Optional onNoRecordFound(final String id) { + return Optional.empty(); } @Override - public T onDeSerializationError(final String id, final JsonProcessingException e) { + public Optional onDeSerializationError(final String id, final JsonProcessingException e) { throw new RuntimeException(id + " not deserializable", e); } @Override - public T onAerospikeError(final String id, final AerospikeException e) { + public Optional onAerospikeError(final String id, final AerospikeException e) { throw new RuntimeException(e); } @Override - public T onSerializationError(final String id, final JsonProcessingException e) { + public Optional onSerializationError(final String id, final JsonProcessingException e) { throw new RuntimeException(id + " not serializable", e); } @Override - public T onExecutionError(final String id, final Exception e) { + public Optional onExecutionError(final String id, final Exception e) { throw new RuntimeException(e); } } diff --git a/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/ErrorHandler.java b/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/ErrorHandler.java index 3a83d90..e03d9b4 100644 --- a/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/ErrorHandler.java +++ b/aerospike-crud-store/src/main/java/com/livetheoogway/crudstore/aerospike/ErrorHandler.java @@ -18,22 +18,23 @@ import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; +import java.util.Optional; @SuppressWarnings("java:S112") public interface ErrorHandler { void onDeleteUnsuccessful(); - T onNoRecordFound(String id); + Optional onNoRecordFound(String id); - T onDeSerializationError(String id, final JsonProcessingException e); + Optional onDeSerializationError(String id, final JsonProcessingException e); - T onAerospikeError(String id, AerospikeException e); + Optional onAerospikeError(String id, AerospikeException e); default List onAerospikeErrorForRefId(String id, AerospikeException e) { return List.of(); } - T onSerializationError(final String id, JsonProcessingException e); + Optional onSerializationError(final String id, JsonProcessingException e); - T onExecutionError(final String id, Exception e); + Optional onExecutionError(final String id, Exception e); } diff --git a/crud-store-core/pom.xml b/crud-store-core/pom.xml index 648aaaa..ad35c22 100644 --- a/crud-store-core/pom.xml +++ b/crud-store-core/pom.xml @@ -5,7 +5,7 @@ crud-store com.livetheoogway.crudstore - 1.2.5 + 1.2.6 4.0.0 diff --git a/pom.xml b/pom.xml index a5f9647..14000a7 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.livetheoogway.crudstore crud-store pom - 1.2.5 + 1.2.6 crud-store