diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 0c723c8f06..b83cb881f9 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -180,7 +180,9 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon dlq: dlq, log: log.WithField("topic", options.topic), } - pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription) + pc.log = pc.log.WithField("name", pc.name). + WithField("subscription", options.subscription). + WithField("consumerID", pc.consumerID) pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay) err := pc.grabConn() diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 703c7739c0..4be9ba2928 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -636,7 +636,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) if consumer, ok := c.consumerHandler(consumerID); ok { consumer.ConnectionClosed() - delete(c.listeners, consumerID) + c.DeleteConsumeHandler(consumerID) } else { c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer") } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 3832d695fd..18f22f67e3 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -150,7 +150,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions return nil, err } - p.log = p.log.WithField("producer_name", p.producerName) + p.log = p.log.WithField("producer_name", p.producerName). + WithField("producerID", p.producerID) p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") atomic.StoreInt32(&p.state, producerReady)