Skip to content

Commit

Permalink
Polydata index storage update (#3)
Browse files Browse the repository at this point in the history
* Improved storing of indexes

* add id filtering

* Improved fetching of redis indexes
  • Loading branch information
denis256 authored Apr 29, 2023
1 parent 6b10c8c commit ef8c8c7
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ void metadataOperations() {
@Test
void queryRandom() {
String poly = createPoly();
for (int i = 0; i < 105; i++) {
for (int i = 0; i < 1000; i++) {
polydata.insert(poly, Collections.singletonList(
InsertRequest.builder()
.data(BasicPoly.newPoly("test_" + i).with("app", i + "").with("field", i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public Optional<Polydata> create(BasicPoly config) {
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
poolConfig.setTestOnCreate(true);
poolConfig.setMinEvictableIdleTime(Duration.ofSeconds(config.fetch("min-evictable-idle", 60)));
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(config.fetch("time-between-eviction-runs", 30)));
poolConfig.setNumTestsPerEvictionRun(config.fetch("num-tests-per-eviction", 10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public PolydataMongodb(String mongoUri) {

// Creates the MongoDB indexes backing a poly's storage collections.
// NOTE(review): this text was captured from a commit-diff view; the hunk
// header ("-55,7 +55,6") says one line was deleted here — the
// indexCollection(...).createIndex(...) call below is most likely the
// REMOVED line (the commit moves per-poly index collections into a single
// shared collection). Confirm against the actual post-commit file.
public void prepareStorage(String poly) {
collection(poly).createIndex(Indexes.ascending(INDEXES));
indexCollection(poly).createIndex(Indexes.descending(COUNT));
}

@Override
Expand Down Expand Up @@ -110,14 +109,28 @@ public Optional<BasicPoly> index(String poly) {
if (cachedResult != null) {
return Optional.of(cachedResult);
}
BasicPoly index = new BasicPoly();
MongoCollection<Document> collection = indexCollection(poly);
for (Document document : collection.find()) {
BasicPoly value = toPoly(document);
index.put(value._id(), value);
BasicPoly index = null;
BasicPoly rawIndex = null;
Bson query = Filters.eq(_ID, poly);
try (MongoCursor<Document> cursor = indexCollection(poly).find(query).iterator()) {
if (cursor.hasNext()) {
Document document = cursor.next();
rawIndex = toPoly(document);
}
}
putIfCache(poly + "-index", index);
return Optional.of(index);
if (rawIndex != null) {
// transform index to poly
index = BasicPoly.newPoly(poly);
for(String key : rawIndex.data().keySet()) {
if (StringUtils.equals(key, _ID)) {
continue;
}
index.put(key, BasicPoly.newPoly(key).with("count", Long.parseLong(rawIndex.data().get(key) + "")));
}
putIfCache(poly + "-index", index);
}

return Optional.ofNullable(index);
}

@Override
Expand Down Expand Up @@ -439,7 +452,7 @@ private void persistPolyToCollection(String poly, String collection, BasicPoly d
}

private MongoCollection<Document> indexCollection(String poly) {
return collection(INDEX_COLLECTION + "_" + poly);
return collection(INDEX_COLLECTION);
}

private void recalculateIndex(String poly) {
Expand All @@ -452,23 +465,23 @@ private void recalculateIndex(String poly) {
)
);


try (MongoCursor<Document> iterator = documents.iterator()) {
List<BasicPoly> indexes = new ArrayList<>();
BasicPoly indexes = BasicPoly.newPoly(poly);
UpdateOptions opt = new UpdateOptions().upsert(true);
while (iterator.hasNext()) {
Document next = iterator.next();
String index = next.getString(_ID);
Long count = Long.parseLong(next.get("count") + "");
BasicPoly indexData = BasicPoly.newPoly(index);
indexData.put("count", count);
indexData.put("index", index);
indexData.put("poly", poly);
indexes.add(indexData);
}
if (!indexes.isEmpty()) {
indexCollection(poly).drop();
indexCollection(poly).insertMany(indexes.stream().map(this::toDocument).collect(Collectors.toList()));
indexes.put(index, count);
}
Document indexDocument = toDocument(indexes);
Bson update = new Document("$set", indexDocument);
Bson filter = Filters.eq(_ID, poly);
BulkWriteResult bulkWriteResult = indexCollection(poly).bulkWrite(List.of(new UpdateOneModel<>(filter, update, opt)));

log.debug("Index write result getInsertedCount {} getModifiedCount {} getMatchedCount {}",
bulkWriteResult.getInsertedCount(), bulkWriteResult.getModifiedCount(),
bulkWriteResult.getMatchedCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,8 @@ public Optional<BasicPoly> indexData(String poly, String indexId) {
public BasicPolyList insert(String poly, Collection<InsertRequest> insertRequests) {
final BasicPolyList basicPolyList = new BasicPolyList();
redis(jedis -> {

Set<String> polyIds = new HashSet<>();
// build index data
for (InsertRequest insertRequest : insertRequests) {
BasicPoly data = insertRequest.getData();
polyIds.add(data._id());

Set<String> indexToPersist = insertRequest.getIndexToPersist();
if (CollectionUtils.isEmpty(indexToPersist)) {
indexToPersist = new HashSet<>();
Expand All @@ -129,36 +125,45 @@ public BasicPolyList insert(String poly, Collection<InsertRequest> insertRequest
}
indexToPersist.add(DATE_INDEX);
insertRequest.setIndexToPersist(indexToPersist);
insertRequest.getData().put(INDEXES, indexToPersist);
}

for (InsertRequest insertRequest : insertRequests) {

BasicPoly polyToPersist = insertRequest.getData();
writePoly(jedis, poly, polyToPersist);
Map<String, BasicPoly> indexData = insertRequest.getIndexData();

removePolyFromIndex(jedis, poly, polyToPersist);
for (String indexName : insertRequest.getIndexToPersist()) {
// add poly it to list of polys
byte[] indexId = fetchId(poly, indexName);
byte[] indexId = fetchIndexId(poly, indexName);
jedis.lpush(indexId, insertRequest.getData()._id().getBytes());

BasicPoly tagIndex = index(poly).orElseGet(() -> BasicPoly.newPoly(TAG_INDEX_KEY));

// update tags list
BasicPoly data = new BasicPoly();
if (indexData != null) {
data.data().putAll(indexData.getOrDefault(indexName, BasicPoly.newPoly(indexName)).data());
}
data.put("count", jedis.llen(indexId));
tagIndex.put(indexName, data);
writePoly(jedis, poly, tagIndex);
}
}
rebuildIndex(jedis, poly);
});

return basicPolyList;
}

// Recomputes the poly's tag index from scratch: enumerates every per-index
// redis list for this poly, records each list's length under the index name,
// and persists the result as the poly's tag-index document.
//
// NOTE(review): jedis.keys(...) is a blocking O(N) scan over the whole
// keyspace — consider SCAN for production-sized databases.
// NOTE(review): when polyConfig.hashIds is enabled (see fetchIndexId),
// fetchIndexId(poly, "*") hashes the literal "*" instead of producing a
// glob pattern, so keys() would match nothing, and stripping the hashed
// empty-id prefix below could not recover the index name — this method
// looks correct only for hashIds == false; confirm.
private void rebuildIndex(Jedis jedis, String poly) {
// Glob pattern covering every index list key of this poly.
byte[] pattern = fetchIndexId(poly, "*");
Set<byte[]> keys = jedis.keys(pattern);
BasicPoly tagIndex = BasicPoly.newPoly(TAG_INDEX_KEY);
for (byte[] key : keys) {
// Strip the common key prefix (fetchIndexId(poly, "")) to get the bare
// index name. NOTE(review): new String(byte[]) uses the platform default
// charset — consider StandardCharsets.UTF_8 for determinism.
String stringKey = StringUtils.replace(new String(key), new String(fetchIndexId(poly, "")), "");
long length = jedis.llen(key);
// The list length is the number of polys currently filed under the index.
tagIndex.put(stringKey, BasicPoly.newPoly().with("count", length));
}
writePoly(jedis, poly, tagIndex);
}

/**
 * Removes every occurrence of the given poly's id from all per-index id
 * lists belonging to {@code poly}, so a subsequent insert (or removal)
 * leaves no stale index entries behind.
 */
private void removePolyFromIndex(Jedis jedis, String poly, BasicPoly p) {
    byte[] idBytes = p._id().getBytes();
    // Every index list key of this poly matches the "*" id pattern.
    byte[] keyPattern = fetchIndexId(poly, "*");
    for (byte[] indexKey : jedis.keys(keyPattern)) {
        // count == 0 removes all occurrences of the id from the list.
        jedis.lrem(indexKey, 0, idBytes);
    }
}

@Override
public BasicPolyList update(String poly, Collection<InsertRequest> insertRequests) {
return insert(poly, insertRequests);
Expand Down Expand Up @@ -190,13 +195,23 @@ public BasicPolyList remove(String poly, Set<String> ids) {
return redis(jedis -> {
byte[][] redisIds = ids.stream().map(id -> fetchId(poly, id)).toArray(byte[][]::new);
BasicPolyList basicPolyList = read(poly, ids);
// remove ids
for (byte[] id : redisIds) {
try {
jedis.del(id);
} catch (Exception e) {
log.error("Failed to delete poly", e);
}
}
// remove poly from indexes
for(BasicPoly p: basicPolyList.list()) {
Collection<String> indexes = p.fetch(INDEXES);
for (String index : indexes) {
byte[] indexId = fetchIndexId(poly, index);
jedis.lrem(indexId, 0, p._id().getBytes());
}
}
rebuildIndex(jedis, poly);
return basicPolyList;
});
}
Expand All @@ -223,11 +238,11 @@ public BasicPolyList query(String poly, PolyQuery polyQuery) {
Integer defaultItemPerPage = config.fetch(ITEM_PER_PAGE, DEFAULT_ITEM_PER_PAGE);
Integer itemPerPage = query.getOptions().fetch(ITEM_PER_PAGE, defaultItemPerPage);

long count = jedis.llen(fetchId(poly, index));
int randomCount = query.option(RANDOM_COUNT, itemPerPage);

List<Integer> ids = new ArrayList<>();
if (query.queryType() == BasicPolyQuery.QueryFunction.RANDOM) {
long count = jedis.llen(fetchIndexId(poly, index));
int randomCount = query.option(RANDOM_COUNT, itemPerPage);

for (int i = 0; i < randomCount; i++) {
ids.add(randoms.getRandom().nextInt((int) count));
}
Expand All @@ -239,7 +254,7 @@ public BasicPolyList query(String poly, PolyQuery polyQuery) {
}

Set<String> indexIds = ids.stream()
.map(id -> jedis.lindex(fetchId(poly, index), id))
.map(id -> jedis.lindex(fetchIndexId(poly, index), id))
.filter(Objects::nonNull)
.map(id -> new String(id))
.collect(Collectors.toSet());
Expand All @@ -263,7 +278,7 @@ public Long count(String poly, PolyQuery polyQuery) {
} else {
index = DATE_INDEX;
}
byte[] indexId = fetchId(poly, index);
byte[] indexId = fetchIndexId(poly, index);
return jedis.llen(indexId);
});
}
Expand Down Expand Up @@ -297,6 +312,14 @@ private byte[] fetchId(String poly, String id) {
return DigestUtils.sha256Hex(value).toLowerCase().getBytes();
}

/**
 * Builds the redis key under which one index id-list of a poly is stored:
 * {@code <prefix><poly>-index-<id>}. When {@code hashIds} is enabled in the
 * configuration the key is replaced by its lowercase SHA-256 hex digest.
 */
private byte[] fetchIndexId(String poly, String id) {
    final String rawKey = polyConfig.prefix + poly + "-index-" + id;
    if (polyConfig.hashIds) {
        return DigestUtils.sha256Hex(rawKey).toLowerCase().getBytes();
    }
    return rawKey.getBytes();
}

/**
* Execute logic on redis connection.
*/
Expand Down

0 comments on commit ef8c8c7

Please sign in to comment.