PIP-104: Add new consumer type: TableView #12356

Closed
merlimat opened this issue Oct 13, 2021 · 3 comments

@merlimat
Contributor

Motivation

In many use cases, applications are using Pulsar consumers or readers to fetch
all the updates from a topic and construct a map with the latest value of each
key for the messages that were received. This is very common when constructing
a local cache of the data.

We want to offer support for this access pattern directly in the Pulsar client
API, as a way to encapsulate the complexities of setting this up.

Goal

Provide a view of the topic data in the form of a read-only map that is
constantly updated with the latest version of each key.

Additionally, let the application specify a listener so that it can perform
a scan of the map and then receive notifications when new messages are
received and applied.

API Changes

This proposal will only add new API on the client side.

A new type of consumer will be added, the TableView.

Example:

TableView<Integer> tableView = pulsarClient.newTableView(Schema.INT32)
    .topic(topic)
    .create();

tableView.get("my-key"); // --> 5
tableView.get("my-other-key"); // --> 7

When a TableView instance is created, it is guaranteed to already hold
the latest value for each key as of the time of creation.

API additions

interface PulsarClient {
    // ....
    <T> TableViewBuilder<T> newTableView(Schema<T> schema);
}

interface TableViewBuilder<T> {
    TableViewBuilder<T> loadConf(Map<String, Object> config);
    TableView<T> create() throws PulsarClientException;
    CompletableFuture<TableView<T>> createAsync();
    TableViewBuilder<T> topic(String topic);
    TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
}

interface TableView<T> extends Closeable {

    // Methods similar to those of java.util.Map
    int size();
    boolean isEmpty();
    boolean containsKey(String key);
    T get(String key);
    Set<Map.Entry<String, T>> entrySet();
    Set<String> keySet();
    Collection<T> values();
    void forEach(BiConsumer<String, T> action);

    /**
     * Performs the given action for each entry in this map until all entries
     * have been processed or the action throws an exception.
     *
     * When all the entries have been processed, the action will be invoked
     * for every new update that is received from the topic.
     *
     * @param action The action to be performed for each entry
     */
    void forEachAndListen(BiConsumer<String, T> action);

    /**
     * Closes the table view and releases the allocated resources.
     *
     * @return a future that can be used to track when the table view has been closed
     */
    CompletableFuture<Void> closeAsync();
}
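
To make the intended semantics of forEachAndListen easier to picture, here is a
minimal in-memory sketch. It is not the Pulsar implementation: the class
InMemoryTableSketch and its applyUpdate hook are hypothetical names introduced
only for illustration, and the sketch ignores deletions and partitions. It
assumes the scan and the listener registration must be atomic with respect to
incoming updates, so that no update is missed or delivered twice.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for a TableView; for illustration only.
class InMemoryTableSketch<T> {
    private final Map<String, T> data = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, T>> listeners = new CopyOnWriteArrayList<>();

    // Assumed hook: called by whatever component reads the topic.
    // Stores the latest value for the key, then notifies registered listeners.
    synchronized void applyUpdate(String key, T value) {
        data.put(key, value);
        for (BiConsumer<String, T> listener : listeners) {
            listener.accept(key, value);
        }
    }

    T get(String key) {
        return data.get(key);
    }

    int size() {
        return data.size();
    }

    // First scans the entries currently in the map, then registers the action
    // so that it is invoked for every later update. Synchronizing with
    // applyUpdate keeps an update from slipping in between scan and registration.
    synchronized void forEachAndListen(BiConsumer<String, T> action) {
        data.forEach(action);
        listeners.add(action);
    }
}
```

With this sketch, an action registered after two keys were applied is first
called once per existing key, and then once more for each subsequent
applyUpdate call.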

Implementation

The TableView will be implemented using multiple Reader instances, one
per partition, and will always read starting from the compacted view of
the topic.
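
As a rough sketch of this design, the following code drains one reader per
partition into a single map. The interface PartitionReaderSketch is a
hypothetical stand-in that loosely mirrors the hasMessageAvailable/readNext
style of a Pulsar Reader; ListBackedReader and TableLoaderSketch are likewise
invented for illustration. The sketch assumes that a given key is always
published to the same partition, so the order in which partitions are drained
does not affect the result.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-partition reader; not the Pulsar Reader API.
interface PartitionReaderSketch<T> {
    boolean hasMessageAvailable();
    Map.Entry<String, T> readNext();
}

// Test double that serves messages from an in-memory list, oldest first.
class ListBackedReader<T> implements PartitionReaderSketch<T> {
    private final Queue<Map.Entry<String, T>> pending;

    ListBackedReader(List<Map.Entry<String, T>> messages) {
        this.pending = new ArrayDeque<>(messages);
    }

    @Override
    public boolean hasMessageAvailable() {
        return !pending.isEmpty();
    }

    @Override
    public Map.Entry<String, T> readNext() {
        return pending.remove();
    }
}

class TableLoaderSketch {
    // Drains every partition reader and keeps the last value seen for each key.
    static <T> Map<String, T> load(List<PartitionReaderSketch<T>> readers) {
        Map<String, T> table = new ConcurrentHashMap<>();
        for (PartitionReaderSketch<T> reader : readers) {
            while (reader.hasMessageAvailable()) {
                Map.Entry<String, T> message = reader.readNext();
                table.put(message.getKey(), message.getValue());
            }
        }
        return table;
    }
}
```

A real implementation would presumably keep the readers open after the initial
load and continue applying new messages; the sketch covers only the initial
load.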

The time it takes to create a table view can be controlled by configuring the
topic compaction policies for the given topic or namespace. More frequent
compaction can lead to very short startup times, since less data has to be
replayed to reconstruct the TableView of the topic.
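
The effect of compaction on startup can be illustrated with a small
self-contained model. It is only a sketch: CompactionSketch, compact and replay
are hypothetical names, and compaction is modeled simply as keeping the last
message for each key. Under that assumption, replaying the compacted log
produces the same map as replaying the full log, but with fewer messages to
read.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed log and its compacted view; for illustration only.
class CompactionSketch {

    // Keeps only the last message for each key, ordered by last occurrence.
    static <T> List<Map.Entry<String, T>> compact(List<Map.Entry<String, T>> log) {
        Map<String, T> latest = new LinkedHashMap<>();
        for (Map.Entry<String, T> message : log) {
            // Remove first so the key moves to the position of its newest message.
            latest.remove(message.getKey());
            latest.put(message.getKey(), message.getValue());
        }
        List<Map.Entry<String, T>> compacted = new ArrayList<>();
        for (Map.Entry<String, T> entry : latest.entrySet()) {
            compacted.add(new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()));
        }
        return compacted;
    }

    // Rebuilds the key/value table by applying every message in order.
    static <T> Map<String, T> replay(List<Map.Entry<String, T>> log) {
        Map<String, T> table = new HashMap<>();
        for (Map.Entry<String, T> message : log) {
            table.put(message.getKey(), message.getValue());
        }
        return table;
    }
}
```

For a log with four messages over two keys, the compacted log holds two
messages, and both logs replay to the same table.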

@merlimat merlimat added this to the 2.10.0 milestone Oct 13, 2021
@merlimat merlimat changed the title from "PIP-102: Add new consumer type: TableView" to "PIP-104: Add new consumer type: TableView" Oct 13, 2021
@aahmed-se
Contributor

Will we persist snapshots of tables as well?

@merlimat
Contributor Author

Will we persist snapshots of tables as well?

The first implementation will be in-memory only, on the client side. The snapshots can be thought of as the compacted view that is kept in BookKeeper.

Later on, we can add more client options, including the possibility to store the state on disk and to internally replay from a specific message id.

@hezhangjian
Member

It will be useful when the content can be stored in memory. Looking forward to it.
