Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewindable event handler separation #440

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/main/java/com/lmax/disruptor/BaseEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lmax.disruptor;

interface BaseEventHandler<T>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gentle preference for EventHandlerBase over BaseEventHandler, which sounds like an EventHandler that someone has done something unpleasant to.

{
/**
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
* read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be
* processed without having to wait for any new event to arrive. This can be useful for event handlers that need
* to do slower operations like I/O as they can group together the data from multiple events into a single
* operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
* the time between that message and the next one is indeterminate.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Throwable if the EventHandler would like the exception handled further up the chain or possible rewind
* the batch if a {@link RewindableException} is thrown.
*/
void onEvent(T event, long sequence, boolean endOfBatch) throws Throwable;

/**
* Invoked by {@link BatchEventProcessor} prior to processing a batch of events
*
* @param batchSize the size of the batch that is starting
*/
default void onBatchStart(long batchSize)
{
}

/**
* Called once on thread start before first event is available.
*/
default void onStart()
{
}

/**
* Called once just before the event processing thread is shutdown.
*
* <p>Sequence event processing will already have stopped before this method is called. No events will
* be processed after this message.
*/
default void onShutdown()
{
}

/**
* Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}.
*
* @param sequence - the last processed sequence.
* @throws Exception if the implementation is unable to handle this timeout.
*/
default void onTimeout(long sequence) throws Exception
{
}
}
101 changes: 82 additions & 19 deletions src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011 LMAX Ltd.
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,7 @@
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
public final class BatchEventProcessor<T>
implements EventProcessor
implements EventProcessor
{
private static final int IDLE = 0;
private static final int HALTED = IDLE + 1;
Expand All @@ -38,40 +38,78 @@ public final class BatchEventProcessor<T>
private ExceptionHandler<? super T> exceptionHandler;
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final BaseEventHandler<? super T> eventHandler;
private final int batchLimitOffset;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private BatchRewindStrategy batchRewindStrategy = new SimpleBatchRewindStrategy();
private int retriesAttempted = 0;
private final boolean rewindable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a boolean, could we have behaviour?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking something like adding an private inner interface:

private final interface RewindBehaviour
{
      long onRewindableException(RewindableException e);
}

and then there could be two private methods:

private long beKindRewind(final RewindableException e)
{
    if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
    {
        return startOfBatchSequence;
    }
    else
    {
        retriesAttempted = 0;
        throw e;
    }
}

private long rewindNotAllowed(final RewindableException e)
{
    throw new RuntimeException("Rewindable Exception thrown from a non-rewindable event handler", e);
}

Constructors would then take this::beKindRewind in place of true and this::rewindNotAllowed in place of false? (one catch here is that they might not be referencable in this() calls, so eh?).

This would save continuously checking an if that will never change; on the other hand it does possibly make navigation slightly harder?

Also, this really only works because retriesAttempted appears to be instance scoped? I'm assuming this is so we persist failures across calls until we get a success, but it's not entirely clear to me?


private BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final BaseEventHandler<? super T> eventHandler,
final int maxBatchSize,
final boolean rewindable
)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;

if (maxBatchSize < 1)
{
throw new IllegalArgumentException("maxBatchSize must be greater than 0");
}
this.batchLimitOffset = maxBatchSize - 1;

this.rewindable = rewindable;
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* <p>This constructor will not support rewinding batches.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about 'The created BEP will not support batch rewind". The constructor is just a constructor

*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param eventHandler is the delegate to which events are dispatched.
* @param maxBatchSize limits number of events processed in a batch before updating the sequence.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler,
final int maxBatchSize
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandler<? super T> eventHandler,
final int maxBatchSize
)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (maxBatchSize < 1)
{
throw new IllegalArgumentException("maxBatchSize must be greater than 0");
}
this.batchLimitOffset = maxBatchSize - 1;
this(dataProvider, sequenceBarrier, eventHandler, maxBatchSize, false);

eventHandler.setSequenceCallback(sequence);
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* <p>This constructor will support rewinding batches.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same same - the constructor isn't supporting anything!

*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param rewindableEventHandler is the delegate to which events are dispatched.
* @param maxBatchSize limits number of events processed in a batch before updating the sequence.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final RewindableEventHandler<? super T> rewindableEventHandler,
final int maxBatchSize
)
{
this(dataProvider, sequenceBarrier, rewindableEventHandler, maxBatchSize, true);
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
Expand All @@ -89,6 +127,23 @@ public BatchEventProcessor(
this(dataProvider, sequenceBarrier, eventHandler, Integer.MAX_VALUE);
}

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
* the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
*
* @param dataProvider to which events are published.
* @param sequenceBarrier on which it is waiting.
* @param rewindableEventHandler is the delegate to which events are dispatched.
*/
public BatchEventProcessor(
final DataProvider<T> dataProvider,
final SequenceBarrier sequenceBarrier,
final RewindableEventHandler<? super T> rewindableEventHandler
)
{
this(dataProvider, sequenceBarrier, rewindableEventHandler, Integer.MAX_VALUE);
}

@Override
public Sequence getSequence()
{
Expand Down Expand Up @@ -128,6 +183,7 @@ public void setExceptionHandler(final ExceptionHandler<? super T> exceptionHandl
* Which can include whether the batch should be rewound and reattempted,
* or simply thrown and move on to the next sequence
* the default is a {@link SimpleBatchRewindStrategy} which always rewinds
*
* @param batchRewindStrategy to replace the existing rewindStrategy.
*/
public void setRewindStrategy(final BatchRewindStrategy batchRewindStrategy)
Expand Down Expand Up @@ -213,14 +269,21 @@ private void processEvents()
}
catch (final RewindableException e)
{
if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
if (this.rewindable)
{
nextSequence = startOfBatchSequence;
if (this.batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND)
{
nextSequence = startOfBatchSequence;
}
else
{
retriesAttempted = 0;
throw e;
}
}
else
{
retriesAttempted = 0;
throw e;
throw new RuntimeException("Rewindable Exception thrown from a non-rewindable event handler", e);
}
}
}
Expand Down
41 changes: 3 additions & 38 deletions src/main/java/com/lmax/disruptor/EventHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011 LMAX Ltd.
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
*/
public interface EventHandler<T>
public interface EventHandler<T> extends BaseEventHandler<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
Expand All @@ -36,34 +36,9 @@ public interface EventHandler<T>
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
@Override
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

/**
* Invoked by {@link BatchEventProcessor} prior to processing a batch of events
*
* @param batchSize the size of the batch that is starting
*/
default void onBatchStart(long batchSize)
{
}

/**
* Called once on thread start before first event is available.
*/
default void onStart()
{
}

/**
* Called once just before the event processing thread is shutdown.
*
* <p>Sequence event processing will already have stopped before this method is called. No events will
* be processed after this message.
*/
default void onShutdown()
{
}

/**
* Used by the {@link BatchEventProcessor} to set a callback allowing the {@link EventHandler} to notify
* when it has finished consuming an event if this happens after the {@link EventHandler#onEvent(Object, long, boolean)} call.
Expand All @@ -77,14 +52,4 @@ default void onShutdown()
default void setSequenceCallback(Sequence sequenceCallback)
{
}

/**
* Invoked when a {@link BatchEventProcessor}'s {@link WaitStrategy} throws a {@link TimeoutException}.
*
* @param sequence - the last processed sequence.
* @throws Exception if the implementation is unable to handle this timeout.
*/
default void onTimeout(long sequence) throws Exception
{
}
}
43 changes: 43 additions & 0 deletions src/main/java/com/lmax/disruptor/RewindableEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2022 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lmax.disruptor;

/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
* with support for throwing a {@link RewindableException} when an even cannot be processed currently but may succeed on retry.
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
*/
public interface RewindableEventHandler<T> extends BaseEventHandler<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
* read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be
* processed without having to wait for any new event to arrive. This can be useful for event handlers that need
* to do slower operations like I/O as they can group together the data from multiple events into a single
* operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
* the time between that message and the next one is indeterminate.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws RewindableException if the EventHandler would like the batch event processor to process the entire batch again.
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
@Override
void onEvent(T event, long sequence, boolean endOfBatch) throws RewindableException, Exception;
}
2 changes: 1 addition & 1 deletion src/main/java/com/lmax/disruptor/RewindableException.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* On throwing this exception the {@link BatchEventProcessor} can choose to rewind and replay the batch or throw
* depending on the {@link BatchRewindStrategy}
*/
public class RewindableException extends RuntimeException
public class RewindableException extends Throwable
{
/**
* @param cause The underlying cause of the exception.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/lmax/disruptor/dsl/Disruptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public ExceptionHandlerSetting<T> handleExceptionsFor(final EventHandler<T> even
*
* <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
*
* @param handlers the event handlers, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventHandler[])},
* @param handlers the event handlers, previously set up with {@link #handleEventsWith(EventHandler[])},
* that will form the barrier for subsequent handlers or processors.
* @return an {@link EventHandlerGroup} that can be used to setup a dependency barrier over the specified event handlers.
*/
Expand All @@ -253,7 +253,7 @@ public final EventHandlerGroup<T> after(final EventHandler<T>... handlers)
* @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)},
* that will form the barrier for subsequent handlers or processors.
* @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors.
* @see #after(com.lmax.disruptor.EventHandler[])
* @see #after(EventHandler[])
*/
public EventHandlerGroup<T> after(final EventProcessor... processors)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class EventProcessorInfo<T> implements ConsumerInfo
private boolean endOfChain = true;

EventProcessorInfo(
final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
{
this.eventprocessor = eventprocessor;
this.handler = handler;
Expand Down
Loading