android封装signalR的demo

后端用的是c#,所以长链接这块用的是signalR。公司的前端是用flutter的,也有线程的signalR的插件。可惜会出现一些问题,决定自己封装一个。这里就简单介绍一下android原生封装signalR吧

这边实现了,心跳机制,断线重连,消息去重发送,连接状态等。

先封装了hubConnection,然后在这层实现了心跳。这一块必须得扯上后端,后端实现了一个方法,收到什么参数,马上就把这个参数传回来。然后就用这个方法实现心跳,发送一个消息给服务器,服务器收到这个消息。记录下发出时间和接收时间,不小于自己设定的时间间隔,则认定网络状态有效。当心跳无效的时候就把连接状态置为false,表示连接断开。其实他提供了一个回调oncloed。当连接关闭的时候会调用这个回调。但是不能太依赖这个,所以自己写了心跳来确保连接。下为心跳的逻辑。

 while(isRunning){
                long ping = System.currentTimeMillis()/1000;
                //发送心跳包
                try{
                    hubConnection.send("Echo",String.valueOf(ping));
                }catch (Exception e){
                    connectStatus = false;
                }
                //心跳延时
                try {
                    Thread.sleep(heartDelay);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //最后一次接收消息时间小于发送心跳时间,
                //起码在心跳时间内,没有收到包。
                if(lastRecvTime < ping){
                    long delay = System.currentTimeMillis()/1000 - ping;
                    //时间差大于重连时间的时候,判定为超时,连接状态置为false
                    if(delay > KeepAliveTimeOutSecond){
                        connectStatus = false;
                    }else {
                        connectStatus = true;
                    }
                }else {
                    connectStatus = true;
                }
            }

这个isRunning则表明需不需要进行心跳检测,当连接断开的时候当然是不必要的啦。(ps,来自后端大佬的一个建议,死循环线程里要加一个try,避免他因为错误而中断循环)。

然后开放了三个方法,开始连接,断开连接,发送消息。

/**
     * 开放的三个方法
     * */
    public void send(String method,Object... message){try{
            hubConnection.send(method,message);
        }catch (Exception e){
            connectStatus = false;
        }

    }

    public void stopConnect(){
        isRunning = false;
        connectStatus = false;
        hubConnection.stop();
    }

    public void startConnect(){
        Log.i(TAG,"start connect this message from SignalRSession");
        hubConnection = HubConnectionBuilder.create(url)
                .build();
        setOn();
        hubConnection.start().blockingAwait();
        heartCheck();
        isRunning = true;
    }

发送消息就不多说了,就是包一下。这里加try是为了保证特殊原因连接丢失的情况下,调用send方法不会出错。

断开连接的时候把心跳循环停掉,连接状态也是理所当然的变成false,然后是hubConnection的stop。

建立连接的话,就是把url传入,这里的url是在这个类初始化的时候拿到的。setOn是我自己写的建立监听的函数,发送过来消息都会在setOn中收到,然后通过handler发出去。然后开始的时候要建立心跳连接。当然这块可以放到初始化里。可以优化下。

public SignalRChannel(String url1, android.os.Handler handler) {
        this.url = url1;
        this.receiveHandler = handler;
    }

这是这个类的构造器,url用来建立连接就不多说。这个handler是为了发送消息以及更上层接收消息。

到此为止,第一层封装完了。

接下来是第二层,实现了断线重连,消息去重,记录数据库等操作。数据库选用的框架用的是room。

这一块操作比较多,可能会讲的有点乱。到时候可以看看我的demo消化下。

public ReliableClient(String url1, Context context) {
        this.url = url1;
        this.context = context;
        //创建数据库,如果存在不会重复创建
        db = Room.databaseBuilder(context,
                AppDatabase.class, "database-name").build();
        recordDb = Room.databaseBuilder(context,
                recordDatabase.class,"database-name1").build();
        loadData();
        logFile = new LogFile(context);
        Thread t = new Thread(runnableSend);
        t.start();
    }

这个是构造器,第一个数据库用来存收到的数据,第二个数据库用来处理进度(处理到第几个数据了) 。loadData是获取进度,即刚刚的数据库。logFile是我自己写的类,用于写日志。然后这个线程启动的是短线重连。这里一个ReliableClient可以用单例来实现。

private void loadData() {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                if(recordDb.recordDao().databaseCount()<1){
                    //数据库没有数据,设置为默认值
                    curRecvSeq = -1;
                    authMessage = null;
                    Log.i(TAG,"load <1 ");
                }else if(recordDb.recordDao().databaseCount() == 1){
                    //数据库一条数据,取这条数据
                    recordData messageData = recordDb.recordDao().getRecord();
                    curRecvSeq = messageData.curRecvSeq;
                    authMessage = new AuthRequest(messageData.ClientType,messageData.Token,messageData.UserId,messageData.Version);
                    if(authMessage.ClientType == -1){
                        authMessage = null;
                    }
                    Log.i(TAG,"load = 1 "+curRecvSeq);
                }else {
                    Log.i(TAG,"qweq: "+recordDb.recordDao().databaseCount());
                    //数据库很多数据,取最后一条的数据
                    recordData messageData = recordDb.recordDao().getRecord();
                    curRecvSeq = messageData.curRecvSeq;
                    authMessage = new AuthRequest(messageData.ClientType,messageData.Token,messageData.UserId,messageData.Version);

                    recordDb.recordDao().deleteAll();
                    recordData record1 = new recordData();
                    record1.Token = authMessage.Token;
                    record1.curRecvSeq = messageData.curRecvSeq;
                    record1.Version = authMessage.version;
                    record1.ClientType = authMessage.ClientType;
                    record1.UserId = authMessage.UserId;
                    recordDb.recordDao().insertAll(record1);
                    if(authMessage.ClientType == -1){
                        authMessage = null;
                    }
                    Log.i(TAG,"load > 1 "+curRecvSeq);
                }
            }
        };
        new Thread(runnable).start();
        if(curRecvSeq != -1){
        //如果有操作记录,那么查询数据库,取出未处理的数据,发给flutter。
            List<MessageData> messageDataList = db.userDao().getAll();
            for(MessageData messageData : messageDataList){
                //未操作数据压入哈希表
                hTable.put(messageData.seq,messageData);
                curRecvSeq ++;
            }
        }
        Log.i(TAG,"load msg :"+curRecvSeq);
    }

这个数据库理论上只能存在一条数据,因为是记录嘛,然后这里的逻辑是,当数据库没有数据时,给他一个默认值,标记为初次启动。当一条数据的时候读取这条数据。当出现不可抗力时,出现了多条数据,取出最后一条数据,然后删库不跑路。把这最后一条记录插进数据库。这个记录是为了获取登录信息的,账号,token等。这样他从后台启动起来的时候,还是处于连接状态。

下面是断线重连机制以及消息发送队列机制

private Runnable runnableSend = new Runnable() {
        @Override
        public void run() {
            while(isRunning){
                try {//刷新连接状态
                    if(signalRChannel == null || !signalRChannel.isConnected()){
                        try{
                            reConnect();
                        }catch (Exception e){
                            e.printStackTrace();
                            continue;
                        }
                    }
                    while(!sendMessageQueue.isEmpty()){
                        //发送消息
                        SendMessage sendMessage = sendMessageQueue.poll();
                        signalRChannel.send(sendMessage.method, sendMessage.message);
                    }
                    if(!logFile.fileStatus){
                        logFile.openLog();
                    }
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };

这里还是跑一个死循环线程,反复确认连接状态,如果断开连接的话,执行重连。消息发送也比较简单,放在一个队列里,顺便发送出去。

下面是重连,比较简单就这样看吧。

private void reConnect() {
        if(signalRChannel != null){
            signalRChannel.stopConnect();
        }
        signalRChannel = new SignalRChannel(url,receiveHandler);
        signalRChannel.startConnect();
        if(authMessage!=null){
            signalRChannel.send("Auth",authMessage);
        }
    }

下面是三个开放给外部的方法

   //发送消息
    public void send(String method,Object... messages){
        /**
         * queue
         * */
        if(method.equals("Echo")){
            long time = System.currentTimeMillis()/1000;
            signalRChannel.send(method,String.valueOf(time/1000));
        }else {
            SendMessage sendMessage = new SendMessage(method,messages);
            sendMessageQueue.offer(sendMessage);
        }
    }

    //登录
    public void LogIn(AuthRequest authRequest){
        this.authMessage = authRequest;
        //todo: write file
//        signalRChannel.send("Auth",authMessage);

    }

    //登出
    public void LogOut(){
        authMessage = null;
        if(signalRChannel != null){
            signalRChannel.stopConnect();
        }
        if(logFile.fileStatus){
            logFile.closeLog();
        }
    }

发送消息的时候给他压进消息队列里,等一段时间发送。当然我这里设置时间是4秒,有点不合理,这个需要自己改一下。

然后这里的登录登出只是状态登出了,长链接是一直存在的。

登出是先把authMessage清空,然后断开重连一下,就断开了。

登录只是记录下他的登录信息,因为我们登录走的是另外的方法。

这里大致是这样了,其他很多代码都是跟我们自己的业务相关,我会觉得不具有参考性,就不列出来了。

最后贴一下demo地址: https://github.com/libo1223/signalR