miércoles, 25 de junio de 2025

Implementando Streams Asíncronos con Corrutinas en PHP

Implementando Streams Asíncronos con Corrutinas en PHP

PHP, aunque tradicionalmente conocido por su ejecución sincrónica, ha evolucionado para permitir la concurrencia y asincronía. Una técnica avanzada es el uso de corrutinas para manejar streams de datos de forma asíncrona. Esto resulta útil al procesar grandes volúmenes de datos, como archivos de registro o flujos de datos de sensores, sin bloquear el hilo principal de ejecución. Este enfoque mejora la capacidad de respuesta de la aplicación y la eficiencia del uso de recursos.


<?php

use Revolt\EventLoop;
use Revolt\Stream\ReadableStream;

/**
 * Corrutina para leer un stream de forma asíncrona y procesar los datos.
 *
 * @param ReadableStream $stream El stream a leer.
 * @param callable $processor Una función para procesar cada chunk de datos.
 * @return void
 */
function processStreamAsync(ReadableStream $stream, callable $processor): void
{
    EventLoop::queue(function() use ($stream, $processor) {
        while (! $stream->isClosed()) {
            $chunk = $stream->read();
            if ($chunk === null) { // Se alcanzó el final del stream
                break;
            }
            $processor($chunk);
        }
    });
}

// Ejemplo de uso: leer un archivo de registro de forma asíncrona.
$stream = fopen("log.txt", "r");
$readableStream = new \Revolt\Stream\Stream($stream);


processStreamAsync($readableStream, function (string $chunk) {
    //Procesa el chunk de datos.  Por ejemplo, registrarlo o analizarlo.
    echo "Procesando: " . strlen($chunk) . " bytes\n";
    usleep(10000); //Simula un procesamiento que tarda un poco.
});


EventLoop::run();

//Cerrar el stream una vez que termine el EventLoop.
fclose($stream);
    

Este código utiliza la librería Revolt.run para la gestión del event loop y los streams asíncronos. La función `processStreamAsync` encapsula la lógica de lectura del stream en una corrutina, utilizando `EventLoop::queue` para ejecutar el bucle de lectura de forma no bloqueante. Cada chunk de datos leído del stream se pasa a una función de procesamiento ( `$processor` ), que puede realizar cualquier operación necesaria sobre los datos. En este ejemplo se simula un retraso en el procesamiento para demostrar la asincronía.


<?php

use Revolt\EventLoop;
use Revolt\Stream\ReadableStream;

//Ejemplo utilizando un generador para producir un stream "virtual"
function generateDataStream(int $numChunks, int $chunkSize): Generator {
    for ($i = 0; $i < $numChunks; $i++) {
        yield random_bytes($chunkSize);
        usleep(5000); // Simula la producción de datos con cierto retraso.
    }
}


$generator = generateDataStream(10, 1024); // 10 chunks de 1KB cada uno.
$stream = new \Revolt\Stream\IteratorStream($generator); // Adapta el generador a un stream.

processStreamAsync($stream, function (string $chunk) {
    echo "Procesado (Generador): " . strlen($chunk) . " bytes\n";
});

EventLoop::run();

    

Este segundo ejemplo muestra cómo usar un generador para crear un stream de datos "virtual". La función `generateDataStream` genera chunks de datos aleatorios con un retraso simulado entre cada chunk. La clase `IteratorStream` de Revolt se usa para adaptar el generador a un stream compatible con `processStreamAsync`. Esto permite procesar datos que no necesariamente provienen de un archivo, sino de una fuente de datos dinámica o calculada. La asincronía se mantiene, permitiendo que la aplicación continúe respondiendo mientras se generan y procesan los datos.

El uso de corrutinas con streams asíncronos en PHP abre nuevas posibilidades para la construcción de aplicaciones de alto rendimiento y baja latencia, especialmente en escenarios donde se requiere el procesamiento de grandes volúmenes de datos o la interacción con sistemas externos lentos.

No hay comentarios:

Publicar un comentario