|
|
|
package move_data
|
|
|
|
|
|
|
|
import (
|
|
|
|
"container/list"
|
|
|
|
"database/sql"
|
|
|
|
"fmt"
|
|
|
|
"main_program/config"
|
|
|
|
"main_program/entity"
|
|
|
|
"regexp"
|
|
|
|
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
|
|
"github.com/tealeg/xlsx"
|
|
|
|
"gorm.io/driver/mysql"
|
|
|
|
"gorm.io/gorm"
|
|
|
|
)
|
|
|
|
|
|
|
|
// moveDataService groups the SQLite-to-MySQL migration methods defined in
// this file. It has no fields.
type moveDataService struct{}
|
|
|
|
|
|
|
|
// MoveDataService is the exported package-level instance of moveDataService.
var MoveDataService moveDataService
|
|
|
|
|
|
|
|
func (m *moveDataService) Start(table string, sqlitePath string, mysqlConfig config.MysqlEntity, xlsxPath string) {
|
|
|
|
config.Logger.Info("开始迁移sqlite3到mysql...")
|
|
|
|
sqliteDB, err := sql.Open("sqlite3", sqlitePath)
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Fatal(err)
|
|
|
|
}
|
|
|
|
config.Logger.Info("连接成功sqlite3...")
|
|
|
|
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", mysqlConfig.User, mysqlConfig.Password, mysqlConfig.Host, mysqlConfig.Database)
|
|
|
|
mysqlDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Fatal("数据库连接失败...")
|
|
|
|
}
|
|
|
|
config.Logger.Info("连接成功mysql...")
|
|
|
|
if table == "Channel" {
|
|
|
|
if xlsxPath == "" {
|
|
|
|
config.Logger.Error("xlsxPath not existx...")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
m.moveChannel(sqliteDB, mysqlDB, xlsxPath)
|
|
|
|
} else if table == "Videos" {
|
|
|
|
m.moveVideos(sqliteDB, mysqlDB)
|
|
|
|
} else if table == "DownLoadInfo" {
|
|
|
|
m.moveDownLoadInfo(mysqlDB)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *moveDataService) moveChannel(sqliteDB *sql.DB, mysqlDB *gorm.DB, xlsxPath string) {
|
|
|
|
config.Logger.Info("读取xlsx获取region...")
|
|
|
|
file, err := xlsx.OpenFile(xlsxPath)
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Fatalf("Error opening file: %s", err)
|
|
|
|
}
|
|
|
|
config.Logger.Info("开始迁移Channel表...")
|
|
|
|
// 从sqlite3获取前50个Channel
|
|
|
|
continueFlag := true
|
|
|
|
for continueFlag {
|
|
|
|
rows, err := sqliteDB.Query("SELECT * FROM Channel_copy WHERE is_copy = 0 limit 50")
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Fatal(err)
|
|
|
|
}
|
|
|
|
ChannelList := list.New()
|
|
|
|
for rows.Next() {
|
|
|
|
channelCopy := new(entity.ChannelCopy)
|
|
|
|
if err := rows.Scan(&channelCopy.Id, &channelCopy.ChannelId, &channelCopy.ChannelTitle, &channelCopy.ChannelLanguage, &channelCopy.ChannelReptileTime, &channelCopy.Is_Copy); err != nil {
|
|
|
|
config.Logger.Fatal(err)
|
|
|
|
}
|
|
|
|
ChannelList.PushBack(channelCopy)
|
|
|
|
}
|
|
|
|
rows.Close()
|
|
|
|
if ChannelList.Len() <= 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
// 放入mysql里
|
|
|
|
for e := ChannelList.Front(); e != nil; e = e.Next() {
|
|
|
|
channelCopy := e.Value.(*entity.ChannelCopy)
|
|
|
|
config.Logger.Info(channelCopy)
|
|
|
|
channel := new(entity.Channel)
|
|
|
|
channel.ChannelId = channelCopy.ChannelId
|
|
|
|
channel.ChannelTitle = channelCopy.ChannelTitle
|
|
|
|
channel.ChannelLanguage = channelCopy.ChannelLanguage
|
|
|
|
channel.ChannelReptileTime = channelCopy.ChannelReptileTime
|
|
|
|
channel.Region = m.getRegionByChannelId(channel.ChannelId, file)
|
|
|
|
result := mysqlDB.Create(&channel)
|
|
|
|
if result.Error != nil {
|
|
|
|
config.Logger.Fatal(result.Error)
|
|
|
|
}
|
|
|
|
// 修改sqlite里状态
|
|
|
|
sqlStr, err := sqliteDB.Prepare("UPDATE Channel_copy SET is_copy = 1 WHERE id = ?")
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Error(err)
|
|
|
|
}
|
|
|
|
_, err = sqlStr.Exec(channelCopy.Id)
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Error(err)
|
|
|
|
}
|
|
|
|
sqlStr.Close()
|
|
|
|
}
|
|
|
|
tmpRows, _ := sqliteDB.Query("SELECT * FROM Channel_copy WHERE is_copy = 0 limit 50")
|
|
|
|
continueFlag = tmpRows.Next()
|
|
|
|
tmpRows.Close()
|
|
|
|
}
|
|
|
|
config.Logger.Info("完成迁移Channel表...")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *moveDataService) getRegionByChannelId(channnelId string, file *xlsx.File) string {
|
|
|
|
for _, sheet := range file.Sheets {
|
|
|
|
for _, row := range sheet.Rows {
|
|
|
|
if row.Cells[3].Value == channnelId {
|
|
|
|
return row.Cells[2].Value
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *moveDataService) moveVideos(sqliteDB *sql.DB, mysqlDB *gorm.DB) {
|
|
|
|
config.Logger.Info("开始迁移Videos表...")
|
|
|
|
continueFlag := true
|
|
|
|
count := 0
|
|
|
|
for continueFlag {
|
|
|
|
rows, err := sqliteDB.Query("SELECT * FROM Vidoes_copy WHERE isCopy = 0 and channelId != '' limit 1000")
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Fatal(err)
|
|
|
|
}
|
|
|
|
videoList := list.New()
|
|
|
|
for rows.Next() {
|
|
|
|
videoCopy := new(entity.VideoCopy)
|
|
|
|
if err := rows.Scan(&videoCopy.Id, &videoCopy.VideoId, &videoCopy.ChannelId, &videoCopy.VideoTitle, &videoCopy.VideoLen,
|
|
|
|
&videoCopy.VideoType, &videoCopy.VideoPublishTime,
|
|
|
|
&videoCopy.VideoLanguage, &videoCopy.IsDownload, &videoCopy.IsCopy); err != nil {
|
|
|
|
config.Logger.Fatal(err)
|
|
|
|
}
|
|
|
|
videoList.PushBack(videoCopy)
|
|
|
|
}
|
|
|
|
rows.Close()
|
|
|
|
var videos []entity.Video
|
|
|
|
for e := videoList.Front(); e != nil; e = e.Next() {
|
|
|
|
videoCpoy := e.Value.(*entity.VideoCopy)
|
|
|
|
video := new(entity.Video)
|
|
|
|
video.VideoId = videoCpoy.VideoId
|
|
|
|
video.ChannelId = videoCpoy.ChannelId
|
|
|
|
re := regexp.MustCompile("[\U00010000-\U0010ffff\u2600-\u27BF\u1f300-\u1f64F\u1f680-\u1f6FF\u2700-\u27BF]+")
|
|
|
|
video.VideoTitle = re.ReplaceAllString(videoCpoy.VideoTitle, "")
|
|
|
|
video.VideoLen = videoCpoy.VideoLen
|
|
|
|
video.VideoType = videoCpoy.VideoType
|
|
|
|
video.VideoPublishTime = videoCpoy.VideoPublishTime
|
|
|
|
video.VideoLanguage = videoCpoy.VideoLanguage
|
|
|
|
video.IsDownload = videoCpoy.IsDownload
|
|
|
|
videos = append(videos, *video)
|
|
|
|
}
|
|
|
|
result := mysqlDB.CreateInBatches(videos, 100)
|
|
|
|
if result.Error != nil {
|
|
|
|
config.Logger.Error(result.Error)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
for e := videoList.Front(); e != nil; e = e.Next() {
|
|
|
|
videoCpoy := e.Value.(*entity.VideoCopy)
|
|
|
|
// 修改sqlite里状态
|
|
|
|
sqlStr, err := sqliteDB.Prepare("UPDATE Vidoes_copy SET isCopy = 1 WHERE id = ?")
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Error(err)
|
|
|
|
}
|
|
|
|
_, err = sqlStr.Exec(videoCpoy.Id)
|
|
|
|
if err != nil {
|
|
|
|
config.Logger.Error(err)
|
|
|
|
}
|
|
|
|
sqlStr.Close()
|
|
|
|
}
|
|
|
|
count += 1
|
|
|
|
config.Logger.Infof("count:%d", count)
|
|
|
|
tmpRows, _ := sqliteDB.Query("SELECT * FROM Vidoes_copy WHERE isCopy = 0 and channelId != '' limit 1")
|
|
|
|
continueFlag = tmpRows.Next()
|
|
|
|
tmpRows.Close()
|
|
|
|
}
|
|
|
|
config.Logger.Info("完成迁移Videos表...")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *moveDataService) moveDownLoadInfo(mysqlDB *gorm.DB) {
|
|
|
|
continueFlag := true
|
|
|
|
count := 1
|
|
|
|
for continueFlag {
|
|
|
|
// 从videos表里获取,直接放到DownLoadInfo,并全部设置为未下载状态
|
|
|
|
sqlStr := "SELECT v.* FROM Videos v WHERE NOT EXISTS ( SELECT * FROM Download_info d WHERE v.videoId = d.videoId ) LIMIT 1000;"
|
|
|
|
var videos []entity.Video
|
|
|
|
mysqlDB.Raw(sqlStr).Scan(&videos)
|
|
|
|
var downloadInfos []entity.DownloadInfo
|
|
|
|
for _, video := range videos {
|
|
|
|
config.Logger.Info(video)
|
|
|
|
downloadInfo := new(entity.DownloadInfo)
|
|
|
|
downloadInfo.DownloadType = 1
|
|
|
|
downloadInfo.IsFinished = 0
|
|
|
|
downloadInfo.TryTime = 0
|
|
|
|
downloadInfo.VideoId = video.VideoId
|
|
|
|
downloadInfos = append(downloadInfos, *downloadInfo)
|
|
|
|
}
|
|
|
|
mysqlDB.CreateInBatches(downloadInfos, 1000)
|
|
|
|
count += 1
|
|
|
|
config.Logger.Infof("count: %d", count)
|
|
|
|
// 判断是否还存在
|
|
|
|
existsStr := "SELECT EXISTS(SELECT v.* FROM Videos v WHERE NOT EXISTS ( SELECT * FROM Download_info d WHERE v.videoId = d.videoId ) LIMIT 1);"
|
|
|
|
mysqlDB.Raw(existsStr).Scan(&continueFlag)
|
|
|
|
}
|
|
|
|
}
|