用 python 抓取视频网站的视频标记 m3u8 文件与 ts 文件 - 代码
最编程
2024-03-18 14:35:55
...
参数配置
为了方便使用,将一些参数单独拿出来作为配置文件。
# 下载必须的配置
URL_BASE = 'https://xxx.com/xxx/{}' # m3u8文件的链接
PATTERN = 'xxx(\d+).ts' # 提取ts片段索引号的正则
# 下面是多线程需要配置的
TS_BASE = 'xxx{}.ts'
THREAD_NUM = 15 # 线程数
TS_NUM = 347 # ts文件的个数,在index.m3u8中找到最后一个ts是多少就写多少
下载m3u8文件
只有一个下载,比较简单,主要是根据它判断是否有加密
def download_m3u8(url):
response = requests.get(url)
file = "./caches/" + url.split("/")[-1]
with open(file, "w", encoding="utf8") as f:
f.write(response.text)
加密的话,会多出一行记录加密方式和密钥,没有这一行就没有加密。
下载ts文件
单线程
有了之前的配置就可以根据我封装的方法下载ts文件了,这里提供两种方式。如果传入ts_list那么下载传入的,否则根据TS_BASE生成ts_list进行下载。
def download_ts(ts_list=None):
'''
:param ts_list: [xxx000.ts, ..., xxx999.ts]
:return:
'''
# 如果ts列表为空,那么构造ts列表
if ts_list is None or len(ts_list) == 0:
ts_list = []
for i in range(0, TS_NUM+1):
idx = str(i) if i > 999 else str(i).zfill(3)
ts_list.append(TS_BASE.format(idx))
# 根据ts列表下载文件
for item in ts_list:
url = URL_BASE.format(item)
file="./caches/"+url.split("/")[-1]
response = my_request(url)
with open(file, "wb") as f:
f.write(response.content)
print(url, file)
其中ts_list可以根据m3u8文件获得,my_request()是我为了实现失败自动重试封装的一个方法https://blog.****.net/weixin_44112790/article/details/104197569。
def read_m3u8(file):
ts_list = []
with open(file) as f:
for line in f:
s = line.strip()
if s.find(".ts") > -1:
ts_list.append(s)
return ts_list
@retry() # 利用retry装饰函数,使得抛出异常的时候自动重试知道成功。
def my_request(url):
requests.adapters.DEFAULT_RETRIES = 15
s = requests.session()
s.keep_alive = False # 关闭之前的连接,避免连接过多
try:
response = requests.get(url, timeout=5)
except BaseException: # 捕获异常的时候,这里粗略的写了BaseException,根据需要可写的更具体。
print(url, "请求失败,开始重试")
response = requests.get(url, timeout=5)
return response
到这里,应该可以成功将所有的ts文件下载下来。
from config import *
if __name__ == '__main__':
download_m3u8(URL_BASE.format("index.m3u8"))
# download_ts()
# ts_list = read_m3u8("./caches/index.m3u8")
# download_ts(ts_list)
多线程
为了提高下载速度,可以配置多线程的一些参数用多线程进行下载。
import requests
import time
import threading
# 使用 threading 模块创建线程
import queue
#优先级队列模块
#线程优先级队列(Queue)
from config import *
from retrying import retry
exitFlag = 0
class MyThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print("开启线程:" + self.name+"\n")
download_data(self.threadID, self.name, self.q)
print("退出线程:" + self.name+"\n")
def download_data(id,thread_name, q):
@retry()
def my_request(url):
requests.adapters.DEFAULT_RETRIES = 15
s = requests.session()
s.keep_alive = False
try:
response = requests.get(url, timeout=5)
except BaseException:
print(url, "请求失败,开始重试")
response = requests.get(url, timeout=5)
return response
while not exitFlag:
id += 1
if id >= THREAD_NUM:
data = q.get()
url = URL_BASE.format(data)
file = "./caches/" + url.split("/")[-1]
response = my_request(url)
with open(file, "wb") as f:
f.write(response.content)
print(thread_name, url, file)
if __name__ == '__main__':
work_queue = queue.Queue(TS_NUM+1)
threads = []
# 填充队列
for i in range(0, TS_NUM+1):
idx = str(i) if i > 999 else str(i).zfill(3)
work_queue.put(TS_BASE.format(idx))
# 创建新线程并且启动
for thread_id in range(0, THREAD_NUM):
thread = MyThread(thread_id, "Thread-{}".format(thread_id), work_queue)
thread.start()
threads.append(thread)
# 等待队列清空
while not work_queue.empty():
pass
# 通知线程是时候退出
exitFlag = 1
# 等待所有线程完成
for t in threads:
t.join()
print("退出主线程")
解密
如果没有加密直接跳过这一步,进入合并即可。
import os
import re
from Crypto.Cipher import AES
from config import PATTERN, TS_NUM
def check_ts(path):
# 记录已有的片段
lst = os.listdir(path)
ts_idx_list = []
for item in lst:
if item.find('.ts') > 0:
idx = int(re.findall(PATTERN, item)[0])
ts_idx_list.append(idx)
# 检查缺失了哪些片段
for idx in range(0, TS_NUM):
if not idx in ts_idx_list:
print(idx)
print("一共缺少{}个".format(TS_NUM+1-len(ts_idx_list)))
def decode_ts(src, key_path, target):
'''
:param src: 单个加密后的ts源文件路径 如 './caches/ntROGW6R4598270.ts'
:param key_path: 解密key的路径 如 './caches/key.key'
:param target: 解密后的文件名 如 './results/ntROGW6R4598270.ts'
:return:
'''
raw = open(src, 'rb').read()
iv = raw[0:16]
data = raw[16:]
key = open(key_path, 'rb').read()
plain_data = AES.new(key, AES.MODE_CBC, iv).decrypt(data) # 解密失败的话可以换一种模式看看
open(target, 'wb').write(plain_data)
if __name__ == '__main__':
path = './caches/'
check_ts(path) # 检查有没有缺失片段
# 读取缓存批量解密
lst = os.listdir(path)
for item in lst:
if item.find('.ts') > 0 and re.findall(PATTERN, item)[0] >="0": # 后面的条件可以更改解密的开始位置
decode_ts(path+item, path+'key.key', './results/'+item)
print(item)
目录结构是caches和results的话,就可以直接运行我的代码,否则需要自己改一下目录。
合并
将可播放的ts文件片段按顺序合并成一个文件,一定要按顺序!
import os
import re
from config import PATTERN
output = open('./last_result.ts', 'wb+')
ts_list = os.listdir('./results')
ts_list.sort(key=lambda ts : int(re.findall(PATTERN, ts)[0]))
for ts in ts_list:
print(ts)
with open('./results/'+ts, 'rb') as f:
output.write(f.read())
很简单的文件读写操作即可完成,同样注意目录要和代码中的一致。
上一篇: 从零开始启动 npm
下一篇: sqlplus 设置长度
推荐阅读
-
用 python 抓取视频网站的视频标记 m3u8 文件与 ts 文件 - 代码
-
Python爬虫实现电影资源下载方法-本人python运行环境: IDE丨pycharm2.3 版本丨Python3.6 系统丨Windows 10 爬虫目的与思路:实现目的与思路: 目的: 实现对电影目标url的解析与下载,由于第三方vip解析,只提供在线观看,隐藏想实现对目标视频的下载。 思路: 1.在百度搜索全名解析得到解析网站:jx.618g.com?/url=[电影播放地址]。 2.首先拿到想要看的电影url,通过第三方vip视频解析网站进行解析。 3.通过抓包,模拟浏览器发送正常请求。 4.通过拿到缓存ts文件,下载视频ts文件。 5.最后通过转换为mp4文件,即可实现正常播放。主要代码: