// Package move_data copies rows from a local SQLite database into MySQL.
package move_data

import (
	"container/list"
	"database/sql"
	"fmt"
	"regexp"

	"main_program/config"
	"main_program/entity"

	_ "github.com/mattn/go-sqlite3"
	"github.com/tealeg/xlsx"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

// SQL statements used by the migration. The table and column names are kept
// exactly as they appear in the existing schema.
const (
	pendingChannelsQuery = "SELECT * FROM Channel_copy WHERE is_copy = 0 limit 50"
	markChannelCopied    = "UPDATE Channel_copy SET is_copy = 1 WHERE id = ?"

	pendingVideosQuery = "SELECT * FROM Vidoes_copy WHERE isCopy = 0 and channelId != '' limit 1000"
	markVideoCopied    = "UPDATE Vidoes_copy SET isCopy = 1 WHERE id = ?"

	videosWithoutDownloadInfoQuery = "SELECT v.* FROM Videos v WHERE NOT EXISTS ( SELECT * FROM Download_info d WHERE v.videoId = d.videoId ) LIMIT 1000;"
)

// unsupportedTitleRunes matches the characters removed from video titles
// before they are written to MySQL: every supplementary-plane code point
// (U+10000..U+10FFFF) and the block U+2600..U+27BF.
//
// The previous pattern spelled the emoji ranges as "\u1f300-\u1f64F". Go's \u
// escape consumes exactly four hex digits, so that text meant U+1F30, the
// range '0'..U+1F64 and 'F', which stripped ASCII digits and letters as well.
// The intended emoji ranges are already inside the supplementary-plane range.
// It is compiled once here instead of once per row.
var unsupportedTitleRunes = regexp.MustCompile("[\U00010000-\U0010FFFF\u2600-\u27BF]+")

type moveDataService struct{}

// MoveDataService is the shared entry point for the SQLite-to-MySQL migration.
var MoveDataService moveDataService

// Start opens both databases and runs the migration selected by table.
//
// table must be "Channel", "Videos" or "DownLoadInfo"; any other value is
// logged as an error and nothing is migrated. xlsxPath is only needed for
// "Channel", where it must be non-empty. Connection failures terminate via
// config.Logger.Fatal / Fatalf.
func (m *moveDataService) Start(table string, sqlitePath string, mysqlConfig config.MysqlEntity, xlsxPath string) {
	config.Logger.Info("开始迁移sqlite3到mysql...")
	sqliteDB, err := sql.Open("sqlite3", sqlitePath)
	if err != nil {
		config.Logger.Fatal(err)
	}
	defer sqliteDB.Close()
	config.Logger.Info("连接成功sqlite3...")

	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		mysqlConfig.User, mysqlConfig.Password, mysqlConfig.Host, mysqlConfig.Database)
	mysqlDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		// Include the underlying error so the failure can be diagnosed.
		config.Logger.Fatalf("数据库连接失败...: %s", err)
	}
	config.Logger.Info("连接成功mysql...")

	switch table {
	case "Channel":
		if xlsxPath == "" {
			config.Logger.Error("xlsxPath not existx...")
			return
		}
		m.moveChannel(sqliteDB, mysqlDB, xlsxPath)
	case "Videos":
		m.moveVideos(sqliteDB, mysqlDB)
	case "DownLoadInfo":
		m.moveDownLoadInfo(mysqlDB)
	default:
		config.Logger.Error(fmt.Sprintf("unknown table %q, nothing migrated", table))
	}
}

// moveChannel copies pending Channel_copy rows from SQLite into MySQL in
// batches of up to 50, filling each channel's Region from the xlsx workbook at
// xlsxPath. Each row is flagged as copied in SQLite right after it is inserted.
func (m *moveDataService) moveChannel(sqliteDB *sql.DB, mysqlDB *gorm.DB, xlsxPath string) {
	config.Logger.Info("读取xlsx获取region...")
	file, err := xlsx.OpenFile(xlsxPath)
	if err != nil {
		config.Logger.Fatalf("Error opening file: %s", err)
	}

	// Prepare the status update once instead of once per row.
	markStmt, err := sqliteDB.Prepare(markChannelCopied)
	if err != nil {
		config.Logger.Error(fmt.Errorf("preparing channel status update: %w", err))
		return
	}
	defer markStmt.Close()

	config.Logger.Info("开始迁移Channel表...")
	for {
		// Fetch the next batch of pending channels from SQLite.
		batch, err := fetchPendingChannels(sqliteDB)
		if err != nil {
			config.Logger.Fatal(err)
		}
		if batch.Len() == 0 {
			break
		}

		// Insert each channel into MySQL.
		for e := batch.Front(); e != nil; e = e.Next() {
			channelCopy := e.Value.(*entity.ChannelCopy)
			config.Logger.Info(channelCopy)

			channel := &entity.Channel{
				ChannelId:          channelCopy.ChannelId,
				ChannelTitle:       channelCopy.ChannelTitle,
				ChannelLanguage:    channelCopy.ChannelLanguage,
				ChannelReptileTime: channelCopy.ChannelReptileTime,
				Region:             m.getRegionByChannelId(channelCopy.ChannelId, file),
			}
			if result := mysqlDB.Create(channel); result.Error != nil {
				config.Logger.Fatal(result.Error)
			}

			// Flag the row as copied in SQLite. If this fails the row would be
			// selected and inserted again by the next batch, so stop here.
			if _, err := markStmt.Exec(channelCopy.Id); err != nil {
				config.Logger.Error(fmt.Errorf("marking channel row %v as copied: %w", channelCopy.Id, err))
				return
			}
		}
	}
	config.Logger.Info("完成迁移Channel表...")
}

// fetchPendingChannels reads the next batch of not-yet-copied rows from
// Channel_copy. The returned list holds *entity.ChannelCopy values and is
// empty when nothing is left to migrate.
func fetchPendingChannels(sqliteDB *sql.DB) (*list.List, error) {
	rows, err := sqliteDB.Query(pendingChannelsQuery)
	if err != nil {
		return nil, fmt.Errorf("querying pending channels: %w", err)
	}
	defer rows.Close()

	batch := list.New()
	for rows.Next() {
		channelCopy := new(entity.ChannelCopy)
		if err := rows.Scan(&channelCopy.Id, &channelCopy.ChannelId, &channelCopy.ChannelTitle,
			&channelCopy.ChannelLanguage, &channelCopy.ChannelReptileTime, &channelCopy.Is_Copy); err != nil {
			return nil, fmt.Errorf("scanning channel row: %w", err)
		}
		batch.PushBack(channelCopy)
	}
	// rows.Next also returns false on an iteration error; surface it.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating pending channels: %w", err)
	}
	return batch, nil
}

// getRegionByChannelId returns the value of the third cell of the first row,
// across all sheets of file, whose fourth cell equals channelID. It returns ""
// when no row matches. Rows with fewer than four cells are skipped.
func (m *moveDataService) getRegionByChannelId(channelID string, file *xlsx.File) string {
	const (
		regionCol    = 2
		channelIDCol = 3
	)
	for _, sheet := range file.Sheets {
		for _, row := range sheet.Rows {
			// Short rows cannot match; indexing them would panic.
			if len(row.Cells) <= channelIDCol {
				continue
			}
			if row.Cells[channelIDCol].Value == channelID {
				return row.Cells[regionCol].Value
			}
		}
	}
	return ""
}

// moveVideos copies pending Vidoes_copy rows that have a non-empty channelId
// from SQLite into MySQL, up to 1000 rows per round. Titles are stripped of
// the characters matched by unsupportedTitleRunes. A round's rows are flagged
// as copied only after its MySQL insert succeeds. Any insert or flagging error
// stops the migration.
func (m *moveDataService) moveVideos(sqliteDB *sql.DB, mysqlDB *gorm.DB) {
	config.Logger.Info("开始迁移Videos表...")
	count := 0
	for {
		batch, err := fetchPendingVideos(sqliteDB)
		if err != nil {
			config.Logger.Fatal(err)
		}
		// Nothing left: finish without sending an empty batch to MySQL.
		if batch.Len() == 0 {
			break
		}

		videos := make([]entity.Video, 0, batch.Len())
		for e := batch.Front(); e != nil; e = e.Next() {
			videoCopy := e.Value.(*entity.VideoCopy)
			videos = append(videos, entity.Video{
				VideoId:          videoCopy.VideoId,
				ChannelId:        videoCopy.ChannelId,
				VideoTitle:       unsupportedTitleRunes.ReplaceAllString(videoCopy.VideoTitle, ""),
				VideoLen:         videoCopy.VideoLen,
				VideoType:        videoCopy.VideoType,
				VideoPublishTime: videoCopy.VideoPublishTime,
				VideoLanguage:    videoCopy.VideoLanguage,
				IsDownload:       videoCopy.IsDownload,
			})
		}

		if result := mysqlDB.CreateInBatches(videos, 100); result.Error != nil {
			config.Logger.Error(result.Error)
			return
		}

		// Flag the rows as copied in SQLite. On failure stop, otherwise the
		// next round would select and insert the same rows again.
		if err := markVideosCopied(sqliteDB, batch); err != nil {
			config.Logger.Error(err)
			return
		}

		count++
		config.Logger.Infof("count:%d", count)
	}
	config.Logger.Info("完成迁移Videos表...")
}

// fetchPendingVideos reads the next batch of not-yet-copied rows from
// Vidoes_copy. The returned list holds *entity.VideoCopy values and is empty
// when nothing is left to migrate.
func fetchPendingVideos(sqliteDB *sql.DB) (*list.List, error) {
	rows, err := sqliteDB.Query(pendingVideosQuery)
	if err != nil {
		return nil, fmt.Errorf("querying pending videos: %w", err)
	}
	defer rows.Close()

	batch := list.New()
	for rows.Next() {
		videoCopy := new(entity.VideoCopy)
		if err := rows.Scan(&videoCopy.Id, &videoCopy.VideoId, &videoCopy.ChannelId, &videoCopy.VideoTitle,
			&videoCopy.VideoLen, &videoCopy.VideoType, &videoCopy.VideoPublishTime, &videoCopy.VideoLanguage,
			&videoCopy.IsDownload, &videoCopy.IsCopy); err != nil {
			return nil, fmt.Errorf("scanning video row: %w", err)
		}
		batch.PushBack(videoCopy)
	}
	// rows.Next also returns false on an iteration error; surface it.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating pending videos: %w", err)
	}
	return batch, nil
}

// markVideosCopied sets isCopy = 1 for every *entity.VideoCopy in batch, using
// one prepared statement inside one transaction so the batch is flagged
// together or not at all.
func markVideosCopied(sqliteDB *sql.DB, batch *list.List) error {
	tx, err := sqliteDB.Begin()
	if err != nil {
		return fmt.Errorf("beginning video status transaction: %w", err)
	}
	// After a successful Commit this only returns sql.ErrTxDone, which is
	// safe to ignore.
	defer func() { _ = tx.Rollback() }()

	stmt, err := tx.Prepare(markVideoCopied)
	if err != nil {
		return fmt.Errorf("preparing video status update: %w", err)
	}
	defer stmt.Close()

	for e := batch.Front(); e != nil; e = e.Next() {
		videoCopy := e.Value.(*entity.VideoCopy)
		if _, err := stmt.Exec(videoCopy.Id); err != nil {
			return fmt.Errorf("marking video row %v as copied: %w", videoCopy.Id, err)
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing video status updates: %w", err)
	}
	return nil
}

// moveDownLoadInfo creates a Download_info row for every Videos row that has
// none yet, up to 1000 per round. Each new row gets DownloadType 1, IsFinished
// 0 and TryTime 0. It stops when no such video remains or on the first
// database error.
func (m *moveDataService) moveDownLoadInfo(mysqlDB *gorm.DB) {
	count := 0
	for {
		// Take videos that have no Download_info row yet and create one for
		// each, all in the not-downloaded state.
		var videos []entity.Video
		if err := mysqlDB.Raw(videosWithoutDownloadInfoQuery).Scan(&videos).Error; err != nil {
			config.Logger.Error(err)
			return
		}
		// An empty result means every video already has a Download_info row.
		if len(videos) == 0 {
			return
		}

		downloadInfos := make([]entity.DownloadInfo, 0, len(videos))
		for i := range videos {
			downloadInfos = append(downloadInfos, entity.DownloadInfo{
				DownloadType: 1,
				IsFinished:   0,
				TryTime:      0,
				VideoId:      videos[i].VideoId,
			})
		}
		if err := mysqlDB.CreateInBatches(downloadInfos, 1000).Error; err != nil {
			// Stop: if the insert keeps failing, the same videos would be
			// selected again on every round.
			config.Logger.Error(err)
			return
		}

		count++
		config.Logger.Infof("count: %d", count)
	}
}