Why Parquet Files Are Faster for Analytics

Most data pipelines start the same way. Someone exports data to CSV because CSV is simple, universal, and opens in Excel. The file gets checked into a repository, shared over email, loaded into notebooks, and queried by pipelines. CSV works fine until it does not, and then it fails in ways that accumulate slowly before becoming obvious.

The CSV file that was ten megabytes becomes a gigabyte. Loading it takes thirty seconds instead of one. A query that needs two of thirty columns still reads all thirty. Compression helps with storage but not with query speed. A colleague opens it in a different tool and the date column parses incorrectly because the format was never specified. Adding a new column requires communicating its position to every downstream consumer.

Parquet was built to solve all of these problems simultaneously. It is the file format that the analytics engineering community converged on for storing and querying large datasets, and understanding why it is faster reveals foundational principles about how analytical data access actually works.

The Fundamental Problem With Row-Oriented Formats

CSV and JSON are row-oriented formats. Every record is stored as a complete unit, one after the other. A row with twenty columns stores all twenty column values contiguously before the next row begins.

This storage pattern is optimal for one specific access pattern: reading or writing a complete record. Databases that handle transactions (inserting orders, updating customer addresses, fetching a single user by ID) use row-oriented storage because those operations touch entire records.

Analytical queries have a completely different access pattern. They rarely need every column, but they usually scan a large number of rows in the columns they do need. A query computing average transaction amount by country needs exactly two columns out of potentially thirty. With row-oriented storage, reading those two columns means reading all thirty from every row and discarding twenty-eight. At a billion rows, the wasted I/O is enormous.

Parquet solves this with columnar storage. Each column is stored as a contiguous block rather than being interleaved with other columns. Reading two columns out of thirty requires reading only those two column blocks. The other twenty-eight are never touched by the storage layer.
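The difference is easy to see in a toy model. The plain-Python sketch below (no Parquet involved) stores a hypothetical four-row, three-column table both ways and counts how many stored values have to be passed over to extract a single column. Real engines work with bytes, pages, and disk blocks rather than Python lists, so this illustrates the access pattern only.

```python
# Hypothetical 4-row, 3-column table
demo_columns = ["transaction_id", "country", "amount"]
demo_rows = [(1, "US", 12.5), (2, "UK", 40.0), (3, "US", 7.25), (4, "DE", 19.0)]

# Row-oriented layout: all values of one record are contiguous
row_layout = [value for row in demo_rows for value in row]

# Column-oriented layout: all values of one column are contiguous
column_layout = {
    name: [row[i] for row in demo_rows] for i, name in enumerate(demo_columns)
}

def read_column_row_layout(layout, num_columns, col_index):
    """Walk every stored value, keeping only those at the wanted column position."""
    touched = 0
    values = []
    for position, value in enumerate(layout):
        touched += 1
        if position % num_columns == col_index:
            values.append(value)
    return values, touched

def read_column_columnar(layout, name):
    """Read only the one contiguous block that holds the wanted column."""
    values = list(layout[name])
    return values, len(values)

amounts_row, touched_row = read_column_row_layout(row_layout, len(demo_columns), 2)
amounts_col, touched_col = read_column_columnar(column_layout, "amount")

print(amounts_row == amounts_col)  # same answer either way
print(touched_row, "values touched in the row layout")
print(touched_col, "values touched in the column layout")
```

With thirty columns instead of three, the row layout would pass over thirty values per record to extract one, while the columnar read still touches only the column it needs.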

How Parquet Organizes Data

Parquet’s internal structure has three layers that work together to enable efficient analytical access.

Row groups are horizontal partitions of the data. A Parquet file is divided into row groups, each holding a contiguous range of rows. Writers size them by row count or by bytes, and around 128MB of data per row group is a common target. Row groups are the unit of parallel processing: when multiple CPU cores or multiple nodes read a Parquet file simultaneously, each can process a different row group independently.

Column chunks are the columnar partitions within each row group. Each column in a row group is stored as a separate column chunk. All values for column A in row group one are stored together, followed by all values for column B in row group one, and so on. This is the physical realization of columnar storage.

Pages are the smallest unit within a column chunk, typically around 1MB in size. Pages are the unit of compression and encoding. Each page stores values for a single column in an encoding the writer has chosen for that column, such as dictionary encoding for repetitive strings or plain encoding for values that rarely repeat.

python

import pyarrow.parquet as pq
import pyarrow as pa
import pandas as pd
import numpy as np

# Create sample dataset
np.random.seed(42)
n = 1_000_000

df = pd.DataFrame({
    "transaction_id": np.arange(1, n + 1),
    "customer_id": np.random.randint(1, 50001, n),
    "amount": np.random.lognormal(4.0, 1.2, n).round(2),
    "category": np.random.choice(
        ["retail", "food", "travel", "entertainment"], n
    ),
    "country": np.random.choice(
        ["US", "UK", "DE", "FR", "JP"], n,
        p=[0.40, 0.20, 0.15, 0.15, 0.10]
    ),
    "status": np.random.choice(
        ["completed", "pending", "failed"], n,
        p=[0.80, 0.12, 0.08]
    ),
    "transaction_date": pd.date_range("2022-01-01", periods=n, freq="s")
})

# Write to Parquet with explicit configuration
table = pa.Table.from_pandas(df)
pq.write_table(
    table,
    "transactions.parquet",
    row_group_size=500_000,        # Rows per row group
    compression="snappy",          # Fast compression codec
    write_statistics=True,         # Enable predicate pushdown
    use_dictionary=True            # Dictionary encoding for categoricals
)

# Inspect the file structure
parquet_file = pq.ParquetFile("transactions.parquet")
metadata = parquet_file.metadata

print(f"Row groups: {metadata.num_row_groups}")
print(f"Total rows: {metadata.num_rows:,}")
print(f"Total columns: {metadata.num_columns}")
print(f"\nRow group 0 details:")

row_group = metadata.row_group(0)
print(f"  Rows in group: {row_group.num_rows:,}")
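# Sum the compressed sizes of the column chunks in this row group
compressed_bytes = sum(
    row_group.column(i).total_compressed_size
    for i in range(row_group.num_columns)
)
print(f"  Total compressed size: {compressed_bytes / 1e6:.2f} MB")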

for i in range(metadata.num_columns):
    col = row_group.column(i)
    print(f"  Column '{col.path_in_schema}': "
          f"{col.total_compressed_size / 1e3:.1f} KB compressed, "
          f"encoding: {col.encodings}")

Columnar Storage: The Performance Difference in Practice

The performance advantage of columnar storage is most visible when querying a subset of columns from a large dataset.

python

import time
import os

# Write the same data in both formats
df.to_csv("transactions.csv", index=False)
df.to_parquet("transactions.parquet", compression="snappy")

csv_size = os.path.getsize("transactions.csv") / 1e6
parquet_size = os.path.getsize("transactions.parquet") / 1e6

print(f"CSV size: {csv_size:.1f} MB")
print(f"Parquet size: {parquet_size:.1f} MB")
print(f"Compression ratio: {csv_size / parquet_size:.1f}x")

# Benchmark: read only two columns
start = time.perf_counter()
csv_result = pd.read_csv("transactions.csv", usecols=["amount", "country"])
csv_time = time.perf_counter() - start

start = time.perf_counter()
parquet_result = pd.read_parquet(
    "transactions.parquet",
    columns=["amount", "country"]
)
parquet_time = time.perf_counter() - start

print(f"\nRead 2 of {len(df.columns)} columns:")
print(f"  CSV:     {csv_time:.3f}s")
print(f"  Parquet: {parquet_time:.3f}s")
print(f"  Speedup: {csv_time / parquet_time:.1f}x")

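The sample dataset has seven columns, so reading two of them touches roughly two-sevenths of the column data in Parquet (the exact share depends on how large each column is after encoding and compression). CSV has no equivalent shortcut: even with usecols, pandas still has to scan and tokenize every byte of every line. The I/O savings scale with the inverse of the fraction of columns selected. For a query touching two of thirty equally sized columns, Parquet reads about one-fifteenth of the data on I/O alone, before accounting for compression benefits.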

Compression: Why Columnar Data Compresses Better

Parquet files are dramatically smaller than CSV files for the same data. The reason goes beyond simply applying a compression algorithm. The combination of columnar storage and column-specific encoding makes the data far more compressible before any general-purpose algorithm is applied.

Dictionary encoding replaces repeated values with integer codes. A column containing country codes with five distinct values across a million rows stores those five strings once in a dictionary and then stores a million small integers referencing the dictionary. This is far more efficient than storing the string “US” four hundred thousand times.

Run-length encoding compresses sequences of repeated values into a count and a value. In a sorted column where the same value appears thousands of consecutive times, each run is stored as a single pair rather than as thousands of individual entries.

Delta encoding stores the difference between consecutive values rather than the values themselves. A timestamp column where values increase by one second stores those single-second deltas, which are all the same value and compress to almost nothing.
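The three encodings can be sketched in plain Python to show what each one keeps. These are illustrations of the ideas, not Parquet's on-disk format: the real encodings also bit-pack the integer codes and deltas, and Parquet stores dictionary indices with a hybrid of run-length encoding and bit-packing. Writers also choose encodings according to their own settings; pyarrow, for example, uses dictionary encoding by default and applies delta encodings only when they are requested through its column_encoding option.

```python
from itertools import groupby

def dictionary_encode(values):
    """Store each distinct value once and replace every value with an integer code."""
    dictionary, index, codes = [], {}, []
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index[v])
    return dictionary, codes

def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]

def delta_encode(values):
    """Keep the first value, then the difference between each value and the previous one."""
    return values[:1] + [b - a for a, b in zip(values, values[1:])]

countries = ["US", "UK", "US", "US", "DE", "US"]
print(dictionary_encode(countries))
# (['US', 'UK', 'DE'], [0, 1, 0, 0, 2, 0])

status_sorted = ["completed"] * 5 + ["failed"] * 2 + ["pending"] * 3
print(run_length_encode(status_sorted))
# [('completed', 5), ('failed', 2), ('pending', 3)]

# Epoch seconds starting at 2022-01-01 00:00:00 UTC, one second apart
epoch_seconds = [1640995200 + i for i in range(5)]
print(delta_encode(epoch_seconds))
# [1640995200, 1, 1, 1, 1]
```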

python

import io
import time

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Demonstrate encoding efficiency with a high-cardinality vs low-cardinality column
low_cardinality = pa.array(
    np.random.choice(["US", "UK", "DE", "FR", "JP"], 1_000_000)
)
high_cardinality = pa.array(
    [f"user_{i}" for i in range(1_000_000)]
)

# Write and measure with different encodings
def measure_column_size(array, use_dictionary):
    table = pa.table({"col": array})
    buf = io.BytesIO()
    pq.write_table(
        table, buf,
        use_dictionary=use_dictionary,
        compression="snappy"
    )
    return buf.tell()

low_dict = measure_column_size(low_cardinality, use_dictionary=True)
low_plain = measure_column_size(low_cardinality, use_dictionary=False)
high_dict = measure_column_size(high_cardinality, use_dictionary=True)

print("Compression comparison:")
print(f"Low cardinality (5 values) with dictionary: {low_dict / 1e6:.2f} MB")
print(f"Low cardinality (5 values) without dict:    {low_plain / 1e6:.2f} MB")
print(f"High cardinality (1M unique) with dict:     {high_dict / 1e6:.2f} MB")
print(f"Dictionary benefit for low cardinality: {low_plain / low_dict:.1f}x")

# Benchmark different compression codecs on the full transactions table
full_table = pa.Table.from_pandas(df)
codecs = ["none", "snappy", "gzip", "brotli", "zstd"]

print("\nCompression codec comparison:")
print(f"{'Codec':<10} {'Size (MB)':<12} {'Write (s)':<12} {'Read (s)':<10}")

for codec in codecs:
    buf = io.BytesIO()
    start = time.perf_counter()
    pq.write_table(full_table, buf, compression=codec if codec != "none" else None)
    write_time = time.perf_counter() - start

    size_mb = buf.tell() / 1e6
    buf.seek(0)

    start = time.perf_counter()
    pq.read_table(buf)
    read_time = time.perf_counter() - start

    print(f"{codec:<10} {size_mb:<12.2f} {write_time:<12.3f} {read_time:<10.3f}")

Snappy is the most common choice for analytical workloads because it prioritizes decompression speed over compression ratio. Zstandard usually achieves a noticeably better compression ratio than Snappy while still decompressing quickly, and it has become increasingly popular. Gzip and Brotli generally produce smaller files than Snappy but decompress more slowly, which makes them better suited to archival storage than to data that is queried often.

Predicate Pushdown: Skipping Data Before Reading It

Column pruning, reading only the columns you need, is one form of data skipping. Predicate pushdown is the other: skipping entire row groups based on filter conditions before reading any of their data.

When a writer produces a Parquet file with statistics enabled, it stores the minimum and maximum value (along with a null count) for each column in each row group in the file’s footer metadata. A query engine reads this metadata before touching the data and skips any row group where the filter condition is provably impossible to satisfy.

python

import duckdb
import pyarrow.parquet as pq

# Write data sorted by date to maximize predicate pushdown effectiveness
df_sorted = df.sort_values("transaction_date")
df_sorted.to_parquet(
    "transactions_sorted.parquet",
    compression="snappy",
    row_group_size=100_000
)

# Show statistics stored in file metadata
parquet_file = pq.ParquetFile("transactions_sorted.parquet")
metadata = parquet_file.metadata

print("Row group statistics for transaction_date column:")
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    # Find the date column
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.path_in_schema == "transaction_date":
            stats = col.statistics
            if stats:
                print(f"  Row group {i}: "
                      f"min={stats.min}, "
                      f"max={stats.max}, "
                      f"null_count={stats.null_count}")

# Demonstrate predicate pushdown with DuckDB
conn = duckdb.connect()

# Query without filter (reads everything)
start = time.perf_counter()
result_full = conn.execute("""
    SELECT category, SUM(amount) as total
    FROM read_parquet('transactions_sorted.parquet')
    GROUP BY category
""").fetchdf()
full_time = time.perf_counter() - start

# Query with a date filter (skips row groups outside the range).
# The sample data covers 1,000,000 seconds, about 11.6 days from 2022-01-01,
# so filter on a single day that actually exists in the file.
start = time.perf_counter()
result_filtered = conn.execute("""
    SELECT category, SUM(amount) as total
    FROM read_parquet('transactions_sorted.parquet')
    WHERE transaction_date >= '2022-01-05'
      AND transaction_date < '2022-01-06'
    GROUP BY category
""").fetchdf()
filtered_time = time.perf_counter() - start

print(f"\nFull scan: {full_time:.3f}s")
print(f"Filtered scan (1 day): {filtered_time:.3f}s")
print(f"Speedup from predicate pushdown: {full_time / filtered_time:.1f}x")

The effectiveness of predicate pushdown depends critically on data organization. When a column is sorted or clustered such that similar values appear in the same row groups, the min/max statistics exclude many row groups for selective filters. A date filter on a file sorted by date might read only ten percent of the data. The same filter on an unsorted file provides little or no benefit, because each row group’s date range typically spans nearly the whole dataset, so no row group can be ruled out.

This is why partitioning by date is so common in analytical pipelines. Writing separate Parquet files per month means date filters skip entire files rather than relying on within-file statistics.
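A small simulation shows how much ordering matters. It is plain Python rather than a Parquet writer: it computes per-group min/max ranges the way writer statistics would for a hypothetical column of day numbers (100 rows per day, 1,000 rows per group), once sorted and once shuffled, and counts how many groups a one-month range filter cannot rule out.

```python
import random

def row_group_ranges(values, group_size):
    """Return (min, max) for each consecutive block of group_size values."""
    return [
        (min(values[i:i + group_size]), max(values[i:i + group_size]))
        for i in range(0, len(values), group_size)
    ]

def groups_to_read(ranges, lo, hi):
    """Count groups whose [min, max] could contain a value in [lo, hi)."""
    return sum(1 for mn, mx in ranges if mx >= lo and mn < hi)

# Hypothetical day-of-year values: 365 days x 100 rows each
days = [d for d in range(365) for _ in range(100)]
shuffled_days = days[:]
random.Random(0).shuffle(shuffled_days)

lo, hi = 150, 181  # roughly one month of days
sorted_ranges = row_group_ranges(days, 1_000)
shuffled_ranges = row_group_ranges(shuffled_days, 1_000)

sorted_groups = groups_to_read(sorted_ranges, lo, hi)
shuffled_groups = groups_to_read(shuffled_ranges, lo, hi)
total_groups = len(sorted_ranges)
print(f"Sorted:   read {sorted_groups} of {total_groups} row groups")
print(f"Shuffled: read {shuffled_groups} of {total_groups} row groups")
```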

Schema Enforcement and Evolution

CSV has no schema. The header row contains column names but carries no information about types. Every tool that reads a CSV must infer types by sampling values, and those inferences differ across tools and settings. A column of integers that happens to have a null in the first row might be inferred as float by one tool and object by another.
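The kinds of guesses a reader makes are easy to see with pandas' default CSV inference on a hypothetical three-row file: the missing value turns the integer column into floats, the ZIP code loses its leading zero, and the date column stays as plain strings unless parse_dates is requested.

```python
import io

import pandas as pd

# Hypothetical CSV: an integer id with one missing value,
# a ZIP code with a leading zero, and an unannotated date column
raw = (
    "id,zip,signup_date\n"
    "1,02134,2022-01-05\n"
    ",10001,2022-01-06\n"
    "3,94105,2022-01-07\n"
)

csv_df = pd.read_csv(io.StringIO(raw))
print(csv_df.dtypes)             # id is float64, zip is int64, signup_date is not a date type
print(csv_df["zip"].tolist())    # [2134, 10001, 94105]
```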

Parquet embeds a schema in the file that specifies the name and type of every column. The schema is authoritative: readers use the declared types instead of guessing them. A column declared as INT64 contains only 64-bit integers, and a column declared as TIMESTAMP contains only timestamps. Type problems therefore surface as errors when the data is written, instead of as silently incorrect values when it is read.

python

# Define an explicit schema
schema = pa.schema([
    pa.field("transaction_id", pa.int64(), nullable=False),
    pa.field("customer_id", pa.int32()),
    pa.field("amount", pa.float64()),
    pa.field("category", pa.dictionary(pa.int8(), pa.string())),
    pa.field("country", pa.dictionary(pa.int8(), pa.string())),
    pa.field("status", pa.dictionary(pa.int8(), pa.string())),
    pa.field("transaction_date", pa.timestamp("us"))
])

table_with_schema = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table_with_schema, "transactions_typed.parquet")

# Read back and verify schema preserved
read_schema = pq.read_schema("transactions_typed.parquet")
print("Schema preserved in file:")
for field in read_schema:
    print(f"  {field.name}: {field.type} (nullable: {field.nullable})")

# Schema evolution: add a new column to new files
df_v2 = df.copy()
df_v2["channel"] = np.random.choice(["web", "mobile", "pos"], len(df_v2))
df_v2.to_parquet("transactions_v2.parquet", compression="snappy")

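# Read both files together with schema merging: unify the two file schemas
# so that rows from the v1 file get nulls for the column it lacks
import pyarrow.dataset as ds

files = ["transactions.parquet", "transactions_v2.parquet"]
unified_schema = pa.unify_schemas([pq.read_schema(f) for f in files])
combined = (
    ds.dataset(files, schema=unified_schema, format="parquet")
    .to_table()
    .to_pandas()
)
print(f"\nCombined schema columns: {list(combined.columns)}")
print(f"'channel' nulls in v1 data: {combined['channel'].isna().sum():,}")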

Adding columns is handled gracefully by readers that merge schemas across files, such as pyarrow datasets given a unified schema, Spark with its mergeSchema option, or DuckDB with union_by_name. Old files without the new column return null for that column when read alongside new files that have it. This is how production pipelines handle schema changes without rewriting historical data.

Partitioned Parquet Datasets

For very large datasets, partitioning writes separate Parquet files organized in a directory hierarchy based on column values. Queries that filter on the partition column skip entire files rather than relying on within-file statistics.
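The directory names themselves carry the partition values, which is what lets a reader skip files without opening them. The sketch below is plain Python and assumes the Hive-style key=value directory naming that pyarrow uses for partition_cols; it builds the 20 directory paths a country-by-category layout would produce and keeps only those matching equality filters. Real readers also handle ranges and typed values; this sketch handles string equality only.

```python
def partition_path(root, values):
    """Build a Hive-style directory path such as root/country=US/category=retail."""
    return "/".join([root] + [f"{key}={value}" for key, value in values.items()])

def parse_partition(path):
    """Recover the partition key/value pairs from a Hive-style path."""
    return dict(part.split("=", 1) for part in path.split("/") if "=" in part)

def prune(paths, equals):
    """Keep only directories whose partition values match every equality filter."""
    return [
        p for p in paths
        if all(parse_partition(p).get(k) == v for k, v in equals.items())
    ]

# Hypothetical partition directories for 5 countries x 4 categories
paths = [
    partition_path("transactions_partitioned", {"country": c, "category": k})
    for c in ["US", "UK", "DE", "FR", "JP"]
    for k in ["retail", "food", "travel", "entertainment"]
]

selected = prune(paths, {"country": "US", "category": "retail"})
print(f"{len(selected)} of {len(paths)} directories to open: {selected}")
```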

python

# Write partitioned dataset
df.to_parquet(
    "transactions_partitioned/",
    partition_cols=["country", "category"],
    compression="snappy",
    engine="pyarrow"
)

# Inspect the partition structure
import os
for root, dirs, files in os.walk("transactions_partitioned/"):
    level = root.replace("transactions_partitioned/", "").count(os.sep)
    indent = " " * 2 * level
    if files:
        print(f"{indent}{os.path.basename(root)}/")
        for file in files[:2]:
            size = os.path.getsize(os.path.join(root, file)) / 1e3
            print(f"{indent}  {file} ({size:.0f} KB)")

# Query only US retail transactions (reads only one partition)
start = time.perf_counter()
us_retail = pd.read_parquet(
    "transactions_partitioned/",
    filters=[
        ("country", "==", "US"),
        ("category", "==", "retail")
    ]
)
partitioned_time = time.perf_counter() - start

# Compare with unpartitioned filtered read
start = time.perf_counter()
unpartitioned_filtered = pd.read_parquet(
    "transactions.parquet",
    filters=[
        ("country", "==", "US"),
        ("category", "==", "retail")
    ]
)
unpartitioned_time = time.perf_counter() - start

print(f"\nPartitioned read (US retail): {partitioned_time:.3f}s")
print(f"Unpartitioned read (US retail): {unpartitioned_time:.3f}s")
print(f"Rows returned: {len(us_retail):,}")

Parquet With DuckDB for Analytical Queries

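DuckDB reads Parquet natively and runs queries in-process with a vectorized engine, so no server infrastructure is needed. It pushes column selection and filters down into the Parquet scan, reading only the columns a query references and skipping row groups whose statistics rule them out.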

python

conn = duckdb.connect()

# DuckDB can query Parquet files directly with full SQL
start = time.perf_counter()
result = conn.execute("""
    SELECT
        country,
        category,
        COUNT(*) as transactions,
        SUM(amount) as total_revenue,
        AVG(amount) as avg_transaction,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY amount) as p95_amount
    FROM read_parquet('transactions.parquet')
    WHERE status = 'completed'
      AND amount > 50
    GROUP BY country, category
    ORDER BY total_revenue DESC
""").fetchdf()
duckdb_time = time.perf_counter() - start

print(f"DuckDB analytical query: {duckdb_time:.3f}s")
print(result.head(10))

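# Compare with a pandas equivalent that also reads from the file
# and computes the same aggregates, including the 95th percentile
start = time.perf_counter()
df_file = pd.read_parquet(
    "transactions.parquet",
    columns=["transaction_id", "country", "category", "status", "amount"]
)
df_completed = df_file[
    (df_file["status"] == "completed") & (df_file["amount"] > 50)
]
pandas_result = (
    df_completed
    .groupby(["country", "category"])
    .agg(
        transactions=("transaction_id", "count"),
        total_revenue=("amount", "sum"),
        avg_transaction=("amount", "mean"),
        p95_amount=("amount", lambda s: s.quantile(0.95))
    )
    .reset_index()
    .sort_values("total_revenue", ascending=False)
)
pandas_time = time.perf_counter() - start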

print(f"\nPandas equivalent query: {pandas_time:.3f}s")
print(f"DuckDB speedup: {pandas_time / duckdb_time:.1f}x")

Parquet vs CSV vs JSON Cheat Sheet

| Dimension | CSV | JSON | Parquet |
|---|---|---|---|
| Storage format | Row-oriented | Row-oriented | Columnar |
| Schema | None, inferred | None, inferred | Embedded, enforced |
| Compression | External only | External only | Built-in, column-aware |
| Typical size vs CSV | 1x | 1.5-3x larger | 5-20x smaller |
| Column pruning | No | No | Yes |
| Predicate pushdown | No | No | Yes |
| Type safety | No | Partial | Full |
| Human readable | Yes | Yes | No |
| Streaming write | Yes | Yes | No, requires buffering |
| Nested data support | No | Yes | Yes |
| Tool compatibility | Universal | Very wide | Wide, growing |
| Best use case | Small data, interchange | APIs, configs | Analytics, pipelines |

Common Mistakes When Using Parquet

Writing many tiny Parquet files is the most damaging performance mistake. Each file carries its own footer metadata, and each requires a separate open operation. A pipeline that writes one file per input record or per small batch, rather than batching into larger files, can create millions of small files that are slower to read than an equivalent CSV. A common guideline is to aim for files between 128MB and 1GB.

Not sorting or partitioning data before writing wastes the predicate pushdown capability. If every query filters on date, writing data sorted by date or partitioned by date lets predicate pushdown skip most row groups or files. Writing in random order means every file, and usually every row group within it, has to be read.

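Using row group sizes that are too small adds overhead without a matching benefit. Each row group carries its own column chunk metadata, statistics, and dictionary pages, and many small row groups turn large sequential reads into many small ones. Writers differ in their defaults, and pyarrow's row_group_size is expressed in rows rather than bytes, but row groups holding on the order of 128MB of data are a reasonable starting point for most workloads. Reducing them significantly produces files where metadata overhead becomes a meaningful fraction of total read time.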
