OpenFire源码学习之十三:消息处理

消息处理流程总揽(该图来源于互联网,图片很大,不过类容还是挺清楚的。不方便查看,需要下载查看):

OpenFire源码学习之十三:消息处理

更为直观的流程描述:

OpenFire源码学习之十三:消息处理

在线chat

Test1---->test2

<message  
          to="test2@8ntmorv1ep4wgcy/Smack" 
          from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="chat">
  <body>hello</body>
  <thread>CLhNEh</thread>
  <x xmlns="jabber:x:event">
    <offline/>
    <composing/>
  </x>
</message>

Of系统发送消息会调用两次过滤器

InterceptorManager.getInstance().invokeInterceptors(packet, this, false, false);
                deliver(packet);
                // Invoke the interceptors after we have sent the packet
                InterceptorManager.getInstance().invokeInterceptors(packet, this, false, true);

离线chat 会 调用数据存储方法,不会经过过滤器

消息路由

RoutingTable(路由表),用于存储客户端会话路线,即将离任的服务器会话和组件。当一个用户与服务器进行身份验证的客户端会话将被添加到路由表。每当客户端会话可用或不可用的路由表将被更新过。RoutingTable是维护服务器范围的知识,它包括任何节点(也是集群的主机)的路由。

路由只关心节点地址,目的地是数据包处理程序。通常情况下是指以下三个类型:

l  Session 一个本地或远程会话属于服务器的域。远程会话可能在集群服务器。

l  Chatbot 聊天机器人,将有各种各样的Chatbot数据包路由到它

l  Transport 其他服务器域,它可能是驻留在同一服务器JVM

几乎在所有情况下,调用者都不必关心如何处理一个给定的节点。只需要简单地获得数据包处理程序和交付包节点,把细节的处理程序就可以了。在OpenFire中的路由使用字符串的预处理匹配规则来赋予的XMPP规范。一个特定名称的路由可以由通配符或者其他的资源来匹配,也可标示为null。例如:一个路由到任何地址(server.com)应该设置名称为空,主机“server.com”和资源都为null,一个路由到最好的资源,为(user@server.com)应该表明路由无效资源组件的XMPPAddress。在会话管理中,添加的路由最好用user@server.com和user@server.com/resource routes(路由资源)。

 在容纳广播这一规则里,你还可以查询匹配个别的比如孩子节点。路由表包含很多的节点树。节点树是安排在以下等级秩序:

forest  

在路由表中的所有节点。一些XMPP与主机的地址,名字,和资源设置为null会匹配所有节点  存储在路由表。使用的时候应该注意,路由表可能包含成千上万条目和迭代器产品。

domain root

这些节点树的根都是服务器的一个域。XMPP地址包含只是一个主机条目,空的名称和资源领域将匹配域根,所有孩子节点将包含两根条目(如果有的话)和所有条目相同的主机名。

user branches

根的直接子元素是用户分支。一个XMPP地址仅包一个曾和名称条目,空在资源领域将匹配一个特定用户的分支。所有孩子节点将同时包含用户部门和所有条目相同的主机和名字,而可以其他忽视资源。这是给大多数的用户进行广播。注意,如果用户分支位于外国服务器,唯一的途径将服务器对服务的传输。

resource leaves

每个用户分支可以有零个或多个资源叶子。部分匹配在XMPP地址和值在主机、名称和资源领域将相当于精确匹配调用因为只有一个路线可以被注册为一个特定的。如果你想搜索的资源叶路线,以及有效的用户部门对该节点如果没有叶的存在,可以看这个getBestRoute()方法的描述

任何组件或模块都会触发路由路由表的立即更新。

用户登陆

OF于spark的登陆对话

第一次对话

客户端发送请求消息:

<stream:stream to="192.169.1.104" xmlns="jabber:client" xmlns:stream="http://etherx.jabber.org/streams" version="1.0">

服务端处理:

1.通过ConnectionHandle的messageReceived方法进入process处理消息方法,然后调用create_

  Session方法。该方法使用XPP抓住开放流标签和创建一个活跃的会话对象,这里会话创建取决于发送

  者的名称空间,而且这个会话将获得打开流标签、检查错误、创建一个会话、返回一个错误或者杀死这 个

  个连接。在这个方法中,首先会检查这个stream的“to“属性是否会匹配服务器的的子域名,如果这个

  属性中to的值是个未知,或者连接关闭的话,则返回false进行下一步判断如果检查正确则拼装一个会

  话对象的xml文件调用deliverRawText。当返回了false就会进行下一个的判定,在这样的判定中,

  先用发送这个空间命名,判断客户端与服务端连接的安全型对话,服务端也会回复一串字符流。在登陆

  会话中,因为stream的开始标签START_TAG不存在,则直接进入了下个方法。

2.通过NIOConnection中的方法deliverRawText()方法给客户端会话。它会给客户端返回两条信息:

  1):<?xml version='1.0'encoding='UTF-8'?>

<stream:streamxmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client"

 from="192.168.2.104"

        version="1.0">

  2):<stream:features>

<starttlsxmlns="urn:ietf:params:xml:ns:xmpp-tls"></starttls>

<mechanismsxmlns="urn:ietf:params:xml:ns:xmpp-sasl">

<mechanism>DIGEST-MD5</mechanism>

<mechanism>JIVE-SHAREDSECRET</mechanism>

<mechanism>PLAIN</mechanism>

<mechanism>ANONYMOUS</mechanism>

<mechanism>CRAM-MD5</mechanism>

               </mechanisms>

<compression xmlns="http://jabber.org/features/compress">

<method>zlib</method>

</compression>

<auth xmlns="http://jabber.org/features/iq-auth"/>

<registerxmlns="http://jabber.org/features/iq-register"/>

    </stream:features>

上面的第一条为服务端需要发送的地址id,第二条应该告诉客户端具体信息了。其中的<mechanisms>节点就是要告诉客户端openfire支持SASL验证方式了。

第二次对话:

客户端发送请求消息:

<starttlsxmlns="urn:ietf:params:xml:ns:xmpp-tls"/>

服务处理:在StanzaHandler中的process方法处理中,会鉴定发送者消息tag。而spark发的消息则是<starttls>,服务器的处理则如下代码片段:

 String tag = doc.getName();
        if ("starttls".equals(tag)) {
            // Negotiate TLS
            if (negotiateTLS()) {
                startedTLS = true;
            }
            else {
                connection.close();
                session = null;
            }
        }

根据客户端tag内容可判断,它需要使用TLS安全协议。TLS协议是......

那么服务端也会通过startTLS()方法 进行客户端认证,认证的类型包含以下:

l  Disabled:客户机无具体身份验证

l  Wanted:客户端尝试认证。如果客户端没有提供身份验证的基本信息,TLS协议将停止与它的连接。这种选项用于s2s模式。

l  Needed:客户需要认证。如果客户选择不提供身份验证信息本身,LS谈判将继续进行。此选项用于s2s模式

这里默认的选择Connection.ClientAuth.disabled;服务端的回复如下:

<proceedxmlns="urn:ietf:params:xml:ns:xmpp-tls"/>

 

第三次对话:

客户端请求消息:再次握手

<stream:stream to="192.168.2.104" xmlns="jabber:client" xmlns:stream="http://etherx.jabber.org/streams" version="1.0">

服务端处理:

OF会调用processPresence()处理收到的存在数据包。处理数据包的时候of会将数据消息交给PacketRouterImpl的route()方法处理。在这个方法里边针对登陆的消息会经过下面的几个步骤:

1.route方法会先调用拦截器(InterceptorManager.getInstance().invokeInterceptors)里

  面调用全局拦截器,对这个包进行检查如果这个集合对象为空,创建迭代

  器操作

for (PacketInterceptor interceptor : globalInterceptors) {
   try {
         interceptor.interceptPacket(packet, session, read, processed);}
 ......
      }
}

2.设置消息发送的地址等信息

packet.setTo(session.getAddress());
     packet.setFrom((JID)null);
packet.setError(PacketError.Condition.not_authorized)
session.process(packet);

3.Process()处理消息包

4.Deliver()分发消息

5.incrementServerPacketCount()增加一个从服务器到客户端发送数据包数量。

第四次对话:

客户端选择DIGEST-MD5方式验证

<authmechanism="DIGEST-MD5"xmlns="urn:ietf:params:xml:ns:xmpp-sasl"></auth>

服务器处理:

   Of在process方法中进行SASL验证.验证数据包处理handle()体可能会发送一个初始身份验证请求或响应挑战由服务器。返回值指示是否认证已经完成,不论是成功与否或如果实体将发送一个响应的挑战。代码清单:

......
else if ("auth".equals(tag)) {
            // User is trying to authenticate using SASL 
            startedSASL = true;
            // Process authentication stanza 验证处理
            saslStatus = SASLAuthentication.handle(session, doc);
        }

然后通过sendChallenge()方法生成加密参数给客户端认证:

<challenge xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
cmVhbG09IjE5Mi4xNjguMi4xMDQiLG5vbmNlPSJiYWV1ZDd3UHZHSHFLd1Y0d3hDaFFVN2NnbzhvSUN	hMUdhbVNxSUo4Iixxb3A9ImF1dGgiLGNoYXJzZXQ9dXRmLTgsYWxnb3JpdGhtPW1kNS1zZXNz
</challenge>

第五次对话:

客户端请求:

<iq >

<queryxmlns="jabber:iq:auth">

<username>600646</username>

</query>

</iq>

服务处理iq信息返回:

<challengexmlns="urn:ietf:params:xml:ns:xmpp-sasl">

cmVhbG09IjE5Mi4xNjguMi4xMDQiLG5vbmNlPSJJK21mRVpKbnk5SHZzcmNsRmZIL2dTMG5EYzF3Q2Nk         alBNOFR1N3JiIixxb3A9ImF1dGgiLGNoYXJzZXQ9dXRmLTgsYWxnb3JpdGhtPW1kNS1zZXNz

</challenge>

 

第六次对话:

客户端请求:

<responsexmlns="urn:ietf:params:xml:ns:xmpp-sasl">

Y2hhcnNldD11dGYtOCx1c2VybmFtZT0iNjAwNjQ2IixyZWFsbT0iMTkyLjE2OC4yLjEwNCIsbm9uY2U9Im01         MGR3cFR0L3JQVlZCT2o4RVlIWkpTS0wvWUxMei92N0orSUV5RksiLG5jPTAwMDAwMDAxLGNub25jZT0         iN0lVK2xXTFhOc2Z5eTFYTnBIU2Jwdlc2V3p5aU14Zi9JcWJpUml0OSIsZGlnZXN0LXVyaT0ieG1wcC8xOT         IuMTY4LjIuMTA0IixtYXhidWY9NjU1MzYscmVzcG9uc2U9NWNkOTBjM2NjNWNlNTMxOGVkMjY1Mjk5Y         WMxOGQ1NDYscW9wPWF1dGgsYXV0aHppZD0iNjAwNjQ2Ig==

</response>

服务端处理:此时of根据客户端返回的<response>信息来查找数据库查询请求登陆者密码:

JDBCAuthProvider:

private String getPasswordValue(String username) throws UserNotFoundException {
.......
        if (username.contains("@")) {
            // Check that the specified domain matches the server's domain
            int index = username.indexOf("@");
            String domain = username.substring(index + 1);
            if (domain.equals(XMPPServer.getInstance().getServerInfo().getXMPPDomain())) {
                username = username.substring(0, index);
            } else {
                // Unknown domain.
                throw new UserNotFoundException();
            }
        }
        try {
            con = getConnection();
            pstmt = con.prepareStatement(passwordSQL);
            pstmt.setString(1, username);

            rs = pstmt.executeQuery();

            // If the query had no results, the username and password
            // did not match a user record. Therefore, throw an exception.
            if (!rs.next()) {
                throw new UserNotFoundException();
            }
            password = rs.getString(1);
        }
       ......
}

得到密码校验后服务端给客户端反馈的信息如下

<successxmlns="urn:ietf:params:xml:ns:xmpp-sasl">

cnNwYXV0aD1mYTljNjA1YWUyYjJiMzQ1NjE3NzQ4NTIyODgxMDdhOA==

</success>

此时c2s的登陆方式就结束了。

单点消息

流程图:

OpenFire源码学习之十三:消息处理

流程处理:

发送者A:

<message >
  <body>aaa</body>
  <thread>3GU94l</thread>
  <x xmlns="jabber:x:event">
    <offline/>
    <composing/>
  </x>
</message>

服务器:

ConnectionHandler
messageReceived()->handler.process()
StanzaHandler
        process()->processMessage(packet);
StanzaHandler
        Route()
PacketRouterImpl
 messageRouter.route(packet)
MessageRouter
Route()
RoutingTableImpl
        getClientRoute()//检查客户端(发送者)是否在线
MessageRouter
        Route()->invokeInterceptors()-routingTable.routePacket()
RoutingTableImpl
<span style="white-space:pre">	</span>routePacket()->routeToBareJID()->getHighestPrioritySessions()
<span style="white-space:pre">	</span>//判断客户端(接受者)地址是否在线和优先级别
<span style="white-space:pre">	</span>False:
<span style="white-space:pre">	</span>MessageRouter
<span style="white-space:pre">		</span>routingFailed
<span style="white-space:pre">	</span>OfflineMessageStrategy
<span style="white-space:pre">		</span>storeOffline()//存储离线消息
<span style="white-space:pre">		</span>PrivacyListManager
<span style="white-space:pre">			</span>getDefaultPrivacyList()//个人默认隐私列表
<span style="white-space:pre">		</span>PrivacyListProvider
<span style="white-space:pre">			</span>loadDefaultPrivacyList()//从数据库查询用户默认的隐私列表
<span style="white-space:pre">				</span>If not null listsCache.put(cacheKey, list);
<span style="white-space:pre">				</span>Else
<span style="white-space:pre">				</span>OfflineMessageStrategy 
<span style="white-space:pre">					</span>store()
<span style="white-space:pre">	</span>OfflineMessageStore 
<span style="white-space:pre">	</span>addMessage()//保存消息
拼写xml:
<message ><offline/><composing/></x></message>
INSERT INTO ofOffline (username, messageID, creationDate, messageSize, stanza) VALUES (?, ?, ?, ?, ?)
True:
<span style="white-space:pre">	</span>LocalSession
<span style="white-space:pre">		</span>Process()-invokeInterceptors()->deliver()
LocalClientSession
        Deliver()
NIOConnection
<span style="white-space:pre">	</span>Deliver->ioSession.write(buffer);

离线消息表

离线消息存储表(ofOffline

1

USERNAME

用户名

varchar2

32

 

NOT NULL

 

 

2

MESSAGEID

消息ID

number

 

 

NOT NULL

 

 

3

CREATIONDATE

消息存储日期

char

15

 

 

NOT NULL

 

 

4

MESSAGESIZE

消息的字节大小

number

 

 

 

NOT NULL

 

 

5

STANZA

消息体

long

 

 

 

NOT NULL

 

 

广播

系统广播交互页面:user-message.jsp

OpenFire源码学习之十三:消息处理

1.通过消息管理器SessionManager.sendServerMessage()

2.在sendServerMessage方法中调用broadcast(packet)

   Broadcast方法:将消息广播到所有的连接回话

   在broadcast方法中会调用broadcastPacket()方法

3.在broadcastPacket方法中:

 public void broadcastPacket(Message packet, boolean onlyLocal) {
        // 从路由表中遍历所有在线信息
        for(ClientSession session : localRoutingTable.getClientRoutes()) {
           / /开始处理广播信息
            session.process(packet);
        }
.......
}

4.通过LocalSession的process方法处理广播信息。LocalSession是个本地回话类,它代表c2s、

 s2s两种不同的回话连接,它的回话属于一对多。

public void process(Packet packet) {
        // 检查请求的包可以被处理
        if (canProcess(packet)) {
            // 执行的实际处理数据包。这通常意味着发送数据包到实体
            try {
                // 在发送数据包之前调用拦截器
                InterceptorManager.getInstance().invokeInterceptors(packet, this, false, false);
                 // 发送数据包
                deliver(packet);
                InterceptorManager.getInstance().invokeInterceptors(packet, this, false, true);
            }
            catch (PacketRejectedException e) {
                // An interceptor rejected the packet so do nothing
            }
            catch (Exception e) {
                Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
            }
        }
    }

Openfire发送数据包用:NIOConnection中的deliver方法实现了Connection接口的方法。这个方法本质是调用套接字发送(packet.getWriteBuffer())。

房间消息

房间消息格式

<message >
  <body>aaaaaaaaaaaaaa</body>
  <x xmlns="jabber:x:event">
    <offline/>
    <delivered/>
    <displayed/>
    <composing/>
  </x>
</message>

服务器处理消息:

1.将发送到此服务器托管的组件

RoutingTableImpl.routeToComponent()。

该方法中处理消息的方法交由本地房间(LocalMUCUser)处理。该类代表聊天服务的交

互的用户,用户可以加入聊天服务器托管的serveral的房间。这意味着openfire将不

得不为每个加入房间的用户和MUCRoles生成一个实例。房间成员都依附JVM上。当

房间内成员托管另一个集群节点,那么一个RemoteMUCRole实例将会被代替。

Of在处理房间消息的时候会通过一系列的过滤条件判断。比如消息类型,消息资源等。

这里不多描述。距离看下房间消息发送——sendPublicMessage()。

public void sendPublicMessage(Message message, MUCRole senderRole) throws ForbiddenException {
        // Check that if the room is moderated then the sender of the message has to have voice
        if (isModerated() && senderRole.getRole().compareTo(MUCRole.Role.participant) > 0) {
            throw new ForbiddenException();
        }
        // Send the message to all occupants
        message.setFrom(senderRole.getRoleAddress());
        send(message);
        // Fire event that message was receibed by the room
        MUCEventDispatcher.messageReceived(getRole().getRoleAddress(), senderRole.getUserAddress(),
                senderRole.getNickname(), message);
    }

该方法在类LocalMUCRoom类中。

在send(message)方法中调用了方法broadcast播放。

 private void broadcast(Message message) {
        // Broadcast message to occupants hosted by other cluster nodes
        BroadcastMessageRequest request = new BroadcastMessageRequest(this, message, occupants.size());
        CacheFactory.doClusterTask(request);

        // Broadcast message to occupants connected to this JVM
        request = new BroadcastMessageRequest(this, message, occupants.size());
        request.setOriginator(true);
        request.run();
    }

该方法将广播消息到其他集群节点的驾乘托管。

BroadcastMessageRequest该类将广播一条消息到本地房间成员的任务。当房间成员将

消息发送到房间的每个集群节点将执行这条任务。

doClusterTask(request)。该方法将其他集群成员一异步方式调用任务。在本地的集群

成员不被会执行。


用户登陆房间

<iq >
  <query xmlns="http://jabber.org/protocol/disco#info"/>
</iq>

离线消息

客户端发送离线查询

<iq >
  <query xmlns="jabber:iq:private">
    <storage xmlns="storage:bookmarks"/>
  </query>
</iq>

服务端返回:

<iq type="result" >
  <query xmlns="jabber:iq:private">
    <storage xmlns="storage:bookmarks"/>
  </query>
</iq>

客户端

<iq ></query></iq>

当客户端发送在线状态后

<presence >
  <status>在线</status>
  <priority>1</priority>
</presence>

Of接到消息处理

1、统一交包路由处理(PacketRouterImpl)

2、过滤数据包之后交由PresenceRouter中的handle()方法处理

3、该方法中有两个对象来处理EntityCapabilitiesManager与 PresenceUpdateHandler

   EntityCapabilitiesManager:

   EntityCapabilitiesManager是有用的处理“过滤通知”的联系人,可能不希望收到通知的所  有有效载荷类型使用与PubSub的(XEP-0060)。服务器的管理实体能力的作用是缓存 以前遇到实体XMPP客户端功能,支持相同的身份和特性。如果服务器还没有见过一       个特定的身份和特性组合,探索信息查询发送到该客户端,其答复是缓存以供将来使用       的客户分享这些相同的实体能力。

   PresenceUpdateHandler:

[实现存在协议。客户端使用该协议存在和名册信息更新。

正确处理程序必须检测是否存在类型,更新用户的花名册,并告知用户会话的更新存在状态存在。 Jabber的存在有多种用途,所以这个处理程序可能会在服务器中的所有处理程序是最复杂的。

有四种基本类型的存在更新:

简单的状态更新 - 给服务器(或处理) ,这些更新妥善处理由服务器和多用户的花名册上有兴趣的用户。一空,丢失,或“不可用”类型的属性,表示一个简单的更新(有没有“可用”的类型,尽管它应该被服务器接受。
导演存在更新 - 给特定叽里咕噜实体,这些存在更新妥善处理,并直接传递到实体没有广播名册用户。任何更新类型是可能的认购请求保留的除外。
认购请求 - 这些更新的要求存在订阅状态的变化。这样的要求,始终影响名册。服务器必须:
与适当的用户信息更新名册
推名册更改用户
更新转发给正确的各方。
有效的类型包括“认购” , “认购” , “退订” , “退订” 。
的XMPPServer探针 - 提供了一种机制,服务器到另一台服务器上查询用户的存在状态。这使得用户可以立刻知道用户的存在状态,当他们来到网上,而不是一种存在的方式,从其他的服务器或跟踪他们,因为他们收到更新广播。需要S2S能力。]

4、在PresenceUpdateHandler中处理 方法为process()。 在process()方法中会广播给他的好友花名册,broadcastUpdate(),并且还会广播给其他用户资源。Roster中broadcastPresence()

5、PresenceUpdateHandler类initSession。

6、查找离线消息

7、在OfflineMessageStore中的方法getMessages()先查找出该用户的离线消息,然后删除消息,这样并不是很科学。然后删除用户缓存消息的大小

8、Of返回离线消息

<message >
  <body>2</body>
  <thread>u41WVK</thread>
  <x xmlns="jabber:x:event">
    <offline/>
    <composing/>
  </x>
  <delay xmlns="urn:xmpp:delay" from="8ntmorv1ep4wgcy" stamp="2013-11-04T06:43:41.837Z"/>
  <x xmlns="jabber:x:delay" from="8ntmorv1ep4wgcy" stamp="20131104T06:43:41"/>
</message>