I am sure you can find cheat sheets about Airflow from many sources, but I hope that having all the examples and exercises in one place will be useful for you.
Your feedback is very valuable to me, and I look forward to hearing it.
Core Concepts
| Term | Description |
|---|---|
| DAG | Directed Acyclic Graph, defines the workflow. |
| Task | A single unit of work in a DAG (e.g., Python function, Bash command). |
| Operator | Defines what each task does (e.g., PythonOperator, BashOperator). |
| Scheduler | Handles the scheduling and execution of DAGs. |
| Executor | Manages how tasks are executed (e.g., SequentialExecutor, CeleryExecutor). |
| Task Instance | Represents a specific run of a task in a DAG. |
| XCom | Used to share data between tasks. |
Airflow Command-Line Interface (CLI)
| Command | Description |
|---|---|
| airflow dags list | List all available DAGs. |
| airflow dags trigger <dag_id> | Trigger a DAG manually. |
| airflow dags pause <dag_id> | Pause a DAG. |
| airflow dags unpause <dag_id> | Unpause a DAG. |
| airflow tasks list <dag_id> | List all tasks in a DAG. |
| airflow tasks test <dag_id> <task_id> <execution_date> | Test a task without running the DAG. |
| airflow webserver | Start the Airflow web server. |
| airflow scheduler | Start the Airflow scheduler. |
| airflow db init | Initialize the Airflow database. |
Creating a DAG
- Import Airflow Libraries:
      from datetime import datetime, timedelta
      from airflow import DAG
      from airflow.operators.python import PythonOperator
- Define Default Arguments:
      default_args = {
          'owner': 'airflow',
          'retries': 1,
          'retry_delay': timedelta(minutes=5),
      }
- Define the DAG:
      with DAG(
          dag_id='example_dag',
          default_args=default_args,
          schedule_interval='@daily',
          start_date=datetime(2023, 1, 1),
          catchup=False,
      ) as dag:
          task1 = PythonOperator(
              task_id='say_hello',
              python_callable=lambda: print("Hello, Airflow!"),
          )
          task1
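
Values in default_args are passed to every operator in the DAG, and an argument given directly to an operator takes precedence over the default. The plain-Python sketch below models that merge with dictionaries so it can run without Airflow; `resolve_task_args` is a hypothetical helper for illustration and is not part of Airflow.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def resolve_task_args(defaults: dict, task_args: dict) -> dict:
    """Hypothetical helper: task-level arguments win over DAG-level defaults."""
    return {**defaults, **task_args}


# A task that sets retries=3 still inherits the default owner and retry_delay.
resolved = resolve_task_args(default_args, {"task_id": "say_hello", "retries": 3})
print(resolved["retries"])      # 3
print(resolved["retry_delay"])  # 0:05:00
```

In a real DAG the same effect comes from passing, for example, retries=3 directly to one operator while leaving default_args unchanged.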
Operators
| Operator | Description | Example |
|---|---|---|
| PythonOperator | Executes a Python function. | PythonOperator(python_callable=my_function, task_id='task1') |
| BashOperator | Executes a bash command. | BashOperator(bash_command='echo "Hello!"', task_id='task2') |
| EmailOperator | Sends an email. | EmailOperator(to='email@example.com', subject='Airflow', html_content='Task done!', task_id='send_email') |
| DummyOperator | Placeholder for pipeline structuring (newer Airflow releases call this EmptyOperator). | DummyOperator(task_id='placeholder') |
| BranchPythonOperator | Chooses a task branch based on a condition. | BranchPythonOperator(python_callable=choose_branch, task_id='branching') |
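
The callable given to BranchPythonOperator decides which branch runs by returning the task_id of the task to follow. Here is a minimal sketch of such a callable that can be checked locally without Airflow. The task_ids "process_rows" and "skip_processing" and the `row_count` parameter are assumptions made for illustration.

```python
def choose_branch(row_count: int) -> str:
    """Return the task_id of the branch that should run next."""
    # "process_rows" and "skip_processing" are hypothetical task_ids;
    # they must match tasks placed directly downstream of the branch task.
    if row_count > 0:
        return "process_rows"
    return "skip_processing"


# Local check, no Airflow needed.
print(choose_branch(42))  # process_rows
print(choose_branch(0))   # skip_processing
```

Because this version takes an argument, the operator would need to supply it, for example through the op_kwargs parameter that the Python-based operators accept.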
Task Dependencies
| Syntax | Description |
|---|---|
| task1 >> task2 | task1 runs before task2. |
| task1 << task2 | task1 runs after task2. |
| [task1, task2] >> task3 | Both task1 and task2 must finish before task3. |
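
To make the arrow syntax less magical, here is a toy model of how `>>` and `<<` can be built with Python operator overloading. `ToyTask` is a hypothetical class written for this cheat sheet; it only records downstream links and is not Airflow's actual implementation.

```python
class ToyTask:
    """Toy stand-in for an Airflow task; it only tracks downstream links."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # self >> other: other runs after self.
        others = other if isinstance(other, list) else [other]
        for task in others:
            self.downstream.append(task.task_id)
        return other

    def __lshift__(self, other):
        # self << other: self runs after other.
        others = other if isinstance(other, list) else [other]
        for task in others:
            task.downstream.append(self.task_id)
        return other

    def __rrshift__(self, other):
        # [a, b] >> self: a list has no __rshift__, so Python falls back to this.
        self.__lshift__(other)
        return self


task1, task2, task3 = ToyTask("task1"), ToyTask("task2"), ToyTask("task3")
[task1, task2] >> task3
print(task1.downstream, task2.downstream)  # ['task3'] ['task3']
```

Returning the right-hand side from `__rshift__` is what lets chains such as task1 >> task2 >> task3 read left to right.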
Common Schedule Intervals
| Expression | Interval |
|---|---|
| @once | Run once. |
| @hourly | Run every hour. |
| @daily | Run once a day at midnight. |
| @weekly | Run once a week at midnight on Sunday. |
| @monthly | Run once a month at midnight, on the first day. |
| @yearly | Run once a year at midnight, on January 1st. |
| cron_expression | Custom schedule (e.g., 0 12 * * * runs at 12 PM daily). |
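
Each preset in the table above is shorthand for a cron expression (fields: minute, hour, day of month, month, day of week). The lookup below spells them out; `to_cron` is a hypothetical helper for illustration, not an Airflow function.

```python
# Cron equivalents of the presets in the table above.
# "@once" has no cron equivalent, so it maps to None.
PRESET_TO_CRON = {
    "@once": None,
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule: str):
    """Hypothetical helper: expand a preset, pass custom cron strings through."""
    return PRESET_TO_CRON.get(schedule, schedule)


print(to_cron("@daily"))      # 0 0 * * *
print(to_cron("0 12 * * *"))  # 0 12 * * *
```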
Working with XCom
- Push Data to XCom:
      def push_to_xcom(**kwargs):
          kwargs['ti'].xcom_push(key='key_name', value='my_value')
- Pull Data from XCom:
      def pull_from_xcom(**kwargs):
          value = kwargs['ti'].xcom_pull(task_ids='task_id', key='key_name')
          print(f"Pulled value: {value}")
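
To try the push and pull pattern without a running scheduler, you can hand the functions a small stand-in for the task instance. `FakeTaskInstance` below is a hypothetical in-memory class that mimics only the two XCom calls used above; the task_ids "push_task" and "pull_task" are also assumptions.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for Airflow's task instance (XCom only)."""

    def __init__(self, task_id: str, store: dict):
        self.task_id = task_id
        self.store = store  # shared dict keyed by (task_id, key)

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self.store.get((task_ids, key))


def push_to_xcom(**kwargs):
    kwargs["ti"].xcom_push(key="key_name", value="my_value")


def pull_from_xcom(**kwargs):
    # task_ids names the task that pushed the value.
    return kwargs["ti"].xcom_pull(task_ids="push_task", key="key_name")


store = {}
push_to_xcom(ti=FakeTaskInstance("push_task", store))
pulled = pull_from_xcom(ti=FakeTaskInstance("pull_task", store))
print(pulled)  # my_value
```

The point to notice is that the pulling task must name the pushing task's task_id and use the same key.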
Troubleshooting
| Issue | Solution |
|---|---|
| DAG is not listed in the UI. | Ensure the DAG file is in the dags folder and has no syntax errors. |
| Task fails repeatedly. | Check logs for detailed error messages via the web UI or CLI. |
| Web server not starting. | Verify that the Airflow database is initialized (airflow db init). |
| Scheduler not running tasks. | Ensure the scheduler is running (airflow scheduler). |
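
For the first row of the table, a quick way to rule out syntax errors is to parse the DAG file before copying it into the dags folder. This is a minimal sketch using only the standard library; `find_syntax_error` is a hypothetical helper, and it catches syntax errors only, not failed imports or Airflow-specific problems.

```python
import ast
import tempfile
from pathlib import Path


def find_syntax_error(path):
    """Return None if the file parses, else a short description of the error."""
    try:
        ast.parse(Path(path).read_text(encoding="utf-8"), filename=str(path))
    except SyntaxError as exc:
        return f"{path}: line {exc.lineno}: {exc.msg}"
    return None


# Demo on throwaway files; point the helper at a file in your dags folder instead.
with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good_dag.py"
    good.write_text("x = 1\n", encoding="utf-8")
    bad = Path(tmp) / "bad_dag.py"
    bad.write_text("def broken(:\n", encoding="utf-8")
    good_result = find_syntax_error(good)
    bad_result = find_syntax_error(bad)

print(good_result)         # None
print(bad_result is None)  # False
```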
This cheat sheet provides an overview of Airflow's most important concepts and commands. For more advanced configurations or customizations, refer to Airflow's Official Documentation.
…………
Thank you for your time; sharing is caring!
…………


