How to Build a Data Pipeline in Python with Airflow

In the modern data ecosystem, data pipelines are the backbone of analytics, machine learning, and automation. They allow organizations to collect, transform, and move data seamlessly across systems, from APIs and databases to dashboards and AI models.

One of the most powerful tools for building and managing these pipelines is Apache Airflow, an open-source platform for scheduling, monitoring, and automating workflows.

In this guide, you’ll learn how to build a simple yet powerful data pipeline in Python using Airflow, from setup to deployment, complete with practical code examples.

What Is Apache Airflow?

Apache Airflow is a workflow orchestration tool developed by Airbnb and now maintained by the Apache Software Foundation. It allows you to define complex workflows as Directed Acyclic Graphs (DAGs) where each task represents a step in your data pipeline.

Why Use Airflow?

  • Automates repetitive data tasks
  • Handles dependencies and scheduling
  • Provides monitoring through a web UI
  • Integrates with AWS, GCP, and databases
  • Scales easily for big data workflows

Prerequisites

Before you begin, make sure you have:

  • Python 3.8+ installed
  • pip or conda package manager
  • Basic knowledge of pandas and SQL
  • Airflow installed via:
pip install apache-airflow

Initialize the Airflow environment:

airflow db init
airflow users create --username admin --firstname Data --lastname Engineer --role Admin --email admin@example.com
airflow webserver --port 8080   # run in one terminal
airflow scheduler               # run in a second terminal

Step 1: Create a Simple DAG

A DAG (Directed Acyclic Graph) defines your pipeline workflow.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data from API...")

def transform_data():
    print("Transforming data...")

def load_data():
    print("Loading data into database...")

default_args = {
    'owner': 'codewithfimi',
    'start_date': datetime(2025, 1, 1),
    'retries': 1
}

with DAG(
    dag_id='simple_data_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    load = PythonOperator(task_id='load', python_callable=load_data)

    extract >> transform >> load

Step 2: Understanding ETL (Extract, Transform, Load)

  • Extract: get data from APIs, CSVs, or databases. Example: pulling stock data via an API.
  • Transform: clean or modify data. Example: removing null values or aggregating data.
  • Load: store data in a database or data warehouse. Example: uploading to PostgreSQL or BigQuery.

Each step is handled by Airflow tasks connected through dependencies.
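
The three stages can be sketched as plain Python functions before wiring them into Airflow (the data here is illustrative, with no external services involved):

```python
def extract():
    # Extract: pretend these rows came from an API or a CSV file.
    return [{"price": 100.0}, {"price": None}, {"price": 103.5}]

def transform(rows):
    # Transform: drop rows with missing prices.
    return [r for r in rows if r["price"] is not None]

def load(rows):
    # Load: return the row count; a real task would write to a database.
    return len(rows)
```

Each function maps directly onto one Airflow task, which is why the DAG in Step 1 chains them with `extract >> transform >> load`.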

Step 3: Adding Real Data

Let’s extract and load real-world data using pandas and requests.

import pandas as pd
import requests

def extract_data():
    url = "https://api.coindesk.com/v1/bpi/currentprice.json"
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail fast so Airflow can retry the task
    data = response.json()
    df = pd.DataFrame([{
        "currency": "USD",
        "rate": data["bpi"]["USD"]["rate_float"],
        "time": data["time"]["updatedISO"]
    }])
    df.to_csv('/tmp/bitcoin_rate.csv', index=False)

Then add a load function:

import sqlite3

def load_data():
    # Use an absolute path: Airflow tasks may run from a different working directory.
    conn = sqlite3.connect('/tmp/bitcoin.db')
    df = pd.read_csv('/tmp/bitcoin_rate.csv')
    df.to_sql('bitcoin_rates', conn, if_exists='append', index=False)
    conn.close()
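
The DAG also has a transform step between extract and load. A minimal sketch, assuming the CSV layout written by extract_data, could clean the file in place:

```python
import pandas as pd

def clean_rates(df):
    # Drop rows missing a rate and round prices to two decimals.
    df = df.dropna(subset=["rate"]).copy()
    df["rate"] = df["rate"].round(2)
    return df

def transform_data():
    df = pd.read_csv("/tmp/bitcoin_rate.csv")
    clean_rates(df).to_csv("/tmp/bitcoin_rate.csv", index=False)
```

Keeping the cleaning logic in a separate function like `clean_rates` makes it easy to unit-test without touching the filesystem.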

This creates a daily Bitcoin price pipeline, fetching live data, cleaning it, and storing it in a local database.

Step 4: Monitor and Automate

Once your DAG runs successfully:

  • View execution logs from the Airflow UI
  • Set email alerts for failures
  • Use @hourly, @daily, or custom cron schedules
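
Retries, failure alerts, and cron schedules are all configured on the DAG. A sketch of stricter defaults (the email address is illustrative, and email alerts also require SMTP settings in airflow.cfg):

```python
from datetime import timedelta

# Hypothetical alerting defaults; adjust the address to your environment.
default_args = {
    "owner": "codewithfimi",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email": ["alerts@example.com"],
    "email_on_failure": True,
}

# Instead of a preset like '@daily', schedule_interval also accepts
# a cron expression; this one runs at 06:00 on weekdays.
cron_schedule = "0 6 * * 1-5"
```

Pass `default_args=default_args` and `schedule_interval=cron_schedule` to the DAG constructor from Step 1 to apply these settings.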

Step 5: Scale and Deploy

To move from local to production:

  • Host Airflow on AWS MWAA, Google Composer, or Docker
  • Connect to cloud storage (S3, GCS) and databases (Postgres, Snowflake)
  • Use XComs for data sharing between tasks
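
XComs let tasks exchange small values through the task instance (`ti`) that Airflow passes to each callable. A minimal sketch, with illustrative task and key names:

```python
def extract(ti):
    # Push a value to XCom for downstream tasks; in practice
    # this would come from an API call.
    rate = 42000.0
    ti.xcom_push(key="btc_rate", value=rate)

def load(ti):
    # Pull the value pushed by the task with task_id 'extract'.
    rate = ti.xcom_pull(task_ids="extract", key="btc_rate")
    print(f"Loading rate: {rate}")
```

XComs are stored in Airflow's metadata database, so they suit small values like file paths or row counts; large datasets should flow through files or external storage instead.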

Real-World Applications

  • Data Warehousing: Automate daily database refreshes
  • Machine Learning Pipelines: Retrain models periodically
  • Business Reporting: Schedule analytics dashboards
  • API Data Collection: Aggregate external data daily

Building a data pipeline in Python with Airflow empowers you to automate complex workflows, reduce manual effort, and maintain data consistency across systems.
Once you master the basics, you can scale your pipelines to handle big data, AI, and real-time analytics — all powered by Python.

FAQs

1. What is a data pipeline in simple terms?

A data pipeline is a system that moves and transforms data from source to destination automatically.

2. Why use Airflow for data pipelines?

Airflow helps you automate, schedule, and monitor workflows efficiently. It works well for both small and enterprise projects.

3. Can I use Airflow with cloud platforms?

Yes, Airflow integrates with AWS, GCP, Azure, and many SaaS APIs.

4. Is Airflow good for beginners?

Absolutely. It’s intuitive, Python-based, and widely used in the data industry.

5. What’s next after learning Airflow?

Try combining it with Docker, Spark, and cloud services to build production-grade data pipelines.
