[proxima-beam-core] fix merging of reading of same families
je-ik committed Oct 17, 2023
1 parent e10f1d5 commit 95c94c0
Showing 3 changed files with 89 additions and 19 deletions.
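What this commit fixes: when several attributes are read from the same attribute family, the operator caches the underlying read and reuses ("fuses") it across calls. Reading the diff below, getBatchSnapshot previously built the cached PCollection from only the attributes passed to the first call, while the cache key ignored the attribute list entirely — so a later snapshot of a different attribute from the same family could hit the cache and silently return no data for that attribute. A minimal sketch of the intended behavior, adapted from the tests added in this commit (Repository.ofTest and getOrCreateOperator are assumptions here, as neither appears in this diff; the config file is the one the tests load):

  Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf")); // assumed factory
  BeamDataOperator beam = repo.getOrCreateOperator(BeamDataOperator.class);       // assumed accessor
  EntityDescriptor gateway = repo.getEntity("gateway");
  AttributeDescriptor<?> armed = gateway.getAttribute("armed");
  AttributeDescriptor<?> status = gateway.getAttribute("status");

  Pipeline p = Pipeline.create();
  // both attributes live in the same family, so the second call should reuse
  // the PCollection cached by the first ...
  PCollection<StreamElement> first = beam.getBatchSnapshot(p, armed);
  PCollection<StreamElement> second = beam.getBatchSnapshot(p, status);
  // ... and the shared read must carry elements of both attributes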
@@ -93,8 +93,9 @@ private static class BatchUpdatesDescriptor implements PCollectionDescriptor {
     long startStamp;
     long endStamp;
     boolean asStream;
+    List<AttributeDescriptor<?>> attrList;
 
-    PCollection<StreamElement> createBatchUpdates(List<AttributeDescriptor<?>> attrList) {
+    PCollection<StreamElement> createBatchUpdates() {
       return asStream
           ? dataAccessor.createStreamFromUpdates(pipeline, attrList, startStamp, endStamp, -1)
           : dataAccessor.createBatch(pipeline, attrList, startStamp, endStamp);
@@ -107,8 +108,9 @@ private static class BatchSnapshotDescriptor implements PCollectionDescriptor {
     DataAccessor dataAccessor;
     long fromStamp;
     long untilStamp;
+    List<AttributeDescriptor<?>> attrList;
 
-    PCollection<StreamElement> createBatchUpdates(List<AttributeDescriptor<?>> attrList) {
+    PCollection<StreamElement> createBatchUpdates() {
       return dataAccessor.createBatch(pipeline, attrList, fromStamp, untilStamp);
     }
   }
@@ -285,13 +287,7 @@ public final PCollection<StreamElement> getBatchUpdates(
 
     Preconditions.checkArgument(
         attrs.length > 0, "Cannot create PCollection from empty attribute list");
-    List<AttributeDescriptor<?>> attrClosure =
-        findSuitableFamilies(af -> af.getAccess().canReadBatchUpdates(), attrs)
-            .filter(p -> p.getSecond().isPresent())
-            .map(p -> p.getSecond().get())
-            .flatMap(d -> d.getAttributes().stream())
-            .distinct()
-            .collect(Collectors.toList());
+    List<AttributeDescriptor<?>> attrClosure = createAttributeClosure(attrs);
     Preconditions.checkArgument(
         !attrClosure.isEmpty(),
         "Cannot find suitable family for attributes %s",
@@ -303,8 +299,9 @@
         .map(
             da -> {
               BatchUpdatesDescriptor desc =
-                  new BatchUpdatesDescriptor(pipeline, da, startStamp, endStamp, asStream);
-              return getOrCreatePCollection(desc, true, d -> d.createBatchUpdates(attrClosure));
+                  new BatchUpdatesDescriptor(
+                      pipeline, da, startStamp, endStamp, asStream, attrClosure);
+              return getOrCreatePCollection(desc, true, BatchUpdatesDescriptor::createBatchUpdates);
             })
         .reduce(
             PCollectionList.<StreamElement>empty(pipeline),
@@ -348,8 +345,6 @@ public final PCollection<StreamElement> getBatchSnapshot(
   public final PCollection<StreamElement> getBatchSnapshot(
       Pipeline pipeline, long fromStamp, long untilStamp, AttributeDescriptor<?>... attrs) {
 
-    List<AttributeDescriptor<?>> attrList = Arrays.stream(attrs).collect(Collectors.toList());
-
     List<Pair<AttributeDescriptor<?>, Optional<AttributeFamilyDescriptor>>> resolvedAttrs;
     resolvedAttrs =
         findSuitableFamilies(af -> af.getAccess().canReadBatchSnapshot(), attrs)
@@ -358,6 +353,12 @@ public final PCollection<StreamElement> getBatchSnapshot(
     boolean unresolved = resolvedAttrs.stream().anyMatch(p -> p.getSecond().isEmpty());
 
     if (!unresolved) {
+      List<AttributeDescriptor<?>> attrList =
+          resolvedAttrs.stream()
+              .flatMap(p -> p.getSecond().stream())
+              .flatMap(af -> af.getAttributes().stream())
+              .distinct()
+              .collect(Collectors.toList());
       return resolvedAttrs.stream()
           // take all attributes from the same family
           // it will be filtered away then, this is needed to enable fusion of multiple reads from
@@ -373,8 +374,9 @@
         .map(
             da -> {
               BatchSnapshotDescriptor desc =
-                  new BatchSnapshotDescriptor(pipeline, da, fromStamp, untilStamp);
-              return getOrCreatePCollection(desc, true, d -> d.createBatchUpdates(attrList));
+                  new BatchSnapshotDescriptor(pipeline, da, fromStamp, untilStamp, attrList);
+              return getOrCreatePCollection(
+                  desc, true, BatchSnapshotDescriptor::createBatchUpdates);
             })
         .reduce(
             PCollectionList.<StreamElement>empty(pipeline),
@@ -394,6 +396,15 @@ public final PCollection<StreamElement> getBatchSnapshot(
         getBatchUpdates(pipeline, fromStamp, untilStamp, attrs));
   }
 
+  private List<AttributeDescriptor<?>> createAttributeClosure(AttributeDescriptor<?>[] attrs) {
+    return findSuitableFamilies(af -> af.getAccess().canReadBatchUpdates(), attrs)
+        .filter(p -> p.getSecond().isPresent())
+        .map(p -> p.getSecond().get())
+        .flatMap(d -> d.getAttributes().stream())
+        .distinct()
+        .collect(Collectors.toList());
+  }
+
   /**
    * Get {@link DataAccessor} for given {@link AttributeFamilyDescriptor}.
    *
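Taken together, the changes to this first file do two things: both descriptor classes gain attrList as a field, so the attribute set becomes part of the descriptor that keys the PCollection cache, and getBatchSnapshot now expands the requested attributes into the closure of all attributes of the resolved families instead of using Arrays.stream(attrs) verbatim. A rough sketch of the caching pattern the descriptors feed into — hypothetical shape only, since getOrCreatePCollection itself is outside this diff:

  // Hypothetical sketch: value-equal descriptors act as cache keys, so every
  // input that influences the produced PCollection must be a descriptor field.
  private final Map<PCollectionDescriptor, PCollection<StreamElement>> cache = new HashMap<>();

  private <T extends PCollectionDescriptor> PCollection<StreamElement> getOrCreatePCollection(
      T descriptor, boolean allowCaching, Function<T, PCollection<StreamElement>> factory) {
    if (!allowCaching) {
      return factory.apply(descriptor);
    }
    // before this commit, snapshot reads of different attributes from the same
    // family produced equal keys here and wrongly shared one cache entry
    return cache.computeIfAbsent(descriptor, ignored -> factory.apply(descriptor));
  }

The two remaining changed files are tests.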
@@ -21,6 +21,7 @@
 import cz.o2.proxima.core.repository.EntityDescriptor;
 import cz.o2.proxima.core.repository.Repository;
 import cz.o2.proxima.core.storage.StreamElement;
+import cz.o2.proxima.core.util.Optionals;
 import cz.o2.proxima.direct.core.DirectDataOperator;
 import cz.o2.proxima.typesafe.config.ConfigFactory;
 import org.apache.beam.sdk.Pipeline;
@@ -55,10 +56,7 @@ public void tearDown() {
   public void testReadingFromProxy() {
     EntityDescriptor proxied = repo.getEntity("proxied");
     AttributeDescriptor<byte[]> event = proxied.getAttribute("event.*");
-    direct
-        .getWriter(event)
-        .orElseThrow(() -> new IllegalArgumentException("Missing writer for " + event))
-        .write(newEvent(proxied, event), (succ, exc) -> {});
+    Optionals.get(direct.getWriter(event)).write(newEvent(proxied, event), (succ, exc) -> {});
     Pipeline p = Pipeline.create();
     PCollection<StreamElement> input = beam.getBatchSnapshot(p, event);
     PCollection<Long> result = input.apply(Count.globally());
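The change in this second test file is a pure refactor: Optionals.get replaces a hand-rolled orElseThrow chain. The helper lives in proxima-core rather than in this diff; presumably it is shaped roughly like:

  // Hypothetical shape of cz.o2.proxima.core.util.Optionals#get:
  // unwrap the value, failing fast when the Optional is empty.
  public static <T> T get(Optional<T> optional) {
    return optional.orElseThrow(
        () -> new IllegalStateException("Required Optional value is missing"));
  }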
@@ -24,6 +24,7 @@
 import cz.o2.proxima.core.repository.Repository;
 import cz.o2.proxima.core.storage.StreamElement;
 import cz.o2.proxima.core.storage.commitlog.Position;
+import cz.o2.proxima.core.util.Optionals;
 import cz.o2.proxima.core.util.ReplicationRunner;
 import cz.o2.proxima.direct.core.DirectDataOperator;
 import cz.o2.proxima.direct.core.OnlineAttributeWriter;
@@ -55,6 +56,7 @@
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,6 +70,7 @@ public class BeamDataOperatorTest {
           .withFallback(ConfigFactory.load("test-reference.conf")));
   final EntityDescriptor gateway = repo.getEntity("gateway");
   final AttributeDescriptor<?> armed = gateway.getAttribute("armed");
+  final AttributeDescriptor<?> status = gateway.getAttribute("status");
   final EntityDescriptor proxied = repo.getEntity("proxied");
   final AttributeDescriptor<?> event = proxied.getAttribute("event.*");
 
@@ -360,6 +363,64 @@ public void testMultipleConsumptionsFromSingleAttribute() {
     checkHasSingleInput(p);
   }
 
+  @Test
+  public void testDifferentAttributesFromSameFamily() {
+    Pipeline p = Pipeline.create();
+    PCollection<StreamElement> first = beam.getStream(p, Position.OLDEST, true, true, armed);
+    PCollection<StreamElement> second = beam.getStream(p, Position.OLDEST, true, true, status);
+    // validate that we have used cached PCollection
+    terminatePipeline(first, second);
+    checkHasSingleInput(p);
+
+    p = Pipeline.create();
+    first = beam.getBatchUpdates(p, armed);
+    second = beam.getBatchUpdates(p, status);
+    terminatePipeline(first, second);
+    checkHasSingleInput(p);
+
+    p = Pipeline.create();
+    first = beam.getBatchSnapshot(p, armed);
+    second = beam.getBatchSnapshot(p, status);
+    terminatePipeline(first, second);
+    checkHasSingleInput(p);
+  }
+
+  @Test
+  public void testConsumptionFromDifferentAttributesFromSameFamily() {
+    OnlineAttributeWriter writer = Optionals.get(direct.getWriter(armed));
+    long now = System.currentTimeMillis();
+    writer.write(
+        StreamElement.upsert(
+            gateway,
+            armed,
+            UUID.randomUUID().toString(),
+            "key",
+            armed.getName(),
+            now,
+            new byte[] {}),
+        (succ, exc) -> {});
+    writer.write(
+        StreamElement.upsert(
+            gateway,
+            status,
+            UUID.randomUUID().toString(),
+            "key",
+            status.getName(),
+            now,
+            new byte[] {}),
+        (succ, exc) -> {});
+    Pipeline p = Pipeline.create();
+    PCollection<StreamElement> first = beam.getBatchSnapshot(p, armed);
+    PCollection<StreamElement> second = beam.getBatchSnapshot(p, status);
+    PCollectionList<StreamElement> input = PCollectionList.of(first).and(second);
+    PCollection<KV<String, Long>> counted =
+        input
+            .apply(Flatten.pCollections())
+            .apply(WithKeys.of(StreamElement::getKey).withKeyType(TypeDescriptors.strings()))
+            .apply(Count.perKey());
+    PAssert.that(counted).containsInAnyOrder(KV.of("key", 2L));
+  }
+
   @Test
   public void testReadFromProxy() {
     EntityDescriptor entity = repo.getEntity("proxied");
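A note on the two new tests: judging by its name and the "validate that we have used cached PCollection" comment, checkHasSingleInput(p) (defined elsewhere in this test class) asserts that the pipeline contains a single read transform, i.e. that the second read reused the cached PCollection, while the PAssert in testConsumptionFromDifferentAttributesFromSameFamily checks that the fused read actually yields the data of both attributes. As in any Beam test, a PAssert takes effect only once the pipeline executes, e.g.:

  p.run().waitUntilFinish();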
