python websocket爬虫_python根据websocket抓取斗鱼弹幕和礼物消息
1.斗魚彈幕協議
到斗魚官方開放平臺看斗魚通訊協議,網址“https://open.douyu.com/source/api/63”,登錄后可查看
所以根據斗魚協議做編碼函數:def msg_encode(msg):
#消息以 \0 結尾,并以utf-8編碼
msg = msg + '\0'
msg_bytes = msg.encode('utf-8')
#消息長度 + 頭部長度8
length_bytes = int.to_bytes(len(msg) + 8, 4, byteorder='little')
#斗魚客戶端發送消息類型 689
type = 689
type_bytes = int.to_bytes(type, 2, byteorder='little')
# 加密字段與保留字段,默認 0 長度各 1
end_bytes = int.to_bytes(0, 1, byteorder='little')
#按順序相加 消息長度 + 消息長度 + 消息類型 + 加密字段 + 保留字段
head_bytes = length_bytes + length_bytes + type_bytes + end_bytes + end_bytes
#消息頭部拼接消息內容
data = head_bytes + msg_bytes
return data
然后根據斗魚協議做解碼函數:def msg_decode(msg_bytes):
#定義一個游標位置
cursor = 0
msg = []
while cursor < len(msg_bytes):
#根據斗魚協議,報文 前四位與第二個四位,都是消息長度,取前四位,轉化成整型
content_length = int.from_bytes(msg_bytes[cursor: (cursor + 4) - 1], byteorder='little')
#報文長度不包含前4位,從第5位開始截取消息長度的字節流,并扣除前8位的協議頭,取出正文,用utf-8編碼成字符串
content = msg_bytes[(cursor + 4) + 8:(cursor + 4) + content_length - 1].decode(encoding='utf-8',
errors='ignore')
msg.append(content)
cursor = (cursor + 4) + content_length
# print(msg)
return msg
解碼后的消息需要反序列化:def msg_format(msg_str):
try:
msg_dict = {}
msg_list = msg_str.split('/')[0:-1]
for msg in msg_list:
msg = msg.replace('@s', '/').replace('@A', '@')
msg_tmp = msg.split('@=')
msg_dict[msg_tmp[0]] = msg_tmp[1]
return msg_dict
except Exception as e:
print(str(e))
2.抓包分析
建議通過chrome開發者工具抓包,或者使用fiddler抓包,網頁的東西,直接用chrome更方便
為什么不用斗魚開放平臺提供的api直接用呢,因為api是面向注冊的開發者的,api都需要申請使用,普通用戶只能抓包分析通用接口
分析:
wsproxy.douyu.com中交互消息可以獲得多個彈幕服務器地址,我們直接取一個就行,不需要在程序中實現實時分析
然后分析danmuproxy.douyu.com連接中詳細交互信息,實現與服務器交互
匿名登錄時彈幕中禮物消息可能默認被屏蔽,需在登錄時提交開啟指令,或登錄后提交開啟指令
彈幕消息類型與指令非常豐富,這里舉例簡單的消息和指令,詳細可以繼續分析
結果如下:
登錄:type@=loginreq/roomid@=74751/dfl@=/username@=visitor8243989/uid@=1132317461/ver@=20190610/aver@=218101901/ct@=0/
##匿名分配的username與uid可以從wsproxy.douyu.com中交互消息獲取,也可直接自定義提交就行
退出登錄:type@=logout/
心跳消息:type@=mrkl/
加入組消息:type@=joingroup/rid@=74751/gid@=-9999/ #gid默認1,此處改成 - 9999 改成海量彈幕模式
屏蔽禮物消息:type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=1@AS@Ssn@AA=106@ASss@AA=1@AS@Ssn@AA=107@ASss@AA=1@AS@Ssn@AA=108@ASss@AA=1@AS@Ssn@AA=110@ASss@AA=1@AS@Ssn@AA=901@ASss@AA=1@AS@S/
開啟禮物消息:type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=0@AS@Ssn@AA=106@ASss@AA=0@AS@Ssn@AA=107@ASss@AA=0@AS@Ssn@AA=108@ASss@AA=0@AS@Ssn@AA=110@ASss@AA=0@AS@Ssn@AA=901@ASss@AA=0@AS@S/
##開啟與屏蔽禮物可以詳細控制,可以逐項嘗試分析
注:登錄消息中有個字段/dfl@=,此處直接加入屏蔽禮物或開啟禮物指令中相應dfl@=后面的內容,即可在登錄時控制禮物屏蔽或開啟
3.禮物消息處理
獲取斗魚禮物類型,需抓包分析來源網址
url1 = 'https://webconf.douyucdn.cn/resource/common/gift/flash/gift_effect.json'
url2 = 'https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json'
將獲取到的禮物json處理一下,合并且去除不需要信息,做成字典:
def get_gift_dict():
gift_json = {}
gift_json1 = requests.get('https://webconf.douyucdn.cn/resource/common/gift/flash/gift_effect.json').text
gift_json2 = requests.get('https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json').text
gift_json1=gift_json1.replace('DYConfigCallback(','')[0:-2]
gift_json2=gift_json2.replace('DYConfigCallback(','')[0:-2]
gift_json1 = json.loads(gift_json1)['data']['flashConfig']
gift_json2= json.loads(gift_json2)['data']
for gift in gift_json1:
gift_json[gift] = gift_json1[gift]['name']
for gift in gift_json2:
gift_json[gift] = gift_json2[gift]['name']
return gift_json
4.websocket連接
python環境先安裝websocket包 pip3 install websocket-client
引入websocket包后,使用websecket.WebSocketApp建立websecket連接客戶端,創建時在on_open、on_error、on_message、on_close4個參數中傳入相應事件發生時需要處理的方法
顧名思義,4個參數很好理解了
websocket.WebSocketApp(url, on_open=on_open, on_error=on_error,on_message=on_message, on_close=on_close)
5.效果與源碼
實際效果展示:
源碼(演示代碼,異常判斷,數據校驗就不做了):
__author__ = 'admin'
import websocket
import threading
import time
import requests
import json
class DyDanmu:
def __init__(self, roomid, url):
self.gift_dict = self.get_gift_dict()
self.gift_dict_keys = self.gift_dict.keys()
self.room_id = roomid
self.client = websocket.WebSocketApp(url, on_open=self.on_open, on_error=self.on_error,
on_message=self.on_message, on_close=self.on_close)
self.heartbeat_thread = threading.Thread(target=self.heartbeat)
def start(self):
self.client.run_forever()
def stop(self):
self.logout()
self.client.close()
def on_open(self):
self.login()
self.join_group()
self.heartbeat_thread.setDaemon(True)
self.heartbeat_thread.start()
def on_error(self, error):
print(error)
def on_close(self):
print('close')
def send_msg(self, msg):
msg_bytes = self.msg_encode(msg)
self.client.send(msg_bytes)
def on_message(self, msg):
message = self.msg_decode(msg)
# print(message)
for msg_str in message:
msg_dict = self.msg_format(msg_str)
if msg_dict['type'] == 'chatmsg':
print(msg_dict['nn'] + ':' + msg_dict['txt'])
if msg_dict['type'] == 'dgb':
if msg_dict['gfid'] in self.gift_dict_keys:
print(msg_dict['nn'] + '\t送出\t' + msg_dict['gfcnt'] + '\t個\t' + self.gift_dict[msg_dict['gfid']])
else:
print(msg_dict['nn'] + '\t送出\t' + msg_dict['gfcnt'] + '\t個\t' + msg_dict['gfid'] + '\t未知禮物')
# print(msg_dict)
# 發送登錄信息
def login(self):
login_msg = 'type@=loginreq/roomid@=%s/' \
'dfl@=sn@AA=105@ASss@AA=0@AS@Ssn@AA=106@ASss@AA=0@AS@Ssn@AA=107@ASss@AA=0@AS@Ssn@AA=108@ASss@AA=0@AS@Ssn@AA=110@ASss@AA=0@AS@Ssn@AA=901@ASss@AA=0/' \
'username@=%s/uid@=%s/ltkid@=/biz@=/stk@=/devid@=8d8c22ce6093e6a7264f99da00021501/ct@=0/pt@=2/cvr@=0/tvr@=7/apd@=/rt@=1605498503/vk@=0afb8a90c2cb545e8459d60c760dc08b/' \
'ver@=20190610/aver@=218101901/dmbt@=chrome/dmbv@=78/' % (
self.room_id, 'visitor4444086', '1178849206'
)
self.send_msg(login_msg)
def logout(self):
logout_msg = 'type@=logout/'
self.send_msg(logout_msg)
# 發送入組消息
def join_group(self):
join_group_msg = 'type@=joingroup/rid@=%s/gid@=-9999/' % (self.room_id)
self.send_msg(join_group_msg)
# 關閉禮物信息推送
def close_gift(self):
close_gift_msg = 'type@=dmfbdreq/dfl@=sn@AA=105@ASss@AA=1@AS@Ssn@AA=106@ASss@AA=1@AS@Ssn@AA=107@ASss@AA=1@AS@Ssn@AA=108@ASss@AA=1@AS@Ssn@AA=110@ASss@AA=1@AS@Ssn@AA=901@ASss@AA=1@AS@S/'
self.send_msg(close_gift_msg)
# 保持心跳線程
def heartbeat(self):
while True:
# 45秒發送一個心跳包
self.send_msg('type@=mrkl/')
print('發送心跳')
time.sleep(45)
def msg_encode(self, msg):
# 消息以 \0 結尾,并以utf-8編碼
msg = msg + '\0'
msg_bytes = msg.encode('utf-8')
#消息長度 + 頭部長度8
length_bytes = int.to_bytes(len(msg) + 8, 4, byteorder='little')
#斗魚客戶端發送消息類型 689
type = 689
type_bytes = int.to_bytes(type, 2, byteorder='little')
# 加密字段與保留字段,默認 0 長度各 1
end_bytes = int.to_bytes(0, 1, byteorder='little')
#按順序相加 消息長度 + 消息長度 + 消息類型 + 加密字段 + 保留字段
head_bytes = length_bytes + length_bytes + type_bytes + end_bytes + end_bytes
#消息頭部拼接消息內容
data = head_bytes + msg_bytes
return data
def msg_decode(self, msg_bytes):
# 定義一個游標位置
cursor = 0
msg = []
while cursor < len(msg_bytes):
#根據斗魚協議,報文 前四位與第二個四位,都是消息長度,取前四位,轉化成整型
content_length = int.from_bytes(msg_bytes[cursor: (cursor + 4) - 1], byteorder='little')
#報文長度不包含前4位,從第5位開始截取消息長度的字節流,并扣除前8位的協議頭,取出正文,用utf-8編碼成字符串
content = msg_bytes[(cursor + 4) + 8:(cursor + 4) + content_length - 1].decode(encoding='utf-8',
errors='ignore')
msg.append(content)
cursor = (cursor + 4) + content_length
# print(msg)
return msg
def msg_format(self, msg_str):
try:
msg_dict = {}
msg_list = msg_str.split('/')[0:-1]
for msg in msg_list:
msg = msg.replace('@s', '/').replace('@A', '@')
msg_tmp = msg.split('@=')
msg_dict[msg_tmp[0]] = msg_tmp[1]
return msg_dict
except Exception as e:
print(str(e))
def get_gift_dict(self):
gift_json = {}
gift_json1 = requests.get('https://webconf.douyucdn.cn/resource/common/gift/flash/gift_effect.json').text
gift_json2 = requests.get(
'https://webconf.douyucdn.cn/resource/common/prop_gift_list/prop_gift_config.json').text
gift_json1 = gift_json1.replace('DYConfigCallback(', '')[0:-2]
gift_json2 = gift_json2.replace('DYConfigCallback(', '')[0:-2]
gift_json1 = json.loads(gift_json1)['data']['flashConfig']
gift_json2 = json.loads(gift_json2)['data']
for gift in gift_json1:
gift_json[gift] = gift_json1[gift]['name']
for gift in gift_json2:
gift_json[gift] = gift_json2[gift]['name']
return gift_json
if __name__ == '__main__':
roomid = '74751'
url = 'wss://danmuproxy.douyu.com:8506/'
dy = DyDanmu(roomid, url)
dy.start()
總結
以上是生活随笔為你收集整理的python websocket爬虫_python根据websocket抓取斗鱼弹幕和礼物消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用了vscode和clion我都裂开了
- 下一篇: mysql运营_为线上运营Mysql数据