Sharing a pool of gRPC threads/connections with N topics for Pub/Sub? #13707

Open
anthonyalayo opened this issue Mar 1, 2024 · 10 comments
Labels
type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@anthonyalayo

What component of google-cloud-cpp is this feature request for?
Pub/Sub

Is your feature request related to a problem? Please describe.
Thanks for a great product. I've been looking at your samples for using the Pub/Sub publisher class. There is one example in particular where it shows you how to pass in your own thread pool:
https://cloud.google.com/cpp/docs/reference/pubsub/latest/classgoogle_1_1cloud_1_1pubsub_1_1Publisher#thread-safety

I had some suspicions about gRPC connections and the like, so I dug into the source code.

Each call of MakePublisherConnection is still tied to a topic. It is unclear whether I can use the same thread pool for multiple calls to MakePublisherConnection. Inside MakePublisherConnection I see gRPC channels being connected per thread, so I'm assuming that the same thread pool is not expected to be shared across multiple topics. Is that right?

That leaves me to my point of confusion. What is the expected usage for a scenario that I would assume is common:

  1. You have multiple topics you publish to
  2. You want a pool of gRPC connections that can be shared across all topics

Describe the solution you'd like
I would like a sample to document that, or even have this issue be the documentation.

Describe alternatives you've considered
I am currently iterating on ways I can play with the API to do what I expect, but it isn't ideal.

@anthonyalayo anthonyalayo added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Mar 1, 2024
@dbolduc
Member

dbolduc commented Mar 1, 2024

While I'm not the Pub/Sub expert, let me jump in and confirm a few of your suspicions.

> Each call of MakePublisherConnection is still tied to a topic.

Yes.

> It is unclear whether I can use the same thread pool for multiple calls to MakePublisherConnection. Inside MakePublisherConnection I see gRPC channels being connected per thread, so I'm assuming that the same thread pool is not expected to be shared across multiple topics. Is that right?

So there are two things here:

  1. The thread pool servicing the CompletionQueue
  2. The pool of gRPC channels

For 1, the thread pool servicing the CompletionQueue can be shared across connections, by using the GrpcCompletionQueueOption, as you found in the documentation.
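To make point 1 concrete, here is a standard-library-only model of one work queue, drained by a fixed set of threads, serving several publishers that are each bound to a single topic. It is a sketch under stated assumptions: `WorkQueue`, `TopicPublisher`, and `PublishOnSharedPool` are hypothetical names, and `WorkQueue` only mimics the role that a `google::cloud::CompletionQueue` passed via `GrpcCompletionQueueOption` plays; it is not the library's implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for a completion queue: callbacks are queued and run by whichever
// threads call Run(). This is NOT google::cloud::CompletionQueue.
class WorkQueue {
 public:
  void Post(std::function<void()> fn) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      tasks_.push_back(std::move(fn));
    }
    cv_.notify_one();
  }

  // Runs callbacks; returns once Shutdown() was called and the queue is empty.
  void Run() {
    for (;;) {
      std::function<void()> fn;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return shutdown_ || !tasks_.empty(); });
        if (tasks_.empty()) return;
        fn = std::move(tasks_.front());
        tasks_.pop_front();
      }
      fn();
    }
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> lk(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool shutdown_ = false;
};

// Hypothetical publisher bound to ONE topic; the queue it posts to (and
// therefore the threads) can be shared with publishers for other topics.
class TopicPublisher {
 public:
  TopicPublisher(std::string topic, WorkQueue& queue)
      : topic_(std::move(topic)), queue_(&queue) {}

  void Publish(std::function<void(std::string const&)> on_done) {
    queue_->Post([topic = topic_, on_done = std::move(on_done)] { on_done(topic); });
  }

 private:
  std::string topic_;
  WorkQueue* queue_;
};

// Publishes `per_topic` messages on each topic through ONE pool of `threads`
// threads and returns the number of completions observed per topic.
std::map<std::string, int> PublishOnSharedPool(
    std::vector<std::string> const& topics, int per_topic, int threads) {
  assert(threads > 0);
  WorkQueue queue;
  std::vector<std::thread> pool;
  for (int i = 0; i != threads; ++i) pool.emplace_back([&queue] { queue.Run(); });

  std::mutex mu;
  std::map<std::string, int> completions;
  std::vector<TopicPublisher> publishers;
  for (auto const& t : topics) publishers.emplace_back(t, queue);
  for (auto& p : publishers) {
    for (int i = 0; i != per_topic; ++i) {
      p.Publish([&mu, &completions](std::string const& topic) {
        std::lock_guard<std::mutex> lk(mu);
        ++completions[topic];
      });
    }
  }
  queue.Shutdown();  // workers drain the remaining callbacks, then exit
  for (auto& th : pool) th.join();
  return completions;
}
```

In this model the threads belong to the queue rather than to any topic, so adding a topic adds a publisher but no threads. The gRPC channels are a separate resource (point 2) and are not modeled here.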

For 2, with the current API, the pool of gRPC channels cannot be reused by Publishers that are tied to different topics, although this is a totally reasonable use case.


For the sake of completeness, I should point out that the BlockingPublisher lets you reuse the gRPC channel pool for different topics. You could make one BlockingPublisherConnection and share it across multiple clients, providing a different Topic in each call to Publish(...).

https://cloud.google.com/cpp/docs/reference/pubsub/latest/classgoogle_1_1cloud_1_1pubsub_1_1BlockingPublisher#classgoogle_1_1cloud_1_1pubsub_1_1BlockingPublisher_1a37e3aae927a3fb1fa2e83f1663d621a1

Of course, you would be using a blocking API. Maybe that is not good enough for your use case.

@anthonyalayo
Author

Solid reply, thanks as usual @dbolduc. I think an easy workaround for now would be:

  1. Use the BlockingPublisher, setting up one BlockingPublisherConnection
  2. Have it called on a pool of background threads that I manage
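A minimal sketch of this workaround using only the standard library: a fixed pool of background threads calls a blocking publish function that takes the topic on every call, and callers get a `std::future` back. `BlockingPublishFn` and `BackgroundPublisher` are hypothetical names; in real code the function would wrap a call on the single shared `BlockingPublisher`, whose exact signature is not reproduced here.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one call on the single shared BlockingPublisher:
// takes the topic and payload, blocks, and returns a message id.
using BlockingPublishFn =
    std::function<std::string(std::string const& topic, std::string const& data)>;

// Runs a blocking, topic-per-call publish function on a fixed pool of
// background threads so that callers do not block.
class BackgroundPublisher {
 public:
  BackgroundPublisher(BlockingPublishFn publish, int threads)
      : publish_(std::move(publish)) {
    assert(threads > 0);
    for (int i = 0; i != threads; ++i) workers_.emplace_back([this] { Loop(); });
  }

  // Finishes all queued jobs, then stops the threads.
  ~BackgroundPublisher() {
    {
      std::lock_guard<std::mutex> lk(mu_);
      done_ = true;
    }
    cv_.notify_all();
    for (auto& w : workers_) w.join();
  }

  // Any topic can be used; every topic goes through the same publish function.
  std::future<std::string> Publish(std::string topic, std::string data) {
    Job job{std::move(topic), std::move(data), std::promise<std::string>{}};
    auto result = job.result.get_future();
    {
      std::lock_guard<std::mutex> lk(mu_);
      jobs_.push_back(std::move(job));
    }
    cv_.notify_one();
    return result;
  }

 private:
  struct Job {
    std::string topic;
    std::string data;
    std::promise<std::string> result;
  };

  void Loop() {
    for (;;) {
      Job job;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return done_ || !jobs_.empty(); });
        if (jobs_.empty()) return;
        job = std::move(jobs_.front());
        jobs_.pop_front();
      }
      try {
        job.result.set_value(publish_(job.topic, job.data));
      } catch (...) {
        job.result.set_exception(std::current_exception());
      }
    }
  }

  BlockingPublishFn publish_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Job> jobs_;
  bool done_ = false;
  std::vector<std::thread> workers_;
};
```

Because the topic travels with each job, one publish function (and, behind it, one connection) serves every topic; the cost is one blocked pool thread per in-flight publish.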

I'll go with that for now, but what do you think about this?

> For 2, with the current API, the pool of gRPC channels cannot be reused by Publishers that are tied to different topics, although this is a totally reasonable use case.

Perhaps that could be discussed among maintainers for a future release?

@dbolduc
Member

dbolduc commented Mar 1, 2024

> Perhaps that could be discussed among maintainers for a future release?

Definitely. I will start brainstorming ways for the fancy Publisher to support sharing gRPC channel pools...

Option A: Override the topic via a call option

At some point we decided not to add per-call options for the Publisher. #7689 (comment)

We could add the equivalent of pubsub::SubscriptionOption, except it would be pubsub::TopicOption. What I don't know is whether an option to override the topic makes any sense for something like a message ordering publisher. I would have to defer to the Pub/Sub experts.

Note that Options are not the only way to do this. We could also add the equivalent of a bigtable::Table::WithNewTarget(...) to change the resource. Plumbing it through the many layers of pubsub might be tricky though.
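A rough sketch of what Option A implies, under stated assumptions: `CallOptions::topic` stands in for the proposed (not existing) `pubsub::TopicOption`, and `OverridablePublisher` is a hypothetical type that only records pending messages instead of sending RPCs. Because a Pub/Sub `PublishRequest` names a single topic, a batching publisher that honors a per-call topic would need a separate pending batch per topic, which is what the map models.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-call options; `topic` plays the role of the proposed
// pubsub::TopicOption, which does not exist today.
struct CallOptions {
  std::optional<std::string> topic;
};

// Hypothetical publisher bound to a default topic that lets each call
// override it. It only records pending messages; no RPCs are made.
class OverridablePublisher {
 public:
  explicit OverridablePublisher(std::string default_topic)
      : default_topic_(std::move(default_topic)) {
    assert(!default_topic_.empty());
  }

  void Publish(std::string data, CallOptions const& opts = {}) {
    // A PublishRequest carries exactly one topic, so each topic gets its own batch.
    batches_[opts.topic.value_or(default_topic_)].push_back(std::move(data));
  }

  // Number of messages waiting to be sent for `topic`.
  std::size_t Pending(std::string const& topic) const {
    auto it = batches_.find(topic);
    return it == batches_.end() ? 0 : it->second.size();
  }

 private:
  std::string default_topic_;
  std::map<std::string, std::vector<std::string>> batches_;
};
```

Usage: `p.Publish("b", CallOptions{std::string("audit")})` sends to `audit`, while a plain `p.Publish("a")` goes to the default topic. How this should interact with ordering keys is exactly the open question raised above.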

Option B: offer a way to provide a channel pool to MakePublisherConnection

If changing the topic on the fly is something we want to disallow, we would need a way to inject an existing channel pool into MakePublisherConnection.

e.g. something like:

```cpp
class GrpcChannelPool {
 private:
  // Only accessed through internal library functions
  std::vector<std::shared_ptr<grpc::Channel>> channels_;
  // I think we need to hold this too.
  std::shared_ptr<GrpcAuthenticationStrategy> auth_;
};

// Supply your own gRPC channel pool for the connection to use.
struct PubSubChannelPoolOption {
  using Type = GrpcChannelPool;
};

// Returns an object that holds a channel pool, to be given to a `PublisherConnection`.
GrpcChannelPool MakeChannelPool(Options options);
```

MakeChannelPool() would do something along the lines of the existing call to CreateGrpcChannel():

```cpp
auto channel = CreateGrpcChannel(*auth, options, id);
```

The stub factory would look for the presence of the PubSubChannelPoolOption and, if the option is present, use the channels therein instead of creating new channels.
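To illustrate how Option B would let connections for different topics share channels, here is a self-contained sketch. `FakeChannel`, `ChannelPool`, and `TopicConnection` are hypothetical stand-ins for `grpc::Channel`, the proposed `GrpcChannelPool`, and a topic-bound `PublisherConnection`; the round-robin selection is an assumption for illustration, not necessarily how the library picks channels.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for grpc::Channel; it only counts how often it was selected.
struct FakeChannel {
  std::atomic<int> uses{0};
};

// Hypothetical shared pool, in the spirit of the GrpcChannelPool sketch above.
class ChannelPool {
 public:
  explicit ChannelPool(std::size_t size) {
    assert(size > 0);
    for (std::size_t i = 0; i != size; ++i) {
      channels_.push_back(std::make_shared<FakeChannel>());
    }
  }

  // Round-robin selection; safe to call from multiple threads.
  std::shared_ptr<FakeChannel> Next() {
    auto i = next_.fetch_add(1, std::memory_order_relaxed);
    auto ch = channels_[i % channels_.size()];
    ++ch->uses;
    return ch;
  }

  std::vector<std::shared_ptr<FakeChannel>> const& channels() const { return channels_; }

 private:
  std::vector<std::shared_ptr<FakeChannel>> channels_;
  std::atomic<std::size_t> next_{0};
};

// A connection bound to one topic that borrows channels from an injected
// pool instead of creating its own, which is what Option B proposes.
class TopicConnection {
 public:
  TopicConnection(std::string topic, std::shared_ptr<ChannelPool> pool)
      : topic_(std::move(topic)), pool_(std::move(pool)) {}

  // A real stub would send the data on the selected channel; here we only pick one.
  void Publish(std::string const& /*data*/) { pool_->Next(); }

  std::string const& topic() const { return topic_; }

 private:
  std::string topic_;
  std::shared_ptr<ChannelPool> pool_;
};
```

Both connections hold the same `shared_ptr<ChannelPool>`, so the channels live as long as any connection (or the application) still references the pool.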

@anthonyalayo
Author

Both sound reasonable in my opinion. When I first started fiddling with the API, I expected something like Option B (since the nuances around Option A are unclear to me too).

@coryan
Contributor

coryan commented Mar 1, 2024

This document covers some of the questions here: https://cloud.google.com/cpp/docs/background-threads (if nothing else it may help the search engines find it in the future).

On the channels: keep in mind that gRPC shares sockets under the hood; that is, different channels use the same socket if (1) they use the same endpoint, (2) they use the same authentication / credentials, (3) they have the same channel attributes, and (4) they are configured to use the global pool of sockets (I may have forgotten some other condition, but you get the idea). There are some tradeoffs: the last condition requires some synchronization overhead.
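The four conditions above can be read as a predicate. The sketch below is a simplified model, not gRPC's actual logic: `ChannelConfig` and `MayShareSocket` are hypothetical, `credentials_id` stands in for however gRPC compares credentials, and `use_global_subchannel_pool` is assumed to correspond to gRPC's `GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL` channel argument being unset or zero. As noted above, the real list of conditions may be longer.

```cpp
#include <cassert>
#include <map>
#include <string>

// Simplified model of a channel's configuration (not a gRPC type).
struct ChannelConfig {
  std::string endpoint;                             // e.g. "pubsub.googleapis.com:443"
  std::string credentials_id;                       // stands in for credential identity
  std::map<std::string, std::string> channel_args;  // other channel attributes
  bool use_global_subchannel_pool = true;           // false models a local pool
};

// True only when all four listed conditions hold for both channels.
bool MayShareSocket(ChannelConfig const& a, ChannelConfig const& b) {
  return a.use_global_subchannel_pool && b.use_global_subchannel_pool &&
         a.endpoint == b.endpoint && a.credentials_id == b.credentials_id &&
         a.channel_args == b.channel_args;
}
```

One practical consequence of the model: giving each channel a distinct channel argument is enough to keep them on separate sockets, at the cost of more connections.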

@anthonyalayo
Author

> This document covers some of the questions here: https://cloud.google.com/cpp/docs/background-threads (if nothing else it may help the search engines find it in the future).

I was looking for something like this and couldn't find it. Appreciate this!

@anthonyalayo
Author

In case it helps, I ended up going with something like this in the interim:

```cpp
// Assumes `namespace g = google::cloud;` (the alias is not shown in the original snippet).
auto PubSubPublisher::makePublisherConnection( const std::string & topic ) -> std::shared_ptr<g::pubsub::PublisherConnection>
{
    // use our own completion queue / thread pool for all pub/sub connections
    // in the future, we may be able to specify a gRPC connection pool as well
    // https://github.com/googleapis/google-cloud-cpp/issues/13707
    auto publisherConnection = g::pubsub::MakePublisherConnection(
        g::pubsub::Topic( kProjectId, topic ),
        g::Options{}
            .set<g::GrpcCompletionQueueOption>( mCq )
            .set<g::pubsub::MaxHoldTimeOption>( kMaxHoldTime )
            .set<g::pubsub::MaxBatchBytesOption>( kMaxBatchBytes )
            .set<g::pubsub::MaxBatchMessagesOption>( kMaxBatchMessages ) );

    return publisherConnection;
}

void PubSubPublisher::publish( const std::string & topic, const google::protobuf::Message & message )
{
    // store a map of topic connections
    // in the future, we may be able to use a single publisher connection for multiple topics
    // https://github.com/googleapis/google-cloud-cpp/issues/13707
    if ( !mTopicConnections.contains( topic ) )
    {
        mTopicConnections[topic] = makePublisherConnection( topic );
    }

    auto publisher = g::pubsub::Publisher( mTopicConnections[topic] );

    g::pubsub::MessageBuilder messageBuilder{};
    messageBuilder.SetData( message.SerializeAsString() );

    publisher
        .Publish( std::move( messageBuilder ).Build() )
        .then( [topic]( g::future<g::StatusOr<std::string>> future )
               { handlePublishResponse( std::move( future ), topic ); } );
}
```

Here I'm maintaining a map of publisher connections, one per topic, so that I can keep using the async API.
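One thing to watch in this pattern: if `publish()` can be called from several threads, the unguarded `mTopicConnections` lookup and insert is a data race. Below is a minimal sketch of a thread-safe alternative, assuming C++17. `ConnectionCache` is a hypothetical helper, templated on the connection type so that it compiles without google-cloud-cpp; in the code above, `Connection` would be `g::pubsub::PublisherConnection` and the factory would call `makePublisherConnection`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical helper: lazily creates one connection per topic and reuses it.
template <typename Connection>
class ConnectionCache {
 public:
  using Factory = std::function<std::shared_ptr<Connection>(std::string const&)>;

  explicit ConnectionCache(Factory factory) : factory_(std::move(factory)) {
    assert(factory_);
  }

  // Thread-safe; the factory runs at most once per topic (unless it throws).
  std::shared_ptr<Connection> Get(std::string const& topic) {
    std::lock_guard<std::mutex> lk(mu_);
    auto [it, inserted] = cache_.try_emplace(topic);
    if (inserted) {
      try {
        it->second = factory_(topic);
      } catch (...) {
        cache_.erase(it);  // do not keep a null entry around
        throw;
      }
    }
    return it->second;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lk(mu_);
    return cache_.size();
  }

 private:
  Factory factory_;
  mutable std::mutex mu_;
  std::unordered_map<std::string, std::shared_ptr<Connection>> cache_;
};
```

Holding the lock while the factory runs keeps the sketch simple but serializes connection creation; since a connection is created only once per topic, that is usually an acceptable trade-off.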

@anthonyalayo
Author

@dbolduc @coryan how does it look for adding this to the road map?

@coryan
Contributor

coryan commented May 20, 2024

cc @scotthart, as I no longer define the roadmap for this project.

@scotthart
Member

We'll take this feature request into consideration in our upcoming planning, but I cannot commit to any time frame at present. If/when we decide to work on this feature, we'll update this issue accordingly.


4 participants