import os
import time
from LoggerUtils import Logger, initLogger
from bs4 import BeautifulSoup as bs
from urllib.request import urlopen, Request
import json
import Contant
from sqlalchemy import create_engine
from entity.DownloadInfoEntity import DownloadInfo
from entity.VideoEntity import Video
from entity.ChannelEntity import Channel
from service.DownloadInfoService import DownloadInfoService
from service.VideoService import VideoService
from common.YoutubeUtils import YouTubeUtil
from common.DownloadUtils import DownloadUtil
from service.ChannelService import ChannelService
import operator
import argparse
import difflib
from shutil import copyfile


# Minimum similarity for a directory name to be treated as a channel's folder.
CHANNEL_MATCH_THRESHOLD = 0.9
# Minimum similarity for a file on disk to be treated as a video's subtitle.
SRT_MATCH_THRESHOLD = 0.8

# Single-pass equivalent of the per-character replacements applied to titles.
_TITLE_TRANSLATION = str.maketrans({
    "/": "\u2215",
    # NOTE(review): as written this maps "?" to itself (a no-op); it may have
    # been meant to map to a full-width question mark -- confirm.
    "?": "?",
    "\\": "",
    "|": "",
    "<": "",
    ">": "",
    ":": "",
})


def get_all_files(directory):
    """
    Recursively collect the names of all files below a directory.

    Only the bare file name is collected for each file, not its full path,
    so files with the same name in different sub-directories appear as
    duplicate entries.

    :param directory: directory to walk
    :return: list of file names
    """
    return [
        filename
        for _root, _dirs, filenames in os.walk(directory)
        for filename in filenames
    ]


def getSrtFileName(video: Video):
    """
    Build the subtitle file name expected for a video.

    The name has the form ``<publish time>-<language>-<title>.srt`` where the
    publish time is the part of ``str(video.videoPublishTime)`` before the
    first ``"T"`` and the title has ``/`` replaced by U+2215 and the
    characters ``\\ | < > :`` removed.

    :param video: video whose title, publish time and language are read
    :return: the subtitle file name
    """
    videoTitle = video.videoTitle.translate(_TITLE_TRANSLATION)
    videoPublishTime = str(video.videoPublishTime).split("T")[0]
    languages = str(video.videoLanguage)
    return f'{videoPublishTime}-{languages}-{videoTitle}.srt'


def get_equal_rate_1(str1, str2):
    """
    Return a similarity score for two strings.

    Uses ``difflib.SequenceMatcher.quick_ratio``, which is an upper bound on
    the exact ``ratio()`` value.

    :param str1: first string
    :param str2: second string
    :return: similarity as a float
    """
    return difflib.SequenceMatcher(None, str1, str2).quick_ratio()


def main():
    """
    Match on-disk subtitle folders to channels and import their subtitles.

    Reads ``parse_video_config.json``, initialises logging and the database
    engine, fuzzy-matches the directories below ``parse_root`` against the
    channel titles, then for every video of a matched channel copies each
    sufficiently similar file to
    ``<new_srt_path>/<region>/<channelId>-<channelTitle>/<videoId>.srt`` and
    passes the copy to ``DownloadUtil.iterateSrt``.
    """
    # Read the configuration file.
    with open('parse_video_config.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Initialise logging.
    Contant.logDir = data['log']['dir']
    Contant.logFileName = data['log']['fileName']
    initLogger(Contant.logDir, Contant.logFileName)

    # Configure the MySQL engine.
    dbHost = data['mysql']['host']
    dbPort = data['mysql']['port']
    dbUserName = data['mysql']['username']
    dbPassword = data['mysql']['password']
    dbDatabase = data['mysql']['database']
    # The real password is deliberately not written to the log.
    Logger.info("尝试连接mysql host:'{}' port:'{}' username:'{}' password:'{}' database:'{}'",
                dbHost, dbPort, dbUserName, "******", dbDatabase)
    Contant.engin = create_engine(
        f'mysql+mysqlconnector://{dbUserName}:{dbPassword}@{dbHost}:{dbPort}/{dbDatabase}')
    # NOTE(review): create_engine() is lazy and does not open a connection
    # here, so this message does not prove the server is reachable.
    Logger.info("连接mysql成功")

    parseRoot = data['parse_root']
    newSrtPaht = data['new_srt_path']
    Logger.info(f'parseRoot: {parseRoot}')

    # Every directory below parseRoot, expressed relative to it, is a
    # candidate channel folder name.
    chanelNameList = [
        str(root).replace(f"{parseRoot}/", "")
        for root, _dirs, _filenames in os.walk(parseRoot)
    ]

    # Map channel id -> matching directory; the last match wins.
    channel_dict = {}
    for channel in ChannelService.queryAllChannel():
        channelTitle = channel.channelTitle
        for channelName in chanelNameList:
            if get_equal_rate_1(channelTitle, channelName) > CHANNEL_MATCH_THRESHOLD:
                channel_dict[str(channel.channelId)] = f"{parseRoot}/{channelName}"

    # Copy the subtitle files of every matched channel.
    for key, value in channel_dict.items():
        channel: Channel = ChannelService.queryOneByChannelId(key)
        videos = VideoService.queryAllbyChannelId(key)
        Logger.info(f"key: {key} len: {len(videos)}")

        # Walk the channel directory once instead of once per video.
        srtCandidates = [
            (root, filename)
            for root, _dirs, filenames in os.walk(value)
            for filename in filenames
        ]

        for video in videos:
            srtFileName = getSrtFileName(video=video)
            for root, filename in srtCandidates:
                if get_equal_rate_1(srtFileName, filename) <= SRT_MATCH_THRESHOLD:
                    continue
                src_path = f"{root}/{filename}"
                dst_path = f"{newSrtPaht}/{channel.region}/{channel.channelId}-{channel.channelTitle}"
                if not os.path.exists(dst_path):
                    Logger.info("开始创建文件夹:" + dst_path)
                    # exist_ok tolerates the directory appearing after the check.
                    os.makedirs(dst_path, exist_ok=True)
                dst_path = f"{dst_path}/{video.videoId}.srt"
                Logger.info(f"src_path:{src_path} dst_path:{dst_path}")
                copyfile(src_path, dst_path)
                # Also load the copied srt file into the database.
                DownloadUtil.iterateSrt(
                    srtFilePath=dst_path, videoId=video.videoId, channelId=channel.channelId)


if __name__ == "__main__":
    main()