miércoles, 25 de junio de 2025

Sincronización de Datos Transaccionales con Colas Asíncronas en PHP

Sincronización de Datos Transaccionales con Colas Asíncronas en PHP

En el desarrollo de aplicaciones web complejas, a menudo nos encontramos con la necesidad de sincronizar datos entre diferentes sistemas o servicios. Una aproximación común es realizar actualizaciones en tiempo real dentro de una misma transacción. Sin embargo, esto puede afectar significativamente el rendimiento y la escalabilidad, especialmente si involucra servicios externos con tiempos de respuesta variables. Una alternativa robusta es utilizar colas asíncronas para desacoplar estas operaciones, garantizando la consistencia de los datos sin comprometer la experiencia del usuario.

Este post explora la implementación de un patrón donde una acción de base de datos dentro de una transacción dispara un evento que se coloca en una cola. Un worker consume ese evento de la cola y realiza las acciones necesarias en otros sistemas. Si la acción en la cola falla, se implementa un mecanismo de reintento y, si el reintento falla, se registra el error para análisis posterior.


queueName = $queueName;
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare($queueName, false, true, false, false); // Durable queue
    }

    public function publishMessage(array $data): void
    {
        $messageBody = json_encode($data);
        $message = new AMQPMessage(
            $messageBody,
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // Persistent message
        );
        $this->channel->basic_publish($message, '', $this->queueName);
    }

    public function consumeMessage(callable $callback): void
    {
        $this->channel->basic_qos(null, 1, null); // Process one message at a time
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($callback) {
                try {
                    $data = json_decode($message->body, true);
                    $callback($data);
                    $message->ack(); // Acknowledge successful processing
                } catch (\Exception $e) {
                    // Handle error, log, and potentially retry
                    error_log("Error processing message: " . $e->getMessage());
                    $message->nack(false, true); // Requeue the message
                }
            }
        );

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Ejemplo de uso
$queue = new TransactionalQueue('user_updates');

// Publicar un mensaje (dentro de una transacción de base de datos)
$queue->publishMessage(['user_id' => 123, 'email' => 'newemail@example.com']);

// Consumir un mensaje (en un worker)
$queue->consumeMessage(function (array $data) {
    // Simula la actualización de un servicio externo
    sleep(2); // Simula una operación lenta
    echo "Processing user update: " . json_encode($data) . PHP_EOL;
    //throw new \Exception("Simulated error"); // uncomment to test error handling
});
?>
    

En este ejemplo, la clase `TransactionalQueue` encapsula la lógica para interactuar con RabbitMQ (u otro broker de mensajes). La función `publishMessage` coloca un mensaje en la cola cuando se realiza una operación en la base de datos (por ejemplo, actualizar el correo electrónico de un usuario). La función `consumeMessage` recibe un callback que se ejecuta por cada mensaje consumido de la cola. Es importante notar el uso de `basic_qos` para procesar un mensaje a la vez y `delivery_mode` para asegurar la persistencia de los mensajes en el broker.

Este patrón permite mantener la consistencia de los datos sin penalizar el rendimiento de la aplicación principal. La clave reside en la robustez del consumidor, manejando errores y reintentos adecuadamente. Considera usar un sistema de logging centralizado y una política de reintentos exponencial para mejorar la resiliencia del sistema.

No hay comentarios:

Publicar un comentario