手把手教你在Windows下設(shè)置分布式隊列Celery的心跳輪詢
1 前言
大家好,我是吳老板。用Celery 官方的話來說,Celery 是一個非常優(yōu)秀的分布式隊列,可應(yīng)用于分布式共享中間隊列和定時任務(wù)等等。
2 版本的差異
Celery 有很多個版本,各版本之間的差異可謂不小,比如最新的 Celery6.0 版本在穩(wěn)定性遠(yuǎn)不如 Celery4.0,所以在使用不同版本的時候,系統(tǒng)給到我們的反饋可能并不能如我們所愿。
3 服務(wù)
在 windows 下掛在 Celery 服務(wù)有時候會出現(xiàn)不穩(wěn)定的情況(unix中暫時未發(fā)現(xiàn)這種情況),比如在執(zhí)行定時任務(wù)的時候,過了一段時間之后,Celery 出現(xiàn)了假死狀態(tài),以至于不能按照我們指定的時間點去執(zhí)行任務(wù)。
這些任務(wù)只是加入到待運(yùn)行隊列中(堆積在 Redis 中),只能人為重啟 Celery 服務(wù)之后才能將堆積的任務(wù)釋放出來運(yùn)行。
這樣一來,第一是定時任務(wù)在指定時間點沒有正常運(yùn)行,其二是在其他時間運(yùn)行了這些任務(wù),很可能會產(chǎn)生更新數(shù)據(jù)不及時,時間節(jié)點混亂的問題,不僅達(dá)不到業(yè)務(wù)需求,還會反受其害。
4 設(shè)置心跳
為了解決 Celery 在 windows 中的這種弊端,可以為 Celery 任務(wù)隊列設(shè)置一個心跳時間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊列服務(wù)就不會出現(xiàn)假死狀態(tài)。
5 舉個栗子
我總是很喜歡用示例來說話,前些時間在對某平臺的商家后臺進(jìn)行數(shù)據(jù)采集的時候,為了使用時能自動獲取該網(wǎng)站的 cookie ,
用Pyppeteer 寫了一個自動化登陸的腳本,和往常一樣仍在 Celery 隊列中并迅速的啟動服務(wù)。
腳本是這樣的(非常接近實際的偽代碼,沒辦法,保命要緊)
- # -*- coding: utf-8 -*-
- from db.redisCurd import RedisQueue
- import asyncio
- import random
- import tkinter
- from pyppeteer.launcher import launch
- from platLogin.config import USERNAME, PASSWORD, LOGIN_URL
- class Login():
- def __init__(self, shopId):
- self.shopId = shopId
- self.RedisQueue = RedisQueue("cookie")
- def screen_size(self):
- tk = tkinter.Tk()
- width = tk.winfo_screenwidth()
- height = tk.winfo_screenheight()
- tk.quit()
- return {'width': width, 'height': height}
- async def login(self, username, password, url):
- browser = await launch(
- {
- 'headless': False,
- 'dumpio': True
- },
- args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'],
- )
- page = await browser.newPage() # 啟動新的瀏覽器頁面
- try:
- await page.setViewport(viewport=self.screen_size())
- await page.setJavaScriptEnabled(enabled=True) # 啟用js
- await page.setUserAgent(
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'
- )
- await self.page_evaluate(page)
- await page.goto(url)
- await asyncio.sleep(2)
- # 輸入用戶名,密碼
- await page.evaluate(f'document.querySelector("#userName").value=""')
- await page.type('#userName', username, {'delay': self.input_time_random() - 50}) # delay是限制輸入的時間
- await page.evaluate('document.querySelector("#passWord").value=""')
- await page.type('#passWord', password, {'delay': self.input_time_random()})
- await page.waitFor(6000)
- loginImgVcode = await page.waitForSelector('#checkCode')
- await loginImgVcode.screenshot({'path': './loginImg.png'})
- await page.waitFor(6000)
- res = use_cjy("./loginImg.png")
- pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234"
- await page.waitFor(6000)
- await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
- await page.waitFor(6000)
- await page.click('#subMit')
- await page.waitFor(6000)
- await asyncio.sleep(2)
- await self.get_cookie(page)
- await page.waitFor(3000)
- await self.page_close(browser)
- return {'code': 200, 'msg': '登陸成功'}
- except:
- return {'code': -1, 'msg': '出錯'}
- finally:
- await page.waitFor(3000)
- await self.page_close(browser)
- # 獲取登錄后cookie
- async def get_cookie(self, page):
- cookies_list = await page.cookies()
- cookies = ''
- for cookie in cookies_list:
- str_cookie = '{0}={1}; '
- str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
- cookies += str_cookie
- # 將cookie 放入 cookie 池
- self.RedisQueue.put_hash(self.shopId, cookies)
- return cookies
- async def page_evaluate(self, page):
- await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
- await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
- await page.evaluate(
- '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
- await page.evaluate(
- '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
- await page.waitFor(3000)
- async def page_close(self, browser):
- for _page in await browser.pages():
- await _page.close()
- await browser.close()
- def input_time_random(self):
- return random.randint(100, 151)
- def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
- loop = asyncio.get_event_loop()
- i_future = asyncio.ensure_future(self.login(username, password, url))
- loop.run_until_complete(i_future)
- return i_future.result()
- if __name__ == '__main__':
- Z = Login(shopId="001")
- Z.run()
Celery 任務(wù)文件是這樣的
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import
- import os
- import sys
- import time
- from db.redisCurd import RedisQueue
- from send_msg.weinxin import Send_msg
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(base_dir)
- from logger.logger import log_v
- from celery import Task
- from platLogin.login import Login # 登陸類
- from celery import Celery
- randomQueue = RedisQueue("cookie")
- celery_app = Celery('task')
- celery_app.config_from_object('celeryConfig')
- S = Send_msg()
- dl_dict = {
- 'demo': {
- 'cookie': '',
- 'loginClass': 'Login',
- }
- }
- # todo 這是三種運(yùn)行的狀態(tài)
- class task_status(Task):
- def on_success(self, retval, task_id, args, kwargs):
- log_v.info('任務(wù)信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))
- def on_retry(self, exc, task_id, args, kwargs, einfo):
- log_v.warning('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))
- # todo 隨便找個hash key作為輪詢對象, celery在win10系統(tǒng)可能不太穩(wěn)定,有時候會有連接斷開的情況
- @celery_app.task(base=task_status)
- def get_cookie_status(platName="demo"):
- try:
- # log_v.debug(f'[+] 輪詢 {platName} 定時器啟動 ..... Done')
- randomQueue.get_hash(platName).decode()
- log_v.debug(f'[+] 輪詢 {platName} 成功 ..... Done')
- return "Erp 輪詢成功"
- except:
- return "Erp 輪詢失敗"
- @celery_app.task(base=task_status)
- def set_plat_cookie(platName="demo", shopId=None):
- log_v.debug(f"[+] {platName} 正在登陸")
- core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
- result = core.run()
- return result
Celery 配置文件是這樣的
- from __future__ import absolute_import
- import datetime
- from kombu import Exchange, Queue
- from celery.schedules import crontab
- from urllib import parse
- BROKER_URL = f'redis://root:{parse.quote("你的不規(guī)則密碼")}@主機(jī):6379/15'
- # 導(dǎo)入任務(wù),如tasks.py
- CELERY_IMPORTS = ('monitor.tasks',)
- # 列化任務(wù)載荷的默認(rèn)的序列化方式
- CELERY_TASK_SERIALIZER = 'json'
- # 結(jié)果序列化方式
- CELERY_RESULT_SERIALIZER = 'json'
- CELERY_ACCEPT_CONTENT = ['json']
- CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區(qū),不指定默認(rèn)為 'UTC'
- # CELERY_TIMEZONE='UTC'
- CELERYBEAT_SCHEDULE = {
- 'add-every-60-seconds': {
- 'task': 'tasks.get_cookie_status',
- 'schedule': datetime.timedelta(minutes=1), # 每 1 分鐘執(zhí)行一次
- 'args': () # 任務(wù)函數(shù)參數(shù)
- },
- }
啟動服務(wù)
- celery -A tasks beat -l INFO
- celery -A tasks worker -l INFO -c 2
以 2 個線程啟動消費者隊列服務(wù)并啟用定時任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前平臺的 cookie 不可用時,我會向 Celery 發(fā)送一個信號(就是調(diào)用了前面的set_plat_cookie 這個方法),消費者得到這個任務(wù)這個就會執(zhí)行自動化腳本以獲取 cookie 并儲存在 Redis 中,使用時在從 Redis 中獲取就能正常請求到該平臺的數(shù)據(jù)。
在空閑時間,Celery中的 get_cookie_status 方法會每隔一分鐘向 Redis 請求數(shù)據(jù),這就是我們設(shè)置的 1分鐘心跳。
這樣不管我們的 Celery 是否是后臺啟動,都不會出現(xiàn)假死、卡死的狀態(tài),則萬事大吉矣!!
6 總結(jié)
本文為了解決 Celery 在 windows 中的這種弊端,為 Celery 任務(wù)隊列設(shè)置一個心跳時間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊列服務(wù)都不會出現(xiàn)假死、卡死的狀態(tài)。