Browse Source

新增测试和search_video

master
zhangshu 6 months ago
parent
commit
4bc313f5d3
  1. 18
      .vscode/launch.json
  2. 173
      common/YoutubeUtils.py
  3. 19
      entity/VideoEntity.py
  4. 50
      search_video.py
  5. 13
      search_video_config.json
  6. 18
      service/ChannelService.py
  7. 36
      service/VideoService.py
  8. 46
      test.py
  9. 13
      test_config.json

18
.vscode/launch.json

@ -17,6 +17,22 @@
"request": "launch",
"program": "move_data.py",
"console": "integratedTerminal"
}
},
{
"name": "search video",
"type": "debugpy",
"request": "launch",
"program": "search_video.py",
"console": "integratedTerminal",
"args": ["--start", "2023-09-10T00:00:01Z",
"--end", "2023-09-11T00:00:01Z"]
},
{
"name": "test",
"type": "debugpy",
"request": "launch",
"program": "test.py",
"console": "integratedTerminal"
},
]
}

173
common/YoutubeUtils.py

@ -0,0 +1,173 @@
import httplib2
import googleapiclient.discovery
import googleapiclient.errors
from LoggerUtils import Logger
import operator
import time
from entity.ChannelEntity import Channel
from entity.VideoEntity import Video
from service.ChannelService import ChannelService
from service.VideoService import VideoService
class YouTubeUtil:
    """Helpers around the YouTube Data API v3 client.

    Every function is called through the class itself
    (``YouTubeUtil.getYoutube()``); no instance state is kept.
    """

    # SECURITY: these keys are committed in plain text. Treat them as leaked:
    # rotate them and load the replacements from configuration that is not
    # under version control. Retired keys must not be kept here as comments.
    apiKeys = [
        "AIzaSyARaW3mqO9szQiHgWZR4el0HWvdyheSHBc",
        "AIzaSyChPXesnVx6fweon_BckhR6UiJWvi5Ma4s",
    ]
    # Index into apiKeys of the key the next getYoutube() call will use.
    apiIndex = 0

    @staticmethod
    def getYoutube():
        """Build a YouTube Data API v3 client using the next API key in rotation.

        Each call uses ``apiKeys[apiIndex]`` and then advances ``apiIndex``,
        wrapping back to 0 after the last key.

        Returns:
            The client object returned by ``googleapiclient.discovery.build``.

        Raises:
            RuntimeError: if ``apiKeys`` is empty.
        """
        keyCount = len(YouTubeUtil.apiKeys)
        if keyCount == 0:
            raise RuntimeError("YouTubeUtil.apiKeys is empty")

        # For local testing behind a proxy, additionally pass
        # proxy_info=httplib2.ProxyInfo(...) to httplib2.Http.
        http = httplib2.Http(
            timeout=10, disable_ssl_certificate_validation=False)

        # Modulo keeps the index valid even if apiKeys was shortened after
        # apiIndex had already been advanced.
        currentIndex = YouTubeUtil.apiIndex % keyCount
        apiKey = YouTubeUtil.apiKeys[currentIndex]
        # Never write the full key to the log; the last four characters are
        # enough to tell the keys apart.
        maskedKey = "*" * max(len(apiKey) - 4, 0) + apiKey[-4:]
        Logger.info(
            "当前APIKey:{},当前apiIndex:{},totalIndex:{}".format(
                maskedKey, currentIndex, keyCount - 1
            )
        )
        YouTubeUtil.apiIndex = (currentIndex + 1) % keyCount

        return googleapiclient.discovery.build(
            "youtube", "v3", developerKey=apiKey, http=http
        )

    @staticmethod
    def getVidoeLen(videoIds):
        """Fetch the ``contentDetails`` part for the given videos.

        Args:
            videoIds: comma-separated video ids, passed as the ``id``
                argument of ``videos().list``.

        Returns:
            The response of the ``videos().list`` request, unmodified, also
            when it contains no items.
        """
        youtube = YouTubeUtil.getYoutube()
        request = youtube.videos().list(part="contentDetails", id=videoIds)
        return request.execute()

    @staticmethod
    def getVideoLenByStr(str):
        """Convert an ISO 8601 duration such as ``PT1H2M3S`` to seconds.

        Week and day components (``P1W``, ``P1DT2H``) are supported as well;
        a missing component counts as 0.

        Args:
            str: the duration text. The parameter name shadows the builtin
                and is kept only for backward compatibility.

        Returns:
            The duration in whole seconds as an int.

        Raises:
            ValueError: if the text is not a supported duration.
        """
        import re  # local import keeps this block self-contained

        match = re.fullmatch(
            r"P(?:(\d+)W)?(?:(\d+)D)?"
            r"(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?",
            str,
        )
        if match is None:
            raise ValueError("unsupported duration: {!r}".format(str))

        weeks, days, hours, minutes, seconds = (
            int(part) if part else 0 for part in match.groups()
        )
        return (
            ((weeks * 7 + days) * 24 + hours) * 3600
            + minutes * 60
            + seconds
        )

    @staticmethod
    def getByChannelId(channelId, startTime, endTime):
        """Store the videos a channel published between two timestamps.

        Pages through ``search().list`` results for the channel, inserts every
        video that is not stored yet, fills in durations in batches through
        ``getVidoeLen`` and, after each page, records the publish time of the
        channel's most recent stored video on the channel row.

        Args:
            channelId: id of a channel known to ``ChannelService``.
            startTime: passed as ``publishedAfter`` to the search request.
            endTime: passed as ``publishedBefore`` to the search request.

        Returns:
            None. Returns early, after logging, when the channel is unknown.
        """
        channel = ChannelService.queryOneByChannelId(channelId)
        if channel is None:
            Logger.info("没有相应的频道 channelId:{}".format(channelId))
            return

        videoLanguage = str(channel.channelLanguage)
        youtube = YouTubeUtil.getYoutube()
        # Shared by the first request and every follow-up page request.
        searchArgs = {
            "part": "snippet",
            "channelId": channelId,
            "maxResults": 50,
            "order": "date",
            "publishedAfter": startTime,
            "publishedBefore": endTime,
            "type": "video",
        }
        # Number of new video ids collected before their durations are fetched.
        batchSize = 10

        def flushDurations(pendingIds):
            """Fetch and store durations for pendingIds, then empty the list."""
            if not pendingIds:
                return
            try:
                lenRes = YouTubeUtil.getVidoeLen(",".join(pendingIds))
            except Exception as e:
                # Best effort: the ids stay queued and are retried together
                # with the next batch of this page.
                Logger.error(
                    "failed to fetch video durations: {!r}".format(e))
                return
            for detail in lenRes.get("items", []):
                try:
                    tmpId = detail["id"]
                    videoLen = YouTubeUtil.getVideoLenByStr(
                        detail["contentDetails"]["duration"])
                    VideoService.updateLenByVideoId(
                        videoId=tmpId, videoLen=videoLen)
                    Logger.info(
                        "更新时长,videoId:{},len:{}".format(tmpId, videoLen)
                    )
                except Exception as e:
                    # One bad item must not block the rest of the batch.
                    Logger.error(
                        "failed to update video duration: {!r}".format(e))
            pendingIds.clear()

        response = youtube.search().list(**searchArgs).execute()
        while True:
            pendingIds = []
            for item in response.get("items", []):
                try:
                    videoId = item["id"]["videoId"]
                    publisTime = item["snippet"]["publishedAt"]
                    videoTitle = item["snippet"]["title"]
                    # Only videos that are not stored yet are inserted.
                    if VideoService.queryOneByVideoId(videoId) is not None:
                        Logger.info("已存在VideoId:{}".format(videoId))
                        continue
                    VideoService.insertOne(
                        videoId=videoId, ChannelId=channelId,
                        videoTitle=videoTitle, videoLen=0,
                        videoType="video", videoPublishTime=publisTime,
                        videoLanguage=videoLanguage, isDownload=0)
                    pendingIds.append(str(videoId))
                    Logger.info(
                        "存储VideoUrl:https://www.youtube.com/watch?v="
                        + str(videoId)
                    )
                except Exception as e:
                    # Skip the broken item but leave a trace in the log
                    # instead of hiding the failure.
                    Logger.error(
                        "failed to store search result: {!r}".format(e))
                    continue
                if len(pendingIds) >= batchSize:
                    flushDurations(pendingIds)
            # Also handle the final, smaller-than-batchSize group of the page.
            flushDurations(pendingIds)

            # Record the newest stored publish time on the channel, if any
            # video is stored for it at all.
            video = VideoService.getLastVideoByChannelId(channelId)
            if video is not None:
                ChannelService.updateTimeByChannelId(
                    channelId, video.videoPublishTime)

            nextPageToken = response.get("nextPageToken")
            if not nextPageToken:
                Logger.info("no nextPageToken")
                break

            # Pause between page requests (presumably to throttle API usage).
            time.sleep(5)
            try:
                response = youtube.search().list(
                    pageToken=nextPageToken, **searchArgs
                ).execute()
            except Exception as e:
                Logger.error(e)
                break

19
entity/VideoEntity.py

@ -0,0 +1,19 @@
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
# Declarative base class for the ORM model defined below.
Base = declarative_base()
class Video(Base):
    """ORM model for one row of the ``Videos`` table."""

    __tablename__ = 'Videos'

    # Auto-increment surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Video id as returned by the YouTube search results.
    videoId = Column(String(255), nullable=False)
    # Id of the channel the video was found for.
    channelId = Column(String(255), nullable=False)
    videoTitle = Column(String(255), nullable=False)
    # Length in seconds; rows are inserted with 0 and updated once the
    # duration has been fetched.
    videoLen = Column(Integer, nullable=False)
    videoType = Column(String(255), nullable=False)
    # The "publishedAt" text of the search result, stored as a string.
    videoPublishTime = Column(String(255), nullable=False)
    videoLanguage = Column(String(255), nullable=False)
    # Rows are inserted with 0; presumably a "downloaded" flag — TODO confirm.
    isDownload = Column(Integer, nullable=False)

    def __repr__(self):
        """Return a short, log-friendly description of the row."""
        return (
            "Video(id={!r}, videoId={!r}, channelId={!r}, "
            "videoLen={!r}, videoPublishTime={!r})"
        ).format(
            self.id, self.videoId, self.channelId,
            self.videoLen, self.videoPublishTime,
        )

50
search_video.py

@ -0,0 +1,50 @@
from LoggerUtils import Logger, initLogger
from bs4 import BeautifulSoup as bs
from urllib.request import urlopen, Request
import json
import Contant
from sqlalchemy import create_engine
from entity.ChannelEntity import Channel
from service.ChannelService import ChannelService
from common.YoutubeUtils import YouTubeUtil
import operator
import argparse
if __name__ == "__main__":
    # Local import: only this entry point needs URL escaping.
    from urllib.parse import quote

    # Read the publish-time window from the command line.
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--start", type=str, default="")
    parser.add_argument("--end", type=str, default="")
    args = parser.parse_args()
    startTime = args.start
    endTime = args.end

    # Load the configuration file.
    with open('search_video_config.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Initialise logging.
    Contant.logDir = data['log']['dir']
    Contant.logFileName = data['log']['fileName']
    initLogger(Contant.logDir, Contant.logFileName)

    # Set up the MySQL engine.
    dbHost = data['mysql']['host']
    dbPort = data['mysql']['port']
    dbUserName = data['mysql']['username']
    dbPassword = data['mysql']['password']
    dbDatabase = data['mysql']['database']
    # The password is deliberately left out of the log.
    Logger.info(
        "尝试连接mysql host:'{}' port:'{}' username:'{}' database:'{}'".format(
            dbHost, dbPort, dbUserName, dbDatabase))
    # Percent-encode the credentials so characters such as '@' or '/' cannot
    # break the URL.
    Contant.engin = create_engine(
        'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(
            quote(str(dbUserName), safe=""),
            quote(str(dbPassword), safe=""),
            dbHost, dbPort, dbDatabase))
    Logger.info("连接mysql成功")

    # Search the videos of every known channel within the requested window.
    channels = ChannelService.queryAllChannel()
    Logger.info("Channels length:{}".format(len(channels)))
    for channel in channels:
        YouTubeUtil.getByChannelId(channel.channelId, startTime, endTime)

13
search_video_config.json

@ -0,0 +1,13 @@
{
"mysql": {
"host": "47.108.20.249",
"port": "3306",
"username": "root",
"password": "casino888!",
"database": "youtube"
},
"log": {
"dir": "./logs",
"fileName": "search_video"
}
}

18
service/ChannelService.py

@ -1,6 +1,6 @@
from entity.ChannelEntity import Channel
from common.Utils import getSession
from sqlalchemy import update
class ChannelService:
@ -24,7 +24,7 @@ class ChannelService:
def queryOneByChannelId(channelId):
    """Return the Channel row with the given channelId, or None.

    Uses ``one_or_none()``, which raises when more than one row matches.

    Args:
        channelId: value compared against ``Channel.channelId``.
    """
    session = getSession()
    try:
        return session.query(Channel).filter(
            Channel.channelId == channelId).one_or_none()
    finally:
        # Release the session even when the query raises.
        session.close()
@ -43,3 +43,17 @@ class ChannelService:
update_channel.region = region
session.commit()
session.close()
def queryAllChannel():
    """Return every Channel row as a list."""
    session = getSession()
    try:
        return session.query(Channel).all()
    finally:
        # Release the session even when the query raises.
        session.close()
def updateTimeByChannelId(channelId, videoPublishTime):
    """Set ``channelReptileTime`` on the channel with the given channelId.

    Args:
        channelId: value compared against ``Channel.channelId``.
        videoPublishTime: new value for ``channelReptileTime``.
    """
    session = getSession()
    try:
        session.execute(
            update(Channel)
            .where(Channel.channelId == channelId)
            .values(channelReptileTime=videoPublishTime)
        )
        session.commit()
    finally:
        # Release the session even when the update or commit raises.
        session.close()

36
service/VideoService.py

@ -0,0 +1,36 @@
from entity.VideoEntity import Video
from common.Utils import getSession
from sqlalchemy import update
class VideoService:
    """Database access helpers for ``Video`` rows.

    Every function opens its own session through ``getSession()`` and closes
    it before returning, also when the database call raises.
    """

    @staticmethod
    def queryOneByVideoId(videoId):
        """Return the Video row with the given videoId, or None.

        Uses ``one_or_none()``, which raises when more than one row matches.
        """
        session = getSession()
        try:
            return session.query(Video).filter(
                Video.videoId == videoId).one_or_none()
        finally:
            session.close()

    @staticmethod
    def insertOne(videoId, ChannelId, videoTitle, videoLen, videoType,
                  videoPublishTime, videoLanguage, isDownload):
        """Insert one Video row built from the given column values.

        The ``ChannelId`` parameter keeps its original spelling because
        callers pass it by keyword; it is stored in the ``channelId`` column.
        """
        # The mapped attribute is ``channelId``; passing ``ChannelId=`` to the
        # model constructor is rejected as an unknown keyword.
        video = Video(
            videoId=videoId, channelId=ChannelId, videoTitle=videoTitle,
            videoLen=videoLen, videoType=videoType,
            videoPublishTime=videoPublishTime,
            videoLanguage=videoLanguage, isDownload=isDownload)
        session = getSession()
        try:
            session.add(video)
            session.commit()
        finally:
            session.close()

    @staticmethod
    def updateLenByVideoId(videoId, videoLen):
        """Set ``videoLen`` on the row(s) with the given videoId."""
        session = getSession()
        try:
            session.execute(
                update(Video)
                .where(Video.videoId == videoId)
                .values(videoLen=videoLen)
            )
            session.commit()
        finally:
            session.close()

    @staticmethod
    def getLastVideoByChannelId(channelId):
        """Return the channel's Video with the greatest videoPublishTime.

        Returns None when no video is stored for the channel. The ordering is
        done on the stored string value of ``videoPublishTime``.
        """
        session = getSession()
        try:
            return (
                session.query(Video)
                .filter(Video.channelId == channelId)
                .order_by(Video.videoPublishTime.desc())
                .first()
            )
        finally:
            session.close()

46
test.py

@ -0,0 +1,46 @@
from LoggerUtils import Logger, initLogger
from bs4 import BeautifulSoup as bs
from urllib.request import urlopen, Request
import json
import Contant
from sqlalchemy import create_engine
from entity.ChannelEntity import Channel
from entity.VideoEntity import Video
from service.ChannelService import ChannelService
from service.VideoService import VideoService
from common.YoutubeUtils import YouTubeUtil
import operator
import argparse
if __name__ == "__main__":
    # Local import: only this entry point needs URL escaping.
    from urllib.parse import quote

    # Load the configuration file.
    with open('test_config.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Initialise logging.
    Contant.logDir = data['log']['dir']
    Contant.logFileName = data['log']['fileName']
    initLogger(Contant.logDir, Contant.logFileName)

    # Set up the MySQL engine.
    dbHost = data['mysql']['host']
    dbPort = data['mysql']['port']
    dbUserName = data['mysql']['username']
    dbPassword = data['mysql']['password']
    dbDatabase = data['mysql']['database']
    # The password is deliberately left out of the log.
    Logger.info(
        "尝试连接mysql host:'{}' port:'{}' username:'{}' database:'{}'".format(
            dbHost, dbPort, dbUserName, dbDatabase))
    # Percent-encode the credentials so characters such as '@' or '/' cannot
    # break the URL.
    Contant.engin = create_engine(
        'mysql+mysqlconnector://{}:{}@{}:{}/{}'.format(
            quote(str(dbUserName), safe=""),
            quote(str(dbPassword), safe=""),
            dbHost, dbPort, dbDatabase))
    Logger.info("连接mysql成功")

    # Manual smoke checks against hard-coded sample ids.
    videoId = "oZhBWA3HNhA"
    video = VideoService.queryOneByVideoId(videoId)
    Logger.info(video)
    # VideoService.updateLenByVideoId(videoId, 5344)
    video = VideoService.getLastVideoByChannelId("UC67Wr_9pA4I0glIxDt_Cpyw")
    if video is None:
        Logger.info("meiyou")
    else:
        Logger.info(video.videoPublishTime)

13
test_config.json

@ -0,0 +1,13 @@
{
"mysql": {
"host": "47.108.20.249",
"port": "3306",
"username": "root",
"password": "casino888!",
"database": "youtube"
},
"log": {
"dir": "./logs",
"fileName": "test"
}
}
Loading…
Cancel
Save