优化成本和性能以提升 Amazon MWAA 大数据博客

更新时间: 2026-01-27 12:34:44 浏览:19

优化 Amazon MWAA 的成本和性能

关键要点

在这篇文章中,我们将介绍如何通过遵循最佳实践来优化 Amazon MWAAManaged Workflows for Apache Airflow的性能和降低成本。Amazon MWAA 是一项托管服务,允许用户以可扩展的方式编排数据管道和工作流。通过正确配置和调整,您可以在确保性能的同时,显著减少运营成本。

Amazon MWAA 概述

Amazon Managed Workflows for Apache AirflowAmazon MWAA是一项托管服务,支持 Apache Airflow 的数据管道和工作流调度。通过 Amazon MWAA,您可以设计 有向无环图DAG,而无需管理基础设施的运维负担。本文主要提供如何优化性能与节约成本的指导。

Amazon MWAA 环境包含四个 Airflow 组件,这些组件托管在 AWS 计算资源组上:调度器承担调度任务,工作节点负责执行任务,Web 服务器提供 UI,以及元数据数据库跟踪状态。针对间歇性或可变工作负载,优化成本与保持价格和性能至关重要。接下来,我们将详细介绍在 Amazon MWAA 环境中实现成本优化和高效性能的最佳实践,提供详细说明和示例。针对特定 Amazon MWAA 工作负载,并不一定需要应用所有最佳实践,您可以有选择性地实施与自己工作负载相关的原则。

适当配置 Amazon MWAA 环境

适当配置 Amazon MWAA 环境确保您拥有能够在不同工作负载中并发扩展的环境,从而提供最佳的性价比。选择的环境类型决定了工作节点支持的并发任务数量。在 Amazon MWAA 中,您可以选择五种不同的 环境类。在本节中,我们将探讨如何配置适合的 Amazon MWAA 环境。

监控资源利用率

配置 Amazon MWAA 环境的第一步是监控现有设置的资源使用情况。您可以使用 Amazon CloudWatch 监控底层组件,该服务收集原始数据并处理为可读的近实时指标。通过这些环境指标,您可以更好地了解关键性能指标,以帮助您适当调整环境的规模并排除工作流的问题。根据工作负载的并发任务需求,您可以调整环境的大小、最大和最小工作节点数量。CloudWatch 将提供所有 Amazon MWAA 使用的底层 AWS 服务的 CPU 和内存利用率。详见 Amazon MWAA 的容器、队列和数据库指标 获取更多可用指标的详细信息。

分析工作负载模式

接下来,深入分析您的工作流程模式。检查 DAG 调度、任务并发性和任务执行时间。在高峰时期监控 CPU/内存使用情况,查询 CloudWatch 指标和 Airflow 日志,识别长时间运行的任务、瓶颈和资源密集型操作。这有助于您制定关于适当 Amazon MWAA 环境类的明智决策。

选择正确的环境类

将需求与 Amazon MWAA 环境类规格从 mw1small 到 mw12xlarge相匹配,以高效处理您的工作负载。您可以通过 API、AWS 命令行界面AWS CLI或 AWS 管理控制台 垂直扩展现有环境。请注意,更改环境类需要定期停机。

微调配置参数

微调 Apache Airflow 中的配置参数对于优化工作流性能和降低成本至关重要。您可以设置自动扩展、并行性、日志记录和 DAG 代码优化等设置。

自动扩展

Amazon MWAA 支持工作节点的自动扩展,它会根据您的工作负载需求自动调整运行的工作节点和 Web 服务器的数量。您可以指定 Amazon MWAA 环境中运行的 Airflow 工作节点的最小和最大数量。工作节点自动扩展使用 RunningTasks 和 QueuedTasks 指标,根据公式正在运行的任务 队列中的任务/每个工作节点的任务数来计算所需的工作节点。如果所需工作节点数量超过当前运行的工作节点数量,Amazon MWAA 会通过 AWS Fargate 增加额外的工作节点,直到达到配置的最大值。

当额外的工作节点超过需求时,Amazon MWAA 将优雅地缩减。例如,假设一个大型 Amazon MWAA 环境,最小为 1 个工作节点,最大为 10 个,每个大型 Amazon MWAA 工作节点可以支持多达 20 个任务。假设每天早上 800,DAG 开始运行,使用 190 个并发任务。Amazon MWAA 将自动扩展到 10 个工作节点,因为所需的工作节点 = 190 个请求的任务一些在运行,一些在等待/ 20每个工作节点的任务= 95,向上取整为 10。到上午 1000,完成了一半的任务,剩下 85 个在运行。此时,Amazon MWAA 将缩减为 6 个工作节点95 个任务/20 个任务每个工作节点 = 525,向上取整为 6。正在运行任务仍然受保护,不会在缩减过程中被中断。随着队列和运行任务的减少,Amazon MWAA 会在不影响运行任务的情况下移除工作节点,直到达到最小配置的工作节点数量。

Web 服务器自动扩展 允许您根据 CPU 使用率和活跃连接数自动扩展 Web 服务器的数量。无论是 REST API 请求、AWS CLI 使用,还是更多并发的 Airflow UI 用户,Amazon MWAA 确保您的 Airflow 环境可以无缝应对增加的需求。您可以在配置 Amazon MWAA 环境时指定 Web 服务器的最大和最小数量。

日志和指标

选择合适的日志级别

启用后,Amazon MWAA 会将 Airflow 日志发送到 CloudWatch。您可以查看日志以确定 Airflow 任务延迟或工作流错误,无需其他第三方工具。您需要启用日志记录以查看 Airflow DAG 处理、任务、调度器、Web 服务器和工作节点日志。您可以选择 INFO、WARNING、ERROR 或 CRITICAL 级别的 Airflow 日志。当您选择日志级别时,Amazon MWAA 会发送该级别及以上严重性的日志。请注意,按照 CloudWatch 日志收费标准 的要求,减少日志级别可以降低总体成本。根据环境选择最合适的日志级别,例如在开发和 UAT 环境下使用 INFO,而在生产环境中使用 ERROR。

设置适当的日志保留策略

默认情况下,日志会无限期保存且不会过期。为了降低 CloudWatch 成本,您可以 调整每个日志组的保留策略。

选择所需的 CloudWatch 指标

您可以根据 Amazon MWAA 配置选项选择发送到 CloudWatch 的 Airflow 指标:metricsstatsdallowlist。参考完整的 可用指标列表。某些指标,如 scheduledelay 和 durationsuccess 是按 DAG 发布的,而其他指标,如 tifinish 是按任务每个 DAG 发布的。

因此,DAG 和任务的累积数量直接影响 CloudWatch 指标的摄取成本。为了控制 CloudWatch 成本,选择发布部分指标。例如,以下设置只会发布以 scheduler 和 executor 开头的指标:

inimetricsstatsdallowlist = schedulerexecutor

建议与 metricsmetricsusepatternmatch 一起使用 metricsstatsdallowlist。有效的做法是对整个指标名称使用正则表达式regex模式匹配,而不仅仅是匹配名称开头的前缀。

监控 CloudWatch 仪表板并设置报警

在 CloudWatch 中创建 自定义仪表板,并为特定指标添加报警,以监控 Amazon MWAA 环境的健康状态。配置报警可以让您主动监控环境的健康状况。

优化 AWS Secrets Manager 的调用

Airflow 提供了一种存储变量和连接信息等秘密的机制。默认情况下,这些秘密存储在 Airflow 元数据库中。Airflow 用户可以选择配置一个集中管理的秘密存储位置,例如 AWS Secrets Manager。当指定时,Airflow 首先会检查此替代秘密后端,以便在请求连接或变量时。如果替代后端包含所需的值,将返回该值;如果没有,Airflow 将检查元数据库并返回该值。使用 Secrets Manager 的成本的一个因素是发出的 API 调用次数。

在 Amazon MWAA 控制台上,您可以为 Airflow 使用的连接和变量配置后端 Secrets Manager 路径。默认情况下,Airflow 会在配置的后端中搜索所有连接和变量。为减少 Amazon MWAA 代表您发出的 Secrets Manager API 调用次数,请配置使用 查找模式。通过指定模式,可以缩小 Airflow 将检查的路径范围。这将有助于降低您在使用 Secrets Manager 时的成本。

要使用秘密缓存,请启用 AIRFLOWSECRETSUSECACHE 并设置 TTL,以帮助减少 Secrets Manager 的 API 调用。

例如,如果您只想在 Secrets Manager 中查找特定子集的连接、变量或配置信息,请设置相关的 lookuppattern 参数。该参数的值为字符串类型的正则表达式。如需查找以 m 开头的连接,您的配置文件应如下所示:

优化成本和性能以提升 Amazon MWAA 大数据博客

ini[secrets]backend = airflowprovidersamazonawssecretssecretsmanagerSecretsManagerBackendbackendkwargs =

{ connectionsprefix airflow/connections connectionslookuppattern m profilename default}

DAG 代码优化

调度器和工作节点是参与解析 DAG 的两个组件。调度器解析完 DAG 后,将其放入队列,工作节点从队列中拿取 DAG。此时,工作节点只知道 DAGid 和 Python 文件以及其他信息。工作节点必须解析 Python 文件才能运行任务。

DAG 解析运行两次,一次由调度器执行,一次由工作节点执行。由于工作节点也在解析 DAG,因此代码解析所需的时间决定了所需的工作节点数量,这会增加运行这些工作节点的成本。

例如,对于 200 个 DAG,每个 DAG 有 10 个任务,假设每个任务解析时间为 60 秒,则可以计算:

所有 DAG 的总任务 = 2000每个任务的时间 = 60秒 20秒解析 DAG总时间 = 2000 80 = 160000 秒每个工作节点的总时间 = 72000 秒所需工作节点数量 = 总时间/每个工作节点的总时间 = 160000/72000 3

现在,让我们将解析 DAG 所需的时间增加到 100 秒:

所有 DAG 的总任务 = 2000每个任务的时间 = 60 秒 100 秒总时间 = 2000 160 = 320000 秒每个工作节点的总时间 = 72000 秒所需工作节点数量 = 总时间/每个工作节点的总时间 = 320000/72000 5

如您所见,DAG 解析时间从 20 秒增加到 100 秒,所需工作节点数量从 3 增加到 5,从而增加了计算成本。

为了缩短代码解析的时间,请遵循后续部分中的最佳实践。

移除顶层导入

代码中的导入每次解析 DAG 时都会运行。如果您在创建 DAG 对象时不需要这些导入的库,请将其移动到任务级别,而不是在顶部定义。在任务内定义后,导入将仅在任务运行时被调用。

避免多次调用数据库如元数据库或外部系统数据库。仅在运行任务时使用模板Jinja,减少在解析任务时调用填充变量的次数。

例如,以下代码:

pythonimport pendulumfrom airflow import DAGfrom airflowdecorators import taskimport numpy as np # lt 不要这样做!

with DAG( dagid=examplepythonoperator schedule=None startdate=pendulumdatetime(2021 1 1 tz=UTC) catchup=False tags=[example]) as dag

@task()def printarray()    打印 Numpy 数组。    import numpy as np  # lt 应该这样做!    a = nparange(15)reshape(3 5)    print(a)    return aprintarray()

另一个例子如下:

python

不好的例子

from airflowmodels import Variable

foovar = Variableget(foo) # 不要这样做!

bashusevariablebad1 = BashOperator( taskid=bashusevariablebad1 bashcommand=echo variable foo={fooenv} env={fooenv foovar})

bashusevariablebad2 = BashOperator( taskid=bashusevariablebad2 bashcommand=fecho variable foo={Variableget(foo)} # 不要这样做!)

bashusevariablebad3 = BashOperator( taskid=bashusevariablebad3 bashcommand=echo variable foo={fooenv} env={fooenv Variableget(foo)} # 不要这样做!)

好的例子

bashusevariablegood = BashOperator( taskid=bashusevariablegood bashcommand=echo variable foo={fooenv} env={fooenv {{ varvalueget(foo) }}})

@taskdef mytask() var = Variableget(foo) # 这样没问题,因为函数 mytask 仅在运行任务时被调用,而不是扫描 DAG。print(var)

编写 DAG

复杂的 DAG 包含大量任务和它们之间的依赖关系,会影响调度性能。保持 Airflow 实例高效利用的一种方法是简化和优化您的 DAG。

例如,具有简单线性结构 A B C 的 DAG 在任务调度时的延迟将小于具有深度嵌套树结构并且依赖任务数量呈指数增长的 DAG。

动态 DAG

在以下示例中,一个 DAG 定义了来自数据库的硬编码表名。开发人员必须为数据库中的每个表定义 N 个 DAG。

python

不好的例子

dagparams = getData()noofdags = int(dagparams[noofdags][N])

为每个 noofdags 中的编号构建一个 DAG

for n in range(noofdags) dagid = dynperft1{}format(str(n)) defaultargs = {owner airflowstartdate datetime(2022 2 2 12 n)}

为了减少冗长和易出错的工作,使用 动态 DAG。采用以下方式在查询数据库目录后创建 DAG,动态生成与数据库表数量相同的 DAG。这将在减少代码量的同时达到相同的目的。

pythondef getData() client = boto3client(dynamodb) response = clientgetitem( TableName=mwaadagcreation Key={key {S mwaa}} ) return response[Item]

阶段性 DAG 调度

在您的环境中,同时或在短时间内运行所有 DAG 可能会导致处理任务所需的工作节点数量增加,从而增加计算成本。对于那些对时间不敏感的业务场景,可以考虑分散 DAG 运行的时间表,以最大化可用工作资源的利用率。

DAG 文件夹解析

较简单的 DAG 通常只在一个 Python 文件中实现;而更复杂的 DAG 可能分散在多个文件中并且需要随之一起打包。您可以在 DAGFOLDER 中将所有内容置于标准文件系统布局中,也可以将 DAG 及其所有 Python 文件打包为一个 zip 文件。Airflow 会检查 DAGFOLDER 中的所有目录和文件。使用 [airflowignore](https//airflowapacheorg/docs/apacheairflow/252/coreconcepts/dagshtml#air

飞兔加速器永久免费版