Pourquoi votre réplication Postgres vers DuckDB va casser (et comment gérer le schema drift en SQL)
postgres_scan est extrêmement performante pour un Data Warehouse local. Cependant, l’apparition de nouvelles colonnes ou la sélection partielle de champs rendent la synchronisation fragile. Voici comment implémenter un pipeline incrémental résilient au schema drift.
DuckDB, PostgreSQL, ETL, schema drift, dbt, incremental, data engineering, python, SQL
Dans un entrepôt de données moderne basé sur DuckDB et dbt, l’ingestion de bases de production PostgreSQL pose un défi classique : la sensibilité aux modifications de schéma (schema drift) et la surconsommation de ressources lors de l’extraction de tables massives (comme les journaux d’activités). Sans connecteurs commerciaux lourds, une synchronisation naïve par script Python sature rapidement la mémoire ou casse à la première colonne ajoutée en amont.
Le projet data-warehouse implémente un synchronisateur robuste PostgreSQL vers DuckDB en Python et SQL natif. Il s’appuie sur l’extension officielle postgres de DuckDB pour lire les flux à haute vitesse sans sérialisation intermédiaire, tout en appliquant un élagage strict des colonnes (column pruning). Le pipeline calcule dynamiquement un filigrane temporel (watermark) pour l’incrémental, détecte les colonnes manquantes via des métadonnées de table (PRAGMA table_info), injecte à la volée les nouvelles colonnes par des instructions ALTER TABLE, et fusionne (upsert) les lignes modifiées à l’aide d’une table temporaire de delta en utilisant des insertions nominatives (INSERT INTO ... BY NAME).
1 Introduction
Dans les architectures de données modernes dites “légères” (modern data stack locale), DuckDB s’est imposé comme le moteur analytique de choix pour le traitement in-process, et dbt comme la couche de modélisation. Pourtant, la phase d’ingestion (le “E” et le “L” de l’ETL) reste souvent le maillon faible. Les équipes doivent soit déployer des outils de réplication tiers (Fivetran, Airbyte, Meltano), soit écrire des scripts de dump maison complexes à maintenir.
La tentation est grande de simplement charger des tables entières de production PostgreSQL sous forme de DataFrames pandas avant de les écrire dans DuckDB. En production, cette approche souffre de trois maux :
- Saturation de la mémoire vive lors de la lecture de tables volumineuses.
- Coût de transfert et stockage inutile si seulement une fraction des colonnes est requise en aval.
- Fragilité opérationnelle : tout ajout ou modification de colonne sur la base transactionnelle fait planter le processus d’insertion ou corrompt l’ordre des colonnes.
Cet article détaille comment le dépôt data-warehouse résout ces contraintes grâce à une synchronisation sur mesure, ultra-rapide et tolérante aux dérives de schéma, écrite en moins de 150 lignes de code Python.
2 Étapes techniques / Pipeline
L’architecture de synchronisation est structurée autour de deux fichiers : extract_db.py qui pilote la configuration des tables à extraire et duckdb_sync.py qui implémente le moteur de synchronisation générique.
2.1 Configuration et élagage des colonnes (Column Pruning)
Dans extract_db.py, une structure déclarative TABLE_CONFIGS définit les tables cibles et la liste exacte des colonnes requises. C’est l’étape d’élagage des colonnes : les tables massives comme completed ou track (qui stockent des millions de lignes d’historique utilisateur) ne voient que leurs identifiants, statuts et dates de mise à jour transférés.
# Fichier : src/extract/bdd_metier/extract_db.py
TABLE_CONFIGS = {
"user": "*", # Table complète (toutes les colonnes)
"completed": [
"id",
"user_id",
"is_archived",
"updated_at",
"created_at",
"status",
"external_source",
],
"track": ["id", "user_id", "is_archived", "updated_at", "created_at"],
}2.2 Mécanisme de Watermark et chargement incrémental
Pour chaque table configurée, la fonction sync_tables_optimized commence par interroger le fichier local DuckDB pour déterminer la date de dernière mise à jour stockée. Ce filigrane (watermark) sert de borne pour l’extraction incrémentale.
# Fichier : src/extract/bdd_metier/duckdb_sync.py
def _get_max_updated_at(con: duckdb.DuckDBPyConnection, table: str) -> int:
"""Trouve le timestamp updated_at maximum dans la table DuckDB locale."""
try:
# Vérifier si la table existe déjà dans DuckDB
rel = con.execute(
f"SELECT 1 FROM information_schema.tables "
f"WHERE table_schema = '{RAW_SCHEMA}' AND table_name = '{table}'"
).fetchone()
if not rel:
return 0
# Récupérer la valeur maximale de la colonne updated_at
res = con.execute(
f"SELECT MAX(updated_at) FROM {RAW_SCHEMA}.{table}"
).fetchone()
return res[0] if res and res[0] is not None else 0
except Exception as e:
logger.warning(f"Impossible de déterminer le watermark pour '{table}' : {e}. Full load.")
return 0Si le filigrane est valide (> 0), le pipeline effectue une synchronisation incrémentale. Sinon, il bascule automatiquement sur un chargement complet.
2.3 Détection de la dérive de schéma (Schema Drift)
L’une des décisions implicites majeures du code est de forcer un rechargement complet de la table si de nouvelles colonnes configurées dans TABLE_CONFIGS manquent dans la structure locale de la table analytique.
Cependant, si le schéma évolue côté base de production PostgreSQL avec de nouveaux champs et que le pipeline tourne en mode incrémental, DuckDB doit adapter sa structure à la volée. Pour ce faire, le code compare les schémas de la table cible et de la table temporaire contenant les modifications (delta) :
# Fichier : src/extract/bdd_metier/duckdb_sync.py
# 1. Inspecter la structure de la table cible locale
target_cols = {
row[1]: row[2]
for row in con.execute(
f"PRAGMA table_info('{RAW_SCHEMA}.{quoted_table}')"
).fetchall()
}
# 2. Inspecter la structure de la table delta (importée de Postgres)
delta_cols = {
row[1]: row[2]
for row in con.execute(
f"PRAGMA table_info('{temp_delta}')"
).fetchall()
}
# 3. Identifier les colonnes manquantes localement
missing_cols = [
(col_name, col_type)
for col_name, col_type in delta_cols.items()
if col_name not in target_cols
]
# 4. Appliquer les DDL dynamiquement
for col_name, col_type in missing_cols:
logger.info("Adding missing column '%s' (%s) to %s.%s", col_name, col_type, RAW_SCHEMA, table)
con.execute(
f"ALTER TABLE {RAW_SCHEMA}.{quoted_table} ADD COLUMN {col_name} {col_type}"
)2.4 Fusion unifiée par ID et insertion nominative
Une fois la base de destination mise à jour avec les colonnes manquantes, le pipeline réalise l’opération de fusion (upsert) en deux étapes transactionnelles :
- Suppression dans la table locale des enregistrements existants ayant le même identifiant unique (
id) que le delta reçu. - Insertion par nom des nouveaux enregistrements afin d’éviter tout décalage positionnel dans les arguments SQL (qui provoquerait des erreurs d’insertion ou des mélanges de valeurs si l’ordre des colonnes varie).
# Fichier : src/extract/bdd_metier/duckdb_sync.py
# Suppression des lignes obsolètes pour garantir l'idempotence
con.execute(
f"DELETE FROM {RAW_SCHEMA}.{quoted_table} "
f"WHERE id IN (SELECT id FROM {temp_delta})"
)
# Insertion explicite par nom de colonne
con.execute(
f"INSERT INTO {RAW_SCHEMA}.{quoted_table} BY NAME "
f"SELECT * FROM {temp_delta}"
)Grâce à INSERT INTO ... BY NAME, DuckDB associe automatiquement chaque colonne de la requête SELECT * à la colonne cible portant le même nom, protégeant ainsi l’opération contre les variations physiques d’index de colonnes.
3 Stratégie d’adoption
Pour intégrer ce pattern au sein d’une infrastructure de données existante sans bousculer votre équipe :
Étape 1 : Activer le Column Pruning
Commencez par remplacer les extractions globales (SELECT *) par des listes explicites de colonnes dans vos configurations d’ingestion. Rien que cela réduira la consommation de bande passante et la taille de votre base DuckDB locale de 40% à 70% sur les tables volumineuses.Étape 2 : Brancher le scanner natif DuckDB
Utilisez l’extension DuckDBpostgreset sa fonctionpostgres_scanau lieu d’utiliser SQLAlchemy ou pandas comme passe-plat. Les transferts se feront directement de mémoire à mémoire via des types natifs C, éliminant les lenteurs liées à la sérialisation Python.Étape 3 : Implémenter la détection de dérive légère
Ajoutez la routine de comparaison viaPRAGMA table_info. C’est une solution robuste qui élimine le besoin d’outils complexes de migration de schéma côté entrepôt pour les couches de staging.
3.1 Frictions et limites organisationnelles
- Renommer ou supprimer des colonnes : Le pipeline actuel gère uniquement les ajouts de colonnes. Si une colonne est supprimée en production, elle restera vide (
NULL) dans DuckDB. Les suppressions physiques ou les changements de types incompatibles (ex. modifier unintenvarchar) nécessiteront toujours une intervention manuelle ou un rechargement complet (full reload). - Tooling : Ce script s’intègre parfaitement dans des orchestrateurs légers (Dagster, Prefect, scripts cron dans des containers). Si votre équipe utilise déjà un orchestrateur d’ingestion d’entreprise, il est préférable de conserver ce dernier pour les sources externes SaaS complexes, mais d’adopter ce script pour les bases de production internes pour des raisons évidentes de coût et de performance.
4 Conclusion
4.1 Gains concrets
- Performance : La combinaison de
postgres_scanet de filigranes temporels permet de synchroniser plusieurs millions de lignes en quelques secondes sans saturer le processeur. - Résilience : Le script ne plante pas lors du déploiement d’une nouvelle version de l’application de production modifiant la structure des tables.
- Volume disque : L’élagage des colonnes évite d’ingérer des données inutilisées (ex: données géométriques lourdes, gros champs JSON analytiques inutiles).
4.2 Trade-offs
- Absence de CDC (Change Data Capture) : Le pipeline s’appuie sur une colonne
updated_at. Si un enregistrement est physiquement supprimé de la base de production (hard delete) sans mise à jour d’un drapeau d’archivage (ex.is_archived), la suppression ne sera pas propagée à l’entrepôt DuckDB. - Maintenance de code : Bien que léger (150 lignes), ce code doit être maintenu, testé et mis à jour par l’équipe d’ingénierie de données locale, là où un outil SaaS délègue cette responsabilité à un tiers.
5 Références code
src\extract\bdd_metier\duckdb_sync.py
Implémentation du moteur de synchronisation Postgres vers DuckDB avec détection de dérive et requêtesBY NAME.src\extract\bdd_metier\extract_db.py
Définition des configurations de tables et élagage des colonnes (TABLE_CONFIGS) pour la synchronisation.models\staging\stg_bdd_metier.sql
Modèle de staging dbt utilisant une logique d’incrément multi-tables pour capturer les mises à jour corrélées.
Une présentation plus large du projet est disponible dans la section projets.