yii2 amqp 接收和发送数据(和外部系统对接)

  1. 配置queue(amqp)
'bootstrap' => [
        'queue', // The component registers own console commands
    'components' => [
        'queue' => [
            'class' => 'zhuravljov\yii\queue\amqp\Queue',
            'host'  => '',
            'port'  => 5672,
            'user'  => 'admin',
            'password' => 'admin',
            'queueName' => 'productDropshipQN',
            'exchangeName' => 'productDropshipEX',


2.console  controller


 * FecShop file.
 * @link http://www.fecshop.com/
 * @copyright Copyright (c) 2016 FecShop Software LLC
 * @license http://www.fecshop.com/license/

namespace fecshop\app\console\modules\Amqp\controllers;

use Yii;
use yii\console\Controller;
use fecshop\app\console\modules\Amqp\block\PushTest;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

 * @author Terry Zhao <2358269014@qq.com>
 * @since 1.0
 * 这是一个测试RabbitMq 的一个例子。这里作为消息生产方。
 * 你可以通过执行 ./yii amqp/test/test 来生产数据。
class TestController extends Controller
    const EXCHANGE_DIRECT = 'direct';
    const EXCHANGE_TOPIC = 'topic';
    const EXCHANGE_FANOUT = 'fanout';
    public $host = '';
    public $port = 5672;
    public $user = 'admin';
    public $password        = 'admin';
    public $queueName       = 'productDropshipQN';
    public $exchangeName    = 'productDropshipEX';
    public $routingKey      = 'productDropshipRT';
    public $exchangeType    = self::EXCHANGE_DIRECT;
     * @var AMQPStreamConnection
    private $connection;
     * @var AMQPChannel
    private $channel;
     * 生产数据
    public function actionTest()
            'name'  => 'water',
            'age'   => 331,
     * 接收数据
    public function actionListen()
        $callback = function(AMQPMessage $message) {
            if ($this->handleMessage($message->body)) {
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
        while(count($this->channel->callbacks)) {
     * Opens connection and channel
    protected function open()
        if ($this->channel) return;
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare($this->queueName,true, true);
        $this->channel->exchange_declare($this->exchangeName, $this->exchangeType, false, true, false);
        $this->channel->queue_bind($this->queueName, $this->exchangeName,$this->routingKey);
     * 这里处理接收到的数据
    protected function handleMessage($message)
        // $message = unserialize($message);
        //  do some thing ...
        // \Yii::info($message,'fecshop_debug');
        return true;
    public function actionListen3()






电子邮件地址不会被公开。 必填项已用*标注