MQTT是一个轻量级的发布/订阅消息传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。
如何接入MQTT?
一般我们通过paho.mqtt
来接入MQTT。
服务端定义消息的结构:
1 2 3 4 5 6 7
| message Message { required uint64 message_id = 1; required uint64 sender_id = 2; required string content = 3; required uint64 create_time = 4; required uint64 receiver_id = 5; }
|
我们可以把服务端定义的消息结构编译成JS,这样就能够在web端解析消息了。
如何编译?
1
| protoc --js_out=import_style=commonjs,binary:./ messages.proto
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| message ServiceMessage { required uint64 message_id = 1 [jstype = JS_STRING]; // 解决js丢失大数字精度问题 required uint64 user_id = 2; required ServiceType service_type = 3; required MessageType message_type = 4; required string content = 5; required uint64 create_date = 6; required ServiceMessageDirection direction = 7; optional uint64 agent_id = 8; optional uint32 post_proc = 9; optional uint32 to_reply = 10; optional string extend = 11; }
|
注意长数字的转换,会有精度丢失的问题。因为前端的大数字会丢失精度,所以需要在编译的时候将其转成字符串。
引用 paho_mqtt.js: 引用 paho_mqtt.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import sendMessage from './msg_pb.js' import Paho from './paho-mqtt.js'
this.client = new Paho.MQTT.Client(HOSTNAME, PORT, CLIENTID) this.client.onConnectionLost = this.onConnectionLost this.client.onMessageArrived = this.onMessageArrived this.client.onMessageDelivered = this.onMessageDelivered
onConnectionLost(msg) { }
onMessageArrived(msg) { let message = sendMessage.BaseUserUnreadStatics.deserializeBinary(msg.payloadBytes) let protoBuf = message.toObject() }
onMessageDelivered(msg) { }
sendMsg() { let message = new sendMessage.Message() message.setMessageId('xxx') message.setSenderId('xxx') message.setContent('xxx') message.setCreateTime('xxx') message.setReceiverId('xxx') let bytes = message.serializeBinary() let msgSend = new Paho.MQTT.Message((bytes)) msgSend.topic = 'xx' this.client.send(msgSend) }
|
实时通讯遇到的问题(技术+业务)
消息发送和接收的时间有误差,导致消息的顺序不正确。
业务:我们需要实时记录未回复的消息。无法准确评判未回复的通讯对象:
- 按照服务端接收消息的顺序
- 按照客户端接收消息的顺序
HTTP请求和TCP的消息有一个时间差。
- 12:00:00的时候,调用接口获取聊天记录
- 12:00:01的时候,对方发送了一条消息
- 12:00:02的时候,获取聊天记录的接口回来了
此时对方在12:00:01发送的消息就不在消息列表中。
- 消息回执的处理
Qos级别设置为1或2的时候,服务端在接受到消息之后,会发送一条回执消息给用户,用于标识消息已成功发送。我们在mqtt源码里,能找到对应的对消息回执的处理,一般服务端会把消息的id回填过来,需要我们去解析数据:
根据服务端定义的消息的位数(一般是16位或者64位),调用不同的解析方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| function readUint16(buffer, offset) { return 256 * buffer[offset] + buffer[offset + 1] }
function readUint64(buffer, offset) { const uint64 = buffer.slice(offset, offset + 8) const uint64Array = new Uint8Array(uint64) const hexString = Array.from(uint64Array, (byte) => ('0' + byte.toString(16)).slice(-2)).join('') const uint64Str = BigInt(`0x${hexString}`).toString() return uint64Str }
|
开发建议:
- 将聊天记录记录在本地,下次进入页面无需调用聊天记录接口(类似QQ、微信的做法)
- 实时记录未回复的消息,需要给消息新增回执,即对方已经收到消息的标志。类似于钉钉的消息是否已读的功能。