How Columnar Databases Improve Query Speed

The fastest path to understanding columnar databases is a single question: when your query asks for the average transaction amount across ten million rows, why should the database read the customer name, email address, shipping address, phone number, and thirty other columns it will immediately discard?

With a row-oriented database, it has no choice. Data is stored row by row, every field of every record packed together on disk. Reading any field means reading the entire row. To compute an average of one column across ten million rows, a row-oriented database reads every byte of every row, extracts the one value it needs, and throws the rest away. The wasted I/O can be enormous.

A columnar database stores each column as a separate structure. Reading the transaction amount column reads only transaction amounts. Nothing else is touched. For analytical queries that aggregate across one or two columns of a table with fifty, a columnar scan often reads ten to fifty times less data from storage. Everything downstream (decompression, parsing, aggregation) operates on this smaller dataset, so the speed improvement compounds through every layer of query execution.

This guide explains how columnar storage works, why it interacts so powerfully with compression and modern CPU architecture, and how these principles produce the query speeds that make columnar databases the standard for analytical workloads.

Row vs Columnar Storage: The Physical Difference

Imagine a simple table with four columns: customer_id, name, country, and amount. With five rows, the data looks like this:

customer_id | name    | country | amount
1001        | Alice   | US      | 150.00
1002        | Bob     | UK      | 89.50
1003        | Carol   | US      | 234.75
1004        | David   | DE      | 67.25
1005        | Eve     | US      | 189.00

A row-oriented database stores this data on disk in row order. The physical byte sequence looks like:

[1001][Alice][US][150.00][1002][Bob][UK][89.50][1003][Carol][US][234.75]...

Every row is contiguous. Reading any single field means pulling the whole row through the I/O path, because storage is read in pages and each page holds complete rows, wanted fields and unwanted fields alike.

A columnar database stores the same data organized by column. The physical byte sequence looks like:

[1001][1002][1003][1004][1005]  <- customer_id column
[Alice][Bob][Carol][David][Eve] <- name column
[US][UK][US][DE][US]            <- country column
[150.00][89.50][234.75][67.25][189.00] <- amount column

Each column is a contiguous block. Reading the amount column touches only those five values. The customer IDs, names, and countries are stored in entirely separate locations and are never read by a query that does not need them.
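To make the two layouts concrete, here is a minimal sketch that packs the five example rows into bytes both ways and counts how many bytes a sum over amount has to scan. The fixed-width record format (int32 id, 8-byte name, 2-byte country, float64 amount) is an assumption chosen for illustration; real engines use pages, headers, and variable-length fields.

```python
import struct

# The five-row example table: (customer_id, name, country, amount).
rows = [
    (1001, "Alice", "US", 150.00),
    (1002, "Bob", "UK", 89.50),
    (1003, "Carol", "US", 234.75),
    (1004, "David", "DE", 67.25),
    (1005, "Eve", "US", 189.00),
]

# Assumed fixed-width record: int32 id, 8-byte name, 2-byte country, float64 amount.
ROW_FORMAT = "<i8s2sd"
ROW_SIZE = struct.calcsize(ROW_FORMAT)  # 22 bytes, no padding with "<"

# Row layout: all fields of one record sit next to each other.
row_store = b"".join(
    struct.pack(ROW_FORMAT, cid, name.encode(), country.encode(), amount)
    for cid, name, country, amount in rows
)

# Column layout: one contiguous buffer per column (only amount is built here).
amount_column = struct.pack(f"<{len(rows)}d", *(r[3] for r in rows))


def sum_amount_row_store(buf):
    """Walk every record and pull the amount out of each one."""
    total = 0.0
    for offset in range(0, len(buf), ROW_SIZE):
        *_, amount = struct.unpack_from(ROW_FORMAT, buf, offset)
        total += amount
    return total, len(buf)  # bytes scanned: the whole table


def sum_amount_column_store(buf):
    """Decode the amount buffer only; the other columns are never touched."""
    values = struct.unpack(f"<{len(buf) // 8}d", buf)
    return sum(values), len(buf)  # bytes scanned: one column


row_total, row_bytes = sum_amount_row_store(row_store)
col_total, col_bytes = sum_amount_column_store(amount_column)
print(f"row layout:    total={row_total:.2f}, bytes scanned={row_bytes}")
print(f"column layout: total={col_total:.2f}, bytes scanned={col_bytes}")
```

Both scans return the same total, but the row layout walks 110 bytes to get it and the column layout walks 40. The gap widens as the table gains columns the query does not need.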

python

import duckdb
import pandas as pd
import numpy as np
import time

# Generate a realistic analytical dataset
np.random.seed(42)
n = 5_000_000

df = pd.DataFrame({
    "transaction_id": np.arange(1, n + 1),
    "customer_id": np.random.randint(1, 100_001, n),
    "customer_name": [f"Customer_{i}" for i in np.random.randint(1, 100_001, n)],
    "email": [f"user_{i}@example.com" for i in np.random.randint(1, 100_001, n)],
    "amount": np.random.lognormal(4.0, 1.2, n).round(2),
    "category": np.random.choice(
        ["retail", "food", "travel", "entertainment", "health"], n
    ),
    "country": np.random.choice(
        ["US", "UK", "DE", "FR", "JP"], n, p=[0.4, 0.2, 0.15, 0.15, 0.1]
    ),
    "status": np.random.choice(
        ["completed", "pending", "failed"], n, p=[0.8, 0.12, 0.08]
    ),
    "transaction_date": pd.date_range("2022-01-01", periods=n, freq="s"),
    "merchant_id": np.random.randint(1, 10_001, n),
    "device_type": np.random.choice(["mobile", "desktop", "tablet"], n),
    "is_international": np.random.choice([True, False], n, p=[0.3, 0.7])
})

# Save as Parquet (columnar) and CSV (row-oriented)
df.to_parquet("transactions_columnar.parquet", compression="snappy")
df.to_csv("transactions_row.csv", index=False)

import os  # used only to report file sizes

conn = duckdb.connect()

# CSV stands in for row-oriented storage here: each line holds one full record as text
print("Dataset: 5 million rows, 12 columns")
print(f"Columnar (Parquet): {os.path.getsize('transactions_columnar.parquet') / 1e6:.1f} MB")
print(f"Row-oriented (CSV): {os.path.getsize('transactions_row.csv') / 1e6:.1f} MB")

I/O Reduction: Reading Less Data

The most direct performance advantage of columnar storage is reading less data from disk. Modern storage systems, even NVMe SSDs, are orders of magnitude slower than CPU processing. Reducing I/O is the highest-leverage optimization available.

python

# Query 1: Aggregate using only 2 of 12 columns
query_2col = """
    SELECT country, SUM(amount) as total_revenue
    FROM '{table}'
    GROUP BY country
    ORDER BY total_revenue DESC
"""

# Query 2: Filtered aggregate using 4 of 12 columns (category, country, amount, status)
query_3col = """
    SELECT category, country, AVG(amount) as avg_amount, COUNT(*) as txn_count
    FROM '{table}'
    WHERE status = 'completed'
    GROUP BY category, country
    ORDER BY avg_amount DESC
"""

# Benchmark against columnar Parquet
start = time.perf_counter()
for _ in range(3):
    conn.execute(query_2col.format(table="transactions_columnar.parquet")).fetchdf()
columnar_2col = (time.perf_counter() - start) / 3

# Benchmark against row CSV
start = time.perf_counter()
for _ in range(3):
    conn.execute(query_2col.format(table="transactions_row.csv")).fetchdf()
row_2col = (time.perf_counter() - start) / 3

print(f"2-column aggregation:")
print(f"  Columnar (Parquet): {columnar_2col:.3f}s")
print(f"  Row-oriented (CSV): {row_2col:.3f}s")
print(f"  Speedup: {row_2col / columnar_2col:.1f}x")

start = time.perf_counter()
for _ in range(3):
    conn.execute(query_3col.format(table="transactions_columnar.parquet")).fetchdf()
columnar_3col = (time.perf_counter() - start) / 3

start = time.perf_counter()
for _ in range(3):
    conn.execute(query_3col.format(table="transactions_row.csv")).fetchdf()
row_3col = (time.perf_counter() - start) / 3

print(f"\nFiltered aggregation (4 of 12 columns):")
print(f"  Columnar (Parquet): {columnar_3col:.3f}s")
print(f"  Row-oriented (CSV): {row_3col:.3f}s")
print(f"  Speedup: {row_3col / columnar_3col:.1f}x")

The speedup scales roughly with the fraction of data not read. If all columns were equally wide, a query that needs two of twelve columns would read approximately seventeen percent of the data a row-oriented scan reads. In practice the gap is usually larger because columnar data compresses much better. Note that the CSV timings above also include the cost of parsing text, so they overstate the effect of column pruning alone.
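The fraction of data read depends on column widths, not just column counts. The short calculation below compares the two estimates for the country and amount query. The per-column byte widths are illustrative guesses, not measurements of the generated dataset.

```python
# Assumed average encoded width in bytes for each of the twelve columns.
# These are illustrative guesses, not measurements.
column_widths = {
    "transaction_id": 8, "customer_id": 8, "customer_name": 14, "email": 22,
    "amount": 8, "category": 6, "country": 2, "status": 9,
    "transaction_date": 8, "merchant_id": 8, "device_type": 6,
    "is_international": 1,
}


def fraction_read(needed, widths):
    """Share of the table's bytes that a column-pruned scan has to read."""
    return sum(widths[c] for c in needed) / sum(widths.values())


needed = ["country", "amount"]
by_count = len(needed) / len(column_widths)
by_bytes = fraction_read(needed, column_widths)

print(f"estimate by column count: {by_count:.1%}")
print(f"estimate by byte width:   {by_bytes:.1%}")
```

With these assumed widths the two needed columns are narrower than average, so the byte-based estimate comes out lower than the count-based one. A query that touched the wide email column instead would shift the estimate the other way.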

Compression: Why Columnar Data Compresses So Effectively

Compression in columnar databases is not just a storage optimization. It is a query performance optimization. Compressed data takes less time to read from disk and less memory bandwidth to transfer to the CPU. In many analytical workloads, the bottleneck is I/O rather than CPU, so reading compressed data and decompressing it is faster than reading uncompressed data.

Columnar storage makes data dramatically more compressible because values in the same column are of the same type and often have limited cardinality or predictable patterns.

python

import pyarrow as pa
import pyarrow.parquet as pq
import io

# Demonstrate compression ratios for different column types
def measure_compression(array, codec):
    table = pa.table({"col": array})
    buf = io.BytesIO()
    pq.write_table(table, buf, compression=codec, use_dictionary=True)
    return buf.tell()

# Low cardinality string column (5 values, 5M rows)
low_cardinality = pa.array(
    np.random.choice(["US", "UK", "DE", "FR", "JP"], n, p=[0.4, 0.2, 0.15, 0.15, 0.1])
)

# High cardinality string column (100k unique values)
high_cardinality = pa.array([f"Customer_{i}" for i in np.random.randint(1, 100_001, n)])

# Float column
amounts = pa.array(np.random.lognormal(4.0, 1.2, n).round(2))

# Timestamp column with regular intervals
timestamps = pa.array(pd.date_range("2022-01-01", periods=n, freq="s"))

print("Compression analysis (5M rows):")
print(f"{'Column type':<35} {'Raw (MB)':<12} {'Snappy (MB)':<14} {'ZStd (MB)':<12} {'Ratio':<8}")
print("-" * 82)

columns = [
    ("Low cardinality strings (5 values)", low_cardinality),
    ("High cardinality strings (100k)", high_cardinality),
    ("Float amounts", amounts),
    ("Timestamps (regular intervals)", timestamps)
]

raw_size_approx = {
    "Low cardinality strings (5 values)": n * 2.5,
    "High cardinality strings (100k)": n * 12,
    "Float amounts": n * 8,
    "Timestamps (regular intervals)": n * 8
}

for name, array in columns:
    snappy_size = measure_compression(array, "snappy")
    zstd_size = measure_compression(array, "zstd")
    raw_approx = raw_size_approx[name]
    ratio = raw_approx / snappy_size

    print(f"{name:<35} {raw_approx/1e6:<12.1f} {snappy_size/1e6:<14.1f} "
          f"{zstd_size/1e6:<12.1f} {ratio:<8.1f}x")

The compression ratios are large because columnar storage enables column-specific encodings before general-purpose compression is applied. Dictionary encoding replaces repeated string values with small integer codes. Run-length encoding compresses consecutive identical values into count-value pairs. Bit packing reduces integer columns to the minimum number of bits required. These encodings happen at the column level and exploit patterns that would be invisible in row-oriented storage where values from different columns are interleaved.
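The three encodings named above are simple enough to sketch in a few lines. This is a toy version for intuition only; the function names are hypothetical and production formats such as Parquet combine these steps in more elaborate ways.

```python
from itertools import groupby


def dictionary_encode(values):
    """Map each distinct value to a small integer code."""
    dictionary = sorted(set(values))
    index = {value: code for code, value in enumerate(dictionary)}
    return dictionary, [index[value] for value in values]


def run_length_encode(codes):
    """Collapse consecutive identical codes into (code, run_length) pairs."""
    return [(code, sum(1 for _ in run)) for code, run in groupby(codes)]


def bits_per_code(dictionary):
    """Minimum number of bits needed to bit-pack one dictionary code."""
    return max(1, (len(dictionary) - 1).bit_length())


country = ["US", "US", "US", "UK", "UK", "DE", "US", "US"]

dictionary, codes = dictionary_encode(country)
runs = run_length_encode(codes)
width = bits_per_code(dictionary)

print(dictionary)  # ['DE', 'UK', 'US']
print(codes)       # [2, 2, 2, 1, 1, 0, 2, 2]
print(runs)        # [(2, 3), (1, 2), (0, 1), (2, 2)]
print(width)       # 2
```

Eight two-character strings become four runs of 2-bit codes plus a three-entry dictionary. The same values interleaved with names and amounts in a row layout would have no runs to collapse.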

Vectorized Execution: Processing Data in Batches

Modern CPUs have SIMD (Single Instruction Multiple Data) capabilities that apply the same operation to multiple values simultaneously. A single AVX-512 instruction operates on sixteen 32-bit floating point values at once. Row-oriented databases struggle to exploit SIMD because each row’s values are interleaved with values from other columns, so the CPU cannot load contiguous batches of same-type values.

Columnar databases are designed around vectorized execution. Instead of processing one row at a time, they process batches of thousands of values from the same column in tight loops that the compiler can vectorize and the CPU can execute with SIMD instructions.

python

# Demonstrate the throughput difference between scalar and vectorized operations
import numpy as np
import time

n_values = 10_000_000
amounts = np.random.lognormal(4.0, 1.2, n_values).astype(np.float32)
threshold = 100.0

# Scalar Python loop (simulates row-at-a-time processing)
def scalar_filter_sum(values, threshold):
    total = 0.0
    count = 0
    for v in values:
        if v > threshold:
            total += v
            count += 1
    return total, count

# Vectorized NumPy (simulates columnar vectorized execution)
def vectorized_filter_sum(values, threshold):
    mask = values > threshold
    return values[mask].sum(), mask.sum()

# Time scalar approach on small sample (full would take too long)
sample = amounts[:100_000]
start = time.perf_counter()
for _ in range(10):
    scalar_result = scalar_filter_sum(sample, threshold)
scalar_time = (time.perf_counter() - start) / 10

# Time vectorized approach on full dataset
start = time.perf_counter()
for _ in range(10):
    vector_result = vectorized_filter_sum(amounts, threshold)
vector_time = (time.perf_counter() - start) / 10

print(f"Scalar processing (100k values): {scalar_time*1000:.2f}ms")
print(f"Scalar throughput: {len(sample) / scalar_time / 1e6:.1f}M values/second")
print(f"Vectorized processing (10M values): {vector_time*1000:.2f}ms")
print(f"Vectorized throughput: {n_values / vector_time / 1e6:.0f}M values/second")

# DuckDB's actual vectorized execution
conn = duckdb.connect()
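# Load the sample through a registered DataFrame instead of inlining a million literals in the SQL text
conn.register("amounts_sample", pd.DataFrame({"amount": amounts[:1_000_000]}))
conn.execute("CREATE TABLE amounts AS SELECT amount FROM amounts_sample")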

start = time.perf_counter()
for _ in range(5):
    conn.execute("""
        SELECT SUM(amount), COUNT(*)
        FROM amounts
        WHERE amount > 100
    """).fetchone()
duckdb_time = (time.perf_counter() - start) / 5

print(f"\nDuckDB vectorized query (1M values): {duckdb_time*1000:.2f}ms")

DuckDB, Snowflake, BigQuery, ClickHouse, and other columnar analytical databases all implement vectorized execution engines. The batch size is typically chosen to fit in the CPU’s L1 or L2 cache, ensuring that once a batch is loaded into cache it is processed entirely before the next batch is fetched. Cache efficiency is as important as raw instruction throughput for analytical query performance.
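The batch-at-a-time control flow can be sketched in plain Python. Pure Python gets no SIMD benefit, so this shows only the shape of a vectorized pipeline: each operator consumes and produces whole batches rather than single rows. The batch size of 2048 and the operator names are assumptions for illustration.

```python
from array import array

VECTOR_SIZE = 2048  # assumed batch size; real engines tune this to cache size


def scan(column, vector_size=VECTOR_SIZE):
    """Scan operator: yield the column one fixed-size batch at a time."""
    for start in range(0, len(column), vector_size):
        yield column[start:start + vector_size]


def filter_greater_than(batches, threshold):
    """Filter operator: take a batch in, emit the surviving values as a batch."""
    for batch in batches:
        yield array("d", (v for v in batch if v > threshold))


def sum_and_count(batches):
    """Aggregate operator: fold each batch into running totals."""
    total, count = 0.0, 0
    for batch in batches:
        total += sum(batch)
        count += len(batch)
    return total, count


# One contiguous, same-typed column: the values 0..199 repeated 50 times.
amounts = array("d", (float(i % 200) for i in range(10_000)))

total, count = sum_and_count(filter_greater_than(scan(amounts), 100.0))
print(total, count)  # 742500.0 4950
```

The per-operator call overhead is paid once per batch instead of once per row, and each inner loop touches a single contiguous array of one type. In a compiled engine that inner loop is what the compiler turns into SIMD instructions.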

Late Materialization: Deferring Row Assembly

A query that filters and then projects does not need to assemble full rows until the very end. Late materialization is the optimization that exploits this.

Consider a query that filters five million rows down to fifty thousand based on a status column and then computes the average of the amount column. Early materialization would reconstruct full rows first, then filter them and extract the amount values. Late materialization applies the filter using only the status column, records which row positions passed the filter, and then reads the amount column for only those fifty thousand positions. The full row is never assembled.

python

# Demonstrate late materialization principle
def early_materialization_query(df, status_filter, amount_threshold):
    """Simulates early materialization: join columns first, then filter"""
    # Read all columns first (like row-oriented)
    all_columns = df[["transaction_id", "customer_id", "amount",
                       "category", "country", "status",
                       "merchant_id", "device_type"]].copy()

    # Then apply filter
    filtered = all_columns[
        (all_columns["status"] == status_filter) &
        (all_columns["amount"] > amount_threshold)
    ]
    return filtered["amount"].mean()

def late_materialization_query(df, status_filter, amount_threshold):
    """Simulates late materialization: filter first using minimal columns"""
    # Step 1: Filter using only the filter columns (columnar advantage)
    mask = (df["status"] == status_filter) & (df["amount"] > amount_threshold)

    # Step 2: Read only the needed column for only the passing rows
    return df.loc[mask, "amount"].mean()

# Benchmark on full dataset
start = time.perf_counter()
result_early = early_materialization_query(df, "completed", 100)
early_time = time.perf_counter() - start

start = time.perf_counter()
result_late = late_materialization_query(df, "completed", 100)
late_time = time.perf_counter() - start

print(f"Early materialization: {early_time:.3f}s (result: {result_early:.2f})")
print(f"Late materialization:  {late_time:.3f}s (result: {result_late:.2f})")
print(f"Speedup: {early_time / late_time:.1f}x")

# DuckDB applies late materialization automatically
start = time.perf_counter()
duckdb_result = conn.execute("""
    SELECT AVG(amount)
    FROM read_parquet('transactions_columnar.parquet')
    WHERE status = 'completed'
      AND amount > 100
""").fetchone()[0]
duckdb_time = time.perf_counter() - start
print(f"\nDuckDB (automatic late materialization): {duckdb_time:.3f}s")

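Late materialization is most effective when filters are highly selective. A query that keeps ten percent of rows after filtering needs the non-filter columns for only those rows rather than all rows, which can cut I/O for those columns by up to a factor of ten. The actual saving depends on how the surviving rows are distributed, because storage is still read in blocks rather than individual values.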
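The core mechanism is a list of passing row positions, often called a selection vector, that is carried from the filter column to the projected column. A minimal sketch with hypothetical helper names and a six-row toy table:

```python
# Three columns stored separately; name is never touched by the query below.
status = ["completed", "failed", "completed", "pending", "completed", "failed"]
amount = [150.0, 89.5, 234.75, 67.25, 189.0, 42.0]
name = ["Alice", "Bob", "Carol", "David", "Eve", "Frank"]


def select_positions(column, predicate):
    """Evaluate the filter on one column and return the passing row positions."""
    return [i for i, value in enumerate(column) if predicate(value)]


def gather(column, positions):
    """Fetch values from another column at the selected positions only."""
    return [column[i] for i in positions]


# SELECT AVG(amount) WHERE status = 'completed'
positions = select_positions(status, lambda s: s == "completed")
selected = gather(amount, positions)
average = sum(selected) / len(selected)

print(positions)  # [0, 2, 4]
print(average)    # 191.25
```

Only two of the three columns are read, and the second one is read at three positions out of six. No tuple containing all three fields is ever built.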

Zone Maps and Min-Max Pruning

Columnar databases store statistical metadata about each column segment, including minimum and maximum values. These zone maps allow the query engine to skip entire segments without reading them when a filter condition is provably impossible to satisfy.

python

# Demonstrate zone map (min-max pruning) effectiveness
import pyarrow.parquet as pq

# Sorted copy: each row group covers a narrow, non-overlapping date range
df_sorted = df.sort_values("transaction_date").reset_index(drop=True)
df_sorted.to_parquet(
    "transactions_sorted.parquet",
    compression="snappy",
    row_group_size=100_000
)

# Shuffled copy: every row group spans nearly the full date range,
# so min-max statistics cannot rule any of them out
df.sample(frac=1, random_state=42).reset_index(drop=True).to_parquet(
    "transactions_shuffled.parquet",
    compression="snappy",
    row_group_size=100_000
)

parquet_file = pq.ParquetFile("transactions_sorted.parquet")
metadata = parquet_file.metadata

print(f"File has {metadata.num_row_groups} row groups of ~100k rows each")
print("\nZone maps for transaction_date column:")
print(f"{'Row Group':<12} {'Min Date':<25} {'Max Date':<25} {'Skippable for 1-7 Feb 2022?'}")
print("-" * 90)

# The generated data starts at 2022-01-01 and ends in late February 2022
# (5M rows at one-second intervals), so the filter window must fall inside that range
window_start = pd.Timestamp("2022-02-01")
window_end = pd.Timestamp("2022-02-08")

skippable = 0
inspected = min(8, metadata.num_row_groups)
for i in range(inspected):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.path_in_schema == "transaction_date":
            stats = col.statistics
            if stats and stats.min is not None:
                # Statistics come back as datetime-like objects, so compare as timestamps
                can_skip = "YES - SKIPPED" if (
                    pd.Timestamp(stats.max) < window_start or
                    pd.Timestamp(stats.min) >= window_end
                ) else "No - must read"
                if "YES" in can_skip:
                    skippable += 1
                print(f"{i:<12} {str(stats.min)[:24]:<25} "
                      f"{str(stats.max)[:24]:<25} {can_skip}")

print(f"\n{skippable} of the first {inspected} row groups can be skipped")

# Benchmark the same query against sorted and shuffled data
start = time.perf_counter()
for _ in range(5):
    conn.execute("""
        SELECT COUNT(*), SUM(amount)
        FROM read_parquet('transactions_sorted.parquet')
        WHERE transaction_date >= '2022-02-01'
          AND transaction_date < '2022-02-08'
    """).fetchone()
sorted_time = (time.perf_counter() - start) / 5

start = time.perf_counter()
for _ in range(5):
    conn.execute("""
        SELECT COUNT(*), SUM(amount)
        FROM read_parquet('transactions_shuffled.parquet')
        WHERE transaction_date >= '2022-02-01'
          AND transaction_date < '2022-02-08'
    """).fetchone()
unsorted_time = (time.perf_counter() - start) / 5

print(f"\nFiltered query on 1-7 February 2022:")
print(f"  Sorted data (effective zone maps): {sorted_time:.3f}s")
print(f"  Shuffled data (ineffective zone maps): {unsorted_time:.3f}s")
print(f"  Sorting benefit: {unsorted_time / sorted_time:.1f}x")

The effectiveness of zone maps depends entirely on data organization. When data is sorted or clustered by a frequently filtered column, zone maps exclude most row groups for selective queries. Snowflake’s automatic clustering, BigQuery’s clustering columns, and ClickHouse’s ORDER BY clause all serve this purpose: organizing data so that zone maps can skip as much as possible.
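The dependence on data organization shows up even in a toy model. The sketch below builds per-segment min-max metadata over the same thousand values stored in two different orders and counts how many segments a range filter has to read. The Segment class and the segment size of 100 are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    values: list
    min_value: int
    max_value: int


def build_segments(values, segment_size):
    """Split a column into segments and record min-max metadata for each."""
    segments = []
    for start in range(0, len(values), segment_size):
        chunk = values[start:start + segment_size]
        segments.append(Segment(chunk, min(chunk), max(chunk)))
    return segments


def count_in_range(segments, low, high):
    """Count values in [low, high); return (matches, segments_read)."""
    matches = segments_read = 0
    for seg in segments:
        # Zone map check: skip when the segment cannot contain a match.
        if seg.max_value < low or seg.min_value >= high:
            continue
        segments_read += 1
        matches += sum(1 for v in seg.values if low <= v < high)
    return matches, segments_read


sorted_values = list(range(1000))
# Same 1000 values in a scattered order (37 is coprime with 1000).
scattered_values = [(i * 37) % 1000 for i in range(1000)]

sorted_result = count_in_range(build_segments(sorted_values, 100), 250, 300)
scattered_result = count_in_range(build_segments(scattered_values, 100), 250, 300)

print(sorted_result)     # (50, 1)
print(scattered_result)  # (50, 10)
```

Both layouts return the same fifty matches. The sorted layout reads one segment out of ten, while the scattered layout reads all ten because every segment's min-max range overlaps the filter.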

Columnar Databases in Production

Understanding which systems implement columnar storage helps connect these principles to practical tool choices.

python

# DuckDB: embedded columnar analytical database
conn = duckdb.connect()

complex_query = """
    WITH monthly_stats AS (
        SELECT
            DATE_TRUNC('month', transaction_date) as month,
            country,
            category,
            COUNT(*) as transaction_count,
            SUM(amount) as total_amount,
            AVG(amount) as avg_amount,
            PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY amount) as p95_amount
        FROM read_parquet('transactions_columnar.parquet')
        WHERE status = 'completed'
        GROUP BY 1, 2, 3
    ),
    ranked AS (
        SELECT *,
            ROW_NUMBER() OVER (
                PARTITION BY month, country
                ORDER BY total_amount DESC
            ) as category_rank
        FROM monthly_stats
    )
    SELECT month, country, category, total_amount, avg_amount, p95_amount
    FROM ranked
    WHERE category_rank = 1
    ORDER BY month, country
"""

start = time.perf_counter()
result = conn.execute(complex_query).fetchdf()
query_time = time.perf_counter() - start

print(f"Complex analytical query across 5M rows:")
print(f"  DuckDB execution time: {query_time:.3f}s")
print(f"  Result rows: {len(result)}")
print(f"\nSample results:")
print(result.head(10).to_string(index=False))

ClickHouse takes columnar storage further with its MergeTree engine, which stores data sorted by a primary key and applies aggressive compression using the column-specific codecs most appropriate for each data type. Queries on ClickHouse that touch the primary key columns can skip enormous fractions of data through its sparse index structure. Snowflake and BigQuery implement columnar storage on cloud object storage with automatic optimization layers that manage clustering and micro-partition statistics without requiring manual configuration.
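The idea behind a sparse index over sorted data can be sketched with a binary search over one key per block of rows. This is a simplified model, not ClickHouse's implementation: the function names are hypothetical, and the granule size of 4 is chosen to keep the example readable (ClickHouse's default index granularity is 8192 rows).

```python
from bisect import bisect_left, bisect_right

GRANULE_SIZE = 4  # tiny on purpose for the example


def build_sparse_index(sorted_keys, granule_size=GRANULE_SIZE):
    """Keep only the first key of every granule (one mark per granule)."""
    return sorted_keys[::granule_size]


def granules_to_read(marks, low, high):
    """Granule numbers that may hold keys in [low, high].

    The lower bound steps back one granule so that duplicate keys
    straddling a granule boundary are never missed.
    """
    first = max(bisect_left(marks, low) - 1, 0)
    last = bisect_right(marks, high)  # exclusive upper bound
    return range(first, last)


keys = list(range(100, 140))      # 40 sorted primary-key values
marks = build_sparse_index(keys)  # 10 marks: 100, 104, ..., 136

granules = granules_to_read(marks, 113, 118)
scanned = [
    key
    for g in granules
    for key in keys[g * GRANULE_SIZE:(g + 1) * GRANULE_SIZE]
]
matches = [key for key in scanned if 113 <= key <= 118]

print(list(granules))             # [3, 4]
print(len(scanned), len(matches)) # 8 6
```

The index holds ten entries for forty rows, and the range query scans eight rows instead of forty. The index stays small enough to keep in memory because it grows with the number of granules rather than the number of rows.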

Columnar Database Cheat Sheet

Optimization         | What It Does                             | Performance Impact
Column pruning       | Reads only queried columns               | Proportional to columns skipped
Dictionary encoding  | Replaces repeated values with integers   | 10-100x compression on low cardinality
Run-length encoding  | Compresses consecutive repeated values   | High on sorted or clustered data
Vectorized execution | Processes batches with SIMD instructions | 4-16x over scalar row-at-a-time
Late materialization | Assembles rows only after filtering      | High for selective filters
Zone maps            | Skips row groups based on min-max stats  | High for sorted, clustered data
Predicate pushdown   | Evaluates filters at storage layer       | Reduces rows before any processing
Cache efficiency     | Fits column batches in CPU cache         | Multiplicative with vectorization

Common Misconceptions About Columnar Databases

Columnar databases are not always faster than row databases. For OLTP workloads involving single-row lookups, updates, and inserts, row-oriented databases are faster. Fetching a single customer record by ID requires reading the entire row in a row database, which is efficient. In a columnar database, fetching a single row requires reading from many separate column files and assembling the result, which is slower. Columnar databases are optimized for analytical workloads that aggregate across many rows and few columns.

Columnar storage alone does not explain the full performance advantage. Vectorized execution, late materialization, zone maps, and parallel query execution each contribute significant additional speedup beyond the I/O reduction from column pruning. The performance advantage comes from the combination of all these optimizations working together, each enabled by the columnar storage model.

More compression is not always better for query performance. Heavily compressed data requires more CPU time to decompress. In CPU-bound workloads, aggressive compression codecs can make queries slower even though they read less data. The optimal compression codec depends on whether your workload is I/O-bound (where more compression helps) or CPU-bound (where lighter compression is better). Snappy and LZ4 trade compression ratio for very fast decompression, which is why they are common choices in analytical systems where decompression speed matters as much as size.
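The trade-off can be observed with the standard library alone. Snappy and LZ4 are not in the Python standard library, so in this sketch zlib at level 1 stands in for a light codec and lzma for a heavy one. The sample data is an assumption, and timings vary by machine, so treat the output as a way to see the direction of the trade-off rather than as a benchmark.

```python
import lzma
import time
import zlib

# Assumed sample: 50,000 short text records with a repetitive country field.
countries = ["US", "UK", "DE", "FR", "JP"]
raw = "".join(
    f"{countries[i % 5]},{(i * 7919) % 100_000}\n" for i in range(50_000)
).encode()

# Each entry maps a label to a (compress, decompress) pair.
codecs = {
    "zlib level 1 (light)": (lambda data: zlib.compress(data, 1), zlib.decompress),
    "lzma (heavy)": (lzma.compress, lzma.decompress),
}

results = {}
for label, (compress, decompress) in codecs.items():
    compressed = compress(raw)

    # Time decompression only: that is the cost paid on every query.
    start = time.perf_counter()
    restored = decompress(compressed)
    elapsed = time.perf_counter() - start

    results[label] = (len(compressed), elapsed, restored == raw)
    print(f"{label:<22} {len(compressed) / 1e3:>8.1f} KB  "
          f"decompress {elapsed * 1000:>7.2f} ms")

print(f"{'uncompressed':<22} {len(raw) / 1e3:>8.1f} KB")
```

If reading the bytes from storage dominates query time, the smaller file is the better choice. If the data is already in memory and the CPU is the limit, the faster decompressor is.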
