🎯 Outils iGaming en ligne gratuits        

Création d'un système de diffusion de webhooks qui mesure sa propre latence (avec du code)

J'avais besoin de construire un système qui envoie des rappels HTTP sortants (webhooks/postbacks) à des centaines de points de terminaison différents, suit le succès et l'échec de la livraison, mesure les percentiles de latence par point de terminaison, effectue des nouvelles tentatives intelligentes et met en quarantaine les points de terminaison qui sont constamment lents ou défectueux.

C'est le genre d'infrastructure qui ressemble à un projet de week-end et qui se transforme en un véritable casse-tête de trois mois. Tous les systèmes de diffusion de webhooks que j'ai vus en production présentent le même problème : ils fonctionnent parfaitement à faible volume, puis se dégradent silencieusement à grande échelle car personne n'a instrumenté le pipeline de diffusion.

Cet article décrit en détail l'implémentation complète : architecture de la file d'attente, logique de nouvelle tentative avec temporisation exponentielle, suivi de la latence par point de terminaison avec les percentiles P50/P95/P99, mécanisme de coupure pour les points de terminaison défaillants et file d'attente de messages non distribuables pour les pannes permanentes. Le code est entièrement écrit en Python avec PostgreSQL. Aucun courtier de messages externe n'est requis (mais je préciserai où l'utilisation de Redis ou RabbitMQ serait nécessaire pour un débit plus élevé).

Le modèle de données

Tout commence par deux tables : une pour la file d’attente de livraison et une pour les mesures de latence.

sql

-- Pending and in-flight webhook deliveries
CREATE TABLE webhook_queue (
    id              SERIAL PRIMARY KEY,
    endpoint_id     INTEGER NOT NULL,
    endpoint_url    TEXT NOT NULL,
    payload         JSONB NOT NULL,
    
    -- Delivery state
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, in_flight, delivered, failed, dead_letter
    attempt_count   INTEGER NOT NULL DEFAULT 0,
    max_attempts    INTEGER NOT NULL DEFAULT 5,
    
    -- Timing
    created_at      TIMESTAMP NOT NULL DEFAULT NOW(),
    next_attempt_at TIMESTAMP NOT NULL DEFAULT NOW(),
    delivered_at    TIMESTAMP,
    last_error      TEXT,
    
    -- Response tracking
    last_status_code INTEGER,
    last_response_ms INTEGER,  -- response time in milliseconds
    
    INDEX idx_queue_next (status, next_attempt_at)
        WHERE status IN ('pending', 'failed')
);

-- Per-endpoint latency measurements (ring buffer)
CREATE TABLE endpoint_latency (
    id              SERIAL PRIMARY KEY,
    endpoint_id     INTEGER NOT NULL,
    response_ms     INTEGER NOT NULL,
    status_code     INTEGER,
    measured_at     TIMESTAMP NOT NULL DEFAULT NOW(),
    
    INDEX idx_latency_endpoint (endpoint_id, measured_at)
);

-- Endpoint health state (circuit breaker)
CREATE TABLE endpoint_health (
    endpoint_id         INTEGER PRIMARY KEY,
    endpoint_url        TEXT NOT NULL,
    state               VARCHAR(20) NOT NULL DEFAULT 'closed',
        -- closed (healthy), open (broken), half_open (testing)
    consecutive_failures INTEGER NOT NULL DEFAULT 0,
    failure_threshold   INTEGER NOT NULL DEFAULT 10,
    last_failure_at     TIMESTAMP,
    last_success_at     TIMESTAMP,
    opened_at           TIMESTAMP,  -- when circuit opened
    cooldown_seconds    INTEGER NOT NULL DEFAULT 300,  -- 5 min before half_open
    
    -- Latency stats (updated periodically)
    p50_ms              INTEGER,
    p95_ms              INTEGER,
    p99_ms              INTEGER,
    sample_count        INTEGER NOT NULL DEFAULT 0
);

Trois points à noter concernant ce schéma.

La première, la webhook_queue tableau utilise un next_attempt_at une colonne au lieu d'un mécanisme de planification distinct. Le processus recherche les lignes où status IN ('pending', 'failed') AND next_attempt_at <= NOW()Il s'agit d'une solution rudimentaire de file d'attente différée, qui fonctionne correctement jusqu'à environ 10 000 envois par minute. Au-delà, il est préférable d'utiliser un véritable courtier de messages.

Deuxièmement, le endpoint_latency La table sert de tampon circulaire. Je purge périodiquement les lignes datant de plus de 24 heures. Les percentiles de latence dans endpoint_health sont calculées à partir de cette fenêtre glissante — elles représentent un comportement récent, et non des moyennes historiques.

Troisièmement, le endpoint_health Le tableau implémente la machine à états du disjoncteur. Plus de détails ci-dessous.

Le livreur

La boucle de traitement principale est volontairement simple. La complexité réside dans la logique de nouvelle tentative et le disjoncteur, et non dans le chemin de livraison lui-même.

python

import requests
import time
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime, timedelta

DB_DSN = "postgresql://user:pass@localhost/webhooks"

def get_connection():
    return psycopg2.connect(DB_DSN)

def deliver_webhooks(batch_size=50):
    """
    Fetch pending webhooks and attempt delivery.
    Uses SELECT FOR UPDATE SKIP LOCKED for safe concurrent workers.
    """
    conn = get_connection()
    cur = conn.cursor(cursor_factory=RealDictCursor)
    
    try:
        cur.execute("""
            SELECT id, endpoint_id, endpoint_url, payload, 
                   attempt_count, max_attempts
            FROM webhook_queue
            WHERE status IN ('pending', 'failed')
              AND next_attempt_at <= NOW()
            ORDER BY next_attempt_at ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        """, (batch_size,))
        
        rows = cur.fetchall()
        
        for row in rows:
            # Check circuit breaker before attempting
            if is_circuit_open(cur, row['endpoint_id']):
                # Don't attempt delivery — reschedule for after cooldown
                reschedule_for_cooldown(cur, row['id'], row['endpoint_id'])
                continue
            
            # Attempt delivery and measure latency
            result = attempt_delivery(
                row['endpoint_url'], 
                row['payload']
            )
            
            # Record latency measurement regardless of success/failure
            record_latency(
                cur, 
                row['endpoint_id'], 
                result['response_ms'], 
                result['status_code']
            )
            
            if result['success']:
                mark_delivered(cur, row['id'], result)
                record_success(cur, row['endpoint_id'])
            else:
                handle_failure(
                    cur, row['id'], row['endpoint_id'],
                    row['attempt_count'], row['max_attempts'],
                    result
                )
        
        conn.commit()
    
    except Exception as e:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()


def attempt_delivery(url, payload):
    """
    Fire the webhook and measure response time.
    Returns dict with success, status_code, response_ms, error.
    """
    start = time.monotonic()
    
    try:
        response = requests.post(
            url,
            json=payload,
            timeout=15,          # 15 second hard timeout
            headers={
                'Content-Type': 'application/json',
                'User-Agent': 'WebhookDelivery/1.0',
                'X-Delivery-Timestamp': str(int(time.time()))
            }
        )
        
        elapsed_ms = int((time.monotonic() - start) * 1000)
        
        return {
            'success': 200 <= response.status_code < 300,
            'status_code': response.status_code,
            'response_ms': elapsed_ms,
            'error': None if response.ok else f"HTTP {response.status_code}"
        }
    
    except requests.Timeout:
        elapsed_ms = int((time.monotonic() - start) * 1000)
        return {
            'success': False,
            'status_code': None,
            'response_ms': elapsed_ms,
            'error': 'timeout_15s'
        }
    
    except requests.ConnectionError as e:
        elapsed_ms = int((time.monotonic() - start) * 1000)
        return {
            'success': False,
            'status_code': None,
            'response_ms': elapsed_ms,
            'error': f'connection_error: {str(e)[:200]}'
        }

Le SELECT FOR UPDATE SKIP LOCKED Cette clause est essentielle pour exécuter plusieurs instances de travailleurs. Sans elle, SKIP LOCKEDDeux processus pourraient se bloquer sur la même ligne. Désormais, chaque processus traite un lot différent de webhooks en attente. Cela permet une mise à l'échelle horizontale en lançant simplement davantage de processus.

Le time.monotonic() appeler au lieu de time.time() est délibéré. time.time() peut faire un saut en arrière lors des ajustements NTP. time.monotonic() ne recule jamais, ce qui est important lorsqu'on mesure une latence inférieure à la seconde.

Logique de nouvelle tentative avec temporisation exponentielle et gigue

En cas d'échec d'une livraison, le délai de nouvelle tentative détermine si votre système se rétablit correctement ou s'il crée un véritable raz-de-marée qui submerge un point de terminaison déjà en difficulté.

python

import random

def calculate_next_attempt(attempt_count, base_delay=30, max_delay=3600):
    """
    Exponential backoff with full jitter.
    
    Attempt 1: 0-30s
    Attempt 2: 0-60s  
    Attempt 3: 0-120s
    Attempt 4: 0-240s
    Attempt 5: 0-480s (capped at max_delay)
    
    Full jitter prevents thundering herd when an endpoint
    recovers and hundreds of retries fire simultaneously.
    """
    exponential_delay = base_delay * (2 ** attempt_count)
    capped_delay = min(exponential_delay, max_delay)
    jittered_delay = random.uniform(0, capped_delay)
    
    return datetime.utcnow() + timedelta(seconds=jittered_delay)


def handle_failure(cur, webhook_id, endpoint_id, 
                   attempt_count, max_attempts, result):
    """
    Handle a failed delivery attempt.
    Either retry with backoff or move to dead letter queue.
    """
    new_attempt_count = attempt_count + 1
    
    if new_attempt_count >= max_attempts:
        # Exhausted retries — dead letter
        cur.execute("""
            UPDATE webhook_queue 
            SET status = 'dead_letter',
                attempt_count = %s,
                last_error = %s,
                last_status_code = %s,
                last_response_ms = %s
            WHERE id = %s
        """, (
            new_attempt_count, result['error'],
            result['status_code'], result['response_ms'],
            webhook_id
        ))
    else:
        # Schedule retry with backoff
        next_attempt = calculate_next_attempt(new_attempt_count)
        cur.execute("""
            UPDATE webhook_queue
            SET status = 'failed',
                attempt_count = %s,
                next_attempt_at = %s,
                last_error = %s,
                last_status_code = %s,
                last_response_ms = %s
            WHERE id = %s
        """, (
            new_attempt_count, next_attempt,
            result['error'], result['status_code'],
            result['response_ms'], webhook_id
        ))
    
    # Update circuit breaker
    record_failure(cur, endpoint_id)

Pourquoi utiliser la gigue complète plutôt que la gigue décorrélée ou la gigue égale ? AWS a publié une analyse de référence à ce sujet. La gigue complète (aléatoire entre 0 et la limite exponentielle) offre le temps d'exécution total le plus court pour tous les clients. La gigue égale (aléatoire entre la moitié et la limite) est plus prudente, mais met plus de temps à vider le backlog de tentatives. Pour la diffusion par webhook, avec de nombreux points de terminaison indépendants, la gigue complète est le choix idéal, car les tentatives de chaque point de terminaison sont indépendantes : aucune coordination n'est nécessaire entre eux.

Disjoncteur : Arrêtez de marteler les points de rupture

Le modèle de disjoncteur empêche votre système de gaspiller des ressources sur des points de terminaison défaillants de manière répétée. Sans lui, un point de terminaison défaillant accumule des centaines de tentatives de reconnexion en attente, chacune expirant après 15 secondes, ce qui mobilise inutilement la capacité de vos nœuds de calcul pour des livraisons vouées à l'échec.

python

def is_circuit_open(cur, endpoint_id):
    """
    Check if the circuit breaker is open (endpoint is broken).
    If open and cooldown has passed, transition to half_open.
    """
    cur.execute("""
        SELECT state, opened_at, cooldown_seconds
        FROM endpoint_health
        WHERE endpoint_id = %s
    """, (endpoint_id,))
    
    row = cur.fetchone()
    if not row:
        return False  # no health record = assume healthy
    
    if row['state'] == 'closed':
        return False
    
    if row['state'] == 'open':
        # Check if cooldown period has elapsed
        if row['opened_at'] and row['cooldown_seconds']:
            elapsed = (datetime.utcnow() - row['opened_at']).total_seconds()
            if elapsed >= row['cooldown_seconds']:
                # Transition to half_open — allow one probe
                cur.execute("""
                    UPDATE endpoint_health
                    SET state = 'half_open'
                    WHERE endpoint_id = %s
                """, (endpoint_id,))
                return False  # allow the probe delivery
        return True  # still in cooldown
    
    if row['state'] == 'half_open':
        return False  # allow probe delivery
    
    return False


def record_failure(cur, endpoint_id):
    """
    Record a delivery failure. Open circuit if threshold reached.
    """
    cur.execute("""
        UPDATE endpoint_health
        SET consecutive_failures = consecutive_failures + 1,
            last_failure_at = NOW()
        WHERE endpoint_id = %s
        RETURNING consecutive_failures, failure_threshold, state
    """, (endpoint_id,))
    
    row = cur.fetchone()
    if not row:
        # Create health record on first failure
        cur.execute("""
            INSERT INTO endpoint_health (endpoint_id, endpoint_url, 
                consecutive_failures, last_failure_at)
            VALUES (%s, '', 1, NOW())
            ON CONFLICT (endpoint_id) DO UPDATE
            SET consecutive_failures = endpoint_health.consecutive_failures + 1,
                last_failure_at = NOW()
        """, (endpoint_id,))
        return
    
    if row['state'] == 'half_open':
        # Probe failed — re-open circuit with longer cooldown
        cur.execute("""
            UPDATE endpoint_health
            SET state = 'open',
                opened_at = NOW(),
                cooldown_seconds = LEAST(cooldown_seconds * 2, 3600)
            WHERE endpoint_id = %s
        """, (endpoint_id,))
    
    elif (row['state'] == 'closed' and 
          row['consecutive_failures'] >= row['failure_threshold']):
        # Threshold reached — open circuit
        cur.execute("""
            UPDATE endpoint_health
            SET state = 'open',
                opened_at = NOW(),
                cooldown_seconds = 300  -- reset to 5 minutes
            WHERE endpoint_id = %s
        """, (endpoint_id,))


def record_success(cur, endpoint_id):
    """
    Record a delivery success. Close circuit if half_open.
    """
    cur.execute("""
        UPDATE endpoint_health
        SET consecutive_failures = 0,
            last_success_at = NOW(),
            state = 'closed'
        WHERE endpoint_id = %s
    """, (endpoint_id,))

L'escalade de l'état semi-ouvert vers l'état ouvert, avec un délai de récupération doublé, est un détail souvent négligé. Si un point de terminaison échoue pendant la phase de test (état semi-ouvert), il est inutile de réessayer 5 minutes plus tard, car le point de terminaison reste indisponible. Doublez le délai de récupération à 10 minutes, puis à 20, pour atteindre une durée maximale d'une heure. Cela évite que le disjoncteur ne devienne un mécanisme de déclenchement intempestif et abusif.

Suivi du percentile de latence

Les moyennes sont trompeuses. Un terminal avec un temps de réponse moyen de 200 ms peut répondre en 50 ms dans 95 % des cas et en 3 000 ms dans les 5 % restants. La moyenne semble correcte. Le P95 révèle un problème qui affecte 1 transmission sur 20.

python

def record_latency(cur, endpoint_id, response_ms, status_code):
    """
    Record a latency measurement and update percentile stats.
    """
    cur.execute("""
        INSERT INTO endpoint_latency 
            (endpoint_id, response_ms, status_code)
        VALUES (%s, %s, %s)
    """, (endpoint_id, response_ms, status_code))


def update_latency_percentiles(cur, endpoint_id, window_hours=24):
    """
    Calculate P50, P95, P99 from the rolling window.
    Uses PostgreSQL's percentile_cont for exact percentiles.
    """
    cur.execute("""
        SELECT 
            COUNT(*) as sample_count,
            percentile_cont(0.50) WITHIN GROUP 
                (ORDER BY response_ms) AS p50,
            percentile_cont(0.95) WITHIN GROUP 
                (ORDER BY response_ms) AS p95,
            percentile_cont(0.99) WITHIN GROUP 
                (ORDER BY response_ms) AS p99
        FROM endpoint_latency
        WHERE endpoint_id = %s
          AND measured_at >= NOW() - INTERVAL '%s hours'
    """, (endpoint_id, window_hours))
    
    row = cur.fetchone()
    
    if row and row['sample_count'] > 0:
        cur.execute("""
            UPDATE endpoint_health
            SET p50_ms = %s,
                p95_ms = %s,
                p99_ms = %s,
                sample_count = %s
            WHERE endpoint_id = %s
        """, (
            int(row['p50']), int(row['p95']), 
            int(row['p99']), row['sample_count'],
            endpoint_id
        ))
    
    return row


def get_slow_endpoints(cur, p95_threshold_ms=2000):
    """
    Find endpoints whose P95 latency exceeds the threshold.
    These are candidates for investigation or circuit opening.
    """
    cur.execute("""
        SELECT endpoint_id, endpoint_url, 
               p50_ms, p95_ms, p99_ms, sample_count,
               state, consecutive_failures
        FROM endpoint_health
        WHERE p95_ms > %s
          AND sample_count >= 20  -- need sufficient samples
        ORDER BY p95_ms DESC
    """, (p95_threshold_ms,))
    
    return cur.fetchall()

PostgreSQL percentile_cont est une fonction d'agrégation d'ensembles ordonnés qui calcule les percentiles exacts. Pour les grands ensembles de données, il faudrait passer à percentile_disc (qui renvoie une valeur observée réelle plutôt qu'une interpolation) ou utiliser une approximation par la méthode t-digest. Pour la diffusion par webhook avec une fenêtre de 24 heures, le calcul précis des percentiles sur les données brutes est suffisamment rapide jusqu'à environ 100 000 mesures par point de terminaison.

Le get_slow_endpoints Cette fonction est exécutée automatiquement toutes les 15 minutes. Les points de terminaison dont le délai d'attente (P95) dépasse 2 secondes sont signalés pour investigation. Pour ceux dont le P95 dépasse 5 secondes, le seuil de déclenchement du disjoncteur est abaissé : le nombre d'échecs consécutifs autorisés avant le déclenchement est réduit, car chaque échec de livraison immobilise un thread de travail pendant toute la durée du délai d'attente.

Surveillance de l'état de la distribution des webhooks

Voici la requête de surveillance que j'exécute toutes les cinq minutes. Elle génère un résumé de l'état de santé de l'ensemble du pipeline de livraison, présenté sur une seule ligne :

sql

SELECT
    -- Queue depth
    COUNT(*) FILTER (WHERE status = 'pending') AS pending,
    COUNT(*) FILTER (WHERE status = 'failed') AS awaiting_retry,
    COUNT(*) FILTER (WHERE status = 'in_flight') AS in_flight,
    COUNT(*) FILTER (WHERE status = 'dead_letter') AS dead_letter,
    
    -- Delivery rate (last hour)
    COUNT(*) FILTER (
        WHERE status = 'delivered' 
        AND delivered_at >= NOW() - INTERVAL '1 hour'
    ) AS delivered_last_hour,
    
    -- Failure rate (last hour)
    COUNT(*) FILTER (
        WHERE status IN ('failed', 'dead_letter')
        AND created_at >= NOW() - INTERVAL '1 hour'
    ) AS failed_last_hour,
    
    -- Oldest undelivered
    MIN(created_at) FILTER (
        WHERE status IN ('pending', 'failed')
    ) AS oldest_pending,
    
    -- Average delivery latency (last hour, successful only)
    AVG(last_response_ms) FILTER (
        WHERE status = 'delivered'
        AND delivered_at >= NOW() - INTERVAL '1 hour'
    ) AS avg_delivery_ms_last_hour

FROM webhook_queue;

Le oldest_pending La valeur est l'indicateur le plus important dans cette requête. Si elle est antérieure à la durée maximale de votre fenêtre de nouvelle tentative (somme de tous les délais d'attente), il y a un problème structurel : soit le processus est bloqué, soit le point de terminaison est inaccessible, soit la file d'attente croît plus vite que vous ne pouvez la vider.

Je déclenche une alerte dans trois cas : augmentation du nombre de lettres non distribuées (les points de terminaison tombent constamment en panne et personne n’enquête), durée de la file d’attente dépassant 30 minutes (la livraison prend du retard), et La latence P95 par point de terminaison dépassant les seuils indiquant une fiabilité de livraison dégradée Le troisième signal est l'alerte précoce : la latence augmente avant les défaillances. Un terminal qui répondait en 200 ms et qui met désormais 3 secondes à répondre est sur le point d'expirer.

La file d'attente des lettres non distribuées n'est pas qu'un simple espace de stockage.

La plupart des équipes utilisent une file d'attente de messages non distribués, sous forme de table, pour stocker les webhooks ayant échoué. Elles la consultent occasionnellement lors des interventions en cas d'incident. C'est un gaspillage de ressources.

La file d'attente des messages non distribués constitue votre ensemble de données de débogage le plus précieux. Chaque ligne représente une tentative de livraison infructueuse, à plusieurs reprises, de votre système. L'analyse des messages non distribués vous révèle des informations que les indicateurs de performance ne vous fourniront jamais.

python

def analyze_dead_letters(cur, hours=24):
    """
    Analyze recent dead letter entries for patterns.
    Returns per-endpoint failure analysis.
    """
    cur.execute("""
        SELECT 
            endpoint_id,
            endpoint_url,
            COUNT(*) AS dead_count,
            
            -- Most common error
            MODE() WITHIN GROUP (ORDER BY last_error) AS primary_error,
            
            -- Most common status code
            MODE() WITHIN GROUP (ORDER BY last_status_code) 
                AS primary_status_code,
            
            -- Timing
            MIN(created_at) AS first_dead,
            MAX(created_at) AS last_dead,
            
            -- Average attempts before giving up
            AVG(attempt_count)::INTEGER AS avg_attempts
            
        FROM webhook_queue
        WHERE status = 'dead_letter'
          AND created_at >= NOW() - INTERVAL '%s hours'
        GROUP BY endpoint_id, endpoint_url
        ORDER BY dead_count DESC
        LIMIT 20
    """, (hours,))
    
    return cur.fetchall()

Lorsque j'examine les lettres non distribuées, je recherche trois schémas.

Défaillances du cluster : 50 messages non distribués pour le même point de terminaison dans la même heure indiquent que ce dernier est tombé en panne et n'a pas pu être rétabli dans le délai imparti. Action : prolonger le délai de nouvelle tentative ou mettre en place une remise en file d'attente manuelle.

Modèles de codes d'état : Une augmentation soudaine des erreurs 401/403 (erreurs non distribuées) indique que les identifiants du point de terminaison ont été renouvelés et que la configuration du webhook n'a pas été mise à jour. Une augmentation soudaine des erreurs 429 (Trop de requêtes) signifie que vous dépassez la limite de requêtes et qu'il est nécessaire de limiter le débit.

Accumulation progressive : 2 à 3 lettres non distribuées par jour pour un seul point de terminaison, réparties uniformément. C'est le schéma le plus sournois : le point de terminaison fonctionne la plupart du temps, mais subit des pannes intermittentes qui épuisent les tentatives de reconnexion au fil du temps. La solution consiste généralement à augmenter progressivement le nombre de lettres non distribuées. max_attempts pour ce point de terminaison spécifique ou en réduisant le délai d'expiration.

Exécution du travailleur

La boucle principale qui relie tout :

python

import signal
import sys

running = True

def shutdown_handler(signum, frame):
    global running
    running = False
    print(f"Received signal {signum}, shutting down gracefully...")

signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)

def main():
    print("Webhook delivery worker starting...")
    
    while running:
        try:
            deliver_webhooks(batch_size=50)
        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(5)  # back off on errors
            continue
        
        # Update latency stats every 100 iterations
        # (cheap operation, doesn't need to run every loop)
        if int(time.time()) % 100 == 0:
            conn = get_connection()
            cur = conn.cursor(cursor_factory=RealDictCursor)
            try:
                cur.execute(
                    "SELECT DISTINCT endpoint_id FROM endpoint_health"
                )
                for row in cur.fetchall():
                    update_latency_percentiles(cur, row['endpoint_id'])
                conn.commit()
            finally:
                cur.close()
                conn.close()
        
        # Poll interval — 500ms keeps latency low without
        # hammering the database
        time.sleep(0.5)

    print("Worker shut down cleanly.")

if __name__ == '__main__':
    main()

Le SIGTERM Le gestionnaire est essentiel pour des arrêts propres dans les environnements conteneurisés. Lorsque Kubernetes envoie un signal SIGTERM, le worker termine son traitement par lots en cours, valide la transaction et s'arrête. Sans cela, des lignes restent bloquées. in_flight statut sans travailleur en train de les traiter.

Ce que ce système ne fait pas (et quand vous avez besoin de plus)

Cette implémentation gère jusqu'à environ 10 000 livraisons par minute sur une seule instance PostgreSQL avec 2 à 3 processus de travail. Au-delà, trois modifications sont nécessaires.

Commencez par remplacer la file d'attente PostgreSQL par Redis Streams ou RabbitMQ. SELECT FOR UPDATE SKIP LOCKED Ce type de comportement engendre des conflits d'écriture sur la table de file d'attente à haut débit. Un courtier de messages dédié permet d'éliminer ce problème.

Deuxièmement, ajoutez une limitation de débit par point de terminaison. Certains points de terminaison de réception ont des limites de débit (100 requêtes par minute, 1 000 par heure). Sans limitation de débit côté client, vous dépasserez leur quota et recevrez une erreur 429. Implémentez un système de jetons par point de terminaison.

Troisièmement, ajoutez la signature des requêtes. Les signatures HMAC-SHA256 sur la charge utile permettent au point de terminaison destinataire de vérifier que le webhook provient bien de votre système et n'a pas été altéré lors de sa transmission. Il s'agit d'une étape indispensable pour tout système de webhook envoyant des données financières.

Le système présenté dans cet article constitue la base. Il gère les problèmes complexes — logique de nouvelle tentative, protection contre les surcharges, mesure de la latence, analyse des messages non distribués — indispensables à tout système de distribution de webhooks, quelle que soit son échelle. Les composants spécifiques que vous ajoutez (courtier de messages, limiteur de débit, signature des requêtes) dépendent de vos exigences en matière de débit et de sécurité.

L'élément crucial, souvent négligé par les équipes, est la mesure du système de distribution lui-même. Si vous ne pouvez pas répondre à la question « Quelle est la latence de distribution P95 vers le point de terminaison X au cours des dernières 24 heures ? », vous travaillez à l'aveugle. Commencez par mettre en place les outils de mesure. Le reste suivra.

article précédent

Meilleur logiciel de suivi des affiliés iGaming en 2026

César Fikson
Auteur :

César Fikson

Je suis analyste de données iGaming, spécialisé dans l'analyse et l'interprétation des données relatives aux plateformes de jeux en ligne, aux jeux d'argent et aux tendances du marché. J'analyse le comportement des joueurs, les performances des jeux et les tendances des revenus afin d'optimiser les expériences de jeu et les stratégies commerciales.

Sommaire