Python爬蟲的任務(wù)數(shù)據(jù)操作的小技巧
需求
爬取某網(wǎng)站的項目列表頁,獲取其url,標(biāo)題等信息,作為后續(xù)爬取詳情頁的任務(wù)url。
先上代碼
代碼
- # -*- coding: utf-8 -*-
- # @Time : 2019-11-08 14:04
- # @Author : cxa
- # @File : motor_helper.py
- # @Software: PyCharm
- import asyncio
- import datetime
- from loguru import logger
- from motor.motor_asyncio import AsyncIOMotorClient
- from collections import Iterable
- try:
- import uvloop
- asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
- except ImportError:
- pass
- db_configs = {
- 'host': '127.0.0.1',
- 'port': '27017',
- 'db_name': 'mafengwo',
- 'user': ''
- }
- class MotorOperation:
- def __init__(self):
- self.__dict__.update(**db_configs)
- if self.user:
- self.motor_uri = f"mongodb://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}?authSource={self.db_name}"
- else:
- self.motor_uri = f"mongodb://{self.host}:{self.port}/{self.db_name}"
- self.client = AsyncIOMotorClient(self.motor_uri)
- self.mb = self.client[self.db_name]
- async def save_data_with_status(self, items, col="seed_data"):
- for item in items:
- data = dict()
- data["update_time"] = datetime.datetime.now()
- data["status"] = 0 # 0初始
- data.update(item)
- print("data", data)
- await self.mb[col].update_one({
- "url": item.get("url")},
- {'$set': data, '$setOnInsert': {'create_time': datetime.datetime.now()}},
- upsert=True)
- async def add_index(self, col="seed_data"):
- # 添加索引
- await self.mb[col].create_index('url')
因為我的爬蟲是異步網(wǎng)絡(luò)模塊aiohttp寫的,所以選擇了pymongo的異步版本motor進(jìn)行操作。
異步代碼的基本屬性就是async/await成對的出現(xiàn),如果把上面的await和async去掉,就是類似pymongo的寫法了,這里異步不是重點,重點是我們怎么處理每條數(shù)據(jù)。
這里除了網(wǎng)頁的url,標(biāo)題等信息,我需要附加3個字段。分別是create_time, status,update_time。
這三個字段分別代表,數(shù)據(jù)插入數(shù)據(jù),狀態(tài)和更新時間。
那么我為什么添加三個字段呢?
首先,我們需要判斷每次的任務(wù)數(shù)據(jù)是否存在,我這里的情況是存在就更新不存在就插入,那么我就需要一個查詢條件,作為更新的條件,很顯然這里可以使用任務(wù)的url作為唯一條件(你還可以使用url+標(biāo)題做個md5然后保存),好了查詢條件確定。
下面說create_time這個比較好理解就是數(shù)據(jù)插入時間,關(guān)鍵是為什么還要一個update_time,這個的話和status字段有一定的關(guān)系。畫重點:這個status作為后續(xù)爬蟲進(jìn)行爬取的一個標(biāo)志用。目前這個status有4個值,0-4,我這是這樣定義的,
0:初始狀態(tài)
1:抓取中的任務(wù)
2:抓取成功
3:抓取失敗
4:抓取成功但是沒有匹配到任務(wù)。
后面隨著任務(wù)的爬取,狀態(tài)也是不斷變化的,同時我們需要更新update_time為最新的時間。這個目前的話是體現(xiàn)不出來什么作用,它的使用場景是,重復(fù)任務(wù)的抓取,比如今天我抓取了任務(wù)列表里的url1、url2,第二天的時候我如果再抓到,為了區(qū)分是抓取失敗還是抓取成功,我們根據(jù)create_time和update_time就可以進(jìn)行推斷了,如果兩者相同而且是當(dāng)前的日期說明剛抓的,如果update_time的日期比create_time新可以說明,抓到了重復(fù)的任務(wù)。關(guān)于字段的設(shè)計就啰嗦這么些。
下面是實現(xiàn),我們可以通過update_one方法,對數(shù)據(jù)作存在或者插入操作,因為url作為查詢條件,后面量大的話就最好添加一個索引。也就是上面的 add_index方法。
好了最好說插入更新的具體代碼
需要注意的是
- {'$set': data, '$setOnInsert': {'create_time': datetime.datetime.now()}}
$setOnInsert里面使用的字段是數(shù)據(jù)不存在的時候才插入的,存在就不動了,只插入$set里面指定的。
另外$setOnInsert里面使用的字段不能在$set里面再次出現(xiàn)
upsert=True代表的是不存在就插入。