Go异步读取MySQL
Go异步读取MySQL
有些使用我们面临在数据量很多的情况,例如之前在公司实习,就需要在多个6亿条记录的MySQL数据库中查询数据,面对这庞大是数据量,我们只能使用异步的方法对数据进行读取,下面是自己的一点想法:
func getTest(guid, start, end, table string, db *database.GormMysqlDB, model interface{}) ([]LogResp, error) {
// 检查 model 的实际类型
var esct ExtensionServiceClick
var esit ExtensionServiceImpress
//类型断言
switch v := model.(type) {
case ExtensionServiceClick:
esct = v
case ExtensionServiceImpress:
esit = v
default:
return nil, fmt.Errorf("未知的结构体类型")
}
// 继续处理其他逻辑
var totalRecords int64
err := db.ReadTrackingDB.Table(table).Model(&esct).Count(&totalRecords).Error
if err != nil {
return nil, err
}
fmt.Println("读取总数完成:", table, ": ", totalRecords, esit)
// 定义每个 Goroutine 处理的记录范围
recordPerPage := 1000
numWorkers := 5
pageSize := 0
if totalRecords < 5000 {
pageSize = 1
} else {
pageSize = int(totalRecords) / recordPerPage / numWorkers
}
// 使用 WaitGroup 等待所有 Goroutine 完成
var wg sync.WaitGroup
wg.Add(numWorkers)
// 使用 Channel 传递记录给主 Goroutine
recordCh := make(chan interface{})
// 启动多个 Goroutine 并发读取表数据
for i := 0; i < numWorkers; i++ {
go func(offset int) {
defer wg.Done()
var records []LogInfo
// 执行查询语句,读取数据
err := db.ReadTrackingDB.Table(table).Model(&LogInfo{}).Where("create_time >= ? and create_time <= ?", start, end).Offset(offset * pageSize * recordPerPage).Limit(pageSize * recordPerPage).Find(&records).Error
if err != nil {
fmt.Println("查询数据失败:", err)
return
}
for _, record := range records {
recordCh <- record
}
}(i)
}
logRsp := make([]LogResp, 0)
// 启动一个 Goroutine 用于接收并处理数据
go func() {
for record := range recordCh {
// 处理记录
log := LogResp{
Time: timeToStr(record.CreateTime),
Table: table,
Detail: record.Extra,
}
logRsp = append(logRsp, log)
fmt.Println("log:", log, "record:", record)
}
}()
// 等待所有 Goroutine 完成
wg.Wait()
close(recordCh)
fmt.Println("logRsp:", logRsp)
return nil, nil
}