Redis Stream  是从5.0才有的新数据结构,基础知识参考网址

Redis Stream | 菜鸟教程Redis Stream Redis Stream 是 Redis 5.0 版本新增加的数据结构。 Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。 简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。 而 ..https://www.runoob/redis/redis-stream.html但是php要想使用确有很多障碍。网上相关资料少的出奇。
很多命令php都不支持,要使用 $redis->rawCommand。
有一位网友整理了相关资料,可以参考。
PHP 操作 Redis Stream 消息队列_王宣成的博客-CSDN博客_php redis stream需要redis5.0 ,借鉴kafka,消息可持久化<?php //连接reids $redis = new Redis(); $redis->connect('127.0.0.1', 6379); //xadd:追加消息 //xdel:删除消息,删除标志位,不影响消息总长度 //xrange:消息列表,过滤已删除的消息 //xlen:消息长度 //del: 删除所有消息 $redis->rawCommandhttps://blog.csdn/qq_18743819/article/details/107276380我自己也做了一遍消息队列,贴上代码。抛砖引玉吧。
生产者代码就一个。

$redis->xadd('queue','*',['type'=>0,'dat'=>'']);

这个特别说明 第三个参数是数组,按原生代码这个就得拼接key/val 如果有1000个,是不是很傻呀。php用数组封装一下完美。

消费者代码有点复杂

set_time_limit(0);
$this->queue="queue"; //消息队里非key
$this->group='pin-group'  //消费者组
$consumer='consumerA';  //消费者

if(!$this->redis->exists($this->queue)){  /*队列不存在需要初始化*/
    $this->redis->xadd($this->queue,'*',['type'=>0,'dat'=>'']);
}
$res=$this->redis->xinfo('groups',$this->queue);  /*获取strarm消费组信息*/
if(!$res){
     $res=$this->redis->xgroup('create',$this->queue,$this->group,'0');  /*创建消费组*/
     var_dump($res);
}
while (1) {
     echo 'start============'.microtime(true)." \n";
//从最后读取一条,阻塞5秒
     $read = $this->redis->rawCommand('xreadgroup','group',$this->group,$consumer,'block','5000', 'count', '1' ,'streams',$this->queue,'>');

     //$res = $this->redis->rawCommand('xpending',$this->queue,$this->group,'-','+','10',$consumer);  //消费者的待处理消息
            $info = $read[0][1] ?? [];
            if (empty($info)) {
                continue;
            }
            $msgCount = count($info);
            for ($a = 0; $a < $msgCount; $a ++) {
                $msgId = $info[$a][0] ?? 0;  //每条消息的id
                $msg=$info[$a][1];
                $type=$msg[1];
                $dat=json_decode($msg[3],true);
                print_r([$type,$dat]);
                $xack = $this->redis->rawCommand('xack',$this->queue,$this->group,$msgId);  //确认消息已经处理
                echo $msgId.':'.$xack.PHP_EOL;
            }            
            //sleep(1);
            echo "end============".microtime(true)." \n";
}

消费者代码,需要持久化。

更多推荐

Redis Stream +php7 实现消息队列