在Airflow中编写执行MySQL查询的任务可以通过使用PythonOperator
来执行查询的Python函数。以下是一个简单的示例:
from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.mysql_operator import MySqlOperator from datetime import datetime default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1 } dag = DAG('mysql_query_dag', default_args=default_args, schedule_interval='@daily') def execute_mysql_query(): # 连接MySQL数据库 conn = MySQLdb.connect(host="localhost", user="root", passwd="password", db="database") cursor = conn.cursor() # 执行查询 cursor.execute("SELECT * FROM table") # 获取结果 rows = cursor.fetchall() for row in rows: print(row) # 关闭连接 conn.close() mysql_task = PythonOperator( task_id='execute_mysql_query', python_callable=execute_mysql_query, dag=dag ) mysql_task
在这个例子中,首先创建了一个DAG,并定义了一个Python函数execute_mysql_query
,该函数连接到MySQL数据库,执行查询并打印结果。然后使用PythonOperator
来执行这个函数,并将其添加到DAG中。当DAG运行时,该任务将连接到MySQL数据库执行查询。