Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[proxima-direct-core] #318 filter target attributes in test replication controller #849

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
package cz.o2.proxima.core.util;

import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.functional.TriFunction;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.storage.StorageType;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.BulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver.OnNextContext;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
import java.net.URI;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -68,43 +72,71 @@ public static void runAttributeReplicas(
.flatMap(DirectAttributeFamilyDescriptor::getCommitLogReader));
final ObserveHandle handle;
if (writer instanceof OnlineAttributeWriter) {
final OnlineAttributeWriter onlineWriter = writer.online();
handle =
primaryCommitLogReader.observe(
af.getDesc().getName(),
(CommitLogObserver)
(ingest, context) -> {
log.debug("Replicating input {} to {}", ingest, writer);
onlineWriter.write(
ingest,
(succ, exc) -> {
context.commit(succ, exc);
onReplicated.accept(ingest);
});
return true;
});
newReplicationCommitLogObserver(attributes, writer.online(), onReplicated));
} else {
final BulkAttributeWriter bulkWriter = writer.bulk();
handle =
primaryCommitLogReader.observe(
af.getDesc().getName(),
(CommitLogObserver)
(ingest, context) -> {
log.debug("Replicating input {} to {}", ingest, writer);
bulkWriter.write(
ingest,
context.getWatermark(),
(succ, exc) -> {
context.commit(succ, exc);
onReplicated.accept(ingest);
});
return true;
});
newReplicationCommitLogObserver(attributes, writer.bulk(), onReplicated));
}
ExceptionUtils.unchecked(handle::waitUntilReady);
log.info("Started attribute replica {}", af.getDesc().getName());
});
}

static CommitLogObserver newReplicationCommitLogObserver(
List<AttributeDescriptor<?>> attributes,
OnlineAttributeWriter writer,
Consumer<StreamElement> onReplicated) {

return newReplicationObserver(
(ingest, context, commit) -> {
writer.write(ingest, commit);
return null;
},
writer.getUri(),
attributes,
onReplicated);
}

static CommitLogObserver newReplicationCommitLogObserver(
List<AttributeDescriptor<?>> attributes,
BulkAttributeWriter writer,
Consumer<StreamElement> onReplicated) {

return newReplicationObserver(
(ingest, context, commit) -> {
writer.write(ingest, context.getWatermark(), commit);
return null;
},
writer.getUri(),
attributes,
onReplicated);
}

private static CommitLogObserver newReplicationObserver(
TriFunction<StreamElement, OnNextContext, CommitCallback, Void> write,
URI uri,
List<AttributeDescriptor<?>> attributes,
Consumer<StreamElement> onReplicated) {

return (ingest, context) -> {
if (attributes.contains(ingest.getAttributeDescriptor())) {
write.apply(
ingest,
context,
(succ, exc) -> {
log.debug("Replicated input {} to {}", ingest, uri);
context.commit(succ, exc);
onReplicated.accept(ingest);
});
}
return true;
};
}

private ReplicationRunner() {}
}
Loading