miércoles, 25 de junio de 2025

Manipulación Avanzada de Streams Asíncronos con SplQueue en PHP

Manipulación Avanzada de Streams Asíncronos con SplQueue en PHP

PHP ofrece potentes herramientas para trabajar con streams, pero a menudo la manipulación asíncrona de múltiples streams puede ser un desafío. Este artículo explora una técnica avanzada que combina la funcionalidad de streams con la estructura de datos SplQueue para gestionar y procesar datos de múltiples fuentes de forma no bloqueante.

La idea principal es crear una cola que almacene recursos de stream junto con información sobre su estado. Luego, un bucle principal monitorea los streams en la cola y procesa los datos disponibles sin bloquear la ejecución.


<?php

// Clase para encapsular un stream y su estado
class StreamItem {
    public $stream;
    public $metadata;

    public function __construct($stream, $metadata = []) {
        $this->stream = $stream;
        $this->metadata = $metadata;
    }
}

// Crear una cola para gestionar streams
$streamQueue = new SplQueue();

// Función para agregar un stream a la cola
function enqueueStream($streamQueue, $stream, $metadata = []) {
    stream_set_blocking($stream, false); // Establecer como no bloqueante
    $streamItem = new StreamItem($stream, $metadata);
    $streamQueue->enqueue($streamItem);
}

// Ejemplo: Abrir dos streams (podrían ser sockets, archivos, etc.)
$stream1 = fopen("php://input", "r"); // Standard input
$stream2 = fopen("data.txt", "r"); // Un archivo local

// Agregar los streams a la cola
enqueueStream($streamQueue, $stream1, ['name' => 'stdin']);
enqueueStream($streamQueue, $stream2, ['name' => 'file']);

// Bucle principal para procesar los streams de forma asíncrona
while (!$streamQueue->isEmpty()) {
    $readStreams = [];
    $writeStreams = null;
    $exceptStreams = null;

    // Preparar los arrays para stream_select
    foreach ($streamQueue as $item) {
        $readStreams[] = $item->stream;
    }

    // Esperar hasta que al menos un stream esté listo para leer
    $numReadyStreams = stream_select($readStreams, $writeStreams, $exceptStreams, 0, 200000); // Timeout de 0.2 segundos

    if ($numReadyStreams > 0) {
        // Iterar sobre la cola y procesar los streams listos
        foreach ($streamQueue as $item) {
            if (in_array($item->stream, $readStreams, true)) {
                $data = fgets($item->stream); // Leer datos del stream

                if ($data === false) {
                    // El stream ha llegado al final o ha ocurrido un error
                    fclose($item->stream);
                    $streamQueue->dequeue(); // Eliminar el stream de la cola
                } else {
                    // Procesar los datos
                    echo "Data from " . $item->metadata['name'] . ": " . $data;
                }
            }
        }
    } else {
        // No hay streams listos, hacer algo más (p.ej., dormir)
        usleep(100000); // Dormir 0.1 segundos
    }
}

echo "All streams processed.\n";

?>
    

En este ejemplo, StreamItem encapsula un recurso de stream y metadatos asociados. La función enqueueStream configura un stream como no bloqueante y lo agrega a la cola. El bucle principal utiliza stream_select para monitorear los streams y procesa los datos disponibles de forma no bloqueante. Si un stream está listo para leer, se leen los datos y se procesan. Si un stream llega al final, se cierra y se elimina de la cola.

Este enfoque permite gestionar eficientemente múltiples streams de forma asíncrona, evitando bloqueos y mejorando la capacidad de respuesta de las aplicaciones PHP.

No hay comentarios:

Publicar un comentario