from shutil import copyfile
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter
from VideoService import VideoService
from ChannelService import ChannelService
from DownloadInfoService import DownloadService
from SrtFileService import SrtService
from LoggerUtils import Logger
import time
import os
from func_timeout import func_set_timeout
import operator
import pysrt


class DownLoadUtil:
    """Download YouTube subtitles as SRT files and record the outcome.

    Every method is static; call them on the class, for example
    ``DownLoadUtil.downloadOne(videoId)``.
    """

    formatter = SRTFormatter()
    # NOTE(review): nothing in this class passes ``proxies`` to the
    # transcript API, so these settings currently have no effect here.
    proxies = {"http": "http://127.0.0.1:7890", "https": "https://127.0.0.1:7890"}

    # Storage roots used by downloadOne.
    # TODO: test override, to be deleted -- without it the roots are
    # "/mnt/srt_file" and "/mnt/tmp_srt_file".
    STORE_ROOT = "E:/Work/youtubedownload/main"
    COPY_ROOT = "E:/Work/youtubedownload/tmp"

    # Longest copy-path (in characters) downloadOne will generate.
    MAX_PATH_LENGTH = 120

    # Characters that Windows rejects in file names.  "/" and "?" are
    # swapped for look-alike characters; the others are dropped.
    _TITLE_TRANSLATION = str.maketrans({
        "/": "\u2215",
        "?": "\uff1f",
        "\\": None,
        "|": None,
        "<": None,
        ">": None,
        ":": None,
        '"': None,
        "*": None,
    })

    @staticmethod
    def _formatSrtTime(srtTime):
        """Return a subtitle timestamp as text without trailing zero digits.

        Zeros are stripped only from a fractional part.  A whole-second
        value such as ``00:00:10`` has no fraction and must stay intact.
        """
        text = str(srtTime.to_time())
        return text.rstrip("0") if "." in text else text

    @staticmethod
    def _cleanTitle(title):
        """Return ``title`` with characters unsafe for file names replaced."""
        return str(title).translate(DownLoadUtil._TITLE_TRANSLATION)

    @staticmethod
    def _fitTitle(title, fixedLength):
        """Trim ``title`` so that ``fixedLength + len(title)`` fits the limit.

        At least one character is always kept so the resulting path
        component is never empty.
        """
        room = max(DownLoadUtil.MAX_PATH_LENGTH - fixedLength, 1)
        return title[:room]

    @staticmethod
    def _ensureDir(dirPath):
        """Create ``dirPath`` (and parents) if it does not exist yet."""
        if not os.path.exists(dirPath):
            Logger.info("开始创建文件夹:" + dirPath)
        # exist_ok avoids failing if the directory appears in between.
        os.makedirs(dirPath, exist_ok=True)

    @staticmethod
    def _fetchSrt(videoId, languages):
        """Fetch the transcript of ``videoId`` and return it as SRT text."""
        videoSrt = YouTubeTranscriptApi.get_transcript(
            videoId, languages=[languages])
        return DownLoadUtil.formatter.format_transcript(videoSrt)

    @staticmethod
    def _writeSrt(srtText, storePath):
        """Write ``srtText`` to ``storePath`` as UTF-8."""
        with open(storePath, 'w', encoding='utf-8') as srt_file:
            srt_file.write(srtText)

    @staticmethod
    def _markDownloaded(videoId, downloadType):
        """Update the video record and its download info after a success."""
        VideoService.updateIsDownloadByVideoId(videoId, 1)
        downloadInfo = DownloadService.getOneByVideoId(videoId, downloadType)
        if downloadInfo is not None:
            DownloadService.updateInfoByVideoId(
                videoId, downloadInfo.tryTime + 1, 1, downloadType)

    @staticmethod
    def _scheduleRetry(videoId, downloadInfo, downloadType):
        """Bump the try counter of ``downloadInfo`` after a failed attempt."""
        if downloadInfo is not None:
            Logger.info("VideoId:{}开始重试第{}次".format(
                videoId, downloadInfo.tryTime + 1))
            DownloadService.updateInfoByVideoId(
                videoId, downloadInfo.tryTime + 1, 0, downloadType)

    @staticmethod
    def _saveWithFallbackName(videoId, languages, channelTitle,
                              videoPublishTime, srtText):
        """Store the subtitles under ``<root>/<title>/<date>-<lang>-<id>.srt``.

        Used by downloadOne after a "File name too long" error.  The
        channel title is trimmed so the copy path fits MAX_PATH_LENGTH;
        the file name itself, which carries the video id, is never cut.
        """
        if srtText is None:
            # The error happened before the transcript was fetched.
            srtText = DownLoadUtil._fetchSrt(videoId, languages)
        fileName = "{}-{}-{}.srt".format(videoPublishTime, languages, videoId)
        fixedLength = len("{}//{}".format(DownLoadUtil.COPY_ROOT, fileName))
        dirName = DownLoadUtil._fitTitle(channelTitle, fixedLength)
        storeDir = "{}/{}".format(DownLoadUtil.STORE_ROOT, dirName)
        cpDir = "{}/{}".format(DownLoadUtil.COPY_ROOT, dirName)
        DownLoadUtil._ensureDir(storeDir)
        DownLoadUtil._ensureDir(cpDir)
        storePath = "{}/{}".format(storeDir, fileName)
        cpPath = "{}/{}".format(cpDir, fileName)
        Logger.info("文件名过长,文件地址...{}".format(storePath))
        DownLoadUtil._writeSrt(srtText, storePath)
        Logger.info("下载完成...{}".format(videoId))
        copyfile(storePath, cpPath)
        # NOTE(review): unlike the primary path, the subtitles are not
        # passed to iterateSrt here -- confirm whether that is intended.
        DownLoadUtil._markDownloaded(videoId, 1)

    @staticmethod
    def iterateSrt(srtFilePath, videoId, channelId):
        """Read an SRT file and hand every subtitle to SrtService.

        Does nothing except log when ``SrtService.checkByVideoId`` reports
        the video as already present.  Otherwise ``SrtService.createOne``
        is called once per subtitle with a 1-based ordinal, the start and
        end timestamps as text, and the subtitle text.
        """
        # Skip videos that are already recorded.
        if SrtService.checkByVideoId(videoId):
            Logger.info("VideoId: {} 已收录".format(videoId))
            return
        subs = pysrt.open(srtFilePath)
        for ordinal, sub in enumerate(subs, start=1):
            SrtService.createOne(
                videoId, channelId, ordinal,
                DownLoadUtil._formatSrtTime(sub.start),
                DownLoadUtil._formatSrtTime(sub.end),
                sub.text)

    @staticmethod
    @func_set_timeout(60)
    def downloadOne(videoId):
        """Download the subtitles of ``videoId`` into the store and copy roots.

        The file is written to
        ``<STORE_ROOT>/<language>/<channelId>-<channelTitle>/<videoId>.srt``
        and copied to the same location under COPY_ROOT.  When the copy
        path would exceed MAX_PATH_LENGTH the channel title is trimmed,
        so the video id in the file name is always preserved.  On success
        the video and download-info records are updated and the subtitles
        are passed to iterateSrt.

        Failures are logged and recorded through DownloadService; they are
        not re-raised.  The call is wrapped by ``func_set_timeout(60)``.
        """
        # Load the records this download works from.
        video = VideoService.getOneByVideoId(videoId)
        channel = ChannelService.getOneByChannelId(str(video.channelId))
        # Publish date: the part before the "T" of the timestamp.
        videoPublishTime = str(video.videoPublishTime).split("T")[0]
        languages = str(video.videoLanguage)
        channelTitle = str(channel.channelTitle)
        fileName = "{}.srt".format(videoId)
        srtText = None

        Logger.info("开始下载...{}".format(videoId))
        try:
            # Length of the copy path without the channel title; the title
            # gets whatever room is left below MAX_PATH_LENGTH.
            fixedLength = len("{}/{}/{}-/{}".format(
                DownLoadUtil.COPY_ROOT, languages, channel.channelId, fileName))
            dirName = "{}-{}".format(
                channel.channelId,
                DownLoadUtil._fitTitle(channelTitle, fixedLength))
            storeDir = "{}/{}/{}".format(
                DownLoadUtil.STORE_ROOT, languages, dirName)
            cpDir = "{}/{}/{}".format(
                DownLoadUtil.COPY_ROOT, languages, dirName)
            DownLoadUtil._ensureDir(storeDir)
            DownLoadUtil._ensureDir(cpDir)
            storePath = "{}/{}".format(storeDir, fileName)
            cpPath = "{}/{}".format(cpDir, fileName)

            srtText = DownLoadUtil._fetchSrt(videoId, languages)
            Logger.info("文件地址...{}".format(storePath))
            DownLoadUtil._writeSrt(srtText, storePath)
            Logger.info("下载完成...{}".format(videoId))
            copyfile(storePath, cpPath)

            DownLoadUtil._markDownloaded(videoId, 1)
            # Record the subtitles sentence by sentence.
            DownLoadUtil.iterateSrt(storePath, videoId, video.channelId)
        except Exception as e:
            Logger.error("下载失败...{}".format(videoId))
            logStr = "Exception...{}".format(e)
            Logger.error(logStr)
            downloadInfo = DownloadService.getOneByVideoId(videoId, 1)
            if "No transcripts" in logStr:
                Logger.error("VideoId:{},不存在字幕文件".format(videoId))
                if downloadInfo is not None:
                    # Argument meanings are defined by DownloadService.
                    DownloadService.changeDownloadType(
                        videoId, 0, 0, 1, 2)
            elif "File name too long" in logStr:
                try:
                    DownLoadUtil._saveWithFallbackName(
                        videoId, languages, channelTitle,
                        videoPublishTime, srtText)
                except Exception as fallbackError:
                    # Keep the retry bookkeeping consistent instead of
                    # letting the error escape from the handler.
                    Logger.error("Exception...{}".format(fallbackError))
                    DownLoadUtil._scheduleRetry(videoId, downloadInfo, 1)
            else:
                DownLoadUtil._scheduleRetry(videoId, downloadInfo, 1)

    @staticmethod
    @func_set_timeout(60)
    def downloadTwo(videoId):
        """Download the subtitles of ``videoId`` into ``./download``.

        The file is written to
        ``./download/<channelTitle>/<date>-<language>-<videoTitle>.srt``,
        with characters unsafe for file names replaced in the video title.
        On success the video and download-info records are updated.

        Failures are logged and recorded through DownloadService; they are
        not re-raised.  The call is wrapped by ``func_set_timeout(60)``.
        """
        # Load the records this download works from.
        video = VideoService.getOneByVideoId(videoId, 2)
        channel = ChannelService.getOneByChannelId(str(video.channelId))
        videoTitle = DownLoadUtil._cleanTitle(video.videoTitle)
        # Publish date: the part before the "T" of the timestamp.
        videoPublishTime = str(video.videoPublishTime).split("T")[0]

        Logger.info("开始下载...{}".format(videoId))
        try:
            languages = str(video.videoLanguage)
            storeDir = "./download/" + str(channel.channelTitle)
            DownLoadUtil._ensureDir(storeDir)
            # os.path.join yields the same "\\" separator on Windows and a
            # valid path on other platforms.
            storePath = os.path.join(
                storeDir,
                "{}-{}-{}.srt".format(videoPublishTime, languages, videoTitle))

            srtText = DownLoadUtil._fetchSrt(videoId, languages)
            Logger.info("文件地址...{}".format(storePath))
            DownLoadUtil._writeSrt(srtText, storePath)
            Logger.info("下载完成...{}".format(videoId))

            DownLoadUtil._markDownloaded(videoId, 2)
        except Exception as e:
            Logger.error("下载失败...{}".format(videoId))
            logStr = "Exception...{}".format(e)
            Logger.error(logStr)
            downloadInfo = DownloadService.getOneByVideoId(videoId, 2)
            if "No transcripts" in logStr:
                Logger.error("VideoId:{},不存在字幕文件".format(videoId))
                if downloadInfo is not None:
                    # Argument meanings are defined by DownloadService.
                    DownloadService.changeDownloadType(
                        videoId, 6, 0, 2, 3)
            else:
                DownLoadUtil._scheduleRetry(videoId, downloadInfo, 2)