Go异步读取MySQL

有些使用我们面临在数据量很多的情况,例如之前在公司实习,就需要在多个6亿条记录的MySQL数据库中查询数据,面对这庞大是数据量,我们只能使用异步的方法对数据进行读取,下面是自己的一点想法:

func getTest(guid, start, end, table string, db *database.GormMysqlDB, model interface{}) ([]LogResp, error) {
    // 检查 model 的实际类型
    var esct ExtensionServiceClick
    var esit ExtensionServiceImpress
  	//类型断言
    switch v := model.(type) {
    case ExtensionServiceClick:
       esct = v
    case ExtensionServiceImpress:
       esit = v
    default:
       return nil, fmt.Errorf("未知的结构体类型")
    }

    // 继续处理其他逻辑
    var totalRecords int64
    err := db.ReadTrackingDB.Table(table).Model(&esct).Count(&totalRecords).Error
    if err != nil {
       return nil, err
    }
    fmt.Println("读取总数完成:", table, ": ", totalRecords, esit)

    // 定义每个 Goroutine 处理的记录范围
    recordPerPage := 1000
    numWorkers := 5
    pageSize := 0

    if totalRecords < 5000 {
       pageSize = 1
    } else {
       pageSize = int(totalRecords) / recordPerPage / numWorkers
    }

    // 使用 WaitGroup 等待所有 Goroutine 完成
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // 使用 Channel 传递记录给主 Goroutine
    recordCh := make(chan interface{})

    // 启动多个 Goroutine 并发读取表数据
    for i := 0; i < numWorkers; i++ {
       go func(offset int) {
          defer wg.Done()

          var records []LogInfo
          // 执行查询语句,读取数据
          err := db.ReadTrackingDB.Table(table).Model(&LogInfo{}).Where("create_time >= ? and create_time <= ?", start, end).Offset(offset * pageSize * recordPerPage).Limit(pageSize * recordPerPage).Find(&records).Error
          if err != nil {
             fmt.Println("查询数据失败:", err)
             return
          }

          for _, record := range records {
             recordCh <- record
          }
       }(i)
    }

    logRsp := make([]LogResp, 0)
    // 启动一个 Goroutine 用于接收并处理数据
    go func() {
       for record := range recordCh {
          // 处理记录
          log := LogResp{
             Time:   timeToStr(record.CreateTime),
             Table:  table,
             Detail: record.Extra,
          }
          logRsp = append(logRsp, log)
          fmt.Println("log:", log, "record:", record)
       }
    }()

    // 等待所有 Goroutine 完成
    wg.Wait()
    close(recordCh)
    fmt.Println("logRsp:", logRsp)

    return nil, nil
}