PIP 64: Introduce REST endpoints for producing, consuming and reading messages
- Status: Proposal
- Author: Sijie Guo
- Pull Request:
- Mailing List discussion:
- Release:
Currently, Pulsar provides REST endpoints for managing resources in a Pulsar cluster, as well as for querying the state of those resources. However, it cannot produce, consume, or read messages through the web service, which is inconvenient for applications that want to interact with Pulsar over HTTP. This proposal introduces REST endpoints for producing, consuming, and reading messages via the web service.
POST /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)
Produce messages to a topic, optionally specifying keys or partitions for the messages. If no partition is provided, one is chosen based on the hash of the key. If no key is provided either, a partition is chosen for each message in a round-robin fashion. Schemas may be provided as the full schema encoded as a string or, after the initial request, as the schema version returned with the first response.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to produce the messages to
Request JSON object:
- schema_type (string): The schema type
- schema_version (int): Version returned by a previous request using the same schema. This version corresponds to the version of the schema in the schema registry.
- key_schema (object): key schema information (optional)
- value_schema (object): value schema information (optional)
- messages: A list of messages to produce to the topic.
- messages[i].key (object): The message key, formatted according to the schema, or null to omit a key (optional)
- messages[i].value (object): The message value, formatted according to the schema
- messages[i].partition (int): The partition to publish the message to (optional)
- messages[i].properties (map[string, string]): The properties of the message (optional)
- messages[i].eventTime (long): The event time of the message (optional)
- messages[i].sequenceId (long): The sequence id of the message (optional)
- messages[i].replicationClusters (array[string]): The list of clusters to replicate the message to (optional)
- messages[i].disableReplication (bool): The flag to disable replication (optional)
- messages[i].deliverAt (long): Deliver the message only at or after the specified absolute timestamp (optional)
- messages[i].deliverAfterMs (long): Deliver the message only after the specified relative delay in milliseconds (optional)
Response JSON object:
- schema_version (int): The schema version used to produce messages, or null if the schema type is bytes or none.
- messageIds (object): List of the message IDs assigned to the published messages.
- messageIds[i].partition (int): Partition the message was published to, or null if publishing the message failed
- messageIds[i].messageId (string): The base64 encoded message id, or null if publishing the message failed
- messageIds[i].error_code (long): An error code classifying the reason this operation failed, or null if it succeeded.
- 1 - Non-retriable Pulsar exception
- 2 - Retriable Pulsar exception; the message might be sent successfully if retried
- messageIds[i].error (string): An error message describing why the operation failed, or null if it succeeded
Status codes:
- 404 Not Found
- Error code 40401: Topic not found
- 422 Unprocessable Entity
- Error code 42201: Request requires a key/value schema but does not include the key_schema or value_schema fields
- Error code 42202: Request requires a non-key/value schema but does not include the value_schema field
- Error code 42203: Request includes an invalid key schema
- Error code 42204: Request includes an invalid value schema
- Error code 42205: Request includes invalid messages
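A client of this endpoint has to split the per-message results into published, retriable, and permanently failed messages. The following is a minimal Python sketch of that step, assuming that messageIds[i] reports on messages[i] of the request (the response layout suggests this, but the proposal does not state it) and treating any error code other than 2 as non-retriable. The class and function names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

# messageIds[i].error_code values listed in this proposal.
NON_RETRIABLE = 1
RETRIABLE = 2


@dataclass
class ProduceOutcome:
    # (request index, partition, base64 message ID) for each published message
    published: List[Tuple[int, int, str]] = field(default_factory=list)
    # request indexes that may succeed if resent
    retry: List[int] = field(default_factory=list)
    # (request index, error message) for non-retriable failures
    failed: List[Tuple[int, Optional[str]]] = field(default_factory=list)


def classify_produce_response(response: dict) -> ProduceOutcome:
    """Split a produce response into published, retriable and failed messages.

    Assumes messageIds[i] reports on messages[i] of the request.
    """
    outcome = ProduceOutcome()
    for i, entry in enumerate(response.get("messageIds") or []):
        code = entry.get("error_code")
        if code is None:
            outcome.published.append((i, entry["partition"], entry["messageId"]))
        elif code == RETRIABLE:
            outcome.retry.append(i)
        else:
            # Code 1, or any code this sketch does not know, is treated as final.
            outcome.failed.append((i, entry.get("error")))
    return outcome
```

Messages listed under retry can be resent in a follow-up request; those under failed need to be fixed (or dropped) before resending.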
POST /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/partitions/(int: partition_id)
Produce messages to one partition of a topic. Schemas may be provided as the full schema encoded as a string or, after the initial request, as the schema version returned with the first response.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to produce the messages to
- partition_id (int): Partition to produce the messages to
Request JSON object:
- schema_type (string): The schema type
- schema_version (int): Version returned by a previous request using the same schema. This version corresponds to the version of the schema in the schema registry.
- key_schema (string): Full schema encoded as a string (e.g. JSON serialized schema info)
- value_schema (string): Full schema encoded as a string (e.g. JSON serialized schema info)
- messages: A list of messages to produce to the topic.
- messages[i].key (object): The message key, formatted according to the schema, or null to omit a key (optional)
- messages[i].value (object): The message value, formatted according to the schema
- messages[i].partition (int): The partition to publish the message to (optional)
- messages[i].properties (map[string, string]): The properties of the message (optional)
- messages[i].eventTime (long): The event time of the message (optional)
- messages[i].sequenceId (long): The sequence id of the message (optional)
- messages[i].replicationClusters (array[string]): The list of clusters to replicate the message to (optional)
- messages[i].disableReplication (bool): The flag to disable replication (optional)
- messages[i].deliverAt (long): Deliver the message only at or after the specified absolute timestamp (optional)
- messages[i].deliverAfterMs (long): Deliver the message only after the specified relative delay in milliseconds (optional)
Response JSON object:
- schema_version (int): The schema version used to produce messages, or null if the schema type is bytes or none.
- messageIds (object): List of the message IDs assigned to the published messages.
- messageIds[i].partition (int): Partition the message was published to, or null if publishing the message failed
- messageIds[i].messageId (string): The base64 encoded message id, or null if publishing the message failed
- messageIds[i].error_code (long): An error code classifying the reason this operation failed, or null if it succeeded.
- 1 - Non-retriable Pulsar exception
- 2 - Retriable Pulsar exception; the message might be sent successfully if retried
- messageIds[i].error (string): An error message describing why the operation failed, or null if it succeeded
Status codes:
- 404 Not Found
- Error code 40401: Topic not found
- 422 Unprocessable Entity
- Error code 42201: Request requires a key/value schema but does not include the key_schema or value_schema fields
- Error code 42202: Request requires a non-key/value schema but does not include the value_schema field
- Error code 42203: Request includes an invalid key schema
- Error code 42204: Request includes an invalid value schema
- Error code 42205: Request includes invalid messages
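The schema guidance for this endpoint (send the full schema on the first request, then the schema version returned by the broker) can be sketched as a helper that builds the path and JSON body. This is an illustration only: it assumes a non-key/value schema, assumes the path is served exactly as written in this proposal with no extra prefix, and the function name is hypothetical.

```python
import json
from urllib.parse import quote


def produce_to_partition_request(tenant, namespace, topic, partition, messages,
                                 schema_type, value_schema=None, schema_version=None):
    """Return (path, JSON body) for POST .../partitions/(partition_id).

    Pass value_schema (the full schema as a string) on the first request and
    schema_version (from the first response) afterwards. Assumes a
    non-key/value schema, so key_schema is never set.
    """
    if value_schema is not None and schema_version is not None:
        raise ValueError("pass value_schema or schema_version, not both")
    # Percent-encode each name so characters such as '/' cannot break the path.
    segments = (quote(s, safe="") for s in (tenant, namespace, topic))
    path = "/topics/{}/{}/{}/partitions/{}".format(*segments, int(partition))
    body = {"schema_type": schema_type, "messages": messages}
    if value_schema is not None:
        body["value_schema"] = value_schema
    elif schema_version is not None:
        body["schema_version"] = schema_version
    return path, json.dumps(body)
```

Reusing schema_version keeps later requests small, since the broker can look the schema up in the schema registry instead of receiving it again.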
GET /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/partitions/(int: partition_id)/messages
Fetch messages from one partition of a topic. The format of the embedded data returned by this request is determined by the schema of the topic.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to read the messages from
- partition_id (int): Partition to read the messages from
Query parameters:
- timeout: Maximum number of milliseconds the reader will spend fetching messages. Other parameters controlling the actual time spent fetching messages: max_messages and max_bytes.
- max_messages: The maximum number of messages that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response.
- max_bytes: The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response.
- include_schema: Flag that controls whether schema information is included in the response.
Response JSON object:
- schemas (object): List of schemas of the returned messages.
- schemas[i].type (string): The type of the schema
- schemas[i].version (int): The version of the schema
- schemas[i].data (string): The base64 encoded schema data
- schemas[i].properties (map[string, string]): The properties of the schema
- messages (object): List of messages returned
- messages[i].messageId (string): The base64 encoded message id
- messages[i].key (object): The message key, formatted according to the schema, or null if the message has no key (optional)
- messages[i].value (object): The message value, formatted according to the schema
- messages[i].partition (int): The partition the message was read from (optional)
- messages[i].properties (map[string, string]): The properties of the message (optional)
- messages[i].eventTime (long): The event time of the message (optional)
- messages[i].sequenceId (long): The sequence id of the message (optional)
- messages[i].replicationClusters (array[string]): The list of clusters the message is replicated to (optional)
Status codes:
- 404 Not Found
- Error code 40401: Topic not found
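The query parameters above can be combined into a fetch URL as in the following sketch. The broker address passed in as base is hypothetical, the proposal does not say whether these endpoints sit under a path prefix, and sending include_schema as lowercase true/false is an assumption.

```python
from urllib.parse import quote, urlencode


def partition_messages_url(base, tenant, namespace, topic, partition,
                           timeout_ms=None, max_messages=None, max_bytes=None,
                           include_schema=None):
    """Build the GET URL for reading messages from one partition.

    base is the broker's HTTP address (hypothetical); parameters left as None
    are omitted so the broker's defaults apply.
    """
    segments = (quote(s, safe="") for s in (tenant, namespace, topic))
    path = "/topics/{}/{}/{}/partitions/{}/messages".format(*segments, int(partition))
    params = {"timeout": timeout_ms, "max_messages": max_messages,
              "max_bytes": max_bytes}
    if include_schema is not None:
        # Assumption: booleans are sent as lowercase "true"/"false".
        params["include_schema"] = "true" if include_schema else "false"
    query = urlencode({k: v for k, v in params.items() if v is not None})
    url = base.rstrip("/") + path
    return url + "?" + query if query else url
```

Setting both max_messages and max_bytes bounds the response size from two directions, while timeout bounds how long the request can block.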
The consumers resource provides access to the current state of a subscription, and lets you create a consumer in a subscription and consume messages from topics and partitions.
Because consumers are stateful, any consumer created with the REST API is tied to a specific broker. A full URL is returned when the consumer instance is created, and it should be used to construct all subsequent requests. Failing to use the returned URL for later consumer requests results in 404 errors because the consumer will not be found.
POST /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/subscription/(string: subscription)
Create a new consumer instance in the subscription.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to read the messages from
- subscription (string): Name of the subscription
Request JSON object:
- schema_type (string): The schema type
- schema_version (int): Version returned by a previous creation request using the same schema. This version corresponds to the version of the schema in the schema registry.
- key_schema (object): key schema information (optional)
- value_schema (object): value schema information (optional)
- name (string): Name of the consumer (optional)
- id (string): ID of the consumer, as returned by a previous creation request (optional)
Response JSON object:
- id (string): Unique ID for the consumer instance in the subscription
- base_uri (string): Base URI used to construct URIs for subsequent requests against this consumer instance
Status codes:
- 409 Conflict
- Error code - Consumer with the specified name already exists
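Because the consumer lives on one broker, a client should keep the returned base_uri and derive later URLs from it rather than from a general service URL. A minimal sketch, assuming base_uri addresses the consumer instance itself so that /messages can be appended to it (the proposal does not spell out how URIs are built from base_uri); all names are hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerHandle:
    id: str
    base_uri: str

    def messages_url(self) -> str:
        # Assumption: base_uri addresses this consumer instance on its broker,
        # so sub-resources are appended to it.
        return self.base_uri.rstrip("/") + "/messages"


def create_consumer_body(schema_type, value_schema=None, name=None):
    """JSON body for POST .../subscription/(subscription); None fields are omitted."""
    body = {"schema_type": schema_type}
    if value_schema is not None:
        body["value_schema"] = value_schema
    if name is not None:
        body["name"] = name
    return json.dumps(body)


def parse_create_response(text: str) -> ConsumerHandle:
    """Keep the consumer ID and the broker-specific base URI from the response."""
    data = json.loads(text)
    return ConsumerHandle(id=data["id"], base_uri=data["base_uri"])
```

Storing the handle (rather than rebuilding URLs from the tenant, namespace, and topic) is what avoids the 404 errors described for consumers that are looked up on the wrong broker.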
GET /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/subscription/(string: subscription)/consumer/(string: consumer_id)/messages
Fetch messages from a given consumer. The format of the embedded data returned by this request is determined by the schema of the topic.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to read the messages from
- subscription (string): Name of the subscription
- consumer_id (string): ID of the consumer returned from the creation request
Query parameters:
- timeout: Maximum number of milliseconds the reader will spend fetching messages. Other parameters controlling the actual time spent fetching messages: max_messages and max_bytes.
- max_messages: The maximum number of messages that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response.
- max_bytes: The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response.
Response JSON object:
- messages (object): List of messages returned
- messages[i].messageId (string): The base64 encoded message id
- messages[i].key (object): The message key, formatted according to the schema, or null if the message has no key (optional)
- messages[i].value (object): The message value, formatted according to the schema
- messages[i].partition (int): The partition the message was read from (optional)
- messages[i].properties (map[string, string]): The properties of the message (optional)
- messages[i].eventTime (long): The event time of the message (optional)
- messages[i].sequenceId (long): The sequence id of the message (optional)
- messages[i].replicationClusters (array[string]): The list of clusters the message is replicated to (optional)
Status codes:
- 404 Not Found
- Error code 40401: Topic not found
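A client typically hands each fetched message to application code and keeps its message ID for a later cursor update. A minimal sketch with a hypothetical handle callback; the response shape follows the fields listed above.

```python
from typing import Any, Callable, Dict, List, Optional


def drain_messages(response: Dict[str, Any],
                   handle: Callable[[Optional[Any], Any, Dict[str, str]], None]) -> List[str]:
    """Pass each fetched message to handle(key, value, properties) in order and
    return the base64 message IDs of the messages passed to it.

    If handle raises, the exception propagates and no IDs are returned, so
    nothing from this batch is acknowledged by mistake.
    """
    handled = []
    for msg in response.get("messages") or []:
        handle(msg.get("key"), msg["value"], msg.get("properties") or {})
        handled.append(msg["messageId"])
    return handled
```

The message IDs are treated as opaque strings here; the client only needs to send them back, not decode them.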
DELETE /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/subscription/(string: subscription)/consumers/(string: consumer_id)
Remove the consumer from the subscription.
Note that this request must be made to the specific broker holding the consumer instance.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to read the messages from
- subscription (string): Name of the subscription
- consumer_id (string): The id of the consumer
Status codes:
- 404 Not Found
- Error code 40403 - Consumer instance not found
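Since this request must reach the broker holding the consumer, one option is to reuse the scheme and host of the base_uri returned at creation time and build the DELETE path shown above. A sketch using Python's standard urllib.request.Request, which only builds the request and does not send it; it assumes the path is served without a prefix, and the function name is hypothetical.

```python
from urllib.parse import quote, urlsplit
from urllib.request import Request


def delete_consumer_request(base_uri, tenant, namespace, topic, subscription, consumer_id):
    """Build (without sending) the DELETE request that removes a consumer.

    The scheme and host:port are taken from the consumer's base_uri so the
    request reaches the broker holding the consumer instance.
    """
    parts = urlsplit(base_uri)
    names = (quote(s, safe="") for s in (tenant, namespace, topic, subscription, consumer_id))
    path = "/topics/{}/{}/{}/subscription/{}/consumers/{}".format(*names)
    return Request("{}://{}{}".format(parts.scheme, parts.netloc, path), method="DELETE")
```

Passing the returned object to urllib.request.urlopen sends it; a 404 response then indicates the consumer instance was not found on that broker.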
GET /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/subscription/(string: subscription)/cursor
Get the current cursor for a given subscription.
Note: This request must be made to the specific broker that owns the topic.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to read the messages from
- subscription (string): Name of the subscription
Response JSON object:
- cursor (object): The cursor object
POST /topics/(string: tenant_name)/(string: namespace_name)/(string: topic_name)/subscription/(string: subscription)/cursor
Update the current cursor for a given subscription.
Note: This request must be made to the specific broker that owns the topic.
Parameters:
- tenant_name (string): The tenant name
- namespace_name (string): The namespace name
- topic_name (string): Name of the topic to read the messages from
- subscription (string): Name of the subscription
Request JSON object:
- type (string): The acknowledgement type
- messageIds (object): List of message IDs to acknowledge
- messageIds[i] (string): The base64 encoded ID of a message to acknowledge
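A sketch of building the request body for this endpoint from message IDs collected while consuming. The proposal does not enumerate the values of type, so the helper passes it through unchanged; the function name is hypothetical, and "individual" in the example call ack_body(["CAEQAA=="], "individual") is a placeholder, not a documented value.

```python
import json


def ack_body(message_ids, ack_type):
    """JSON body for POST .../cursor.

    ack_type is passed through unchanged because the proposal does not list
    its allowed values; null (None) IDs are skipped.
    """
    ids = [m for m in message_ids if m is not None]
    if not ids:
        raise ValueError("no message IDs to acknowledge")
    return json.dumps({"type": ack_type, "messageIds": ids})
```

Rejecting an empty list keeps the client from sending a cursor update that would acknowledge nothing.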
This PIP only introduces new features. It does not change or remove any existing endpoints, so there is nothing to deprecate or migrate.
- Unit tests
- Integration tests
N/A