一、背景

公司之前很多涉及到后端需要主动与前端web交互的业务,代码耦合严重,新的业务场景需要即时通信的得重新接入websocket,花费很多时间和精力,因此需要将websocket(缩写为:ws)抽象为公司内部的通讯服务,可以解决业务不同需求,比如:

  1. 1. 业务采用了轮询方式来获取服务器异步请求的结果(支付回调订单、业务订单)。
  2. 2. 系统中有部分业务使用了即时推送功能(反扫二维码定时刷新、充电端口加载刷新)。
  3. 3. 提高系统的响应速率,同步调用重构为异步调用方式,调用结果以websocket方式推送给前端,降低接口延迟性。
  4. 4. 考虑未来有新的业务需要使用websocket即时通讯支撑。

二、目标

  • • 规范ws通讯工程项目结构和写法。
  • • 剔除业务代码,提高接入效率。
  • • 使用推送代替不合理的接口轮询。
  • • 支撑原有同步调用优化为异步调用,接口响应结果通过ws推送给前端,提高系统的整体响应效率。
  • • 使用MQ代替Redis发布订阅和微服务调用

核心设计

图片

项目结构

图片

三、业务流程

3.1 应用关系图

消息推送(Fanout

图片

消息接收处理(Topic)

图片

3.2 业务时序图

从上到下依次为:

  1. 1. websocket客户端注册,连接流程
  2. 2. 推送消息到服务端流程
  3. 3. 推送消息到客户端流程
图片

四、如何保证消息的可靠性传输

在这个架构的设计过程中,如何保证消息不丢失也是项目的一个重点需要解决的技术问题,对应RabbitMq来说,实现上消息丢失的具体情况主要会分为三种:

  1. 1. 生成者把消息发送到RabbitMQ Server过程丢失;
  2. 2. RabbitMQ Server接收到消息后在持久化之前宕机导致消息丢失;
  3. 3. 消费者接收到消息,在即将消费的时候,业务还未做处理,结果进程挂掉了,这时候RabbitMQ会认为已经消费了,导致消息丢失。

✔确保生产者端的可靠性传输方式(两种方式):

方式一:开启事务机制,生产者在发送消息之前开启RabbitMQ事务,然后在发送消息,如果消息没有成功被RabbitMQ接收到,那边生产者会收到异常报错,此时可以执行事务回滚操作channel.txRollback(),然后重试发送。

方式二:开启确认机制,RabbitMQ提供了发送方确认机制(publisher confirm)来确保消息发送成功,关注公众号:码猿技术专栏,回复关键词:11111 获取阿里内部Java性能调优手册!如果成功发送到RabbitMQ Server,MQ会给你回传一个ack消息,确保这个消息已经发送成功,如果MQ没有接收处理到这条消息,会回调你的一个nack()接口,告诉你这个消息接收失败,这时候你可以重试。

两种方式的优缺点分析:

事务机制是同步的,你提交一个事务之后会阻塞在那里,但是comfirm机制是异步的,你发送一个消息之后不需要等待上一个消息的回调即可以接着下一个消息的发送,所以整体的性能、效率会更高,因此一般在生产者这快保证消息的可靠性传输,都是采用confirm机制实现。

✔MQ Server如何保证消息丢失

方式:开始MQ的持久化,就是将消息写入持久到磁盘,哪怕是MQ自己挂了,重启之后会激动读取之前储存的数据,保证数据不丢失。

✔确保消息者端的可靠性传输

方式:关闭自动ack,开启手动确认机制,RabbitMQ提供接收方响应机制(consumer ack)来确保消息成功接收,简单来说就是每次自己在代码里确保业务逻辑处理完之后,在程序中自己ack一把,可以通过调用一个api来实现,如果你还没处理完,就不触发ack,那么RabbitMQ就会认为你还没处理完,这个时候MQ会把这个消息分配给别的consumer做处理,消息是不会丢失的。

图片

五、消息分类

5.1 客户端→服务端

描述

应用场景为客户端主动向服务端推送消息,在服务端执行相应的业务流程。

现有系统中有此应用场景的业务是:反扫二维码请求开始刷新,用户要请求开始刷新启动二维码

5.2 服务端→客户端

描述

应用场景为服务端触发了某个事件,需要推送相应结果到某个客户端。

现有系统中有此应用场景的业务是:支付完成后,等待第三方服务器回调,回调成功结果推送

5.3 客户端 →客户端

描述

应用场景为客户端需要向另一客户端推送消息。

现有系统中有此应用场景的业务是:C端用户发送接口请求,推送响应结果到用户H5页面中

六、Websocket API设计

6.1 请求websocket连接token

请求方式:GET

统一请求接口url:

域名/xxx/websocket/token

{
  "result": 0,
  "description": "无",
  "data":"wss://dws.test.com:8086/socket/asrwqgvsd" //连接url
}

6.2 使用返回的url连接websocket

连接方式: wss

连接url:

wss://dws.test.com:8086/socket/{token}

七、统一消息体

{
  "sendType":"",
"messageType":"消息类型",
"businessType":"",
"fromUniqueId":"发送端唯一id",
"toUniqueId":"接收端唯一id",
"fromClientType":"发送端类型",
"toClientType":"接收端类型",
"timestamp":0,
"content":"业务数据",
"tags":[
"标签集"
],
"businesses":[
"业务集"
]
}

八、统一调用方式

8.1 Websocket API聚合封装

图片

8.2 业务统一调用

图片

总结

本文主要记录我基于对WebSocket做的抽象统一封装实现消息即时通讯功能的整体设计思想,从项目代码设计上采用了DDD的思想建模,降低了代码的耦合程度,不同业务在需要使用ws即时通讯可以做到“即引即用”的效果,不再需要考虑WebSocket接入底层的配置和逻辑。

 

 

后端专属技术群:

构建高质量的技术交流社群,欢迎从事编程开发、技术招聘HR进群,也欢迎大家分享自己公司的内推信息,相互帮助,一起进步!

加我微信拉你进群,备注ittce,不然不加。

扫码领红包

微信赞赏支付宝扫码领红包

发表回复

后才能评论