Implementando Streams Asíncronos en PHP con Fibers
PHP 8.1 introdujo Fibers, permitiendo concurrencia sin la sobrecarga de threads. Si bien se usan comúnmente para operaciones I/O, podemos usarlos para construir streams asíncronos de datos. Esto resulta útil cuando se necesita procesar datos de forma incremental sin bloquear el hilo principal, por ejemplo, al recibir información de una API en tiempo real o al leer archivos grandes.
<?php
use Fiber;
class AsyncStream {
private Fiber $producer;
private mixed $value = null;
private bool $done = false;
public function __construct(callable $producer) {
$this->producer = new Fiber(function () use ($producer) {
try {
$producer($this); // Inyectamos el stream en el productor
} finally {
$this->done = true; // Marcar como finalizado incluso si hay excepciones
}
});
}
public function current(): mixed {
if (!$this->producer->isStarted()) {
$this->producer->start();
} elseif ($this->producer->isSuspended()) {
$this->producer->resume();
}
if ($this->done) {
return null; // O lanzar una excepción, depende de la lógica
}
return $this->value;
}
public function next(mixed $value): void {
if (!$this->producer->isRunning()) {
throw new LogicException("next() can only be called from within the producer.");
}
$this->value = $value;
$this->producer->suspend();
}
public function isDone(): bool {
return $this->done;
}
public function getIterator(): iterable {
while (!$this->isDone()) {
yield $this->current();
}
}
}
// Ejemplo de uso:
$stream = new AsyncStream(function (AsyncStream $stream) {
for ($i = 0; $i < 5; $i++) {
sleep(1); // Simula un proceso largo
$stream->next("Data chunk: " . $i);
}
});
foreach ($stream->getIterator() as $data) {
echo $data . PHP_EOL;
}
echo "Stream finished." . PHP_EOL;
En este ejemplo, la clase AsyncStream
encapsula la lógica de la Fiber. El constructor recibe una función $producer
, que se ejecuta en la Fiber y genera los datos. La función next()
se usa para enviar datos al stream desde dentro de la Fiber. La función current()
se llama desde el bucle foreach
para obtener el siguiente valor del stream. Es importante que la funcion producer reciba la instancia AsyncStream como argumento para poder utilizar el metodo next().
<?php
// Ejemplo de stream asíncrono que simula la lectura de un archivo grande
use Fiber;
function readFileAsync(string $filePath, int $chunkSize = 4096): AsyncStream
{
return new AsyncStream(function (AsyncStream $stream) use ($filePath, $chunkSize) {
$file = fopen($filePath, 'r');
if (!$file) {
throw new RuntimeException("Could not open file: " . $filePath);
}
try {
while (!feof($file)) {
$chunk = fread($file, $chunkSize);
$stream->next($chunk);
}
} finally {
fclose($file);
}
});
}
// Uso:
$filePath = 'large_file.txt'; // Reemplaza con tu archivo
$stream = readFileAsync($filePath);
foreach ($stream->getIterator() as $chunk) {
echo "Processing chunk: " . strlen($chunk) . " bytes" . PHP_EOL;
// Realiza el procesamiento necesario con el chunk de datos
usleep(50000); //Simula un proceso
}
echo "File processing complete." . PHP_EOL;
Este segundo ejemplo muestra cómo leer un archivo grande de forma asíncrona usando un stream. La función readFileAsync
crea un AsyncStream
que lee el archivo en chunks y los emite al stream. El bucle foreach
itera sobre los chunks y los procesa. Esta técnica permite evitar cargar todo el archivo en memoria, lo que es útil para archivos muy grandes. Las excepciones no controladas dentro de la Fiber provocarán que se propagen al llamador de current()
o resume()
, dependiendo del contexto.
No hay comentarios:
Publicar un comentario