import operator
import os
import time
from shutil import copyfile

import pysrt
from func_timeout import func_set_timeout
from pytubefix import YouTube
from pytubefix.cli import on_progress
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter

from ChannelService import ChannelService
from DownloadInfoService import DownloadService
from LoggerUtils import Logger
from SrtFileService import SrtService
from VideoService import VideoService


class DownLoadUtil:
    """Download YouTube subtitles (as .srt) or audio and record them via the services.

    All methods are static and are called through the class, e.g.
    ``DownLoadUtil.downloadOne(videoId)``.
    """

    # Shared formatter that turns a fetched transcript into SRT text.
    formatter = SRTFormatter()
    # Local proxy endpoints; not referenced by the methods in this class.
    proxies = {"http": "http://127.0.0.1:7890",
               "https": "https://127.0.0.1:7890"}

    @staticmethod
    def _formatSrtTime(srtTime):
        """Return ``srtTime`` as text with trailing zeros of the fraction removed.

        Args:
            srtTime: a pysrt time object exposing ``to_time()``.

        Returns:
            str: e.g. ``"00:00:01.5"`` for 1.5 s, ``"00:00:10"`` for 10 s.
        """
        text = str(srtTime.to_time())
        # Strip zeros only when a fractional part is present; an unconditional
        # rstrip("0") would turn "00:00:10" into "00:00:1".
        if "." in text:
            text = text.rstrip("0")
        return text

    @staticmethod
    def _ensureDir(path):
        """Create ``path`` (with parents) if it does not exist yet, logging the creation."""
        if not os.path.exists(path):
            Logger.info("开始创建文件夹:" + path)
            # exist_ok guards against the directory appearing between the
            # check above and this call.
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def iterateSrt(srtFilePath, videoId, channelId):
        """Store every cue of an .srt file through ``SrtService.createOne``.

        Does nothing (apart from logging) when ``SrtService.checkByVideoId``
        reports that the video is already recorded.

        Args:
            srtFilePath: path of the .srt file to read.
            videoId: id of the video the subtitles belong to.
            channelId: id of the channel the video belongs to.
        """
        # Skip videos that are already recorded.
        if SrtService.checkByVideoId(videoId):
            Logger.info("VideoId: {} 已收录".format(videoId))
            return
        subs = pysrt.open(srtFilePath)
        # Ordinals are 1-based and follow the order of cues in the file.
        for ordinal, sub in enumerate(subs, start=1):
            srtStartTime = DownLoadUtil._formatSrtTime(sub.start)
            srtEndTime = DownLoadUtil._formatSrtTime(sub.end)
            SrtService.createOne(videoId, channelId, ordinal,
                                 srtStartTime, srtEndTime, sub.text)

    @staticmethod
    def downLoadMP3(videoId):
        """Download the audio-only stream of a video as ``<videoId>.mp3``.

        The file is placed under
        ``<base>/<language>/<channelId>-<channelTitle>/``; the directory is
        created when missing.

        Args:
            videoId: id of the video to download.
        """
        video = VideoService.getOneByVideoId(videoId)
        languages = str(video.videoLanguage)
        channel = ChannelService.getOneByChannelId(str(video.channelId))
        videoUrl = "https://www.youtube.com/watch?v={}".format(videoId)
        yt = YouTube(videoUrl, on_progress_callback=on_progress)
        ys = yt.streams.get_audio_only()
        mp3OutPutPath = "/mnt/srt_file"
        # TODO test: local override of the output root; remove for deployment.
        mp3OutPutPath = "D:/Work/Code/youtube_dev"
        mp3OutPutPath = "{}/{}/{}-{}".format(
            mp3OutPutPath, languages, channel.channelId, channel.channelTitle)
        DownLoadUtil._ensureDir(mp3OutPutPath)
        fileName = "{}.mp3".format(videoId)
        ys.download(output_path=mp3OutPutPath, filename=fileName, mp3=True)

    @staticmethod
    @func_set_timeout(60)
    def downloadOne(videoId):
        """Download the subtitles of one video, copy them, and update the records.

        Steps: fetch the transcript in the video's language, write it as
        ``<videoId>.srt`` under the store directory, copy it to the temp
        directory, flag the video as downloaded, bump the download-info try
        counter when a record exists, and finally store the cues via
        ``iterateSrt``. Any ``Exception`` raised during these steps is logged
        and swallowed. The whole call is wrapped by ``func_set_timeout(60)``.

        Args:
            videoId: id of the video whose subtitles should be downloaded.
        """
        # Load the records this download depends on.
        video = VideoService.getOneByVideoId(videoId)
        channel = ChannelService.getOneByChannelId(str(video.channelId))
        # Start downloading.
        Logger.info("开始下载...{}".format(videoId))
        try:
            mainPath = "D:/Work/Code/youtube_dev/main"
            tmpPath = "D:/Work/Code/youtube_dev/tmp"
            # Subtitle language and target directories.
            languages = str(video.videoLanguage)
            storePath = "/mnt/srt_file"
            cpPath = "/mnt/tmp_srt_file"
            # TODO: test-only overrides, remove for deployment.
            storePath = mainPath
            cpPath = tmpPath
            storePath = "{}/{}/{}-{}".format(
                storePath, languages, channel.channelId, channel.channelTitle)
            cpPath = "{}/{}/{}-{}".format(
                cpPath, languages, channel.channelId, channel.channelTitle)
            DownLoadUtil._ensureDir(storePath)
            DownLoadUtil._ensureDir(cpPath)
            storePath = "{}/{}.srt".format(storePath, videoId)
            cpPath = "{}/{}.srt".format(cpPath, videoId)
            # Nothing to do when the subtitle file is already present.
            if os.path.exists(storePath):
                Logger.info("{}已存在".format(storePath))
                return
            # NOTE(review): this shortening runs after the existence check and
            # drops the last 20 characters, which can reach into the directory
            # component of the path — confirm this is the intended behavior.
            if len(cpPath) > 120:
                storePath = storePath[:-20] + ".srt"
                cpPath = cpPath[:-20] + ".srt"
            videoSrt = YouTubeTranscriptApi.get_transcript(
                videoId, languages=[languages])
            srt_formatted = DownLoadUtil.formatter.format_transcript(videoSrt)
            Logger.info("文件地址...{}".format(storePath))
            with open(storePath, 'w', encoding='utf-8') as srt_file:
                srt_file.write(srt_formatted)
            Logger.info("下载完成...{}".format(videoId))
            copyfile(storePath, cpPath)
            # Flag the video record as downloaded.
            VideoService.updateIsDownloadByVideoId(videoId, 1)
            # Bump the try counter on the download-info record, if any.
            downloadInfo = DownloadService.getOneByVideoId(videoId, 1)
            if downloadInfo is not None:
                DownloadService.updateInfoByVideoId(
                    videoId, downloadInfo.tryTime + 1, 1, 1)
            # Read the subtitle file cue by cue and store it.
            DownLoadUtil.iterateSrt(storePath, videoId, video.channelId)
        except Exception as e:
            Logger.error("下载失败...{}".format(videoId))
            # Keep the cause visible instead of discarding it.
            Logger.error("Exception...{}".format(e))
            # NOTE: retry bookkeeping and the MP3 fallback (downLoadMP3) that
            # used to run here are currently disabled.