You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

152 lines
6.4 KiB

10 months ago
from shutil import copyfile
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter
from VideoService import VideoService
from ChannelService import ChannelService
from DownloadInfoService import DownloadService
from SrtFileService import SrtService
10 months ago
from LoggerUtils import Logger
import time
import os
from func_timeout import func_set_timeout
import operator
import pysrt
9 months ago
from pytubefix import YouTube
from pytubefix.cli import on_progress
10 months ago
class DownLoadUtil:
    """Download YouTube subtitles/audio and record them through the project services.

    Every member is a static helper and is meant to be called on the class
    itself, e.g. ``DownLoadUtil.downloadOne(videoId)``; no instance state is used.
    """

    # Shared formatter used to render a fetched transcript as SRT text.
    formatter = SRTFormatter()
    # Local proxy endpoints. Not referenced by any method of this class.
    proxies = {"http": "http://127.0.0.1:7890",
               "https": "https://127.0.0.1:7890"}

    @staticmethod
    def _formatSrtTime(srtTime):
        """Return ``str(srtTime)`` with the trailing zeros of its fraction removed.

        Zeros are stripped only when the text contains a fractional part.
        A whole-second value such as ``00:00:10`` has no fraction, and
        stripping zeros from it blindly would corrupt it into ``00:00:1``.
        """
        text = str(srtTime)
        if "." in text:
            text = text.rstrip("0").rstrip(".")
        return text

    @staticmethod
    def _ensureFolder(folderPath):
        """Create ``folderPath`` (with parents) if it is missing, logging the creation."""
        if not os.path.exists(folderPath):
            Logger.info("开始创建文件夹:" + folderPath)
            # exist_ok avoids a failure if the folder appears between the
            # existence check and this call.
            os.makedirs(folderPath, exist_ok=True)

    @staticmethod
    def iterateSrt(srtFilePath, videoId, channelId):
        """Store every cue of an SRT file as one record via ``SrtService``.

        Args:
            srtFilePath: Path of the ``.srt`` file to read with ``pysrt``.
            videoId: Video id passed to ``SrtService`` for each record.
            channelId: Channel id passed to ``SrtService`` for each record.

        Returns:
            None. Nothing is stored when ``SrtService.checkByVideoId`` reports
            that the video is already present.
        """
        # Skip videos the service already knows about.
        if SrtService.checkByVideoId(videoId):
            Logger.info("VideoId: {} 已收录".format(videoId))
            return
        subs = pysrt.open(srtFilePath)
        # Ordinals are 1-based and follow the order of the cues in the file.
        for ordinal, sub in enumerate(subs, start=1):
            srtStartTime = DownLoadUtil._formatSrtTime(sub.start.to_time())
            srtEndTime = DownLoadUtil._formatSrtTime(sub.end.to_time())
            SrtService.createOne(videoId, channelId, ordinal,
                                 srtStartTime, srtEndTime, sub.text)

    @staticmethod
    def downLoadMP3(videoId):
        """Download the audio-only stream of a video as ``<videoId>.mp3``.

        The file is written to ``<root>/<language>/<channelId>-<channelTitle>``;
        the folder is created when it does not exist.

        Args:
            videoId: YouTube video id; also used to look up the video and its
                channel through ``VideoService`` / ``ChannelService``.
        """
        video = VideoService.getOneByVideoId(videoId)
        languages = str(video.videoLanguage)
        channel = ChannelService.getOneByChannelId(str(video.channelId))
        videoUrl = "https://www.youtube.com/watch?v={}".format(videoId)
        yt = YouTube(videoUrl, on_progress_callback=on_progress)
        ys = yt.streams.get_audio_only()
        mp3OutPutPath = "/mnt/srt_file"
        # TODO: test override of the output root; remove to use the path above.
        mp3OutPutPath = "D:/Work/Code/youtube_dev"
        mp3OutPutPath = "{}/{}/{}-{}".format(
            mp3OutPutPath, languages, channel.channelId, channel.channelTitle)
        DownLoadUtil._ensureFolder(mp3OutPutPath)
        fileName = "{}.mp3".format(videoId)
        ys.download(output_path=mp3OutPutPath, filename=fileName, mp3=True)

    @staticmethod
    @func_set_timeout(60)
    def downloadOne(videoId):
        """Fetch the transcript of one video, save it as SRT and record it.

        Steps performed inside the guarded section:
          1. Build the store and copy paths
             ``<root>/<language>/<channelId>-<channelTitle>/<videoId>.srt``.
          2. Return early when the store file already exists.
          3. Fetch the transcript in the video's language, format it as SRT,
             write it to the store path and copy it to the copy path.
          4. Mark the video as downloaded, update its download info (when one
             exists) and store the cues via ``iterateSrt``.

        Any ``Exception`` raised in those steps is logged and not re-raised.
        The function is wrapped by ``func_set_timeout(60)``.

        Args:
            videoId: YouTube video id to download.
        """
        # Load the video and its channel.
        video = VideoService.getOneByVideoId(videoId)
        channel = ChannelService.getOneByChannelId(str(video.channelId))
        # Start the download.
        Logger.info("开始下载...{}".format(videoId))
        try:
            mainPath = "D:/Work/Code/youtube_dev/main"
            tmpPath = "D:/Work/Code/youtube_dev/tmp"
            # Language requested for the transcript; also a path component.
            languages = str(video.videoLanguage)
            storePath = "/mnt/srt_file"
            cpPath = "/mnt/tmp_srt_file"
            # TODO: test overrides of both roots; remove to use the paths above.
            storePath = mainPath
            cpPath = tmpPath
            storePath = "{}/{}/{}-{}".format(
                storePath, languages, channel.channelId, channel.channelTitle)
            cpPath = "{}/{}/{}-{}".format(
                cpPath, languages, channel.channelId, channel.channelTitle)
            DownLoadUtil._ensureFolder(storePath)
            DownLoadUtil._ensureFolder(cpPath)
            storePath = "{}/{}.srt".format(storePath, videoId)
            cpPath = "{}/{}.srt".format(cpPath, videoId)
            # Nothing to do when the subtitle file is already on disk.
            if os.path.exists(storePath):
                Logger.info("{}已存在".format(storePath))
                return
            # NOTE(review): for long paths the last 20 characters of both paths
            # are dropped, which also removes the video id from the file name,
            # and this happens after the existence check above. Confirm this
            # is intended before changing it.
            if len(cpPath) > 120:
                storePath = storePath[:-20] + ".srt"
                cpPath = cpPath[:-20] + ".srt"
            videoSrt = YouTubeTranscriptApi.get_transcript(
                videoId, languages=[languages])
            srt_formatted = DownLoadUtil.formatter.format_transcript(videoSrt)
            Logger.info("文件地址...{}".format(storePath))
            with open(storePath, 'w', encoding='utf-8') as srt_file:
                srt_file.write(srt_formatted)
            Logger.info("下载完成...{}".format(videoId))
            copyfile(storePath, cpPath)
            # Update the video record.
            VideoService.updateIsDownloadByVideoId(videoId, 1)
            # Update the download info, counting this attempt.
            downloadInfo = DownloadService.getOneByVideoId(videoId, 1)
            if downloadInfo is not None:
                DownloadService.updateInfoByVideoId(
                    videoId, downloadInfo.tryTime + 1, 1, 1)
            # Read the subtitle file cue by cue and store it.
            DownLoadUtil.iterateSrt(storePath, videoId, video.channelId)
        except Exception as e:
            # Best effort: the failure is logged, together with its cause, and
            # swallowed. No retry bookkeeping or audio fallback is performed.
            Logger.error("下载失败...{}".format(videoId))
            Logger.error("Exception...{}".format(e))