Skip to content

Commit

Permalink
[Dataflow Streaming] Enabled Heartbeat by Default (#31689)
Browse files Browse the repository at this point in the history
  • Loading branch information
TongruiLi committed Jul 1, 2024
1 parent 5a09095 commit ef50604
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactory
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
.setSendKeyedGetDataRequests(
!options.isEnableStreamingEngine()
|| !DataflowRunner.hasExperiment(
options, "streaming_engine_send_new_heartbeat_requests"));
|| DataflowRunner.hasExperiment(
options, "streaming_engine_disable_new_heartbeat_requests"));
}

private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ static GrpcWindmillServer newTestInstance(
testOptions(/* enableStreamingEngine= */ true, experiments);
boolean sendKeyedGetDataRequests =
!testOptions.isEnableStreamingEngine()
|| !DataflowRunner.hasExperiment(
testOptions, "streaming_engine_send_new_heartbeat_requests");
|| DataflowRunner.hasExperiment(
testOptions, "streaming_engine_disable_new_heartbeat_requests");
GrpcWindmillStreamFactory windmillStreamFactory =
GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId))
.setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,16 @@ public class GrpcWindmillServerTest {

@Before
public void setUp() throws Exception {
String name = "Fake server for " + getClass();
startServerAndClient(new ArrayList<>());
}

@After
public void tearDown() throws Exception {
server.shutdownNow();
}

private void startServerAndClient(List<String> experiments) throws Exception {
String name = "Fake server for " + getClass();
this.server =
InProcessServerBuilder.forName(name)
.fallbackHandlerRegistry(serviceRegistry)
Expand All @@ -136,17 +144,12 @@ public void setUp() throws Exception {
this.client =
GrpcWindmillServer.newTestInstance(
name,
new ArrayList<>(),
experiments,
clientId,
new FakeWindmillStubFactory(
() -> grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name))));
}

@After
public void tearDown() throws Exception {
server.shutdownNow();
}

private <Stream extends StreamObserver> void maybeInjectError(Stream stream) {
if (remainingErrors > 0 && ThreadLocalRandom.current().nextInt(20) == 0) {
try {
Expand Down Expand Up @@ -880,6 +883,11 @@ private List<HeartbeatRequest> makeHeartbeatRequest(List<String> keys) {
public void testStreamingGetDataHeartbeatsAsKeyedGetDataRequests() throws Exception {
// This server records the heartbeats observed but doesn't respond.
final Map<String, List<KeyedGetDataRequest>> getDataHeartbeats = new HashMap<>();
// Create a client and server different from the one in SetUp so we can add an experiment to the
// options passed in. This requires teardown and re-constructing the client and server
tearDown();
startServerAndClient(
Collections.singletonList("streaming_engine_disable_new_heartbeat_requests"));

serviceRegistry.addService(
new CloudWindmillServiceV1Alpha1ImplBase() {
Expand Down Expand Up @@ -973,21 +981,6 @@ public void onCompleted() {

@Test
public void testStreamingGetDataHeartbeatsAsHeartbeatRequests() throws Exception {
// Create a client and server different from the one in SetUp so we can add an experiment to the
// options passed in.
this.server =
InProcessServerBuilder.forName("TestServer")
.fallbackHandlerRegistry(serviceRegistry)
.executor(Executors.newFixedThreadPool(1))
.build()
.start();
this.client =
GrpcWindmillServer.newTestInstance(
"TestServer",
Collections.singletonList("streaming_engine_send_new_heartbeat_requests"),
clientId,
new FakeWindmillStubFactory(
() -> WindmillChannelFactory.inProcessChannel("TestServer")));
// This server records the heartbeats observed but doesn't respond.
final List<ComputationHeartbeatRequest> receivedHeartbeats = new ArrayList<>();

Expand Down

0 comments on commit ef50604

Please sign in to comment.