import time from LoggerUtils import Logger, initLogger from bs4 import BeautifulSoup as bs from urllib.request import urlopen, Request import json import Contant from sqlalchemy import create_engine from entity.ChannelEntity import Channel from entity.VideoEntity import Video from service.ChannelService import ChannelService from service.VideoService import VideoService from service.ChannelService import ChannelService from service.DownloadInfoService import DownloadInfoService from common.YoutubeUtils import YouTubeUtil from common.DownloadUtils import DownloadUtil import operator import argparse import os from common.Utils import getSession if __name__ == "__main__": # 读取配置文件 with open('test_config.json', 'r', encoding='utf-8') as f: # 使用json.load()方法读取文件内容 data = json.load(f) # 初始化日志 Contant.logDir = data['log']['dir'] Contant.logFileName = data['log']['fileName'] initLogger(Contant.logDir, Contant.logFileName) # 连接mysql dbHost = data['mysql']['host'] dbPort = data['mysql']['port'] dbUserName = data['mysql']['username'] dbPassword = data['mysql']['password'] dbDatabase = data['mysql']['database'] Logger.info("尝试连接mysql host:'{}' port:'{}' username:'{}' password:'{}' database:'{}'", dbHost, dbPort, dbUserName, dbPassword, dbDatabase) Contant.engin = create_engine( f'mysql+mysqlconnector://{dbUserName}:{dbPassword}@{dbHost}:{dbPort}/{dbDatabase}') Logger.info("连接mysql成功") # session = getSession() # YouTubeUtil测试 # channelId = "UCBM86JVoHLqg9irpR2XKvGw" # startTime = "2024-08-22T00:00:01Z" # endTime = "2024-08-24T00:00:01Z" # YouTubeUtil.getByChannelId(channelId, startTime, endTime) # download测试 # videoId = "pBSWhJV0VVU" # channelId = "UCBM86JVoHLqg9irpR2XKvGw" # rootPath = "D:/Work/Code/youtube_dev/mysql" # storePath = "D:/Work/Code/youtube_dev/mysql/main/Korea/UCBM86JVoHLqg9irpR2XKvGw-달란트투자" # srtFilePath = "D:/Work/Code/youtube_dev/mysql/main/Korea/UCBM86JVoHLqg9irpR2XKvGw-달란트투자/pBSWhJV0VVU.srt" # DownloadUtil.downloadOne(videoId=videoId, rootPath=rootPath) # DownloadUtil.iterateSrt(srtFilePath=srtFilePath, # videoId=videoId, channelId=channelId) # DownloadUtil.downLoadMP3(videoId=videoId, storePath=storePath) DownloadInfoService.updateIsFinishByVideoId("CaLR6W_cyeI",1,1) # 遍历字幕文件,并存入数据库 # srtRootPath = "/mnt/mysql_srt_path_tmp" # for root, dirs, filenames in os.walk(srtRootPath): # Logger.info(f"root: {root} filesLen:{len(filenames)}") # for filename in filenames: # srtPath = f"{root}/{filename}" # videoId = filename.replace(".srt", "") # video: Video = session.query(Video).filter( # Video.videoId == videoId).one_or_none() # if video == None: # continue # channelId = video.channelId # Logger.info( # f"videoId: {videoId},channelId :{channelId}, srtPath: {srtPath}") # DownloadUtil.iterateSrt(srtFilePath=srtPath, # videoId=videoId, channelId=channelId) # os.remove(srtPath) # 关闭session # session.close()