欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

[爬虫] 爬取B站的弹幕,通过 bvid 或者 avid、cid

最编程 2024-02-25 09:03:00
...
import json
import re

import requests
import google.protobuf.text_format as text_format
from google.protobuf.json_format import MessageToDict

import dm_pb2 as Danmaku


class BEngine:
    """Small client for fetching bilibili danmaku (bullet comments).

    A video can be addressed either by its BV id (``bvid``) or by the
    numeric pair ``avid`` / ``cid``.
    """

    # Browser-like User-Agent sent with every request.
    USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
    )
    # Seconds to wait for the server; without it a stalled connection
    # would block forever.
    REQUEST_TIMEOUT = 10

    # Constants of the table-based bvid <-> avid conversion.
    # NOTE(review): this is the conversion scheme as written in the original
    # script; confirm it still matches the ids currently issued by the site.
    _BV_TABLE = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
    _BV_INDEX = {ch: i for i, ch in enumerate(_BV_TABLE)}
    # Positions inside the 12-character BV id that carry the base-58 digits,
    # least significant digit first.
    _BV_POSITIONS = (11, 10, 3, 8, 4, 6)
    _BV_XOR = 177451812
    _BV_ADD = 8728348608
    # 12-character skeleton; every "_" is overwritten by an encoded digit.
    _BV_TEMPLATE = "BV1__4_1_7__"

    # Matches one danmaku rendered by text_format.MessageToString with the
    # newlines replaced by commas (see parse_danmu).
    _DANMU_RE = re.compile(
        r'id: \d+,progress: (\d+),mode: (\d+),fontsize: (\d+),color: (\d+),'
        r'midHash: "(.*?)",content: "(.*?)",ctime: (\d+),weight: (\d+),'
        r'idStr: "(\d+)"'
    )
    _DANMU_FIELDS = (
        "progress", "mode", "fontsize", "color", "midHash",
        "content", "ctime", "weight", "idStr",
    )

    def __init__(self):
        self.headers = {"User-Agent": self.USER_AGENT}

    def do_request(self, url):
        """GET ``url`` and return the body as text.

        :param url: address to fetch
        :return: the response text decoded as UTF-8 when the status code is
            200, otherwise ``False``
        :raises requests.RequestException: on connection errors or timeout
        """
        r = requests.get(url, headers=self.headers,
                         timeout=self.REQUEST_TIMEOUT)
        if r.status_code != 200:
            return False
        r.encoding = 'utf-8'
        return r.text

    def get_video_cid(self, bvid):
        """Look up the cid of a video from its bvid.

        :param bvid: BV id of the video
        :return: the ``cid`` value found in the API response (``None`` when
            the response has no such key), or ``False`` when the request
            fails or the response cannot be interpreted
        """
        api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bvid}'
        try:
            html = self.do_request(api_url)
            if not html:
                return False
            return json.loads(html)['data'].get('cid')
        except (requests.RequestException, ValueError, KeyError,
                TypeError, AttributeError):
            # Network failure, invalid JSON, or a payload without a usable
            # "data" object: report failure the same way as a non-200 reply.
            return False

    def bvid_to_avid(self, bvid):
        """Convert a bvid to the numeric avid.

        :param bvid: 12-character BV id, e.g. ``"BV1Dz4y1L7hj"``
        :return: the avid as an int
        :raises IndexError: if ``bvid`` is shorter than 12 characters
        :raises KeyError: if a digit position holds a character that is not
            in the conversion table
        """
        r = sum(
            self._BV_INDEX[bvid[pos]] * 58 ** i
            for i, pos in enumerate(self._BV_POSITIONS)
        )
        return (r - self._BV_ADD) ^ self._BV_XOR

    def avid_to_bvid(self, avid):
        """Convert a numeric avid to the bvid string.

        :param avid: avid as an int
        :return: the 12-character BV id
        """
        x = (avid ^ self._BV_XOR) + self._BV_ADD
        chars = list(self._BV_TEMPLATE)
        for i, pos in enumerate(self._BV_POSITIONS):
            chars[pos] = self._BV_TABLE[x // 58 ** i % 58]
        return ''.join(chars)

    def get_danmu(self, avid, cid):
        """Fetch and decode the danmaku of one segment from the seg.so API.

        The protobuf reply is converted with ``MessageToDict``, so the
        result does not need to go through :meth:`parse_danmu`.

        :param avid: avid of the video (sent as ``pid``)
        :param cid: cid of the video (sent as ``oid``)
        :return: list of danmaku dicts keyed by the proto field names;
            empty when the reply contains no elements
        :raises requests.RequestException: on connection errors, timeout,
            or a 4xx/5xx status
        """
        url = 'https://api.bilibili.com/x/v2/dm/web/seg.so'
        params = {
            'type': 1,           # danmaku type
            'oid': cid,          # cid
            'pid': avid,         # avid
            'segment_index': 1,  # danmaku segment
        }
        resp = requests.get(url, params=params, headers=self.headers,
                            timeout=self.REQUEST_TIMEOUT)
        # Fail on HTTP errors instead of feeding an error page to the parser.
        resp.raise_for_status()

        danmaku_seg = Danmaku.DmSegMobileReply()
        danmaku_seg.ParseFromString(resp.content)
        as_dict = MessageToDict(danmaku_seg, preserving_proto_field_name=True)
        # MessageToDict leaves out empty repeated fields, so "elems" is
        # absent when the segment has no danmaku.
        return as_dict.get('elems', [])

    def parse_danmu(self, danmu_list):
        """Parse danmaku that were rendered as text into dicts.

        Each input string is expected to be the output of
        ``text_format.MessageToString(elem, as_utf8=True)`` with every
        newline replaced by a comma and the trailing comma stripped.
        Strings that do not match this layout are skipped.

        :param danmu_list: iterable of such strings
        :return: list of dicts with the keys ``progress``, ``mode``,
            ``fontsize``, ``color``, ``midHash``, ``content``, ``ctime``,
            ``weight`` and ``idStr``; all values are strings
        """
        result = []
        for each_dm in danmu_list:
            match = self._DANMU_RE.search(each_dm)
            if match:
                result.append(dict(zip(self._DANMU_FIELDS, match.groups())))
        return result

    def getdanmu_format(self, bvid):
        """Return the decoded danmaku list of the video with the given bvid.

        :param bvid: BV id of the video
        :return: list of danmaku dicts, see :meth:`get_danmu`
        :raises ValueError: if the cid of the video cannot be resolved
        """
        avid = self.bvid_to_avid(bvid)
        cid = self.get_video_cid(bvid)
        if not cid:
            raise ValueError(f'could not resolve cid for bvid {bvid!r}')
        return self.get_danmu(avid, cid)

    def getdanmu_format_by_avid(self, avid, cid):
        """Return the decoded danmaku list of the video given avid and cid.

        :param avid: avid of the video
        :param cid: cid of the video
        :return: list of danmaku dicts, see :meth:`get_danmu`
        """
        return self.get_danmu(avid, cid)


if __name__ == '__main__':
    e = BEngine()
    print(e.getdanmu_format_by_avid(656835181, 1154635809))
    bvid = "BV1Dz4y1L7hj"
    # Alternative entry point that resolves avid and cid from the BV id:
    # print(e.getdanmu_format(bvid))