[toc]

概况

到目前为止,我们已经将IM项目的信息发送接收逻辑完成了,但是这里我们要进一步完善,我们需要将这个过程提高并发量,这里我们需要将消息模块加入udp连接。

到目前为止,我们的项目目录结构:

HiChat   
    ├── common    //放置公共文件
    |      |——md5.go
    │  
    ├── config    //做配置文件
    │  
    ├── dao//数据库crud
    │     |——user.go
    |
    ├── global    //放置各种连接池,配置等
    │   		|——global.go
    |
    ├── initialize  //项目初始化文件
    │  			|——db.go
    |				|——logger.go
    |
    ├── middlewear  //放置web中间件
    |       |——jwt.go
    │ 
    ├── models      //数据库表设计
    │   		|——user_basic.go
    |       |——message.go
    |
    ├── router   		//路由
    │       |——router.go
    ├── service     //对外api
    │   		|——user.go
    ├── test        //测试文件
    │  
    ├── main.go     //项目入口
    ├── go.mod			//项目依赖管理
    ├── go.sum			//项目依赖管理

消息接收引入udp连接

在message.go中,recProc(node)方法用来,接收用户发来的消息, 现在我们这样做:

//recProc 读取websocket用户发送的消息
func recProc(node *Node) {
	for {
		//获取信息
		_, data, err := node.Conn.ReadMessage()
		if err != nil {
			zap.S().Info("读取消息失败", err)
			return
		}
    
    //将消息体放入全局channel中
		brodMsg(data)
	}
}

//全局channel
var upSendChan chan []byte = make(chan []byte, 1024)

func brodMsg(data []byte) {
	upSendChan <- data
}


//init方法,运行message包前调用
func init() {
	go UdpSendProc()  
	go UpdRecProc()
}


//UdpSendProc 完成upd数据发送, 连接到udp服务端,将全局channel中的消息体,写入udp服务端
func UdpSendProc() {
	udpConn, err := net.DialUDP("udp", nil, &net.UDPAddr{
		//192.168.31.147
		IP:   net.IPv4(127, 0, 0, 1),
		Port: 3000,
		Zone: "",
	})
	if err != nil {
		zap.S().Info("拨号udp端口失败", err)
		return
	}

	defer udpConn.Close()

	for {
		select {
		case data := <-upSendChan:
			_, err := udpConn.Write(data)
			if err != nil {
				zap.S().Info("写入udp消息失败", err)
				return
			}
		}
	}
}


//UpdRecProc 完成udp数据的接收,启动udp服务,获取udp客户端的写入的消息
func UpdRecProc() {
	udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
		IP:   net.IPv4(127, 0, 0, 1),
		Port: 3000,
	})
	if err != nil {
		zap.S().Info("监听udp端口失败", err)
		return
	}

	defer udpConn.Close()

	for {
		var buf [1024]byte
		n, err := udpConn.Read(buf[0:])
		if err != nil {
			zap.S().Info("读取udp数据失败", err)
			return
		}

		//处理发送逻辑
		dispatch(buf[0:n])
	}
}

//dispatch 解析消息,聊天类型判断
func dispatch(data []byte) {
	//解析消息
	msg := Message{}
	err := json.Unmarshal(data, &msg)
	if err != nil {
		zap.S().Info("消息解析失败", err)
		return
	}

	//判断消息类型
	switch msg.Type {
	case 1: //私聊
    sendMsg(msg.TargetId, data)
	case 2: //群发
		sendGroupMsg(uint(msg.FormId), uint(msg.TargetId), data)
	}
}

//sendMs 向用户单聊发送消息
func sendMsg(id int64, msg []byte) {
	rwLocker.Lock()
	node, ok := clientMap[id]
	rwLocker.Unlock()

	if !ok {
		zap.S().Info("userID没有对应的node")
		return
	}

	zap.S().Info("targetID:", id, "node:", node)
	if ok {
		node.DataQueue <- msg
	}
}

//sendGroupMsg 群发逻辑
func sendGroupMsg(formId, target uint, data []byte) (int, error) {……}

改造到这里,其功能就和上一篇文章一样了,由于测试方法和前面一样,这里也就给大家测试了。

总结

总体上内容简单,其核心就是将原来的recProc()读取websocket用户发送的消息中,没有将用户发来的信息直接粗暴塞进接收者用户的websocket连接中,而是将消息体仍进全局channel,然后将通过udp连接将消息体从全局channel中写入udp服务端,进行消息解析,逻辑判断,然后进行转发。