Airflow Task Scheduling Framework
What is Airflow
Airflow is an open-source scheduling tool written in Python and originally developed at Airbnb. The project started in 2014, was open-sourced in spring 2015, and joined the Apache Software Foundation incubator in 2016.
Airflow describes an entire workflow as a DAG, i.e. a directed acyclic graph, which gives it very strong expressive power.
Before going further into Airflow, here are some terms you will see throughout:
DAG
A DAG, or directed acyclic graph, defines a complete job in Airflow. All Tasks in the same DAG share the same schedule.
Task
A Task is a concrete unit of work and must belong to exactly one DAG. Dependencies between Tasks are declared inside the DAG (a minimal sketch follows). Cross-DAG dependencies are possible but not recommended: they make the DAG graph harder to read and complicate dependency management.
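For example, in-DAG dependencies are declared with the bit-shift operators (or set_upstream/set_downstream). A minimal sketch with made-up task ids:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# Hypothetical DAG used only to illustrate dependency declaration.
with DAG(dag_id="example_dependencies",
         start_date=days_ago(1),
         schedule_interval="@daily") as dag:
    extract = DummyOperator(task_id="extract")
    transform = DummyOperator(task_id="transform")
    load = DummyOperator(task_id="load")

    # extract runs first, then transform, then load;
    # equivalent to extract.set_downstream(transform) etc.
    extract >> transform >> load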
DAG Run
When a DAG reaches its scheduled time, or is triggered externally, a DAG Run is created. It can be thought of as an instance instantiated from the DAG.
Task Instance
When a Task is scheduled to start, a Task Instance is created. It can be thought of as an instance instantiated from the Task.
Airflow's Service Components
A working Airflow deployment usually consists of the following services:
WebServer
Airflow provides a visual web UI. Once the WebServer is started, you can browse the defined DAGs, monitor and change their run state, and configure some Variables, all from the web UI.
Worker
Typically we use Celery Workers to execute the actual tasks. Workers can be deployed on multiple machines, each configured to consume its own queues. When a task appears in one of those queues, the Worker picks it up and starts executing it. Airflow also starts a Serve Logs service on every machine that runs a Worker, so task logs scattered across different machines can be browsed conveniently from the web UI.
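For example, a task can be pinned to a specific queue through the queue argument that every operator inherits from BaseOperator; only workers consuming that queue will pick it up. A minimal sketch (the queue name, command and DAG object are assumptions for illustration):

from airflow.operators.bash_operator import BashOperator

# Only workers started with this queue will run the task.
heavy_job = BashOperator(
    task_id="heavy_job",
    bash_command="python /opt/jobs/heavy_job.py",  # illustrative command
    queue="heavy",                                 # hypothetical queue name
    dag=dag,                                       # assumes an existing DAG object
)

The matching worker would then be started with the Airflow 1.x CLI as airflow worker -q heavy.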
Scheduler
The Scheduler drives all scheduling in Airflow. At regular intervals it scans every defined DAG and the tasks within it; whenever a task meets its run conditions, the Scheduler dispatches the corresponding task for a Worker to pick up.
Flower
Flower provides a web UI for monitoring the state of all Celery Workers. This service is optional.
Installing and Using Airflow
- Install
pip install apache-airflow
- Set AIRFLOW_HOME
> mkdir ~/airflow
> export AIRFLOW_HOME=~/airflow
- Initialize the database
airflow initdb
Airflow needs a database at runtime to record the result of every Task run, so the next step is to initialize that database. Airflow uses SQLite by default; the database can later be switched to PostgreSQL, MySQL, etc. (see the configuration snippet after these steps).
- Start the webserver
airflow webserver -p 8080
- Start the scheduler
airflow scheduler
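To switch the metadata database from SQLite to, say, PostgreSQL, point sql_alchemy_conn in airflow.cfg at the new database and run airflow initdb again. A minimal sketch of the relevant setting (the connection string is a made-up example):

[core]
# made-up example connection string; adjust user, password, host and db name
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow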
Customizing Airflow
Writing DAG 1
Synchronizing data between tables with the same schema:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
import sys

sys.path.insert(0, "/opt/bitnami/airflow/dags/git")
from acme.operators.dwh_operators import AuditOperator, PostgresToPostgresOperator

default_args = {
    'owner': 'XingChen',                   # name of the DAG owner, usually the person who implemented the DAG
    'depends_on_past': False,              # whether each run depends on the previous run; if True and the previous run failed, this run is not executed and waits to be triggered
    'start_date': datetime(2020, 2, 24),   # date from which the Scheduler may start scheduling the Tasks
    # 'start_date': days_ago(1),           # days_ago() works as well; keep only one start_date
    'email': ['airflow@example.com'],      # addresses to notify when a Task fails
    'email_on_failure': False,             # whether to send an email when a Task fails
    'email_on_retry': False,               # whether to send an email when a Task is retried
    'retries': 1,                          # maximum number of retries
    'retry_delay': timedelta(minutes=5),   # interval between retries
    # 'end_date': datetime(2020, 2, 29),   # date after which the Scheduler stops scheduling the Tasks
    # 'execution_timeout': timedelta(seconds=300),  # upper bound on Task execution time
    # 'on_failure_callback': some_function,         # function called when a Task fails
    # 'on_success_callback': some_other_function,   # function called when a Task succeeds
    # 'on_retry_callback': another_function,        # function called when a Task is retried
}

"""
synchronize basic data of traditional media to internet db
"""
tmpl_search_path = Variable.get("sql_path")

with DAG(
        dag_id="hubble-internet-stage",
        tags=["hubble-internet"],
        default_args=default_args,
        template_searchpath=tmpl_search_path,
        description="synchronize basic data of traditional media to internet db",
        schedule_interval=timedelta(hours=1)) as dag:

    # Law
    audit_law_category_record = AuditOperator(
        task_id='audit_internet_law_category_record',
        postgres_conn_id='hubble-internet-stage-db',
        audit_key="law_category",
        cycle_dtm="{{ ts }}",
        dag=dag,
        pool='hubble_internet_pool'
    )

    extract_law_category = PostgresToPostgresOperator(
        task_id='extract_internet_law_category',
        sql='internet_law_category.sql',
        pg_table='law_category',
        src_postgres_conn_id='hubble-db-stage',
        dest_posgres_conn_id='hubble-internet-stage-db',
        pg_preoperator="DELETE FROM law_category",
        dag=dag,
        pool='hubble_internet_pool')

    audit_law_category_record >> extract_law_category
- task1 >> [task2, task3] means that after task1 finishes, task2 and task3 run in parallel.
- sql_path is defined as an Airflow Variable whose value is the absolute path of the directory containing the SQL files; passing template_searchpath=tmpl_search_path lets the operators resolve those files by name. (A sketch of setting this Variable from code follows.)
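The sql_path Variable itself can be created in the web UI (Admin -> Variables) or from code; a minimal sketch (the directory below is only an example):

from airflow.models import Variable

# Point sql_path at the directory holding the templated .sql files
# (example path; use the actual location of your SQL templates).
Variable.set("sql_path", "/opt/bitnami/airflow/dags/git/acme/sql")
print(Variable.get("sql_path"))  # -> /opt/bitnami/airflow/dags/git/acme/sql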
Writing DAG 2
Suitable when the table schemas are mostly the same but some data needs to be modified, or a custom insert SQL is required:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
import sys

sys.path.insert(0, "/opt/bitnami/airflow/dags/git")
from acme.operators.dwh_operators import AuditOperator, PostgresToPostgresCustomOperator

"""
Synchronize Spanish environmental data
"""
default_args = {
    "owner": "Lvhaojie",
    "depends_on_past": False,
    "start_date": days_ago(0),
    "email": ["haojie.lv@coloseo.cn"],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=10),
}

tmpl_search_path = Variable.get("sql_path")

with DAG(
        dag_id="spain_uat_country",
        tags=["spain-uat"],
        default_args=default_args,
        template_searchpath=tmpl_search_path,
        description="synchronize spain production environmental data of table country",
        schedule_interval=timedelta(hours=1)) as dag:

    audit_country_record = AuditOperator(
        task_id='audit_spain_country_record',
        postgres_conn_id='spain-sevilla-uat',
        audit_key="country",
        cycle_dtm="{{ ts }}",
        dag=dag,
        pool='spain-uat'
    )

    extract_country = PostgresToPostgresCustomOperator(
        task_id='extract_spain_country',
        src_postgres_conn_id='spain-pablo-picasso-production',
        src_postgres_sql='query_uat_country.sql',
        dest_posgres_conn_id='spain-sevilla-uat',
        dest_posgres_sql='insert_uat_country.sql',
        sql_key="query_data",
        dag=dag,
        pool='spain-uat')

    audit_country_record >> extract_country
Custom Operators
An Operator is what defines a Task. Some Tasks run Bash commands, others run Python code. You could do everything with a PythonOperator, but in practice it is easier to define Tasks with Operators specialized for the job at hand.
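As a minimal illustration of the built-in operators (the task ids, command and callable are made up, and an existing dag object like the ones above is assumed):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def say_hello():
    # trivial example callable
    print("hello from a PythonOperator")


run_shell = BashOperator(
    task_id="run_shell",
    bash_command="echo 'hello from a BashOperator'",
    dag=dag,
)

run_python = PythonOperator(
    task_id="run_python",
    python_callable=say_hello,
    dag=dag,
)

The custom operators used by the two DAGs above are defined in dwh_operators.py as follows: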
#!/usr/bin/env python3
import logging
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime


class PostgresToPostgresOperator(BaseOperator):
    """
    Executes sql code in a Postgres database and inserts into another

    :param src_postgres_conn_id: source database connection id
    :type src_postgres_conn_id: string
    :param dest_postgres_conn_id: destination database connection id
    :type dest_postgres_conn_id: string
    :param sql: the sql statement to execute, or a sql template file
    :type sql: a sql string or a template file whose extension must be .sql
    :param parameters: dict of parameters for the sql statement
    :type parameters: dict
    """

    template_fields = (
        "sql",
        "parameters",
        "pg_table",
        "pg_preoperator",
        "pg_postoperator",
    )
    template_ext = (".sql",)
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        sql,
        pg_table,
        src_postgres_conn_id="postgres_default",
        dest_posgres_conn_id="postgres_default",
        pg_preoperator=None,
        pg_postoperator=None,
        parameters=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.pg_table = pg_table
        self.src_postgres_conn_id = src_postgres_conn_id
        self.dest_postgres_conn_id = dest_posgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.pg_postoperator = pg_postoperator
        self.parameters = parameters

    def execute(self, context):
        logging.info("Executing: " + str(self.sql))
        src_pg = PostgresHook(postgres_conn_id=self.src_postgres_conn_id)
        dest_pg = PostgresHook(postgres_conn_id=self.dest_postgres_conn_id)
        logging.info(
            "Transferring Postgres query results into other Postgres databases."
        )
        conn = src_pg.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql, self.parameters)
        if self.pg_preoperator:
            logging.info("Running Postgres preoperator")
            dest_pg.run(self.pg_preoperator)
        logging.info("Inserting rows into Postgres")
        dest_pg.insert_rows(table=self.pg_table, rows=cursor)
        if self.pg_postoperator:
            logging.info("Running Postgres postoperator")
            dest_pg.run(self.pg_postoperator)
        logging.info("Done.")
class PostgresToPostgresCustomOperator(BaseOperator):
    """
    Executes sql code in a Postgres database and inserts the results into
    another, row by row, through a custom insert statement.

    :param src_postgres_conn_id: source database connection id
    :type src_postgres_conn_id: string
    :param dest_postgres_conn_id: destination database connection id
    :type dest_postgres_conn_id: string
    :param src_postgres_sql: sql statement or .sql template file run against the source
    :type src_postgres_sql: string
    :param dest_posgres_sql: sql statement or .sql template file run against the destination
    :type dest_posgres_sql: string
    :param sql_key: parameter name under which each fetched source row is
        passed to the destination sql
    :type sql_key: string
    """

    template_fields = (
        "src_postgres_sql",
        "dest_posgres_sql",
        "pg_preoperator",
        "pg_postoperator",
    )
    template_ext = (".sql",)
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        src_postgres_sql,
        dest_posgres_sql,
        src_postgres_conn_id="postgres_default",
        dest_posgres_conn_id="postgres_default",
        pg_preoperator=None,
        pg_postoperator=None,
        sql_key=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.src_postgres_sql = src_postgres_sql
        self.dest_posgres_sql = dest_posgres_sql
        self.src_postgres_conn_id = src_postgres_conn_id
        self.dest_postgres_conn_id = dest_posgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.pg_postoperator = pg_postoperator
        self.sql_key = sql_key

    def execute(self, context):
        logging.info("Executing: " + str(self.src_postgres_sql))
        src_pg = PostgresHook(postgres_conn_id=self.src_postgres_conn_id)
        dest_pg = PostgresHook(postgres_conn_id=self.dest_postgres_conn_id)
        logging.info(
            "Transferring Postgres query results into other Postgres databases."
        )
        src_conn = src_pg.get_conn()
        src_cursor = src_conn.cursor()
        src_cursor.execute(self.src_postgres_sql)
        if self.pg_preoperator:
            logging.info("Running Postgres preoperator")
            dest_pg.run(self.pg_preoperator)
        logging.info("Inserting rows into Postgres")
        if self.sql_key:
            query_data = src_cursor.fetchall()
            for i in query_data:
                dest_pg.run(self.dest_posgres_sql,
                            parameters={self.sql_key: i})
        else:
            dest_pg.run(self.dest_posgres_sql)
        if self.pg_postoperator:
            logging.info("Running Postgres postoperator")
            dest_pg.run(self.pg_postoperator)
        logging.info("Done.")
class PostgresOperatorWithTemplatedParams(BaseOperator):
    """
    Executes sql code in a specific Postgres database

    :param postgres_conn_id: target database connection id
    :type postgres_conn_id: string
    :param sql: the sql statement to execute, or a sql template file
    :type sql: a sql string or a template file whose extension must be .sql
    """

    template_fields = ("sql", "parameters")
    template_ext = (".sql",)
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        sql,
        postgres_conn_id="postgres_default",
        autocommit=False,
        parameters=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info("Executing: " + str(self.sql))
        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
class AuditOperator(BaseOperator):
    """
    Manages audit ids in the database to make sure that
    operations are traceable.

    :param postgres_conn_id: target database connection id
    :type postgres_conn_id: string
    :param audit_key: The key to use in the audit table
    :type audit_key: string
    :param cycle_dtm: The dtm of the extraction cycle run (ds)
    :type cycle_dtm: datetime
    """

    template_fields = ("audit_key", "cycle_dtm")
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        postgres_conn_id="postgres_default",
        audit_key=None,
        cycle_dtm=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.audit_key = audit_key
        self.cycle_dtm = cycle_dtm

    def execute(self, context):
        logging.info("Getting postgres hook object")
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        logging.info("Acquiring lock and updating audit table.")
        conn = hook.get_conn()
        cursor = conn.cursor()
        cursor.execute("LOCK TABLE audit_runs IN ACCESS EXCLUSIVE MODE")
        cursor.close()
        logging.info("Acquiring new audit number")
        cursor = conn.cursor()
        cursor.execute(
            "SELECT COALESCE(MAX(audit_id), 0)+1 FROM audit_runs"
        )
        row = cursor.fetchone()
        cursor.close()
        audit_id = row[0]
        logging.info("Found audit id %d." % (audit_id))
        params = {
            "audit_id": audit_id,
            "audit_key": self.audit_key,
            "exec_dtm": datetime.now(),
            "cycle_dtm": self.cycle_dtm,
        }
        cursor = conn.cursor()
        logging.info("updating audit table with audit id: %d" % (audit_id))
        cursor.execute(
            "INSERT INTO audit_runs "
            "(audit_id, audit_key, execution_dtm, cycle_dtm) VALUES "
            "(%(audit_id)s, %(audit_key)s, %(exec_dtm)s, %(cycle_dtm)s)",
            params,
        )
        conn.commit()
        cursor.close()
        conn.close()
        ti = context["ti"]
        ti.xcom_push(key="audit_id", value=audit_id)
        return audit_id
- template_fields: every Airflow operator defines a template_fields attribute listing the fields that should be templated; when the operator executes, the Jinja template parameters in those fields are rendered automatically. For example, with template_fields = ('sql', 'partition', 'hive_table'), only the template parameters contained in those three fields are rendered when the operator runs (see the sketch below).
- PostgresToPostgresOperator: suited to the case where the two tables have exactly the same schema.
- PostgresToPostgresCustomOperator: suited to tasks that need some simple processing of the data.
With the custom Operators above, we can run tasks such as the data synchronization jobs shown earlier.
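For instance, in a made-up operator, only the field listed in template_fields has macros such as {{ ds }} rendered before execute() runs. A minimal sketch:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class EchoSqlOperator(BaseOperator):
    # Only "sql" is rendered; "comment" is passed through verbatim.
    template_fields = ("sql",)

    @apply_defaults
    def __init__(self, sql, comment=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.comment = comment

    def execute(self, context):
        # With sql="SELECT * FROM t WHERE dt = '{{ ds }}'", the placeholder has
        # already been replaced by the execution date by the time we get here.
        self.log.info("rendered sql: %s", self.sql)
        self.log.info("not rendered: %s", self.comment)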
Periodic Cleanup of Airflow Logs
Airflow logs are verbose and take up a lot of disk space, so they usually need to be cleaned up periodically. Below is a scheduled DAG that does exactly that.
airflow_logs_cleanup.py
"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the task logs to avoid those getting too big.
airflow trigger_dag --conf '[curly-braces]"maxLogAgeInDays":30[curly-braces]' airflow-log-cleanup
--conf options:
maxLogAgeInDays:<INT> - Optional
"""
from airflow.models import DAG, Variable
from airflow.configuration import conf
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
import os
import logging
# airflow-log-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = days_ago(1)
BASE_LOG_FOLDER = conf.get("core", "base_log_folder")
# How often to Run. @daily - Once a day at Midnight
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = ""
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
    "airflow_log_cleanup__max_log_age_in_days", 14
)
# Whether the job should delete the logs or not. Included if you want to
# temporarily avoid deleting the logs
ENABLE_DELETE = True
# The number of worker nodes you have in Airflow. Will attempt to run this
# process for however many workers there are so that each worker gets its
# logs cleared.
NUMBER_OF_WORKERS = 1
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER]
ENABLE_DELETE_CHILD_LOG = Variable.get(
    "airflow_log_cleanup__enable_delete_child_log", "False"
)
LOG_CLEANUP_PROCESS_LOCK_FILE = "/tmp/airflow_log_cleanup_worker.lock"
logging.info("ENABLE_DELETE_CHILD_LOG " + ENABLE_DELETE_CHILD_LOG)

if not BASE_LOG_FOLDER or BASE_LOG_FOLDER.strip() == "":
    raise ValueError(
        "BASE_LOG_FOLDER variable is empty in airflow.cfg. It can be found "
        "under the [core] section in the cfg file. Kindly provide an "
        "appropriate directory path."
    )

if ENABLE_DELETE_CHILD_LOG.lower() == "true":
    try:
        CHILD_PROCESS_LOG_DIRECTORY = conf.get(
            "scheduler", "child_process_log_directory"
        )
        if CHILD_PROCESS_LOG_DIRECTORY != ' ':
            DIRECTORIES_TO_DELETE.append(CHILD_PROCESS_LOG_DIRECTORY)
    except Exception as e:
        logging.exception(
            "Could not obtain CHILD_PROCESS_LOG_DIRECTORY from " +
            "Airflow Configurations: " + str(e)
        )

default_args = {
    'owner': DAG_OWNER_NAME,
    'depends_on_past': False,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE
)
if hasattr(dag, 'doc_md'):
    dag.doc_md = __doc__
if hasattr(dag, 'catchup'):
    dag.catchup = False

start = DummyOperator(
    task_id='start',
    dag=dag)
log_cleanup = """
echo "Getting Configurations..."
BASE_LOG_FOLDER="{{params.directory}}"
WORKER_SLEEP_TIME="{{params.sleep_time}}"
sleep ${WORKER_SLEEP_TIME}s
MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
MAX_LOG_AGE_IN_DAYS='""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'
fi
ENABLE_DELETE=""" + str("true" if ENABLE_DELETE else "false") + """
echo "Finished Getting Configurations"
echo ""
echo "Configurations:"
echo "BASE_LOG_FOLDER: '${BASE_LOG_FOLDER}'"
echo "MAX_LOG_AGE_IN_DAYS: '${MAX_LOG_AGE_IN_DAYS}'"
echo "ENABLE_DELETE: '${ENABLE_DELETE}'"
cleanup() {
echo "Executing Find Statement: $1"
FILES_MARKED_FOR_DELETE=`eval $1`
echo "Process will be Deleting the following File(s)/Directory(s):"
echo "${FILES_MARKED_FOR_DELETE}"
echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | \
grep -v '^$' | wc -l` File(s)/Directory(s)" \
# "grep -v '^$'" - removes empty lines.
# "wc -l" - Counts the number of lines
echo ""
if [ "${ENABLE_DELETE}" == "true" ];
then
if [ "${FILES_MARKED_FOR_DELETE}" != "" ];
then
echo "Executing Delete Statement: $2"
eval $2
DELETE_STMT_EXIT_CODE=$?
if [ "${DELETE_STMT_EXIT_CODE}" != "0" ]; then
echo "Delete process failed with exit code \
'${DELETE_STMT_EXIT_CODE}'"
echo "Removing lock file..."
rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error removing the lock file. \
Check file permissions.\
To re-run the DAG, ensure that the lock file has been \
deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
exit ${REMOVE_LOCK_FILE_EXIT_CODE}
fi
exit ${DELETE_STMT_EXIT_CODE}
fi
else
echo "WARN: No File(s)/Directory(s) to Delete"
fi
else
echo "WARN: You're opted to skip deleting the File(s)/Directory(s)!!!"
fi
}
if [ ! -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ ]; then
echo "Lock file not found on this node! \
Creating it to prevent collisions..."
touch """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
CREATE_LOCK_FILE_EXIT_CODE=$?
if [ "${CREATE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error creating the lock file. \
Check if the airflow user can create files under tmp directory. \
Exiting..."
exit ${CREATE_LOCK_FILE_EXIT_CODE}
fi
echo ""
echo "Running Cleanup Process..."
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime \
+${MAX_LOG_AGE_IN_DAYS}"
DELETE_STMT="${FIND_STATEMENT} -exec rm -f {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?
echo "Finished Running Cleanup Process"
echo "Deleting lock file..."
rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
REMOVE_LOCK_FILE_EXIT_CODE=$?
if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error removing the lock file. Check file permissions. To re-run the DAG, ensure that the lock file has been deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
exit ${REMOVE_LOCK_FILE_EXIT_CODE}
fi
else
echo "Another task is already deleting logs on this worker node. \
Skipping it!"
echo "If you believe you're receiving this message in error, kindly check \
if """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ exists and delete it."
exit 0
fi
"""
for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1):
    for dir_id, directory in enumerate(DIRECTORIES_TO_DELETE):
        log_cleanup_op = BashOperator(
            task_id='log_cleanup_worker_num_' + str(log_cleanup_id) + '_dir_' + str(dir_id),
            bash_command=log_cleanup,
            params={
                "directory": str(directory),
                "sleep_time": int(log_cleanup_id) * 3},
            dag=dag)

        log_cleanup_op.set_upstream(start)
- airflow.configuration.conf.get: reads settings from the Airflow configuration file airflow.cfg.
- airflow_log_cleanup__max_log_age_in_days: an Airflow Variable that sets how old logs must be before they are deleted.
- airflow_log_cleanup__enable_delete_child_log: an Airflow Variable that controls whether the scheduler's child process logs are cleaned as well.
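Both Variables can be created in the web UI (Admin -> Variables) or from code; a small sketch with example values:

from airflow.models import Variable

# Keep 30 days of task logs and also clean the scheduler's child process logs.
Variable.set("airflow_log_cleanup__max_log_age_in_days", 30)
Variable.set("airflow_log_cleanup__enable_delete_child_log", "True")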
Pitfalls
- When filtering by time in SQL, use Airflow's built-in macros to obtain the execution time of the task, e.g. {{ ds }} or {{ ts }} (see the Airflow macros documentation); a sketch follows after this list.
- Airflow only hands a run over to the scheduler for execution once the end of its schedule interval (start time + interval) has been reached.
- If a DAG is not switched on, i.e. its toggle is off, clicking "Trigger DAG" only queues the run; it will not execute until the DAG is switched on.
- Airflow logs take up a lot of space; if they need periodic cleanup, see the example above.
- For other pitfalls, see the linked post (传送门).
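As a sketch of the first point, reusing the PostgresOperatorWithTemplatedParams defined above (the table name and task id are made up, and an existing dag object is assumed):

# Delete only the rows that belong to this run's execution date;
# '{{ ds }}' renders to e.g. '2020-02-24' when the task instance runs.
purge_partition = PostgresOperatorWithTemplatedParams(
    task_id="purge_partition",
    postgres_conn_id="hubble-internet-stage-db",
    sql="DELETE FROM law_category_history WHERE created_at::date = '{{ ds }}'",
    dag=dag,
)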
- References: Airflow动手玩 (Hands-on Airflow) / etl-with-airflow / Airflow模版语法 (Airflow template syntax) / 日志清理实例 (log cleanup example)