Queue
Overview
Pogo Queue is a FrankenPHP queue module with Laravel and Symfony Messenger drivers.
It runs a Go worker loop inside the FrankenPHP process, reserves jobs from Redis Streams, delivers each job to a PHP worker script, and acknowledges, releases, or fails the delivery through native PHP functions.
Status and fit
Pogo Queue is designed for production use with Redis Streams and at-least-once delivery. Job handlers must be idempotent because a reserved job can be delivered more than once after a timeout, process restart, or worker failure.
Use it when:
- You already operate Redis.
- You want queue workers inside the FrankenPHP deployment model.
- Laravel or Symfony Messenger already owns your job definitions and retry policy.
Avoid it when:
- You need a dashboard or Horizon replacement.
- You cannot make job handlers idempotent.
- You need exactly-once job execution.
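The idempotency requirement above can be sketched as a guard that records each job ID before running its side effect. This is only an illustration: the ProcessedJobs class, the handleOnce() function, and the job ID 'job-42' are hypothetical names, not part of Pogo Queue, and the in-memory array stands in for a shared, durable store that a real deployment would need.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory record of processed job IDs.
 * A real deployment would need a shared, durable store (for example a
 * database unique key); that part is not shown here.
 */
final class ProcessedJobs
{
    /** @var array<string, true> */
    private array $seen = [];

    /** Returns true only the first time a given job ID is claimed. */
    public function claim(string $jobId): bool
    {
        if (isset($this->seen[$jobId])) {
            return false;
        }
        $this->seen[$jobId] = true;
        return true;
    }
}

/**
 * Runs $sideEffect at most once per job ID, so a redelivered job is a no-op.
 * Returns true if the side effect ran, false if the delivery was a duplicate.
 */
function handleOnce(ProcessedJobs $store, string $jobId, callable $sideEffect): bool
{
    if (!$store->claim($jobId)) {
        return false; // duplicate delivery: skip the work
    }
    $sideEffect();
    return true;
}

$store = new ProcessedJobs();
$sent = 0;
$send = function () use (&$sent): void { $sent++; };

$first = handleOnce($store, 'job-42', $send);  // first delivery runs the side effect
$second = handleOnce($store, 'job-42', $send); // redelivery of the same job is skipped
```

Claiming the ID before running the side effect prevents duplicates but would skip the work if the side effect failed after the claim. A production handler would more likely record completion atomically with the effect itself.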
Build
Compile the module into FrankenPHP:
xcaddy build \
    --with github.com/dunglas/frankenphp@v1.12.3 \
    --with github.com/dunglas/frankenphp/caddy@v1.12.3 \
    --with github.com/y-l-g/queue/module@main
Caddy configuration
Production Redis backend:
{
    frankenphp
    pogo_queue {
        backend redis {
            url {$POGO_REDIS_URL}
            key_prefix pogo
            group default
            consumer {$HOSTNAME}
            tls false
        }
        worker public/queue-worker.php
        queues default,mail,notifications
        concurrency 8
        worker_threads 8
        max_payload_bytes 1048576
        visibility_timeout 90s
        reserve_timeout 1s
        shutdown_timeout 30s
        max_attempts 3
    }
}
Local-only memory backend:
{
    frankenphp
    pogo_queue {
        backend memory {
            max_messages 1000
            max_total_bytes 67108864
        }
        worker public/queue-worker.php
        queues default
        concurrency 2
    }
}
The memory backend is not durable and must not be used for critical production work.
Application integration
Laravel
Install the driver:
composer require pogo/laravel-queue
Configure config/queue.php:
'pogo' => [
    'driver' => 'pogo',
    'queue' => env('POGO_QUEUE', 'default'),
    'retry_after' => 90,
],
Set the environment:
QUEUE_CONNECTION=pogo
POGO_QUEUE=default
POGO_REDIS_URL=redis://redis:6379/0
Create a FrankenPHP queue worker entrypoint:
<?php

use Illuminate\Queue\WorkerOptions;
use Laravel\Octane\ApplicationFactory;
use Laravel\Octane\FrankenPhp\FrankenPhpClient;
use Laravel\Octane\Worker;
use Pogo\Queue\Laravel\PogoJob;

require_once dirname(__DIR__) . '/vendor/autoload.php';

$basePath = $_SERVER['APP_BASE_PATH'] ?? dirname(__DIR__);

// Boot the application once; the same worker serves every delivery.
$worker = tap(new Worker(new ApplicationFactory($basePath), new FrankenPhpClient()))->boot();
$options = new WorkerOptions();
$connection = $_ENV['POGO_CONNECTION'] ?? 'pogo';

try {
    // Each call handles one delivery; the loop ends when the call returns false.
    while (frankenphp_handle_request(static function (string $message) use ($worker, $options, $connection): void {
        // The delivery arrives as a JSON string; invalid JSON throws.
        $delivery = json_decode($message, true, flags: JSON_THROW_ON_ERROR);
        $app = $worker->application();
        $queue = $app['queue']->connection($connection);
        $job = PogoJob::fromDelivery($app, $queue, $delivery);
        $app['queue.worker']->process($connection, $job, $options);
    })) {
    }
} finally {
    $worker->terminate();
}
Symfony
Install the bundle:
composer require pogo/symfony-queue
Configure Messenger:
framework:
    messenger:
        transports:
            pogo: 'pogo-queue://default'
        routing:
            'App\Message\YourMessage': pogo
Create a worker entrypoint:
<?php

use App\Kernel;
use Symfony\Bundle\FrameworkBundle\Console\Application;
use Symfony\Component\Console\Input\ArrayInput;

require_once __DIR__ . '/../vendor/autoload_runtime.php';

return static function (array $context) {
    $kernel = new Kernel($context['APP_ENV'], (bool) $context['APP_DEBUG']);
    $app = new Application($kernel);
    $app->setDefaultCommand('messenger:consume', true);

    // Consume from the pogo transport and stop after one hour (3600 seconds).
    $app->run(new ArrayInput([
        'receivers' => ['pogo'],
        '--time-limit' => 3600,
    ]));

    return $app;
};
PHP API
The framework drivers use these native functions:
pogo_queue_push(string $queue, string $payload, int $delaySeconds = 0): string;
pogo_queue_ack(string $queue, string $deliveryId): int;
pogo_queue_release(string $queue, string $deliveryId, int $delaySeconds = 0): int;
pogo_queue_fail(string $queue, string $deliveryId, string $reason = ''): int;
pogo_queue_status(?string $queue = null): string;
pogo_queue_push() returns a JSON string containing ok, id, code, and message fields. pogo_queue() is still available as a deprecated v1 helper for immediate dispatch to the default queue.
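A caller would typically decode that result before trusting the job ID. The following is a minimal sketch, assuming ok is a boolean and id, code, and message are strings; the source only names the fields. The pushedJobId() helper, the sample ID, and the payload_too_large code are made up for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Decodes the JSON string returned by pogo_queue_push() and returns the job ID.
 * Assumption: "ok" is a boolean and "id" is a string; the field types are not
 * stated in the documentation.
 *
 * @throws JsonException    if the string is not valid JSON
 * @throws RuntimeException if the push was not accepted
 */
function pushedJobId(string $resultJson): string
{
    $result = json_decode($resultJson, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($result) || ($result['ok'] ?? false) !== true) {
        $code = is_array($result) ? (string) ($result['code'] ?? 'unknown') : 'unknown';
        $message = is_array($result) ? (string) ($result['message'] ?? '') : '';
        throw new RuntimeException("Queue push failed [{$code}]: {$message}");
    }

    return (string) ($result['id'] ?? '');
}

// Inside FrankenPHP with the module loaded, the input would come from:
//   $resultJson = pogo_queue_push('mail', $payload);
// The literals below are made-up samples so the sketch runs anywhere.
$id = pushedJobId('{"ok":true,"id":"1700000000000-0","code":"","message":""}');

try {
    pushedJobId('{"ok":false,"id":"","code":"payload_too_large","message":"payload exceeds limit"}');
    $error = null;
} catch (RuntimeException $e) {
    $error = $e->getMessage();
}
```

Throwing on a rejected push keeps the failure visible at the call site instead of letting an empty ID flow into later code.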
Operations
- Enable Redis persistence for production deployments.
- Set visibility_timeout above the normal maximum job runtime.
- Set max_attempts according to your retry policy.
- Monitor pending, reserved, delayed, failed, backend_errors, and payload rejection counts through pogo_queue_status().
- During deploys, FrankenPHP stops reserving new work and lets in-flight workers finish until shutdown_timeout; unacknowledged Redis messages remain pending and can be reclaimed by another consumer.
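The monitoring advice above could be automated with a small threshold check. This sketch assumes the status JSON decodes to a flat object of integer counters keyed by name, which the documentation does not confirm; the countersOverLimit() helper, the sample snapshot, and the limits are all hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Returns the counters in a decoded status snapshot that exceed their limits.
 * Assumption: the status JSON is a flat object of integer counters keyed by
 * name (for example "reserved" or "failed"); the real shape may differ.
 *
 * @param array<string, mixed> $status decoded pogo_queue_status() output
 * @param array<string, int>   $limits hypothetical alert thresholds
 * @return array<string, int>  counter name => observed value, breaches only
 */
function countersOverLimit(array $status, array $limits): array
{
    $breaches = [];
    foreach ($limits as $name => $limit) {
        $value = (int) ($status[$name] ?? 0); // a missing counter is treated as 0
        if ($value > $limit) {
            $breaches[$name] = $value;
        }
    }
    return $breaches;
}

// In a FrankenPHP worker the snapshot would come from pogo_queue_status('default').
// The literal below is a made-up sample so the sketch runs anywhere.
$json = '{"pending":12,"reserved":40,"delayed":3,"failed":0,"backend_errors":2}';
$status = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

$breaches = countersOverLimit($status, ['reserved' => 25, 'failed' => 0, 'backend_errors' => 0]);
```

With the sample snapshot, reserved and backend_errors exceed their limits while failed does not, so only those two counters are reported.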
Troubleshooting
- Worker unavailable: confirm the binary includes pogo_queue, the Caddyfile has a pogo_queue block, and the worker script path is correct.
- Queue unknown: add the queue name to the queues directive.
- Payload too large: increase max_payload_bytes or reduce job payload size.
- Growing reserved count: check PHP worker errors and make sure visibility_timeout is not shorter than normal job runtime.