">
前言
雖然標(biāo)題是全站,但目前只做了等級(jí) top 100 直播間的全天彈幕收集。
彈幕收集系統(tǒng)基于之前的B 站直播彈幕姬 Python 版修改而來(lái)。具體協(xié)議分析可以看上一篇文章。
直播彈幕協(xié)議是直接基于 TCP 協(xié)議,所以如果 B 站對(duì)類似我這種行為做反制措施,比較困難。應(yīng)該有我不知道的技術(shù)手段來(lái)檢測(cè)類似我這種惡意行為。
我試過(guò)同時(shí)連接 100 個(gè)房間,和連接單個(gè)房間 100 次的實(shí)驗(yàn),都沒(méi)有問(wèn)題。>150 會(huì)被關(guān)閉鏈接。
直播間的選取
現(xiàn)在彈幕收集系統(tǒng)在選取直播間上比較簡(jiǎn)單,直接選取了等級(jí) top100。
以后會(huì)修改這部分,改成定時(shí)去 http://live.bilibili.com/all 查看新開(kāi)播的直播間,并動(dòng)態(tài)添加任務(wù)。
異步任務(wù)和彈幕存儲(chǔ)
收集系統(tǒng)仍舊使用了 asyncio 異步協(xié)程框架,對(duì)于每一個(gè)直播間都使用如下方法來(lái)加進(jìn) loop 中。
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
其實(shí)若將心跳任務(wù) HeartbeatLoop 放入 connectorServer 中去啟動(dòng),代碼看起來(lái)更優(yōu)雅一些。但這么做是因?yàn)槲倚枰S護(hù)一個(gè)任務(wù)列表,后面會(huì)有描述。
在彈幕存儲(chǔ)上我花了些時(shí)間選擇。
數(shù)據(jù)庫(kù)存儲(chǔ)是一個(gè)同步 IO 的過(guò)程,Insert 的時(shí)候會(huì)阻塞彈幕收集的任務(wù)。雖然有 aiomysql 這種異步接口,但配置數(shù)據(jù)庫(kù)太麻煩,我的設(shè)想是這個(gè)小系統(tǒng)能夠方便地部署。
最終我選擇使用自帶的 sqlite3。但 sqlite3 無(wú)法做并行操作,故開(kāi)了一個(gè)線程單獨(dú)進(jìn)行數(shù)據(jù)庫(kù)存儲(chǔ)。在另一個(gè)線程中,100 * 2 個(gè)任務(wù)搜集所有的彈幕、人數(shù)信息,并塞進(jìn)隊(duì)列 commentq, numq 中。存儲(chǔ)線程每隔 10s 喚醒一次,將隊(duì)列中的數(shù)據(jù)寫進(jìn) sqlite3 中,并清空隊(duì)列。
在多線程和異步的配合下,網(wǎng)絡(luò)流量沒(méi)有被阻塞。
可能的連接失敗場(chǎng)景處理
彈幕協(xié)議是直接基于 TCP,位與位直接關(guān)聯(lián)性較強(qiáng),一旦解析錯(cuò)誤,很容易就拋 Exception(個(gè)人感覺(jué),雖然 TCP 是可靠傳輸,但B站服務(wù)器自身發(fā)生錯(cuò)誤也是有可能的)。所以有必要設(shè)計(jì)一個(gè)自動(dòng)重連機(jī)制。
在 asyncio 文檔中提到,
Done means either that a result / exception are available, or that the future was cancelled.
函數(shù)正常返回、拋出異常或者是被 cancel,都會(huì)退出當(dāng)前任務(wù)。可以使用 done() 來(lái)判斷。
每一個(gè)直播間對(duì)應(yīng)兩個(gè)任務(wù),解析任務(wù)是最容易掛的,但并不會(huì)影響心跳任務(wù),所以必須找出并將對(duì)應(yīng)心跳任務(wù)結(jié)束。
在創(chuàng)建任務(wù)的時(shí)候使用字典記錄每個(gè)房間的兩個(gè)任務(wù),
self.tasks[url] = [task1, task2]
在運(yùn)行過(guò)程中,每隔 10s 做一次檢查,
for url in self.tasks: item = self.tasks[url] task1 = item[0] task2 = item[1] if task1.done() == True or task2.done() == True: if task1.done() == False: task1.cancel() if task2.done() == False: task2.cancel() danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task11 = asyncio.ensure_future(danmuji.connectServer()) task22 = asyncio.ensure_future(danmuji.HeartbeatLoop()) self.tasks[url] = [task11, task22]
實(shí)際我只見(jiàn)過(guò)一次任務(wù)失敗的場(chǎng)景,是因?yàn)橹鞑シ块g被封了,導(dǎo)致無(wú)法進(jìn)入直播間。
結(jié)論
B站人數(shù)是按照連接彈幕服務(wù)器的鏈接數(shù)量統(tǒng)計(jì)的。通過(guò)操縱鏈接量,可以瞬間增加任意人數(shù)觀看,有商機(jī)?
運(yùn)行的這幾天中,發(fā)現(xiàn)即使大部分房間不在直播,也能有 >5 的人數(shù),包括凌晨。我只能猜測(cè)也有和我一樣的人在 24h 收集彈幕。
top100 平均一天 40M 彈幕數(shù)據(jù)。
收集的彈幕能做什么?還沒(méi)想好,可能可以拿來(lái)做用戶行為分析 -_^
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com