自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

手把手教你在Windows下設(shè)置分布式隊列Celery的心跳輪詢

系統(tǒng) Windows 分布式
大家好,我是吳老板。用Celery 官方的話來說,Celery 是一個非常優(yōu)秀的分布式隊列,可應(yīng)用于分布式共享中間隊列和定時任務(wù)等等。

[[385390]]

1 前言

大家好,我是吳老板。用Celery 官方的話來說,Celery 是一個非常優(yōu)秀的分布式隊列,可應(yīng)用于分布式共享中間隊列和定時任務(wù)等等。

2 版本的差異

Celery 有很多個版本,各版本之間的差異可謂不小,比如最新的 Celery6.0 版本在穩(wěn)定性遠(yuǎn)不如 Celery4.0,所以在使用不同版本的時候,系統(tǒng)給到我們的反饋可能并不能如我們所愿。

3 服務(wù)

在 windows 下掛在 Celery 服務(wù)有時候會出現(xiàn)不穩(wěn)定的情況(unix中暫時未發(fā)現(xiàn)這種情況),比如在執(zhí)行定時任務(wù)的時候,過了一段時間之后,Celery 出現(xiàn)了假死狀態(tài),以至于不能按照我們指定的時間點去執(zhí)行任務(wù)。

這些任務(wù)只是加入到待運(yùn)行隊列中(堆積在 Redis 中),只能人為重啟 Celery 服務(wù)之后才能將堆積的任務(wù)釋放出來運(yùn)行。

這樣一來,第一是定時任務(wù)在指定時間點沒有正常運(yùn)行,其二是在其他時間運(yùn)行了這些任務(wù),很可能會產(chǎn)生更新數(shù)據(jù)不及時,時間節(jié)點混亂的問題,不僅達(dá)不到業(yè)務(wù)需求,還會反受其害。

4 設(shè)置心跳

為了解決 Celery 在 windows 中的這種弊端,可以為 Celery 任務(wù)隊列設(shè)置一個心跳時間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊列服務(wù)就不會出現(xiàn)假死狀態(tài)。

5 舉個栗子

我總是很喜歡用示例來說話,前些時間在對某平臺的商家后臺進(jìn)行數(shù)據(jù)采集的時候,為了使用時能自動獲取該網(wǎng)站的 cookie ,

用Pyppeteer 寫了一個自動化登陸的腳本,和往常一樣仍在 Celery 隊列中并迅速的啟動服務(wù)。

腳本是這樣的(非常接近實際的偽代碼,沒辦法,保命要緊)

  1. # -*- coding: utf-8 -*- 
  2. from db.redisCurd import RedisQueue 
  3. import asyncio 
  4. import random 
  5. import tkinter 
  6. from pyppeteer.launcher import launch 
  7. from platLogin.config import USERNAME, PASSWORD, LOGIN_URL 
  8.  
  9. class Login(): 
  10.     def __init__(self, shopId): 
  11.         self.shopId = shopId 
  12.         self.RedisQueue = RedisQueue("cookie"
  13.  
  14.     def screen_size(self): 
  15.         tk = tkinter.Tk() 
  16.         width = tk.winfo_screenwidth() 
  17.         height = tk.winfo_screenheight() 
  18.         tk.quit() 
  19.         return {'width': width, 'height': height} 
  20.  
  21.     async def login(self, username, password, url): 
  22.         browser = await launch( 
  23.             { 
  24.                 'headless'False
  25.                 'dumpio'True 
  26.             }, 
  27.             args=['--no-sandbox''--disable-infobars''--user-data-dir=./userData'], 
  28.         ) 
  29.         page = await browser.newPage()  # 啟動新的瀏覽器頁面 
  30.  
  31.         try: 
  32.             await page.setViewport(viewport=self.screen_size()) 
  33.             await page.setJavaScriptEnabled(enabled=True)  # 啟用js 
  34.             await page.setUserAgent( 
  35.                 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299' 
  36.             ) 
  37.             await self.page_evaluate(page) 
  38.             await page.goto(url) 
  39.             await asyncio.sleep(2) 
  40.             # 輸入用戶名,密碼 
  41.             await page.evaluate(f'document.querySelector("#userName").value=""'
  42.             await page.type('#userName', username, {'delay': self.input_time_random() - 50})  # delay是限制輸入的時間 
  43.             await page.evaluate('document.querySelector("#passWord").value=""'
  44.             await page.type('#passWord'password, {'delay': self.input_time_random()}) 
  45.             await page.waitFor(6000) 
  46.  
  47.             loginImgVcode = await page.waitForSelector('#checkCode')   
  48.             await loginImgVcode.screenshot({'path''./loginImg.png'}) 
  49.             await page.waitFor(6000) 
  50.  
  51.             res = use_cjy("./loginImg.png"
  52.             pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234" 
  53.  
  54.             await page.waitFor(6000) 
  55.             await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50}) 
  56.             await page.waitFor(6000) 
  57.  
  58.             await page.click('#subMit'
  59.             await page.waitFor(6000) 
  60.             await asyncio.sleep(2) 
  61.             await self.get_cookie(page) 
  62.             await page.waitFor(3000) 
  63.             await self.page_close(browser) 
  64.             return {'code': 200, 'msg''登陸成功'
  65.         except
  66.             return {'code': -1, 'msg''出錯'
  67.  
  68.         finally: 
  69.             await page.waitFor(3000) 
  70.             await self.page_close(browser) 
  71.  
  72.     # 獲取登錄后cookie 
  73.     async def get_cookie(self, page): 
  74.         cookies_list = await page.cookies() 
  75.         cookies = '' 
  76.         for cookie in cookies_list: 
  77.             str_cookie = '{0}={1}; ' 
  78.             str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value')) 
  79.             cookies += str_cookie 
  80.         # 將cookie 放入 cookie 池 
  81.         self.RedisQueue.put_hash(self.shopId, cookies) 
  82.         return cookies 
  83.  
  84.     async def page_evaluate(self, page): 
  85.         await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }'''
  86.         await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }'''
  87.         await page.evaluate( 
  88.             '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }'''
  89.         await page.evaluate( 
  90.             '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }'''
  91.         await page.waitFor(3000) 
  92.  
  93.     async def page_close(self, browser): 
  94.         for _page in await browser.pages(): 
  95.             await _page.close() 
  96.         await browser.close() 
  97.  
  98.     def input_time_random(self): 
  99.         return random.randint(100, 151) 
  100.  
  101.     def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL): 
  102.         loop = asyncio.get_event_loop() 
  103.         i_future = asyncio.ensure_future(self.login(username, password, url)) 
  104.         loop.run_until_complete(i_future) 
  105.         return i_future.result() 
  106.  
  107.  
  108. if __name__ == '__main__'
  109.     Z = Login(shopId="001"
  110.     Z.run() 

Celery 任務(wù)文件是這樣的

  1. # -*- coding: utf-8 -*- 
  2. from __future__ import absolute_import 
  3. import os 
  4. import sys 
  5. import time 
  6. from db.redisCurd import RedisQueue 
  7. from send_msg.weinxin import Send_msg 
  8. base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 
  9. sys.path.append(base_dir) 
  10. from logger.logger import log_v 
  11. from celery import Task 
  12. from platLogin.login import Login  # 登陸類 
  13. from celery import Celery 
  14.  
  15. randomQueue = RedisQueue("cookie"
  16.  
  17. celery_app = Celery('task'
  18. celery_app.config_from_object('celeryConfig'
  19.  
  20. S = Send_msg() 
  21.  
  22. dl_dict = { 
  23.     'demo': { 
  24.         'cookie'''
  25.         'loginClass''Login'
  26.     } 
  27.  
  28. # todo 這是三種運(yùn)行的狀態(tài) 
  29. class task_status(Task): 
  30.     def on_success(self, retval, task_id, args, kwargs):  
  31.         log_v.info('任務(wù)信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args)) 
  32.  
  33.     def on_failure(self, exc, task_id, args, kwargs, einfo):   
  34.         log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc)) 
  35.  
  36.     def on_retry(self, exc, task_id, args, kwargs, einfo):  
  37.         log_v.warning('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc)) 
  38.  
  39.  
  40. # todo 隨便找個hash key作為輪詢對象, celery在win10系統(tǒng)可能不太穩(wěn)定,有時候會有連接斷開的情況 
  41. @celery_app.task(base=task_status) 
  42. def get_cookie_status(platName="demo"): 
  43.     try: 
  44.         # log_v.debug(f'[+] 輪詢 {platName} 定時器啟動 ..... Done'
  45.         randomQueue.get_hash(platName).decode() 
  46.         log_v.debug(f'[+] 輪詢 {platName} 成功 ..... Done'
  47.         return "Erp 輪詢成功" 
  48.     except
  49.         return "Erp 輪詢失敗" 
  50.  
  51.  
  52. @celery_app.task(base=task_status) 
  53. def set_plat_cookie(platName="demo", shopId=None): 
  54.     log_v.debug(f"[+] {platName} 正在登陸"
  55.     core = eval(dl_dict[platName]['loginClass'])(shopId=shopId) 
  56.     result = core.run() 
  57.     return result 

Celery 配置文件是這樣的

  1. from __future__ import absolute_import 
  2. import datetime 
  3. from kombu import Exchange, Queue 
  4. from celery.schedules import crontab 
  5. from urllib import parse 
  6.  
  7. BROKER_URL = f'redis://root:{parse.quote("你的不規(guī)則密碼")}@主機(jī):6379/15' 
  8.  
  9. # 導(dǎo)入任務(wù),如tasks.py 
  10. CELERY_IMPORTS = ('monitor.tasks',) 
  11.  
  12. # 列化任務(wù)載荷的默認(rèn)的序列化方式 
  13. CELERY_TASK_SERIALIZER = 'json' 
  14.  
  15. # 結(jié)果序列化方式 
  16. CELERY_RESULT_SERIALIZER = 'json' 
  17. CELERY_ACCEPT_CONTENT = ['json'
  18.  
  19. CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定時區(qū),不指定默認(rèn)為 'UTC' 
  20. # CELERY_TIMEZONE='UTC' 
  21.  
  22. CELERYBEAT_SCHEDULE = { 
  23.     'add-every-60-seconds': { 
  24.         'task''tasks.get_cookie_status'
  25.         'schedule': datetime.timedelta(minutes=1),  # 每 1 分鐘執(zhí)行一次 
  26.         'args': ()  # 任務(wù)函數(shù)參數(shù) 
  27.     }, 

啟動服務(wù)

  1. celery -A tasks beat -l INFO 
  2. celery -A tasks worker -l INFO -c 2 

以 2 個線程啟動消費者隊列服務(wù)并啟用定時任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前平臺的 cookie 不可用時,我會向 Celery 發(fā)送一個信號(就是調(diào)用了前面的set_plat_cookie 這個方法),消費者得到這個任務(wù)這個就會執(zhí)行自動化腳本以獲取 cookie 并儲存在 Redis 中,使用時在從 Redis 中獲取就能正常請求到該平臺的數(shù)據(jù)。

在空閑時間,Celery中的 get_cookie_status 方法會每隔一分鐘向 Redis 請求數(shù)據(jù),這就是我們設(shè)置的 1分鐘心跳。

這樣不管我們的 Celery 是否是后臺啟動,都不會出現(xiàn)假死、卡死的狀態(tài),則萬事大吉矣!!

6 總結(jié)

 

本文為了解決 Celery 在 windows 中的這種弊端,為 Celery 任務(wù)隊列設(shè)置一個心跳時間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊列服務(wù)都不會出現(xiàn)假死、卡死的狀態(tài)。

 

責(zé)任編輯:武曉燕 來源: Python爬蟲與數(shù)據(jù)挖掘
相關(guān)推薦

2018-05-22 15:30:30

Python網(wǎng)絡(luò)爬蟲分布式爬蟲

2020-06-01 16:25:43

WindowsLinux命令

2018-05-09 09:44:51

Java分布式系統(tǒng)

2021-10-30 19:30:23

分布式Celery隊列

2018-07-02 08:25:14

2011-01-10 14:41:26

2011-05-03 15:59:00

黑盒打印機(jī)

2021-07-14 09:00:00

JavaFX開發(fā)應(yīng)用

2011-08-29 18:03:47

設(shè)置路由器路由器

2023-05-26 00:34:21

WindowsHadoopLinux

2021-09-26 16:08:23

CC++clang_forma

2021-02-26 11:54:38

MyBatis 插件接口

2011-02-22 13:46:27

微軟SQL.NET

2021-12-28 08:38:26

Linux 中斷喚醒系統(tǒng)Linux 系統(tǒng)

2009-12-15 16:44:07

水星路由器設(shè)置教程

2022-07-27 08:16:22

搜索引擎Lucene

2022-01-08 20:04:20

攔截系統(tǒng)調(diào)用

2022-03-14 14:47:21

HarmonyOS操作系統(tǒng)鴻蒙

2023-04-26 12:46:43

DockerSpringKubernetes

2022-12-07 08:42:35

點贊
收藏

51CTO技術(shù)棧公眾號