站长资源数据库

Redis 实现队列原理的实例详解

整理:jimmy2024/12/27浏览2
简介Redis 实现队列原理的实例详解场景说明:·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求·抢购场景,先入先出的模式命令:rpush + blpop 或 lpush +

Redis 实现队列原理的实例详解

场景说明:

·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时

·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去处理这些请求

·抢购场景,先入先出的模式

命令:

rpush + blpop 或 lpush + brpop

rpush : 往列表右侧推入数据

blpop : 客户端阻塞直到队列有值输出

简单队列:

simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();

获取20000万个商品,并把json化后的数据推入goods:task队列

queueBlpop.php

// 出队
while (true) {  
// 阻塞设置超时时间为3秒  
$task = $redis->blPop(array('goods:task'), 3); 
  if ($task) { 
    $redis->rPush('goods:success:task', $task[1]);
    $task = json_decode($task[1], true);
    echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
    echo PHP_EOL; 
  } else {
    echo 'nothing' . PHP_EOL;    sleep(5);  
 }
}

设置blpop阻塞时间为3秒,当有数据出队时保存到goods:success:task表示执行成功,当队列没有数据时,程序睡眠10秒重新检查goods:task是否有数据出队

cli 模式执行命令:

php simple.phpphp queueBlpop.php

优先级队列

思路:

blpop 有多个键时,blpop会从左至右遍历键,一旦一个键能弹出元素,客户端立即返回。例如:

blpop key1 key2 key3 key4

从key1到key4遍历,如果哪个key有值,则弹出这个值,若多个key同时有值时,优先弹出排在左边的key。

priority.php

// 设置优先级队列
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  
// cid 小于100放在低级队列  
if ($row['cid'] < 100) {
    $redis->rPush($low, json_encode($row));  

// cid 100到600之间放在中级队列  
}elseif ($row['cid'] > 100 && $row['cid'] < 600) {
    $redis->rPush($mid, json_encode($row));
}  
// cid 大于600放在高级队列   else {    $redis->rPush($high, json_encode($row));  }
}$redis->close();

priorityBlop.php

// 优先级队列
$high = 'goods:high:task';
$mid = 'goods:mid:task';$low = 'goods:low:task';
// 出队
while(true){  // 优先级高的队列放在左侧  
  $task = $redis->blPop(array($high, $mid, $low), 3);
  if ($task) {
    $task = json_decode($task[1], true);
    echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
    echo PHP_EOL;
  } else {
    echo 'nothing' . PHP_EOL;    sleep(5);
  }
}

优先级高的队列放在blpop命令左侧,依次排序,blpop命令会依次弹出high, mid, low队列的值

cli 模式执行命令:

php priority.phpphp priorityBlpop.php

延迟队列

思路:

可以用一个有序集合来保存延迟任务,member保存任务内容,score保存(当前时间 + 延时时间)。用时间作为score。程序只要用有序集合的第一条任务的score和当前时间做比较,如果当前时间比score小,说明有序集合的所有任务还没到执行时间。

delay.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));}

将20万条任务导入有序集合goods:delay:task,所有任务延迟到之后的1秒到300秒内执行

delayHandle.php

while (true) {// 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间    // 相对说明集合的其他任务未到执行时间  

$rs = $redis->zRange('goods:delay:task', 0, 0, true);

// 集合没有任务,睡眠时间设置为5秒  

  if (empty($rs)) {    
        echo 'no tasks , sleep 5 seconds' . PHP_EOL;sleep(5);continue;}
   $taskJson = key($rs);  
       $delay = $rs[$taskJson];  
       $task = json_decode($taskJson, true);  
   $now = time();// 到时间执行延时任务  
   if ($delay <= $now) {    

// 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改      

  if (!($identifier = acquireLock($task['id']))) {     
    continue;}    

// 移动延时任务到任务队列      

$redis->zRem('goods:delay:task', $taskJson);    
$redis->rPush('goods:task', $taskJson);    
echo $task['id'] . ' run ' . PHP_EOL;    

// 释放锁      

releaseLock($task['id'], $identifier);  } 
  else {    

// 延时任务未到执行时间      

 $sleep = $delay - $now;    

// 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理

  $sleep = $sleep > 2 "htmlcode">
queueBlpop.php
// 出队while (true) {  
// 阻塞设置超时时间为3秒  
 $task = $redis->blPop(array('goods:task'), 3);  
if ($task) {    
$redis->rPush('goods:success:task', $task[1]);    
 $task = json_decode($task[1], true);    
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';    
echo PHP_EOL;  } else {    
echo 'nothing' . PHP_EOL;sleep(5);  
   }
}



处理任务队列中的任务

cli模式下执行命令:

php delay.phpphp delayHanlde.phpphp queueBlpop.php

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!