Browse Source

Merge branch 'master' of http://47.108.20.249:3000/zhangshu/youtube_prod_mysql into master

master^2
zhangshu 5 months ago
parent
commit
8d4ef636ce
  1. 25
      common/DownloadUtils.py
  2. 16
      common/YoutubeUtils.py
  3. 6
      download_video.py
  4. 13
      entity/KeyWordEntity.py
  5. 2179
      hk_tw_names_20240703_v2.csv
  6. 82
      insert_keyword.py
  7. 13
      insert_keyword_config.json
  8. 27
      parse_video.py
  9. 17
      search_video.py
  10. 10
      service/DownloadInfoService.py
  11. 26
      service/KeyWordService.py
  12. 6
      service/SrtFileService.py
  13. 4
      service/VideoService.py
  14. 54
      test.py

25
common/DownloadUtils.py

@ -30,17 +30,24 @@ class DownloadUtil:
Logger.info("VideoId: {} 已收录", videoId) Logger.info("VideoId: {} 已收录", videoId)
return return
subs = pysrt.open(srtFilePath) subs = pysrt.open(srtFilePath)
srtFiles = []
ordinal = 1 ordinal = 1
for sub in subs: for sub in subs:
srtStartTime = str(sub.start.to_time()).rstrip("0") srtStartTime = str(sub.start.to_time()).rstrip("0")
if sub.start.to_time() == "00:00:00":
srtStartTime = sub.start.to_time()
srtEndTime = str(sub.end.to_time()).rstrip("0") srtEndTime = str(sub.end.to_time()).rstrip("0")
SrtFileService.insertOne(videoId=videoId, channelId=channelId, ordinal=ordinal, srtFile: Srtfile = Srtfile(videoId=videoId, channelId=channelId, ordinal=ordinal,
srtStartTime=srtStartTime, srtEndTime=srtEndTime, srtText=sub.text, isScan=0) srtStartTime=srtStartTime, srtEndTime=srtEndTime, srtText=sub.text, isScan=0)
ordinal = ordinal + 1 ordinal = ordinal + 1
srtFiles.append(srtFile)
# 批量插入字幕数据
SrtFileService.insertList(srtFiles=srtFiles)
Logger.info(
f"读取srt文件成功 videoId:{videoId} channelId:{channelId} srtFilePath:{srtFilePath}")
def downLoadMP3(videoId, storePath): def downLoadMP3(videoId, storePath):
video:Video = VideoService.getOneByVideoId(videoId) video: Video = VideoService.queryOneByVideoId(videoId)
channel:Channel = ChannelService.queryOneByChannelId(video.channelId)
videoUrl = "https://www.youtube.com/watch?v={}".format(videoId) videoUrl = "https://www.youtube.com/watch?v={}".format(videoId)
yt = YouTube(videoUrl, on_progress_callback=on_progress) yt = YouTube(videoUrl, on_progress_callback=on_progress)
ys = yt.streams.get_audio_only() ys = yt.streams.get_audio_only()
@ -48,10 +55,10 @@ class DownloadUtil:
if not os.path.exists(mp3OutPutPath): if not os.path.exists(mp3OutPutPath):
Logger.info("开始创建文件夹:" + mp3OutPutPath) Logger.info("开始创建文件夹:" + mp3OutPutPath)
os.makedirs(mp3OutPutPath) os.makedirs(mp3OutPutPath)
fileName = "{}.mp3".format(videoId) fileName = "{}".format(videoId)
ys.download(output_path=mp3OutPutPath, filename=fileName, mp3=True) ys.download(output_path=mp3OutPutPath, filename=fileName, mp3=True)
@func_set_timeout(60) @func_set_timeout(120)
def downloadOne(videoId, rootPath): def downloadOne(videoId, rootPath):
video: Video = VideoService.queryOneByVideoId(videoId=videoId) video: Video = VideoService.queryOneByVideoId(videoId=videoId)
channel: Channel = ChannelService.queryOneByChannelId(video.channelId) channel: Channel = ChannelService.queryOneByChannelId(video.channelId)
@ -98,8 +105,7 @@ class DownloadUtil:
videoId=videoId) videoId=videoId)
if downloadInfo is not None: if downloadInfo is not None:
DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 1) DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 1)
DownloadUtil.iterateSrt(storePath, videoId, video.channelId) DownloadUtil.iterateSrt(storePathSrt, videoId, video.channelId)
pass
except Exception as e: except Exception as e:
Logger.error(e) Logger.error(e)
logStr = "Exception...{}".format(e) logStr = "Exception...{}".format(e)
@ -115,5 +121,6 @@ class DownloadUtil:
if downloadInfo is not None: if downloadInfo is not None:
DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 1) DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 1)
else: else:
Logger.error("VideoId:{},下载失败".format(videoId))
if downloadInfo is not None: if downloadInfo is not None:
DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 0) DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 1)

16
common/YoutubeUtils.py

@ -7,6 +7,7 @@ import time
from entity.ChannelEntity import Channel from entity.ChannelEntity import Channel
from entity.VideoEntity import Video from entity.VideoEntity import Video
from service.ChannelService import ChannelService from service.ChannelService import ChannelService
from service.DownloadInfoService import DownloadInfoService
from service.VideoService import VideoService from service.VideoService import VideoService
@ -25,7 +26,7 @@ class YouTubeUtil:
"AIzaSyARaW3mqO9szQiHgWZR4el0HWvdyheSHBc", "AIzaSyARaW3mqO9szQiHgWZR4el0HWvdyheSHBc",
"AIzaSyChPXesnVx6fweon_BckhR6UiJWvi5Ma4s" "AIzaSyChPXesnVx6fweon_BckhR6UiJWvi5Ma4s"
# "AIzaSyCTBSbq0YjyxTtjmNsnDyKAwHamlv_ST-s", # "AIzaSyCTBSbq0YjyxTtjmNsnDyKAwHamlv_ST-s"
# "AIzaSyAESnwtbTIBtU707iZowtQkmAo-qKuEOcY" # "AIzaSyAESnwtbTIBtU707iZowtQkmAo-qKuEOcY"
@ -121,13 +122,15 @@ class YouTubeUtil:
video: Video = VideoService.queryOneByVideoId(videoId) video: Video = VideoService.queryOneByVideoId(videoId)
if video == None: if video == None:
VideoService.insertOne( VideoService.insertOne(
videoId=videoId, ChannelId=channelId, videoTitle=videoTitle, videoLen=0, videoId=videoId, channelId=channelId, videoTitle=videoTitle, videoLen=0,
videoType=videoType, videoPublishTime=publisTime, videoLanguage=videoLanguage, isDownload=0) videoType=videoType, videoPublishTime=publisTime, videoLanguage=videoLanguage, isDownload=0)
videosRequest = videosRequest + "," + str(videoId) videosRequest = videosRequest + "," + str(videoId)
videosRequestCount = videosRequestCount + 1 videosRequestCount = videosRequestCount + 1
Logger.info( Logger.info(
"存储VideoUrl:https://www.youtube.com/watch?v=" + videoId "存储VideoUrl:https://www.youtube.com/watch?v=" + videoId
) )
# 新增下载任务
DownloadInfoService.insertOne(videoId=videoId)
else: else:
Logger.info("已存在VideoId:{}".format(videoId)) Logger.info("已存在VideoId:{}".format(videoId))
idList.append(str(videoId)) idList.append(str(videoId))
@ -146,12 +149,13 @@ class YouTubeUtil:
) )
videosRequestCount = 0 videosRequestCount = 0
videosRequest = "" videosRequest = ""
except: except Exception as e:
pass Logger.error(e)
# 获取最后一个视频 # 获取最后一个视频
video: Video = VideoService.getLastVideoByChannelId(channelId) video: Video = VideoService.getLastVideoByChannelId(channelId)
ChannelService.updateTimeByChannelId( if video != None:
channelId, video.videoPublishTime) ChannelService.updateTimeByChannelId(
channelId, video.videoPublishTime)
time.sleep(5) time.sleep(5)
# 继续获取下一页 # 继续获取下一页
try: try:

6
download_video.py

@ -43,11 +43,11 @@ if __name__ == "__main__":
exit exit
Logger.info(f"downloadRootPaht: '{downloadRootPath}'") Logger.info(f"downloadRootPaht: '{downloadRootPath}'")
downloadInfo: DownloadInfo = DownloadInfoService.getOneNoFinish() downloadInfo: DownloadInfo = DownloadInfoService.getOneNoFinish()
if downloadInfo != None: while downloadInfo != None:
Logger.info("开始下载videoId:{}".format(downloadInfo.videoId)) Logger.info("开始下载videoId:{}".format(downloadInfo.videoId))
# TODO 下载字幕文件或音频文件 # 下载字幕文件或音频文件
DownloadUtil.downloadOne(downloadInfo.videoId, downloadRootPath) DownloadUtil.downloadOne(downloadInfo.videoId, downloadRootPath)
# 重新获取下一个下载任务 # 重新获取下一个下载任务
downloadInfo: DownloadInfo = DownloadInfoService.getOneNoFinish() downloadInfo = DownloadInfoService.getOneNoFinish()
else: else:
Logger.info("完成下载") Logger.info("完成下载")

13
entity/KeyWordEntity.py

@ -0,0 +1,13 @@
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
# 创建一个基类
Base = declarative_base()
class Keyword(Base):
    """ORM mapping for the ``Keyword`` table: one search word tied to a region."""

    __tablename__ = 'Keyword'

    # Auto-incrementing integer primary key.
    id = Column(
        Integer,
        primary_key=True,
        autoincrement=True,
    )
    # Region the word is registered under; may not be NULL.
    region = Column(
        String(255),
        nullable=False,
    )
    # The keyword text itself; may not be NULL.
    word = Column(
        String(255),
        nullable=False,
    )

2179
hk_tw_names_20240703_v2.csv

File diff suppressed because it is too large

82
insert_keyword.py

@ -0,0 +1,82 @@
from LoggerUtils import Logger, initLogger
from bs4 import BeautifulSoup as bs
from urllib.request import urlopen, Request
import json
import Contant
from sqlalchemy import create_engine
from entity.ChannelEntity import Channel
from entity.KeyWordEntity import Keyword
from service.ChannelService import ChannelService
from service.KeyWordService import KeyWordService
import operator
import argparse
import pandas as pd
from common.Utils import getSession
def clean_keyword(value):
    """Return *value* as a stripped string, or ``None`` when it is missing.

    pandas represents an empty CSV cell as NaN, and ``str(NaN)`` is the
    literal text ``"nan"``; such cells (and whitespace-only cells) must not
    be stored as keywords.
    """
    if pd.isna(value):
        return None
    text = str(value).strip()
    return text or None


def insert_if_absent(session, region, word):
    """Insert ``(region, word)`` into the Keyword table unless it already exists.

    Args:
        session: an open SQLAlchemy session; it is committed after an insert.
        region: region label to store the word under.
        word: keyword text.

    Returns:
        ``True`` when a new row was committed, ``False`` when the pair was
        already present.
    """
    existing = session.query(Keyword).filter(
        Keyword.region == region, Keyword.word == word).one_or_none()
    if existing is not None:
        return False
    session.add(Keyword(region=region, word=word))
    session.commit()
    Logger.info(f"region:{region},keyword:{word}")
    return True


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--file', type=str, default='')
    args = parser.parse_args()
    csvFile = args.file
    if not csvFile:
        # Fail with a usage message instead of an obscure pandas error on ''.
        parser.error("--file <path to csv> is required")

    # Read the configuration file.
    with open('insert_keyword_config.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Initialise logging.
    Contant.logDir = data['log']['dir']
    Contant.logFileName = data['log']['fileName']
    initLogger(Contant.logDir, Contant.logFileName)

    # Connect to MySQL.
    dbHost = data['mysql']['host']
    dbPort = data['mysql']['port']
    dbUserName = data['mysql']['username']
    dbPassword = data['mysql']['password']
    dbDatabase = data['mysql']['database']
    # The password is deliberately kept out of the log output.
    Logger.info("尝试连接mysql host:'{}' port:'{}' username:'{}' database:'{}'",
                dbHost, dbPort, dbUserName, dbDatabase)
    Contant.engin = create_engine(
        f'mysql+mysqlconnector://{dbUserName}:{dbPassword}@{dbHost}:{dbPort}/{dbDatabase}')
    Logger.info("连接mysql成功")

    session = getSession()
    try:
        # Read the CSV file. Its `market` column is not used: as before, every
        # name is registered under both regions.
        df = pd.read_csv(csvFile, encoding="utf-8")
        for rawLname, rawSname in zip(df["lname"], df["sname"]):
            words = [
                word
                for word in (clean_keyword(rawLname), clean_keyword(rawSname))
                if word is not None
            ]
            # Same order as before: long name then short name, Taiwan first.
            for region in ("Taiwan", "Hongkong"):
                for word in words:
                    insert_if_absent(session, region, word)
    finally:
        # Release the connection even when the import aborts half-way.
        session.close()

13
insert_keyword_config.json

@ -0,0 +1,13 @@
{
"mysql": {
"host": "47.108.20.249",
"port": "3306",
"username": "root",
"password": "casino888!",
"database": "youtube"
},
"log": {
"dir": "./logs",
"fileName": "insert_keyword"
}
}

27
parse_video.py

@ -18,6 +18,7 @@ import operator
import argparse import argparse
import difflib import difflib
from shutil import copyfile from shutil import copyfile
import shutil
def get_all_files(directory): def get_all_files(directory):
@ -102,20 +103,36 @@ if __name__ == "__main__":
channel: Channel = ChannelService.queryOneByChannelId(key) channel: Channel = ChannelService.queryOneByChannelId(key)
videos = VideoService.queryAllbyChannelId(key) videos = VideoService.queryAllbyChannelId(key)
Logger.info(f"key: {key} len: {len(videos)}") Logger.info(f"key: {key} len: {len(videos)}")
channelRoot = ""
moveCount = 0
for i in range(len(videos)): for i in range(len(videos)):
video: Video = videos[i] video: Video = videos[i]
srtFileName = getSrtFileName(video=video) srtFileName = getSrtFileName(video=video)
Logger.info(f"匹配video: {video.videoId} i:{i}")
for root, dirs, filenames in os.walk(value): for root, dirs, filenames in os.walk(value):
channelRoot = root
breakFlag = False
for filename in filenames: for filename in filenames:
if get_equal_rate_1(srtFileName, filename) > 0.8: diff = get_equal_rate_1(srtFileName, filename)
if diff > 0.7:
src_path = f"{root}/{filename}" src_path = f"{root}/{filename}"
dst_path = f"{newSrtPaht}/{channel.region}/{channel.channelId}-{channel.channelTitle}" dst_path = f"{newSrtPaht}/{channel.region}/{channel.channelId}-{channel.channelTitle}"
if not os.path.exists(dst_path): if not os.path.exists(dst_path):
Logger.info("开始创建文件夹:" + dst_path) Logger.info("开始创建文件夹:" + dst_path)
os.makedirs(dst_path) os.makedirs(dst_path)
dst_path = f"{dst_path}/{video.videoId}.srt" dst_path = f"{dst_path}/{video.videoId}.srt"
Logger.info(f"src_path:{src_path} dst_path:{dst_path}") Logger.info(
copyfile(src_path, dst_path) f"src_path:{src_path} dst_path:{dst_path} diff:{diff} i:{i}")
shutil.move(src_path, dst_path)
moveCount = moveCount + 1
breakFlag = True
break
# 并且读取srt文件到数据库 # 并且读取srt文件到数据库
DownloadUtil.iterateSrt( # DownloadUtil.iterateSrt(
srtFilePath=dst_path, videoId=video.videoId, channelId=channel.channelId) # srtFilePath=dst_path, videoId=video.videoId, channelId=channel.channelId)
if breakFlag:
break
# 删除channel文件
Logger.info(f"删除channelRoot:{channelRoot} 移动总计:{moveCount}")
shutil.rmtree(channelRoot)
time.sleep(5)

17
search_video.py

@ -10,6 +10,7 @@ from common.YoutubeUtils import YouTubeUtil
import operator import operator
import argparse import argparse
# --start="2023-09-10T00:00:01Z" --end="2023-09-11T00:00:01Z"
if __name__ == "__main__": if __name__ == "__main__":
# 读取参数 # 读取参数
parser = argparse.ArgumentParser(description="") parser = argparse.ArgumentParser(description="")
@ -41,9 +42,13 @@ if __name__ == "__main__":
Logger.info("连接mysql成功") Logger.info("连接mysql成功")
# 查询出所有Channel # 查询出所有Channel
# channels = ChannelService.queryAllChannel() channels = ChannelService.queryAllChannel()
# Logger.info("Channels length:{}".format(len(channels))) Logger.info("Channels length:{}".format(len(channels)))
# for channel in channels: for channel in channels:
# channel : Channel = channel channel: Channel = channel
# # 通过channelId查询videos # 通过channelId查询videos
# YouTubeUtil.getByChannelId(channel.channelId) Logger.info(
f"Id:{channel.id} channelId:{channel.channelId} startTime:{startTime} endTime:{endTime}")
YouTubeUtil.getByChannelId(
channelId=channel.channelId, startTime=startTime, endTime=endTime)
# YouTubeUtil.getByChannelId(channelId="UC67Wr_9pA4I0glIxDt_Cpyw",startTime=startTime, endTime=endTime)

10
service/DownloadInfoService.py

@ -22,7 +22,15 @@ class DownloadInfoService:
def updateIsFinishByVideoId(videoId, tryTime, isFinish): def updateIsFinishByVideoId(videoId, tryTime, isFinish):
session = getSession() session = getSession()
updateSql = update(DownloadInfo).where( updateSql = update(DownloadInfo).where(
DownloadInfo.videoId == videoId).values(tryTime=tryTime, isFinish=isFinish) DownloadInfo.videoId == videoId).values(tryTime=tryTime, isFinished=isFinish)
resutl = session.execute(updateSql) resutl = session.execute(updateSql)
session.commit() session.commit()
session.close() session.close()
def insertOne(videoId):
    """Create a new, unfinished download task row for *videoId*.

    The row is stored with ``downloadType=1``, ``tryTime=0`` and
    ``isFinished=0``.
    NOTE(review): the meaning of ``downloadType=1`` is not visible here;
    confirm against the DownloadInfo entity.

    Any exception raised by the insert or commit propagates to the caller.
    """
    session = getSession()
    try:
        downloadInfo: DownloadInfo = DownloadInfo(videoId=videoId, downloadType=1,
                                                  tryTime=0, isFinished=0)
        session.add(downloadInfo)
        session.commit()
    finally:
        # Always release the session, including when the commit fails, so a
        # failed insert does not leak a connection.
        session.close()

26
service/KeyWordService.py

@ -0,0 +1,26 @@
from entity.KeyWordEntity import Keyword
from common.Utils import getSession
from sqlalchemy import update
class KeyWordService:
    """Persistence helpers for rows of the Keyword table.

    Each method opens its own session through ``getSession()`` and closes it
    in a ``finally`` clause, so a failing statement cannot leak a connection.
    The methods are static: they work when called on the class (as existing
    callers do) and on an instance.
    """

    @staticmethod
    def insertOne(region, word):
        """Insert a single ``(region, word)`` row and commit it."""
        session = getSession()
        try:
            session.add(Keyword(region=region, word=word))
            session.commit()
        finally:
            session.close()

    @staticmethod
    def queryOneByRegionWord(region, word):
        """Return the Keyword matching *region* and *word*, or ``None``.

        Uses ``one_or_none()``, which raises if more than one row matches.
        """
        session = getSession()
        try:
            return session.query(Keyword).filter(
                Keyword.region == region, Keyword.word == word).one_or_none()
        finally:
            session.close()

    @staticmethod
    def insterKeyWords(keyWords):
        """Bulk-insert the given Keyword objects in one transaction.

        The misspelt name is kept because callers may already use it.
        An empty or ``None`` argument is a no-op and opens no session.
        """
        if not keyWords:
            return
        session = getSession()
        try:
            session.bulk_save_objects(keyWords)
            session.commit()
        finally:
            session.close()

6
service/SrtFileService.py

@ -21,3 +21,9 @@ class SrtFileService:
session.add(srtFile) session.add(srtFile)
session.commit() session.commit()
session.close() session.close()
def insertList(srtFiles):
    """Bulk-insert the given subtitle rows in one transaction.

    Args:
        srtFiles: the objects to store, passed to ``bulk_save_objects``.
            An empty or ``None`` argument (e.g. a subtitle file with no
            entries) is a no-op and opens no session.

    Any exception raised by the insert or commit propagates to the caller.
    """
    if not srtFiles:
        return
    session = getSession()
    try:
        session.bulk_save_objects(srtFiles)
        session.commit()
    finally:
        # Always release the session, including when the commit fails.
        session.close()

4
service/VideoService.py

@ -19,9 +19,9 @@ class VideoService:
session.close() session.close()
return videos return videos
def insertOne(videoId, ChannelId, videoTitle, videoLen, videoType, videoPublishTime, videoLanguage, isDownload): def insertOne(videoId, channelId, videoTitle, videoLen, videoType, videoPublishTime, videoLanguage, isDownload):
session = getSession() session = getSession()
video: Video = Video(videoId=videoId, ChannelId=ChannelId, videoTitle=videoTitle, video: Video = Video(videoId=videoId, channelId=channelId, videoTitle=videoTitle,
videoLen=videoLen, videoType=videoType, videoPublishTime=videoPublishTime, videoLen=videoLen, videoType=videoType, videoPublishTime=videoPublishTime,
videoLanguage=videoLanguage, isDownload=isDownload) videoLanguage=videoLanguage, isDownload=isDownload)
session.add(video) session.add(video)

54
test.py

@ -1,3 +1,4 @@
import time
from LoggerUtils import Logger, initLogger from LoggerUtils import Logger, initLogger
from bs4 import BeautifulSoup as bs from bs4 import BeautifulSoup as bs
from urllib.request import urlopen, Request from urllib.request import urlopen, Request
@ -8,9 +9,14 @@ from entity.ChannelEntity import Channel
from entity.VideoEntity import Video from entity.VideoEntity import Video
from service.ChannelService import ChannelService from service.ChannelService import ChannelService
from service.VideoService import VideoService from service.VideoService import VideoService
from service.ChannelService import ChannelService
from service.DownloadInfoService import DownloadInfoService
from common.YoutubeUtils import YouTubeUtil from common.YoutubeUtils import YouTubeUtil
from common.DownloadUtils import DownloadUtil
import operator import operator
import argparse import argparse
import os
from common.Utils import getSession
if __name__ == "__main__": if __name__ == "__main__":
# 读取配置文件 # 读取配置文件
@ -34,13 +40,43 @@ if __name__ == "__main__":
Contant.engin = create_engine( Contant.engin = create_engine(
f'mysql+mysqlconnector://{dbUserName}:{dbPassword}@{dbHost}:{dbPort}/{dbDatabase}') f'mysql+mysqlconnector://{dbUserName}:{dbPassword}@{dbHost}:{dbPort}/{dbDatabase}')
Logger.info("连接mysql成功") Logger.info("连接mysql成功")
# session = getSession()
# YouTubeUtil测试
# channelId = "UCBM86JVoHLqg9irpR2XKvGw"
# startTime = "2024-08-22T00:00:01Z"
# endTime = "2024-08-24T00:00:01Z"
# YouTubeUtil.getByChannelId(channelId, startTime, endTime)
# download测试
# videoId = "pBSWhJV0VVU"
# channelId = "UCBM86JVoHLqg9irpR2XKvGw"
# rootPath = "D:/Work/Code/youtube_dev/mysql"
# storePath = "D:/Work/Code/youtube_dev/mysql/main/Korea/UCBM86JVoHLqg9irpR2XKvGw-달란트투자"
# srtFilePath = "D:/Work/Code/youtube_dev/mysql/main/Korea/UCBM86JVoHLqg9irpR2XKvGw-달란트투자/pBSWhJV0VVU.srt"
# DownloadUtil.downloadOne(videoId=videoId, rootPath=rootPath)
# DownloadUtil.iterateSrt(srtFilePath=srtFilePath,
# videoId=videoId, channelId=channelId)
# DownloadUtil.downLoadMP3(videoId=videoId, storePath=storePath)
DownloadInfoService.updateIsFinishByVideoId("CaLR6W_cyeI",1,1)
# 遍历字幕文件,并存入数据库
# srtRootPath = "/mnt/mysql_srt_path_tmp"
# for root, dirs, filenames in os.walk(srtRootPath):
# Logger.info(f"root: {root} filesLen:{len(filenames)}")
# for filename in filenames:
# srtPath = f"{root}/{filename}"
# videoId = filename.replace(".srt", "")
# video: Video = session.query(Video).filter(
# Video.videoId == videoId).one_or_none()
# if video == None:
# continue
# channelId = video.channelId
# Logger.info(
# f"videoId: {videoId},channelId :{channelId}, srtPath: {srtPath}")
# DownloadUtil.iterateSrt(srtFilePath=srtPath,
# videoId=videoId, channelId=channelId)
# os.remove(srtPath)
videoId = "oZhBWA3HNhA" # 关闭session
video = VideoService.queryOneByVideoId(videoId) # session.close()
Logger.info(video)
# VideoService.updateLenByVideoId(videoId, 5344)
video = VideoService.getLastVideoByChannelId("UC67Wr_9pA4I0glIxDt_Cpyw")
if video == None:
Logger.info("meiyou")
else:
Logger.info(video.videoPublishTime)

Loading…
Cancel
Save