ลองเขียน custom driver สำหรับติดต่อกับ RabbitMQ / AMQP ยังไม่สมบูรณ์แต่ก็ดีกว่าการที่ต้องมา connect ผ่าน php-amqplib ทุกครั้งที่จะใช้ตามปกติ
- config file ก่อนตามระเบียบ
<?php defined('BASEPATH') or exit('No direct script access allowed'); /* | ------------------------------------------------------------------- | RABBITMQ CONNECTIVITY SETTINGS | ------------------------------------------------------------------- | This file will contain the settings needed to access your RabbitMQ. | | For complete instructions please consult the 'RabbitMQ Connection' | page of the User Guide. | | ------------------------------------------------------------------- | EXPLANATION OF VARIABLES | ------------------------------------------------------------------- | | ['connection_timeout'] => 3.0, | ['context'] => null, | ['heartbeat'] => 0 | ['host'], | ['insist'] => false, | ['keepalive'] => false, | ['locale'] => 'en_US', | ['login_method'] => 'AMQPLAIN', | ['login_response'] => null, | ['password'], | ['port'], | ['read_write_timeout'] => 3.0, | ['user'], | ['vhost'] => '/', */ $config['RabbitMQ']['connects'] = [ 'default' => [ 'connection_timeout' => 3.0, 'context' => null, 'heartbeat' => 0, 'host' => '127.0.0.1', 'insist' => false, 'keepalive' => false, 'locale' => 'en_US', 'login_method' => 'AMQPLAIN', 'login_response' => null, 'password' => 'guest', 'port' => 5672, 'read_write_timeout' => 3.0, 'user' => 'guest', 'vhost' => '/', ], ]; /* | ------------------------------------------------------------------- | RABBITMQ DEBUG SETTINGS | ------------------------------------------------------------------- | */ $config['RabbitMQ']['debug'] = false;
- ถึงคิวไฟล์ driver ที่จะเป็นตัวกลางระหว่าง codeigniter และ php-amqplib
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitMQ extends AMQPStreamConnection { public function __construct($config = array()) { $this->CI = &get_instance(); $this->config = $config['RabbitMQ']; if ($this->config['debug'] == true) { define('AMQP_DEBUG', true); } } public function connect($dsnId = 'default') { $dsn = $this->config['connects'][$dsnId]; $this->connection = new AMQPStreamConnection($dsn['host'], $dsn['port'], $dsn['user'], $dsn['password'], $dsn['vhost'], $dsn['insist'], $dsn['login_method'], $dsn['login_response'], $dsn['locale'], $dsn['connection_timeout'], $dsn['read_write_timeout'], $dsn['context'], $dsn['keepalive'], $dsn['heartbeat']); $this->channel = $this->connection->channel(); } public function disconnect() { $this->channel->close(); $this->connection->close(); } public function setMessageJson($datas) { $msg_body = json_encode($datas); $properties = [ 'content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; return new AMQPMessage($msg_body, $properties); } }
- ที่นี้ก็ controller ที่จะทำงานให้
<?php defined('BASEPATH') or exit('No direct script access allowed'); class Messaging extends CI_Controller { public function __construct() { parent::__construct(); $this->load->driver('RabbitMQ'); } public function receive() { set_time_limit(0); $this->rabbitmq->connect(); $exchange_name = 'customers'; $queue_name = 'invoices'; /** * Declares exchange * * @param string $exchange_name * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false); /** * Declares queue, creates if needed * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ list($queueName, $message_count, $consumer_count) = $this->rabbitmq->channel->queue_declare('', false, false, true, false); $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name); $callback = function ($msg) { $datas = json_decode($msg->body, true); fwrite(fopen('RabbitMQReceive.txt', 'a+'), print_r($datas, true)); sleep(substr_count($msg->body, '.')); /* delete message */ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; /** * Starts a queue consumer * * @param string $queue_name * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack * @param bool $exclusive * @param bool $nowait * @param callback|null $callback * @param int|null $ticket * @param array $arguments * @return mixed|string */ $this->rabbitmq->channel->basic_consume($queue_name, '', false, false, false, false, $callback); while (count($this->rabbitmq->channel->callbacks)) { /** * Wait for some expected AMQP methods and dispatch to them. * Unexpected methods are queued up for later calls to this PHP * method. * * @param array $allowed_methods * @param bool $non_blocking * @param int $timeout * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException * @throws \PhpAmqpLib\Exception\AMQPRuntimeException * @return mixed */ $this->rabbitmq->channel->wait(); } $this->rabbitmq->channel->close(); $connection->close(); } public function send() { $this->rabbitmq->connect(); $exchange_name = 'customers'; $queue_name = 'invoices'; /** * Declares exchange * * @param string $exchange_name * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false); /** * Declares queue, creates if needed * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ list($queueName, $message_count, $consumer_count) = $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false); $datas = [ 'rand' => rand(0, 100), 'time' => date('Y-m-d H:m:s'), ]; /** * Declares message * * @param array $datas * @return string */ $msg = $this->rabbitmq->setMessageJson($datas); /** * Publishes a message * * @param AMQPMessage $msg * @param string $exchange * @param string $routing_key * @param bool $mandatory * @param bool $immediate * @param int $ticket */ $this->rabbitmq->channel->basic_publish($msg, $exchange_name, $queue_name); echo '<br><pre>' . print_r($datas, true), '</pre>'; $this->rabbitmq->disconnect(); } }
- ทดสอบโดยเรียก http://localhost/CodeIgniter-3.1.3/messaging/send และ http://localhost/CodeIgniter-3.1.3/messaging/receive