Skip to content

Commit

Permalink
Fix query11: Replace Count.perKey by Count.perElement
Browse files Browse the repository at this point in the history
issue apache#32
  • Loading branch information
echauchot committed May 9, 2017
1 parent 1a8f83c commit 3f6d62b
Showing 1 changed file with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,30 @@ public Query11(NexmarkConfiguration configuration) {
}

private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
return events.apply(JUST_BIDS)
.apply(name + ".Rekey",
// TODO etienne: why not avoid this ParDo and do a Cont.perElement?
ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Bid bid = c.element();
c.output(KV.of(bid.bidder, (Void) null));
}
}))
.apply(Window.<KV<Long, Void>>into(
Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)))
.apply(Count.<Long, Void>perKey())
.apply(name + ".ToResult",
ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
}
}));
PCollection<Long> bidders = events.apply(JUST_BIDS).apply(name + ".Rekey",
ParDo.of(new DoFn<Bid, Long>() {

@ProcessElement public void processElement(ProcessContext c) {
Bid bid = c.element();
c.output(bid.bidder);
}
}));

PCollection<Long> biddersWindowed = bidders.apply(
Window.<Long>into(
Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
.triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)));
PCollection<BidsPerSession> bidsPerSession = biddersWindowed.apply(Count.<Long>perElement())
.apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {

@ProcessElement public void processElement(ProcessContext c) {
c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
}
}));
return bidsPerSession;
}

@Override
Expand Down

0 comments on commit 3f6d62b

Please sign in to comment.