-
Notifications
You must be signed in to change notification settings - Fork 22
Get started
使用SDK进行消息生产消费集中场景下的基本用法,包括
基本生产消费的场景下,消费者订阅到指定topic的channel进项消费,生产者想指定topic发送消息。
SDK通过Producer.publish()接口向NSQ同步发送消息。使用生产者发送消息分为以下步骤:
- 初始化
SDK统一使用NSQConfig初始化生产者和消费者,在NSQConfig中,通过指定nsq集群的lookupd http address,告知生产者目标集群的地址。
NSQConfig config = new NSQConfig(); config.setLookupAddresses("127.0.0.1:4161");对于生产者,以能够正常发送消息为目标所需要指定的配置只有lookupd http address. 通过NSQConfig初始化Producer,并启动
Producer producer = new ProducerImplV2(config); producer.start();
至此,消息生产者已经就绪。接下来我们组装并尝试发送消息。
- 组装消息
通过构造Message对象我们可以得到一条发往nsq的消息。
Message.create(Topic, byte[])
构造方法提醒我们,在发消息前,需要保证topic已经在nsq集群上创建。我们在nsq上创建一个名为“JavaTesting-Producer-Base”的topic,Message的构造方法还有一个字节数组,接收我们要发的消息的内容
Topic aTopic = new Topic("JavaTesting-Producer-Base"); String msgStr = "The quick brown fox jumps over the lazy dog, 那只迅捷的灰狐狸跳过了那条懒狗"; Message aMsg = Message.create(aTopic, msgStr.getBytes(Charset.defaultCharset()));
- 发送
producer.publish(aMsg);
- 关闭
Producer是closable的,通过close()方法销毁当前producer。已经销毁的producer不可复用.
nsq消息处理使用异步处理的方式。消费者订阅目标topic,和nsq建立连接后,nsq根据rdy值大小向消费者投递消息,单个nsq分区同一时间内投递中(未被消费者ack)的消息数量不超过rdy值。SDK consumer通过预设的消费回调函数对消息进行处理,并根据配置或否ack。
目前nsq支持的消息投递模式为服务端投递并管理消费位点(服务端“推”的模式)
- 初始化
consumer采取异步消费的方式,在配置阶段指定的参数相对producer较多。consumer的初始化同样通过NSQConfig。
NSQConfig config = new NSQConfig("BaseConsumer"); config.setLookupAddresses("127.0.0.1:4161");
和初始化producer的NSQConfig对象有所不同,初始化consumer的config的构造函数包含了channel名称。nsq的消息发送与组播模式类似,同一个topic下,消息在不同channel的之间为广播,在channel内的不同consumer之间随机投递。consumer在和nsq建立连接时,会根据channel名称进行选择,如果是一个不存在的channel,服务端会新建一个。
- 订阅和消费逻辑 首先,我们初始化一个Consumer对象
Consumer consumer = new ConsumerImplV2(config); consumer.setAutoFinish(Boolean.TRUE);
对于消息消费后的ack,SDK默认使用自动ack的配置。当用户的消费逻辑正常退出后,SDK会自动ack当前消息。SDK捕获到任何消费逻辑抛出的异常的情况下,当前消息被SDK requeue。需要自己管理消息ack的场景下,通过Consumer.setAutoFinish(false)
关闭自动ack。
有了consumer,我们需要告知它:
- 需要订阅什么topic;
- 收到的消息要如何处理;
我们还是订阅“JavaTesting-Producer-Base”这个topic,至于做些什么,就把消息打出来好了……
MessageHandler handler = new MessageHandler() { @Override public void process(NSQMessage message) { System.out.println(message.getReadableContent()); } }); //订阅 consumer.subscribe("JavaTesting-Producer-Base"); //消费逻辑 consumer.setMessageHandler(handler); //启动 consumer.start();
由于NSQ消费逻辑是异步的,如果在一个测试用例里面执行的话,你会发现程序直接就退出了。为了观察到消费逻辑工作,我们使用CountDownLatch对消费逻辑和主进程进行同步。最后发消费逻辑放在testcase里面的代码看起来是这样的:
public void test() throws Exception { final CountDownLatch latch = new CountDownLatch(10); Consumer consumer = new ConsumerImplV2(config, new MessageHandler() { @Override public void process(NSQMessage message) { System.out.println(message.getReadableContent()); latch.countDown(); } }); consumer.setAutoFinish(true); consumer.subscribe("JavaTesting-Producer-Base"); consumer.start(); try { Assert.assertTrue(latch.await(1, TimeUnit.MINUTES)); } finally { consumer.close(); } }
代码使用了Consumer的另外一个构造函数,通过第二个参数传递消费逻辑。主线程在消费逻辑收到10条消息(成功),或者超过1分钟(失败)后将退出。
SDK通过捕获消费逻辑抛出的异常来对当前消费的消息进行重试。对于任意的Exception,SDK都会对当前消息进行requeue。SDK中规定了以下两类特别的Exception,在捕获到时会有特殊处理:
- ExplicitRequeueException 消费逻辑抛出ExplicitRequeueException的情况下,SDK在requeue该条消息前,不会打印错误日志;
- RetryBusinessException 消费逻辑抛出该异常后,sdk会尝试重新调用消费逻辑对当前消息进行消费,重新消费如果抛出任何异常,SDK将requeue该条消息。
- 对于其他的Exception类型,SDK在打印错误日志后requeue消息。
普通模式下的消息在消费的时候,NSQ不保证消息到达consumer的顺序性。需要顺序消费的topic通过配置进行顺序消费和生产。NSQ的顺序消费基于分区有序,同一个分区内的消息有序投递。
SDK进行顺序消费前,请确保目标topic是支持顺序消费的。
由于顺序消费基于分区有序,生产者在发送消息时,遵循“需要顺序的消息发到相同分区”的规则。SDK提供shardingID接口,具有相同shardingID的消息被发送到同一个分区节点。保证了相同shardingID的消息在同一分区队列中有序。
Producer通过Message.setTopicShardingIDXXX()指定消息的shardingID,目前的实现有
Message.setTopicShardingIDObject() Message.setTopicShardingIDLong() Message.setTopicShardingIDString()
SDK会根据传入对象的hashcode()选择分区发送消息。相比于正常的消息发送,顺序发送消息场景下就多了这一步。
Topic aTopic = new Topic("JavaTesting-Producer-Base"); String msgStr = "The quick brown fox jumps over the lazy dog, 那只迅捷的灰狐狸跳过了那条懒狗"; Message aMsg = Message.create(aTopic, msgStr.getBytes(Charset.defaultCharset())); aMsg.setTopicShardingIDLong(123456789l);
相比较于普通消费,顺序消费场景下的消费者仅需要调用NSQConfig.setOrdered(true)
后,使用NSQConfig初始化consumer即可。消息的顺序投递由NSQ服务端保证。
批量消息发送通过nsq支持的mpub命令在一次传输中发送多条消息
Producer.publishMulti(List<byte[]> messages, Topic topic); Producer.publishMulti(List<byte[]> messages, Topic topic, int batchSize); Producer.publishMulti(List<byte[]> messages, Topic topic, int batchSize, ExecutorService executorService);
带有batchSize参数的方法允许消息列表被分批发送,批次之间的消息不保证先后顺序。方法返回发送失败的批次中的消息。
producer = new ProducerImplV2(config); producer.start(); List msgs = new ArrayList<>(); for(int cnt = 0; cnt < 2345; cnt++) { msgs.add(msgBytes); } List failedMsgs = producer.publishMulti(msgs, topic, 100);
拓展消息使用了youzan nsq中新定义的消息格式,支持在消息中附加json k/v。SDK为生产方和消费方提供了设置/解析拓展内容的接口。
该功能需要topic需要拓展消息格式。对不支持拓展消息格式的topic发送拓展消息会收到E_EXT_NOT_SUPPORT的报错。
通过Message.setJsonHeaderExt()传入一个json格式的map作为拓展消息,并发送。
Map properties = new HashMap<>(); properties.put("key" ,"javaSDK"); msg.setJsonHeaderExt(properties); producer.publish(msg);
nsq 的消息大小不应超过服务端设置的上限,拓展消息的大小也包含在内。
Consumer的消费逻辑中,Object NSQMessage.getExtByName(String key), Map<String, Object> NSQMessage.getJsonExtHeader()
接口用于获取消息中单个值和全部的json拓展k/v。