使用 Operators(执行器)
operator(执行器)代表一个理想情况下是幂等的任务。operator(执行器)决定了 DAG 运行时实际执行的内容。
有关更多信息,请参阅Operators Concepts文档和Operators API Reference 。
BashOperator
使用BashOperator在Bash shell 中执行命令。
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag)
模板
您可以使用Jinja 模板来参数化bash_command参数。
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
故障排除
找不到 Jinja 模板
在使用bash_command参数直接调用 Bash 脚本时,需要在脚本名称后添加空格。这是因为 Airflow 尝试将 Jinja 模板应用于一个失败的脚本。
t2 = BashOperator(
    task_id='bash_example',
    # 这将会出现`Jinja template not found`的错误
    # bash_command="/home/batcher/test.sh",
    # 在加了空格之后,这会正常工作
    bash_command="/home/batcher/test.sh ",
    dag=dag)
PythonOperator
使用PythonOperator执行 Python 回调。
 def print_context ( ds , ** kwargs ):
    pprint ( kwargs )
    print ( ds )
    return 'Whatever you return gets printed in the logs'
run_this = PythonOperator (
    task_id = 'print_the_context' ,
    provide_context = True ,
    python_callable = print_context ,
    dag = dag )
传递参数
使用op_args和op_kwargs参数将额外参数传递给 Python 的回调函数。
def my_sleeping_function(random_base):
    """这是一个将在 DAG 执行体中运行的函数"""
    time.sleep(random_base)
# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )
    run_this >> task
模板
当您将provide_context参数设置为True,Airflow 会传入一组额外的关键字参数:一个用于每个Jinja 模板变量和一个templates_dict参数。
templates_dict参数是模板化的,因此字典中的每个值都被评估为Jinja 模板。
Google 云平台 Operators(执行器)
GoogleCloudStorageToBigQueryOperator
使用GoogleCloudStorageToBigQueryOperator执行 BigQuery 加载作业。
GceInstanceStartOperator
允许启动一个已存在的 Google Compute Engine 实例。
在此示例中,参数值从 Airflow 变量中提取。此外,default_args字典用于将公共参数传递给单个 DAG 中的所有 operator(执行器)。
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
INSTANCE = models.Variable.get('INSTANCE', '')
SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
SET_MACHINE_TYPE_BODY = {
    'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
}
default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}
通过将所需的参数传递给构造函数来定义GceInstanceStartOperator。
gce_instance_start = GceInstanceStartOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    task_id='gcp_compute_start_task'
)
GceInstanceStopOperator
允许停止一个已存在的 Google Compute Engine 实例。
参数定义请参阅上面的GceInstanceStartOperator。
通过将所需的参数传递给构造函数来定义GceInstanceStopOperator。
gce_instance_stop = GceInstanceStopOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    task_id='gcp_compute_stop_task'
)
GceSetMachineTypeOperator
允许把一个已停止实例的机器类型改变至特定的类型。
参数定义请参阅上面的GceInstanceStartOperator。
通过将所需的参数传递给构造函数来定义GceSetMachineTypeOperator。
gce_set_machine_type = GceSetMachineTypeOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    body=SET_MACHINE_TYPE_BODY,
    task_id='gcp_compute_set_machine_type'
)
GcfFunctionDeleteOperator
使用default_args字典来传递参数给 operator(执行器)。
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}
使用GcfFunctionDeleteOperator来从 Google Cloud Functions 删除一个函数。
t1 = GcfFunctionDeleteOperator(
    task_id="gcf_delete_task",
    name=FUNCTION_NAME
)
故障排除
如果你想要使用服务账号来运行或部署一个 operator(执行器),但得到了一个 403 禁止的错误,这意味着你的服务账号没有正确的 Cloud IAM 权限。
- 指定该服务账号为 Cloud Functions Developer 角色。
- 授权 Cloud Functions 的运行账户为 Cloud IAM Service Account User 角色。
使用 gcloud 分配 Cloud IAM 权限的典型方法如下所示。只需将您的 Google Cloud Platform 项目 ID 替换为 PROJECT_ID,将 SERVICE_ACCOUNT_EMAIL 替换为您的服务帐户的电子邮件 ID 即可。
gcloud iam service-accounts add-iam-policy-binding \
  [email protected] \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"
细节请参阅Adding the IAM service agent user role to the runtime service
GcfFunctionDeployOperator
使用GcfFunctionDeployOperator来从 Google Cloud Functions 部署一个函数。
以下 Airflow 变量示例显示了您可以使用的 default_args 的各种变体和组合。变量定义如下:
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
RUNTIME = 'nodejs6'
VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
使用这些变量,您可以定义请求的主体:
body = {
    "name": FUNCTION_NAME,
    "entryPoint": ENTRYPOINT,
    "runtime": RUNTIME,
    "httpsTrigger": {}
}
创建 DAG 时,default_args 字典可用于传递正文和其他参数:
default_args = {
    'start_date': dates.days_ago(1),
    'project_id': PROJECT_ID,
    'location': LOCATION,
    'body': body,
    'validate_body': VALIDATE_BODY
}
请注意,在上面的示例中,body 和 default_args 都是不完整的。根据设置的变量,如何传递源代码相关字段可能有不同的变体。目前,您可以传递 sourceArchiveUrl,sourceRepository 或 sourceUploadUrl,CloudFunction API 规范中所述。此外,default_args 可能包含 zip_path 参数,以在部署源代码之前运行上载源代码的额外步骤。在最后一种情况下,您还需要在正文中提供一个空的 sourceUploadUrl 参数。
基于上面定义的变量,此处显示了设置源代码相关字段的示例逻辑:
if SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': SOURCE_REPOSITORY
    }
elif ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = ZIP_PATH
elif SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")
创建 operator(执行器)的代码如下:
deploy_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy_task",
    name=FUNCTION_NAME
)
Troubleshooting
如果你想要使用服务账号来运行或部署一个 operator(执行器),但得到了一个 403 禁止的错误,这意味着你的服务账号没有正确的 Cloud IAM 权限。
- 指定该服务账号为 Cloud Functions Developer 角色。
- 授权 Cloud Functions 的运行账户为 Cloud IAM Service Account User 角色。
使用 gcloud 分配 Cloud IAM 权限的典型方法如下所示。只需将您的 Google Cloud Platform 项目 ID 替换为 PROJECT_ID,将 SERVICE_ACCOUNT_EMAIL 的替换为您的服务帐户的电子邮件 ID 即可。
gcloud iam service-accounts add-iam-policy-binding \
  [email protected] \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"
细节请参阅Adding the IAM service agent user role to the runtime service
如果您的函数的源代码位于 Google Source Repository 中,请确保您的服务帐户具有 Source Repository Viewer 角色,以便在必要时可以下载源代码。
CloudSqlInstanceDatabaseCreateOperator
在 Cloud SQL 实例中创建新数据库。
有关参数定义,请参阅上面的GceInstanceStartOperator。
通过将所需的参数传递给构造函数来定义CloudSqlInstanceDatabaseCreateOperator。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
    project_id=PROJECT_ID,
    body=db_create_body,
    instance=INSTANCE_NAME,
    task_id='sql_db_create_task'
)
示例请求体:
db_create_body = {
    "instance": INSTANCE_NAME,
    "name": DB_NAME,
    "project": PROJECT_ID
}
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关数据库插入,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceDatabaseDeleteOperator
在 Cloud SQL 实例中删除数据库。
有关参数定义,请参阅CloudSqlInstanceDatabaseDeleteOperator。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_delete_task'
)
模版
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')
更多信息
有关数据库删除,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceDatabasePatchOperator
使用修补程序语义更新包含有关 Cloud SQL 实例内数据库的信息的资源。请参阅: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
有关参数定义,请参阅CloudSqlInstanceDatabasePatchOperator。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
    project_id=PROJECT_ID,
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_patch_task'
)
示例请求体:
db_patch_body = {
    "charset": "utf16",
    "collation": "utf16_general_ci"
}
模版
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')
更多信息
有关数据库修改,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceDeleteOperator
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
使用 operator(执行器)
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    task_id='sql_instance_delete_task'
)
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关删除,请参阅Google Cloud SQL API 文档。
CloudSqlInstanceCreateOperator
在 Google Cloud Platform 中创建新的 Cloud SQL 实例。
有关参数定义,请参阅CloudSqlInstanceCreateOperator。
如果存在具有相同名称的实例,则不会执行任何操作,并且 operator(执行器)将成功执行。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
定义实例的示例:
body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {
            "binaryLogEnabled": True,
            "enabled": True,
            "startTime": "05:00"
        },
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {
            "zone": "europe-west4-a"
        },
        "maintenanceWindow": {
            "hour": 5,
            "day": 7,
            "updateTrack": "canary"
        },
        "pricingPlan": "PER_USE",
        "replicationType": "ASYNCHRONOUS",
        "storageAutoResize": False,
        "storageAutoResizeLimit": 0,
        "userLabels": {
            "my-key": "my-value"
        }
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}
使用 operator(执行器)
sql_instance_create_task = CloudSqlInstanceCreateOperator(
    project_id=PROJECT_ID,
    body=body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_create_task'
)
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关插入,请参阅Google Cloud SQL API 文档。
CloudSqlInstancePatchOperator
更新 Google Cloud Platform 中的 Cloud SQL 实例的设置(部分更新)。
有关参数定义,请参阅CloudSqlInstancePatchOperator。
这是部分更新,因此仅设置/更新正文中指定的设置的值。现有实例的其余部分将保持不变。
参数
示例 DAG 中的一些参数取自环境变量:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
定义实例的示例:
patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {
            "hour": 3,
            "day": 6,
            "updateTrack": "canary"
        },
        "userLabels": {
            "my-key-patch": "my-value-patch"
        }
    }
}
使用 operator(执行器)
sql_instance_patch_task = CloudSqlInstancePatchOperator(
    project_id=PROJECT_ID,
    body=patch_body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_patch_task'
)
模版
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
更多信息
有关部分更新,请参阅Google Cloud SQL API 文档。

