Quand votre ETL incrémental par identifiant tourne en rond sans insérer une ligne

Pattern général : ingérer une API paginée par dernier ID, filtrer côté consommateur, et persister un curseur aligné sur le flux amont — pas sur le nombre d’insertions. Illustration avec le pipeline conditions de naivo (Python, SQLite, GitHub Actions).
Auteur·rice

Nicolas Decoopman

Date de publication

14 mai 2026

Mots clés

ETL, incrémental, curseur, watermark, SQLite, API REST, pagination, idempotence, GitHub Actions, naivo, fiabilité pipeline

Beaucoup d’API exposent des ressources « récentes depuis un identifiant » : vous passez un curseur, vous recevez une page d’objets, vous en gardez une partie. Tant que ce que vous gardez coïncide avec ce que l’API vous renvoie, tout va bien. Dès que vous filtrez fort (périmètre métier, qualité du contenu, schéma interne), une partie des objets du flux ne produit aucune ligne en base. Si votre curseur n’avance que lorsqu’au moins une insertion a réussi, le prochain run redemande la même page : vous avez transformé un incrémental en boucle de relecture.

Illustration. Le projet open source naivo agrège des comptes rendus de sorties en montagne. Une des sources est une API REST classique : liste des nouveautés depuis un dernier ID, puis fiche détaillée par ID. Le code ne persiste que les fiches dont un champ texte libre — après nettoyage HTML et découpage en rubriques — vaut d’être stocké. Rien de spécifique à « connaître » le fournisseur : le problème est structurel dès qu’il existe un déchet entre flux API et modèle relationnel.

La règle implémentée. Après chaque exécution, le curseur enregistré reflète le plus grand ID observé dans les réponses API (un « haut filigrane » sur le flux), même si zéro ligne a été écrite. Le curseur suit l’horloge logique du producteur, pas le succès de vos INSERT.

1 Introduction

Ce scénario apparaît dès qu’une équipe branche un cron sur une API tierce : budget temps, quotas, pagination, et logs qui expliquent mal pourquoi « rien n’a changé » alors que le job a tourné. Dans naivo, le workflow GitHub Actions conditions_daily.yml borne explicitement la durée (timeout-minutes: 30) : une boucle sur les mêmes identifiants n’est pas une anomalie cosmétique, c’est un risque opérationnel.

L’article décrit le contrat que le code impose entre l’API, le filtre métier et la persistance — puis ce que vous pouvez réutiliser ailleurs sans copier-coller la stack montagne.

2 Pipeline : ce que fait le code (vue générale)

2.1 Deux vitesses : historique court vs quotidien

Dans src/conditions/skitour.py, deux modes coexistent (noms internes days et incremental). L’un rejoue une fenêtre calendaire courte utile pour un premier remplissage ou une correction ; l’autre suppose que « ce qui est nouveau » est déjà exposé par le curseur d’ID, ce qui évite de balayer des dates passées inutilement quand l’API porte cette responsabilité. C’est un pattern courant : backfill explicite vs delta par clé monotone.

2.2 Où vit l’état

init_db crée une table métier avec une contrainte UNIQUE sur l’identifiant métier de la ressource, et une petite table state clé/valeur pour le curseur (last_sortie_id dans ce dépôt — le nom importe peu : c’est un entier persisté). get_state_int / set_state_int encapsulent des INSERT ... ON CONFLICT DO UPDATE : relancer le script deux fois de suite ne corrompt pas l’état.

2.3 Lecture du flux : filigrane global vs sélection locale

La fonction qui collecte les IDs (équivalent « page suivante tant qu’il y a des résultats ») met à jour un entier max_seen en parcourant toutes les lignes de la page renvoyée par l’API, avant d’appliquer le filtre métier (ici un ensemble d’identifiants de zones géographiques). Pourquoi c’est important : le flux amont peut contenir des objets que vous ignorez volontairement ; votre curseur doit quand même traverser ces IDs pour ne pas les redemander éternellement. Un garde-fou coupe la pagination si le producteur se comporte mal (plafond de pages).

2.4 Décision « on insert ou pas »

Pour chaque ID retenu après filtrage, une étape détail peut échouer ; les erreurs isolées ne bloquent pas tout le lot. La construction de ligne pour la base retourne None si le contenu utile est vide après normalisation — donc pas d’insert, comportement normal, pas exception.

2.5 Le cœur du pattern

Après upsert et nettoyage de rétention (fenêtre glissante en jours), le curseur passe à max_seen dès que celui-ci dépasse la valeur lue en début de run, sans condition sur le nombre de lignes insérées. C’est la ligne qui empêche la boucle infinie décrite en ouverture.

def build_row_from_sortie(sortie: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    raw_conditions = sortie.get("conditions") or ""
    raw_conditions = strip_html_tags(raw_conditions)
    if not str(raw_conditions).strip():
        return None
    # ... normalisation puis mapping vers les colonnes ...
inserted = upsert_conditions_rows(conn, rows)
deleted = clean_old_entries(conn, retention_days=15)

# Met à jour l'état même si rien n'a été inséré (ex: sorties sans conditions)
if max_seen > last_id:
    set_state_int(conn, STATE_LAST_SORTIE_ID_KEY, max_seen)

2.6 Normalisation amont du texte

Les champs libres HTML sont bruyants. strip_html_tags puis split_conditions (src/utils/parser.py) projettent un bloc unique vers des colonnes typées. Ce n’est pas magique : c’est la couche qui définit ce qu’« exploitable » veut dire pour votre schéma — et donc pourquoi tant d’objets API se terminent en « zéro ligne ».

2.7 Passerelle entre modes

Après un run « fenêtre calendaire », le code repositionne le curseur sur le maximum d’ID déjà présent en base. Ainsi, basculer du remplissage historique vers le quotidien ne laisse pas le curseur à zéro par inadvertance.

2.8 Intégration CI

Le workflow restaure la base SQLite depuis l’historique Git si le fichier est versionné, puis exécute le script. L’état n’est pas injecté par variables d’environnement : il vit dans l’artefact versionné — choix de simplicité pour un petit dépôt de données, avec le coût habituel des binaires sous contrôle de source.

3 Stratégie d’adoption (hors naivo)

  1. Écrire la règle d’avancement avant le code de boucle : « on avance sur le max observé dans la réponse » vs « on avance seulement si rowcount > 0 ». La seconde option est tentante et presque toujours fausse dès qu’il y a du filtrage.

  2. Séparer filigrane global et filtre local dans la spécification ; si vous ne pouvez pas observer les IDs exclus, il vous faudra un autre mécanisme (partitionnement, curseur par shard, file d’événements).

  3. Tester le cas « zéro insert » : une fixture avec une page API factice où tout est filtré ou vide, et vérifier que le curseur augmente quand même.

  4. SQLite versionné : documenter la politique de merge et de taille ; limiter la rétention ; sérialiser les jobs qui touchent le même fichier si plusieurs pipelines partagent l’infra.

4 Conclusion

Ce que vous gagnez : progression garantie dans le flux amont, moins d’appels redondants, jobs prévisibles sous timeout, journaux où last_id, max_seen et le nombre d’inserts se lisent séparément.

Ce que vous tradez : vous acceptez que des ressources « vues » côté API ne laissent aucune trace en base ; vous restez couplé à la sémantique des IDs (monotonie, absence de recyclage). Complexité : deux modes, état persistant, éventuellement duplication partielle de logique HTTP si JSON et binaires ne partagent pas le même helper. Onboarding : une phrase dans le README sur « pourquoi le curseur bouge sans insert » évite les tickets fantômes.

5 Lien vers le projet

Présentation du projet : naivo.
Code source : https://github.com/NCSdecoopman/naivo.

6 Références code

  • src\conditions\skitour.py — modes backfill / incrémental, table state, pagination, calcul de max_seen, filtrage métier, mise à jour du curseur indépendante du nombre d’inserts.

  • src\utils\parser.py — normalisation d’un champ libre vers colonnes structurées (split_conditions).

  • .github\workflows\conditions_daily.yml — cron, restauration de la base depuis Git, exécution du pipeline étatful en CI.