Airflow 2.2.3 + Celery + MySQL 8: Building a Robust Distributed Scheduling Cluster
In earlier posts we covered the basic Airflow architecture and how to deploy Airflow in containers. Today we look at how to build a robust distributed scheduling cluster with Airflow and Celery.
1 Cluster Environment
As before, we install the Airflow cluster on Ubuntu 20.04.3 LTS, this time using three identically configured servers for testing. In the previous article[1] we already installed all of the Airflow components on the Bigdata1 server (if you haven't read it, follow the link first); now we only need to install the worker component on the other two nodes.
| | Bigdata1(A) | Bigdata2(B) | Bigdata3(C) |
| --- | --- | --- | --- |
| Webserver | √ | | |
| Scheduler | √ | | |
| Worker | √ | √ | √ |
The docker-compose.yml in the previous article did not separate the deployment files from the data directories, which makes later management awkward. So we can stop the services and move the database and data directories away from the deployment files:
- Deployment files: docker-compose.yaml/.env, stored under /apps/airflow
- MySQL and its configuration files: under /data/mysql
- Airflow data directory: under /data/airflow
Splitting things up this way makes unified management much easier later on.
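The relocation itself is a one-time move on Bigdata1. Here is a minimal sketch, assuming the previous deployment lived in ~/airflow and used local ./mysql-data, ./dags and ./plugins directories (these paths are assumptions — adjust them to your own layout):

```bash
# stop the stack from the previous article before moving anything
cd ~/airflow && docker-compose down

# create the new layout
mkdir -p /apps/airflow /data/mysql /data/airflow

# deployment files, database data, and airflow data each get their own home
mv docker-compose.yaml .env /apps/airflow/   # deployment files
mv ./mysql-data/* /data/mysql/               # assumed MySQL data/config path from the old setup
mv ./dags ./plugins /data/airflow/           # assumed DAG and plugin paths from the old setup
```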
2 Deploying the Worker Service
Preparation — create the data, deployment, and log directories:

```bash
mkdir /data/airflow/{dags,plugins} -pv
mkdir -pv /apps/airflow
mkdir -pv /logs/airflow
```
The worker's deployment file (docker-compose.yaml):
```yaml
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # set your MySQL account and password here
    AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # set your MySQL account and password here
    AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0  # set your Redis password here
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - /data/airflow/dags:/opt/airflow/dags
    - /logs/airflow:/opt/airflow/logs
    - /data/airflow/plugins:/opt/airflow/plugins
    - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
  user: "${AIRFLOW_UID:-50000}:0"

services:
  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    hostname: bigdata-20-194  # set the container hostname here so you can tell which worker it is in Flower
    depends_on:
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow
```
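For reference, here is a sketch of what the /apps/airflow/.env file next to this compose file can end up containing on a worker node. Only AIRFLOW_UID is strictly required (the init step below writes it); the other entries are placeholders you would add only if you want docker-compose to resolve the image tag and the MySQL/Redis hosts from the environment instead of hard-coding them:

```bash
# /apps/airflow/.env -- example values only, replace with your own
AIRFLOW_UID=1000                          # UID of the regular user that owns /data/airflow
AIRFLOW_IMAGE_NAME=apache/airflow:2.2.3   # image referenced by x-airflow-common
MYSQL_HOST=192.168.0.10                   # placeholder: host running MySQL 8 (Bigdata1 in this series)
REDIS_HOST=192.168.0.10                   # placeholder: host running the Redis broker
```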
Run the initialization check to verify that the environment meets the requirements:

```bash
cd /apps/airflow/
echo -e "AIRFLOW_UID=$(id -u)" > .env  # make sure AIRFLOW_UID is the UID of a regular user, and that this user has permission to create the persistent directories
docker-compose up airflow-init
```
If the database already exists, the initialization check does not affect the existing data. Next, start the airflow-worker service:

```bash
docker-compose up -d
```

Then install the airflow-worker service on the bigdata3 node in exactly the same way. Once deployment is complete, you can check the state of the workers registered with the broker in Flower.
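Flower gives you the cluster-wide view; from the worker node itself you can also confirm locally that the container is healthy and the Celery worker answers a ping. The second command below is the same check the compose healthcheck runs (the hostname matches the `hostname:` value set for the worker above):

```bash
cd /apps/airflow

# the worker container should be Up (healthy) once the healthcheck passes
docker-compose ps airflow-worker

# ping the Celery worker manually, exactly as the healthcheck does
docker-compose exec airflow-worker \
  celery --app airflow.executors.celery_executor.app inspect ping -d "celery@bigdata-20-194"
```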
3 Persisting the Configuration File
In most cases, when running an Airflow cluster with multiple worker nodes, we want to persist Airflow's configuration file and keep it in sync across all nodes. To do that, modify the volumes of x-airflow-common in docker-compose.yaml (as shown above) so that airflow.cfg is mounted into the container; the initial configuration file can be copied out of a running container and then edited.
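A minimal way to get that initial copy, assuming a worker container from the compose file above is already running (the container name is an assumption — check `docker ps` for the actual name on your host):

```bash
# copy the generated airflow.cfg out of the running worker container
docker cp airflow_airflow-worker_1:/opt/airflow/airflow.cfg /data/airflow/airflow.cfg

# make sure the airflow user inside the container can read it once it is mounted back
chown $(id -u):0 /data/airflow/airflow.cfg
```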
Early on, we also need to write the values of some of the environment variables from the docker-compose file into airflow.cfg, for example:
```ini
[core]
dags_folder = /opt/airflow/dags
hostname_callable = socket.getfqdn
# change the default time zone
default_timezone = Asia/Shanghai
executor = CeleryExecutor
sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
load_examples = True
load_default_connections = True
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key =
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
default_task_retries = 0
default_task_weight_rule = downstream
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
max_db_retries = 3
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 128

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
remote_log_conn_id =
google_key_path =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793

[metrics]
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =

[secrets]
backend =
backend_kwargs =

[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080

[debug]
fail_fast = False

[api]
enable_experimental_api = False
auth_backend = airflow.api.auth.backend.deny_all
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =

[lineage]
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False

[hive]
default_hive_mapred_queue =

[webserver]
# custom airflow domain name
base_url = https://devopsman.cn/airflow
# default time zone for the UI
default_ui_timezone = Asia/Shanghai
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key = emEfndkf3QWZ5zVLE1kVMg==
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = True
expose_stacktrace = True
dag_default_view = tree
dag_orientation = LR
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = False
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
auto_refresh_interval = 3

[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True

# mail settings
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]
sentry_on = false
sentry_dsn =

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_umask = 0o077
broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0
result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_adoption_timeout = 600
task_publish_max_retries = 3
worker_precheck = False

[celery_broker_transport_options]

[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
trigger_timeout_check_interval = 15

[triggerer]
default_capacity = 1000

[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
forwardable = True
include_ip = True

[github_enterprise]
api_rev = v3

[elasticsearch]
host =
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = False
json_format = False
json_fields = asctime, filename, lineno, levelname, message
host_field = host
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes]
pod_template_file =
worker_container_repository =
worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_queued_check_interval = 60
worker_pods_pending_timeout_batch_size = 100

[smart_sensor]
use_smart_sensor = False
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor
```
After making the changes, restart the services:

```bash
docker-compose restart
```
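After the restart you can spot-check that the mounted airflow.cfg is really the one the workers see, for example by reading a couple of values back through the Airflow CLI (an optional sanity check; environment variables set in the compose file still take precedence over the file):

```bash
cd /apps/airflow

# these should match what you wrote into /data/airflow/airflow.cfg
docker-compose exec airflow-worker airflow config get-value core default_timezone
docker-compose exec airflow-worker airflow config get-value core executor
```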
4 Data Synchronization
Because Airflow now runs three worker nodes, any configuration change made on one node has to be propagated to the others, and the DAGs and plugins directories also need to be synchronized in real time — if the scheduler dispatches a task to a node that cannot find the corresponding DAG file, the task will fail with an error. So we use lsyncd for real-time data synchronization:

```bash
apt-get install lsyncd -y
```
Set up public-key SSH access between the nodes:

```bash
ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync  # generate a key pair named airflow-sync
for ip in 100 200; do ssh-copy-id -i ~/.ssh/airflow-sync.pub -p 12022 ${USERNAME}@192.168.0.$ip; done
```

After that we can reach the other nodes using the private key.
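Before wiring the key into lsyncd, it is worth confirming that key-based login actually works on the custom SSH port (user, port, and key name follow the commands above):

```bash
# each command should print the remote hostname without prompting for a password
ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 hostname
ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.200 hostname
```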
Edit the synchronization configuration file (/etc/lsyncd.conf here); for the full set of lsyncd configuration parameters, see the official documentation[2]:
```lua
settings {
    logfile = "/var/log/lsyncd.log",        -- log file
    statusFile = "/var/log/lsyncd.status",  -- sync status information
    pidfile = "/var/run/lsyncd.pid",
    statusInterval = 1,
    nodaemon = false,                       -- run as a daemon
    inotifyMode = "CloseWrite",
    maxProcesses = 1,
    maxDelays = 1,
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.100:/data/airflow",
    delete = true,
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-sync"
    },
}
sync {
    default.rsync,
    source = "/data/airflow",
    target = "192.168.0.200:/data/airflow",
    delete = true,
    rsync = {
        binary = "/usr/bin/rsync",
        compress = false,
        archive = true,
        owner = true,
        perms = true,
        whole_file = false,
        rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-sync"
    },
}
```
The meaning of each parameter is explained on the lsyncd website. Here the ssh command is supplied through rsync's rsh option, which handles setups that use a private key, a custom port, and similar hardening; alternatively, you can configure passwordless access and use default.rsync or default.rsyncssh directly.
Set up systemd management for lsyncd:
```bash
cat << EOF > /etc/systemd/system/lsyncd.service
[Unit]
Description=lsyncd
ConditionFileIsExecutable=/usr/bin/lsyncd
After=network-online.target
Wants=network-online.target

[Service]
StartLimitBurst=10
ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
Restart=on-failure
RestartSec=120
EnvironmentFile=-/etc/sysconfig/aliyun
KillMode=process

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now lsyncd.service  # start the service and enable it at boot
```
This takes care of synchronizing the data (dags, plugins, airflow.cfg) across nodes. Later, in a CI/CD workflow, you only need to upload DAG files to the Bigdata1 node and the other two nodes will pick them up automatically. If something goes wrong, run lsyncd with full logging and tail the log to debug:

```bash
lsyncd -log all /etc/lsyncd.conf
tail -f /var/log/lsyncd.log
```
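A simple end-to-end test of the whole chain, assuming you run it on Bigdata1 (where lsyncd watches /data/airflow) and the other nodes are 192.168.0.100/200 as in the lsyncd configuration above:

```bash
# create a dummy DAG file on the source node ...
touch /data/airflow/dags/sync_test.py

# ... and confirm it appears on a target node within a second or two
ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 ls -l /data/airflow/dags/sync_test.py
```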
5 Reverse Proxy[3]
If you need to put Airflow behind a reverse proxy, for example at https://lab.mycompany.com/myorg/airflow/, you can do so with the following configuration.

Set base_url and enable_proxy_fix in airflow.cfg:

```ini
base_url = http://my_host/myorg/airflow
enable_proxy_fix = True
```
nginx configuration:

```nginx
server {
    listen 80;
    server_name lab.mycompany.com;

    location /myorg/airflow/ {
        proxy_pass http://localhost:8080;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```
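To confirm the proxy works end to end, you can hit the webserver's health endpoint through the proxied URL (scheme and host here follow the nginx example above; adjust if you terminate TLS in front of nginx):

```bash
# returns JSON with metadatabase and scheduler status when the proxy and webserver are wired up correctly
curl -s http://lab.mycompany.com/myorg/airflow/health
```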
That essentially completes the installation of the distributed Airflow scheduling cluster.

If you have read this far, you are probably already using Airflow or interested in it; here is a good resource for learning more:
https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1
References
[1] Airflow 2.2.3 + MySQL 8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ
[2] lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/
[3] airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html