Compare commits

...

14 Commits

  1. 66
      keyword_analyse/main.go
  2. 16
      main_program/common/Constant.go
  3. 13
      main_program/config.yml
  4. 26
      main_program/config/EnvConfig.go
  5. 139
      main_program/config/config.go
  6. 23
      main_program/entity/Channel.go
  7. 13
      main_program/entity/DownloadInfo.go
  8. 11
      main_program/entity/KetWord.go
  9. 17
      main_program/entity/SrtFile.go
  10. 30
      main_program/entity/Video.go
  11. 16
      main_program/entity/WorldResultSet.go
  12. 27
      main_program/go.mod
  13. 43
      main_program/go.sum
  14. 108
      main_program/keyword_analyse/KeywordAnalyseService.go
  15. 78
      main_program/main.go
  16. BIN
      main_program/moveData/channel_region.xlsx
  17. 202
      main_program/moveData/moveDataService.go
  18. 21
      main_program/service/ChannelService.go
  19. 21
      main_program/service/KeyWordService.go
  20. 55
      main_program/service/SrtFileService.go
  21. 26
      main_program/service/VideoService.go
  22. 15
      main_program/service/WordResultSetService.go

66
keyword_analyse/main.go

@ -6,27 +6,30 @@ import (
"keyword_analyse/config"
"keyword_analyse/entity"
"strings"
"sync"
_ "github.com/mattn/go-sqlite3"
)
var (
dbStr string = "D:/Work/Code/youtube_dev/youtube_prod/db/youtube_prod.db"
dbWriteLock sync.Mutex
)
func main() {
config.InitConfig()
config.Logger.Info("aaaaa")
db, err := sql.Open("sqlite3", "E:/code/youtube_prod/db/youtube_prod.db")
DB, err := sql.Open("sqlite3", dbStr)
if err != nil {
config.Logger.Fatal(err)
}
defer db.Close()
config.Logger.Info("连接成功")
// 获取关键字
keyWordList := list.New()
SrtFileList := list.New()
keyWordRows, err := db.Query("SELECT * FROM keyWord")
keyWordRows, err := DB.Query("SELECT * FROM keyWord")
if err != nil {
config.Logger.Fatal(err)
}
defer keyWordRows.Close()
for keyWordRows.Next() {
// entity := &entity.KeyWord{1, "aa", "aaa"}
entity := new(entity.KeyWord)
@ -35,12 +38,15 @@ func main() {
}
keyWordList.PushBack(entity)
}
// 获取未匹配的字幕
rows, err := db.Query("SELECT * FROM srtFile WHERE isScan = 0")
keyWordRows.Close()
continueFlag := true
for continueFlag {
// 获取未匹配的字幕,每次1000行
rows, err := DB.Query("SELECT * FROM srtFile WHERE isScan = 0 limit 500")
if err != nil {
config.Logger.Fatal(err)
}
defer rows.Close()
SrtFileList := list.New()
for rows.Next() {
entity := new(entity.SrtFile)
if err := rows.Scan(&entity.Id, &entity.VideoId, &entity.ChannelId, &entity.Ordinal, &entity.SrtStartTime, &entity.SrtEndTime, &entity.SrtText, &entity.IsScan); err != nil {
@ -48,21 +54,55 @@ func main() {
}
SrtFileList.PushBack(entity)
}
config.Logger.Infof("keyWordLen:%d,srtFileLen:%d", keyWordList.Len(), SrtFileList.Len())
rows.Close()
var wg sync.WaitGroup
for e := SrtFileList.Front(); e != nil; e = e.Next() {
wg.Add(1)
srtFile := e.Value.(*entity.SrtFile)
go analyse(*srtFile, *keyWordList)
go analyse(*srtFile, *keyWordList, &wg)
}
wg.Wait()
config.Logger.Info("所有携程执行完毕")
tmpRows, _ := DB.Query("SELECT * FROM srtFile WHERE isScan = 0 limit 1")
continueFlag = tmpRows.Next()
tmpRows.Close()
}
}
func analyse(srtFile entity.SrtFile, keyWordList list.List) {
func analyse(srtFile entity.SrtFile, keyWordList list.List, wg *sync.WaitGroup) {
defer wg.Done()
filterList := list.New()
for e := keyWordList.Front(); e != nil; e = e.Next() {
keyWord := e.Value.(*entity.KeyWord)
if strings.Contains(srtFile.SrtText, keyWord.Word) {
config.Logger.Info(keyWord.Word)
} else {
config.Logger.Info("no contains")
filterList.PushBack(keyWord)
}
}
dbWriteLock.Lock()
DB, _ := sql.Open("sqlite3", dbStr)
// 存入WordResutlSet
for e := filterList.Front(); e != nil; e = e.Next() {
keyWord := e.Value.(*entity.KeyWord)
// INSERT INTO "main"."srtfile" ("id", "videoId", "channelId", "ordinal", "srtStartTime", "srtEndTime", "srtText", "isScan") VALUES (1, 'oZhBWA3HNhA', 'UC67Wr_9pA4I0glIxDt_Cpyw', 1, '00:00:30.74', '00:00:35.579', '皆さんおはようございますさあ今日も', 1);
insterSql, _ := DB.Prepare("INSERT INTO 'main'.'World_Result_Set' ('keyWordId','wordText','srtId','srtOrdinal','srtText','videoId') VALUES(?,?,?,?,?,?)")
_, err := insterSql.Exec(keyWord.Id, keyWord.Word, srtFile.Id, srtFile.Ordinal, srtFile.SrtText, srtFile.VideoId)
if err != nil {
config.Logger.Error(err)
}
insterSql.Close()
}
// 设置isScan为1
sqlStr, err := DB.Prepare("UPDATE srtFile SET isScan = 1 WHERE id = ?")
if err != nil {
config.Logger.Error(err)
}
_, err = sqlStr.Exec(srtFile.Id)
if err != nil {
config.Logger.Error(err)
}
sqlStr.Close()
DB.Close()
dbWriteLock.Unlock()
}

16
main_program/common/Constant.go

@ -0,0 +1,16 @@
package common
import (
config "main_program/config"
"gorm.io/gorm"
)
// Process-wide settings shared between packages. The flag-backed values are
// registered in main's init function; the others are filled in by main.
var (
	// ConfigFile is the path of the YAML configuration file (flag -cf).
	ConfigFile string
	// MyEnv holds the configuration decoded from ConfigFile.
	MyEnv config.EnvConfig
	// Command is the command given on the command line (flag -cmd); when
	// non-empty, main uses it instead of the command from the config file.
	Command string
	// Table is the table to migrate (flag -table).
	Table string
	// Dbpath is the path of the SQLite file (flag -sqlite).
	Dbpath string
	// XlsxPath is the path of the Excel file (flag -excel).
	XlsxPath string
	// AnalyseRegion is the region to analyse (flag -analyse); empty means
	// no region restriction.
	AnalyseRegion string
	// MysqlDB is the shared GORM handle opened by main.
	MysqlDB *gorm.DB
)

13
main_program/config.yml

@ -0,0 +1,13 @@
log:
logenv: dev
logpath: main_program
command: move_data
movedata:
table: Channel
dbpath: D:/Work/Code/youtube_dev/youtube_prod.db
xlsxpath: D:/Work/Code/youtube_dev/youtube-golang/main_program/moveData/channel_region.xlsx
mysql:
host: 47.108.20.249:3306
user: root
password: casino888!
database: youtube

26
main_program/config/EnvConfig.go

@ -0,0 +1,26 @@
package config
// EnvConfig is the root of the configuration decoded from the YAML config
// file (main unmarshals the file into common.MyEnv, which has this type).
type EnvConfig struct {
// Log holds the logger settings.
Log LogEntity
// Command is the default command to run; main lets the -cmd flag override it.
Command string
// MoveData holds the defaults for the move_data command.
MoveData MoveDataEntity
// Mysql holds the MySQL connection settings.
Mysql MysqlEntity
}
// LogEntity groups the logger settings that main passes to InitConfig.
type LogEntity struct {
// LogEnv selects the logging mode; initLogger adds file output for any value other than "dev".
LogEnv string
// LogPath is the sub-directory under logs/ that receives the log files.
LogPath string
}
// MoveDataEntity holds the config-file defaults for the move_data command;
// main lets the matching command-line flags override each field.
type MoveDataEntity struct {
// Table is the name of the table to migrate.
Table string
// DBPath is the path of the source SQLite database file.
DBPath string
// XlsxPath is the path of the Excel workbook used when migrating channels.
XlsxPath string
}
// MysqlEntity holds the MySQL connection settings used to build the DSN
// "user:password@tcp(host)/database".
type MysqlEntity struct {
// Host is placed inside tcp(...) in the DSN (the sample config uses host:port).
Host string
// User is the MySQL user name.
User string
// Password is the MySQL password.
Password string
// Database is the schema name to connect to.
Database string
}

139
main_program/config/config.go

@ -0,0 +1,139 @@
package config
import (
"bytes"
"fmt"
"io"
"os"
"path"
"time"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/logrusorgru/aurora"
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
)
const (
// maxAgeHour is the retention passed to rotatelogs.WithMaxAge, in hours (168 hours = 7 days).
maxAgeHour = 168
// rotationHour is the interval passed to rotatelogs.WithRotationTime, in hours.
rotationHour = 24
)
// isInit records whether initLogger has already built the logger.
var isInit bool
// Logger is the shared logger; it is nil until InitConfig has been called.
var Logger *logrus.Logger
// init marks the logger as not yet initialised when the package is loaded.
func init() {
isInit = false
}
// checkInitLog reports whether the logger has already been initialised.
func checkInitLog() bool {
return isInit
}
// InitConfig sets up the package-level Logger on first use. Later calls are
// no-ops, so the arguments of the first call win.
//
// logEnv selects the logging mode and logPath names the log sub-directory;
// both are passed through to initLogger.
func InitConfig(logEnv string, logPath string) {
	if !checkInitLog() {
		Logger = initLogger(logEnv, logPath)
	}
}
// initLogger builds the logrus logger used by the whole program, stores it in
// the package-level Logger and marks the package as initialised.
//
// env selects the output mode: with "dev" only the coloured console output is
// produced; any other value additionally installs a file hook that writes to
// daily-rotated files under <working dir>/logs/<logPath>/ (all.log receives
// every hooked level, error.log receives error and above).
//
// It panics when a rotating writer cannot be created.
func initLogger(env string, logPath string) *logrus.Logger {
	// Resolve the log directory against the working directory. If it cannot
	// be determined, fall back to a relative path rather than an empty one.
	baseDir := "."
	if dir, err := os.Getwd(); err == nil {
		baseDir = dir
	}
	logFilePath := baseDir + "/logs/" + logPath + "/all.log"
	errFilePath := baseDir + "/logs/" + logPath + "/error.log"

	accessWriter, err := newRotatingWriter(logFilePath)
	if err != nil {
		panic(err)
	}
	errorWriter, err := newRotatingWriter(errFilePath)
	if err != nil {
		panic(err)
	}
	// Error-class entries go to both all.log and error.log.
	allWriter := io.MultiWriter(accessWriter, errorWriter)

	logger := logrus.New()
	logger.Out = os.Stdout
	logger.SetFormatter(&ConsoleFormatter{})
	// Both formatters print file:line, which requires caller reporting.
	logger.SetReportCaller(true)

	if env != "dev" {
		// lfshook only fires for levels present in the map, so every level
		// that may be emitted must be listed. Warn and Fatal were missing,
		// which silently kept those entries out of the files.
		logger.AddHook(lfshook.NewHook(
			lfshook.WriterMap{
				logrus.DebugLevel: accessWriter,
				logrus.InfoLevel:  accessWriter,
				logrus.WarnLevel:  accessWriter,
				logrus.ErrorLevel: allWriter,
				logrus.FatalLevel: allWriter,
				logrus.PanicLevel: allWriter,
			},
			&SaveFormatter{},
		))
	}

	logger.SetLevel(logrus.DebugLevel)
	Logger = logger
	isInit = true
	return logger
}

// newRotatingWriter creates a writer that rotates basePath every rotationHour
// hours, keeps files for maxAgeHour hours and links basePath to the current file.
func newRotatingWriter(basePath string) (*rotatelogs.RotateLogs, error) {
	return rotatelogs.New(
		basePath+".%Y-%m-%d",
		rotatelogs.WithLinkName(basePath),
		rotatelogs.WithRotationTime(time.Hour*rotationHour),
		rotatelogs.WithMaxAge(time.Hour*maxAgeHour),
	)
}
// ConsoleFormatter renders log entries for terminal output as
// "[time] [file:line] [level] message", with the level coloured.
type ConsoleFormatter struct {
}

// Format implements logrus.Formatter. It writes into entry.Buffer when logrus
// supplies one and returns the formatted bytes; it never returns an error.
func (m *ConsoleFormatter) Format(entry *logrus.Entry) ([]byte, error) {
	b := entry.Buffer
	if b == nil {
		b = &bytes.Buffer{}
	}

	// Levels without a dedicated colour (e.g. trace) fall back to the plain
	// level name instead of printing a nil value.
	var levelText any = entry.Level
	switch entry.Level {
	case logrus.InfoLevel:
		levelText = aurora.Green(entry.Level)
	case logrus.WarnLevel:
		levelText = aurora.Yellow(entry.Level)
	case logrus.DebugLevel:
		levelText = aurora.Gray(16-1, entry.Level)
	case logrus.ErrorLevel, logrus.FatalLevel, logrus.PanicLevel:
		levelText = aurora.Red(entry.Level)
	}

	// entry.Caller is only populated when the logger reports callers; guard
	// so the formatter cannot panic when attached to a logger that does not.
	file, line := "???", 0
	if entry.Caller != nil {
		file, line = path.Base(entry.Caller.File), entry.Caller.Line
	}

	timestamp := entry.Time.Format("2006-01-02 15:04:05")
	fmt.Fprintf(b, "[%s] [%s:%d] [%s] %s \n", timestamp, file, line, levelText, entry.Message)
	return b.Bytes(), nil
}
// SaveFormatter renders log entries for file output as
// "[time] [file:line] [level] message", without colour codes.
type SaveFormatter struct {
}

// Format implements logrus.Formatter. It writes into entry.Buffer when logrus
// supplies one and returns the formatted bytes; it never returns an error.
func (m *SaveFormatter) Format(entry *logrus.Entry) ([]byte, error) {
	b := entry.Buffer
	if b == nil {
		b = &bytes.Buffer{}
	}

	// entry.Caller is only populated when the logger reports callers; guard
	// so the formatter cannot panic when attached to a logger that does not.
	file, line := "???", 0
	if entry.Caller != nil {
		file, line = path.Base(entry.Caller.File), entry.Caller.Line
	}

	timestamp := entry.Time.Format("2006-01-02 15:04:05")
	fmt.Fprintf(b, "[%s] [%s:%d] [%s] %s \n", timestamp, file, line, entry.Level, entry.Message)
	return b.Bytes(), nil
}

23
main_program/entity/Channel.go

@ -0,0 +1,23 @@
package entity
// ChannelCopy is one row read from the SQLite table Channel_copy during
// migration. moveChannel scans "SELECT *" into these fields in declaration
// order, so the order must match the table's column order.
type ChannelCopy struct {
Id int
ChannelId string
ChannelTitle string
ChannelLanguage string
// ChannelReptileTime is a pointer so that a NULL column value can be scanned.
ChannelReptileTime *string
// Is_Copy is 0 for rows still to migrate; moveChannel sets it to 1 once the row is copied.
Is_Copy int
}
// Channel is the GORM model for the MySQL table "Channel".
type Channel struct {
Id uint `gorm:"column:id;primaryKey;autoIncrement"`
ChannelId string `gorm:"column:channelId;type:varchar(255);not null"`
ChannelTitle string `gorm:"column:channelTitle;type:varchar(255);not null"`
ChannelLanguage string `gorm:"column:channelLanguage;type:varchar(255);not null"`
// ChannelReptileTime is nullable, hence the pointer.
ChannelReptileTime *string `gorm:"column:channelReptileTime;type:varchar(255);default:null"`
// Region is filled from the Excel workbook by moveChannel and is used to select keywords.
Region string `gorm:"column:region;type:varchar(255);default:null"`
}
// TableName returns the table name GORM uses for Channel.
func (Channel) TableName() string {
return "Channel"
}

13
main_program/entity/DownloadInfo.go

@ -0,0 +1,13 @@
package entity
// DownloadInfo is the GORM model for the MySQL table "Download_info".
// moveDownLoadInfo creates one row per video with DownloadType 1 and
// TryTime/IsFinished 0.
type DownloadInfo struct {
Id uint `gorm:"column:id;primaryKey;autoIncrement"`
VideoId string `gorm:"column:videoId;type:varchar(255);not null"`
DownloadType int `gorm:"column:downloadType;not null"`
// NOTE(review): the column is spelled "tryTIme" (capital I) — confirm it matches the schema.
TryTime int `gorm:"column:tryTIme;not null"`
// IsFinished is 0 for new rows; presumably non-zero once the download is done — confirm.
IsFinished int `gorm:"column:isFinished;not null"`
}
// TableName returns the table name GORM uses for DownloadInfo.
func (DownloadInfo) TableName() string {
return "Download_info"
}

11
main_program/entity/KetWord.go

@ -0,0 +1,11 @@
package entity
// Keyword is the GORM model for the MySQL table "Keyword": a word to search
// for in subtitle text, tagged with the region it applies to.
type Keyword struct {
Id uint `gorm:"column:id;primaryKey;autoIncrement"`
Region string `gorm:"column:region;type:varchar(255);not null"`
Word string `gorm:"column:word;type:varchar(255);not null"`
}
// TableName returns the table name GORM uses for Keyword.
func (Keyword) TableName() string {
return "Keyword"
}

17
main_program/entity/SrtFile.go

@ -0,0 +1,17 @@
package entity
// Srtfile is the GORM model for the MySQL table "Srtfile": one subtitle line
// of a video.
type Srtfile struct {
Id uint `gorm:"column:id;primaryKey;autoIncrement"`
VideoId string `gorm:"column:videoId;type:varchar(255);not null"`
ChannelId string `gorm:"column:channelId;type:varchar(255);not null"`
// Ordinal is presumably the position of the line within the subtitle file — confirm.
Ordinal int `gorm:"column:ordinal;type:int(11);not null"`
SrtStartTime string `gorm:"column:srtStartTime;type:varchar(255);not null"`
SrtEndTime string `gorm:"column:srtEndTime;type:varchar(255);not null"`
SrtText string `gorm:"column:srtText;type:varchar(255);not null"`
// IsScan is 0 while the line is unprocessed; the keyword analysis sets it to 1 afterwards.
IsScan int `gorm:"column:isScan;type:int(11);default:null"`
}
// TableName returns the table name GORM uses for Srtfile.
func (Srtfile) TableName() string {
return "Srtfile"
}

30
main_program/entity/Video.go

@ -0,0 +1,30 @@
package entity
// VideoCopy is one row read from the SQLite table Vidoes_copy during
// migration. moveVideos scans "SELECT *" into these fields in declaration
// order, so the order must match the table's column order.
type VideoCopy struct {
Id int
VideoId string
ChannelId string
VideoTitle string
VideoLen int
VideoType string
VideoPublishTime string
VideoLanguage string
IsDownload int
// IsCopy is 0 for rows still to migrate; moveVideos sets it to 1 once the row is copied.
IsCopy int
}
// Video is the GORM model for the MySQL table "Videos".
type Video struct {
Id uint `gorm:"column:id;primaryKey;autoIncrement"`
VideoId string `gorm:"column:videoId;type:varchar(255);not null"`
ChannelId string `gorm:"column:channelId;type:varchar(255);not null"`
VideoTitle string `gorm:"column:videoTitle;type:varchar(255);not null"`
VideoLen int `gorm:"column:videoLen;type:int;not null"`
VideoType string `gorm:"column:videoType;type:varchar(255);not null"`
VideoPublishTime string `gorm:"column:videoPublishTime;type:varchar(255);not null"`
VideoLanguage string `gorm:"column:videoLanguage;type:varchar(255);not null"`
IsDownload int `gorm:"column:isDownload;type:int;not null"`
}
// TableName returns the table name GORM uses for Video (note the plural "Videos").
func (Video) TableName() string {
return "Videos"
}

16
main_program/entity/WorldResultSet.go

@ -0,0 +1,16 @@
package entity
// WorldResultSet is the GORM model for the MySQL table "World_Result_Set":
// one keyword hit found in one subtitle line.
type WorldResultSet struct {
Id uint `gorm:"column:id;primaryKey;autoIncrement"`
// KeyWordId and WordText identify the keyword that matched.
KeyWordId int `gorm:"column:keyWordId"`
WordText string `gorm:"column:wordText"`
// SrtId, SrtOrdinal and SrtText are copied from the matching subtitle line.
SrtId int `gorm:"column:srtId"`
SrtOrdinal int `gorm:"column:srtOrdinal"`
SrtText string `gorm:"column:srtText"`
VideoId string `gorm:"column:videoId"`
}
// TableName returns the table name GORM uses for WorldResultSet.
func (WorldResultSet) TableName() string {
return "World_Result_Set"
}

27
main_program/go.mod

@ -0,0 +1,27 @@
module main_program
go 1.21.13
require (
github.com/sirupsen/logrus v1.9.3
gopkg.in/yaml.v2 v2.4.0
)
require (
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
gorm.io/gorm v1.25.7 // indirect
)
require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/mattn/go-sqlite3 v1.14.22
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/tealeg/xlsx v1.0.5
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
gorm.io/driver/mysql v1.5.7
)

43
main_program/go.sum

@ -0,0 +1,43 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ=
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 h1:mZHayPoR0lNmnHyvtYjDeq0zlVHn9K/ZXoy17ylucdo=
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5/go.mod h1:GEXHk5HgEKCvEIIrSpFI3ozzG5xOKA2DVlEX/gGnewM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE=
github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=

108
main_program/keyword_analyse/KeywordAnalyseService.go

@ -0,0 +1,108 @@
package keywordanalyse
import (
"main_program/common"
"main_program/config"
"main_program/entity"
service "main_program/service"
"strings"
"time"
)
// keywordAnalyse is a stateless type that carries the keyword-analysis entry point.
type keywordAnalyse struct{}
// KeywordAnalyseService is the package-level instance callers use to start the analysis.
var KeywordAnalyseService keywordAnalyse
// Start runs the keyword analysis: it repeatedly picks a video that still has
// unscanned subtitle lines (restricted to common.AnalyseRegion when that is
// set), analyses the video's lines against the keywords of the channel's
// region, waits five seconds and looks for the next video.
//
// The loop ends when no unscanned video is left, or when any lookup for the
// current video fails. Stopping on failure is deliberate: continuing with
// zero-value video/channel data would match against the wrong keywords and
// still mark the lines as scanned, and the failed video would be picked again
// on the next iteration anyway.
func (m *keywordAnalyse) Start() {
	config.Logger.Info("开始逐句解析...")

	videoId, ok := m.nextVideoId()
	for ok {
		config.Logger.Infof("需要解析的VideoId:%s", videoId)
		if !m.analyseVideo(videoId) {
			return
		}

		config.Logger.Info("5秒后循环获取")
		time.Sleep(5 * time.Second)
		videoId, ok = m.nextVideoId()
	}
}

// nextVideoId returns the id of a video that still has unscanned subtitle
// lines, honouring common.AnalyseRegion. ok is false when there is none.
func (m *keywordAnalyse) nextVideoId() (videoId string, ok bool) {
	var err error
	if common.AnalyseRegion == "" {
		config.Logger.Info("直接获取VideoId")
		videoId, err = service.SrtFileService.QueryOneNotScanVideoId()
	} else {
		config.Logger.Info("通过common.AnalyseRegion获取VideoId")
		videoId, err = service.SrtFileService.QueryOneNotScanVideoIdByRegion(common.AnalyseRegion)
	}
	// An empty id cannot identify a video, so treat it like "nothing found".
	if err != nil || videoId == "" {
		config.Logger.Info("没有需要解析的SrtFile...")
		return "", false
	}
	return videoId, true
}

// analyseVideo loads everything needed for one video and analyses each of its
// unscanned subtitle lines. It reports false when a lookup failed.
func (m *keywordAnalyse) analyseVideo(videoId string) bool {
	video, err := service.VideoService.QueryOneByVideoId(videoId)
	if err != nil {
		config.Logger.Errorf("获取video失败,videoId:%s err:%v", videoId, err)
		return false
	}
	channel, err := service.ChannelService.QueryOneByChannelId(video.ChannelId)
	if err != nil {
		config.Logger.Errorf("获取Channel失败,channelId:%s err:%v", video.ChannelId, err)
		return false
	}
	keywords, err := service.KeywordService.QueryKeywordsByRegion(channel.Region)
	if err != nil {
		config.Logger.Errorf("获取keywords失败,region:%s err:%v", channel.Region, err)
		return false
	}
	srtFiles, err := service.SrtFileService.QuerySrtFilesByVideoId(videoId)
	if err != nil {
		config.Logger.Errorf("获取srtFiles失败,videoId:%s err:%v", videoId, err)
		return false
	}

	for i := range srtFiles {
		// Lines already marked as scanned (e.g. by an interrupted earlier
		// run) are skipped so their hits are not stored a second time.
		if srtFiles[i].IsScan != 0 {
			continue
		}
		analyse(srtFiles[i], keywords)
	}
	return true
}
// analyse checks one subtitle line against every keyword. Each keyword whose
// Word occurs in the line's text is stored as a WorldResultSet row. When all
// keywords have been processed the line is marked as scanned (isScan = 1).
//
// If storing a hit fails the function returns immediately and leaves the line
// unmarked.
func analyse(srtFile entity.Srtfile, keywords []entity.Keyword) {
	for _, keyword := range keywords {
		if !strings.Contains(srtFile.SrtText, keyword.Word) {
			continue
		}
		config.Logger.Infof("srtFileId: %d Ordinal: %d text: %s word: %s", srtFile.Id, srtFile.Ordinal, srtFile.SrtText, keyword.Word)

		// Record the match in the result table.
		hit := entity.WorldResultSet{
			KeyWordId:  int(keyword.Id),
			WordText:   keyword.Word,
			SrtId:      int(srtFile.Id),
			SrtOrdinal: srtFile.Ordinal,
			SrtText:    srtFile.SrtText,
			VideoId:    srtFile.VideoId,
		}
		if insertErr := service.WordResultSetService.InsterOneByEntity(hit); insertErr != nil {
			config.Logger.Error("存储解析结果失败...")
			return
		}
	}

	// Flag the subtitle line as processed.
	if updateErr := service.SrtFileService.UpdateIsScanById(int(srtFile.Id), 1); updateErr != nil {
		config.Logger.Error("更新srtFile失败")
	}
}

78
main_program/main.go

@ -0,0 +1,78 @@
package main
import (
"flag"
"fmt"
"io/ioutil"
common "main_program/common"
"main_program/config"
keywordanalyse "main_program/keyword_analyse"
move_data "main_program/moveData"
"gopkg.in/yaml.v2"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// init registers the command-line flags and binds each one to its variable
// in package common. Only -cf has a non-empty default.
func init() {
	bindings := []struct {
		target   *string
		name     string
		fallback string
		usage    string
	}{
		{&common.ConfigFile, "cf", "config.yml", "配置文件名"},
		{&common.Command, "cmd", "", "命令"},
		{&common.Table, "table", "", "需要迁移的表"},
		{&common.Dbpath, "sqlite", "", "sqlite文件地址"},
		{&common.XlsxPath, "excel", "", "excel文件地址"},
		{&common.AnalyseRegion, "analyse", "", "解析的地区"},
	}
	for _, b := range bindings {
		flag.StringVar(b.target, b.name, b.fallback, b.usage)
	}
}
// main loads the YAML configuration, initialises the logger and the shared
// MySQL handle, and dispatches to the requested command. Command-line flags
// take precedence over the values from the configuration file.
func main() {
	flag.Parse()

	// Read and decode the configuration file. A missing or unreadable file
	// must stop the program instead of continuing with an empty config.
	data, err := ioutil.ReadFile(common.ConfigFile)
	if err != nil {
		fmt.Printf("读取配置文件错误: %v\n", err)
		return
	}
	if err = yaml.Unmarshal(data, &common.MyEnv); err != nil {
		fmt.Println("读取配置文件错误...")
		return
	}

	config.InitConfig(common.MyEnv.Log.LogEnv, common.MyEnv.Log.LogPath)
	config.Logger.Info("初始化Logger成功...")

	config.Logger.Info("=====================")
	config.Logger.Info(common.MyEnv.Command)
	config.Logger.Info("=====================")

	// Open the shared MySQL handle. The services dereference common.MysqlDB
	// directly, so a failed open must not be ignored.
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", common.MyEnv.Mysql.User, common.MyEnv.Mysql.Password, common.MyEnv.Mysql.Host, common.MyEnv.Mysql.Database)
	common.MysqlDB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
		Logger: logger.Default.LogMode(logger.Error),
	})
	if err != nil {
		config.Logger.Errorf("mysql连接失败: %v", err)
		return
	}

	// The -cmd flag overrides the command from the configuration file.
	command := common.MyEnv.Command
	if common.Command != "" {
		command = common.Command
	}
	config.Logger.Infof("Command:%s", command)

	switch command {
	case "move_data":
		// Each move_data setting can be overridden by its flag.
		table := common.MyEnv.MoveData.Table
		if common.Table != "" {
			table = common.Table
		}
		dbPath := common.MyEnv.MoveData.DBPath
		if common.Dbpath != "" {
			dbPath = common.Dbpath
		}
		xlsxPath := common.MyEnv.MoveData.XlsxPath
		if common.XlsxPath != "" {
			xlsxPath = common.XlsxPath
		}
		if table == "" || dbPath == "" {
			config.Logger.Error("move_data配置文件错误...")
			return
		}
		config.Logger.Infof("开始调用move_data方法")
		config.Logger.Infof("table:%s dbPath:%s xlsxPath:%s", table, dbPath, xlsxPath)
		move_data.MoveDataService.Start(table, dbPath, common.MyEnv.Mysql, xlsxPath)
	case "keyword_analyse":
		keywordanalyse.KeywordAnalyseService.Start()
	default:
		// Make a typo in the command visible instead of exiting silently.
		config.Logger.Errorf("unknown command: %q", command)
	}
}

BIN
main_program/moveData/channel_region.xlsx

Binary file not shown.

202
main_program/moveData/moveDataService.go

@ -0,0 +1,202 @@
package move_data
import (
"container/list"
"database/sql"
"fmt"
"main_program/config"
"main_program/entity"
"regexp"
_ "github.com/mattn/go-sqlite3"
"github.com/tealeg/xlsx"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
// moveDataService is a stateless type that carries the SQLite-to-MySQL migration routines.
type moveDataService struct{}
// MoveDataService is the package-level instance callers use to start a migration.
var MoveDataService moveDataService
// Start migrates one table from the SQLite file at sqlitePath into MySQL.
//
// table selects the migration: "Channel" (requires xlsxPath, the workbook
// that supplies each channel's region), "Videos", or "DownLoadInfo" (which
// works on MySQL only). mysqlConfig supplies the MySQL connection settings.
//
// Connection failures terminate the process through Logger.Fatal.
func (m *moveDataService) Start(table string, sqlitePath string, mysqlConfig config.MysqlEntity, xlsxPath string) {
	config.Logger.Info("开始迁移sqlite3到mysql...")
	sqliteDB, err := sql.Open("sqlite3", sqlitePath)
	if err != nil {
		config.Logger.Fatal(err)
	}
	// Release the SQLite handle when the migration returns.
	defer sqliteDB.Close()
	// sql.Open may only validate its arguments; Ping forces a real connection
	// so that the success message below is truthful.
	if err := sqliteDB.Ping(); err != nil {
		config.Logger.Fatal(err)
	}
	config.Logger.Info("连接成功sqlite3...")

	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", mysqlConfig.User, mysqlConfig.Password, mysqlConfig.Host, mysqlConfig.Database)
	mysqlDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		config.Logger.Fatalf("数据库连接失败: %v", err)
	}
	config.Logger.Info("连接成功mysql...")

	switch table {
	case "Channel":
		if xlsxPath == "" {
			config.Logger.Error("xlsxPath not existx...")
			return
		}
		m.moveChannel(sqliteDB, mysqlDB, xlsxPath)
	case "Videos":
		m.moveVideos(sqliteDB, mysqlDB)
	case "DownLoadInfo":
		m.moveDownLoadInfo(mysqlDB)
	default:
		// Make a misspelled table name visible instead of doing nothing.
		config.Logger.Errorf("unsupported table: %q", table)
	}
}
// moveChannel copies every row of the SQLite table Channel_copy that has
// is_copy = 0 into the MySQL Channel table, 50 rows per batch. The region of
// each channel is looked up in the workbook at xlsxPath. After a row has been
// inserted into MySQL its is_copy flag is set to 1 in SQLite.
//
// Read and insert failures terminate the process through Logger.Fatal. If the
// is_copy flag cannot be updated the migration stops, because the same row
// would otherwise be selected and inserted again on the next pass.
func (m *moveDataService) moveChannel(sqliteDB *sql.DB, mysqlDB *gorm.DB, xlsxPath string) {
	config.Logger.Info("读取xlsx获取region...")
	file, err := xlsx.OpenFile(xlsxPath)
	if err != nil {
		config.Logger.Fatalf("Error opening file: %s", err)
	}
	config.Logger.Info("开始迁移Channel表...")

	for {
		// An empty batch means no pending rows are left.
		batch := m.loadChannelBatch(sqliteDB)
		if batch.Len() == 0 {
			break
		}

		for e := batch.Front(); e != nil; e = e.Next() {
			channelCopy := e.Value.(*entity.ChannelCopy)
			config.Logger.Info(channelCopy)

			channel := &entity.Channel{
				ChannelId:          channelCopy.ChannelId,
				ChannelTitle:       channelCopy.ChannelTitle,
				ChannelLanguage:    channelCopy.ChannelLanguage,
				ChannelReptileTime: channelCopy.ChannelReptileTime,
				Region:             m.getRegionByChannelId(channelCopy.ChannelId, file),
			}
			if result := mysqlDB.Create(channel); result.Error != nil {
				config.Logger.Fatal(result.Error)
			}

			// Mark the source row as migrated.
			if _, err := sqliteDB.Exec("UPDATE Channel_copy SET is_copy = 1 WHERE id = ?", channelCopy.Id); err != nil {
				config.Logger.Errorf("mark Channel_copy id %d as copied: %v", channelCopy.Id, err)
				return
			}
		}
	}
	config.Logger.Info("完成迁移Channel表...")
}

// loadChannelBatch reads up to 50 not-yet-copied rows from Channel_copy.
// Any read error terminates the process through Logger.Fatal.
func (m *moveDataService) loadChannelBatch(sqliteDB *sql.DB) *list.List {
	rows, err := sqliteDB.Query("SELECT * FROM Channel_copy WHERE is_copy = 0 limit 50")
	if err != nil {
		config.Logger.Fatal(err)
	}
	defer rows.Close()

	batch := list.New()
	for rows.Next() {
		channelCopy := new(entity.ChannelCopy)
		if err := rows.Scan(&channelCopy.Id, &channelCopy.ChannelId, &channelCopy.ChannelTitle, &channelCopy.ChannelLanguage, &channelCopy.ChannelReptileTime, &channelCopy.Is_Copy); err != nil {
			config.Logger.Fatal(err)
		}
		batch.PushBack(channelCopy)
	}
	// Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		config.Logger.Fatal(err)
	}
	return batch
}
// getRegionByChannelId searches every sheet of file for a row whose fourth
// cell equals channnelId and returns that row's third cell. It returns ""
// when no row matches.
func (m *moveDataService) getRegionByChannelId(channnelId string, file *xlsx.File) string {
	for _, sheet := range file.Sheets {
		for _, row := range sheet.Rows {
			// Rows can be shorter than four cells (blank or partially
			// filled lines); indexing them would panic.
			if len(row.Cells) < 4 {
				continue
			}
			if row.Cells[3].Value == channnelId {
				return row.Cells[2].Value
			}
		}
	}
	return ""
}
// videoTitleStripPattern matches the characters removed from video titles
// before they are stored: every code point above U+FFFF and the block
// U+2600–U+27BF (miscellaneous symbols and dingbats).
//
// The earlier pattern also listed "\u1f300-\u1f64F" and "\u1f680-\u1f6FF".
// A \u escape takes exactly four hex digits, so those were read as U+1F30
// followed by the range '0'–U+1F64, which deleted digits and Latin, Greek and
// Cyrillic letters from titles. The intended ranges (U+1F300–U+1F64F,
// U+1F680–U+1F6FF) are already covered by the first range.
var videoTitleStripPattern = regexp.MustCompile("[\U00010000-\U0010ffff\u2600-\u27BF]+")

// moveVideos copies every row of the SQLite table Vidoes_copy that has
// isCopy = 0 and a non-empty channelId into the MySQL Videos table, up to
// 1000 rows per batch. Titles are cleaned with videoTitleStripPattern. After
// a batch has been inserted its rows are flagged isCopy = 1 in SQLite.
//
// Read failures terminate the process through Logger.Fatal. The migration
// stops when a batch insert fails, or when a row cannot be flagged as copied
// (it would otherwise be inserted again on the next pass).
func (m *moveDataService) moveVideos(sqliteDB *sql.DB, mysqlDB *gorm.DB) {
	config.Logger.Info("开始迁移Videos表...")
	count := 0
	for {
		// An empty batch means no pending rows are left.
		batch := m.loadVideoBatch(sqliteDB)
		if len(batch) == 0 {
			break
		}

		videos := make([]entity.Video, 0, len(batch))
		for _, src := range batch {
			videos = append(videos, entity.Video{
				VideoId:          src.VideoId,
				ChannelId:        src.ChannelId,
				VideoTitle:       videoTitleStripPattern.ReplaceAllString(src.VideoTitle, ""),
				VideoLen:         src.VideoLen,
				VideoType:        src.VideoType,
				VideoPublishTime: src.VideoPublishTime,
				VideoLanguage:    src.VideoLanguage,
				IsDownload:       src.IsDownload,
			})
		}
		if result := mysqlDB.CreateInBatches(videos, 100); result.Error != nil {
			config.Logger.Error(result.Error)
			return
		}

		// Mark the source rows as migrated.
		for _, src := range batch {
			if _, err := sqliteDB.Exec("UPDATE Vidoes_copy SET isCopy = 1 WHERE id = ?", src.Id); err != nil {
				config.Logger.Errorf("mark Vidoes_copy id %d as copied: %v", src.Id, err)
				return
			}
		}

		count++
		config.Logger.Infof("count:%d", count)
	}
	config.Logger.Info("完成迁移Videos表...")
}

// loadVideoBatch reads up to 1000 not-yet-copied rows from Vidoes_copy.
// Any read error terminates the process through Logger.Fatal.
func (m *moveDataService) loadVideoBatch(sqliteDB *sql.DB) []*entity.VideoCopy {
	rows, err := sqliteDB.Query("SELECT * FROM Vidoes_copy WHERE isCopy = 0 and channelId != '' limit 1000")
	if err != nil {
		config.Logger.Fatal(err)
	}
	defer rows.Close()

	var batch []*entity.VideoCopy
	for rows.Next() {
		videoCopy := new(entity.VideoCopy)
		if err := rows.Scan(&videoCopy.Id, &videoCopy.VideoId, &videoCopy.ChannelId, &videoCopy.VideoTitle, &videoCopy.VideoLen,
			&videoCopy.VideoType, &videoCopy.VideoPublishTime,
			&videoCopy.VideoLanguage, &videoCopy.IsDownload, &videoCopy.IsCopy); err != nil {
			config.Logger.Fatal(err)
		}
		batch = append(batch, videoCopy)
	}
	// Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		config.Logger.Fatal(err)
	}
	return batch
}
// moveDownLoadInfo creates a Download_info row for every video in the MySQL
// Videos table that does not have one yet, 1000 videos per batch. New rows
// get DownloadType 1 and TryTime/IsFinished 0.
//
// The loop ends when no video without a Download_info row is left. It also
// stops on the first database error, since repeating a failing query or
// insert would otherwise never terminate.
func (m *moveDataService) moveDownLoadInfo(mysqlDB *gorm.DB) {
	// Selects videos that have no matching Download_info row yet.
	const pendingSQL = "SELECT v.* FROM Videos v WHERE NOT EXISTS ( SELECT * FROM Download_info d WHERE v.videoId = d.videoId ) LIMIT 1000;"

	count := 0
	for {
		var videos []entity.Video
		if err := mysqlDB.Raw(pendingSQL).Scan(&videos).Error; err != nil {
			config.Logger.Error(err)
			return
		}
		// An empty result means every video already has its row.
		if len(videos) == 0 {
			return
		}

		downloadInfos := make([]entity.DownloadInfo, 0, len(videos))
		for _, video := range videos {
			downloadInfos = append(downloadInfos, entity.DownloadInfo{
				VideoId:      video.VideoId,
				DownloadType: 1,
				TryTime:      0,
				IsFinished:   0,
			})
		}
		if err := mysqlDB.CreateInBatches(downloadInfos, 1000).Error; err != nil {
			config.Logger.Error(err)
			return
		}

		// count is the number of completed batches.
		count++
		config.Logger.Infof("count: %d", count)
	}
}

21
main_program/service/ChannelService.go

@ -0,0 +1,21 @@
package service
import (
"errors"
"main_program/common"
"main_program/entity"
)
// channelService is a stateless type that groups the Channel table queries.
type channelService struct{}

// ChannelService is the package-level instance callers use.
var ChannelService channelService

// QueryOneByChannelId returns the first Channel row whose channelId column
// equals channelId. It returns a non-nil error when channelId is empty, when
// no row matches, or when the query fails; the returned Channel is then the
// zero value.
func (c *channelService) QueryOneByChannelId(channelId string) (channel entity.Channel, err error) {
	// GORM drops zero-value fields from struct conditions, so an empty id
	// would remove the filter and return whatever row comes first.
	if channelId == "" {
		return entity.Channel{}, errors.New("query channel error")
	}

	var channelEntity entity.Channel
	result := common.MysqlDB.Where(&entity.Channel{ChannelId: channelId}).First(&channelEntity)
	if result.Error != nil {
		return entity.Channel{}, errors.New("query channel error")
	}
	return channelEntity, nil
}

21
main_program/service/KeyWordService.go

@ -0,0 +1,21 @@
package service
import (
"errors"
"main_program/common"
"main_program/entity"
)
// keywordService is a stateless type that groups the Keyword table queries.
type keywordService struct{}

// KeywordService is the package-level instance callers use.
var KeywordService keywordService

// QueryKeywordsByRegion returns the Keyword rows selected by a struct
// condition on Region. It returns a nil slice and an error when the query fails.
//
// NOTE(review): GORM struct conditions skip zero-value fields, so an empty
// region likely applies no filter and matches every keyword — confirm this
// is intended.
func (k *keywordService) QueryKeywordsByRegion(region string) (keywords []entity.Keyword, err error) {
	var found []entity.Keyword
	filter := &entity.Keyword{Region: region}
	if queryErr := common.MysqlDB.Where(filter).Find(&found).Error; queryErr != nil {
		return nil, errors.New("query keywords faild")
	}
	return found, nil
}

55
main_program/service/SrtFileService.go

@ -0,0 +1,55 @@
package service
import (
"errors"
"main_program/common"
"main_program/config"
"main_program/entity"
"gorm.io/gorm"
)
// srtFileService is a stateless type that groups the Srtfile table queries.
type srtFileService struct{}
// SrtFileService is the package-level instance callers use.
var SrtFileService srtFileService
// QueryOneNotScanVideoId returns the videoId of the first Srtfile row whose
// isScan column is 0. It returns an error when no such row exists or when
// the query fails.
func (srt *srtFileService) QueryOneNotScanVideoId() (videoId string, err error) {
	var srtFile entity.Srtfile
	// The filter must be written out explicitly: a struct condition such as
	// &entity.Srtfile{IsScan: 0} is ignored by GORM because 0 is the field's
	// zero value, which made this query return already-scanned rows as well.
	// This matches "isScan = 0" as used by the region query; rows whose
	// isScan is NULL are not selected by either.
	result := common.MysqlDB.Where("isScan = ?", 0).First(&srtFile)
	if result.Error != nil {
		if errors.Is(result.Error, gorm.ErrRecordNotFound) {
			return "", errors.New("no srtFile")
		}
		// Report other failures too instead of returning an empty id with
		// a nil error.
		return "", errors.New("query srtFile failed")
	}
	return srtFile.VideoId, nil
}
// QuerySrtFilesByVideoId returns all Srtfile rows that belong to videoId.
// It returns a nil slice and an error when videoId is empty or the query fails.
func (srt *srtFileService) QuerySrtFilesByVideoId(videoId string) (srtFiles []entity.Srtfile, err error) {
	// GORM drops zero-value fields from struct conditions, so an empty id
	// would remove the filter and load the whole table.
	if videoId == "" {
		return nil, errors.New("query srtFiles failed")
	}

	var srtFileEntitys []entity.Srtfile
	result := common.MysqlDB.Where(&entity.Srtfile{VideoId: videoId}).Find(&srtFileEntitys)
	if result.Error != nil {
		return nil, errors.New("query srtFiles failed")
	}
	return srtFileEntitys, nil
}
// QueryOneNotScanVideoIdByRegion returns the videoId of one Srtfile row with
// isScan = 0 whose channel (joined on channelId) has the given region. Any
// query error, including "no row found", is reported as an error.
func (srt *srtFileService) QueryOneNotScanVideoIdByRegion(region string) (videoId string, err error) {
	const query = "SELECT Srtfile.* FROM Srtfile JOIN Channel ON Srtfile.channelId = Channel.channelId WHERE Channel.region = ? and Srtfile.isScan = 0 limit 1;"

	var row entity.Srtfile
	if queryErr := common.MysqlDB.Raw(query, region).First(&row).Error; queryErr != nil {
		return "", errors.New("no srtFile")
	}
	return row.VideoId, nil
}
// UpdateIsScanById sets the isScan column of the Srtfile row with the given
// primary key to isScan. On failure the underlying error is logged and a
// generic error is returned.
func (srt *srtFileService) UpdateIsScanById(id int, isScan int) (err error) {
	target := &entity.Srtfile{Id: uint(id)}
	updateErr := common.MysqlDB.Model(target).Update("isScan", isScan).Error
	if updateErr == nil {
		return nil
	}
	config.Logger.Error(updateErr)
	return errors.New("update srtFile failed")
}

26
main_program/service/VideoService.go

@ -0,0 +1,26 @@
package service
import (
"errors"
"main_program/common"
entity "main_program/entity"
"gorm.io/gorm"
)
// videoService is a stateless type that groups the Videos table queries.
type videoService struct{}

// VideoService is the package-level instance callers use.
var VideoService videoService

// QueryOneByVideoId returns the first Video row whose videoId column equals
// videoId. It returns the error "no video" when videoId is empty or no row
// matches, and "query video failed" for any other query error; the returned
// Video is then the zero value.
func (v *videoService) QueryOneByVideoId(videoId string) (video entity.Video, err error) {
	// GORM drops zero-value fields from struct conditions, so an empty id
	// would remove the filter and return whatever row comes first.
	if videoId == "" {
		return entity.Video{}, errors.New("no video")
	}

	var videoEntity entity.Video
	result := common.MysqlDB.Where(&entity.Video{VideoId: videoId}).First(&videoEntity)
	if result.Error != nil {
		if errors.Is(result.Error, gorm.ErrRecordNotFound) {
			return entity.Video{}, errors.New("no video")
		}
		return entity.Video{}, errors.New("query video failed")
	}
	return videoEntity, nil
}

15
main_program/service/WordResultSetService.go

@ -0,0 +1,15 @@
package service
import (
"main_program/common"
"main_program/entity"
)
// wordResultSetService is a stateless type that groups the World_Result_Set
// table operations.
type wordResultSetService struct{}

// WordResultSetService is the package-level instance callers use.
var WordResultSetService wordResultSetService

// InsterOneByEntity inserts wordResultSet as a new row and returns the error
// reported by GORM, or nil on success.
func (w *wordResultSetService) InsterOneByEntity(wordResultSet entity.WorldResultSet) (err error) {
	err = common.MysqlDB.Create(&wordResultSet).Error
	return err
}
Loading…
Cancel
Save