- 配置queue(amqp)
'bootstrap' => [ 'queue', // The component registers own console commands ], 'components' => [ 'queue' => [ 'class' => 'zhuravljov\yii\queue\amqp\Queue', 'host' => '192.168.221.56', 'port' => 5672, 'user' => 'admin', 'password' => 'admin', 'queueName' => 'productDropshipQN', 'exchangeName' => 'productDropshipEX', ], ],
2.console controller
<?php /** * FecShop file. * * @link http://www.fecshop.com/ * @copyright Copyright (c) 2016 FecShop Software LLC * @license http://www.fecshop.com/license/ */ namespace fecshop\app\console\modules\Amqp\controllers; use Yii; use yii\console\Controller; use fecshop\app\console\modules\Amqp\block\PushTest; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * @author Terry Zhao <2358269014@qq.com> * @since 1.0 * 这是一个测试RabbitMq 的一个例子。这里作为消息生产方。 * 你可以通过执行 ./yii amqp/test/test 来生产数据。 */ class TestController extends Controller { const EXCHANGE_DIRECT = 'direct'; const EXCHANGE_TOPIC = 'topic'; const EXCHANGE_FANOUT = 'fanout'; public $host = '192.168.221.56'; public $port = 5672; public $user = 'admin'; public $password = 'admin'; public $queueName = 'productDropshipQN'; public $exchangeName = 'productDropshipEX'; public $routingKey = 'productDropshipRT'; public $exchangeType = self::EXCHANGE_DIRECT; /** * @var AMQPStreamConnection */ private $connection; /** * @var AMQPChannel */ private $channel; /** * 生产数据 */ public function actionTest() { Yii::$app->queue->push([ 'name' => 'water', 'age' => 331, ]); } /** * 接收数据 */ public function actionListen() { $this->open(); $callback = function(AMQPMessage $message) { if ($this->handleMessage($message->body)) { $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback); while(count($this->channel->callbacks)) { $this->channel->wait(); } } /** * Opens connection and channel */ protected function open() { if ($this->channel) return; $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queueName,true, true); $this->channel->exchange_declare($this->exchangeName, $this->exchangeType, false, true, false); $this->channel->queue_bind($this->queueName, $this->exchangeName,$this->routingKey); } /** * 这里处理接收到的数据 */ protected function handleMessage($message) { // $message = unserialize($message); var_dump($message); // do some thing ... // \Yii::info($message,'fecshop_debug'); return true; } /* public function actionListen3() { Yii::$app->queue->listen(); } */ }