Skip to content

Queues (Hàng đợi)

Giới thiệu (Introduction)

Laravel Queues cung cấp API thống nhất cho nhiều queue backends (Amazon SQS, Redis, Beanstalkd, database, hoặc sync). Queues cho phép hoãn thực thi tác vụ tốn thời gian (gửi email, xử lý video) sang background, cải thiện response time.

Cấu hình

File cấu hình tại config/queue.php.

| Connection | Mô tả |
|------------|-------|
| sync | Thực thi ngay lập tức (cho development) |
| database | Lưu jobs trong database |
| redis | Lưu jobs trong Redis |
| sqs | Amazon SQS |
| beanstalkd | Beanstalkd server |

Tạo Jobs (Creating Jobs)

bash
php artisan make:job ProcessPodcast

Cấu trúc Job

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

/**
 * Queued job that hands a single podcast to the audio processor.
 */
class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     *
     * The podcast is stored as a promoted public property so handle() can read it.
     */
    public function __construct(public Podcast $podcast)
    {
    }

    /**
     * Execute the job: pass the stored podcast to the given processor.
     */
    public function handle(AudioProcessor $processor): void
    {
        $podcast = $this->podcast;

        $processor->process($podcast);
    }
}

Middleware cho Jobs

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * Get the middleware the job should pass through.
 *
 * Returns a single WithoutOverlapping middleware built from the podcast's id.
 */
public function middleware(): array
{
    $overlapGuard = new WithoutOverlapping($this->podcast->id);

    return [$overlapGuard];
}

Rate Limiting

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * Returns a single RateLimited middleware built with the name 'backups'.
 */
public function middleware(): array
{
    $limiter = new RateLimited('backups');

    return [$limiter];
}

Dispatching Jobs

php
use App\Jobs\ProcessPodcast;

// Dispatch onto the queue
ProcessPodcast::dispatch($podcast);

// Conditional dispatch
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

// Delayed dispatch
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));

// Dispatch after the DB commit
ProcessPodcast::dispatch($podcast)->afterCommit();

// Sync dispatch (runs immediately)
ProcessPodcast::dispatchSync($podcast);

Job Chaining

Chạy các jobs tuần tự; nếu một job thất bại, các jobs còn lại trong chain sẽ không được chạy:

php
use Illuminate\Support\Facades\Bus;

// Run the jobs in order. ProcessPodcast's constructor in this document
// requires a Podcast, so the instance is passed explicitly.
// NOTE(review): OptimizePodcast and ReleasePodcast are not defined in this
// document; pass whatever their constructors require.
Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast(),
    new ReleasePodcast(),
])->dispatch();

// With a catch callback
Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast(),
])->catch(function (Throwable $e) {
    // A job within the chain has failed...
})->dispatch();

Queues và Connections

php
// Dispatch the job on the 'sqs' connection, into its 'processing' queue.
ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');

Chạy Queue Worker

bash
# Start a queue worker
php artisan queue:work

# Run a specific queue
php artisan queue:work redis --queue=emails

# Maximum number of jobs
php artisan queue:work --max-jobs=1000

# Maximum run time
php artisan queue:work --max-time=3600

# Sleep when the queue is empty
php artisan queue:work --sleep=3

# Process a single job
php artisan queue:work --once

Supervisor Configuration

Dùng Supervisor để giữ workers chạy:

ini
; Supervisor program entry that runs the Laravel queue worker command below.
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
; Worker command: sqs connection with --sleep=3, --tries=3 and --max-time=3600.
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
; Same value as the --max-time passed to the worker command above.
stopwaitsecs=3600

Retries (Thử lại)

php
// Maximum number of attempts
class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     */
    public int $tries = 25;

    /**
     * Alternatively, return the time after which the job should no longer be retried.
     *
     * Define either $tries or retryUntil(): when both are present, Laravel
     * gives precedence to retryUntil().
     */
    public function retryUntil(): DateTime
    {
        return now()->addMinutes(10);
    }
}

Backoff

php
// The two declarations below are alternatives: both declare $backoff,
// so a job class can contain only one of them.
public $backoff = 3; // 3 seconds

// Exponential backoff
public $backoff = [1, 5, 10]; // 1s, 5s, 10s

Job Batching

Dispatch nhiều jobs và thực hiện action khi batch hoàn thành:

php
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

// Dispatch three ImportCsv jobs as one batch and register completion callbacks.
$batch = Bus::batch([
    new ImportCsv(1),
    new ImportCsv(2),
    new ImportCsv(3),
])->then(function (Batch $batch) {
    // All jobs completed...
})->catch(function (Batch $batch, Throwable $e) {
    // First job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished (successfully or with failures)...
})->dispatch();

// Members available on the returned batch
$batch->id;
$batch->progress();     // 0-100
$batch->finished();
$batch->cancelled();

Failed Jobs

bash
# List failed jobs
php artisan queue:failed

# Retry a failed job
php artisan queue:retry <id>
php artisan queue:retry all

# Delete a failed job
php artisan queue:forget <id>
php artisan queue:flush

Xử lý Failure trong Job

php
/**
 * Handle a job failure.
 *
 * The exception argument is nullable, so check it before use.
 */
public function failed(?Throwable $exception): void
{
    // Send a notification to the user when the job fails...
}

Job Events

php
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;

// Register a callback that receives a JobProcessing event.
Queue::before(function (JobProcessing $event) {
    // $event->connectionName, $event->job, $event->job->payload()
});

// Register a callback that receives a JobProcessed event.
Queue::after(function (JobProcessed $event) {
    // Job completed...
});