Recap

In the first post of our series, we learned a bit about Apache Airflow and how it can help us build not only Data Engineering & ETL pipelines, but also other types of relevant workflows within advanced analytics, such as MLOps workloads.

We skimmed briefly through some of its building blocks, namely Sensors, Operators, Hooks and Executors. These components provide the basic foundation for working with Apache Airflow. Back then, we worked with the SequentialExecutor, the simplest possible Airflow setup. Since it supports running only one task at a time, it is used mainly for simple demonstrations. That is obviously not enough for production scenarios, where we might want to have many tasks and workflows executed in parallel.

As we already discussed, Apache Airflow ships with support for multiple types of Executors — each of them is more suited to a particular type of scenario.

  • LocalExecutor unleashes more possibilities by allowing multiple parallel tasks and/or DAGs. This is achieved by leveraging a full-fledged RDBMS for the job metadata database, which can be built on top of a PostgreSql or MySQL database. While you can definitely run some light production workloads using LocalExecutor, scaling up means resorting to vertical scaling by beefing up the hardware resources of your environment.

  • CeleryExecutor addresses this limitation: it unlocks the possibility of scaling the setup horizontally by allowing you to create a processing cluster with multiple machines. Each machine spins up one or more Celery Workers in order to split Airflow’s workload. Communication between the workers and the Scheduler is performed through a message broker interface (which can be implemented using Redis or RabbitMQ). Having said that, you must define the number of workers beforehand and set it up in Airflow’s configuration, which can make the setup a bit static.

  • DaskExecutor works in a similar way as CeleryExecutor, but instead of Celery, it leverages the Dask framework for achieving distributed computing capabilities.

  • More recently, KubernetesExecutor has become available as an option to scale an Apache Airflow setup. It allows you to deploy Airflow to a Kubernetes cluster, and the capability of spinning Kubernetes Pods up and down comes out of the box. You will want to use such a setup if you would like to scale your environment in a more flexible and dynamic manner. However, that comes at the expense of managing a Kubernetes cluster.

An Apache Airflow MVP

When starting up a data team or capability, evaluating cost versus benefit, as well as complexity versus added value, is a critical, time-consuming and daunting task. Agile organizations and startups usually work with prototypes to tackle such scenarios — sometimes working products are better at answering questions than people.

Inspired by this philosophy, we will create a basic, hypothetical setup for an Apache Airflow production environment. We will walk through how to deploy such an environment using the LocalExecutor, one of Apache Airflow’s task execution mechanisms.

For a production prototype, choosing LocalExecutor is justified for the following reasons:

  • Provides parallel processing capabilities

  • Only one computing node — less maintenance & operations overhead

  • No need for Message Brokers

LocalExecutor

You might be asking — how is that possible? Well, as the name indicates, when we use the LocalExecutor we are basically running all Airflow components from the same physical environment. When we look at the Apache Airflow architecture, this is what we are talking about:

Main Airflow Components for a LocalExecutor Setup. Source: Author

We have multiple OS processes running the Web Server, Scheduler and Workers. We can think of LocalExecutor in abstract terms as the layer that provides the interface between the Scheduler and the Workers. Its function is basically spinning up Workers to execute the tasks from Airflow DAGs, while monitoring their status and completion.

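To make the parallelism concrete, below is a minimal, hypothetical DAG sketch (the DAG id and task names are made up) with two independent tasks. Under LocalExecutor both tasks can run at the same time, which SequentialExecutor cannot do:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id="parallel_example",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
) as dag:
    # No dependency is declared between the two tasks,
    # so LocalExecutor is free to run them in parallel.
    task_a = BashOperator(task_id="task_a", bash_command="sleep 10")
    task_b = BashOperator(task_id="task_b", bash_command="sleep 10")
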
Getting The Wheels Turning

We have had a conceptual introduction to LocalExecutor. Without further ado, let’s set up our environment. Our work will revolve around the following:

  1. Postgresql Installation and Configuration

  2. Apache Airflow Installation

  3. Apache Airflow Configuration

  4. Testing

  5. Setting up Airflow to run as a Service

These steps were tested with Ubuntu 18.04 LTS, but they should work with any Debian-based Linux distro. Here we assume that you already have Python 3.6+ configured. If that’s not the case, please refer to this post.

Note: you could also use a managed instance of PostgreSql, such as Azure Database for PostgreSql or Amazon RDS for PostgreSql. This is in fact recommended for a production setup, since it removes the maintenance and backup burden.

1. PostgreSql Installation and Configuration

To install PostgreSql we can simply run the following in our prompt:

sudo apt-get install postgresql postgresql-contrib

In a few seconds, PostgreSql should be installed.

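You can quickly verify that the server is up and accepting local connections with:

pg_isready
psql --version
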
Next, we need to set it up. The first step is opening a psql session as the postgres user:

sudo -u postgres psql

We proceed to set up the required user, database and permissions:

postgres=# CREATE USER airflow PASSWORD 'airflow'; -- you might want to change this
CREATE ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
GRANT

Finally, we need to install libpq-dev, which provides the client libraries and headers needed to build a PostgreSql client such as psycopg2:

sudo apt install libpq-dev

Optional Step 1: you can make your setup more secure by restricting connections to your database to the local machine only. To do this, you need to change the IP addresses in the pg_hba.conf file (the version number in the path below depends on the PostgreSql version installed):

sudo vim /etc/postgresql/12/main/pg_hba.conf
PostgreSql Configurations (pg_hba.conf)
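
Since the screenshot is not reproduced here, this is roughly what the relevant host entries look like when connections are restricted to the local machine (a sketch; adjust the addresses and authentication method to your needs):

# TYPE  DATABASE        USER            ADDRESS                 METHOD
host    all             all             127.0.0.1/32            md5
host    all             all             ::1/128                 md5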

Optional Step 2: you might want to configure PostgreSql to start automatically whenever the machine boots. To do this:

sudo update-rc.d postgresql enable
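
On a systemd-based distribution such as Ubuntu 18.04, the equivalent command would be:

sudo systemctl enable postgresql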

2. Apache Airflow Installation

We will install Airflow and its dependencies using pip:

pip install 'apache-airflow[postgres]'
pip install psycopg2

By now you should have Airflow installed. By default, Airflow gets installed to ~/.local/bin. Remember to run the following command:

export PATH=$PATH:/home/your_user/.local/bin/

This is required so that the system knows where to locate Airflow’s binary.

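To make this change persistent across sessions, you can append it to your shell profile (assuming bash):

echo 'export PATH=$PATH:$HOME/.local/bin' >> ~/.bashrc
source ~/.bashrc
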
Note: for this example we are not using virtualenv or Pipenv, but feel free to use one. Just make sure that the environment dependencies are properly mapped when you set up Airflow to run as a service :)

3. Apache Airflow Configuration

Now we need to configure Airflow to use LocalExecutor and our PostgreSql database.

Go to Airflow’s home directory (AIRFLOW_HOME, which defaults to ~/airflow) and edit airflow.cfg.

vim airflow.cfg

Make sure that the executor parameter is set to LocalExecutor and that the SqlAlchemy connection string is set accordingly:

Airflow configuration for LocalExecutor
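
The screenshot is not reproduced here; assuming the user, password and database created earlier (airflow/airflow/airflow on localhost), the relevant lines in airflow.cfg would look roughly like this:

[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow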

Finally, we need to initialize our database:

airflow initdb

Make sure that no error messages were displayed as part of initdb’s output.

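As an extra sanity check, you can confirm that the metadata tables were created in the airflow database (assuming the database name used above):

sudo -u postgres psql -d airflow -c '\dt'
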
4. Testing

It is time to check whether Airflow is working properly. To do that, we spin up the Scheduler and the Webserver:

airflow scheduler
airflow webserver
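
Note that each of these commands blocks its terminal, so run them in two separate terminals, or pass the -D flag to daemonize them while testing:

airflow scheduler -D
airflow webserver -D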

Once you fire up your browser and point it to your machine’s IP (the Webserver listens on port 8080 by default), you should see a fresh Airflow installation.

5. Setting up Airflow to Run as a Service

Our last step is to configure systemd services for the Scheduler and the Webserver. This is required to ensure that Airflow gets automatically restarted in case of a failure, or after the machine is rebooted.

As an initial step, we need to configure Gunicorn. Since by default it is not installed globally, we need to create a symbolic link for it.

sudo ln -fs $(which gunicorn) /bin/gunicorn

Next, we create service files for Webserver and Scheduler:

sudo touch /etc/systemd/system/airflow-webserver.service
sudo touch /etc/systemd/system/airflow-scheduler.service

Our airflow-webserver.service should look like the following:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service
Wants=postgresql.service mysql.service
[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/.local/bin/airflow webserver
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target

Similarly, we add the following content to airflow-scheduler.service:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service
Wants=postgresql.service mysql.service
[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/.local/bin/airflow scheduler
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target

Note: depending on the directory where you installed Airflow and the user you run it as, the ExecStart, User and Group values might need to be changed.

Now we just need to reload the systemd daemon, then enable and start our services:

sudo systemctl daemon-reload
sudo systemctl enable airflow-scheduler.service
sudo systemctl start airflow-scheduler.service
sudo systemctl enable airflow-webserver.service
sudo systemctl start airflow-webserver.service

Our services should have been started successfully. To confirm that:

$ sudo systemctl status airflow-webserver.service
$ sudo systemctl status airflow-scheduler.service

You should see output stating that both services are active and enabled. For the Webserver, for instance, the status line should read something like Active: active (running).

That’s it. Now you have a basic production setup for Apache Airflow using the LocalExecutor, which allows you to run DAGs containing parallel tasks and/or run multiple DAGs at the same time. This is definitely a must-have for any kind of serious use case — something I also plan on showcasing in a future post.

Of course, there are many possible improvements here:

  • The most obvious one would be to automate these steps by creating a CI/CD pipeline with an Ansible playbook, for instance

  • Using stronger PostgreSql credentials for Airflow and storing them in a more secure manner. We could store them as secret variables within a CI/CD pipeline and inject them as environment variables, instead of storing them in airflow.cfg (see the short sketch after this list)

  • Restricting permissions for Airflow users in both OS and PostgreSql

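As a sketch of the credentials idea above: Airflow reads configuration overrides from environment variables of the form AIRFLOW__<SECTION>__<KEY>, which take precedence over airflow.cfg, so the connection string could be injected at deploy time instead (placeholder credentials shown):

export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://some_user:some_password@localhost:5432/airflow
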
But for now, we will leave these steps for a future article.

I’m glad that you made it this far and I hope you found it useful. Check out my other articles.
