欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

如何使用Python从m3u8链接下载mp4视频:原理与源代码示例

最编程 2024-07-30 10:51:10
...
# m3u8视频下载 import os import re import time import shutil import requests from concurrent.futures import ThreadPoolExecutor, wait from Crypto.Cipher import AES # UA伪装 headers = { 'User-Agent': 'Mozilla/5.0 (SymbianOS/9.4; Series60/5.0 NokiaN97-1/20.0.019; Profile/MIDP-2.1 Configuration/CLDC-1.1) AppleWebKit/525 (KHTML like Gecko) BrowserNG/7.1.18124' } def download_mp4(mp4_file_path, ts_url_list, ts_url_title): '''下载ts文件并写入mp4文件 :param mp4_file_path: mp4文件名 :param ts_url_list: ts请求链接列表 :return: ''' # 判断文件是否存在,存在则先清空 if os.path.exists(mp4_file_path): with open(mp4_file_path, 'w') as fp: fp.write('') # 创建存放ts的文件夹 if not os.path.exists('ts'): os.mkdir('ts') print('开始下载{}...'.format(mp4_file_path)) excutor = ThreadPoolExecutor(max_workers=20) # 线程池 len_list = len(ts_url_list) # ts链接总数 all_tasks = [excutor.submit(lambda args: download_ts(*args), (ts_url_id, len_list, ts_url_list, ts_url_title)) for ts_url_id in range(len_list)] # 创建任务 wait(all_tasks) # 等待所有任务执行完成 # 检测ts数目是否正确 if len(os.listdir('ts')) == len_list: pass else: print('ts文件部分缺失...') # 删除存放ts的临时文件 shutil.rmtree('ts') return '' # ts合并为mp4文件 print('ts文件下载完成,正在合并ts文件...') for ts_url_id in range(len_list): ts_file_name = 'ts/{}.ts'.format(ts_url_id) with open(ts_file_name, 'rb') as fp: ts_content = fp.read() # 读取ts数据 with open(mp4_file_path, 'ab') as fp: fp.write(ts_content) # 将ts数据追加写入文件 print('ts文件合并成功!') # 删除存放ts的临时文件 shutil.rmtree('ts') return 1 def download_ts(ts_url_id, len_list, ts_url_list, ts_url_title): ''' 请求下载ts文件 :param ts_url_id: 分区ts的id :param len_list: ts个数 :param ts_url_list: 存放ts的列表 :param ts_url_title: ts链接拼接的头部 :return: ''' print('{}/{}开始下载'.format(ts_url_id, len_list - 1)) # 请求不成功补发请求,最大补发次数为 max_request = 5 # 最大补发请求次数 for i in range(max_request): try: response = requests.get(url=ts_url_title + ts_url_list[ts_url_id], headers=headers, timeout=(5, 20)) # 请求获取ts数据 if response.status_code == 200: ts_content = response.content break except: if i == max_request - 1: print('{}/{}下载失败'.format(ts_url_id, len_list - 1)) return '' else: print('{}/{}下载失败,正在补发请求...'.format(ts_url_id, len_list - 1)) ts_file_name = 'ts/{}.ts'.format(ts_url_id) with open(ts_file_name, 'wb') as fp: fp.write(ts_content) # 将ts数据写入文件 print('{}/{}下载完成'.format(ts_url_id, len_list - 1)) def deciphering(key, fileName): '''对aes加密视频进行解密 :param key: aes解密密钥 :param fileName: 需要解密的文件 :return: ''' # 读取原文件 with open(fileName, 'rb') as fp: part = fp.read() # aes解密需要的偏移量 iv = b'0000000000000000' # 解密数据 plain_data = AES.new(key, AES.MODE_CBC, iv).decrypt(part) # 将解密数据写入文件 with open(fileName, 'wb') as fp: fp.write(plain_data) print('视频解密完成!') def timer(start_time, end_time, mp4_file_name): '''计时器 :param start_time: 开始时间 :param end_time: 结束时间 :return: ''' spend_second = end_time - start_time hour = str(int(spend_second / (60 * 60))) minute = str(int(spend_second / 60)) second = str(int(spend_second % 60)) spend_time = '{}h{}m{}s'.format(hour, minute, second) print('{}下载完成!用时:{}'.format(mp4_file_name, spend_time)) def start(m3u8_url, mp4_file_name, ts_url_title): '''开始 :param m3u8_url: m3u8链接 :param mp4_file_path: 下载后的视频名称 :return: ''' # 开始计时 start_time = time.time() # 创建目录文件 if not os.path.exists('mv'): os.mkdir('mv') # 视频保存路径 mp4_file_path = 'mv/' + mp4_file_name + '.mp4' # 获取m3u8内容 m3u8_file = requests.get(url=m3u8_url, headers=headers).text # 整理ts列表 ts_url_list = re.findall(',\n(.*?)\n#', m3u8_file) # 下载ts,并拼接为mp4文件 mp4 = download_mp4(mp4_file_path, ts_url_list, ts_url_title) # 判断是否存在加密 if mp4 and re.search('#EXT-X-KEY', m3u8_file): print('{}视频存在加密,正在对其进行解密,请稍后...'.format(mp4_file_path)) # 获取key key_url = re.search('#EXT-X-KEY:(.*URI="(.*)")\n', m3u8_file)[2] # 获取key的url key = requests.get(url=key_url, headers=headers).content # 请求获取key # 解密视频 deciphering(key, mp4_file_path) # 计时结束 end_time = time.time() # 耗时统计 timer(start_time, end_time, mp4_file_name) if __name__ == '__main__': # m3u8 链接 m3u8_url = input('请输入m3u8链接:') # ts链接头 ts_url_title = '' # mp4 名称 mp4_file_name = input('请输入视频名称:') # 执行下载 start(m3u8_url, mp4_file_name, ts_url_title)