Airflow Task Scheduling Framework


What is Airflow

Airflow is an open-source scheduling tool written in Python, originally developed at Airbnb. The project started in 2014, was open-sourced in the spring of 2015, and joined the Apache Software Foundation's incubator program in 2016.

Airflow defines an entire workflow as a DAG (directed acyclic graph), which gives it very strong expressive power.

Before going further into Airflow, let me introduce some terms you will see throughout it (a minimal DAG sketch follows this list):

  • DAG

    A DAG (directed acyclic graph) defines a complete workflow in Airflow. All Tasks within the same DAG share the same schedule.

  • Task

    A Task is a concrete unit of work and must belong to exactly one DAG. Dependencies between Tasks are configured within the DAG; cross-DAG dependencies are possible but not recommended, as they make the DAG graph less readable and complicate dependency management.

  • DAG Run

    A DAG Run is created when a DAG reaches its scheduled time or is triggered externally. Think of it as an instantiation of the DAG.

  • Task Instance

    A Task Instance is created when a Task is scheduled to run. Think of it as an instantiation of the Task.
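
To make these terms concrete, here is a minimal sketch of a DAG (the dag_id, task ids, and bash commands are made up for illustration; the API is the Airflow 1.x style used throughout this article):

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# A DAG groups Tasks that share the same schedule.
with DAG(
        dag_id="concept_demo",
        start_date=days_ago(1),
        schedule_interval=timedelta(hours=1)) as dag:

    # Each operator instance is a Task belonging to this DAG.
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    load = BashOperator(task_id="load", bash_command="echo load")

    # Dependencies are declared inside the DAG: extract runs before load.
    extract >> load

# Every time the schedule fires (or the DAG is triggered manually), Airflow
# creates one DAG Run, and one Task Instance per Task inside it.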

Airflow's service components

A properly running Airflow deployment generally consists of the following services (typical startup commands follow the list):

  • WebServer

    Airflow ships with a visual web UI. Once the WebServer is started, you can browse the defined DAGs, monitor and control their runs, and configure variables from the web interface.

  • Worker

    Celery Workers normally execute the actual tasks. Workers can be deployed on multiple machines, each configured to consume from its own queues. When a task lands in one of its queues, the Worker picks it up and runs it. Airflow also starts a Serve Logs service on every machine that runs a Worker, so task logs scattered across machines can be browsed conveniently from the web UI.

  • Scheduler

    The Scheduler drives all scheduling in Airflow. At regular intervals it checks every defined DAG and the tasks defined in it; whenever a task meets its run conditions, the Scheduler dispatches it for a Worker to pick up.

  • Flower

    Flower provides a web UI for monitoring the status of all Celery Workers. This service is optional.
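
For reference, each of these services is started from the Airflow CLI. A sketch of typical commands under a Celery setup, using the Airflow 1.x CLI assumed elsewhere in this article (the queue name etl_queue is made up):

airflow webserver -p 8080     # web UI
airflow scheduler             # scheduling loop
airflow worker -q etl_queue   # Celery worker consuming a specific queue
airflow flower                # optional Celery monitoring UI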

Installing and using Airflow

  • Install
pip install apache-airflow
  • Set AIRFLOW_HOME
> mkdir ~/airflow
> export AIRFLOW_HOME=~/airflow
  • Initialize the metadata database
airflow initdb

Airflow needs a database to record the result of every Task it runs, so the next step is to initialize that database. Airflow defaults to SQLite; the database can later be switched to PostgreSQL, MySQL, and so on.
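
As a sketch of what switching the metadata database looks like, the sql_alchemy_conn setting under [core] in airflow.cfg points Airflow at the new database (the credentials and database name below are made-up examples); after changing it, run airflow initdb again against the new database:

# airflow.cfg (excerpt)
[core]
# default: sqlite:////home/<user>/airflow/airflow.db
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_db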

  • Start the webserver
airflow webserver -p 8080
  • Start the scheduler
airflow scheduler

Customizing Airflow

Writing DAG 1

Synchronizing data between tables with identical schemas

from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable

import sys
sys.path.insert(0, "/opt/bitnami/airflow/dags/git")
from acme.operators.dwh_operators import AuditOperator, PostgresToPostgresOperator



default_args = {
    'owner': 'XingChen',  # Name of the DAG owner, usually the person who implemented the DAG
    'depends_on_past': False,  # Whether each run depends on the previous run of the same task; if True and the previous run failed, this run will not execute and waits to be triggered
    'start_date': datetime(2020, 2, 24),  # The date from which the Scheduler may start scheduling this DAG's tasks
    # 'start_date': days_ago(1),  # A relative start date works as well
    'email': ['airflow@example.com'],  # Email addresses to notify when a task fails
    'email_on_failure': False,  # Whether to send an email when a task fails
    'email_on_retry': False,  # Whether to send an email when a task retries
    'retries': 1,  # Maximum number of retries
    'retry_delay': timedelta(minutes=5),  # Interval between retries
    # 'end_date': datetime(2020, 2, 29),  # The date after which tasks are no longer scheduled
    # 'execution_timeout': timedelta(seconds=300),  # Upper limit on task execution time
    # 'on_failure_callback': some_function,  # Function called when a task fails
    # 'on_success_callback': some_other_function,  # Function called when a task succeeds
    # 'on_retry_callback': another_function,  # Function called when a task retries
}




"""
synchronize basic data of traditional media to internet db
"""

tmpl_search_path = Variable.get("sql_path")


with DAG(
    dag_id="hubble-internet-stage",
    tags=["hubble-internet"],
    default_args=default_args,
    template_searchpath=tmpl_search_path,
    description="synchronize basic data of traditional media to internet db",
    schedule_interval=timedelta(hours=1)) as dag:

    # Law
    audit_law_category_record = AuditOperator(
        task_id='audit_internet_law_category_record',
        postgres_conn_id='hubble-internet-stage-db',
        audit_key="law_category",
        cycle_dtm="{{ ts }}",
        dag=dag,
        pool='hubble_internet_pool'
    )

    extract_law_category = PostgresToPostgresOperator(
        task_id='extract_internet_law_category',
        sql='internet_law_category.sql',
        pg_table='law_category',
        src_postgres_conn_id='hubble-db-stage',
        dest_posgres_conn_id='hubble-internet-stage-db',
        pg_preoperator="DELETE FROM law_category",
        dag=dag,
        pool='hubble_internet_pool')

    audit_law_category_record >> extract_law_category

task1 >> [task2, task3] means that once task1 has finished, task2 and task3 run in parallel.

sql_path is defined as an Airflow Variable holding the absolute path of the SQL directory; passing template_searchpath=tmpl_search_path lets the DAG find the corresponding template files.
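
As a sketch, the sql_path Variable can be created from the Airflow 1.x CLI, and internet_law_category.sql would then live inside that directory. The directory path and the file contents below are invented for illustration (the real columns of law_category are not shown in this article):

airflow variables -s sql_path /opt/bitnami/airflow/dags/git/acme/sql

-- internet_law_category.sql (hypothetical contents)
-- {{ ts }} is rendered at run time because 'sql' is listed in template_fields
SELECT id, name, created_at, updated_at
FROM law_category
WHERE updated_at <= '{{ ts }}';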

Writing DAG 2

Suitable when the table schemas are largely the same but some of the data needs to be modified, or a custom insert SQL is required.

from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable

import sys
sys.path.insert(0, "/opt/bitnami/airflow/dags/git")
from acme.operators.dwh_operators import AuditOperator, PostgresToPostgresCustomOperator


"""
Synchronize Spanish environmental data
"""

default_args = {
    "owner": "Lvhaojie",
    "depends_on_past": False,
    "start_date": days_ago(0),
    "email": ["haojie.lv@coloseo.cn"],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=10),
}

tmpl_search_path = Variable.get("sql_path")

with DAG(
    dag_id="spain_uat_country",
    tags=["spain-uat"],
    default_args=default_args,
    template_searchpath=tmpl_search_path,
    description="synchronize spain production environmental data of table country",
    schedule_interval=timedelta(hours=1)) as dag:

    audit_country_record = AuditOperator(
        task_id='audit_spain_country_record',
        postgres_conn_id='spain-sevilla-uat',
        audit_key="country",
        cycle_dtm="{{ ts }}",
        dag=dag,
        pool='spain-uat'
    )

    extract_country = PostgresToPostgresCustomOperator(
        task_id='extract_spain_country',
        src_postgres_conn_id='spain-pablo-picasso-production',
        src_postgres_sql='query_uat_country.sql',
        dest_posgres_conn_id='spain-sevilla-uat',
        dest_posgres_sql='insert_uat_country.sql',
        sql_key="query_data",
        dag=dag,
        pool='spain-uat')

    audit_country_record >> extract_country

Custom Operators

Operators are used to define Tasks: some run Bash commands, others run Python. You could do everything through the PythonOperator, but in practice it is easier to define Tasks with Operators dedicated to different kinds of work.
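
As a quick sketch of those built-in operators (the dag_id, task ids, command, and callable below are made up for illustration):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def _say_hello():
    # A plain Python callable run by the PythonOperator.
    print("hello from PythonOperator")


with DAG(dag_id="operator_demo", start_date=days_ago(1),
         schedule_interval=None) as dag:
    run_shell = BashOperator(task_id="run_shell", bash_command="date")
    run_python = PythonOperator(task_id="run_python", python_callable=_say_hello)
    run_shell >> run_python

The custom operators used by the two DAGs above live in acme/operators/dwh_operators.py (per the imports in the DAG files); the module looks like this: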

#!/usr/bin/env python3
import logging


from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime


class PostgresToPostgresOperator(BaseOperator):
    """
    Executes sql code in a Postgres database and inserts into another

    :param src_postgres_conn_id: connection id of the source database
    :type src_postgres_conn_id: string

    :param dest_postgres_conn_id: connection id of the destination database
    :type dest_postgres_conn_id: string

    :param sql: the SQL to execute, either a statement or a template file
    :type sql: SQL statement or template file; template files must end in .sql

    :param parameters: dictionary of parameters for the SQL
    :type parameters: dict
    """

    template_fields = (
        "sql",
        "parameters",
        "pg_table",
        "pg_preoperator",
        "pg_postoperator",
    )
    template_ext = (".sql",)
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        sql,
        pg_table,
        src_postgres_conn_id="postgres_default",
        dest_posgres_conn_id="postgres_default",
        pg_preoperator=None,
        pg_postoperator=None,
        parameters=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.pg_table = pg_table
        self.src_postgres_conn_id = src_postgres_conn_id
        self.dest_postgres_conn_id = dest_posgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.pg_postoperator = pg_postoperator
        self.parameters = parameters

    def execute(self, context):
        logging.info("Executing: " + str(self.sql))
        src_pg = PostgresHook(postgres_conn_id=self.src_postgres_conn_id)
        dest_pg = PostgresHook(postgres_conn_id=self.dest_postgres_conn_id)

        logging.info(
            "Transferring Postgres query results into other Postgres databases."
        )
        conn = src_pg.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql, self.parameters)

        if self.pg_preoperator:
            logging.info("Running Postgres preoperator")
            dest_pg.run(self.pg_preoperator)

        logging.info("Inserting rows into Postgres")
        dest_pg.insert_rows(table=self.pg_table, rows=cursor)

        if self.pg_postoperator:
            logging.info("Running Postgres postoperator")
            dest_pg.run(self.pg_postoperator)

        logging.info("Done.")

        
class PostgresToPostgresCustomOperator(BaseOperator):
    """
    Executes sql code in a Postgres database and inserts into another

    :param src_postgres_conn_id: connection id of the source database
    :type src_postgres_conn_id: string

    :param dest_postgres_conn_id: connection id of the destination database
    :type dest_postgres_conn_id: string

    :param sql: the SQL to execute, either a statement or a template file
    :type sql: SQL statement or template file; template files must end in .sql

    :param parameters: dictionary of parameters for the SQL
    :type parameters: dict
    """

    template_fields = (
        "src_postgres_sql",
        "dest_posgres_sql",
        "pg_preoperator",
        "pg_postoperator",
    )
    template_ext = (".sql",)
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        src_postgres_sql,
        dest_posgres_sql,
        src_postgres_conn_id="postgres_default",
        dest_posgres_conn_id="postgres_default",
        pg_preoperator=None,
        pg_postoperator=None,
        sql_key=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.src_postgres_sql = src_postgres_sql
        self.dest_posgres_sql = dest_posgres_sql
        self.src_postgres_conn_id = src_postgres_conn_id
        self.dest_postgres_conn_id = dest_posgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.pg_postoperator = pg_postoperator
        self.sql_key = sql_key

    def execute(self, context):
        logging.info("Executing: " + str(self.src_postgres_sql))
        src_pg = PostgresHook(postgres_conn_id=self.src_postgres_conn_id)
        dest_pg = PostgresHook(postgres_conn_id=self.dest_postgres_conn_id)

        logging.info(
            "Transferring Postgres query results into other Postgres databases."
        )
        src_conn = src_pg.get_conn()
        src_cursor = src_conn.cursor()
        src_cursor.execute(self.src_postgres_sql)

        if self.pg_preoperator:
            logging.info("Running Postgres preoperator")
            dest_pg.run(self.pg_preoperator)

        logging.info("Inserting rows into Postgres")
        if self.sql_key:
            query_data = src_cursor.fetchall()
            for i in query_data:
                dest_pg.run(self.dest_posgres_sql,
                            parameters={self.sql_key: i})
        else:
            dest_pg.run(self.dest_posgres_sql)

        if self.pg_postoperator:
            logging.info("Running Postgres postoperator")
            dest_pg.run(self.pg_postoperator)

        logging.info("Done.")

class PostgresOperatorWithTemplatedParams(BaseOperator):
    """
    Executes sql code in a specific Postgres database

    :param postgres_conn_id: connection id of the target database
    :type  postgres_conn_id: string

    :param sql: SQL statement or template
    :type  sql: SQL statement or template file; template files must end in .sql
    """

    template_fields = ("sql", "parameters")
    template_ext = (".sql",)
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        sql,
        postgres_conn_id="postgres_default",
        autocommit=False,
        parameters=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info("Executing:" + str(self.sql))
        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        self.hook.run(self.sql, self.autocommit, parameters=self.parameters)


class AuditOperator(BaseOperator):
    """
    Manages audit ids in the database to make sure that
    operations are traceable.

    :param postgres_conn_id: connection id of the target database
    :type postgres_conn_id: string

    :param audit_key: The key to use in the audit table
    :type audit_key: string

    :param cycle_dtm: The dtm of the extraction cycle run (ds)
    :type cycle_dtm: datetime
    """

    template_fields = ("audit_key", "cycle_dtm")
    ui_color = "#ededed"

    @apply_defaults
    def __init__(
        self,
        postgres_conn_id="postgres_default",
        audit_key=None,
        cycle_dtm=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.audit_key = audit_key
        self.cycle_dtm = cycle_dtm

    def execute(self, context):
        logging.info("Getting postgres hook object")
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)

        logging.info("Acquiring lock and updating audit table.")
        conn = hook.get_conn()
        cursor = conn.cursor()
        cursor.execute("LOCK TABLE audit_runs IN ACCESS EXCLUSIVE MODE")
        cursor.close()

        logging.info("Acquiring new audit number")
        cursor = conn.cursor()
        cursor.execute(
            "SELECT COALESCE(MAX(audit_id), 0)+1 FROM audit_runs"
        )
        row = cursor.fetchone()
        cursor.close()
        audit_id = row[0]
        logging.info("Found audit id %d." % (audit_id))

        params = {
            "audit_id": audit_id,
            "audit_key": self.audit_key,
            "exec_dtm": datetime.now(),
            "cycle_dtm": self.cycle_dtm,
        }
        cursor = conn.cursor()
        logging.info("updating audit table with audit id: %d" % (audit_id))
        cursor.execute(
            "INSERT INTO audit_runs "
            "(audit_id, audit_key, execution_dtm, cycle_dtm) VALUES "
            "(%(audit_id)s, %(audit_key)s, %(exec_dtm)s, %(cycle_dtm)s)",
            params,
        )
        conn.commit()
        cursor.close()
        conn.close()

        ti = context["ti"]
        ti.xcom_push(key="audit_id", value=audit_id)

        return audit_id
  • template_fields: every Airflow operator defines a template_fields variable, which lists the fields to be rendered; when the operator executes, the Jinja template parameters inside those fields are rendered automatically. For example, with template_fields = ('sql', 'partition', 'hive_table'), only template parameters appearing in those three fields are rendered when the operator runs.
  • PostgresToPostgresOperator: suitable when the two tables have exactly the same schema;
  • PostgresToPostgresCustomOperator: suitable for tasks that need simple processing of the data.

With the custom Operators above, you can define tasks such as these data-synchronization jobs yourself.
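
One detail worth noting: besides returning the new audit_id, AuditOperator pushes it into XCom, so any templated field of a downstream task in the same DAG can reference it. A minimal sketch, assuming the destination law_category table carries an audit_id column (that column and the pg_preoperator contents are invented for illustration):

    # Variation of the extract task from DAG 1: pg_preoperator is in
    # template_fields, so Jinja (including xcom_pull) is rendered first.
    extract_law_category = PostgresToPostgresOperator(
        task_id='extract_internet_law_category',
        sql='internet_law_category.sql',
        pg_table='law_category',
        src_postgres_conn_id='hubble-db-stage',
        dest_posgres_conn_id='hubble-internet-stage-db',
        # hypothetical: only clear rows written by earlier audit runs
        pg_preoperator=(
            "DELETE FROM law_category WHERE audit_id < "
            "{{ ti.xcom_pull(task_ids='audit_internet_law_category_record', key='audit_id') }}"
        ),
        dag=dag,
        pool='hubble_internet_pool')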

Periodic Airflow log cleanup

Airflow logs are verbose and take up a lot of space, so they usually need to be cleaned up periodically. Below is a scheduled DAG that does exactly that.

airflow_logs_cleanup.py

"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the task logs to avoid those getting too big.
airflow trigger_dag --conf '{"maxLogAgeInDays":30}' airflow-log-cleanup
--conf options:
    maxLogAgeInDays:<INT> - Optional
"""
from airflow.models import DAG, Variable
from airflow.configuration import conf
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import timedelta
import os
import logging


# airflow-log-cleanup


DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = days_ago(1)
BASE_LOG_FOLDER = conf.get("core", "base_log_folder")
# How often to Run. @daily - Once a day at Midnight
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = ""
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
    "airflow_log_cleanup__max_log_age_in_days", 14
)
# Whether the job should delete the logs or not. Included if you want to
# temporarily avoid deleting the logs
ENABLE_DELETE = True
# The number of worker nodes you have in Airflow. Will attempt to run this
# process for however many workers there are so that each worker gets its
# logs cleared.
NUMBER_OF_WORKERS = 1
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER]
ENABLE_DELETE_CHILD_LOG = Variable.get(
    "airflow_log_cleanup__enable_delete_child_log", "False"
)
LOG_CLEANUP_PROCESS_LOCK_FILE = "/tmp/airflow_log_cleanup_worker.lock"
logging.info("ENABLE_DELETE_CHILD_LOG  " + ENABLE_DELETE_CHILD_LOG)

if not BASE_LOG_FOLDER or BASE_LOG_FOLDER.strip() == "":
    raise ValueError(
        "BASE_LOG_FOLDER variable is empty in airflow.cfg. It can be found "
        "under the [core] section in the cfg file. Kindly provide an "
        "appropriate directory path."
    )

if ENABLE_DELETE_CHILD_LOG.lower() == "true":
    try:
        CHILD_PROCESS_LOG_DIRECTORY = conf.get(
            "scheduler", "child_process_log_directory"
        )
        if CHILD_PROCESS_LOG_DIRECTORY.strip():
            DIRECTORIES_TO_DELETE.append(CHILD_PROCESS_LOG_DIRECTORY)
    except Exception as e:
        logging.exception(
            "Could not obtain CHILD_PROCESS_LOG_DIRECTORY from " +
            "Airflow Configurations: " + str(e)
        )

default_args = {
    'owner': DAG_OWNER_NAME,
    'depends_on_past': False,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE
)
if hasattr(dag, 'doc_md'):
    dag.doc_md = __doc__
if hasattr(dag, 'catchup'):
    dag.catchup = False

start = DummyOperator(
    task_id='start',
    dag=dag)

log_cleanup = """
echo "Getting Configurations..."
BASE_LOG_FOLDER="{{params.directory}}"
WORKER_SLEEP_TIME="{{params.sleep_time}}"
sleep ${WORKER_SLEEP_TIME}s
MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
    echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
    MAX_LOG_AGE_IN_DAYS='""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'
fi
ENABLE_DELETE=""" + str("true" if ENABLE_DELETE else "false") + """
echo "Finished Getting Configurations"
echo ""
echo "Configurations:"
echo "BASE_LOG_FOLDER:      '${BASE_LOG_FOLDER}'"
echo "MAX_LOG_AGE_IN_DAYS:  '${MAX_LOG_AGE_IN_DAYS}'"
echo "ENABLE_DELETE:        '${ENABLE_DELETE}'"
cleanup() {
    echo "Executing Find Statement: $1"
    FILES_MARKED_FOR_DELETE=`eval $1`
    echo "Process will be Deleting the following File(s)/Directory(s):"
    echo "${FILES_MARKED_FOR_DELETE}"
    echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | \
    grep -v '^$' | wc -l` File(s)/Directory(s)"     \
    # "grep -v '^$'" - removes empty lines.
    # "wc -l" - Counts the number of lines
    echo ""
    if [ "${ENABLE_DELETE}" == "true" ];
    then
        if [ "${FILES_MARKED_FOR_DELETE}" != "" ];
        then
            echo "Executing Delete Statement: $2"
            eval $2
            DELETE_STMT_EXIT_CODE=$?
            if [ "${DELETE_STMT_EXIT_CODE}" != "0" ]; then
                echo "Delete process failed with exit code \
                    '${DELETE_STMT_EXIT_CODE}'"
                echo "Removing lock file..."
                rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
                if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
                    echo "Error removing the lock file. \
                    Check file permissions.\
                    To re-run the DAG, ensure that the lock file has been \
                    deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
                    exit ${REMOVE_LOCK_FILE_EXIT_CODE}
                fi
                exit ${DELETE_STMT_EXIT_CODE}
            fi
        else
            echo "WARN: No File(s)/Directory(s) to Delete"
        fi
    else
        echo "WARN: You're opted to skip deleting the File(s)/Directory(s)!!!"
    fi
}
if [ ! -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ ]; then
    echo "Lock file not found on this node! \
    Creating it to prevent collisions..."
    touch """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
    CREATE_LOCK_FILE_EXIT_CODE=$?
    if [ "${CREATE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
        echo "Error creating the lock file. \
        Check if the airflow user can create files under tmp directory. \
        Exiting..."
        exit ${CREATE_LOCK_FILE_EXIT_CODE}
    fi
    echo ""
    echo "Running Cleanup Process..."
    FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime \
     +${MAX_LOG_AGE_IN_DAYS}"
    DELETE_STMT="${FIND_STATEMENT} -exec rm -f {} \;"
    cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
    CLEANUP_EXIT_CODE=$?
    FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty"
    DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
    cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
    CLEANUP_EXIT_CODE=$?
    FIND_STATEMENT="find ${BASE_LOG_FOLDER}/* -type d -empty"
    DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
    cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
    CLEANUP_EXIT_CODE=$?
    echo "Finished Running Cleanup Process"
    echo "Deleting lock file..."
    rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
    REMOVE_LOCK_FILE_EXIT_CODE=$?
    if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
        echo "Error removing the lock file. Check file permissions. To re-run the DAG, ensure that the lock file has been deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
        exit ${REMOVE_LOCK_FILE_EXIT_CODE}
    fi
else
    echo "Another task is already deleting logs on this worker node. \
    Skipping it!"
    echo "If you believe you're receiving this message in error, kindly check \
    if """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ exists and delete it."
    exit 0
fi
"""

for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1):

    for dir_id, directory in enumerate(DIRECTORIES_TO_DELETE):

        log_cleanup_op = BashOperator(
            task_id='log_cleanup_worker_num_' + str(log_cleanup_id) + '_dir_' + str(dir_id),
            bash_command=log_cleanup,
            params={
                "directory": str(directory),
                "sleep_time": int(log_cleanup_id)*3},
            dag=dag)

        log_cleanup_op.set_upstream(start)
  • airflow.configuration.conf.get: reads settings from the Airflow configuration file airflow.cfg.

  • airflow_log_cleanup__max_log_age_in_days: an Airflow Variable that controls how many days of logs to keep (see the CLI sketch after this list).

  • airflow_log_cleanup__enable_delete_child_log: an Airflow Variable that controls whether the scheduler's child process logs are cleaned up as well.
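
A sketch of setting these two Variables from the Airflow 1.x CLI (the values shown are just examples):

airflow variables -s airflow_log_cleanup__max_log_age_in_days 30
airflow variables -s airflow_log_cleanup__enable_delete_child_log True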

Common pitfalls

  • When filtering by time in SQL, use Airflow's built-in macros such as {{ ds }} or {{ ts }} to get the execution time of the run (see the Airflow macros reference and the sketch after this list).
  • Airflow only dispatches a run for execution once the end of its interval (start time + schedule interval) has been reached.
  • If a DAG is not enabled (its toggle is off), clicking "Trigger DAG" only queues the run; it will not execute until the DAG is switched on.
  • Airflow logs take up a lot of disk space; see the example above for periodic cleanup.
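
A sketch of the first point, assuming a templated .sql file and a table with an updated_at column (both the table and the column names are made up):

-- {{ ds }} renders as the execution date (YYYY-MM-DD) of the DAG Run,
-- {{ ts }} as the full execution timestamp
SELECT *
FROM some_table
WHERE updated_at < '{{ ts }}';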

For other pitfalls, see this link.

