Browse Source

增加复制已有srt文件的逻辑

master
zhangshu 5 months ago
parent
commit
d990b31ed2
  1. 2
      .gitignore
  2. 45
      .vscode/launch.json
  3. 121
      parse_video.py
  4. 15
      parse_video_config.json
  5. 7
      service/VideoService.py

2
.gitignore

@ -137,4 +137,4 @@ dmypy.json
# Cython debug symbols
cython_debug/
.vscode/

45
.vscode/launch.json

@ -1,45 +0,0 @@
{
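// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387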
"version": "0.2.0",
"configurations": [
{
"name": "init channel",
"type": "debugpy",
"request": "launch",
"program": "init_channel.py",
"console": "integratedTerminal"
},
{
"name": "move_data",
"type": "debugpy",
"request": "launch",
"program": "move_data.py",
"console": "integratedTerminal"
},
{
"name": "search video",
"type": "debugpy",
"request": "launch",
"program": "search_video.py",
"console": "integratedTerminal",
"args": ["--start", "2023-09-10T00:00:01Z",
"--end", "2023-09-11T00:00:01Z"]
},
{
"name": "download video",
"type": "debugpy",
"request": "launch",
"program": "download_video.py",
"console": "integratedTerminal"
},
{
"name": "test",
"type": "debugpy",
"request": "launch",
"program": "test.py",
"console": "integratedTerminal"
},
]
}

121
parse_video.py

@ -0,0 +1,121 @@
import os
import time
from LoggerUtils import Logger, initLogger
from bs4 import BeautifulSoup as bs
from urllib.request import urlopen, Request
import json
import Contant
from sqlalchemy import create_engine
from entity.DownloadInfoEntity import DownloadInfo
from entity.VideoEntity import Video
from entity.ChannelEntity import Channel
from service.DownloadInfoService import DownloadInfoService
from service.VideoService import VideoService
from common.YoutubeUtils import YouTubeUtil
from common.DownloadUtils import DownloadUtil
from service.ChannelService import ChannelService
import operator
import argparse
import difflib
from shutil import copyfile
def get_all_files(directory):
    """
    Recursively collect the path of every file below a directory.

    :param directory: root directory to walk
    :return: list of file paths, each one the containing directory (as
        reported by os.walk) joined with the file name; empty when the
        directory holds no files
    """
    # Join with ``root`` so files with the same name in different
    # sub-directories stay distinguishable and the result is usable as a path.
    return [
        os.path.join(root, filename)
        for root, _dirs, filenames in os.walk(directory)
        for filename in filenames
    ]
def getSrtFileName(video: Video):
    """
    Build the subtitle file name for a video.

    The name has the form ``<publish date>-<language>-<title>.srt`` where the
    publish date is the text of ``videoPublishTime`` before the first "T",
    the language is ``videoLanguage`` as text, and the title is
    ``videoTitle`` with "/" replaced by U+2215 and the characters
    ``? \\ | < > :`` removed.

    :param video: object exposing videoTitle, videoPublishTime, videoLanguage
    :return: the .srt file name
    """
    # One translation table instead of a chain of single-character replaces.
    title_table = str.maketrans({
        "/": "\u2215",
        "?": None,
        "\\": None,
        "|": None,
        "<": None,
        ">": None,
        ":": None,
    })
    cleaned_title = video.videoTitle.translate(title_table)
    publish_date = str(video.videoPublishTime).split("T")[0]
    language = str(video.videoLanguage)
    return f'{publish_date}-{language}-{cleaned_title}.srt'
def get_equal_rate_1(str1, str2):
    """
    Return a quick similarity score between two strings.

    The score is ``difflib.SequenceMatcher.quick_ratio()``, which difflib
    documents as an upper bound on ``ratio()``; it lies between 0.0 and 1.0.

    :param str1: first sequence
    :param str2: second sequence
    :return: float similarity score
    """
    matcher = difflib.SequenceMatcher(None, str1, str2)
    score = matcher.quick_ratio()
    return score
if __name__ == "__main__":
    # Script entry point: match on-disk channel folders to channels in the
    # database, copy every subtitle file that resembles a video's expected
    # .srt name into the new layout, and load each copied file via
    # DownloadUtil.iterateSrt.

    # Read the configuration file.
    with open('parse_video_config.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Initialise logging.
    Contant.logDir = data['log']['dir']
    Contant.logFileName = data['log']['fileName']
    initLogger(Contant.logDir, Contant.logFileName)

    # Connect to MySQL.
    dbHost = data['mysql']['host']
    dbPort = data['mysql']['port']
    dbUserName = data['mysql']['username']
    dbPassword = data['mysql']['password']
    dbDatabase = data['mysql']['database']
    # The password is intentionally kept out of the log output.
    Logger.info("尝试连接mysql host:'{}' port:'{}' username:'{}' database:'{}'",
                dbHost, dbPort, dbUserName, dbDatabase)
    Contant.engin = create_engine(
        f'mysql+mysqlconnector://{dbUserName}:{dbPassword}@{dbHost}:{dbPort}/{dbDatabase}')
    Logger.info("连接mysql成功")

    parseRoot = data['parse_root']
    newSrtPath = data['new_srt_path']
    Logger.info(f'parseRoot: {parseRoot}')

    # Collect every directory below parseRoot as a path relative to it.
    # relpath is independent of the separator os.walk reports; the root
    # itself is skipped because it is not a channel folder.
    channelNameList = []
    for root, _dirs, _files in os.walk(parseRoot):
        relative = os.path.relpath(root, parseRoot)
        if relative == os.curdir:
            continue
        channelNameList.append(relative.replace(os.sep, "/"))

    # Map channelId -> folder for every channel whose title closely matches a
    # folder name. When several folders match, the last one wins.
    channel_dict = {}
    for channel in ChannelService.queryAllChannel():
        channelTitle = channel.channelTitle
        for channelName in channelNameList:
            if get_equal_rate_1(channelTitle, channelName) > 0.9:
                channel_dict[str(channel.channelId)] = f"{parseRoot}/{channelName}"

    # Walk channel_dict and copy the subtitle files.
    for channelId, channelDir in channel_dict.items():
        channel: Channel = ChannelService.queryOneByChannelId(channelId)
        videos = VideoService.queryAllbyChannelId(channelId)
        Logger.info(f"key: {channelId} len: {len(videos)}")

        # List the channel folder once instead of re-walking it per video.
        candidates = [
            (root, filename)
            for root, _dirs, filenames in os.walk(channelDir)
            for filename in filenames
        ]
        dst_dir = f"{newSrtPath}/{channel.region}/{channel.channelId}-{channel.channelTitle}"

        for video in videos:
            srtFileName = getSrtFileName(video=video)
            # NOTE(review): every file scoring above the threshold is copied to
            # the same destination, so a later match overwrites an earlier one
            # and iterateSrt runs once per match - confirm this is intended.
            for root, filename in candidates:
                if get_equal_rate_1(srtFileName, filename) <= 0.8:
                    continue
                src_path = os.path.join(root, filename)
                if not os.path.exists(dst_dir):
                    Logger.info("开始创建文件夹:" + dst_dir)
                    os.makedirs(dst_dir, exist_ok=True)
                dst_path = f"{dst_dir}/{video.videoId}.srt"
                Logger.info(f"src_path:{src_path} dst_path:{dst_path}")
                copyfile(src_path, dst_path)
                # Also load the copied srt file into the database.
                DownloadUtil.iterateSrt(
                    srtFilePath=dst_path, videoId=video.videoId, channelId=channel.channelId)

15
parse_video_config.json

@ -0,0 +1,15 @@
{
"mysql": {
"host": "47.108.20.249",
"port": "3306",
"username": "root",
"password": "casino888!",
"database": "youtube"
},
"log": {
"dir": "./logs",
"fileName": "parse_video"
},
"parse_root": "E:/code/python/tmp_srt_file",
"new_srt_path": "/mnt/new_srt_path"
}

7
service/VideoService.py

@ -12,6 +12,13 @@ class VideoService:
session.close()
return video
def queryAllbyChannelId(channelId):
    """
    Fetch every Video row whose channelId equals the given value.

    :param channelId: channel identifier to filter on
    :return: list of matching Video objects (empty when none match)
    """
    session = getSession()
    try:
        return session.query(Video).filter(
            Video.channelId == channelId).all()
    finally:
        # Always release the session, even when the query raises.
        session.close()
def insertOne(videoId, ChannelId, videoTitle, videoLen, videoType, videoPublishTime, videoLanguage, isDownload):
session = getSession()
video: Video = Video(videoId=videoId, ChannelId=ChannelId, videoTitle=videoTitle,

Loading…
Cancel
Save