This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-16569: PIP-186: Introduce two phase deletion protocol based on system topic #4539

Open
sijie opened this issue Jul 13, 2022 · 0 comments

Comments

@sijie
Member

sijie commented Jul 13, 2022

Original Issue: apache#16569


Motivation

Original issue: apache#13238
In the current implementation, ledger deletion is split into two separate steps, performed in ManagedLedger and ManagedCursor:

  1. Remove all ledgers that are waiting to be deleted from the ledger list, and write the updated ledger list to the meta store.
  2. In the meta store update callback, delete those ledgers from the storage system, such as BookKeeper or tiered storage.

Because these steps are separate, ledger deletion is not atomic. If the first step succeeds and the second step fails, the ledgers are never deleted from the storage system. The second step can fail because the broker restarts or because the deletion in the storage system fails.
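The failure mode above can be sketched with a small in-memory model. This is a hypothetical illustration, not ManagedLedger code: the class name `CurrentTwoStepDeletion` and its fields are assumptions, and the two collections merely stand in for the meta store and the storage system.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of today's deletion: step 1 updates metadata, step 2 deletes from storage.
class CurrentTwoStepDeletion {
    final List<Long> metadataLedgers = new ArrayList<>(); // ledger list stored in the meta store
    final Set<Long> storageLedgers = new HashSet<>();     // ledgers that physically exist in storage

    CurrentTwoStepDeletion(long... ledgerIds) {
        for (long id : ledgerIds) {
            metadataLedgers.add(id);
            storageLedgers.add(id);
        }
    }

    /** Runs both steps; {@code step2Fails} simulates a broker restart or a storage error. */
    void trimLedgers(List<Long> toDelete, boolean step2Fails) {
        metadataLedgers.removeAll(toDelete); // step 1: committed to the meta store
        if (step2Fails) {
            return;                          // step 2 never runs, and nothing retries it
        }
        storageLedgers.removeAll(toDelete);  // step 2: the meta store update callback
    }

    /** Ledgers still in storage but no longer referenced by metadata. */
    Set<Long> orphanLedgers() {
        Set<Long> orphans = new HashSet<>(storageLedgers);
        orphans.removeAll(metadataLedgers);
        return orphans;
    }
}
```

Calling `trimLedgers(List.of(1L, 2L), true)` on a model holding ledgers 1, 2 and 3 leaves ledgers 1 and 2 as orphans: nothing references them any more, so nothing will ever try to delete them again.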

In our customers' environments, we have found many orphan ledgers caused by this problem.

Design

To address this, we introduce LedgerDeletionService to support two-phase deletion. We intend it as a general solution for two-phase deletion that covers the problems we have already found in managed-ledger, managed-cursor and schema-storage.

In this design, system topics store the pending-delete ledgers:

  • pulsar/system/persistent/__ledger_deletion : stores pending-delete ledgers
  • pulsar/system/persistent/__ledger_deletion-RETRY : stores pending-delete ledgers that are waiting to be retried
  • pulsar/system/persistent/__ledger_deletion-DLQ : the dead letter queue (DLQ) for the two topics above

How the system topics are created

When LedgerDeletionService starts, it uses pulsarAdmin to create the system topics, with the deletion topic itself created as a partitioned topic:

  • pulsar/system/persistent/__ledger_deletion-partition
  • pulsar/system/persistent/__ledger_deletion-RETRY
  • pulsar/system/persistent/__ledger_deletion-DLQ

We must ensure that the deletion topic is partitioned. If the user configures allowAutoTopicCreationType=non-partitioned, the topic could be auto-created as a non-partitioned topic; if it is later changed to a partitioned topic, the messages in the old non-partitioned topic pulsar/system/persistent/__ledger_deletion are lost.
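A hypothetical helper can make the expected topic layout concrete. It assumes the fully qualified form `persistent://pulsar/system/__ledger_deletion` for the topic listed above, that the partition count comes from `ledgerDeletionParallelismOfTopicTwoPhaseDeletion` (as the configuration section says), and Pulsar's usual `-partition-N` naming for partitions. The class `LedgerDeletionTopics` and its methods are illustrative only; no pulsarAdmin calls are made.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper describing the deletion topics; names follow the list above in fully qualified form.
final class LedgerDeletionTopics {
    static final String BASE = "persistent://pulsar/system/__ledger_deletion";
    static final String RETRY = BASE + "-RETRY";
    static final String DLQ = BASE + "-DLQ";

    private LedgerDeletionTopics() {
    }

    /** Internal names Pulsar gives the partitions of a partitioned topic with {@code partitions} partitions. */
    static List<String> partitionNames(int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("the deletion topic needs at least one partition");
        }
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(BASE + "-partition-" + i);
        }
        return names;
    }

    /**
     * Partitioned-topic metadata reports 0 partitions for a non-partitioned topic, e.g. one
     * auto-created under allowAutoTopicCreationType=non-partitioned; such a topic must not be used.
     */
    static boolean isPartitioned(int partitionsInMetadata) {
        return partitionsInMetadata > 0;
    }
}
```

With the default parallelism of 4, the service would expect `__ledger_deletion-partition-0` through `__ledger_deletion-partition-3`; a metadata lookup reporting 0 partitions signals the non-partitioned case described above.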

Why a system topic rather than a per-tenant topic

If the topic were bound to a tenant, then when the tenant is deleted or is no longer loaded, the messages in that tenant's system topic could not be consumed until the tenant is loaded again.

The first phase

client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        // fully qualified form of pulsar/system/persistent/__ledger_deletion
        .topic("persistent://pulsar/system/__ledger_deletion")
        .enableBatching(false)
        .createAsync();

When LedgerDeletionService starts, it creates a producer to send pending-delete ledgers.
To delete a ledger, LedgerDeletionService first sends a PendingDeleteLedgerInfo message with a 1-minute delivery delay. The delay protects the consumer side: if the message were delivered immediately, the consumer might receive it before the metadata has changed. Only after the send succeeds is the metadata updated. If the send fails, the deletion is considered failed and the metadata is left unchanged.

The metadata must be updated in the first phase; otherwise the deletion would appear successful while the data could still be used.
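The ordering of the first phase (send first, update metadata only on success) can be sketched with `CompletableFuture`. This is a minimal sketch under stated assumptions: `DelayedSender` stands in for the producer, `LedgerListStore` for the meta store update, and the class `FirstPhaseDeletion` and all its members are hypothetical names, not part of the proposed API. The delete cache from the security optimization is modeled as a concurrent set.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the first phase: publish with a delay, then (and only then) update metadata.
final class FirstPhaseDeletion {
    /** Stand-in for the producer on the deletion topic. */
    interface DelayedSender {
        CompletableFuture<Void> send(long ledgerId, long delay, TimeUnit unit);
    }

    /** Stand-in for the meta store update that removes the ledger from the ledger list. */
    interface LedgerListStore {
        CompletableFuture<Void> removeLedger(long ledgerId);
    }

    static final long SEND_DELAY_SECONDS = 60; // sendDelayOfTopicTwoPhaseDeletionInSeconds

    private final DelayedSender sender;
    private final LedgerListStore store;
    final Set<Long> deleteCache = ConcurrentHashMap.newKeySet(); // the broker-side delete cache

    FirstPhaseDeletion(DelayedSender sender, LedgerListStore store) {
        this.sender = sender;
        this.store = store;
    }

    CompletableFuture<Void> deleteLedger(long ledgerId) {
        // If the send fails, the returned future fails and the metadata is never touched.
        return sender.send(ledgerId, SEND_DELAY_SECONDS, TimeUnit.SECONDS)
                .thenCompose(ignored -> {
                    deleteCache.add(ledgerId);           // this broker now trusts deleting the ledger
                    return store.removeLedger(ledgerId); // metadata update happens in the first phase
                });
    }
}
```

Chaining with `thenCompose` is what enforces the rule: a failed send short-circuits the chain, so a ledger is never removed from metadata without a pending-delete message that will eventually clean it up.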

Process flow

[Image 1: process flow of the first phase]

PendingDeleteLedgerInfo

import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;

public class PendingDeleteLedgerInfo {
    /**
     * Topic name (or partition name) without the domain, e.g. public/default/test-topic-partition-1 or
     * public/default/test-topic.
     */
    private String topicName;

    /**
     * The ledger component: managed-ledger, managed-cursor or schema-storage.
     */
    private LedgerComponent ledgerComponent;

    /**
     * The ledger type: ledger or offload-ledger.
     */
    private LedgerType ledgerType;

    /**
     * Ledger ID.
     */
    private Long ledgerId;

    /**
     * Context holding the offload info; null for a BookKeeper ledger.
     */
    private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

    /**
     * Extended properties.
     */
    private Map<String, String> properties = new HashMap<>();
}

The second phase

client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        // fully qualified form of pulsar/system/persistent/__ledger_deletion
        .topic("persistent://pulsar/system/__ledger_deletion")
        .subscriptionName("ledger-deletion-worker")
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .retryLetterTopic("persistent://pulsar/system/__ledger_deletion-RETRY")
                .deadLetterTopic("persistent://pulsar/system/__ledger_deletion-DLQ")
                .maxRedeliverCount(10)
                .build())
        .subscribeAsync();

When LedgerDeletionService starts, it also starts a consumer to consume the pending-delete ledgers.

Sending the delete command to the broker

When the consumer receives a pending-delete ledger message, it checks whether the topic still exists.

  • If the topic exists, the consumer sends a delete command (PendingDeleteLedger) to the broker that owns the topic. The broker checks whether the ledger is still in the metadata store. If it is, the ledger is still in use, so the broker skips the deletion and returns a failure response. If it is not, the broker deletes the ledger data from the storage system.
  • If the topic does not exist, it has already been deleted, so the consumer deletes the ledger data from the storage system itself.

If the deletion succeeds, the consumer acknowledges the message. If it fails, the consumer reconsumes the message 10 minutes later.

When a PendingDeleteLedger message has been reconsumed 10 times, it is moved to the DLQ, pulsar/system/persistent/__ledger_deletion-DLQ.
Note: maxRedeliverCount is 10 and the reconsumeLater delay is 10 minutes, so if the storage system is down, each pending-delete ledger is retried 10 times over roughly 100 minutes. A temporary storage outage therefore does not by itself prevent the ledger from being deleted.
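The consumer-side decision for one message can be modeled as a pure function. This is a hypothetical sketch: `SecondPhaseHandler`, `Deleter` and `Action` are illustrative names, and the two delete paths are stand-ins. As we understand the Pulsar client, with `enableRetry(true)` the routing to the retry topic or DLQ happens inside `reconsumeLater` itself; the sketch spells it out only to make the policy and the 100-minute arithmetic explicit.

```java
// Hypothetical model of how one PendingDeleteLedgerInfo message is handled in the second phase.
final class SecondPhaseHandler {
    enum Action { ACK, RECONSUME_LATER, DEAD_LETTER }

    static final int MAX_REDELIVER_COUNT = 10;       // DeadLetterPolicy.maxRedeliverCount
    static final long RECONSUME_LATER_SECONDS = 600; // reconsumeLaterOfTopicTwoPhaseDeletionInSeconds

    /** Stand-ins for the two delete paths; each returns true if the deletion succeeded. */
    interface Deleter {
        boolean topicExists(String topicName);

        boolean deleteViaOwnerBroker(String topicName, long ledgerId); // broker re-checks the metadata

        boolean deleteFromStorageDirectly(long ledgerId);              // topic is gone: consumer deletes
    }

    /** {@code reconsumeTimes} is how many times this message has already been reconsumed. */
    static Action handle(Deleter deleter, String topicName, long ledgerId, int reconsumeTimes) {
        boolean deleted = deleter.topicExists(topicName)
                ? deleter.deleteViaOwnerBroker(topicName, ledgerId)
                : deleter.deleteFromStorageDirectly(ledgerId);
        if (deleted) {
            return Action.ACK;
        }
        // The Pulsar client performs this routing inside reconsumeLater; shown here only as policy.
        return reconsumeTimes + 1 >= MAX_REDELIVER_COUNT ? Action.DEAD_LETTER : Action.RECONSUME_LATER;
    }

    /** Roughly how long a failing ledger keeps being retried before it lands in the DLQ. */
    static long retryWindowMinutes() {
        return MAX_REDELIVER_COUNT * RECONSUME_LATER_SECONDS / 60; // 10 * 600 s / 60 = 100 min
    }
}
```

The exact point at which a message is dead-lettered (on the 10th or 11th failure) depends on how the client counts reconsume attempts; the sketch picks one convention for illustration.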

Process flow

[Image 2: process flow of the second phase]

Security

The system topic pulsar/system/persistent/__ledger_deletion may introduce security problems: an attacker could send delete-ledger messages to this system topic to delete user data, or a user could send a delete command to the broker with a wrong parameter. Either could delete data that should be kept.
Therefore, when deleting data from a storage system such as BookKeeper, we check that the ledger belongs to the entity named in the parameters. To make this possible, we record the name (topic name, schemaId, compacted topic name or cursor name) and the component (managed-ledger, schema or compacted-topic) in the ledger metadata.
Before deleting, we fetch the ledger metadata and check whether the parameters match it. If they match, we delete the ledger. If they do not, we skip the deletion and acknowledge the message so that it is not checked again.
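The security check reduces to comparing the request against properties stored with the ledger. In this hypothetical sketch the ledger metadata is a plain `Map`, and the keys `component` and `name` are assumptions: the proposal says the name and component are recorded but does not name the keys. `LedgerOwnershipCheck` is an illustrative class, not a BookKeeper API.

```java
import java.util.Map;

// Hypothetical security check: compare request params with properties stored in the ledger's metadata.
final class LedgerOwnershipCheck {
    // Assumed metadata keys; the proposal does not specify them.
    static final String KEY_COMPONENT = "component";
    static final String KEY_NAME = "name";

    enum Decision { DELETE, IGNORE_AND_ACK }

    static Decision check(Map<String, String> ledgerMetadata, String component, String name) {
        boolean matches = component.equals(ledgerMetadata.get(KEY_COMPONENT))
                && name.equals(ledgerMetadata.get(KEY_NAME));
        // A mismatch is acked rather than retried, so a forged or mistaken request is not re-checked.
        return matches ? Decision.DELETE : Decision.IGNORE_AND_ACK;
    }
}
```

Missing keys compare as unequal, so a ledger created before the name and component were recorded would never be deleted through this path; how such ledgers are handled is left open by the proposal.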

Optimization of the security check

The security check requires fetching the ledger metadata for every deletion, so we introduce a delete cache in the broker to reduce these fetches.
In the first phase, after the delete-ledger message has been sent successfully, the managed-ledger, managed-cursor or schema-storage component records the ledger ID in the cache, meaning the ledger will be deleted later. A deletion that hits the cache can be trusted.
When the broker receives a delete command, it checks whether the ledger ID is in the cache. On a hit, it deletes the data from the storage system and, after the deletion succeeds, removes the ledger ID from the cache. On a miss, it fetches the ledger metadata and checks whether it matches.
If the broker goes down after caching the ledger ID and the topic is then loaded on another broker, that broker's cache is empty, so it must fetch the metadata before deleting.
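Putting the broker-side rules together: refuse if the ledger is still in the ledger list, trust a cache hit, otherwise fall back to the metadata check. This is a hypothetical sketch; `BrokerDeleteHandler`, its `Result` values and the three `LongPredicate` stand-ins are illustrative assumptions, not proposed interfaces.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongPredicate;

// Hypothetical broker-side handling of one delete command: in-use check, delete cache, metadata fallback.
final class BrokerDeleteHandler {
    enum Result { REFUSED_IN_USE, DELETED, IGNORED_MISMATCH, STORAGE_FAILED }

    final Set<Long> deleteCache = ConcurrentHashMap.newKeySet(); // filled in the first phase
    private final LongPredicate inLedgerListMetadata;  // ledger still listed in the metadata => in use
    private final LongPredicate ledgerMetadataMatches; // the security check; requires a metadata fetch
    private final LongPredicate deleteFromStorage;     // true if the storage deletion succeeded

    BrokerDeleteHandler(LongPredicate inLedgerListMetadata, LongPredicate ledgerMetadataMatches,
                        LongPredicate deleteFromStorage) {
        this.inLedgerListMetadata = inLedgerListMetadata;
        this.ledgerMetadataMatches = ledgerMetadataMatches;
        this.deleteFromStorage = deleteFromStorage;
    }

    Result handle(long ledgerId) {
        if (inLedgerListMetadata.test(ledgerId)) {
            return Result.REFUSED_IN_USE;   // the consumer reconsumes the message later
        }
        // A cache hit is trusted; only a miss pays for fetching the ledger metadata.
        if (!deleteCache.contains(ledgerId) && !ledgerMetadataMatches.test(ledgerId)) {
            return Result.IGNORED_MISMATCH; // the consumer acks without deleting
        }
        if (!deleteFromStorage.test(ledgerId)) {
            return Result.STORAGE_FAILED;   // keep the cache entry; a retry may still hit it
        }
        deleteCache.remove(ledgerId);       // remove only after a successful deletion
        return Result.DELETED;
    }
}
```

Removing the cache entry only after a successful storage deletion matches the text above and keeps retries on the fast path.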

How to delete a normal topic

To delete a topic, we send a delete-ledger message to the system topic and remove the ledger ID from the metadata for each ledger, one by one. After this has been done for every ledger, the topic itself is deleted. If the operation fails for any ledger, the topic deletion is considered failed, and the response returns the IDs of the remaining ledgers.
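The one-by-one loop with early failure can be sketched as follows. `TopicDeletion.deleteTopic` is a hypothetical helper: `scheduleDelete` stands in for the first phase of a single ledger (send the message, then remove it from metadata), and `deleteTopicMetadata` for the final topic deletion. The sketch assumes processing stops at the first failure, which is one reading of "one by one".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical sketch of deleting a topic ledger by ledger.
final class TopicDeletion {
    private TopicDeletion() {
    }

    /**
     * @param ledgers             ledgers of the topic, in order
     * @param scheduleDelete      first phase for one ledger; false if sending or the metadata update failed
     * @param deleteTopicMetadata deletes the topic itself
     * @return ledgers left over; the topic is deleted only if this list is empty
     */
    static List<Long> deleteTopic(List<Long> ledgers, LongPredicate scheduleDelete, Runnable deleteTopicMetadata) {
        List<Long> remaining = new ArrayList<>();
        for (long ledgerId : ledgers) {
            // After the first failure, stop scheduling and just collect the remaining ledger IDs.
            if (!remaining.isEmpty() || !scheduleDelete.test(ledgerId)) {
                remaining.add(ledgerId);
            }
        }
        if (remaining.isEmpty()) {
            deleteTopicMetadata.run();
        }
        return remaining;
    }
}
```

Returning the remaining IDs lets the caller report them in the response, as described above, and retry the topic deletion later.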

Possible intermediate states

Case 1: The pending-delete ledger message is sent successfully, but the broker goes down before the metadata is updated.

The consumer receives the pending-delete ledger message and sends the delete command to the broker. The broker finds the ledger ID still in the metadata, so it does not delete the ledger, and the consumer reconsumes the message 10 minutes later.

Case 2: The pending-delete ledger message is sent successfully, but the broker goes down before the metadata is updated. Later, the same pending-delete ledger message is sent again, and this time the metadata update succeeds.

The consumer receives two pending-delete ledger messages with the same ledger ID and sends two delete commands to the broker. The first deletion succeeds and removes the ledger ID from the cache. The second deletion fails to fetch the ledger metadata because it has already been deleted, so we treat the second deletion as successful.

Case 3: The consumer receives a pending-delete ledger message and deletes the ledger successfully, but the broker shuts down before the message is acknowledged.

The message is redelivered, and the delete command is sent to the broker again. As in Case 2, this deletion is treated as successful.

Case 4: A topic is deleted, and then a topic with the same name is created.

When the topic is deleted, a message for each of its ledgers is sent to the system topic. When the consumer receives these messages, it finds that the topic exists again and sends the delete command to the broker. The recreated topic does not contain the old ledger IDs, so the broker fetches the ledger metadata and deletes the ledger.
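Cases 2 to 4 rely on deletion being idempotent on the cache-miss path. The hypothetical model below captures only that path: `RepeatedDeleteModel` and its `Outcome` values are illustrative names, and a `Map` from ledger ID to recorded topic name stands in for the ledger metadata in BookKeeper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of repeated delete commands for one ledger (Cases 2, 3 and 4 above).
final class RepeatedDeleteModel {
    enum Outcome { DELETED, ALREADY_DELETED, MISMATCH_IGNORED }

    /** ledgerId -> topic name recorded in the ledger metadata; stands in for BookKeeper. */
    final Map<Long, String> ledgerMetadata = new HashMap<>();

    /** Every outcome lets the consumer ack, so duplicates and redeliveries are harmless. */
    Outcome delete(String topicName, long ledgerId) {
        String recordedTopic = ledgerMetadata.get(ledgerId);
        if (recordedTopic == null) {
            return Outcome.ALREADY_DELETED;  // metadata fetch fails: an earlier attempt already deleted it
        }
        // Case 4: a recreated topic with the same name still matches the recorded name.
        if (!recordedTopic.equals(topicName)) {
            return Outcome.MISMATCH_IGNORED; // security check failed: skip and ack
        }
        ledgerMetadata.remove(ledgerId);
        return Outcome.DELETED;
    }
}
```

Delivering the same command twice yields `DELETED` and then `ALREADY_DELETED`, and both outcomes let the consumer acknowledge the message.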

API Changes

Introduce LedgerDeletionService interface

public interface LedgerDeletionService {

    /**
     * Start.
     */
    void start() throws PulsarClientException, PulsarAdminException;

    /**
     * First phase: publish a PendingDeleteLedgerInfo message for the ledger.
     *
     * @param topicName  topic name
     * @param ledgerId   ledger ID
     * @param context    ledger info (holds the offload info for an offloaded ledger)
     * @param component  managed_ledger, managed_cursor or schema_storage
     * @param type       ledger or offload_ledger
     * @param properties extended properties
     * @return a future that completes when the message has been sent
     */
    CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
                                                   LedgerComponent component, LedgerType type,
                                                   Map<String, String> properties);

    /**
     * Second phase: delete a ledger from the storage system.
     *
     * @param topicName        topic name
     * @param ledgerId         ledger ID
     * @param component        managed_ledger, managed_cursor or schema_storage
     * @param isBelievedDelete true if the deletion is trusted (for example, it hit the delete cache); if false,
     *                         the parameters must be checked against the ledger metadata
     * @return a future that completes when the deletion has finished
     */
    CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId, LedgerComponent component,
                                           boolean isBelievedDelete);

    /**
     * Delete an offloaded ledger from tiered storage.
     *
     * @param topicName      topic name
     * @param ledgerId       ledger ID
     * @param offloadContext offload context of the ledger
     * @return a future that completes when the deletion has finished
     */
    CompletableFuture<?> asyncDeleteOffloadedLedger(String topicName, long ledgerId,
                                                    MLDataFormats.OffloadContext offloadContext);

    /**
     * Close.
     */
    void close() throws Exception;

    /**
     * Async close.
     */
    CompletableFuture<?> asyncClose();
}

Introduce delete ledger api in ManagedLedger

public interface ManagedLedger {

    CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId, LedgerType ledgerType,
                                              MLDataFormats.OffloadContext offloadContext);

    void deleteLedger(String topicName, long ledgerId, LedgerType ledgerType,
                      MLDataFormats.OffloadContext offloadContext) throws InterruptedException, ManagedLedgerException;
}

Introduce delete ledger api in SchemaStorage

public interface SchemaStorage {

    CompletableFuture<Void> asyncDeleteLedger(String topicName, long ledgerId);

    void deleteLedger(String topicName, long ledgerId) throws InterruptedException, ManagedLedgerException;
}

Introduce delete ledger api in ManagedCursor

public interface ManagedCursor {

    CompletableFuture<Void> asyncDeleteLedger(String topicName, long ledgerId);
   
    void deleteLedger(String topicName, long ledgerId) throws InterruptedException, ManagedLedgerException;
}

Introduce admin api in Topics

public interface Topics {

    CompletableFuture<Void> deleteLedgerAsync(DeleteLedgerPayload deleteLedgerPayload);

    void deleteLedger(DeleteLedgerPayload deleteLedgerPayload) throws PulsarAdminException;

    CompletableFuture<Void> deleteCursorLedgerAsync(DeleteLedgerPayload deleteLedgerPayload);

    void deleteCursorLedger(DeleteLedgerPayload deleteLedgerPayload) throws PulsarAdminException;
}

Introduce admin api in Schemas

public interface Schemas {

    CompletableFuture<Void> deleteLedgerAsync(DeleteLedgerPayload deleteLedgerPayload);

    void deleteLedger(DeleteLedgerPayload deleteLedgerPayload) throws PulsarAdminException;
}

Configuration Changes

Add properties to ServiceConfiguration

public class ServiceConfiguration {
    @FieldContext(
            dynamic = true,
            category = CATEGORY_SERVER,
            doc = "Use two-phase deletion when deleting ledgers. If true, "
                    + "LedgerDeletionService takes over ledger deletion. (Default false)"
    )
    private boolean topicTwoPhaseDeletionEnabled;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "Ledger deletion parallelism: the number of partitions used to create the partitioned system "
                    + "topic when topicTwoPhaseDeletionEnabled is true. (Default 4)"
    )
    private int ledgerDeletionParallelismOfTopicTwoPhaseDeletion = 4;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "With two-phase deletion, PendingDeleteLedgerInfo messages are sent to the system topic"
                    + " with a delivery delay of this many seconds. (Default 60s)"
    )
    private int sendDelayOfTopicTwoPhaseDeletionInSeconds = 60;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "With two-phase deletion, a consumer subscribes to the system topic. When processing a"
                    + " PendingDeleteLedgerInfo message fails, it is reconsumed after this many seconds."
                    + " (Default 600s)"
    )
    private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600;
}
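To show how the four settings and their defaults fit together, here is a hypothetical holder that reads them from broker.conf-style key/value pairs. It assumes, as for other ServiceConfiguration fields, that each key has the same name as its field; `TwoPhaseDeletionSettings` is an illustrative class, and the real broker loads configuration through its own loader rather than this code.

```java
import java.util.Properties;

// Hypothetical holder for the new settings, read from broker.conf-style properties with the documented defaults.
final class TwoPhaseDeletionSettings {
    final boolean enabled;
    final int parallelism;           // partitions of the deletion topic
    final int sendDelaySeconds;      // delivery delay of PendingDeleteLedgerInfo messages
    final int reconsumeLaterSeconds; // retry delay after a failed deletion

    TwoPhaseDeletionSettings(Properties p) {
        enabled = Boolean.parseBoolean(p.getProperty("topicTwoPhaseDeletionEnabled", "false"));
        parallelism = Integer.parseInt(p.getProperty("ledgerDeletionParallelismOfTopicTwoPhaseDeletion", "4"));
        sendDelaySeconds = Integer.parseInt(p.getProperty("sendDelayOfTopicTwoPhaseDeletionInSeconds", "60"));
        reconsumeLaterSeconds = Integer.parseInt(
                p.getProperty("reconsumeLaterOfTopicTwoPhaseDeletionInSeconds", "600"));
        if (parallelism < 1) {
            throw new IllegalArgumentException("the deletion topic needs at least one partition");
        }
    }
}
```

An empty `Properties` yields the defaults listed above (disabled, 4 partitions, 60 s send delay, 600 s reconsume delay).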

Documentation Changes

We should add documentation for this new feature.

Compatibility

If a user upgrades and enables two-phase deletion, ledger deletion messages are stored in the system topic. If the user then rolls back to an older version before all messages in the system topic have been consumed, some ledgers may never be deleted.

@sijie sijie added the PIP label Jul 13, 2022
@sijie sijie added the Stale label Sep 16, 2022