用Python教你如何“養(yǎng)”一只DHT爬蟲
廢話少說(shuō), 直接上菜.
我假設(shè)你了解:
1, DHT協(xié)議
2, 網(wǎng)絡(luò)字節(jié)序/主機(jī)字節(jié)序
3, bencode
4, UDP
5, 種子文件構(gòu)造
不懂的趕緊去google, 要是缺一個(gè), 我會(huì)一口鹽汽水噴死你的!
最重要的是, 你必須會(huì)編程!!!!!!! 必須會(huì)!!!!!!!!!!!
ok, DHT原理是什么我在這就不寫了, 畢竟會(huì)看我這文章的人都是已經(jīng)知道了的.
本文貼的代碼均為Python, 使用其他編程語(yǔ)言的人可以看注釋. 為了簡(jiǎn)單, 只會(huì)說(shuō)大概思路和關(guān)鍵性代碼, 細(xì)節(jié)自行搞定.
本文講的是要實(shí)現(xiàn)一個(gè)爬蟲, 所以不會(huì)跟協(xié)議文檔那么嚴(yán)格. 只要保證你能正確請(qǐng)求,回應(yīng)即可. 用軟件開發(fā)的一句話來(lái)說(shuō): 只要接口一致, 管你內(nèi)部細(xì)節(jié)代碼是怎么寫的.
第一步, 構(gòu)建自己的路由表, 這里涉及到大量Python代碼, 請(qǐng)深呼吸:
在構(gòu)建自己的路由表之前, 得寫兩個(gè)輔助函數(shù), 后面會(huì)用到:
- from hashlib import sha1
- from random import randint
- def node_id():
- """
- 把爬蟲"偽裝"成正常node, 一個(gè)正常的node有ip, port, node ID三個(gè)屬性, 因?yàn)槭腔赨DP協(xié)議,
- 所以向?qū)Ψ桨l(fā)送信息時(shí), 即使沒"明確"說(shuō)明自己的ip和port時(shí), 對(duì)方自然會(huì)知道你的ip和port,
- 反之亦然. 那么我們自身node就只需要生成一個(gè)node ID就行, 協(xié)議里說(shuō)到node ID用sha1算法生成,
- sha1算法生成的值是長(zhǎng)度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT協(xié)議里說(shuō)的那范圍: 0 至 2的160次方,
- 也就是總共能生成1461501637330902918203684832716283019655932542976個(gè)獨(dú)一無(wú)二的node.
- ok, 由于sha1總是生成20 byte的值, 所以哪怕你寫SHA1(20)或SHA1(19)或SHA1("I am a 2B")都可以,
- 只要保證大大降低與別人重復(fù)幾率就行. 注意, node ID非十六進(jìn)制,
- 也就是說(shuō)非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA這個(gè)樣子, 即非hash.hexdigest().
- """
- hash = sha1()
- s = ""
- for i in range(20):
- s += chr(randint(0, 255))
- hash.update(s)
- return hash.digest()
- def intify(nid):
- """這是一個(gè)小工具, 把一個(gè)node ID轉(zhuǎn)換為數(shù)字. 后面會(huì)頻繁用到."""
- assert len(nid) == 20
- return long(nid.encode('hex'), 16) #先轉(zhuǎn)換成16進(jìn)制, 再變成數(shù)字
協(xié)議里說(shuō)道, table里有bucket, bucket里有node, 每個(gè)bucket有K個(gè)node, 目前K=8, 即每個(gè)bucket有8個(gè)node. 由于table范圍是0到2的160次方, 那么一個(gè)table最多能有(2的160次方)/K那么多的bucket.
OK, 按照OOP編程思想來(lái)說(shuō), 那么肯定會(huì)有table, bucket, node這3個(gè)類, 無(wú)OOP的, 自己看著辦.
由于是基于Kademila而寫, 所以我習(xí)慣上把這三個(gè)類名變?yōu)镵Table, KBucket, KNode:
- class KNode:
- def __init__(self, nid, ip, port):
- """
- nid就是node ID的簡(jiǎn)寫, 就不取id這么模糊的變量名了. __init__方法相當(dāng)于別的OOP語(yǔ)言中的構(gòu)造方法,
- 在python嚴(yán)格來(lái)說(shuō)不是構(gòu)造方法, 它是初始化, 不過(guò), 功能差不多就行.
- """
- self.nid = nid #node ID
- self.ip = ip
- self.port = port
- #以下兩個(gè)方法非Python程序員不需關(guān)心
- def __eq__(self, other):
- return self.nid == other.nid
- def __ne__(self, other):
- return self.nid != other.nid
- class KBucket:
- def __init__(self, min, max):
- """
- min和max就是該bucket負(fù)責(zé)的范圍, 比如該bucket的min:0, max:16的話,
- 那么存儲(chǔ)的node的intify(nid)值均為: 0到15, 那16就不負(fù)責(zé), 這16將會(huì)是該bucket后面的bucket的min值.
- nodes屬性就是個(gè)列表, 存儲(chǔ)node. last_accessed代表最后訪問(wèn)時(shí)間, 因?yàn)閰f(xié)議里說(shuō)到,
- 當(dāng)該bucket負(fù)責(zé)的node有請(qǐng)求, 回應(yīng)操作; 刪除node; 添加node; 更新node; 等這些操作時(shí),
- 那么就要更新該bucket, 所以設(shè)置個(gè)last_accessed屬性, 該屬性標(biāo)志著這個(gè)bucket的"新鮮程度". 用linux話來(lái)說(shuō), touch一下.
- 這個(gè)用來(lái)便于后面說(shuō)的定時(shí)刷新路由表.
- """
- self.min = min #最小node ID數(shù)字值
- self.max = max #最大node ID數(shù)字值
- self.nodes = [] #node列表
- self.last_accessed = time() #最后訪問(wèn)時(shí)間
- def nid_in_range(self, nid):
- """判斷指定的node ID是否屬于該bucket的范圍里"""
- return self.min <= intify(nid) < self.max
- def append(self, node):
- """
- 添加node, 參數(shù)node是KNode實(shí)例.
- 如果新插入的node的nid屬性長(zhǎng)度不等于20, 終止.
- 如果滿了, 拋出bucket已滿的錯(cuò)誤, 終止. 通知上層代碼進(jìn)行拆表.
- 如果未滿, 先看看新插入的node是否已存在, 如果存在, 就替換掉, 不存在, 就添加,
- 添加/替換時(shí), 更新該bucket的"新鮮程度".
- """
- if len(node.nid) != 20: return
- if len(self.nodes) < 8:
- if node in self.nodes:
- self.nodes.remove(node)
- self.nodes.append(node)
- else:
- self.nodes.append(node)
- self.last_accessed = time()
- else:
- raise BucketFull
- class KTable:
- """
- 該類只實(shí)例化一次.
- """
- def __init__(self, nid):
- """
- 這里的nid就是通過(guò)node_id()函數(shù)生成的自身node ID. 協(xié)議里說(shuō)道, 每個(gè)路由表至少有一個(gè)bucket,
- 還規(guī)定第一個(gè)bucket的min=0, max=2的160次方, 所以這里就給予了一個(gè)buckets屬性來(lái)存儲(chǔ)bucket, 這個(gè)是列表.
- """
- self.nid = nid #自身node ID
- self.buckets = [KBucket(0, 2 ** 160)] #存儲(chǔ)bucket的例表
- def append(self, node):
- """添加node, 參數(shù)node是KNode實(shí)例"""
- #如果待插入的node的ID與自身一樣, 那么就忽略, 終止接下來(lái)的操作.
- if node.nid == self.nid: return
- #定位出待插入的node應(yīng)該屬于哪個(gè)bucket.
- index = self.bucket_index(node.nid)
- bucket = self.buckets[index]
- #協(xié)議里說(shuō)道, 插入新節(jié)點(diǎn)時(shí), 如果所歸屬的bucket是滿的, 又都是活躍節(jié)點(diǎn),
- #那么先看看自身的node ID是否在該bucket的范圍里, 如果不在該范圍里, 那么就把
- #該node忽略掉, 程序終止; 如果在該范圍里, 就要把該bucket拆分成兩個(gè)bucket, 按范圍"公平平分"node.
- try:
- bucket.append(node)
- except BucketFull:
- if not bucket.nid_in_range(self.nid): return #這個(gè)步驟很重要, 不然遞歸循環(huán)很狂暴, 導(dǎo)致程序死翹翹.
- self.split_bucket(index)
- self.append(node)
- def bucket_index(self, nid):
- """
- 定位bucket的所在索引
- 傳一個(gè)node的ID, 從buckets屬性里循環(huán), 定位該nid屬于哪個(gè)bucket, 找到后, 返回對(duì)應(yīng)的bucket的索引;
- 沒有找到, 說(shuō)明就是要?jiǎng)?chuàng)建新的bucket了, 那么索引值就是最大索引值+1.
- 注意: 為了簡(jiǎn)單, 就采用循環(huán)方式. 這個(gè)恐怕不是最有效率的方式.
- """
- for index, bucket in enumerate(self.buckets):
- if bucket.nid_in_range(nid):
- return index
- return index
- def split_bucket(self, index):
- """
- 拆表
- index是待拆分的bucket(old bucket)的所在索引值.
- 假設(shè)這個(gè)old bucket的min:0, max:16. 拆分該old bucket的話, 分界點(diǎn)是8, 然后把old bucket的max改為8, min還是0.
- 創(chuàng)建一個(gè)新的bucket, new bucket的min=8, max=16.
- 然后根據(jù)的old bucket中的各個(gè)node的nid, 看看是屬于哪個(gè)bucket的范圍里, 就裝到對(duì)應(yīng)的bucket里.
- 各回各家,各找各媽.
- new bucket的所在索引值就在old bucket后面, 即index+1, 把新的bucket插入到路由表里.
- """
- old = self.buckets[index]
- point = old.max - (old.max - old.min)/2
- new = KBucket(point, old.max)
- old.max = point
- self.buckets.insert(index + 1, new)
- for node in old:
- if new.nid_in_range(node.nid):
- new.append(node)
- for node in new:
- old.remove(node)
- def find_close_nodes(self, target):
- """
- 返回與目標(biāo)node ID或infohash的最近K個(gè)node.
- 定位出與目標(biāo)node ID或infohash所在的bucket, 如果該bucuck有K個(gè)節(jié)點(diǎn), 返回.
- 如果不夠到K個(gè)節(jié)點(diǎn)的話, 把該bucket前面的bucket和該bucket后面的bucket加起來(lái), 只返回前K個(gè)節(jié)點(diǎn).
- 還是不到K個(gè)話, 再重復(fù)這個(gè)動(dòng)作. 要注意不要超出最小和最大索引范圍.
- 總之, 不管你用什么算法, 想盡辦法找出最近的K個(gè)節(jié)點(diǎn).
- """
- nodes = []
- if len(self.buckets) == 0: return nodes
- index = self.bucket_index(target)
- nodes = self.buckets[index].nodes
- min = index - 1
- max = index + 1
- while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
- if min >= 0:
- nodes.extend(self.buckets[min].nodes)
- if max < len(self.buckets):
- nodes.extend(self.buckets[max].nodes)
- min -= 1
- max += 1
- num = intify(target)
- nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid)))
- return nodes[:K] #K是個(gè)常量, K=8
ok, 路由表這個(gè)玩意兒比較有技術(shù)含量, 又難以描述, 所以會(huì)貼代碼演示. 接下來(lái)的DHT客戶端/服務(wù)端就不再貼那么多的代碼了, 畢竟處理網(wǎng)絡(luò)的代碼太多太復(fù)雜, 技術(shù)含量不高, 按照DHT協(xié)議描述的那樣操作就行了.
第二步, 實(shí)現(xiàn)DHT客戶端
#p#
實(shí)現(xiàn)一個(gè)DHT客戶端, 不用都要實(shí)現(xiàn)ping, find_node, get_peers, announce_peer操作了, 做一個(gè)爬蟲, 僅僅只需要實(shí)現(xiàn)find_node功能就能達(dá)到目的了. 因?yàn)槲覀冎幌氩煌5亟慌笥讯?
要想加入DHT網(wǎng)絡(luò), 首先得認(rèn)識(shí)第一個(gè)已知的node, 這個(gè)node最好是長(zhǎng)期在線的, 又穩(wěn)定的. 我這里就認(rèn)識(shí)一個(gè), dht.transmissionbt.com:6881, 由于UDP的原因, 只能接受ip, 所以請(qǐng)?zhí)崆敖馕龀鰅p地址.
然后使用DHT協(xié)議說(shuō)的那個(gè)find_node操作, 現(xiàn)在解釋一下某些key的潛在意義吧
t: transaction ID的簡(jiǎn)稱, 傳輸ID. 起什么作用呢? 由于UDP無(wú)3次握手這個(gè)機(jī)制, 所以任何人都可隨便發(fā)送信息給你, 根本就不需與你提前進(jìn)行連接. 想想這么個(gè)情況, 你發(fā)送了一請(qǐng)求數(shù)據(jù)給某node, 然后收到DHT說(shuō)的回復(fù)類型的數(shù)據(jù), 即y=r, 但是, 你怎么知道回應(yīng)的是你之前的哪個(gè)請(qǐng)求呢? 所以就要靠t了, 當(dāng)你發(fā)送時(shí), t=aa的話, 對(duì)方回應(yīng)這個(gè)請(qǐng)求時(shí), 回應(yīng)消息的t絕對(duì)是aa, 這樣你就能區(qū)分了. 在發(fā)送之前, 要為該t值注冊(cè)一個(gè)處理函數(shù), 當(dāng)收到回應(yīng)時(shí), 調(diào)用該函數(shù)進(jìn)行處理. 記得設(shè)置一個(gè)定時(shí)器, 時(shí)間一到, 立馬刪除該函數(shù), 不然你內(nèi)存飆升. 如果超時(shí)后才收到信息的話, 很遺憾, 沒了處理函數(shù), 直接忽略. 我建議定時(shí)器設(shè)定到5秒. 當(dāng)然, 隨便你. 一定要保證成功收到第一個(gè)node的find_node回復(fù), 不然失敗, 就沒法繼續(xù)find_node了.即: 認(rèn)識(shí)不到第一個(gè)朋友, 就別想認(rèn)識(shí)第二個(gè)朋友.
id: 就是自身node ID了, 是node_id()函數(shù)生成的哈, 樣子絕不是DHT協(xié)議例子中的樣子, 這主要是為了方便演示而已
target: 這個(gè)也是自身的node ID, 作用是問(wèn)某node離我最近的都有哪些node.
收到對(duì)方回復(fù)后, 把那key為nodes給解析出來(lái), 按DHT協(xié)議說(shuō)的, 每個(gè)node是以4字節(jié)ip+2字節(jié)port+20字節(jié)node ID組成, 那么nodes的字節(jié)數(shù)就會(huì)是26的倍數(shù). "解碼"node和"編碼"node的Python代碼如下:
- from struct import unpack
- def num_to_dotted(n):
- d = 256 * 256 * 256
- q = []
- while d > 0:
- m, n = divmod(n, d)
- q.append(str(m))
- d /= 256
- return '.'.join(q)
- def decode_nodes(nodes):
- n = []
- nrnodes = len(nodes) / 26
- nodes = unpack("!" + "20sIH" * nrnodes, nodes)
- for i in range(nrnodes):
- nid, ip, port = nodes[i * 3], num_to_dotted(nodes[i * 3 + 1]), nodes[i * 3 + 2]
- n.append((nid, ip, port))
- return n
- decode_nodes函數(shù)的反作用函數(shù)如下:
- def dotted_to_num(ip):
- hexn = ''.join(["%02X" % long(i) for i in ip.split('.')])
- return long(hexn, 16)
- def encode_nodes(nodes):
- n = []
- for node in nodes:
- n.extend([node.nid, dotted_to_num(node.ip), node.port])
- return pack("!" + "20sIH" * len(nodes), *n)
解析出來(lái)后, 插入到路由表里, 然后使用find_node繼續(xù)向剛解析出來(lái)的node進(jìn)行請(qǐng)求, target還是自身node ID, 以此循環(huán). 這樣就能認(rèn)識(shí)很多很多的node啦. 細(xì)節(jié)就不說(shuō)了, 自己看著辦.
第三步, 實(shí)現(xiàn)DHT服務(wù)器端, 協(xié)議文檔說(shuō)得很清楚了, 我只列出幾個(gè)需要注意的問(wèn)題:
1, 服務(wù)器端得實(shí)現(xiàn)處理node發(fā)來(lái)的ping, find_node, get_peers, announce_peer請(qǐng)求.
2, 回應(yīng)信息里的t的值是對(duì)方的t值, 不是自己隨便寫的.
3, 最好要實(shí)現(xiàn)那個(gè)token機(jī)制, 這樣就減少被搗亂的幾率, 此token就按協(xié)議那種方式就行, 每5分鐘變換一次, 10分鐘內(nèi)的有效.
4, 一定要記得前面說(shuō)的那句"當(dāng)該bucket負(fù)責(zé)的node有請(qǐng)求, 回應(yīng)操作; 刪除node; 添加node; 更新node; 等這些操作時(shí), 那么就刷新該bucket".
5, 由于是做DHT爬蟲, 所以處理get_peers請(qǐng)求和find_node請(qǐng)求差不多一樣, 都是返回最近的node. 當(dāng)然, 你閑得蛋疼的話, 可以來(lái)得標(biāo)準(zhǔn)點(diǎn), 做能返回peers那種, 不過(guò), 沒必要.
6, announce_peer請(qǐng)求里的port就是對(duì)方提供下載種子的端口, 監(jiān)聽于TCP, 不是DHT連接的端口. 還有請(qǐng)求消息里的id就僅僅指的是對(duì)方的node ID, 我看博客園某人寫的文章說(shuō)是對(duì)方的peer ID, 我表示很不解.
第四步, 定時(shí)刷新路由表
按協(xié)議所說(shuō), 過(guò)一段時(shí)間后, 路由表里的node不全都是活躍了, 所以要定時(shí)刷新路由表. 說(shuō)下大概思路, 遍歷路由表中的bucket列表, 看看bucket的last_accessed是不是超過(guò)了比如說(shuō)15分鐘, 如果超過(guò)了, 說(shuō)明有可能不"新鮮"了, 那么從該bucket隨機(jī)選擇一個(gè)node, 向該node發(fā)送find_node操作, 接著就不管了. 筆者為了簡(jiǎn)單, 就采用這么簡(jiǎn)單的方式了, 沒有去確認(rèn)可疑node是否還"活"著. 你可以嚴(yán)格按照協(xié)議文檔那么寫.
你可能會(huì)問(wèn)的問(wèn)題:
1, 怎么知道一個(gè)資源的下載速度?
答: 有句話這么說(shuō): 下載人數(shù)越多, 下載速度越快. 那么, 如果某一個(gè)資源的infohash出現(xiàn)的announce_peer次數(shù)越高, 那么就說(shuō)明下載人數(shù)就越多, 人數(shù)越多就越快. 這個(gè)下載速度就沒法用絕對(duì)速度表示, 不過(guò)可以使用相對(duì)速度.
2, 怎么在眾多的資源中過(guò)濾出影視資源?
答: 得到種子, 如果有files字段話, 遍歷它進(jìn)行正則匹配, 看看有木有后綴名是rmvb, avi, mp4等什么的, 如果有, 那它大部分情況就是影視了. 如果沒有files字段, 就對(duì)name字段進(jìn)行正則匹配吧.
3, 為什么那些影視資源總是些"很黃很暴力"?
答: 這個(gè)就不是很清楚了, 我想, 使用BT的人大多數(shù)都是些擼管男吧.
4, infohash這個(gè)詞是根據(jù)什么而來(lái)的?
答: 種子文件中info字段的hash值
5, 如何認(rèn)識(shí)更多的node?
答: 多開啟幾個(gè)node實(shí)例.
6, 什么情況下, 對(duì)方把我給加入到對(duì)方的路由表里?
答: 當(dāng)你向?qū)Ψ絝ind_node時(shí). 也許你的ping, get_peers也能讓對(duì)方把你添加到路由表里. 同理, 當(dāng)你接到ping, find_node, get_peers時(shí), 就把對(duì)方給添加到路由表里, 至少收到find_node請(qǐng)求時(shí), 就要把對(duì)方給添加到路由表里.
7, 如何長(zhǎng)期保存node?
答: 數(shù)據(jù)庫(kù), 文件都行, 我不建議長(zhǎng)期保存node, 你只要一直在線, 使用內(nèi)存存儲(chǔ)最好不過(guò)了, 速度快, 代碼簡(jiǎn)單.
原文鏈接:http://www.cnblogs.com/52web/p/How-to-write-a-DHT-crawler.html