自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

使用 AirFlow 調(diào)度 MaxCompute

開發(fā) 后端
airflow是Airbnb開源的一個用python編寫的調(diào)度工具,基于有向無環(huán)圖(DAG),airflow可以定義一組有依賴的任務(wù),按照依賴依次執(zhí)行,通過python代碼定義子任務(wù),并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調(diào)度MaxCompute 任務(wù)

背景

airflow是Airbnb開源的一個用python編寫的調(diào)度工具,基于有向無環(huán)圖(DAG),airflow可以定義一組有依賴的任務(wù),按照依賴依次執(zhí)行,通過python代碼定義子任務(wù),并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調(diào)度MaxCompute 任務(wù)

一、環(huán)境準(zhǔn)備

Python 2.7.5 PyODPS支持Python2.6以上版本
Airflow apache-airflow-1.10.7

1.安裝MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可選,安裝后能加速Tunnel上傳。

pip install cython>=0.19.0 # 可選,不建議Windows用戶安裝。

pip install pyodps

注意:如果requests包沖突,先卸載再安裝對應(yīng)的版本

2.執(zhí)行如下命令檢查安裝是否成功

python -c "from odps import ODPS"

二、開發(fā)步驟

1.在Airflow家目錄編寫python調(diào)度腳本Airiflow_MC.py

 

  1. # -*- coding: UTF-8 -*- 
  2.  
  3. import sys 
  4.  
  5. import os 
  6.  
  7. from odps import ODPS 
  8.  
  9. from odps import options 
  10.  
  11. from airflow import DAG 
  12.  
  13. from airflow.operators.python_operator import PythonOperator 
  14.  
  15. from datetime import datetime, timedelta 
  16.  
  17. from configparser import ConfigParser 
  18.  
  19. import time 
  20.  
  21. reload(sys) 
  22.  
  23. sys.setdefaultencoding('utf8'
  24.  
  25. #修改系統(tǒng)默認(rèn)編碼。 
  26.  
  27. # MaxCompute參數(shù)設(shè)置 
  28.  
  29. options.sql.settings = {'options.tunnel.limit_instance_tunnel'False'odps.sql.allow.fullscan'True
  30.  
  31. cfg = ConfigParser() 
  32.  
  33. cfg.read("odps.ini"
  34.  
  35. print(cfg.items()) 
  36.  
  37. odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint")) 

  1. default_args = { 
  2.  
  3. 'owner''airflow'
  4.  
  5. 'depends_on_past'False
  6.  
  7. 'retry_delay': timedelta(minutes=5), 
  8.  
  9. 'start_date':datetime(2020,1,15) 
  10.  
  11. 'email': ['airflow@example.com'], 
  12.  
  13. 'email_on_failure'False
  14.  
  15. 'email_on_retry'False
  16.  
  17. 'retries': 1, 
  18.  
  19. 'queue''bash_queue'
  20.  
  21. 'pool''backfill'
  22.  
  23. 'priority_weight': 10, 
  24.  
  25. 'end_date': datetime(2016, 1, 1), 
  26.  
  27.  
  28. dag = DAG( 
  29.  
  30. 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30)) 
  31.  
  32. def read_sql(sqlfile): 
  33.  
  34. with io.open(sqlfile, encoding='utf-8', mode='r'as f: 
  35.  
  36. sql=f.read() 
  37.  
  38. f.closed 
  39.  
  40. return sql 
  41.  
  42. def get_time(): 
  43.  
  44. print '當(dāng)前時間是{}'.format(time.time()) 
  45.  
  46. return time.time() 
  47.  
  48. def mc_job (): 
  49.  
  50. project = odps.get_project() # 取到默認(rèn)項目。 
  51.  
  52. instance=odps.run_sql("select * from long_chinese;"
  53.  
  54. print(instance.get_logview_address()) 
  55.  
  56. instance.wait_for_success() 
  57.  
  58. with instance.open_reader() as reader: 
  59.  
  60. count = reader.count 
  61.  
  62. print("查詢表數(shù)據(jù)條數(shù):{}".format(count)) 
  63.  
  64. for record in reader: 
  65.  
  66. print record 
  67.  
  68. return count 
  69.  
  70. t1 = PythonOperator ( 
  71.  
  72. task_id = 'get_time' , 
  73.  
  74. provide_context = False , 
  75.  
  76. python_callable = get_time, 
  77.  
  78. dag = dag ) 
  79.  
  80. t2 = PythonOperator ( 
  81.  
  82. task_id = 'mc_job' , 
  83.  
  84. provide_context = False , 
  85.  
  86. python_callable = mc_job , 
  87.  
  88. dag = dag ) 
  89.  
  90. t2.set_upstream(t1) 

2.提交

  1. python Airiflow_MC.py 

3.進行測試

  1. # print the list of active DAGs 
  2.  
  3. airflow list_dags 
  4.  
  5. # prints the list of tasks the "tutorial" dag_id 
  6.  
  7. airflow list_tasks Airiflow_MC 
  8.  
  9. # prints the hierarchy of tasks in the tutorial DAG 
  10.  
  11. airflow list_tasks Airiflow_MC --tree 
  12.  
  13. #測試task 
  14.  
  15. airflow test Airiflow_MC get_time 2010-01-16 
  16.  
  17. airflow test Airiflow_MC mc_job 2010-01-16 

4.運行調(diào)度任務(wù)

登錄到web界面點擊按鈕運行

5.查看任務(wù)運行結(jié)果

1.點擊view log

2.查看結(jié)果

責(zé)任編輯:梁菲 來源: 阿里云云棲號
相關(guān)推薦

2017-07-04 13:37:57

調(diào)度工具Airflow開源

2021-11-29 08:48:00

K8S KubernetesAirflow

2021-07-27 15:56:28

MaxCompute 資源優(yōu)化

2021-07-08 09:51:18

MaxCompute SQL數(shù)據(jù)處理

2022-09-16 11:23:59

Python框架Celery

2022-01-05 19:34:18

AirflowCeleryMYSQL

2022-01-03 23:59:15

任務(wù)調(diào)度框架

2021-05-13 12:00:51

cron調(diào)度任務(wù)系統(tǒng)運維

2022-01-05 00:03:32

場景容器Airflow

2021-07-29 11:30:41

SaaS云數(shù)據(jù)倉庫MaxCompute

2021-07-15 17:35:28

MaxCompute logview 阿里云

2022-01-25 18:24:20

KubernetesDeschedule

2025-04-07 04:00:00

教學(xué)型任務(wù)調(diào)度系統(tǒng)

2023-08-30 07:14:27

MaxCompute湖倉一體

2022-12-30 12:02:59

數(shù)據(jù)

2021-06-21 17:00:05

云計算Hologres云原生

2024-04-17 07:21:52

物化視圖查詢加速器數(shù)據(jù)倉庫

2009-08-05 10:08:55

MySQL查詢優(yōu)化調(diào)度鎖定

2024-10-21 09:18:47

2021-09-09 09:43:38

MaxComputePAI 阿里云
點贊
收藏

51CTO技術(shù)棧公眾號