airflow2.0.2分布式安装文档

更多内容可关注兔八哥杂谈

需要安装的组件

组件 功能
Airflow Webserver 查询元数据以监控和执行DAGs的web界面。
Airflow Scheduler 它检查元数据数据库中的DAG和任务的状态,
在必要时创建新任务,并将任务发送到队列。
Airflow Metadata Database 它包含DAG运行和任务实例的状态.
Airflow Message Broker 它将在队列中存储要运行的任务命令。
Airflow Workers 它们从队列中检索命令,执行命令,并更新元数据。

服务器 结点 服务
DATACENTER01 master1 webserver, scheduler,worker,rabbitmq(选装)
DATACENTER03 master2 webserver,scheduler
DATACENTER04 worker1 worker
DATACENTER05 worker2 worker

安装步骤:

准备环境

一、安装erlang

因为要用到RabbitMQ,由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。

版本对应,若无要求按照文档来即可

  • 安装依赖

    yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel

  • 下载erlang

    wget http://erlang.org/download/otp_src_24.0.tar.gz

    wget https://fossies.org/linux/misc/otp_src_24.0.tar.gz(比较快)

  • 解压otp_src_24.0.tar.gz

    tar -zxvf otp_src_24.0.tar.gz

  • 移动位置

    mv otp_src_24.0 /usr/local/

  • 切换目录

    cd /usr/local/otp_src_24.0/

  • 创建即将安装的目录

    mkdir ../erlang

  • 配置安装路径

    ./configure --prefix=/usr/local/erlang

    如果遇到如下错误,不管

    airflow2.0.2分布式安装文档

  • 安装

    make && make install

  • 查看一下是否安装成功

    ll /usr/local/erlang/bin

  • 添加环境变量

    echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile

  • 刷新环境变量

    source /etc/profile

  • 测试

    erl

  • 退出

    输入halt().退出

    airflow2.0.2分布式安装文档

至此 erlang 安装完成


二、安装RabbitMQ
  • 下载

    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-generic-unix-3.8.16.tar.xz

  • 由于是tar.xz格式的所以需要用到xz,没有的话就先安装(可选)

    yum install -y xz

  • 第一次解压

    /bin/xz -d rabbitmq-server-generic-unix-3.8.16.tar.xz

  • 第二次解压

    tar -xvf rabbitmq-server-generic-unix-3.8.16.tar

  • 移走

    mv rabbitmq_server-3.8.16/ /usr/local/

  • 改名

    cd /usr/local/
    mv /usr/local/rabbitmq_server-3.7.15 rabbitmq

  • 配置环境变量

    echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile

  • 刷新环境变量

    source /etc/profile

  • 创建配置目录(未使用单独的配置文件,此步骤可以不用)

    mkdir /etc/rabbitmq

  • 启动

    rabbitmq-server -detached

  • 停止

    rabbitmqctl stop

  • 状态

    rabbitmqctl status

  • 开启web插件

    rabbitmq-plugins enable rabbitmq_management

  • 访问:15672端口

http://127.0.0.1:15672/

默认账号密码:guest guest(这个账号只允许本机访问)

  • 查看所有用户

    rabbitmqctl list_users

  • 添加一个用户

    rabbitmqctl add_user lillcol 123456

  • 配置权限

    rabbitmqctl set_permissions -p "/" lillcol ".*" ".*" ".*"

  • 查看用户权限

    rabbitmqctl list_user_permissions lillcol

  • 设置tag

    rabbitmqctl set_user_tags lillcol administrator

  • 删除用户(安全起见,删除默认用户)

    rabbitmqctl delete_user guest

  • 配置好用户之后重启一下rabbit,然后就可以用新账号进行登陆

    airflow2.0.2分布式安装文档


三、安装python3.7.5

经测试,3.7.5一下版本安装airflow过程中会出现各种问题,所以需要安装3.7.5,(3.7.5+没测试不知道会不会出问题)

  • 安装编译相关工具
yum -y groupinstall "Development tools"
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel
yum install libffi-devel -y
  • 下载安装包解压

    wget https://www.python.org/ftp/python/3.7.5/Python-3.7.5.tar.xz

    tar -xvJf Python-3.7.5.tar.xz

  • 编译安装python

    mkdir /usr/python3.7 #创建编译安装目录

    cd Python-3.7.5

    ./configure --prefix=/usr/python3.7 --enable-optimizations

    make && make install

  • 创建软连接

    ln -s /usr/python3.7/bin/python3 /usr/bin/python3.7

    ln -s /usr/python3.7/bin/pip3 /usr/bin/pip3.7

  • 验证是否成功

    python3.7 -V

    pip3.7 -V

[root@DATACENTER04 bin]# python3.7 -V
Python 3.7.5
[root@DATACENTER04 bin]# pip3.7 -V

pip 19.2.3 from /usr/python3.7/lib/python3.7/site-packages/pip (python 3.7)
  • 升级pip3.7

    安装airflow pip版本过低会导致安装失败

    pip3.7 install --upgrade pip==21.1.2
[root@DATACENTER04 bin]# pip3.7 install --upgrade pip==21.1.2
[root@DATACENTER04 bin]# pip3.7 -V
pip 21.1.2 from /usr/python3.7/lib/python3.7/site-packages/pip (python 3.7)
  • 安装gunicorn

    pip3.7 install --upgrade pip==21.1.2
三、配置mysql

此处使用mysql进行元数据管理,要求mysql 5.7+以上版本。

  • 创建airflow_db库
    CREATE DATABASE airflow_db CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;

注意要用UTF8mb3,UTF8mb4在测试过程中出现错误

  • 修改权限
CREATE USER 'airflow' IDENTIFIED BY 'airflow123';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';

安装airflow

一、安装airflow
  • 配置apps sudo权限(root)

    给apps用户sudo权限,vi /etc/sudoers,,加入下面语句,否则安装install的时候可能会失败
## Allow root to run any commands anywhere 
root	ALL=(ALL) 	ALL
apps    ALL=(ALL)                NOPASSWD: ALL #加入这一句
  • 配置airflow环境变量(root)

    安装完后airflow安装路径默认为:/home/apps/.local/bin,vi /etc/profile尾部加入如下内容:

    export PATH=$PATH:/usr/python3/bin:/home/apps/.local/bin

    source /etc/profile

此处的/home/apps/.local/bin 为~/.local/bin,

根据实际配置PATH=$PATH:~/.local/bin

  • 配置hosts(root),vi /etc/hosts,加入下面语句
199.232.68.133 raw.githubusercontent.com
  • 配置环境变量(apps)(可选,默认~/airflow)
export AIRFLOW_HOME=~/airflow
  • 配置版本信息(apps)
AIRFLOW_VERSION=2.0.2 # airflow版本
PYTHON_VERSION="$(python3.7 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" # python版本
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" # 约束url
  • 安装airlfow(apps)

    执行安装命令,注意要加sudo,否则会有部分缺失,但是没有报错
sudo pip3.7 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" --use-deprecated legacy-resolver  [-i https://pypi.douban.com/simple]
  • 如果上面的步骤顺利执行,此时会有airflow命令,并且会创建~/airflow,进入airflow目录如下
    airflow2.0.2分布式安装文档

注:上述步骤需要在所有安装结点进行操作


二、配置airflow

再看一遍安装规划

服务器 结点 服务
DATACENTER01 master1 webserver, scheduler,worker,rabbitmq(选装)
DATACENTER03 master2 webserver,scheduler
DATACENTER04 worker1 worker
DATACENTER05 worker2 worker

此时的架构如下

airflow2.0.2分布式安装文档


队列服务及元数据库(Metestore)的高可用

队列服务采用RabbitMQ,已经安装在DATACENTER01,可以通过部署高可用实现队列的高可用,(本案例没有对队列做高可用)

元数据库(Metestore) 高可用
取决于所使用的数据库,此处采用mysql。可以通过部署主从备份实现高可用

  • 配置scheduler高可用

    我们可以通过第三方组件 airflow-scheduler-failover-controller 实现 scheduler 的高可用,安装配置步骤如下:
1. 下载failover
gitclone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
//网络不稳定有时候下不下来,可以去找其他资源然后上传服务器安装

2. 安装failover
cd{AIRFLOW_FAILOVER_CONTROLLER_HOME}
sudo pip3.7 install -e . [-i https://pypi.douban.com/simple] 

3. 初始化 failover
sudo pip3.7 install -e . [-i https://pypi.douban.com/simple] 
//初始化 failover 会向${AIRFLOW_HOME}/airflow.cfg中追加内容

4. 更改${AIRFLOW_HOME}/airflow.cfg配置,4~7 步骤之后的所有步骤可以后面统一操作
scheduler_nodes_in_cluster= DATACENTER01,DATACENTER03

5. 配置DATACENTER01,DATACENTER03之间免密登陆

6. 测试免密登陆
scheduler_failover_controller test_connection  
scheduler_failover_controller get_current_host //获取当前的host,可以用于查看安装情况

7. 启动failover
nohup scheduler_failover_controller start >/dev/null 2>&1 &
  1. failover需要在运行scheduler的服务器上部署,此处需要在DATACENTER01,DATACENTER03部署
  2. 免密登陆配置参考Centos7下实现免密码登录
  • 配置{AIRFLOW_HOME}/airflow.cfg

    将一下内容配置进{AIRFLOW_HOME}/airflow.cfg
1.  Executor 为 CeleryExecutor
# executor = LocalExecutor
executor = CeleryExecutor

2. 指定元数据库(metestore)
#sql_alchemy_conn = sqlite:////home/apps/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://airflow:airflow123@10.0.0.1:3306/airflow_db

3. 设置broker,即消息队列,此处使用 RabbitMQ
# broker_url = redis://redis:6379/0
broker_url = amqp://lillcol:123456@DATACENTER01:5672/

4. 设定结果存储后端 backend
# result_backend = db+postgresql://postgres:airflow@postgres/airflow
# 当然您也可以使用 Redis :celery_result_backend =redis://{REDIS_HOST}:6379/1
# celery_result_backend = db+mysql://airflow:airflow123@10.0.0.1:3306/airflow_db 
# 注意此处要用result_backend,有些博客使用celery_result_backend,但是在测试过程中会无法识别
result_backend = db+mysql://airflow:airflow123@10.0.0.1:3306/airflow_db

5. 配置scheduler_nodes_in_cluster容错结点
scheduler_nodes_in_cluster= DATACENTER01,DATACENTER03

6.修改时区 
# default_timezone = utc
default_timezone = Asia/Shanghai

7. 配置web端口(默认8080,因为已被占用此处改为8081)
endpoint_url = http://localhost:8081
base_url = http://localhost:8081
web_server_port = 8081

8. 关闭加载案例(可选)
# load_examples = True
load_examples = False

9. 邮件相关配置(可选)
[smtp]
smtp_host = mail.ndpmedia.com
smtp_starttls = True
smtp_ssl = False
smtp_user = user
smtp_password = pass
smtp_port = 25
smtp_timeout = 30
smtp_mail_from =与user相同
smtp_retry_limit = 5

将修改后的{AIRFLOW_HOME}/airflow.cfg同步到所有安装airflow的服务器上


三、启动airflow集群
  • 初始化数据库(apps@DATACENTER0):airflow db init

次步骤会在mysql上创建相关元数据表

  • 创建用户(apps@DATACENTER01):
airflow users create 
    --username admin 
    --firstname Peter 
    --lastname Parker 
    --role Admin 
    --email spiderman@superhero.org
Password:123456
  • 启动webserver:
airflow webserver -D

次步骤在DATACENTER01,DATACENTER03执行

  • 启动scheduler
#1. 需要先启动scheduler容错插件scheduler_failover_controller,
#   此步骤在DATACENTER01,DATACENTER03执行
nohup scheduler_failover_controller start >/dev/null 2>&1 &

#2. 启动scheduler,次步骤只需要在DATACENTER01执行
nohup airflow scheduler >/dev/null 2>&1 &

同一时间只能启动一个scheduler,一旦运行 scheduler 守护进程的机器出现故障,立刻启动另一台机器上的 scheduler 。

  • 启动worker
#1. 确保必要软件已经安装
sudo pip3.7 install pymysql
sudo pip3.7 install celery
sudo pip3.7 install flower
sudo pip3.7 install psycopg2-binary

#2. 先启动flower,在需要启动worker服务器执行,此处在DATACENTER01,DATACENTER04执行
airflow celery flower -D

#3. 启动worker,在需要启动worker服务器执行,此处在DATACENTER01,DATACENTER04执行
airflow celery worker -D

确保worker的8793已经开放,WEB UI查看log的时候无法加载相关日志


四、启动airflow集群
  • 登陆web UI
# 因为在DATACENTER01、DATACENTER03启动了webserver,可以通过下面二选一打开WEB UI
http://DATACENTER01:8081
http://DATACENTER03:8081

账号:Admin 密码:123456

登陆后可以新增其他的用户

五、配置、执行dag

  • 配置dags

    airflow 的dags默认在{AIRFLOW_HOME}/dags

    任务通过scheduler调度,并通过worker执行

    所以在所有有启动scheduler和worker的服务器上都要有相同的dags与相关脚本

    即我们需要保证所有结点下的{AIRFLOW_HOME}/dags以及依赖的脚本是一致的

    如果不一致可能导致两个结果:
  1. WEB UI中的dags与{AIRFLOW_HOME}/dags中不一致

取决于当前scheduler上面的{AIRFLOW_HOME}/dags

  1. 任务执行失败

在对应的woker上找不到执行的dag或相关脚本

比如目前scheduler 运行在DATACENTER03,此时{AIRFLOW_HOME}/dags如下:

[apps@DATACENTER03 dags]$ ll
total 40
-rw-r--r-- 1 apps dolphinscheduler 12513 May 28 15:14 DAG_**_D2.py
-rw-r--r-- 1 apps dolphinscheduler 12512 May 25 17:51 DAG_**_D.py
drwxr-xr-x 2 apps dolphinscheduler   132 Jun  4 18:03 __pycache__
-rw-r--r-- 1 apps dolphinscheduler  1381 Jun  4 16:43 TEST_RUN2.py
-rw-r--r-- 1 apps dolphinscheduler  1380 Jun  1 09:02 TEST_RUN.py

WEB UI如下:

airflow2.0.2分布式安装文档

  • 启动任务

    airflow2.0.2分布式安装文档

  • 观测执行情况

    airflow2.0.2分布式安装文档

执行任务的(woker)结点为:DATACENTER01

airflow2.0.2分布式安装文档

执行任务的(woker)结点为:DATACENTER04

所以我们必须保证所有结点的dags 与 依赖脚本同步


错误处理

  • Specified key was too long
[apps@DATACENTER03 airflow]$ airflow db init
...
    raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1071, 'Specified key was too long; max key length is 3072 bytes')
[SQL: ALTER TABLE xcom ADD CONSTRAINT pk_xcom PRIMARY KEY (dag_id, task_id, `key`, execution_date)]

解决办法:

#创建airflow_db时候指定编码
#CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE DATABASE airflow_db CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;

CREATE USER 'airflow' IDENTIFIED BY 'airflow123';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';

  • explicit_defaults_for_timestamp 错误
MySQL [(none)]> show global variables like '%timestamp%';
+---------------------------------+--------+
| Variable_name                   | Value  |
+---------------------------------+--------+
| explicit_defaults_for_timestamp | OFF    |
| log_timestamps                  | SYSTEM |
+---------------------------------+--------+
2 rows in set (0.02 sec)

# 修改explicit_defaults_for_timestamp=1
MySQL [(none)]> set global explicit_defaults_for_timestamp =1;
Query OK, 0 rows affected (0.00 sec)

MySQL [(none)]> show global variables like '%timestamp%';
+---------------------------------+--------+
| Variable_name                   | Value  |
+---------------------------------+--------+
| explicit_defaults_for_timestamp | ON     |
| log_timestamps                  | SYSTEM |
+---------------------------------+--------+

  • -bash: airflow: command not found

    安装完后没有出现airflow命令以及相关结构,解决办法有两个
  1. 卸载apache-airflow,重新安装一次,命令如下:
sudo pip3.7 uninstall apache-airflow==2.0.2
pip3.7 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" --use-deprecated legacy-resolver
  1. 将~/.local/bin加入PATH ,(推荐,在airflow安装前配置)
PATH=$PATH:~/.local/bin

  • No module named 'airflow'
# 在启动webserver的时候可能会出现下面的错误,同样的处理方法
No module named 'airflow'
No module named 'airflow.www'
No module named 'airflow.www.gunicorn_config'
FileNotFoundError: [Errno 2] No such file or directory: 'gunicorn': 'gunicorn'

解决办法:

#创建/usr/python3.7/bin/gunicorn的软连接替换原来的gunicorn,
#可能在``/usr/python3/bin``或``/usr/bin``下,具体看情况操作
1. 删除原来的软连接 
sudo rm -rf /usr/python3/bin/gunicorn
2. 创建新的软连接 
sudo ln -s /usr/python3.7/bin/gunicorn /usr/python3/bin

airflow webserver启动时,会调用subprocess.Popen创建子进程,webserver使用gunicorn

执行gunicorn启动时,可能是在PATH中找不到该命令报错

也可能是gunicorn的版本过低导致报错

目前的版本至少是gunicorn (version 19.10.0)

  • ModuleNotFoundError: No module named 'MySQLdb'

    启动worker的时候ModuleNotFoundError: No module named 'MySQLdb'

    解决办法安装mysqlclient(python 3.7 要安装mysqlclient):
sudo pip3.7 install mysqlclient

  • 无法读取worker端的log

    airlfow日志默认存储在{AIRFLOW_PATH}/logs/{dag}/...下,

    此时在读取在web 端读取不到日志可能有两种情况
  1. 未开放worker的8793端口,解决办法开放端口
  2. 此目录的的权限问题,开放{AIRFLOW_PATH}的权限即可

  • 配置免密登陆,但是执行scheduler_failover_controller test_connection的时候还是需要输入密码

    免密配置问题,可能两个原因:
  1. 权限问题

    sshd为了安全,对属主的目录和文件权限有所要求。如果权限不对,则ssh的免密码登陆不生效。
    要求如下:
#用户目录权限为 755 或者 700,就是不能是77x。
#.ssh目录权限一般为755或者700。
#rsa_id.pub 及authorized_keys权限一般为644
#rsa_id权限必须为600
将目录改成对应的权限即可
  1. 防火墙的问题

    关闭防火墙测试
systemctl status firewalld

参考文档:

官方安装文档

airflow 的安装部署与填坑


如何部署一个健壮的 apache-airflow 调度系统

更多内容可关注兔八哥杂谈