PHP队列系统与消息处理实战

队列是处理异步任务和削峰填谷的重要工具。PHP本身没有内置队列，但可以用Redis实现一个功能完整的队列系统。处理邮件发送、图片处理、报表生成这些耗时任务，用队列可以避免阻塞主请求。

Redis的List数据结构天然适合做队列。LPUSH入队，RPOP出队，BRPOP阻塞出队。

```php
<?php
class RedisQueue
{
    private Redis $redis;
    private string $prefix = 'queue:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function push(string $queue, array $job): string
    {
        $jobId = uniqid('job_', true);
        $payload = json_encode([
            'id' => $jobId,
            'data' => $job,
            'created_at' => time(),
            'attempts' => 0,
        ]);

        $this->redis->lPush($this->prefix . $queue, $payload);
        return $jobId;
    }

    public function pop(string $queue, int $timeout = 0): ?array
    {
        $result = $this->redis->brPop([$this->prefix . $queue], $timeout);
        if ($result === null) return null;

        $payload = json_decode($result[1], true);
        if ($payload === null) return null;

        $payload['attempts']++;
        return $payload;
    }

    public function later(string $queue, array $job, int $delay): void
    {
        $payload = json_encode([
            'id' => uniqid('job_', true),
            'data' => $job,
            'created_at' => time(),
            'available_at' => time() + $delay,
            'attempts' => 0,
        ]);

        $this->redis->zAdd($this->prefix . 'delayed:' . $queue, time() + $delay, $payload);
    }

    public function size(string $queue): int
    {
        return $this->redis->lLen($this->prefix . $queue);
    }

    public function clear(string $queue): void
    {
        $this->redis->del($this->prefix . $queue);
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new RedisQueue($redis);

// 推送任务
$jobId = $queue->push('emails', [
    'type' => 'welcome',
    'to' => 'user@example.com',
    'user_id' => 123,
]);
echo "任务已推送: $jobId\n";

$queue->push('images', [
    'type' => 'resize',
    'path' => '/uploads/photo.jpg',
    'sizes' => [300, 600, 1200],
]);

echo "队列大小: " . $queue->size('emails') . "\n";
?>
```

worker进程负责从队列中拉取任务并执行。

```php
<?php
class QueueWorker
{
    private RedisQueue $queue;
    private array $handlers = [];
    private bool $running = false;

    public function __construct(RedisQueue $queue)
    {
        $this->queue = $queue;
    }

    public function addHandler(string $type, callable $handler): void
    {
        $this->handlers[$type] = $handler;
    }

    public function work(string $queueName, int $memoryLimit = 128): void
    {
        $this->running = true;
        echo "Worker启动监听队列: $queueName\n";

        while ($this->running) {
            // 检查内存
            if (memory_get_usage(true) > $memoryLimit * 1024 * 1024) {
                echo "内存超限退出\n";
                break;
            }

            try {
                $job = $this->queue->pop($queueName, 5);
                if ($job === null) continue;

                $this->processJob($job);
            } catch (Exception $e) {
                echo "处理失败: {$e->getMessage()}\n";
                sleep(1);
            }
        }
    }

    private function processJob(array $job): void
    {
        $type = $job['data']['type'] ?? 'default';
        echo "处理任务: {$job['id']} (类型: $type)\n";

        $handler = $this->handlers[$type] ?? null;
        if ($handler === null) {
            echo "未找到处理器: $type\n";
            return;
        }

        try {
            $result = $handler($job['data']);
            echo "任务完成: {$job['id']}\n";

            if ($result) {
                echo "结果: " . json_encode($result) . "\n";
            }
        } catch (Exception $e) {
            echo "任务失败: {$job['id']}: {$e->getMessage()}\n";
            throw $e;
        }
    }

    public function stop(): void
    {
        $this->running = false;
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new RedisQueue($redis);
$worker = new QueueWorker($queue);

// 注册处理器
$worker->addHandler('welcome', function ($data) {
    echo "发送欢迎邮件至 {$data['to']}\n";
    sleep(1);
    return ['sent' => true];
});

$worker->addHandler('resize', function ($data) {
    echo "调整图片尺寸: {$data['path']}\n";
    foreach ($data['sizes'] as $size) {
        echo "生成 {$size}x{$size} 缩略图\n";
        usleep(500000);
    }
    return ['completed' => true];
});

// 运行worker，实际中会单独启动一个进程
// $worker->work('default');
?>
```

延迟队列用于定时任务，比如订单超时取消、定时发送邮件等。

```php
<?php
class DelayedQueueProcessor
{
    private Redis $redis;
    private RedisQueue $queue;
    private string $prefix = 'queue:';

    public function __construct(Redis $redis, RedisQueue $queue)
    {
        $this->redis = $redis;
        $this->queue = $queue;
    }

    public function processDelayed(): int
    {
        $processed = 0;
        $now = time();

        // 找到所有需要处理的延迟队列
        $keys = $this->redis->keys($this->prefix . 'delayed:*');

        foreach ($keys as $key) {
            $queueName = str_replace($this->prefix . 'delayed:', '', $key);

            // 获取到期的任务
            $jobs = $this->redis->zRangeByScore($key, 0, $now);

            foreach ($jobs as $job) {
                $data = json_decode($job, true);
                if ($data && isset($data['data'])) {
                    // 移到主队列
                    $this->queue->push($queueName, $data['data']);
                    $this->redis->zRem($key, $job);
                    $processed++;
                }
            }
        }

        return $processed;
    }
}
```

失败任务的重试机制也很重要。

```php
<?php
class FailedJobHandler
{
    private Redis $redis;
    private string $prefix = 'queue:failed:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function handleFailure(string $queue, array $job, Exception $error): void
    {
        $job['error'] = $error->getMessage();
        $job['failed_at'] = time();

        $this->redis->lPush(
            $this->prefix . $queue,
            json_encode($job)
        );

        echo "任务失败: {$job['id']} - {$error->getMessage()}\n";
    }

    public function retry(string $queue, RedisQueue $targetQueue): int
    {
        $count = 0;
        $key = $this->prefix . $queue;

        while ($job = $this->redis->rPop($key)) {
            $data = json_decode($job, true);
            if ($data && isset($data['data'])) {
                $targetQueue->push($queue, $data['data']);
                $count++;
            }
        }

        return $count;
    }

    public function count(string $queue): int
    {
        return $this->redis->lLen($this->prefix . $queue);
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new RedisQueue($redis);
$failedHandler = new FailedJobHandler($redis);

echo "失败任务数: " . $failedHandler->count('emails') . "\n";
?>
```

任务优先级可以用多个队列来实现，高优先级的队列先消费。

```php
<?php
class PriorityQueue
{
    private Redis $redis;
    private array $queues = ['high', 'default', 'low'];
    private string $prefix = 'queue:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function push(string $priority, string $type, array $data): string
    {
        if (!in_array($priority, $this->queues)) {
            $priority = 'default';
        }

        $jobId = uniqid('prio_', true);
        $payload = json_encode([
            'id' => $jobId,
            'type' => $type,
            'data' => $data,
            'priority' => $priority,
            'created_at' => time(),
        ]);

        $this->redis->lPush($this->prefix . $priority, $payload);
        return $jobId;
    }

    public function pop(int $timeout = 5): ?array
    {
        $keys = array_map(fn($q) => $this->prefix . $q, $this->queues);
        $result = $this->redis->brPop($keys, $timeout);

        if ($result === null) return null;
        return json_decode($result[1], true);
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new PriorityQueue($redis);

$queue->push('high', 'payment_refund', ['order_id' => 123]);
$queue->push('default', 'send_email', ['to' => 'user@test.com']);
$queue->push('low', 'generate_report', ['type' => 'daily']);

echo "高优先级任务会先被消费\n";
?>
```

队列系统在实际项目中可以解决很多问题。异步任务处理、削峰填谷、任务重试、定时执行。用Redis做队列比用消息队列系统（如RabbitMQ）简单得多，功能也够用。
PHP队列系统与消息处理实战
PHP队列系统与消息处理实战

队列是处理异步任务和削峰填谷的重要工具。PHP本身没有内置队列，但可以用Redis实现一个功能完整的队列系统。处理邮件发送、图片处理、报表生成这些耗时任务，用队列可以避免阻塞主请求。

Redis的List数据结构天然适合做队列。LPUSH入队，RPOP出队，BRPOP阻塞出队。

```php
<?php
class RedisQueue
{
    private Redis $redis;
    private string $prefix = 'queue:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function push(string $queue, array $job): string
    {
        $jobId = uniqid('job_', true);
        $payload = json_encode([
            'id' => $jobId,
            'data' => $job,
            'created_at' => time(),
            'attempts' => 0,
        ]);

        $this->redis->lPush($this->prefix . $queue, $payload);
        return $jobId;
    }

    public function pop(string $queue, int $timeout = 0): ?array
    {
        $result = $this->redis->brPop([$this->prefix . $queue], $timeout);
        if ($result === null) return null;

        $payload = json_decode($result[1], true);
        if ($payload === null) return null;

        $payload['attempts']++;
        return $payload;
    }

    public function later(string $queue, array $job, int $delay): void
    {
        $payload = json_encode([
            'id' => uniqid('job_', true),
            'data' => $job,
            'created_at' => time(),
            'available_at' => time() + $delay,
            'attempts' => 0,
        ]);

        $this->redis->zAdd($this->prefix . 'delayed:' . $queue, time() + $delay, $payload);
    }

    public function size(string $queue): int
    {
        return $this->redis->lLen($this->prefix . $queue);
    }

    public function clear(string $queue): void
    {
        $this->redis->del($this->prefix . $queue);
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new RedisQueue($redis);

// 推送任务
$jobId = $queue->push('emails', [
    'type' => 'welcome',
    'to' => 'user@example.com',
    'user_id' => 123,
]);
echo "任务已推送: $jobId\n";

$queue->push('images', [
    'type' => 'resize',
    'path' => '/uploads/photo.jpg',
    'sizes' => [300, 600, 1200],
]);

echo "队列大小: " . $queue->size('emails') . "\n";
?>
```

worker进程负责从队列中拉取任务并执行。

```php
<?php
class QueueWorker
{
    private RedisQueue $queue;
    private array $handlers = [];
    private bool $running = false;

    public function __construct(RedisQueue $queue)
    {
        $this->queue = $queue;
    }

    public function addHandler(string $type, callable $handler): void
    {
        $this->handlers[$type] = $handler;
    }

    public function work(string $queueName, int $memoryLimit = 128): void
    {
        $this->running = true;
        echo "Worker启动监听队列: $queueName\n";

        while ($this->running) {
            // 检查内存
            if (memory_get_usage(true) > $memoryLimit * 1024 * 1024) {
                echo "内存超限退出\n";
                break;
            }

            try {
                $job = $this->queue->pop($queueName, 5);
                if ($job === null) continue;

                $this->processJob($job);
            } catch (Exception $e) {
                echo "处理失败: {$e->getMessage()}\n";
                sleep(1);
            }
        }
    }

    private function processJob(array $job): void
    {
        $type = $job['data']['type'] ?? 'default';
        echo "处理任务: {$job['id']} (类型: $type)\n";

        $handler = $this->handlers[$type] ?? null;
        if ($handler === null) {
            echo "未找到处理器: $type\n";
            return;
        }

        try {
            $result = $handler($job['data']);
            echo "任务完成: {$job['id']}\n";

            if ($result) {
                echo "结果: " . json_encode($result) . "\n";
            }
        } catch (Exception $e) {
            echo "任务失败: {$job['id']}: {$e->getMessage()}\n";
            throw $e;
        }
    }

    public function stop(): void
    {
        $this->running = false;
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new RedisQueue($redis);
$worker = new QueueWorker($queue);

// 注册处理器
$worker->addHandler('welcome', function ($data) {
    echo "发送欢迎邮件至 {$data['to']}\n";
    sleep(1);
    return ['sent' => true];
});

$worker->addHandler('resize', function ($data) {
    echo "调整图片尺寸: {$data['path']}\n";
    foreach ($data['sizes'] as $size) {
        echo "生成 {$size}x{$size} 缩略图\n";
        usleep(500000);
    }
    return ['completed' => true];
});

// 运行worker，实际中会单独启动一个进程
// $worker->work('default');
?>
```

延迟队列用于定时任务，比如订单超时取消、定时发送邮件等。

```php
<?php
class DelayedQueueProcessor
{
    private Redis $redis;
    private RedisQueue $queue;
    private string $prefix = 'queue:';

    public function __construct(Redis $redis, RedisQueue $queue)
    {
        $this->redis = $redis;
        $this->queue = $queue;
    }

    public function processDelayed(): int
    {
        $processed = 0;
        $now = time();

        // 找到所有需要处理的延迟队列
        $keys = $this->redis->keys($this->prefix . 'delayed:*');

        foreach ($keys as $key) {
            $queueName = str_replace($this->prefix . 'delayed:', '', $key);

            // 获取到期的任务
            $jobs = $this->redis->zRangeByScore($key, 0, $now);

            foreach ($jobs as $job) {
                $data = json_decode($job, true);
                if ($data && isset($data['data'])) {
                    // 移到主队列
                    $this->queue->push($queueName, $data['data']);
                    $this->redis->zRem($key, $job);
                    $processed++;
                }
            }
        }

        return $processed;
    }
}
```

失败任务的重试机制也很重要。

```php
<?php
class FailedJobHandler
{
    private Redis $redis;
    private string $prefix = 'queue:failed:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function handleFailure(string $queue, array $job, Exception $error): void
    {
        $job['error'] = $error->getMessage();
        $job['failed_at'] = time();

        $this->redis->lPush(
            $this->prefix . $queue,
            json_encode($job)
        );

        echo "任务失败: {$job['id']} - {$error->getMessage()}\n";
    }

    public function retry(string $queue, RedisQueue $targetQueue): int
    {
        $count = 0;
        $key = $this->prefix . $queue;

        while ($job = $this->redis->rPop($key)) {
            $data = json_decode($job, true);
            if ($data && isset($data['data'])) {
                $targetQueue->push($queue, $data['data']);
                $count++;
            }
        }

        return $count;
    }

    public function count(string $queue): int
    {
        return $this->redis->lLen($this->prefix . $queue);
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new RedisQueue($redis);
$failedHandler = new FailedJobHandler($redis);

echo "失败任务数: " . $failedHandler->count('emails') . "\n";
?>
```

任务优先级可以用多个队列来实现，高优先级的队列先消费。

```php
<?php
class PriorityQueue
{
    private Redis $redis;
    private array $queues = ['high', 'default', 'low'];
    private string $prefix = 'queue:';

    public function __construct(Redis $redis)
    {
        $this->redis = $redis;
    }

    public function push(string $priority, string $type, array $data): string
    {
        if (!in_array($priority, $this->queues)) {
            $priority = 'default';
        }

        $jobId = uniqid('prio_', true);
        $payload = json_encode([
            'id' => $jobId,
            'type' => $type,
            'data' => $data,
            'priority' => $priority,
            'created_at' => time(),
        ]);

        $this->redis->lPush($this->prefix . $priority, $payload);
        return $jobId;
    }

    public function pop(int $timeout = 5): ?array
    {
        $keys = array_map(fn($q) => $this->prefix . $q, $this->queues);
        $result = $this->redis->brPop($keys, $timeout);

        if ($result === null) return null;
        return json_decode($result[1], true);
    }
}

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$queue = new PriorityQueue($redis);

$queue->push('high', 'payment_refund', ['order_id' => 123]);
$queue->push('default', 'send_email', ['to' => 'user@test.com']);
$queue->push('low', 'generate_report', ['type' => 'daily']);

echo "高优先级任务会先被消费\n";
?>
```

队列系统在实际项目中可以解决很多问题。异步任务处理、削峰填谷、任务重试、定时执行。用Redis做队列比用消息队列系统（如RabbitMQ）简单得多，功能也够用。