PostgreSQL Lakehouse (2/2) — Distributed Processing and Citus Integration

Distributed processing (Ray/Daft/Smallpond) and Citus + pg_lake FDW integration

Series — PostgreSQL Lakehouse Experiments
  1. PostgreSQL Lakehouse (1/2) — When DuckLake Hit a Wall, pg_lake Was There
  2. PostgreSQL Lakehouse (2/2) — Distributed Processing and Citus Integration

Part 1 covered DuckLake's libpq symbol collision and the discovery of pg_lake. This post picks up from there: what happens when a single DuckDB node isn't enough, why Citus and pg_duckdb cannot coexist in the same process, and how postgres_fdw solved the DW-Lakehouse integration that was supposed to happen on a single instance.


Phase 5: Distributed Processing — Ray vs Daft vs Smallpond

DuckDB handles tens to hundreds of GB of analytics efficiently on a single node. Beyond that, or when load must distribute across nodes, a separate layer is needed. Three options were tested against both MinIO and DuckLake.

Ray + DuckDB

Ray is a Python distributed computing framework. Originally built for reinforcement learning research, it now covers data processing, ML training, and serving. Combined with DuckDB, each Ray worker node runs an independent DuckDB instance.

import duckdb
import pandas as pd
import ray

@ray.remote
def process_partition(s3_path: str) -> pd.DataFrame:
    # Each task opens its own in-memory DuckDB instance on the worker
    conn = duckdb.connect()
    conn.execute("INSTALL httpfs; LOAD httpfs;")
    conn.execute("SET s3_endpoint='minio:9000'; SET s3_use_ssl=false;")
    return conn.execute(f"""
        SELECT region, SUM(amount) AS total
        FROM read_parquet('{s3_path}')
        GROUP BY region
    """).df()

# partition_paths: list of s3:// Parquet paths, one per partition (defined elsewhere)
futures = [process_partition.remote(path) for path in partition_paths]
results = ray.get(futures)  # list of per-partition DataFrames

Ray manages worker lifecycle, failure recovery, and result collection. DuckDB handles the per-partition computation. Teams familiar with PySpark will recognize the pattern — one level of abstraction higher.
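Because each task aggregates only its own partition, the same region can show up in several partial results, so a final merge step is still needed. A minimal sketch of that step, using plain dicts in place of the DataFrames; the function name and the sample values are hypothetical:

```python
from collections import defaultdict
from typing import Dict, Iterable


def merge_partial_sums(partials: Iterable[Dict[str, float]]) -> Dict[str, float]:
    """Combine per-partition {region: total} results into global totals."""
    merged: Dict[str, float] = defaultdict(float)
    for partial in partials:
        for region, total in partial.items():
            # SUM is associative, so partial sums can simply be added
            merged[region] += total
    return dict(merged)


# Hypothetical per-partition results, shaped like the output of process_partition
partials = [
    {"EU": 100.0, "US": 250.0},
    {"EU": 50.0, "APAC": 75.0},
]
totals = merge_partial_sums(partials)
```

With real DataFrames, `pd.concat(results).groupby("region", as_index=False)["total"].sum()` does the same job. Aggregates that are not directly mergeable, such as AVG, need each partition to return a sum and a count instead.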

Both MinIO and DuckLake worked. Score: 9/10. The downside is Ray cluster operational overhead.

Daft

Daft is a distributed DataFrame framework built by Eventual, with its core written in Rust. It provides a DataFrame API that will feel familiar to pandas and Spark users, along with native Iceberg support.

import daft

# `table` is a PyIceberg Table loaded from the catalog beforehand
df = daft.read_iceberg(table)

result = (df
    .where(df["year"] == 2025)
    .groupby("region")
    .agg(daft.col("amount").sum())
)

result.collect()  # Lazy execution — plan builds until collect()

Daft uses lazy evaluation: the execution plan builds up through DataFrame transformations and materializes only at collect(). The Rust core eliminates Python GIL constraints that affect pure-Python parallelism.
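The idea behind lazy evaluation can be shown with a toy class that records operations and runs them only on collect(). This is an illustrative sketch, not Daft's implementation; `LazyFrame`, `sum_by`, and the sample rows are made up for the example:

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]
Step = Callable[[List[Row]], List[Row]]


class LazyFrame:
    """Toy lazy frame: records operations and runs them only in collect()."""

    def __init__(self, rows: List[Row], plan: Optional[List[Step]] = None):
        self._rows = rows
        self._plan = plan or []

    def where(self, predicate: Callable[[Row], bool]) -> "LazyFrame":
        # Returns a new frame with one more plan step; no rows are touched yet
        def step(rows: List[Row]) -> List[Row]:
            return [r for r in rows if predicate(r)]
        return LazyFrame(self._rows, self._plan + [step])

    def sum_by(self, key: str, value: str) -> "LazyFrame":
        def step(rows: List[Row]) -> List[Row]:
            totals: Dict[object, object] = {}
            for r in rows:
                totals[r[key]] = totals.get(r[key], 0) + r[value]
            return [{key: k, value: v} for k, v in totals.items()]
        return LazyFrame(self._rows, self._plan + [step])

    def collect(self) -> List[Row]:
        # Only here does the recorded plan actually execute
        rows = self._rows
        for step in self._plan:
            rows = step(rows)
        return rows


rows = [
    {"year": 2025, "region": "EU", "amount": 10},
    {"year": 2024, "region": "EU", "amount": 99},
    {"year": 2025, "region": "US", "amount": 5},
    {"year": 2025, "region": "EU", "amount": 7},
]

calls = []  # records every predicate evaluation


def is_2025(row: Row) -> bool:
    calls.append(row)
    return row["year"] == 2025


plan = LazyFrame(rows).where(is_2025).sum_by("region", "amount")
calls_before_collect = len(calls)  # the predicate has not run yet
result = plan.collect()
```

A real engine uses the deferred plan for more than postponement: it can reorder and fuse steps, or push filters down to the file scan, before any data is read.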

Both MinIO and DuckLake worked. Score: 9/10. Configuration was simpler than Ray+DuckDB for data-heavy workloads.

Smallpond

Smallpond is a lightweight distributed data processing framework from DeepSeek, built on DuckDB and designed around DeepSeek's 3FS distributed file system. The promise: distributed DuckDB with minimal changes to application code.

In practice, MinIO connectivity failed immediately:

import smallpond
sp = smallpond.init()

df = sp.read_parquet("s3://bucket/data/*.parquet",
                     endpoint_url="http://minio:9000")
# Error: unsupported S3 endpoint configuration

DuckLake also failed. Error messages were unclear, making debugging difficult. Both MinIO and DuckLake: FAIL. Score: 3/10.

Smallpond's failure reveals that S3-compatible storage support is more limited than documented. On the 3FS storage it was designed around, the behavior may differ, but for self-hosted infrastructure backed by S3-compatible object storage it is not yet production-ready.

| Framework | MinIO | DuckLake | Score | Characteristics |
|---|---|---|---|---|
| Ray + DuckDB | PASS | PASS | 9/10 | Mature ecosystem, PySpark-like operations |
| Daft | PASS | PASS | 9/10 | Rust core, native Iceberg, simpler config |
| Smallpond | FAIL | FAIL | 3/10 | S3-compat limits, unclear errors |

Phase 6: Vortex File Format — Not Yet for Multi-Engine

Vortex is a next-generation columnar file format developed by SpiralDB, written in Rust. It positions itself as Parquet's successor, claiming:

  • Random access: 100× faster than Parquet
  • Full scan: 10–20× faster than Parquet
  • SIMD-friendly encoding, better compression

DuckDB 1.3+ includes native Vortex support:

import duckdb
conn = duckdb.connect()

# Convert Parquet to Vortex
conn.execute("""
    COPY (SELECT * FROM read_parquet('data.parquet'))
    TO 'data.vortex'
""")

# Direct Vortex read
result = conn.execute("SELECT * FROM 'data.vortex' WHERE id = 12345").df()
# Point lookups like this are where Vortex claims its largest speedup over Parquet

DuckDB and Polars both worked with Vortex. The blocker: Trino does not support Vortex. Neither does Spark.

For a multi-engine Lakehouse where data must be readable by Trino and Spark, storing data as Vortex makes it inaccessible to those engines. That defeats the purpose.

Vortex is valuable in DuckDB-only environments today. For a multi-engine Lakehouse, it is premature. We flagged it as "interesting technology, revisit when ecosystem support broadens" and moved on.


Phase 7: Citus + pg_duckdb — Same Process, Incompatible

With Citus as the existing DW and pg_lake as the Lakehouse, the natural first attempt was loading both extensions in the same PostgreSQL instance.

CREATE EXTENSION citus;
CREATE EXTENSION pg_duckdb;
-- → PostgreSQL server crash

Crash. The logs trace the failure to internal hook conflicts. Understanding why requires looking at what each extension does to the PostgreSQL process.

Citus deeply modifies PostgreSQL's query execution path. It registers planner_hook, the executor hooks (such as ExecutorStart_hook and ExecutorRun_hook), and ProcessUtility_hook to intercept query planning and execution for distributed routing. It installs custom memory management to coordinate across shards.

pg_duckdb embeds DuckDB as a shared library inside the PostgreSQL process. DuckDB brings its own memory allocator (jemalloc), thread pool, and file I/O subsystem. pg_duckdb also registers PostgreSQL hooks to route eligible queries to DuckDB.

When both load in the same process, the hook chains conflict. DuckDB's jemalloc and PostgreSQL's palloc do not interoperate — jemalloc-allocated objects passed to palloc-expecting code (or vice versa) corrupt the heap. The hooks also step on each other during query planning.
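PostgreSQL hooks are global function pointers, and by convention each extension saves the previous value and calls it from its own hook. The toy model below sketches that chaining convention in Python to show why load order decides which extension sees a query first. It is not code from Citus or pg_duckdb and does not reproduce the crash; all names are illustrative:

```python
from typing import Callable, List, Optional

PlannerHook = Callable[[str], str]

# Stands in for PostgreSQL's global planner_hook pointer
planner_hook: Optional[PlannerHook] = None
call_order: List[str] = []


def standard_planner(query: str) -> str:
    call_order.append("standard_planner")
    return f"plan({query})"


def install_hook(name: str) -> None:
    """Mimic an extension's init: save the previous hook, then install its own."""
    global planner_hook
    previous = planner_hook

    def hook(query: str) -> str:
        call_order.append(name)
        # A well-behaved hook delegates to the previous hook or the default planner
        return previous(query) if previous else standard_planner(query)

    planner_hook = hook


def plan_query(query: str) -> str:
    return planner_hook(query) if planner_hook else standard_planner(query)


install_hook("citus")      # loaded first
install_hook("pg_duckdb")  # loaded second, so it sees the query first
plan = plan_query("SELECT 1")
```

The chain only holds together when every hook agrees on what it may do with the query before passing it on. Two extensions that each want to take over planning for the same statement have no such agreement.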

This is GitHub Issue #444, explicitly open in both repositories, with both teams indicating no fix is planned. Citus and pg_duckdb (and therefore pg_lake) cannot coexist in a single PostgreSQL instance today.


Phase 8: Citus + pg_lake — Separate Instances via postgres_fdw

After Phase 7, the approach changed: Citus DW and pg_lake Lakehouse as completely separate PostgreSQL instances, connected via postgres_fdw.

Citus 12.1 (PG16, DW)   ←——postgres_fdw——→   pg_lake 3.3 (PG18, Lakehouse)
  [sharded OLAP, OLTP]                            [Iceberg tables, analytics]
         ↑                                                  ↓
  existing applications                         MinIO (Parquet files)
                                                            ↑
                                                  DuckDB standalone

postgres_fdw is a standard PostgreSQL extension that connects to remote PostgreSQL servers and exposes their tables as local foreign tables. The PostgreSQL wire protocol is backward-compatible, so PG16 connecting to PG18 works without issues.

-- On Citus (PG16): connect to pg_lake (PG18)
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER pglake_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'pglake-host', port '5439', dbname 'warehouse');

CREATE USER MAPPING FOR current_user
    SERVER pglake_server
    OPTIONS (user 'pg_lake_user', password '...');

-- The target schema must exist before IMPORT FOREIGN SCHEMA
CREATE SCHEMA IF NOT EXISTS lake_schema;

IMPORT FOREIGN SCHEMA public
    FROM SERVER pglake_server
    INTO lake_schema;

-- Query pg_lake Iceberg tables from Citus
SELECT region, SUM(amount)
FROM lake_schema.orders
WHERE order_date >= '2025-01-01'
GROUP BY region;

When Citus executes this query, postgres_fdw forwards it to the pg_lake server. pg_lake reads the Iceberg Parquet files via its DuckDB engine and returns results. Critically, aggregation pushdown works: pg_lake receives the full GROUP BY SUM() query and executes it locally rather than shipping all rows to Citus for aggregation.
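One way to check pushdown is to run EXPLAIN (VERBOSE) on the Citus side and look at the "Remote SQL" line that postgres_fdw prints for each foreign scan. The helper below is a small sketch of that check; the function name is hypothetical and the plan text is illustrative, not captured from the experiment:

```python
from typing import Iterable


def aggregation_pushed_down(plan_lines: Iterable[str]) -> bool:
    """Return True if any Remote SQL line in an EXPLAIN VERBOSE plan has a GROUP BY."""
    for line in plan_lines:
        text = line.strip()
        if text.startswith("Remote SQL:") and "GROUP BY" in text.upper():
            return True
    return False


# Illustrative plan where the aggregate is sent to the remote server
pushed = [
    "Foreign Scan",
    "  Output: region, (sum(amount))",
    "  Relations: Aggregate on (public.orders)",
    "  Remote SQL: SELECT region, sum(amount) FROM public.orders GROUP BY 1",
]

# Illustrative plan where rows are fetched and aggregated locally
not_pushed = [
    "HashAggregate",
    "  Group Key: region",
    "  ->  Foreign Scan on lake_schema.orders",
    "        Remote SQL: SELECT region, amount FROM public.orders",
]

pushed_result = aggregation_pushed_down(pushed)
not_pushed_result = aggregation_pushed_down(not_pushed)
```

In the second shape, every matching row crosses the network before aggregation, which is the case the pushdown avoids.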

Measured latencies:

| Operation | Latency | Notes |
|---|---|---|
| SELECT (cold, with aggregation) | 102 ms | Aggregation pushdown confirmed |
| INSERT via FDW → Iceberg | 205 ms | Iceberg snapshot auto-created |
| UPDATE via FDW → Iceberg | 99 ms | Row-level |
| DELETE via FDW → Iceberg | 70 ms | Row-level |

Cross-version compatibility (PG16 → PG18) worked without issues.

Three-Way Consistency Verification

After inserting a row through Citus FDW, the same data was verified across three paths:

-- 1. Direct pg_lake query
SELECT * FROM orders WHERE order_id = 999;
-- → returns the row

-- 2. Via Citus FDW (lake_schema.orders)
SELECT * FROM lake_schema.orders WHERE order_id = 999;
-- → same row, same values

-- 3. DuckDB standalone reading the Iceberg snapshot directly
SELECT * FROM iceberg_scan('s3://pg-lake-warehouse/iceberg/.../metadata.json')
WHERE order_id = 999;
-- → same row, same values

All three paths return identical data. This works because Iceberg's snapshot model is the coordination mechanism. An INSERT via FDW causes pg_lake to commit a new Iceberg snapshot and write a new metadata.json that points to it. DuckDB, pointed at that latest metadata.json, resolves the current snapshot and sees the new data immediately.
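How a reader finds the committed data can be sketched by resolving the current snapshot from a metadata file. The field names below follow the Iceberg table spec; the JSON content, bucket name, and function name are hypothetical and heavily trimmed:

```python
import json
from typing import Optional

# Trimmed, hypothetical metadata.json content
METADATA_JSON = """
{
  "format-version": 2,
  "current-snapshot-id": 2,
  "snapshots": [
    {"snapshot-id": 1, "timestamp-ms": 1700000000000,
     "manifest-list": "s3://example-bucket/metadata/snap-1.avro"},
    {"snapshot-id": 2, "parent-snapshot-id": 1, "timestamp-ms": 1700000060000,
     "manifest-list": "s3://example-bucket/metadata/snap-2.avro"}
  ]
}
"""


def current_snapshot(metadata: dict) -> Optional[dict]:
    """Return the snapshot entry that current-snapshot-id points at, if any."""
    current_id = metadata.get("current-snapshot-id")
    for snapshot in metadata.get("snapshots", []):
        if snapshot["snapshot-id"] == current_id:
            return snapshot
    return None


snapshot = current_snapshot(json.loads(METADATA_JSON))
```

Every engine that follows the same pointer reaches the same manifest list, and therefore the same set of data files, which is why the three paths agree without talking to each other.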


Final Architecture

[ad-hoc queries / OLTP-like]   [batch distributed]    [external engines]
  psql, JDBC applications        Ray + DuckDB           Spark
        ↓                        Daft                    Trino
  PostgreSQL 18 (pg_lake)  ←——FDW——→  Citus DW (PG16)
        ↓
  MinIO / GCS (Iceberg v2 Parquet)
        ↑
  DuckDB standalone (iceberg_scan)

| Scenario | Recommendation |
|---|---|
| PostgreSQL-centered Lakehouse (small-medium) | pg_lake + Iceberg |
| Large-scale multi-engine (heavy Spark+Trino) | Iceberg + Nessie + Spark/Trino |
| Simple analytics (< 1TB, single engine) | DuckDB + PostgreSQL catalog |
| DuckDB distributed processing | Ray + DuckDB or Daft |
| Existing Citus DW + Lakehouse integration | Citus + pg_lake + postgres_fdw |
| DuckDB-only environment | DuckLake (standalone) |

What the Experiments Established

DuckLake is a DuckDB-only tool. PostgreSQL catalog support is documented but broken inside pg_duckdb due to the libpq symbol collision. For DuckDB standalone environments it is straightforward and effective.

GCS cold reads are 650× slower than NVMe. Cloud Lakehouse architectures require an explicit caching strategy before any performance commitments are made. First-query latency is not comparable to local storage.

Citus + pg_duckdb cannot coexist in the same instance today. Running separate instances connected via postgres_fdw was the only approach that worked in these experiments. Reads and writes through the FDW both work, and three-way consistency is maintained through Iceberg snapshots.

pg_lake is the most complete PostgreSQL Lakehouse solution available as of late 2025. Pre-GA status requires production stability validation, but functionally it satisfies the original goal: PostgreSQL as Iceberg catalog, full ACID, and verified interoperability with Spark, Trino, and DuckDB.

Ray+DuckDB and Daft are both production-viable for distributed DuckDB workloads. Smallpond is not yet ready for self-hosted infrastructure. Vortex is worth watching for DuckDB-only environments but cannot be adopted in multi-engine stacks until Trino and Spark add support.