Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-17267: Extensions for broker interceptor #4783

Open
2 tasks done
sijie opened this issue Aug 25, 2022 · 0 comments
Open
2 tasks done

ISSUE-17267: Extensions for broker interceptor #4783

sijie opened this issue Aug 25, 2022 · 0 comments

Comments

@sijie
Copy link
Member

sijie commented Aug 25, 2022

Original Issue: apache#17267


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, we have a reconciliation system that compares the message(or entry) accounts at the minutes level to indicate whether all the data has been consumed by the consumers. We need to trace the message before persistent to Bookie(including the timestamp and msgSize) and also the event of the producer or consumer closed.

A good way to achieve this is using the BrokerInterceptor to interceptor the message at certain points. We want to expand some interfaces for BrokerInterceptor like what apache#12858 has done.

Solution

1. interceptor message before persistent to bookie

/**
     * Intercept after a message before persistent to bookie.
     *
     * @param headersAndPayload entry's header and payload
     * @param publishContext Publish Context
     */
    default void beforeMessagePersistent(Producer producer,
                                         ByteBuf headersAndPayload,
                                         Topic.PublishContext publishContext) {

    }

2. Add interfaces for producer or consumer closed:

/**
     * Called by the broker when a producer is closed.
     *
     * @param cnx      client Connection
     * @param producer Consumer object
     * @param metadata A map of metadata
     */
    default void producerClosed(ServerCnx cnx,
                                Producer producer,
                                Map<String, String> metadata) {
    }

 /**
     *  Called by the broker when a consumer is closed.
     *
     * @param cnx client Connection
     * @param consumer Consumer object
     * @param metadata A map of metadata
     */
    default void consumerClosed(ServerCnx cnx,
                                Consumer consumer,
                                Map<String, String> metadata) {
    }

3. expand the beforeSendMessage to support consumer

/**
     * Intercept messages before sending them to the consumers.
     *
     * @param subscription pulsar subscription
     * @param entry entry
     * @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
     * @param msgMetadata message metadata. The message metadata will be recycled after this call.
     * @param consumer consumer. Consumer which entry are sent to.
     */
    default void beforeSendMessage(Subscription subscription,
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata,
                                   Consumer consumer) {
    }

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant