数据血缘(Lineage)
贡献者:@morefreeze
注意: Lineage 支持是实验性的,可能随时会发生变化。
Airflow 可以帮助跟踪数据的来源,以及数据发生了什么变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。
Airflow 通过任务的 inlets 和 outlets 跟踪数据。 让我们通过一个例子看看它是如何工作的。
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
args = {
'owner': 'airflow' ,
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_lineage', default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
f_final = File("/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
inlets={"auto": True},
outlets={"datasets": [f_final,]})
f_in = File("/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(
task_id='run_me_first', bash_command='echo 1', dag=dag,
inlets={"datasets": [f_in,]},
outlets={"datasets": outlets}
)
run_this.set_downstream(run_this_last)
任务定义了参数inlets
和outlets
。 inlets
可以是一个数据集列表{"datesets":[dataset1,dataset2]}
,也可以是指定的上游任务outlets
像这样{"task_ids":["task_id1","task_id2"]}
,或者不想指定直接用{"auto":True}
也可以,甚至是前面几种的组合。 outlets
也是一个数据集列表{"datesets":[dataset1,dataset2]}
。 在运行任务时,数据集的字段会被模板渲染。
注意: 只要 Operator 支持,它会自动地加上 inlets 和 outlets。
在示例 DAG 任务中, run_me_first
是一个 BashOperator,它接收CAT1
, CAT2
, CAT3
作为 inlets(译注:根据代码,应为“输出 outlets”)。 其中的execution_date
会在任务运行时被渲染成执行时间。
注意: 在底层,Airflow 会在
pre_execute
方法中准备 lineage 元数据。 当任务运行结束时,会调用post_execute
将 lineage 元数据推送到 XCOM 中。 因此,如果您要创建自己的 Operator,并且需要覆写这些方法,确保分别用prepare_lineage
和apply_lineage
装饰这些方法。
Apache Atlas
Airflow 可以将 lineage 元数据发送到 Apache Atlas。 您需要在airflow.cfg
中配置atlas
:
[lineage]
backend = airflow.lineage.backend.atlas
[atlas]
username = my_username
password = my_password
host = host
port = 21000
请确保已经安装了atlasclient
。