1, 需要一个中文文档

https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world 

详细的说明了调用rabbitMq的各种方法。

 

2,开始连接的时候,第一次使用有许多的坑,最后多谢老铁给我讲解了一下关于bs cs的东西

rabbit 拓展有php写的 还有c写的拓展,分成两种。

php:


require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

类似于上面这个就是php的扩展,不用考虑扩展和php的版本问题,这也算是坑的一部分吧。

 

c:

    $conn = new \AMQPConnection($conn_args);
	
	$message = json_encode(array('Hello World3!','php3','c++3:'));
	
    //创建channel
	$channel = new \AMQPChannel($conn);
	
    //创建exchange
	$ex = new \AMQPExchange($channel);

下面这是c写的扩展,其实C写的运行速度快,但是目前使用的是php的扩展

 

生产者代码:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$localhost = ['localhost',5672,'guest','guest'];


$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

消费者代码:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$localhost = ['localhost',5672,'guest','guest'];

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

//由于项目原因,不能直接使用原生的php扩展

切换为 c扩展时报错  暂时未解决

报错 :Exception 'AMQPConnectionException' with message 'Library error: a socket error occurred - Potential login failure.'

Library error: connection closed unexpectedly - Potential login failure.

百度翻译 连接失败,登录失败。设置了管理员账号依然如此,

php - rabbit引用

https://blog.csdn/jj546630576/article/details/88533718

 

在切换为php扩展时,一直报错

PHP Fatal error:  Class 'PhpAmqpLib\PhpAmqpLib\Connection\AMQPStreamConnection' not found in D:\php\couponapi\commands\RabbitMQController.php on line 214
PHP Fatal Error 'yii\base\ErrorException' with message 'Class 'PhpAmqpLib\PhpAmqpLib\Connection\AMQPStreamConnection' not found'

类名找不到,于是各种改类命名空间,最后发现是安转的composer的命名目录指向和我的命名目录指向有问题,大坑啊

name参数是目录指向:如果随便修改,就会找不到类

"name": "php-amqplib/php-amqplib",
    "replace": {
        "videlalvaro/php-amqplib": "self.version"
    },
    "type": "library",
    "description": "Formerly videlalvaro/php-amqplib.  This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
    "keywords": ["rabbitmq", "message", "queue"],
    "homepage": "https://github/php-amqplib/php-amqplib/",

此处惨痛的经历告诉我,没事别瞎改composer安装好的目录。不然还找不到原因

3,安装好了之后,开始连接通过生产者代码发送给Mq,并且在Mq接收到之后,会推送到消费者进行消费和输出

由于目前项目中用到的是YII2的 console命令行,

如果需要在command命令中引用类库,需要再console.php进行配置数据库和加载,否则会报错。

同事的意思:是想在是接收到Mq接收的消息之后再把这些消息经过一轮处理放到redis队列再处理

            我是直接在command全部处理完所有的逻辑,到时候在通过Mq的Ack应答机制返回完成

 

更多推荐

php 本地实现连接rabbitMq ,发送接收消息