气流定时任务 + 时间设置 + cron 表达式
最编程
2024-04-29 22:05:21
...
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
'''任务参数配置'''
default_args = {
"owner": "airflow", # 用户名的任务的所有者
"depends_on_past": False, # 当设置为true时,任务实例将依次运行,同时依赖上一个任务的计划成功。允许start_date的任务实例运行。
"start_date": datetime(2018, 12, 1), # 开始时间
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1, # 重试策略
"retry_delay": timedelta(seconds=5),
"dag_concurrency": 1, # 调度器允许并发运行的任务实例的数量
"max_active_runs_per_dag": 1 # 每个DAG的最大活动DAG运行次数
# 'queue': 'bash_queue',#运行此作业时要定位到哪个队列
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG("Day", default_args=default_args, schedule_interval='0 8 * * 1-5 0')
t1 = BashOperator(
task_id="update_kl_DayMin", # 任务的唯一,有意义的id
bash_command='python /usr/local/airflow/dags/Day.py ', # 要执行的命令,命令集或对bash脚本(必须为'.sh')
dag=dag,
)
schedule_interval 是任务时间设定:与Linux cron 时间是不同的
airflow cron 表达式: * * * * * * (分 时 月 年 周 秒)