How to Ingest Data From API Into a Database

Most of the data that powers real analytics pipelines does not live in a file you can download once and be done with it. It lives behind an API that delivers fresh data every time you call it. Weather readings updated every hour. Stock prices refreshed every minute. Customer records synced from a CRM. Social media metrics pulled daily. E-commerce orders flowing in continuously.

The challenge is that an API call returns data in memory, usually as a JSON response, and the moment your script ends that data is gone. If you want to query it, join it with other tables, track it over time, or build a dashboard on top of it, you need to move it from the API response into a database where it can live permanently and be queried however you need.

This process is called data ingestion. It is one of the most fundamental tasks in data engineering and it follows the same pattern regardless of which API you are calling or which database you are storing into. You call the API, parse the response, transform the data into the shape your database expects, and insert it. Then you automate that sequence to run on a schedule so the database stays current without manual intervention.

This guide walks through the entire process step by step, from making your first API call to building a reusable ingestion pipeline that handles pagination, errors, and duplicate records.

What Is API Data Ingestion?

API data ingestion is the process of extracting data from an external API endpoint and loading it into a database for storage, querying, and analysis. It is the extract and load portion of an ETL pipeline where the source is an HTTP API rather than a file or another database.

Think of it like a supply chain for data. The API is the supplier with inventory that updates constantly. Your database is the warehouse where you store everything you receive. The ingestion script is the logistics system that calls the supplier, receives the shipment, checks it for quality, and puts it in the right location in the warehouse. Without the logistics system the supplier keeps updating their inventory but nothing ever reaches your warehouse.

The API delivers data in JSON format in most cases. JSON is a nested structure of keys and values that is easy for APIs to generate and easy for Python to parse, but it is not directly insertable into a relational database. Part of the ingestion process is flattening and transforming that JSON structure into rows and columns that fit your database schema.
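
As a quick illustration of what that flattening looks like, pandas can expand a small nested response into rows with json_normalize. The payload below is made up for the example; the real response used in this guide is parsed in Step 2:

python

import pandas as pd

# A made-up nested response: a list of readings inside a top-level object.
payload = {
    "station": "NYC-01",
    "readings": [
        {"time": "2024-05-01T00:00", "temp": {"value": 21.5, "unit": "C"}},
        {"time": "2024-05-01T01:00", "temp": {"value": 20.9, "unit": "C"}},
    ],
}

# record_path selects the nested array, meta copies the parent field onto every row,
# and the nested temp object becomes flat temp.value / temp.unit columns.
df = pd.json_normalize(payload, record_path="readings", meta="station")
print(df)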

Setting Up Your Environment

Install the required libraries:

pip install requests pandas sqlalchemy psycopg2-binary python-dotenv

requests handles the HTTP calls to the API. pandas helps parse and transform the JSON response into a structured DataFrame. sqlalchemy provides a consistent interface for connecting to and inserting into both SQLite and PostgreSQL. psycopg2-binary is the driver sqlalchemy uses to communicate with PostgreSQL. python-dotenv loads API keys and database credentials from a .env file so they never appear in your code.
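
The .env file itself is nothing more than key=value pairs in the project root. The values below are placeholders; the variable names match the ones read in the PostgreSQL section later:

# .env (add this file to .gitignore so it never reaches version control)
DB_USER=postgres
DB_PASSWORD=your-real-password
DB_HOST=localhost
DB_PORT=5432
DB_NAME=weather_db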

python

import requests
import pandas as pd
import sqlite3
import json
import os
import time
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Setting up logging from the start is worth the few extra lines. When the script runs on a schedule overnight and something goes wrong, the log tells you exactly what happened: which API call failed, which record caused the error, and at what time. Without logging you are debugging a black box.

Step by Step: Ingesting API Data Into a Database

Step 1: Make Your First API Call

This guide uses the Open-Meteo weather API which requires no authentication and returns real weather data, making it a perfect learning example. The same pattern works for any REST API that returns JSON.

python

import requests

def fetch_weather_data(coordinates, city_name):
    url = "https://api.open-meteo.com/v1/forecast"

    params = {
        'latitude':            coordinates[0],
        'longitude':           coordinates[1],
        'hourly':              'temperature_2m,relative_humidity_2m,wind_speed_10m',
        'forecast_days':       1,
        'timezone':            'auto'
    }

    response = requests.get(url, params=params)

    if response.status_code != 200:
        raise Exception(f"API call failed: {response.status_code} {response.text}")

    data = response.json()
    logger.info(f"Successfully fetched data for {city_name}")
    return data

weather_raw = fetch_weather_data((40.7128, -74.0060), 'New York')
print(json.dumps(weather_raw, indent=2)[:500])

Always check response.status_code before trying to parse the response. A 200 means success. A 401 means your API key is wrong or missing. A 429 means you hit the rate limit. A 500 means the API itself has a problem. Raising an exception on non-200 responses means failures are caught immediately rather than silently producing empty or malformed data downstream.
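
If you want more granular handling than a blanket exception, a small helper can branch on these codes and raise a distinct error for each. This is a minimal sketch, not part of the pipeline above; the Retry-After header mentioned for 429 responses is common but not guaranteed to be present on every API:

python

def check_response(response):
    # Hypothetical helper that turns the common status codes into distinct errors.
    if response.status_code == 200:
        return response.json()
    if response.status_code == 401:
        raise PermissionError("API key is missing or invalid (401).")
    if response.status_code == 429:
        retry_after = response.headers.get('Retry-After', 'unknown')
        raise RuntimeError(f"Rate limited (429). Retry-After: {retry_after}")
    if response.status_code >= 500:
        raise ConnectionError(f"API server error ({response.status_code}).")
    raise Exception(f"Unexpected status {response.status_code}: {response.text}")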

Step 2: Parse the JSON Response Into a DataFrame

API responses rarely arrive in a format that maps directly to database rows. JSON is usually nested, with arrays inside objects inside arrays. Flattening it into a tabular structure is the core transformation step:

python

def parse_weather_response(data, city_name):
    hourly = data['hourly']

    df = pd.DataFrame({
        'timestamp':        hourly['time'],
        'temperature_c':    hourly['temperature_2m'],
        'humidity_pct':     hourly['relative_humidity_2m'],
        'wind_speed_kmh':   hourly['wind_speed_10m'],
        'city':             city_name,
        'ingested_at':      datetime.utcnow().isoformat()
    })

    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['ingested_at'] = pd.to_datetime(df['ingested_at'])

    df['temperature_c']  = pd.to_numeric(df['temperature_c'],  errors='coerce')
    df['humidity_pct']   = pd.to_numeric(df['humidity_pct'],   errors='coerce')
    df['wind_speed_kmh'] = pd.to_numeric(df['wind_speed_kmh'], errors='coerce')

    logger.info(f"Parsed {len(df)} rows for {city_name}")
    return df

weather_df = parse_weather_response(weather_raw, 'New York')
print(weather_df.head())
print(weather_df.dtypes)

Adding an ingested_at column to every row is a practice worth adopting on every ingestion pipeline. It records exactly when each row was pulled from the API, which lets you audit the pipeline, identify stale data, and reconstruct when specific values were observed if the API updates historical data over time.

Explicit type conversion with pd.to_numeric(errors='coerce') and pd.to_datetime() prevents type mismatches when inserting into the database. A column that pandas infers as object type when it should be float causes an insertion error or silent data corruption. Coercing explicitly means any value that cannot convert becomes NaN, which is visible and handleable, rather than a string sitting in a numeric column that causes failures later.
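
A quick standalone check shows what coercion does; the 'n/a' value is made up for the illustration:

python

raw = pd.Series(['21.5', 'n/a', '19.0'])

# 'n/a' cannot be converted, so errors='coerce' turns it into NaN
# instead of leaving the whole column stored as strings.
print(pd.to_numeric(raw, errors='coerce'))
# 0    21.5
# 1     NaN
# 2    19.0
# dtype: float64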

Step 3: Create the Database and Table

SQLite requires no server installation and is the right choice for learning and for lightweight pipelines processing modest data volumes. PostgreSQL is the right choice for production pipelines with multiple concurrent readers and writers.

python

def create_database_and_table(db_path='weather_data.db'):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute('''
        CREATE TABLE IF NOT EXISTS weather_readings (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp       DATETIME NOT NULL,
            city            VARCHAR(100) NOT NULL,
            temperature_c   FLOAT,
            humidity_pct    FLOAT,
            wind_speed_kmh  FLOAT,
            ingested_at     DATETIME NOT NULL,
            UNIQUE(timestamp, city)
        )
    ''')

    conn.commit()
    conn.close()
    logger.info("Database and table created successfully.")

create_database_and_table()

The UNIQUE(timestamp, city) constraint is the most important part of this table definition. It prevents duplicate records from being inserted if the ingestion script runs twice for the same time period. Without it every script execution inserts the same rows again and your table grows with duplicate data that corrupts every aggregate query that runs against it.

CREATE TABLE IF NOT EXISTS means the script can run repeatedly without failing on subsequent executions because the table already exists. The first run creates the table. Every subsequent run finds it already there and moves on.

Step 4: Insert Data Into the Database

python

def insert_weather_data(df, db_path='weather_data.db'):
    engine = create_engine(f'sqlite:///{db_path}')

    rows_before = 0
    with engine.connect() as conn:
        result = conn.execute(text("SELECT COUNT(*) FROM weather_readings"))
        rows_before = result.fetchone()[0]

    df.to_sql(
        name='weather_readings',
        con=engine,
        if_exists='append',
        index=False,
        method='multi'
    )

    with engine.connect() as conn:
        result = conn.execute(text("SELECT COUNT(*) FROM weather_readings"))
        rows_after = result.fetchone()[0]

    rows_inserted = rows_after - rows_before
    logger.info(f"Inserted {rows_inserted} new rows. Total rows: {rows_after}")
    return rows_inserted

insert_weather_data(weather_df)

if_exists='append' tells pandas to add rows to the existing table rather than dropping and recreating it. Using 'replace' would wipe the entire table on every run, which defeats the purpose of building up a historical record over time.

method='multi' batches multiple rows into a single INSERT statement rather than one statement per row. For small datasets the difference is negligible. For large datasets it is the difference between an insertion that takes seconds and one that takes minutes.
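
For very large DataFrames, to_sql also accepts a chunksize argument that caps how many rows go into each batch. A sketch of the same call with chunking; the 1,000-row figure is an arbitrary example, not a recommendation from the library:

python

df.to_sql(
    name='weather_readings',
    con=engine,
    if_exists='append',
    index=False,
    method='multi',
    chunksize=1000   # write in batches of 1,000 rows to limit statement size and memory use
)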

Step 5: Handle Duplicate Records Gracefully

The UNIQUE constraint prevents duplicates but it also causes the entire insert to fail if any row in the batch violates the constraint. A better approach handles conflicts row by row so new records are inserted and duplicates are silently skipped:

python

def insert_with_conflict_handling(df, db_path='weather_data.db'):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    inserted = 0
    skipped  = 0

    for _, row in df.iterrows():
        try:
            cursor.execute('''
                INSERT OR IGNORE INTO weather_readings
                (timestamp, city, temperature_c, humidity_pct,
                 wind_speed_kmh, ingested_at)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                str(row['timestamp']),
                row['city'],
                row['temperature_c'],
                row['humidity_pct'],
                row['wind_speed_kmh'],
                str(row['ingested_at'])
            ))

            if cursor.rowcount > 0:
                inserted += 1
            else:
                skipped += 1

        except sqlite3.Error as e:
            logger.error(f"Error inserting row: {e}")
            continue

    conn.commit()
    conn.close()

    logger.info(f"Inserted: {inserted} rows. Skipped (duplicates): {skipped} rows.")
    return inserted, skipped

inserted, skipped = insert_with_conflict_handling(weather_df)

INSERT OR IGNORE silently skips any row that would violate the UNIQUE constraint instead of raising an error. cursor.rowcount tells you whether the insert succeeded (1) or was skipped (0) for each row. Logging the count of inserted versus skipped rows gives you visibility into whether the pipeline is receiving new data on each run or repeatedly trying to insert data it has already stored.
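
Row-by-row inserts with iterrows() are fine for a few hundred rows but become slow on larger batches. A possible variant, sketched here rather than part of the pipeline above, passes all rows to executemany() with the same INSERT OR IGNORE statement; sqlite3 sums the rowcount across the batch, so the number of skipped duplicates is recovered by subtraction:

python

def insert_batch_with_conflict_handling(df, db_path='weather_data.db'):
    # Sketch: batch version of the conflict-handling insert above.
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    rows = [
        (str(r['timestamp']), r['city'], r['temperature_c'],
         r['humidity_pct'], r['wind_speed_kmh'], str(r['ingested_at']))
        for _, r in df.iterrows()
    ]

    cursor.executemany('''
        INSERT OR IGNORE INTO weather_readings
        (timestamp, city, temperature_c, humidity_pct,
         wind_speed_kmh, ingested_at)
        VALUES (?, ?, ?, ?, ?, ?)
    ''', rows)

    inserted = cursor.rowcount          # executemany sums the rows actually inserted
    skipped  = len(rows) - inserted     # duplicates ignored by the UNIQUE constraint

    conn.commit()
    conn.close()
    logger.info(f"Inserted: {inserted} rows. Skipped (duplicates): {skipped} rows.")
    return inserted, skipped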

Step 6: Build a Multi-City Pipeline With Error Handling

A production-ready ingestion pipeline handles multiple data sources, retries on transient failures, and logs outcomes clearly:

python

def run_ingestion_pipeline(cities, db_path='weather_data.db'):
    create_database_and_table(db_path)

    pipeline_results = []

    for city_name, coordinates in cities.items():
        logger.info(f"Starting ingestion for {city_name}")

        retries    = 3
        success    = False
        last_error = None

        for attempt in range(retries):
            try:
                raw_data = fetch_weather_data(coordinates, city_name)
                df       = parse_weather_response(raw_data, city_name)
                inserted, skipped = insert_with_conflict_handling(df, db_path)

                pipeline_results.append({
                    'city':     city_name,
                    'status':   'success',
                    'inserted': inserted,
                    'skipped':  skipped
                })

                success = True
                break

            except Exception as e:
                last_error = str(e)
                logger.warning(f"Attempt {attempt + 1} failed for {city_name}: {e}")
                time.sleep(2 ** attempt)

        if not success:
            logger.error(f"All retries failed for {city_name}: {last_error}")
            pipeline_results.append({
                'city':     city_name,
                'status':   'failed',
                'error':    last_error,
                'inserted': 0,
                'skipped':  0
            })

        time.sleep(1)

    return pipeline_results

cities = {
    'New York':    (40.7128, -74.0060),
    'London':      (51.5074, -0.1278),
    'Lagos':       (6.5244,   3.3792),
    'Tokyo':       (35.6762, 139.6503),
    'Sydney':      (-33.8688, 151.2093)
}

results = run_ingestion_pipeline(cities)

print("\nPipeline Summary:")
for r in results:
    if r['status'] == 'success':
        print(f"  {r['city']}: {r['inserted']} inserted, {r['skipped']} skipped")
    else:
        print(f"  {r['city']}: FAILED - {r.get('error', 'unknown error')}")

The retry loop with exponential backoff, time.sleep(2 ** attempt), handles the most common transient failures in API ingestion. A network hiccup on the first attempt almost always succeeds on the second try after a one-second wait. A 429 rate limit response usually clears by the third attempt after the longer two-second wait. Without retries any transient failure permanently skips that city for the entire run.

time.sleep(1) between cities respects the API’s rate limits. Sending requests too quickly triggers rate limiting which causes 429 errors on subsequent requests. A one second pause between each city keeps the request rate well within typical API limits.

Step 7: Query and Verify the Ingested Data

Always verify the data after ingestion before trusting the pipeline is working correctly:

python

def verify_ingestion(db_path='weather_data.db'):
    conn = sqlite3.connect(db_path)

    total_rows = pd.read_sql(
        "SELECT COUNT(*) AS total FROM weather_readings", conn
    )
    print("Total rows:", total_rows['total'].values[0])

    rows_per_city = pd.read_sql('''
        SELECT
            city,
            COUNT(*)           AS row_count,
            MIN(timestamp)     AS earliest,
            MAX(timestamp)     AS latest,
            MAX(ingested_at)   AS last_ingested
        FROM weather_readings
        GROUP BY city
        ORDER BY city
    ''', conn)
    print("\nRows per city:")
    print(rows_per_city.to_string(index=False))

    latest_readings = pd.read_sql('''
        SELECT
            city,
            timestamp,
            temperature_c,
            humidity_pct,
            wind_speed_kmh
        FROM weather_readings
        WHERE timestamp = (
            SELECT MAX(timestamp)
            FROM weather_readings w2
            WHERE w2.city = weather_readings.city
        )
        ORDER BY city
    ''', conn)
    print("\nLatest reading per city:")
    print(latest_readings.to_string(index=False))

    conn.close()

verify_ingestion()

Checking row counts per city, earliest and latest timestamps, and last ingestion time after every pipeline run gives you immediate visibility into whether the pipeline is working correctly, whether all cities returned data, and whether the data covers the expected time range.

Connecting to PostgreSQL for Production

SQLite is excellent for learning and small pipelines. For production workloads that need concurrent access and larger data volumes, PostgreSQL is the standard choice:

python

from sqlalchemy import create_engine

DB_USER     = os.getenv('DB_USER',     'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD', 'yourpassword')
DB_HOST     = os.getenv('DB_HOST',     'localhost')
DB_PORT     = os.getenv('DB_PORT',     '5432')
DB_NAME     = os.getenv('DB_NAME',     'weather_db')

pg_engine = create_engine(
    f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
)

def insert_to_postgres(df, engine):
    df.to_sql(
        name='weather_readings',
        con=engine,
        if_exists='append',
        index=False,
        method='multi'
    )
    logger.info(f"Inserted {len(df)} rows to PostgreSQL.")

Store credentials in a .env file and load them with python-dotenv. Never hardcode database passwords or API keys directly in your script. A .env file that is added to .gitignore keeps credentials out of version control and out of any logs that might be shared or stored publicly.
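
The to_sql append above does not handle duplicate rows. The PostgreSQL equivalent of SQLite's INSERT OR IGNORE is ON CONFLICT DO NOTHING; this is a hedged sketch using SQLAlchemy's text interface, assuming the weather_readings table in PostgreSQL carries the same UNIQUE(timestamp, city) constraint as the SQLite version:

python

from sqlalchemy import text

def insert_to_postgres_ignore_duplicates(df, engine):
    # Sketch: skip duplicates on insert, relying on the UNIQUE(timestamp, city) constraint.
    insert_stmt = text('''
        INSERT INTO weather_readings
            (timestamp, city, temperature_c, humidity_pct,
             wind_speed_kmh, ingested_at)
        VALUES (:timestamp, :city, :temperature_c, :humidity_pct,
                :wind_speed_kmh, :ingested_at)
        ON CONFLICT (timestamp, city) DO NOTHING
    ''')

    with engine.begin() as conn:
        # Passing a list of dicts executes the statement once per row.
        conn.execute(insert_stmt, df.to_dict(orient='records'))

    logger.info(f"Attempted insert of {len(df)} rows into PostgreSQL (duplicates skipped).")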

API Ingestion Pattern Summary

Step | What Happens | Key Consideration
Call API | HTTP GET request with parameters | Handle status codes, add retries
Parse response | Flatten JSON to DataFrame | Convert types explicitly
Create table | Define schema with constraints | Add UNIQUE to prevent duplicates
Insert data | Append rows to table | Use INSERT OR IGNORE for conflicts
Verify | Query row counts and timestamps | Run after every ingestion
Schedule | Automate with cron or task scheduler | Match schedule to API update frequency
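
Scheduling is the one step in the table not shown in code earlier. On Linux or Mac, a single crontab entry is enough to run the pipeline every hour; the script name and paths below are placeholders for your own environment:

# crontab entry (edit with `crontab -e`): run the ingestion script at minute 0 of every hour
0 * * * * /usr/bin/python3 /path/to/ingest_weather.py >> /path/to/ingest_weather.log 2>&1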

Common Limitations

APIs have rate limits that break pipelines. Most public APIs restrict how many requests you can make per minute or per day. Exceeding the limit returns a 429 status code and may temporarily block your IP. Always check the API documentation for rate limit details, add delays between requests, and implement exponential backoff in your retry logic so temporary rate limiting does not permanently fail the run.

API response structures change without warning. An API that returns temperature_2m today might rename it to temperature tomorrow as the provider updates their schema. A hard-coded key name in your parser immediately breaks the ingestion. Build defensive parsing that logs missing keys clearly rather than crashing silently, and subscribe to API changelog notifications when they are available.
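
A hedged example of defensive parsing for the Open-Meteo response used in this guide: it checks for the keys the parser expects and logs exactly which ones are missing instead of failing with a bare KeyError:

python

def parse_weather_response_defensive(data, city_name):
    # Sketch: same parsing as Step 2, but missing keys are logged instead of crashing.
    expected_keys = ['time', 'temperature_2m', 'relative_humidity_2m', 'wind_speed_10m']
    hourly = data.get('hourly', {})

    missing = [key for key in expected_keys if key not in hourly]
    if missing:
        logger.error(f"Missing keys in API response for {city_name}: {missing}")
        return pd.DataFrame()   # empty frame: nothing is inserted and the gap is visible in the logs

    return parse_weather_response(data, city_name)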

Network failures are inevitable over long runs. A pipeline ingesting data from twenty endpoints overnight will encounter at least one network timeout, DNS failure, or connection reset in a typical week of operation. Without retry logic these transient failures permanently skip data. Every production ingestion script needs a retry mechanism with a limit on attempts and a log of what failed so missed data can be backfilled.

Common Mistakes to Avoid

Not handling pagination. Most APIs return data in pages of 100 or 1000 records and require multiple requests with incrementing page numbers or cursor tokens to retrieve the full dataset. If your ingestion script only calls the first page, you silently miss every record beyond the first page limit without any error. Always check the API documentation for pagination parameters and implement a loop that continues requesting pages until the response indicates no more data remains.
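
Open-Meteo returns everything in one response, so the pipeline above never needed pagination, but a generic page-number loop looks roughly like the sketch below. The page, per_page, and results names are assumptions for illustration; check your API's documentation for the actual parameter names and whether it uses cursor tokens instead:

python

def fetch_all_pages(url, base_params, page_size=100):
    # Sketch of page-number pagination; parameter and field names vary between APIs.
    all_records = []
    page = 1

    while True:
        params = {**base_params, 'page': page, 'per_page': page_size}
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()

        records = response.json().get('results', [])
        if not records:          # an empty page means there is no more data to fetch
            break

        all_records.extend(records)
        page += 1
        time.sleep(1)            # pause between page requests to respect rate limits

    return all_records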

Storing raw JSON instead of structured rows. Inserting the entire API response as a JSON blob into a single text column avoids the work of parsing but creates far more work later when you need to query specific fields. Querying inside JSON columns is slow, syntax varies across databases, and schema changes in the API break your queries in unpredictable ways. Always parse JSON into proper typed columns at ingestion time.

Ignoring the ingested_at timestamp. Without recording when each row was ingested you lose the ability to audit the pipeline, identify data gaps from failed runs, or understand when specific values were observed versus when they changed. The ingested_at column costs one line of code and provides essential operational visibility.

Hardcoding API keys in the script. An API key in your source code gets committed to version control, shared with every collaborator, and potentially exposed in public repositories. Use environment variables or a .env file for all credentials and add .env to your .gitignore before the first commit.

API to Database Ingestion Cheat Sheet

Task | Code
Make API call | requests.get(url, params=params)
Check status code | response.status_code == 200
Parse JSON response | response.json()
Flatten JSON to DataFrame | pd.DataFrame(data['key'])
Add ingestion timestamp | df['ingested_at'] = datetime.utcnow()
Convert types | pd.to_numeric(col, errors='coerce')
Create SQLite connection | sqlite3.connect('db.db')
Create SQLAlchemy engine | create_engine('sqlite:///db.db')
Insert DataFrame to table | df.to_sql(name, engine, if_exists='append')
Skip duplicate rows | INSERT OR IGNORE INTO table
Query inserted data | pd.read_sql('SELECT …', conn)
Load env variables | load_dotenv() then os.getenv('KEY')

Ingesting data from an API into a database is the foundation of almost every real-world data pipeline. The specific API and the specific database change but the pattern never does. Call the endpoint, check the status code, parse the JSON into typed rows, create a table with a unique constraint to prevent duplicates, insert with conflict handling, verify the result, and automate to run on a schedule.

The two pieces of this pipeline that separate a script that works once from one that works reliably for months are the retry logic and the duplicate handling. APIs fail transiently. Network connections drop. Rate limits get hit. Without retries those failures silently skip data. Without a unique constraint and INSERT OR IGNORE, running the script twice for the same time period fills your table with duplicate rows that corrupt every downstream query.

Start with SQLite for learning and small pipelines. Move to PostgreSQL when the data volume grows or when multiple processes need to read and write simultaneously. Keep credentials in environment variables from day one. Log every outcome so you can debug failures without guessing what happened overnight.

Once this pattern is comfortable on one API, adapting it to any other API is mostly a matter of changing the URL, the parameters, and the parsing logic. The structure that makes it reliable stays exactly the same.

FAQs

How do I ingest data from an API into a database using Python?

Use the requests library to call the API, parse the JSON response into a pandas DataFrame, and insert it into your database using sqlalchemy with df.to_sql(). Create the table with a UNIQUE constraint on the natural key columns to prevent duplicate records on repeated runs, and use INSERT OR IGNORE to skip conflicts gracefully.

What is the best database for storing API data in Python?

SQLite is the easiest starting point because it requires no server installation and works with a single file. PostgreSQL is the right choice for production pipelines that need concurrent access, larger data volumes, or advanced querying features. Both work seamlessly with sqlalchemy and pandas using the same df.to_sql() interface.

How do I handle duplicate records when ingesting API data?

Add a UNIQUE constraint on the columns that naturally identify a record, like a combination of timestamp and city or record ID and date. Use INSERT OR IGNORE in SQLite or INSERT … ON CONFLICT DO NOTHING in PostgreSQL so that duplicate rows are silently skipped rather than raising an error that stops the entire ingestion.

How do I handle API rate limits in a data ingestion pipeline?

Add a time.sleep() call between requests to stay within the allowed request rate. Implement a retry loop with exponential backoff so that 429 rate limit responses trigger a wait and retry rather than a permanent failure. Always check the API documentation for the specific rate limit and set your sleep interval to stay safely below it.

How do I automate API data ingestion to run on a schedule?

On Linux and Mac use cron to schedule your Python script. On Windows use Task Scheduler. For cloud pipelines, Apache Airflow is the standard tool for orchestrating scheduled data pipelines with built-in retry logic, dependency management, and monitoring. For simple scripts a cron job running every hour or every day is sufficient for most API ingestion use cases.
