Chalk是一个Java开发的XMPP客户端通讯库,可以用于开发Java桌面和Android的XMPP客户端。Chalk基于插件体系设计,这使得它易于使用及易于扩展。
Chalk是一个纯粹的Java库,在使用它时,我们需要将它配置为工程的依赖库。以Maven工程为例,我们需要做以下的配置。
在pom.xml中添加FirstLineCode的Maven仓库。
<repository>
<id>com.thefirstlineofcode.release</id>
<name>TheFirstLineOfCode Repository - Releases</name>
<url>http://repo.thefirstlineofcode.com/content/repositories/releases</url>
</repository>
Chalk库被设计成插件架构,以保证良好的扩展性,不同的功能被封装在不同的插件依赖库里,开发者可以根据自己需要,选择要使用的依赖。
我们最少需要配置以下的基础依赖。
<dependency>
<groupId>com.thefirstlineofcode.chalk</groupId>
<artifactId>com.thefirstlineofcode.chalk</artifactId>
<version>0.2.0-RELEASE</version>
</dependency>
可以根据需要添加可选的插件依赖,例如添加在线注册(In Band Registration)插件依赖。
<dependency>
<groupId>com.thefirstlineofcode.chalk.xeps</groupId>
<artifactId>com.thefirstlineofcode.chalk.xeps.ibr</artifactId>
<version>0.2.0-RELEASE</version>
</dependency>
下表列出了Chalk提供了的常用插件。
插件名 | groupId | artifactId | 插件描述 | 备注 |
---|---|---|---|---|
IM | com.thefirstlineofcode.chalk | com.thefirstlineofcode.chalk | 实现RFC3921(Instant Messaging and Presence) | IM插件内置在基础依赖包中 |
Ping | com.thefirstlineofcode.chalk.xeps | com.thefirstlineofcode.chalk.xeps.ping | 实现XEP-0199(XMPP Ping) | |
IBR | com.thefirstlineofcode.chalk.xeps | com.thefirstlineofcode.chalk.xeps.ibr | 实现XEP-0077(In-Band Registration) | |
MUC | com.thefirstlineofcode.chalk.xeps | com.thefirstlineofcode.chalk.xeps.muc | 实现XEP-0045(Multi-User Chat) | |
LEP IM | com.thefirstlineofcode.chalk.leps | com.thefirstlineofcode.chalk.leps.im | 实现LEP-0011(Traceable Message) | 非标准协议,解决XMPP IM的一些缺陷,如:双向订阅、可信赖消息;消息状态跟踪等 |
使用Chalk库最重要的接口就是IChatClient,IChatClient提供了两个核心的功能:
- 建立客户端到服务器端的连接(Stream)。
- 创建插件的API,应用开发者通过插件API来使用库提供的各种功能。
根据XMPP协议,在客户端-服务器之间,我们需要:(1)先建立一个信息通道(Stream) 。(2)在建立好的Stream上,交换信息(Stanza)。
使用以下的代码,可以建立Stream。
StreamConfig config = new StreamConfig("im.thefirstlineofcode.com", 5222);
config.setResource("my_android_mobile");
IChatClient chatClient = new StandardChatClient(config);
try {
chatClient.connect("my_user_name", "my_password");
} catch (ConnectionException e) {
throw new RuntimeException("can't connect to host", e);
} catch (AuthFailureException e) {
throw new RuntimeException("auth failed", e);
}
XMPP规范定义了Stream建立的几个协商阶段:
- Initial Stream
- TLS
- SASL
- Resource Binding
- Session Establishment
如果需要监控Stream建立的细节,可以使用INegotiationListener。
chatClient.addNegotiationListener(new INegotiationListsener() {
public void before(IStreamNegotiant source) {
if (source instanceof TlsNegotiation) {
System.out.println("Ready to negotiate TLS");
}
}
public void after(IStreamNegotiant source) {
if (source instanceof TlsNegotiation) {
System.out.println("TLS negotiation has done");
}
}
public void occurred(NegotiationException exception) {
IStreamNegotiant source = exception.getSource();
if (source instnaceof SaslNegotiant) {
SaslError saslError = (SaslError)exception.getAdditionalErrorInfo();
System.out.println("Error occured in SASL negotiation. Additional error info: " + saslError);
}
}
public void done(IStream stream) {
System.out.println("Stream has created");
}
});
chatClient.connect("my_user_name", "my_password");
在TLS协商过程中,如果客户端需要检查服务器证书有效性,可以使用IPeerCertificateTruster。
((StandardChatClient)chatClient).setPeerCertificateTruster(new IPeerCertificateTruster() {
public boolean accept(X509Certificate[] certificates) {
// check certificates
}
}
);
XMPP是一组协议族,包括RFC协议(例如:RFC3920、RFC3921)和XEPs(例如:XEP-0045 Multi-User Chat、XEP-0077 In-Band Registration)协议。在很多情况下,用户甚至可能会基于XMPP协议标准,定义自己的私有协议。
Chalk基于插件架构设计,以对应XMPP的扩展性和灵活性。除了少数基础功能(例如:建立Stream)之外,Chalk的所有功能都由插件来提供。
Chalk基于一个简单、灵活的插件框架,使用插件功能,一般使用插件需要以下的步骤:
- 注册插件
- 获取插件API
- 使用插件功能
我们用一个简单的插件PingPlugin(实现XEP-0199 XMPP Ping)来演示插件的使用方法。
使用插件前,需要先注册插件,IChatClient提供了注册插件的接口。
注册PingPlugin的代码如下。
chatClient.register(PingPlugin.class);
注册插件后,就可以获取插件的API。每个插件根据自己协议的细节,设计并提供API,应用开发者需要通过对应的文档了解相关细节。
Ping插件提供了一个名为IPing的接口,定义如下。
public interface IPing {
public enum Result {
PONG,
SERVICE_UNAVAILABLE,
TIME_OUT
}
Result ping();
void setTimeout(int timeout);
int getTimeout();
}
获取IPing接口的代码如下。
IPing ping = chatClient.createApi(IPing.class);
ping.setTimeout(4 * 1000); // Set timeout to the ping operation. Default is 2 * 1000ms.
Result result = ping.ping(); // Ping the server and waiting for result.
if (result == Ping.Result.PONG) {
System.out.println("Pong.");
} else if (result == SERVICE_UNAVAILABLE) {
System.out.println("Server doesn't support the protocol.");
} else {
System.out.println("Ping timed out.");
}
Chalk的一个重要设计目的,是为了充分体现XMPP协议的灵活性及扩展性,以便于我们可以根据自己需要,灵活的定义和开发IM通讯协议。
XMPP协议为一组松散的协议族,除了RFC3920 XMPP Core,RFC3921 XMPP IM之外,其它的协议(主要是XEPs)被视为可选的协议。不同的IM产品会选择实现其中一些协议, Chalk也实现了部分XMPP标准协议,并提供一些改善标准XMPP的非标准协议扩展。
以下为Chalk已经实现的插件(协议)。
IbrPlugin实现了XEP-0077(In-Band Registration),为客户端提供在线注册功能。
chaClient.register(IbrPlugin.class);
try {
IRegistration registration = chatClient.createApi(IRegistration.class);
registration.register(new IRegistrationCallback() {
public Object fillOut(IqRegister iqRegister) {
if (iqRegister.getRegister() instanceof RegistrationForm) {
RegistrationForm form = new RegistrationForm();
form.getFields().add(new RegistrationField("username", "my_user_name"));
form.getFields().add(new RegistrationField("password", "my_password"));
return form;
} else {
throw new RuntimeException("Can't get registration form");
}
}
});
} catch(RegistrationException e) {
IbrError error = e.getError();
if (error instanceof IbrError.Conflict) {
System.out.println("User has existed. Please change your name.");
} else if (error instanceof IbrError.NOT_ACCEPTABLE) {
System.out.println("Illegal user name. Please change your name.");
} else {
System.out.println("Registration failed.");
}
}
InstantingMessengerPlugin实现了RFC3921(Instant Messaging and Presence),提供Roster管理、Subscription管理,及发送接收Presence和Message的功能。
为简化文档,我们在后续说明中,可能会使用IM插件作为InstantingMessengerPlugin的同义词,当提及IM插件时,意味着是InstantingMessengerPlugin。
chaClient.register(InstantingMessengerPlugin.class);
在成功建立Stream后,根据RFC3921要求,客户端应该要立即发送一个Initial Presence,服务器端在收到Initial Presence后,才会将客户端的状态置为Available。
IInstantingMessenger im = chatClient.createApi(IInstantingMessenger.class);
im.send(new Presence()); // Send initial presence
在此后任何时刻,都可以通过发送Presence,更改自己的当前状态。
Presence presence = new Presence(Show.DND);
presence.getStatuses().add(new LangText("I'm being in a meeting."));
im.send(presence);
可以通过IPresenceListener监听联系人的Presence变化。
im.addPresenceListener(new IPresenceListener() {
public void received(Presence presence) {
System.out.println("Contact " + presence.getFrom() + " changed it's presence to " + presence.toString());
}
});
XMPP IM协议中,使用Roster来管理联系人列表。IM插件提供IRosterService和IRosterListener来管理Roster。
使用IRosterService的retrieve()方法来从服务器端获取Roster列表。
IRosterService rosterService = im.getRosterService();
rosterService.addRosterListener(new IRosterListener() {
void retrieved(Roster roster) {
// Process the roster that is retrieved from server.
}
void occurred(RosterError error) {
// An error occurred
}
...
});
rosterService.retrieve();
注意:retrieve()是一个异步方法,并不会直接返回结果,所以我们需要注册一个IRosterListener来监听获取的结果。
IRosterListener还提供了updated()和deleted()方法,可以用于监听Roster的变更。
rosterService.addRosterListener(new IRosterListener() {
...
public void updated(Roster roster) {
// Process the updated roster
}
public void deleted(Roster roster) {
// process the deleted roster
}
...
});
大部分时候,我们并不需要直接变更Roster,Roster管理往往和Subscription管理相关,当Subscription状态变更时,会自动导致Roster变更。
在某些情况下,我们需要直接变更Roster,例如修改用户的分组,IRosterService提供了以下的方法。
public interface IRosterService {
...
void add(Roster roster);
void update(Roster roster);
void delete(Roster roster);
...
}
XMPP IM协议使用Subscription来管理联系人之间的关联关系。IM插件提供了ISubscriptionService和ISubscriptionListener来管理Subscription。
JabberId contact = JabberId.parse("smartsheep@im.thefirstlineofcode.com");
ISubscriptionService subscriptionService = im.getSubscriptionService();
subscriptionService.subscribe(contact);
注册ISubscriptionListener,可以接收订阅消息。
subscriptionService.addSubscriptionListener(new ISubscriptionListener() {
...
public void asked(JabberId user) {
System.out.println("User " + user + " wants to subscribe you.");
}
...
});
如果用户决定通过对方的订阅,可以使用以下代码。
subscriptionService.approve(contact);
如果拒绝对方订阅,可以使用以下代码。
subscriptionService.refuse(contact);
使用ISubscriptionListener监听订阅反馈信息。
subscriptionService.addSubscriptionListener(new ISubscriptionListener() {
...
public void approved(JabberId contact) {
System.out.println("User " + contact + " approved your subscription.");
}
public void refused(JabberId contact) {
System.out.println("User " + user + " refused your subscription.");
}
...
});
JabberId contact = JabberId.parse("agilest@im.thefirstlineofcode.com");
Message message = new Message("Hello, Agilest!");
message.setTo(contact);
im.send(message);
或者,采用更简洁的方式。
JabberId contact = JabberId.parse("agilest@im.thefirstlineofcode.com");
im.send(contact, new Message("Hello, Agilest!"))
im.addMessageListener(new IMessageListener() {
public void received(Message message) {
System.out.println("Received a message from user " + message.getFrom());
}
});
InstantingMessengerPlugin2实现了LEP-0011(Traceable Message)。主要是提供可靠消息服务,以及可以跟踪消息状态。
LEP-0011是非标准协议,需要支持LEP协议的服务器配合,例如:Granite XMPP Server。
chaClient.register(InstantingMessengerPlugin2.class);
InstantingMessengerPlugin2插件,使用IMessageListener2的接口来替代标准的IMessageListener。IMessageListeners接口提供了一个traced方法来跟踪消息状态。
IInstantingMessenger2 im2 = chatClient.createApi(IInstantingMessenger2.class);
im2.addMessageListener(new IMessageListener2() {
public void received(Message message) {
System.out.println("Received a message from user " + message.getFrom());
}
public void traced(Trace trace) {
for (MsgStatus status : trace.getMsgStatuses()) {
if (status.getStatus() == MsgStatus.Status.SERVER_REACHED) {
System.out.println("Message which id is " + status.getId() + " has reached server at time " + status.getStamp());
} else if (status.getStatus() == MsgStatus.Status.PEER_REACHED) {
System.out.println("Message which id is " + status.getId() + " has reached peer at time " + status.getStamp());
} else { // status.getStatus() == MsgStatus.Status.MESSAGE_READ
System.out.println("Message which id is " + status.getId() + " has read by contact at time " + status.getStamp());
}
}
}
});
MucPlugin实现了XEP-0045(Multi-User Chat),提供聊天室多人聊天功能。
chaClient.register(MucPlugin.class);
IMucService muc = chatClient.createApi(IMucService.class);
JabberId[] hosts = muc.getMucHosts();
JabberId[] rooms = muc.getMucRooms();
IRoom room = muc.getRoom(roomJid);
RoomInfo roomInfo = room.getRoomInfo();
room.enter("my_nick_name");
room.exit();
Occupant[] occupants = room.getOccupants();
可以给聊天室发送消息,聊天室里的所有用户都能收到该消息。
room.send(new Message("Hello, everyone!"));
可以发送私聊消息给聊天室中某个用户。
room.send("user_nick_name", new Message("Hello, everyone!"));
JabberId roomJid = JabberId.parse("my_chat_room_name@im.thefirstlineofcode.com");
muc.createInstantRoom(roomJid, "my_nick_name");
JabberId roomJid = JabberId.parse("my_chat_room_name@im.thefirstlineofcode.com");
muc.createReservedRoom(roomJid, "my_nick_name", new StandardRoomConfigurator() {
protected RoomConfig configure(RoomConfig roomConfig) {
roomConfig.setRoomName("my first room");
roomConfig.setRoomDesc("Hope you have happy hours here!");
roomConfig.setMembersOnly(true);
roomConfig.setAllowInvites(true);
roomConfig.setPasswordProtectedRoom(true);
roomConfig.setRoomSecret("simple_password");
roomConfig.getGetMemberList().setParticipant(false);
roomConfig.getGetMemberList().setVisitor(false);
roomConfig.setWhoIs(WhoIs.MODERATORS);
roomConfig.setModeratedRoom(true);
return roomConfig;
}
}
);
JabberId myColleague = JabberId.parse("my_colleague_name@im.thefirstlineofcode.com");
room.invite(myColleague, "Let's discuss our plan")
通过IRoomListener可以监听多种Room相关的事件。当Room产生事件时,IRoomListener接收到RoomEvent类型的对象。
RoomEvent对象有两个关键属性,roomJid和eventObject,roomJid表示Event来自哪个Rooom,而eventObject则根据RoomEvent类型的不同,可以是不同类型的对象,表示Event的具体细节。
处理RoomEvent的代码大概如下。
muc.addRoomListener(new IRoomListener() {
public void received(RoomEvent<?> event) {
if (event instanceof InvitationEvent) {
InvitationEvent invitationEvent = (InvitationEvent)event;
JabberId roomJid = invitationEvent.getRoomJid();
Invitation invitation = invitationEvent.getEventObject();
System.out.println(String.format("'%s' invites you to join room '%s'", invitation.getInvitor(), roomJid));
} else if (event instanceof EnterEvent) {
Enter enter = ((EnterEvent)event).getEventObject();
Occupant occupant = muc.getRoom(event.getRoomJid()).getOccupant(enter.getNick());
int sessions = occupant == null ? 0 : occupant.getSessions();
System.out.println(String.format("'%s'[sessions:%d] has joined room '%s'", enter.getNick(), sessions, event.getRoomJid()));
} else if (event instanceof ExitEvent) {
Exit exit = ((ExitEvent)event).getEventObject();
Occupant occupant = muc.getRoom(event.getRoomJid()).getOccupant(exit.getNick());
int sessions = occupant == null ? 0 : occupant.getSessions();
System.out.println(String.format("'%s'[sessions:%d] has exited room '%s'", exit.getNick(), sessions, event.getRoomJid()));
} else if (event instanceof ChangeAvailabilityStatusEvent) {
ChangeAvailabilityStatus changeAvailabilityStatus = ((ChangeAvailabilityStatusEvent)event).getEventObject();
System.out.println(String.format("'%s' has changed it's availability status to: %s", changeAvailabilityStatus.getNick(),
getAvailabilityStatus(changeAvailabilityStatus)));
} else if (event instanceof RoomMessageEvent) {
RoomMessageEvent messageEvent = (RoomMessageEvent)event;
System.out.println(String.format("groupchat message received[from '%s' at room '%s']: %s", messageEvent.getEventObject().getNick(),
messageEvent.getRoomJid(), messageEvent.getEventObject().getMessage()));
} else if (event instanceof PrivateMessageEvent) {
PrivateMessageEvent privateMessageEvent = (PrivateMessageEvent)event;
System.out.println(String.format("groupchat private message received[from '%s' at room '%s']: %s", privateMessageEvent.getEventObject().getNick(),
privateMessageEvent.getRoomJid(), privateMessageEvent.getEventObject().getMessage()));
} else if (event instanceof DiscussionHistoryEvent) {
DiscussionHistoryEvent discussionHistoryEvent = (DiscussionHistoryEvent)event;
System.out.println(String.format("groupchat discussion history message received[from '%s' at room '%s']: %s", discussionHistoryEvent.getEventObject().getNick(),
discussionHistoryEvent.getRoomJid(), discussionHistoryEvent.getEventObject().getMessage()));
} else if (event instanceof ChangeNickEvent) {
ChangeNickEvent changeNickEvent = (ChangeNickEvent)event;
System.out.println(String.format("user '%s'[sessions: %d] changed his nick[at room '%s']: %s", changeNickEvent.getEventObject().getOldNick(),
changeNickEvent.getEventObject().getOldNickSessions(), changeNickEvent.getRoomJid(),
changeNickEvent.getEventObject().getNewNick()));
} else if (event instanceof RoomSubjectEvent) {
RoomSubjectEvent roomSubjectEvent = (RoomSubjectEvent)event;
if ("".equals(roomSubjectEvent.getEventObject().getSubject())) {
System.out.println(String.format("there are no room subject in room '%s'", roomSubjectEvent.getRoomJid()));
} else {
System.out.println(String.format("room subject received[from '%s' in room '%s']: %s", roomSubjectEvent.getEventObject().getNick(),
roomSubjectEvent.getRoomJid(), roomSubjectEvent.getEventObject().getSubject()));
}
} else if (event instanceof KickedEvent) {
KickedEvent kickedEvent = (KickedEvent)event;
System.out.println(String.format("you are kicked by '%s' from room '%s'. reason is '%s'", kickedEvent.getEventObject().getNick(),
kickedEvent.getEventObject().getActor().getNick(), kickedEvent.getRoomJid(),
kickedEvent.getEventObject().getReason()));
} else if (event instanceof KickEvent) {
KickEvent kickEvent = (KickEvent)event;
System.out.println(String.format("'%s' is kicked from room '%s'", kickEvent.getEventObject().getNick(), kickEvent.getRoomJid()));
}
}
});
以下是RoomEvent对象列表。
对象类型 | eventObject类型 | 描述 |
---|---|---|
ChangeAvailabilityStatusEvent | ChangeAvailabilityStatus | 有聊天室用户修改了他的Presence状态 |
ChangeNickEvent | ChangeNick | 有聊天室用户修改了他的昵称 |
DiscussionHistoryEvent | RoomMessage | 进入聊天室时,会收到聊天室最近的聊天历史消息 |
EnterEvent | Enter | 有新用户进入聊天室 |
ExitEvent | Exit | 有用户退出了聊天室 |
InvitationEvent | Invitation | 加入聊天室的邀请 |
KickedEvent | Kicked | 用户自己被踢出了聊天室 |
KickEvent | Kick | 有用户被踢出了聊天室 |
PrivateMessageEvent | RoomMessage | 私聊消息 |
RoomMessageEvent | RoomMessage | 有用户在聊天室发了消息 |
RoomSubjectEvent | RoomSubject | 聊天室主题变更 |
Chalk的架构设计将系统分为两部分。
- 主程序框架
- 插件
在主程序框架中,灰色的框是系统的骨架,是系统中比较稳定的部分,虚线框的部分是系统中灵活及充满变化的地方。
这种灵活性和变化从何而来?因为XMPP被设计成一个内核稳定,但高度可扩展的协议,通过XML的namespace语义,我们可以在stanza(iq, message, presence)中添加任意的新协议元素,从而扩展XMPP协议。通过这样的方式,XMPP被扩展成一个庞大的协议族,仅公开的XEPs(XMPP Extension Protocols)就有近200个。
对应XMPP协议的设计原则,Chalk也将系统设计成稳定的框架+可扩展的插件子系统。框架部分封装了通讯细节和XMPP基础概念,插件子系统允许通过插件任意扩展系统的能力。
值得注意的是,系统的扩展和变化主要出现这几个地方。
- 协议定义
- 协议-协议对象的转化
- 协议逻辑处理
大部分情况下,我们希望插件和协议有清楚的映射关系。这意味着,我们希望尽量能够在一个插件中,封装一个或一组相关的XMPP协议。应该尽量避免将一个独立协议的逻辑,拆分在多个插件中。
为简化Plugin的开发,Chalk提供了IChatSystem和IChatServices,希望能够把Plugin和系统之间的联系,简化限制在这两个接口内。
IChatSystem允许Plugin将特定的扩展注册到系统当中,主要包括:
- 协议对象(Protocol Object)
- 协议-协议对象转换器(Parser & Translator)
- 插件的Api(Api)
- 插件Api的实现(Api Impl)
IChatServices封装了下层通讯细节及XMPP基础概念,插件的Api实现可以调用IChatServices提供的服务,处理协议的细节逻辑。
我们通过一个简单的案例,说明如何开发一个Chalk插件。
我们选择实现一个简单的协议XEP-0199(XMPP Ping)。
XMPP Ping是一个基于iq的协议,简单来说,我们需要处理以下的逻辑。
客户端向服务器端发送一个ping请求。
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='c2s1' type='get'>
<ping xmlns='urn:xmpp:ping'/>
</iq>
如果服务器支持XMPP Ping协议,服务器返回pong响应。
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='c2s1' type='result'/>
如果服务器不支持XMPP Ping协议,则返回错误。
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='c2s1' type='error'>
<ping xmlns='urn:xmpp:ping'/>
<error type='cancel'>
<service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
</error>
</iq>
为了便于逻辑处理,我们一般会对应网络上传输协议XML文档,设计一个Java业务对象,这样便于用业务类来进行处理。这个Java业务对象,我们称之为协议对象(Protocol Object)。
在本案例中,协议对象结构非常简单,类定义如下。
public class Ping {
public static final Protocol PROTOCOL = new Protocol("urn:xmpp:ping", "ping");
}
Ping类不包含任何信息,只是用类型来表示ping协议。
有了Protocol Object,引发了另外的问题,就是如何处理XMPP协议文档-Protocol Object之间的转换。
当我们收到网络上的XML协议文档,需要将其内容转换成一个Protocol Object实例。对应的,当我们发送一个Protocol Object时,需要将Protocol Object包含的协议信息,转换成对应的XMPP协议文档,再发送到网络上去。
Chalk用IParser和ITranslator来处理Protocol Object-XMPP协议文档之间的转换。IParser负责将一个XMPP协议文档转换成Protocol Object实例。ITranslator负责将一个Protocol Object实例翻译成对应的XMPP协议文档。
为了简化这些转换逻辑,Chalk使用Basalt项目提供的一个OXM(Protocol Object-XMPP Document Mapping)框架。在大部分情况下,我们并不需要为对象和XMPP协议文档之间的转换,编写逻辑代码。我们只需要选择系统内置的IParser和ITranslator实现。
因为Ping对象结构非常简单,我们可以选择使用SimpleObjectParser和SimpleObjectTranslator来处理对象和XML文档之间的转换。
Basalt是一个XMPP协议库,定义了基础的XMPP协议对象,并提供一个简单易用的OXM(Protocol Object-XMPP Document Mapping)框架。
最常用的IParser和ITranslator,是NamingConventionParser和NamingConventionTranslator,它们采用命名约定的方法,将Protocol Object的Field和XMPP协议文档中的Element做对应的拷贝。
关于Basalt OXM(Protocol Object-XMPP Document Mapping)框架的更多信息,请参考Basalt项目相关文档。
在理想的情况下,我们将Chalk的开发者分为两类:
- 插件开发者
- 应用开发者
插件开发者理解XMPP协议细节,以及Chalk的插件架构体系。他们将XEPs或者自定义的非标准协议,开发成对应的Plugin。
应用开发者不需要理解XMPP协议的细节,他们只是使用Chalk基础框架和选用插件,在此之上开发具体XMPP应用,例如IM,物联网应用等。
在现实中,开发者可能需要兼插件开发者和应用开发者。即使是这样的情况,在设计插件时,也应该遵循一些原则:
- 提供易于使用的Api,使得应用开发更简单、直观。
- 屏蔽XMPP协议的底层细节。
- 避免绑定具体的应用逻辑,提升插件的复用性。
插件开发者一个重要的任务,是设计良好的Api,给应用开发者调用。
在本案例中,PingPlugin提供以下的Api给应用开发者使用。
public interface IPing {
public enum Result {
PONG, // Server returned a pong.
SERVICE_UNAVAILABLE, // Server doesn't support the protocol.
TIME_OUT // Ping timed out.
}
/**
* Send a ping request to server and waiting for result.
*/
Result ping();
/**
* Set timeout for ping operation.
*/
void setTimeout(int timeout);
/**
* Get the ping timeout.
*/
int getTimeout();
}
基于IChatServices接口,有多种办法可以实现协议逻辑。
只要正确的注册Api和Api Impl,框架会在在Api Impl里自动注入IChatServices对象,所以我们总是可以获取到IChatServices来使用。
最常规的想法,我们向服务器端发送一个带id的ping消息,然后监控所有收到的信息,如果有一条相同id的消息返回,我们检查是pong还是server-unavailable,如果超过一定的时间还没收到响应消息,我们就返回超时。
如果是这样的思路,我们可以:
- 使用IIqService发送一个ping消息到服务器。
IIqService iqService = chatServices.getIqService();
Iq ping = new Iq(Iq.Type.GET);
ping.setObject(new Ping());
String pingId = ping.getId();
iqService.send(ping);
注意,我们需要记录下发送的iq的id,便于后面找到服务器端对应的响应消息。
- 我们当然需要增加一个IIqListener,监听从服务器端来的消息,检查是否pong的响应。
iqService.addListener(new IIqListener() {
public void received(Iq iq) {
if (pingId.equals(iq.getId()) {
// Received response from server
}
}
});
- 我们还需要监听错误消息,检查服务器可能返回service-unavailable错误。
IErrorService errorService = chatServices.getErrorService();
errorService.addListener(new IErrorListener() {
public void occurred(IError error) {
if (pingId.equals(error.getId()) {
if (error instanceof ServerUnavailable) {
// Received server-unavailable error from server
}
}
}
});
- 我们当然还需要一个Timer定时器,来处理超时的情况。
Timer timer = new Timer();
timer.schedule(pingTimeoutTask, timeout);
这些处理看上去非常繁琐,Chalk提供了稍微简便一些的方法,我们可以使用SyncOperationTemplate类来简化一些代码逻辑。
以下是使用SyncOperationTemplate大概的代码逻辑。
public class PingImpl implements IPing {
private IChatServices chatServices;
private String id;
private int timeout;
public Result ping() {
SyncOperationTemplate<Iq, IPing.Result> template = new SyncOperationTemplate<Iq, IPing.Result>(chatServices);
try {
return template.execute(new ISyncIqOperation<IPing.Result>() {
public void trigger(IUnidirectionalStream<Iq> stream) {
Iq iq = new Iq(Iq.Type.SET);
iq.setObject(new Ping());
id = iq.getId();
stream.send(iq, timeout);
}
public boolean isErrorOccurred(StanzaError error) {
if (id.equals(error.getId()))
return true;
return false;
}
public boolean isResultReceived(Iq iq) {
if (id.equals(iq.getId()))
return true;
return false;
}
public Result processResult(Iq iq) {
return IPing.Result.PONG;
}
});
} catch (ErrorException e) {
if (e.getError().getDefinedCondition().equals(RemoteServerTimeout.DEFINED_CONDITION)) {
return IPing.Result.TIME_OUT;
} else {
return IPing.Result.SERVICE_UNAVAILABLE;
}
}
}
}
...
}
这里简化之处在于,我们可以在一个ISyncIqOperation内部类中处理所有逻辑,而不需要去访问IIqService,IIqListener,IErrorService,IErrorListener及Timer等诸多细节。
Legacy模式还是比较复杂,特别我们会注意到一个问题,我们总是需要在代码中跟踪相同id的消息,这似乎意味着跟踪id的处理,应该移交给框架去进行处理。
Chalk提供了Task模式的处理框架,可以避免我们琐碎的去跟踪相同id的消息,更加简化协议逻辑的处理。
注意,我们在IPing接口里,采用同步阻塞等待结果的方法,调用ping()方法后,程序会阻塞直到获得pong响应,或者接收到server-unavailable错误,或者等待超时返回。
在同步的情况下,最方便是使用ITaskService和ISyncTask接口。
public class PingImpl implements IPing {
private IChatServices chatServices;
private int timeout;
...
public Result ping() {
ITaskService taskService = chatServices.getTaskService();
try {
return taskService.execute(new ISyncTask<Iq, IPing.Result>() {
public void trigger(IUnidirectionalStream<Iq> stream) {
Iq iq = new Iq(Iq.Type.SET);
iq.setObject(new Ping());
stream.send(iq, timeout);
}
public Result processResult(Iq iq) {
return IPing.Result.PONG;
}
});
} catch (ErrorException e) {
if (e.getError().getDefinedCondition().equals(RemoteServerTimeout.DEFINED_CONDITION)) {
return IPing.Result.TIME_OUT;
} else {
return IPing.Result.SERVICE_UNAVAILABLE;
}
}
}
...
}
可以看到,在Task模式下,框架默认监控相同id的消息,如果是接收到相同id的iq result,则回调processResult()方法进行处理。
如果服务器端返回相同id的错误,以及超时错误,都会统一封装成ErrorException,可以catch例外根据具体情况进行处理。
在实时消息系统中,Sync的场景会比较少,大多数场景下,等待来自服务器或联系人的消息,但是消息何时会来到,我们并不能预期。
在大多数情况下,我们应该使用Async Task而不是Sync Task,等消息到达的时候,触发回调方法。
ITaskService提供了执行Async Task的方法:
public interface ITaskService {
...
void execute(ITask<?> task);
...
}
关于Sync Task和Async Task的区别,还有一个值得注意的地方。所有的Sync Task的回调处理,都是在主消息接收线程里执行的,这意味着,如果有一个回调方法执行时,占用太多时间,会导致其它的Task被阻塞,有可能导致的一个结果是,应用程序一些业务被阻塞变慢。
当然我们可以在一些耗时的Sync Task回调方法里,启动新的线程,避免阻塞主消息接收线程。这是一个解决办法,但是我们有时候可能会容易忘记需要启动新线程。
Async Task采用了不同的处理方法,框架提供了一个线程池,每当接收到一个需要处理的Async Task回调时,系统会从线程池中启动一个线程,将回调逻辑放在新线程中去处理。这样,Async Task可以更好的避免系统阻塞变慢问题。
如果可能,应该尽可能的使用Async Task模式来处理协议的逻辑。
现在,我们已经处理好了所有的协议细节和业务逻辑,需要将所有的代码和逻辑,通过插件注册到系统中去。我们在上面已经提到过了,我们使用IChatSystem来帮助完成这项工作。
我们需要编写一个Plugin类,在本案例中,这个类是PingPlugin。
在本案例中PingPlugin中,我们需要:
- 注册协议的协议对象Ping,以及对应的Parser和Translator。
- 注册提供的Api接口IPing,以及IPing的具体实现PingImpl。
我们在Plugin类的init()方法里,注册插件给系统提供的扩展。在destroy()方法里,我们移除这些扩展。
PingPlugin的代码如下。
public class PingPlugin implements IPlugin {
public void init(IChatSystem chatSystem, Properties properties) {
chatSystem.registerParser(
ProtocolChain.first(Iq.PROTOCOL).next(Ping.PROTOCOL),
new SimpleObjectParserFactory<Ping>(Ping.PROTOCOL, Ping.class));
chatSystem.registerTranslator(
Ping.class,
new SimpleObjectTranslatorFactory<Ping>(Ping.class, Ping.PROTOCOL));
chatSystem.registerApi(IPing.class, PingImpl.class, properties);
}
public void destroy(IChatSystem chatSystem) {
chatSystem.unregisterApi(IPing.class);
chatSystem.unregisterTranslator(Ping.class);
chatSystem.unregisterParser(ProtocolChain.first(Iq.PROTOCOL).next(Ping.PROTOCOL));
}
}
插件已经开发完成,现在我们可以注册插件:
chatClient.register(PingPlugin.class);
创建Api:
IPing ping = chatClient.createApi(IPing.class);
并执行协议逻辑:
IPing.Result result = ping.ping();
PingPlugin是一个非常简单的插件,虽然它很简单,但是开发这样一个插件,依然需要完成一个完整的插件开发的过程。
这样一个简单而又完整的插件案例,是我们开发更复杂协议的起点。
如果要开发更复杂的协议,最好的办法就是阅读XMPP文档,以及阅读Chalk的代码。如果你对XMPP和开源充满热情,那现在就开始吧。
TBD
TBD