Scheduling MaxCompute with Airflow
Background
Airflow is an open-source scheduling tool from Airbnb, written in Python and built around the directed acyclic graph (DAG). With Airflow you can define a set of interdependent tasks and run them in dependency order; each task is defined in Python code, and a wide range of Operators is available, so it is flexible enough to cover most scheduling needs. This article shows how to schedule MaxCompute jobs with Airflow's PythonOperator.
I. Environment Preparation
Python 2.7.5 (PyODPS supports Python 2.6 and later)
Airflow apache-airflow-1.10.7
1. Install the packages required by MaxCompute (PyODPS)
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 # optional; speeds up Tunnel uploads
pip install cython>=0.19.0 # optional; not recommended for Windows users
pip install pyodps
Note: if the requests package conflicts with an already installed copy, uninstall it first and then install the required version.
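For example, a sketch of reinstalling requests (adjust the version constraint to your environment):
pip uninstall -y requests
pip install "requests>=2.4.0"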
2. Run the following command to check that the installation succeeded
python -c "from odps import ODPS"
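If the import succeeds without errors, PyODPS is installed. As an extra check you can print the installed version (assuming your PyODPS release exposes __version__):
python -c "import odps; print(odps.__version__)"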
II. Development Steps
1. Write the Python scheduling script Airiflow_MC.py in the Airflow home directory
# -*- coding: UTF-8 -*-
import sys
import io
import os
import time
from datetime import datetime, timedelta
from configparser import ConfigParser  # needs the configparser backport on Python 2

from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Change the system default encoding (Python 2 only).
reload(sys)
sys.setdefaultencoding('utf8')

# MaxCompute runtime settings.
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

# Read the MaxCompute connection information from odps.ini.
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
odps = ODPS(cfg.get("odps", "access_id"),
            cfg.get("odps", "secret_access_key"),
            cfg.get("odps", "project"),
            cfg.get("odps", "endpoint"))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2020, 1, 15)
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # 'retries': 1,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

def read_sql(sqlfile):
    # Helper that reads a SQL statement from a file; the with block closes the file.
    with io.open(sqlfile, encoding='utf-8', mode='r') as f:
        sql = f.read()
    return sql

def get_time():
    print('The current time is {}'.format(time.time()))
    return time.time()

def mc_job():
    project = odps.get_project()  # get the default project
    instance = odps.run_sql("select * from long_chinese;")
    print(instance.get_logview_address())
    instance.wait_for_success()
    with instance.open_reader() as reader:
        count = reader.count
        print("Number of rows in the table: {}".format(count))
        for record in reader:
            print(record)
    return count

t1 = PythonOperator(
    task_id='get_time',
    provide_context=False,
    python_callable=get_time,
    dag=dag)

t2 = PythonOperator(
    task_id='mc_job',
    provide_context=False,
    python_callable=mc_job,
    dag=dag)

t2.set_upstream(t1)
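The script reads the MaxCompute connection information from an odps.ini file. Note that cfg.read("odps.ini") resolves a relative path against the working directory of the process that parses the DAG, so an absolute path can be safer. A minimal sketch of the file, with placeholder values matching the keys the script reads:
[odps]
access_id = <your AccessKey ID>
secret_access_key = <your AccessKey Secret>
project = <your MaxCompute project name>
endpoint = <your MaxCompute endpoint>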
2. Submit the script
python Airiflow_MC.py
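Note that airflow list_dags only discovers files under the configured dags_folder (by default $AIRFLOW_HOME/dags in airflow.cfg). If your home directory is not the dags_folder, copy the script there first, for example (assuming the default AIRFLOW_HOME of ~/airflow):
mkdir -p ~/airflow/dags
cp Airiflow_MC.py ~/airflow/dags/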
3. Run some tests
# print the list of active DAGs
airflow list_dags
# print the list of tasks in the Airiflow_MC DAG
airflow list_tasks Airiflow_MC
# print the hierarchy of tasks in the Airiflow_MC DAG
airflow list_tasks Airiflow_MC --tree
# test individual tasks
airflow test Airiflow_MC get_time 2010-01-16
airflow test Airiflow_MC mc_job 2010-01-16
4. Run the scheduled DAG
Log in to the Airflow web UI and click the trigger button to run the DAG.
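Triggering from the web UI assumes the webserver and scheduler are already running; if they are not, start them first. The DAG can also be unpaused and triggered from the command line. A sketch using the Airflow 1.10 CLI (run the first two commands in separate terminals, or add -D to daemonize):
airflow webserver -p 8080
airflow scheduler
airflow unpause Airiflow_MC
airflow trigger_dag Airiflow_MC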
5. Check the task results
1. Click View Log.
2. Check the output.