「从0到1搭建一个IM项目」消息模块开发之消息核心升级改造
[toc]
概况
到目前为止,我们已经将IM项目的信息发送接收逻辑完成了,但是这里我们要进一步完善,我们需要将这个过程提高并发量,这里我们需要将消息模块加入udp连接。
到目前为止,我们的项目目录结构:
HiChat
├── common //放置公共文件
| |——md5.go
│
├── config //做配置文件
│
├── dao//数据库crud
│ |——user.go
|
├── global //放置各种连接池,配置等
│ |——global.go
|
├── initialize //项目初始化文件
│ |——db.go
| |——logger.go
|
├── middlewear //放置web中间件
| |——jwt.go
│
├── models //数据库表设计
│ |——user_basic.go
| |——message.go
|
├── router //路由
│ |——router.go
├── service //对外api
│ |——user.go
├── test //测试文件
│
├── main.go //项目入口
├── go.mod //项目依赖管理
├── go.sum //项目依赖管理
消息接收引入udp连接
在message.go中,recProc(node)
方法用来,接收用户发来的消息, 现在我们这样做:
//recProc 读取websocket用户发送的消息
func recProc(node *Node) {
for {
//获取信息
_, data, err := node.Conn.ReadMessage()
if err != nil {
zap.S().Info("读取消息失败", err)
return
}
//将消息体放入全局channel中
brodMsg(data)
}
}
//全局channel
var upSendChan chan []byte = make(chan []byte, 1024)
func brodMsg(data []byte) {
upSendChan <- data
}
//init方法,运行message包前调用
func init() {
go UdpSendProc()
go UpdRecProc()
}
//UdpSendProc 完成upd数据发送, 连接到udp服务端,将全局channel中的消息体,写入udp服务端
func UdpSendProc() {
udpConn, err := net.DialUDP("udp", nil, &net.UDPAddr{
//192.168.31.147
IP: net.IPv4(127, 0, 0, 1),
Port: 3000,
Zone: "",
})
if err != nil {
zap.S().Info("拨号udp端口失败", err)
return
}
defer udpConn.Close()
for {
select {
case data := <-upSendChan:
_, err := udpConn.Write(data)
if err != nil {
zap.S().Info("写入udp消息失败", err)
return
}
}
}
}
//UpdRecProc 完成udp数据的接收,启动udp服务,获取udp客户端的写入的消息
func UpdRecProc() {
udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 3000,
})
if err != nil {
zap.S().Info("监听udp端口失败", err)
return
}
defer udpConn.Close()
for {
var buf [1024]byte
n, err := udpConn.Read(buf[0:])
if err != nil {
zap.S().Info("读取udp数据失败", err)
return
}
//处理发送逻辑
dispatch(buf[0:n])
}
}
//dispatch 解析消息,聊天类型判断
func dispatch(data []byte) {
//解析消息
msg := Message{}
err := json.Unmarshal(data, &msg)
if err != nil {
zap.S().Info("消息解析失败", err)
return
}
//判断消息类型
switch msg.Type {
case 1: //私聊
sendMsg(msg.TargetId, data)
case 2: //群发
sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
}
}
//sendMs 向用户单聊发送消息
func sendMsg(id int64, msg []byte) {
rwLocker.Lock()
node, ok := clientMap[id]
rwLocker.Unlock()
if !ok {
zap.S().Info("userID没有对应的node")
return
}
zap.S().Info("targetID:", id, "node:", node)
if ok {
node.DataQueue <- msg
}
}
//sendGroupMsg 群发逻辑
func sendGroupMsg(formId, target uint, data []byte) (int, error) {……}
改造到这里,其功能就和上一篇文章一样了,由于测试方法和前面一样,这里也就给大家测试了。
总结
总体上内容简单,其核心就是将原来的recProc()
读取websocket用户发送的消息中,没有将用户发来的信息直接粗暴塞进接收者用户的websocket连接中,而是将消息体仍进全局channel,然后将通过udp连接将消息体从全局channel中写入udp服务端,进行消息解析,逻辑判断,然后进行转发。