miércoles, 25 de junio de 2025

Implementando WebSockets Asíncronos con ReactPHP y ZeroMQ

Implementando WebSockets Asíncronos con ReactPHP y ZeroMQ

En aplicaciones de alto rendimiento, la comunicación en tiempo real es crucial. ReactPHP ofrece una plataforma no bloqueante para construir aplicaciones asíncronas en PHP. Integrar WebSockets con ReactPHP permite una comunicación bidireccional eficiente. Sin embargo, para escalabilidad, es útil desacoplar el servidor WebSocket de la lógica de la aplicación. Aquí es donde ZeroMQ entra en juego.

ZeroMQ es una librería de mensajería de alto rendimiento que permite la comunicación entre procesos. Podemos utilizar ZeroMQ para crear una capa de abstracción entre el servidor WebSocket (construido con ReactPHP) y los workers de la aplicación que procesan los mensajes. Esto permite escalar la aplicación horizontalmente sin afectar el rendimiento del servidor WebSocket.

El siguiente ejemplo demuestra la creación de un servidor WebSocket con ReactPHP que publica mensajes a través de ZeroMQ. Un worker separado consume estos mensajes y los procesa.


context = new \ZMQContext();
        $this->socket = $this->context->getSocket(\ZMQ::SOCKET_PUB);
        $this->socket->bind("tcp://127.0.0.1:5555");
    }

    public function onOpen(\Ratchet\ConnectionInterface $conn) {
        $this->clients[$conn->resourceId] = $conn;
        echo "New connection! ({$conn->resourceId})\n";
    }

    public function onMessage(\Ratchet\ConnectionInterface $from, $msg) {
        echo sprintf('Connection %d sending message "%s" to %d other connection(s)' . "\n"
            , $from->resourceId, $msg, count($this->clients) - 1);

        $this->socket->send($msg);
    }

    public function onClose(\Ratchet\ConnectionInterface $conn) {
        unset($this->clients[$conn->resourceId]);
        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    public function onError(\Ratchet\ConnectionInterface $conn, \Exception $e) {
        echo "An error has occurred: {$e->getMessage()}\n";
        unset($this->clients[$conn->resourceId]);
        $conn->close();
    }
};

$webSock = new Server('0.0.0.0:8080', $loop);
$webServer = new IoServer(
    new HttpServer(
        new WsServer(
            $pusher
        )
    ),
    $webSock
);

$loop->run();

?>
    

El código anterior crea un servidor WebSocket que escucha en el puerto 8080. Cada vez que un cliente envía un mensaje, el servidor lo publica a través de ZeroMQ al puerto 5555. Un worker (código no incluido por brevedad) podría conectarse a este puerto usando un socket de tipo `ZMQ::SOCKET_SUB` para recibir y procesar estos mensajes.

Esta arquitectura permite escalar la aplicación, ya que podemos tener múltiples workers consumiendo los mensajes de ZeroMQ, sin sobrecargar el servidor WebSocket. Además, simplifica la lógica del servidor WebSocket, ya que solo se encarga de la comunicación y no del procesamiento de los datos.

No hay comentarios:

Publicar un comentario