# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 126 lines
# 5.4 KiB

from shutil import copyfile
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter
from LoggerUtils import Logger
import time
import os
from func_timeout import func_set_timeout
import operator
import pysrt
from pytubefix import YouTube
from pytubefix.cli import on_progress
from entity.VideoEntity import Video
from entity.ChannelEntity import Channel
from entity.DownloadInfoEntity import DownloadInfo
from entity.SrtFileEntity import Srtfile
from service.VideoService import VideoService
from service.ChannelService import ChannelService
from service.DownloadInfoService import DownloadInfoService
from service.SrtFileService import SrtFileService
class DownloadUtil:
    """Download YouTube subtitles (or audio as a fallback) and record the results.

    Every function is invoked on the class itself, e.g.
    ``DownloadUtil.downloadOne(videoId, rootPath)``; no instance state is used.
    """

    # Shared formatter used to render a fetched transcript as SRT text.
    formatter = SRTFormatter()
    # proxies = {"http": "http://127.0.0.1:7890",
    #            "https": "https://127.0.0.1:7890"}

    @staticmethod
    def _formatSrtTime(value):
        """Render a ``datetime.time`` as text, trimming only fractional zeros.

        ``str(time)`` omits the fractional part when the microseconds are zero,
        so a blind ``rstrip("0")`` would turn ``00:00:10`` into ``00:00:1``.
        Trailing zeros are therefore removed only when a ``.`` is present.
        """
        text = str(value)
        if "." in text:
            text = text.rstrip("0")
        return text

    @staticmethod
    def _ensureDir(path):
        """Create ``path`` (including parents) when missing, logging the creation."""
        if not os.path.exists(path):
            Logger.info("开始创建文件夹:" + path)
            # exist_ok covers the directory appearing between the check and
            # the call (e.g. created by a concurrent download).
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def iterateSrt(srtFilePath, videoId, channelId):
        """Parse an SRT file and bulk-insert one ``Srtfile`` row per subtitle.

        Only logs and returns when ``SrtFileService.checkExistsByVideoId``
        reports that the video is already recorded.

        Args:
            srtFilePath: Path of the ``.srt`` file to read.
            videoId: Video id stored on every row.
            channelId: Channel id stored on every row.
        """
        # Skip videos whose subtitles are already recorded.
        if SrtFileService.checkExistsByVideoId(videoId):
            Logger.info("VideoId: {} 已收录".format(videoId))
            return

        srtFiles = []
        for ordinal, sub in enumerate(pysrt.open(srtFilePath), start=1):
            startTime = sub.start.to_time()
            if ordinal == 1:
                # NOTE(review): the first row has always been stored with the
                # raw time object rather than text; kept as-is so the persisted
                # value does not change. Confirm whether this is still needed.
                srtStartTime = startTime
            else:
                srtStartTime = DownloadUtil._formatSrtTime(startTime)
            srtEndTime = DownloadUtil._formatSrtTime(sub.end.to_time())
            srtFiles.append(
                Srtfile(videoId=videoId, channelId=channelId, ordinal=ordinal,
                        srtStartTime=srtStartTime, srtEndTime=srtEndTime,
                        srtText=sub.text, isScan=0))

        # Insert all subtitle rows in one batch.
        SrtFileService.insertList(srtFiles=srtFiles)
        Logger.info(
            f"读取srt文件成功 videoId:{videoId} channelId:{channelId} srtFilePath:{srtFilePath}")

    @staticmethod
    def downLoadMP3(videoId, storePath):
        """Download the audio-only stream of a video into ``storePath``.

        Args:
            videoId: YouTube video id; also used as the output file name.
            storePath: Target directory, created when it does not exist.
        """
        videoUrl = "https://www.youtube.com/watch?v={}".format(videoId)
        yt = YouTube(videoUrl, on_progress_callback=on_progress)
        audioStream = yt.streams.get_audio_only()
        DownloadUtil._ensureDir(storePath)
        audioStream.download(
            output_path=storePath, filename=str(videoId), mp3=True)

    @staticmethod
    @func_set_timeout(60)
    def downloadOne(videoId, rootPath):
        """Fetch the subtitles of one video, store them as SRT and record them.

        The transcript is written to ``<rootPath>/main/<region>/<channelId>-
        <channelTitle>/<videoId>.srt`` and copied to the same relative location
        under ``<rootPath>/tmp``. On success the video and download-info
        records are updated and the subtitles are inserted via ``iterateSrt``.
        When the failure message contains "No transcripts", the audio is
        downloaded instead. The call runs under ``func_set_timeout(60)``.

        Args:
            videoId: YouTube video id to process.
            rootPath: Root directory that holds the ``main`` and ``tmp`` trees.
        """
        video: Video = VideoService.queryOneByVideoId(videoId=videoId)
        channel: Channel = ChannelService.queryOneByChannelId(video.channelId)
        # Start the download.
        Logger.info("开始下载...{}".format(videoId))

        # Build the target directories, e.g. under /mnt/youtube_mysql.
        channelDir = "{}/{}-{}".format(
            channel.region, channel.channelId, channel.channelTitle)
        storePath = "{}/main/{}".format(rootPath, channelDir)
        cpPath = "{}/tmp/{}".format(rootPath, channelDir)
        DownloadUtil._ensureDir(storePath)
        DownloadUtil._ensureDir(cpPath)

        # Subtitle file names.
        storePathSrt = "{}/{}.srt".format(storePath, videoId)
        cpPathSrt = "{}/{}.srt".format(cpPath, videoId)

        # Nothing to do when the subtitle file is already present.
        if os.path.exists(storePathSrt):
            Logger.info("{}已存在".format(storePathSrt))
            return

        try:
            # Download the subtitles, write them out and copy the file.
            videoSrt = YouTubeTranscriptApi.get_transcript(
                videoId, languages=[video.videoLanguage])
            srt_formatted = DownloadUtil.formatter.format_transcript(videoSrt)
            Logger.info("文件地址...{}".format(storePathSrt))
            with open(storePathSrt, 'w', encoding='utf-8') as srt_file:
                srt_file.write(srt_formatted)
            Logger.info("下载完成...{}".format(videoId))
            copyfile(storePathSrt, cpPathSrt)
            # Update the video record.
            VideoService.upIsDownloadByVideoId(videoId, 1)
            # Update the download-info record, if there is one.
            downloadInfo: DownloadInfo = DownloadInfoService.getOneByVideoId(
                videoId=videoId)
            if downloadInfo is not None:
                DownloadInfoService.updateIsFinishByVideoId(videoId, 1, 1)
            DownloadUtil.iterateSrt(storePathSrt, videoId, video.channelId)
        except Exception as e:
            Logger.error(e)
            logStr = "Exception...{}".format(e)
            Logger.error(logStr)
            # Update the download-info record (original note: tryTime + 1).
            downloadInfo: DownloadInfo = DownloadInfoService.getOneByVideoId(
                videoId=videoId)
            noTranscript = "No transcripts" in logStr
            if noTranscript:
                Logger.error("VideoId:{},不存在字幕文件".format(videoId))
                # No subtitles available: download the audio instead.
                DownloadUtil.downLoadMP3(videoId, storePath)
            # Update the download task. The last argument is 1 when the audio
            # fallback ran and 0 for any other failure; presumably it is the
            # "finished" flag - confirm against DownloadInfoService.
            if downloadInfo is not None:
                DownloadInfoService.updateIsFinishByVideoId(
                    videoId, 1, 1 if noTranscript else 0)