Skip to content
doraalin edited this page Jan 14, 2021 · 9 revisions

Overview

总结NSQ Java SDK过程中遇到的问题,介绍SDK使用的"也许是"最佳实践。

首先请确保初始化使用了正确的lookup-http-address,并且topic以及相应的channel已经存在,运维过程中的大多数生产/消费问题来自于错误的lookup- http-address或者topic/channel未创建。

Producer

Producer 并发

Producer在往nsqd发送消息是,基本流程为,

  1. 根据lookup结果获取nsqd地址;
  2. 根据nsqd地址,获取或创建nsqd连接;
  3. 发送消息;

Producer保证线程安全,生产方应用可以维护一个Producer对象。由于Producer维护一个拥有最大上限的连接池,单个nsqd分区对应的连接数默认值为30 ,请评估并确保调用的并发量小于Producer的分区最大连接数。通过NSQConfig.setConnectionPoolSize(int)调整。当线程获取连接不到连接时, 默认配置会等待一段时间,直至获得连接或超时。当错误日志中出现pool exhausted报错的时候,请调整最大上线连接池。

Producer的消息发送是同步发送,publish正常退出后表示发送成功。

message trace vs extension message

消息追踪和拓展消息不能同时发送,Producer如果同时调用了Message.traced(),并且设置了desiredTag和jsonExtensionHeader的情况下,消息 追踪请求将被忽略。

消息发送重试

非顺序消息的发送场景下,当向某个选中的nsqd节点发送消息失败时,根据NSQConfig.setPublishRetry(int)的重试次数,sdk将尝试使用其他nsqd节点 进行重试,全部重试失败后,Producer将向上抛出异常NSQPubException

Consumer

tuning consume performance

Consumer的消费基于异步,消息被提交给Consumer内部的线程池,由线程池调用MessageHandler的消息处理逻辑。在初始化消费者时,请结合应用的消费能力 确定以下配置

  1. Rdy

    通过NSQConfig.setRdy(int)改变rdy,默认的rdy值为(consumer pool size)/(topic partition 总数)。rdy值保证SDK正在处理来自一个分区 的消息数不超过rdy。默认的rdy值根据consumer pool size动态调整。

  2. 消费线程数大小

    消费线程的值可以和rdy值相对应。举例,rdy为3的情况下,topic为2分区,消费2个topic。此时的consumer pool size可设置为3(rdy#)* 2(partition#)* 2(topic#) = 12。

  3. 消费能力

    消费速度较慢的情况下,较大的rdy值可能会造成下发的消息在SDK端超时。结合消费速度,设置消费线程数以及rdy。

消费处理

nsq在消息投递策略上保证"at least once",消息队列中的消息至少下发一次,由nsqd服务端保证。SDK在调用消费逻辑时,抛出任何异常时,SDK会 requeue该消息,等待nsqd下次重新下发消息。requeue的timeout由NSQConfig.setNextConsumingInSeconds(int)指定,如果该值为0(默认值为60),requeue的 消息在服务端不会等待而直接下发。同时SDK提供了基于消息级别的requeue timeout, 通过NSQMessage.setNextConsumingInSeconds(int) 对于消息消费需要等待下游同步的场景,可以:

  1. 设置NextConsumingInSeconds,设置消息重投的希望值
  2. 抛出ExplicitRequeueException, 触发sdk requeue后,等待重新消费。

next consuming timeout为期望值,业务方不应依赖该数值进行精准延时消费。详情请参考http://nsq.io/clients/tcp_protocol_spec.html#req

实际生产中,由于消息格式或者其他问题,导致消息始终无法被消费逻辑正常消费。此时不加任何处理,会导致消息一直在队列中被重复下发。消费逻辑应该 结合消息的attempt,适时finish消息。attempt通过NSQMessage.getReadableAttempts()查看。消费逻辑中加入以下逻辑,当attempt超过预设的 阈值时,消费逻辑告警或者打印日志内容,之后消费逻辑退出(SDK自动finish)或者finish该条消息(手动管理finish)。如下流程图所示

message consumption process

顺序消费

NSQ新集群中,消息的顺序生产/消费基于topic partition。Producer(生产者)通过指定shardingID,向目标partition发送消息;Consumer(消费者 )通过指定partitionID,从指定partition接收消息。Consumer进行顺序消费时时,Rdy相当于将被NSQ服务端指定为1,在当前消息Finish之前不会推送 下一条。NSQ服务器端topic进行强制消费配置,当日志出现

E_SUB_ORDER_IS_MUST

错误消息时,说明该topic必须进行顺序消费。

和顺序生产的情况类似,当topic的某个partition不可用时,SDK将无法从NSQ查询到相应的partition信息,此时尝试Pub/Sub至该分区的操作将返回 NSQPartitionNotAvailableException。Producer在捕获该异常后将尝试重试,Consumer将会打印错误日志和堆栈。

Debug

消费逻辑在debug过程中,可以通过NSQMessage.toString()打印消息的internalID, attempt, 以及topic等必要信息。对于消息消费延时敏感的应用 消费逻辑中建议打印收到消息的时间点和消息时间戳的时间间隔,超出指定值时可以报警。同时建议打印消息消费的花费时间,由于nsq值保证consumer在消费完下 发消息前(最大等于rdy)不会再推送新的消息,对于消费速度较慢的应用,由于新的消息迟迟无法下发,会导致消息消费延时。