微前端架构设计之 WebSocket API 断连后重连的设计方案

问题

主框架(基座应用) WebSocket 断连后重连成功,但是 SubApp(子应用) 重新订阅 WebSocket 失败。

原因

框架重连成功后会重新订阅 topic,是正常的。但是 SubApp 或者组件生命周期内的 $socket 实例并未更新,其实例的 _connected 属性 false 导致重新订阅 topic失败。

此处存在的问题:

  1. 不应该直接暴露 this.$socket 实例,因为该实例本身存在内部 dirty state
  2. WebSocket 的连接与重连不应该由调用方判断当前状态,增加了调用方的复杂度
  3. 连接异常、连接断开、连接失败,调用方并不知情
  4. 重连成功后应该重置 this.$socket 实例的引用

重新设计

目标

  • 调用方不需要处理 socket 内部的状态,直接连接
  • 连接失败或者断开连接后,调用方将收到错误消息
  • 首次或重新连接成功后,调用方的订阅将被重新发起

思路

使用发布订阅(pub-sub)观察者模式重新设计接口。WebSocket 的连接分成几个事件下发给调用方,这样调用发只需要注册各种 listener 函数,达到跟 socket 本身状态完全解耦的目的:

微前端架构设计之 WebSocket API  断连后重连的设计方案

这里存在的问题是:

register('connected', listenerFn, failedFn) 可能发生在 $socket 连接成功前 ,也可能发生在 连接成功后

所以在统一把 listenerFn 放进 queue 里面,如果 $socket.isConnectedtrue 则直接执行 listenerFn,否则给回 pending 状态,待 socket 连接成功后再执行 listenerFn.

微前端架构设计之 WebSocket API  断连后重连的设计方案

API

  • pub-sub & callback style [推荐]

无论是首次连接成功,还是重连成功,onMessage callback function 都会被执行。
无论是首次连接失败,还是重连失败,onError callback function 都会被执行 。

PS:

  • onMessage 函数在收到服务端推送至客户端的数据时会被执行,若没有数据返回,则不会执行。
  • onMessage 在重连后,会立即被执行。这样做的目的是 立即清除客户端 socket 的异常状态
const SUBSCRIBE_API = '/broadcast/message/user/${userId}'

this.$socket.subscribe(SUBSCRIBE_API, function onMessage(data) {
	console.log(data)
})
}, function onError(error) {
	console.log(error)
})

// 关闭当前订阅
this.$socket.unsubscribe(SUBSCRIBE_API)
  • promise style [不推荐]
    只有首次连接成功,resolve function 才会被执行。
    只有首次连接失败,rejection function 才会被执行 。

PS:
Promise 只会在 subscribe 执行的时候触发一次 resolve 或者 reject。因此该方式 __只能用于在订阅成功后立刻收到消息便关闭 __当前 topic 的这种连接。

export default {
	created() {
		const SUBSCRIBE_API = '/broadcast/message/user/${userId}'

		this.$socket.subscribe(SUBSCRIBE_API).then((data) => {
			// connect success
			console.log(data)
		}).catch(error => {
			// connect failed
			console.error(error)
		})
	},

	beforeDestroy(){
		// 关闭当前订阅
		this.$socket.unsubscribe(SUBSCRIBE_API)
	}
}

DEMO

订阅 topic

this.$socket.subscribe(
      this.SUBSCRIBE_API,
      function handleSignatureMessage(data) {
        if (this.socketErrorMsg) {
          this.socketErrorMsg = null
        }

        console.log(data)
      },
      function handleSignatureError(error) {
        this.socketErrorMsg =
          error instanceof Error ? error.message : '' + error
      }
    )

关闭订阅

// 清空错误
this.socketErrorMsg = null
// 关闭订阅
this.$socket.unsubscribe(this.$SUBSCRIBE_API)

最终代码

ChannelSubscribe 只实现了 Socket 订阅,使用静态属性 _map存放所有的 ChannelSubscribe 实例,实现前文提到的 listeners queue。通过静态方法 ChannelSubscribe.Run 重启当前 ChannelSubscribe._map 中的所有 _subscribe 方法。而 _subscribe 和 _unsubscribe 方法需要在 SocketChannel 中被具体实现,所以 ChannelSubscribe 的逻辑可以用到其他的 Channel 实现。比如说 XHRLoopChannel 长轮询通道订阅中,当轮询中断后重启所有的 ChannelSubscribe 实例,实现 ChannelSubscribe 跟 Channel 内部的完全解藕。

  • ChannelSubscribe.js:
/**
 * 将订阅抽离成单独的对象,内聚其本身的属性和行为
 */
export default class ChannelSubscribe {
  /* 用于保存当前所有已存在的 ChannelSubscribe 实例 */
  /** @type {Map<string, ChannelSubscribe>} ChannelSubscribeMap */
  static _map = new Map()

  /**
   * 运行所有的 ChannelSubscribe 实例
   * @param {import('stompjs').Client} client 最新的 Client 对象
   */
  static Run(client) {
    if (ChannelSubscribe._map.size) {
      /** @type {Iterable<ChannelSubscribe>} */
      const subscribes = ChannelSubscribe._map.values()

      for (const subscribe of subscribes) {
        subscribe.run(client)
      }
    }
  }

  /**
   * 实例化一个通道订阅对象 ChannelSubscribe
   * @param {string} api
   * @param {(data: any) => void} listenerFn
   * @param {(error: Error) => void} [failedFn]
   */
  constructor(api) {
    /* 如果此前已存在该订阅,先直接关闭然后重新订阅 */
    if (ChannelSubscribe._map.has(api)) {
      this._unsubscribe(api)
    }

    /* 具体实现和赋值在 this._subscribe 方法内部 */
    /** @type {import('stompjs').Subscription} */
    this._subscription = null

    /**
     * run() => 执行实例 _subscribe 方法
     *
     * @param {import('stompjs').Client} client
     * @returns {Promise<any>}
     */
    this.run = (client) => {
      ChannelSubscribe._map.set(api, this)

      return this._subscribe(client)
    }
  }

  /**
   * interface API 仅定义接口,不做具体实现
   * 具体实现代码在 SocketChannel.subscribe 方法内部
   */
  _subscribe(api) {
    throw new TypeError(
      `Must implements 'ChannelSubscribe._subscribe(${api})' interface.`
    )
  }

  /**
   * 取消订阅在内部实现,因为其不依赖任何外部状态
   *
   * @param {string} api
   */
  _unsubscribe(api) {
    if (ChannelSubscribe._map.has(api)) {
      const instance = ChannelSubscribe._map.get(api)

      /** @type {import('stompjs').Subscription} */
      const subscription = instance._subscription
      subscription && subscription.unsubscribe()

      return ChannelSubscribe._map.delete(api)
    }
  }
}
  • SocketChannel.js:
import Stomp from 'stompjs'
import SockJS from 'sockjs-client'
import ChannelSubscribe from './ChannelSubscribe'
import './helpers/rewrite-receive-info'
import { debug } from '@@/utils/index'
import { handleUserMessage } from './handlers/user-message'
import { handlePublishVersion } from './handlers/publish-version'
import {
  WEB_SOCKET_API,
  SUBSCRIBE_PUB_API,
  SUBSCRIBE_USER_API
} from './config/subscribe-url'
import { MAX_RETRY_COUNT, RECONNECT_DURATION_TIME } from './config/constants'
import {
  CONNECT_ERROR,
  LOST_CONNECTION,
  NOT_CONNECTED
} from './config/error-message'

/* 已重试次数 */
let retryTimes = 0

export default class SocketChannel {
  /* 私有属性 _client 实例引用对象 */
  /** @type {Stomp.Client|null} */
  static _client = null

  /* 私有属性 _error 连接错误实例 */
  /** @type {Error|null} */
  static _error = null

  /**
   * 建立 socket 连接
   * @param {number} userId
   */
  static connect(userId = '') {
    if (!userId) {
      throw new TypeError(
        `The userId is required for 'Socket.connect(userId: number)' method not ${typeof userId}!`
      )
    }

    const socket = new SockJS(WEB_SOCKET_API)
    // http://jmesnil.net/stomp-websocket/doc/
    const client = Stomp.over(socket)

    if (client && client.ws && client.ws instanceof WebSocket) {
      // https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API
      const ws = client.ws

      // https://github.com/sockjs/sockjs-client/issues/176#issuecomment-135124313
      ws.onerror((error) => {
        SocketChannel._client = null
        SocketChannel._error = new Error(error)
      })

      ws.onclose(() => {
        SocketChannel._client = null
        SocketChannel._error = new Error('WebSocket 已断开连接')
      })
    }

    client.connect(
      {},

      () => {
        debug(`连接成功`)

        // 连接成功
        retryTimes = 0
        SocketChannel._client = client
        SocketChannel._error = null

        // 订阅系统内置 topic
        client.subscribe(SUBSCRIBE_PUB_API, handlePublishVersion)
        client.subscribe(
          '/p2p/' + userId + SUBSCRIBE_USER_API,
          handleUserMessage(userId)
        )

        // 订阅 Invokers topic
        ChannelSubscribe.Run(client)
      },

      (error) => {
        debug(`连接失败`)

        // 连接失败
        SocketChannel._client = null
        SocketChannel._error = new Error(error || 'WebSocket 连接失败')

        ChannelSubscribe.Run(client)

        // 尝试重连 ${MAX_RETRY_COUNT} 次
        if (retryTimes <= MAX_RETRY_COUNT) {
          setTimeout(() => {
            debug(`Socket 接口异常,正在重连第${++retryTimes}次`)

            SocketChannel.connect && SocketChannel.connect(userId)
          }, /* 使用 2 的 n 次方代替原先的间隔常量 */ 2 ** retryTimes * RECONNECT_DURATION_TIME)
        }
      }
    )
  }

  /**
   * subscribe => SubApp Invoker subscribe API
   * @param {string} api
   * @param {(data: any) => void} listenerFn
   * @param {(error: Error) => void} [failedFn]
   *
   * 此时需要处理两种情况:
   *  1. 此时的 client 还未连接
   *   1.1 放进 _SubscribeMap 等待连接时按顺序执行连接
   *   1.2 连接成功后执行
   *  2. 此时的 client 已经连接,则直接使用 client 开始订阅
   *   2.1 订阅成功 - resolver
   *   2.2 订阅失败 - rejecter
   */
  static subscribe(api, listenerFn, failedFn) {
    const instance = new ChannelSubscribe(api, listenerFn, failedFn)

    /* 实现 ChannelSubscribe._subscribe 接口 */
    instance._subscribe = (/** @type {Stomp.Client|null} */ client) => {
      const { _client, _error } = SocketChannel

      /** @type {Stomp.Client|null} */
      client = client || _client

      return new Promise((resolve, reject) => {
        const onResolve = listenerFn || ((value) => resolve(value))
        const onReject = failedFn || ((error) => reject(error))

        // 还未连接
        if (!client) {
          return onReject(new Error(NOT_CONNECTED))
        }

        // 连接出现错误
        if (_error) {
          return onReject(new Error(CONNECT_ERROR))
        }

        // 已断开连接
        if (client && !client.connected) {
          return onReject(new Error(LOST_CONNECTION))
        }

        // 已连接
        if (client && client.connected) {
          /* 赋值 instance._subscription 内部属性 */
          instance._subscription = client.subscribe(api, ({ body } = {}) => {
            onResolve(typeof body === 'string' ? JSON.parse(body) : body)
          })

          // 如果之前 ChannelSubscribeMap 中已存在该 api
          // FIXME:即当前接口是重连后发起,需要被立即执行
          // 这样做的目的是清除客户端 socket 连接的异常状态
          if (ChannelSubscribe._map.has(api)) {
            onResolve({})
          }
        }
      })
    }

    return instance.run()
  }

  /**
   * 取消订阅
   * @param {string} api
   */
  static unsubscribe(api) {
    if (ChannelSubscribe._map.has(api)) {
      const instance = ChannelSubscribe._map.get(api)

      return instance._unsubscribe()
    }
  }
}

END