Cheat Sheet – Airflow 💨

I am sure you can find cheat sheets about Airflow from many sources, but I hope that having all the examples and exercises in one place will be useful to you. 🌍

Your feedback is very valuable to me, and I look forward to hearing it. 👋🏼

Term | Description
DAG | Directed Acyclic Graph; defines a workflow as a set of tasks plus the dependencies between them.
Task | A single unit of work in a DAG (e.g., a Python function or a Bash command).
Operator | Defines what a task does (e.g., PythonOperator, BashOperator).
Scheduler | Monitors DAGs and triggers task instances once their schedule and upstream dependencies are satisfied.
Executor | Determines how and where task instances run (e.g., SequentialExecutor, LocalExecutor, CeleryExecutor).
Task Instance | A specific run of a task within a particular DAG run.
XCom | Short for "cross-communication"; used to pass small pieces of data between tasks.
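To make the "acyclic" part of DAG concrete, here is a small illustration built on Python's standard-library graphlib (Python 3.9+), not on Airflow itself. The task names are hypothetical; the point is that an acyclic dependency graph always has at least one valid run order, while a graph with a cycle has none.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical pipeline: each task id maps to the set of task ids it depends on.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}

def run_order(graph):
    """Return one valid execution order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())

def is_dag(graph):
    """True if the dependency graph is acyclic, i.e. a valid DAG."""
    try:
        run_order(graph)
    except CycleError:
        return False
    return True

print(run_order(pipeline))  # ['extract', 'transform', 'load', 'notify']

# Making "extract" depend on "notify" closes a loop, so this is no longer a DAG.
cyclic = {**pipeline, "extract": {"notify"}}
print(is_dag(cyclic))  # False
```

Airflow performs its own cycle check when it parses a DAG file; graphlib is used here only because it ships with Python.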

Command | Description
airflow dags list | List all available DAGs.
airflow dags trigger <dag_id> | Trigger a DAG run manually.
airflow dags pause <dag_id> | Pause a DAG (the scheduler stops creating new runs for it).
airflow dags unpause <dag_id> | Unpause a DAG.
airflow tasks list <dag_id> | List all tasks in a DAG.
airflow tasks test <dag_id> <task_id> <execution_date> | Run a single task instance locally without checking its dependencies or recording its state in the database.
airflow webserver | Start the Airflow web server.
airflow scheduler | Start the Airflow scheduler.
airflow db init | Initialize the Airflow metadata database (deprecated since Airflow 2.7 in favor of airflow db migrate).

1. Import Airflow Libraries:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta  # timedelta is needed for retry_delay below
2. Define Default Arguments:
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
3. Define the DAG:
with DAG(
    dag_id='example_dag',
    default_args=default_args,
    schedule='@daily',  # before Airflow 2.4 this argument was named schedule_interval
    start_date=datetime(2023, 1, 1),
    catchup=False,  # don't backfill the runs between start_date and today
) as dag:
    task1 = PythonOperator(
        task_id='say_hello',
        python_callable=lambda: print("Hello, Airflow!"),
    )
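The default_args dictionary is applied to every task in the DAG, and an argument passed directly to an operator takes precedence over it. The toy function below models that precedence with a plain dict merge; effective_task_args is a hypothetical name, not an Airflow API, and Airflow's real mechanism is more involved. It also shows that retries counts retries, not attempts.

```python
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def effective_task_args(default_args, **task_kwargs):
    """Toy model: values passed directly to an operator override the DAG's default_args."""
    return {**default_args, **task_kwargs}

# This hypothetical task overrides retries but inherits owner and retry_delay.
args = effective_task_args(default_args, task_id='say_hello', retries=3)
max_attempts = args['retries'] + 1  # the first try plus each retry

print(args)
print(max_attempts)  # 4
```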

Operator | Description | Example
PythonOperator | Executes a Python function. | PythonOperator(task_id='task1', python_callable=my_function)
BashOperator | Executes a Bash command. | BashOperator(task_id='task2', bash_command='echo "Hello!"')
EmailOperator | Sends an email. | EmailOperator(task_id='send_email', to='email@example.com', subject='Airflow', html_content='Task done!')
DummyOperator | Placeholder for structuring a pipeline (renamed EmptyOperator in Airflow 2.3). | DummyOperator(task_id='placeholder')
BranchPythonOperator | Chooses which downstream branch to follow based on a condition. | BranchPythonOperator(task_id='branching', python_callable=choose_branch)
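BranchPythonOperator expects its python_callable to return the task_id (or a list of task_ids) of the downstream task(s) to follow; the other directly downstream tasks are skipped. A minimal sketch of a choose_branch callable, assuming the run date arrives as context['logical_date'] (the Airflow 2.2+ name) and using made-up downstream task ids:

```python
from datetime import datetime

def choose_branch(**context):
    """Hypothetical branch callable: return the task_id of the branch to follow.

    'weekday_report' and 'weekend_report' are illustrative task ids, not real ones.
    """
    if context["logical_date"].weekday() < 5:  # Monday=0 ... Friday=4
        return "weekday_report"
    return "weekend_report"

# Called directly here; inside Airflow the operator passes the context for you.
print(choose_branch(logical_date=datetime(2023, 1, 2)))  # weekday_report (a Monday)
print(choose_branch(logical_date=datetime(2023, 1, 1)))  # weekend_report (a Sunday)
```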

Syntax | Description
task1 >> task2 | task1 runs before task2.
task1 << task2 | task1 runs after task2.
[task1, task2] >> task3 | task3 runs only after both task1 and task2 have succeeded (with the default trigger rule).
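The >> and << syntax is ordinary Python operator overloading. The toy Task class below (hypothetical, not Airflow's operator classes) reproduces the three rows of the table: each operator records upstream edges and returns the right-hand side so calls can be chained, and the list form works because Python falls back to the task's reflected method (__rrshift__ / __rlshift__) when the left operand is a list.

```python
class Task:
    """Toy task that records upstream task ids, mimicking Airflow's >> / << syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # self >> other: self runs before every task in other
        for t in _as_list(other):
            t.upstream.add(self.task_id)
        return other  # returning the right side allows chaining: a >> b >> c

    def __lshift__(self, other):
        # self << other: self runs after every task in other
        for t in _as_list(other):
            self.upstream.add(t.task_id)
        return other

    def __rrshift__(self, other):
        # [a, b] >> self: list has no __rshift__, so Python calls this instead
        self << other
        return self

    def __rlshift__(self, other):
        # [a, b] << self: the listed tasks run after self
        self >> other
        return self


def _as_list(x):
    return x if isinstance(x, list) else [x]


t1, t2, t3, t4 = (Task(name) for name in ("t1", "t2", "t3", "t4"))
t1 >> t2        # t1 runs before t2
t4 << t3        # t4 runs after t3
[t1, t2] >> t3  # t3 waits for both t1 and t2

for t in (t1, t2, t3, t4):
    print(t.task_id, "upstream:", sorted(t.upstream))
```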

Preset | Schedule
@once | Run once.
@hourly | Run every hour, at the start of the hour.
@daily | Run once a day at midnight.
@weekly | Run once a week at midnight on Sunday.
@monthly | Run once a month at midnight on the first day of the month.
@yearly | Run once a year at midnight on January 1.
Cron expression | Custom schedule (e.g., 0 12 * * * runs daily at 12:00 PM).
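Apart from @once, each preset is shorthand for a standard cron expression. The sketch below lists those equivalents and computes the next fire time for a fixed daily expression such as 0 12 * * *. next_daily_run is a hypothetical helper that handles only the "M H * * *" form and ignores timezones and Airflow's data-interval rules, so treat it as an illustration of the cron fields, not of how the scheduler picks run dates.

```python
from datetime import datetime, timedelta

# Standard cron equivalents of the presets: "minute hour day-of-month month day-of-week".
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",  # day-of-week 0 = Sunday
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

def next_daily_run(cron, after):
    """Next fire time strictly after `after` for a fixed-time daily cron ("M H * * *").

    Deliberately minimal: no ranges, lists, steps, or timezones.
    """
    minute, hour, dom, month, dow = cron.split()
    if not (minute.isdigit() and hour.isdigit()) or (dom, month, dow) != ("*", "*", "*"):
        raise ValueError(f"unsupported expression for this sketch: {cron!r}")
    candidate = after.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    if candidate <= after:  # today's slot has already passed, so use tomorrow's
        candidate += timedelta(days=1)
    return candidate

print(next_daily_run("0 12 * * *", datetime(2023, 1, 1, 9, 30)))  # 2023-01-01 12:00:00
```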

1. Push Data to XCom:
def push_to_xcom(**kwargs):
    kwargs['ti'].xcom_push(key='key_name', value='my_value')
2. Pull Data from XCom:
def pull_from_xcom(**kwargs):
    # task_ids is the task_id of the task that pushed the value
    value = kwargs['ti'].xcom_pull(task_ids='task_id', key='key_name')
    print(f"Pulled value: {value}")
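To see how the push and pull functions fit together without a running Airflow instance, the sketch below passes them a hypothetical FakeTaskInstance that stores values in a dict keyed by (task_id, key). It is a toy stand-in, not Airflow's TaskInstance; 'push_task' and 'pull_task' are made-up task ids, and the default key 'return_value' mirrors the key under which Airflow stores a PythonOperator's return value.

```python
class FakeTaskInstance:
    """Toy stand-in for Airflow's TaskInstance that supports only xcom_push/xcom_pull."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self.store.get((task_ids, key))


def push_to_xcom(**kwargs):
    kwargs['ti'].xcom_push(key='key_name', value='my_value')


def pull_from_xcom(**kwargs):
    # task_ids names the task that pushed the value
    value = kwargs['ti'].xcom_pull(task_ids='push_task', key='key_name')
    print(f"Pulled value: {value}")
    return value


store = {}
push_to_xcom(ti=FakeTaskInstance('push_task', store))
result = pull_from_xcom(ti=FakeTaskInstance('pull_task', store))  # prints: Pulled value: my_value
```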

Issue | Solution
DAG is not listed in the UI. | Make sure the DAG file is in the dags folder and has no syntax or import errors.
Task fails repeatedly. | Check the task logs for detailed error messages in the web UI or via the CLI.
Web server does not start. | Verify that the Airflow database has been initialized (airflow db init).
Scheduler does not run tasks. | Make sure the scheduler is running (airflow scheduler).
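The first troubleshooting row can be checked before Airflow ever parses the folder. The sketch below is a hypothetical helper (check_dag_folder is not part of Airflow) that flags two common causes: a syntax error, and a file that Airflow's default DAG-discovery "safe mode" skips because it does not contain both the strings "airflow" and "dag". It only compiles each file, so import errors such as a missing package are not caught; Airflow reports those in the UI as broken DAGs.

```python
import pathlib
import tempfile

def check_dag_folder(folder):
    """Return {filename: problem} for top-level .py files likely to be missing from the UI."""
    problems = {}
    for path in sorted(pathlib.Path(folder).glob("*.py")):
        source = path.read_text(encoding="utf-8")
        lowered = source.lower()
        # Mirrors the default safe-mode heuristic: both words must appear in the file.
        if "airflow" not in lowered or "dag" not in lowered:
            problems[path.name] = "skipped by safe mode (no 'airflow'/'dag' string)"
            continue
        try:
            compile(source, str(path), "exec")  # syntax check only; nothing is executed
        except SyntaxError as exc:
            problems[path.name] = f"SyntaxError on line {exc.lineno}: {exc.msg}"
    return problems

# Demo with a throwaway folder holding three hypothetical files.
with tempfile.TemporaryDirectory() as tmp:
    files = {
        "good_dag.py": "from airflow import DAG\n",
        "broken_dag.py": "from airflow import DAG\ndef broken(:\n",
        "helper.py": "x = 1\n",
    }
    for name, text in files.items():
        pathlib.Path(tmp, name).write_text(text, encoding="utf-8")
    report = check_dag_folder(tmp)

for name, problem in report.items():
    print(f"{name}: {problem}")
```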

This cheat sheet covers Airflow’s most important concepts and commands. For advanced configuration or customization, refer to the official Airflow documentation. 😊

…………

Thank you for your time; sharing is caring! 🌍
