Yii2 AMQP:接收和发送数据(与外部系统对接)

  1. 配置 queue(amqp)组件
// Fragment of a Yii2 application config array: registers the AMQP-backed
// `queue` component and bootstraps it.
'bootstrap' => [
        'queue', // The component registers its own console commands
    ],
    
    'components' => [
        'queue' => [
            // AMQP driver class of the queue extension.
            'class' => 'zhuravljov\yii\queue\amqp\Queue',
            // Broker connection settings. These are literal example values
            // (including the admin/admin credentials); replace them per environment.
            'host'  => '192.168.221.56',
            'port'  => 5672,
            'user'  => 'admin',
            'password' => 'admin',
            // Queue and exchange names; the same names are used as defaults
            // by the console controller shown below.
            'queueName' => 'productDropshipQN',
            'exchangeName' => 'productDropshipEX',
        ],
    ],

 

  2. console controller(控制台控制器)

 

<?php
/**
 * FecShop file.
 *
 * @link http://www.fecshop.com/
 * @copyright Copyright (c) 2016 FecShop Software LLC
 * @license http://www.fecshop.com/license/
 */

namespace fecshop\app\console\modules\Amqp\controllers;

use Yii;
use yii\console\Controller;
use fecshop\app\console\modules\Amqp\block\PushTest;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Example console controller for trying out RabbitMQ (AMQP).
 *
 * `actionTest()` is the producing side: it pushes a sample payload through the
 * application's `queue` component. `actionListen()` is the consuming side: it
 * connects to the broker directly with php-amqplib and prints every message body.
 *
 * Run `./yii amqp/test/test` to produce a message.
 *
 * @author Terry Zhao <2358269014@qq.com>
 * @since 1.0
 */
class TestController extends Controller
{
    const EXCHANGE_DIRECT = 'direct';
    const EXCHANGE_TOPIC = 'topic';
    const EXCHANGE_FANOUT = 'fanout';

    // Broker connection settings used by open(). The defaults are literal
    // example values; override them instead of relying on admin/admin.
    public $host = '192.168.221.56';
    public $port = 5672;
    public $user = 'admin';
    public $password = 'admin';

    // Names of the queue, the exchange and the routing key that binds them.
    public $queueName = 'productDropshipQN';
    public $exchangeName = 'productDropshipEX';
    public $routingKey = 'productDropshipRT';
    // One of the EXCHANGE_* constants.
    public $exchangeType = self::EXCHANGE_DIRECT;

    /**
     * @var AMQPStreamConnection|null open broker connection, null while disconnected
     */
    private $connection;
    /**
     * @var AMQPChannel|null channel on $connection, null while disconnected
     */
    private $channel;

    /**
     * Produces data: pushes one sample payload to the `queue` application component.
     */
    public function actionTest()
    {
        Yii::$app->queue->push([
            'name'  => 'water',
            'age'   => 331,
        ]);
    }

    /**
     * Receives data: consumes messages from $queueName until the channel has no
     * consumer callbacks left, passing each message body to handleMessage().
     *
     * The connection and channel are always released when the loop ends or an
     * exception escapes, so a failed run does not leak the broker socket or leave
     * a stale channel behind for the next open() call.
     *
     * The original file kept `Yii::$app->queue->listen()` as a commented-out
     * alternative to this hand-written consumer.
     */
    public function actionListen()
    {
        $callback = function (AMQPMessage $message) {
            // Acknowledge only when the handler reports success.
            // NOTE(review): on a false return the message is neither acked nor
            // rejected; with prefetch_count = 1 below, the broker will presumably
            // stop delivering to this consumer until the channel is closed.
            // Confirm the intended retry policy.
            if ($this->handleMessage($message->body)) {
                $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
            }
        };

        try {
            $this->open();
            // Second argument is prefetch_count: at most one unacknowledged message at a time.
            $this->channel->basic_qos(null, 1, null);
            // Arguments after the consumer tag: no_local, no_ack, exclusive, nowait.
            // no_ack = false means messages must be acknowledged explicitly (see callback).
            $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
            while (count($this->channel->callbacks)) {
                $this->channel->wait();
            }
        } finally {
            $this->close();
        }
    }

    /**
     * Opens the connection and channel, then declares and binds the queue and exchange.
     *
     * Does nothing when a channel is already open.
     */
    protected function open()
    {
        if ($this->channel) {
            return;
        }
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
        $this->channel = $this->connection->channel();
        // Arguments: queue, passive = true, durable = true.
        // NOTE(review): passive = true only checks that the queue already exists
        // and does not create it; confirm that is intended for this integration.
        $this->channel->queue_declare($this->queueName, true, true);
        // Arguments: exchange, type, passive = false, durable = true, auto_delete = false.
        $this->channel->exchange_declare($this->exchangeName, $this->exchangeType, false, true, false);
        $this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routingKey);
    }

    /**
     * Closes the channel and the connection (if open) and resets both handles.
     *
     * Teardown is best-effort: a failure while closing is logged as a warning
     * instead of being thrown, so it cannot mask the exception that ended the
     * consume loop.
     */
    protected function close()
    {
        $resources = [$this->channel, $this->connection];
        // Reset first so open() reconnects even if closing fails below.
        $this->channel = null;
        $this->connection = null;

        foreach ($resources as $resource) {
            if ($resource === null) {
                continue;
            }
            try {
                $resource->close();
            } catch (\Exception $e) {
                Yii::warning('Failed to close AMQP resource: ' . $e->getMessage(), __METHOD__);
            }
        }
    }

    /**
     * Handles one received message body.
     *
     * This example only dumps the raw body. Real processing goes here; decode the
     * body first if the producer encodes it (for data coming from an external
     * system prefer a safe format such as JSON over unserialize()).
     *
     * @param string $message raw message body as delivered by the broker
     * @return bool true when the message was processed and may be acknowledged
     */
    protected function handleMessage($message)
    {
        var_dump($message);
        // Do the actual work here, e.g. \Yii::info($message, 'fecshop_debug');
        return true;
    }
}