หมวดหมู่: CodeIgniter

CodeIgniter: php-amqplib

ลองเขียน custom driver สำหรับติดต่อกับ RabbitMQ / AMQP ยังไม่สมบูรณ์แต่ก็ดีกว่าการที่ต้องมา connect ผ่าน php-amqplib ทุกครั้งที่จะใช้ตามปกติ

  1. config file ก่อนตามระเบียบ
    <?php
    defined('BASEPATH') or exit('No direct script access allowed');

    /*
    | -------------------------------------------------------------------
    | RABBITMQ SETTINGS
    | -------------------------------------------------------------------
    | Everything the RabbitMQ driver needs lives under $config['RabbitMQ'].
    |
    | 'connects'
    |     Named connection profiles. The driver's connect($dsnId) picks the
    |     profile stored under that name ('default' when no name is given)
    |     and forwards its values to the AMQPStreamConnection constructor.
    |     Keys of a profile:
    |
    |       connection_timeout, context, heartbeat, host, insist,
    |       keepalive, locale, login_method, login_response, password,
    |       port, read_write_timeout, user, vhost
    |
    | 'debug'
    |     When true, the driver defines the AMQP_DEBUG constant as true
    |     while it is being constructed.
    |
    */
    $config['RabbitMQ'] = [
        'connects' => [
            'default' => [
                'connection_timeout' => 3.0,
                'context'            => null,
                'heartbeat'          => 0,
                'host'               => '127.0.0.1',
                'insist'             => false,
                'keepalive'          => false,
                'locale'             => 'en_US',
                'login_method'       => 'AMQPLAIN',
                'login_response'     => null,
                'password'           => 'guest',
                'port'               => 5672,
                'read_write_timeout' => 3.0,
                'user'               => 'guest',
                'vhost'              => '/',
            ],
        ],
        'debug' => false,
    ];
    
  2. ถึงคิวไฟล์ driver ที่จะเป็นตัวกลางระหว่าง codeigniter และ php-amqplib
    <?php
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * CodeIgniter driver that wraps a php-amqplib connection to RabbitMQ.
     *
     * Settings are read from the 'RabbitMQ' section of the config array handed
     * to the constructor. After connect() the open channel is exposed as
     * $this->channel so callers can use the php-amqplib channel API directly.
     *
     * NOTE(review): the class extends AMQPStreamConnection but never calls the
     * parent constructor; the live connection is the separate instance created
     * in connect(). The inheritance is kept so the public interface does not
     * change -- confirm whether it is actually needed.
     *
     * @property mixed $channel Channel opened by connect(); reset to null by disconnect().
     */
    class RabbitMQ extends AMQPStreamConnection
    {

        /**
         * Stores the driver configuration and optionally enables AMQP debugging.
         *
         * @param array $config Config array; its 'RabbitMQ' key holds the
         *                      'connects' profiles and the 'debug' flag.
         */
        public function __construct ($config = array ()) {
            $this->CI = &get_instance();

            // Fall back to an empty section instead of raising an
            // undefined-index notice when the config file is missing.
            $this->config = isset($config['RabbitMQ']) ? $config['RabbitMQ'] : array();

            // Guarded: defining the constant a second time (e.g. when the
            // driver is instantiated twice in one request) is an error.
            if (!empty($this->config['debug']) && !defined('AMQP_DEBUG')) {
                define('AMQP_DEBUG', true);
            }
        }

        /**
         * Opens a connection using the named profile and opens a channel on it.
         *
         * Optional profile keys that are missing fall back to the defaults
         * documented in the config file; host, port, user and password are
         * mandatory.
         *
         * @param string $dsnId Name of the profile under 'connects'.
         * @throws InvalidArgumentException When the profile does not exist or
         *                                  lacks a mandatory key.
         */
        public function connect ($dsnId = 'default') {
            if (!isset($this->config['connects'][$dsnId]) || !is_array($this->config['connects'][$dsnId])) {
                throw new InvalidArgumentException('RabbitMQ connection "' . $dsnId . '" is not configured');
            }

            $defaults = [
                'connection_timeout' => 3.0,
                'context'            => null,
                'heartbeat'          => 0,
                'insist'             => false,
                'keepalive'          => false,
                'locale'             => 'en_US',
                'login_method'       => 'AMQPLAIN',
                'login_response'     => null,
                'read_write_timeout' => 3.0,
                'vhost'              => '/',
            ];

            // Array union keeps every configured value (even an explicit null)
            // and only fills in keys that are absent from the profile.
            $dsn = $this->config['connects'][$dsnId] + $defaults;

            foreach (['host', 'port', 'user', 'password'] as $required) {
                if (!isset($dsn[$required])) {
                    throw new InvalidArgumentException('RabbitMQ connection "' . $dsnId . '" is missing "' . $required . '"');
                }
            }

            $this->connection = new AMQPStreamConnection(
                $dsn['host'],
                $dsn['port'],
                $dsn['user'],
                $dsn['password'],
                $dsn['vhost'],
                $dsn['insist'],
                $dsn['login_method'],
                $dsn['login_response'],
                $dsn['locale'],
                $dsn['connection_timeout'],
                $dsn['read_write_timeout'],
                $dsn['context'],
                $dsn['keepalive'],
                $dsn['heartbeat']
            );

            $this->channel = $this->connection->channel();
        }

        /**
         * Closes the channel and then the connection.
         *
         * Safe to call when connect() was never called or when the driver has
         * already been disconnected.
         */
        public function disconnect () {
            if (isset($this->channel)) {
                $this->channel->close();
                $this->channel = null;
            }

            if (isset($this->connection)) {
                $this->connection->close();
                $this->connection = null;
            }
        }

        /**
         * Builds a persistent AMQP message whose body is the JSON encoding of $datas.
         *
         * @param mixed $datas Value to encode.
         * @return AMQPMessage Message with content type application/json.
         * @throws InvalidArgumentException When $datas cannot be JSON-encoded.
         */
        public function setMessageJson ($datas) {
            $msg_body = json_encode($datas);

            // json_encode() returns false on failure; do not publish that as a body.
            if ($msg_body === false) {
                throw new InvalidArgumentException('Unable to JSON-encode message (json error ' . json_last_error() . ')');
            }

            $properties = [
                'content_type'  => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ];

            return new AMQPMessage($msg_body, $properties);
        }

    }
  3. ทีนี้ก็ controller ที่จะทำงานให้
    <?php
    defined ('BASEPATH') or exit ('No direct script access allowed') ;
    
    /**
     * Demo controller that publishes to and consumes from RabbitMQ through
     * the RabbitMQ driver.
     *
     * Both actions use the durable fanout exchange 'customers' and the durable
     * queue 'invoices', and both declare and bind them, so either action can be
     * run first.
     */
    class Messaging extends CI_Controller
    {
        /**
         * Loads the RabbitMQ driver, available afterwards as $this->rabbitmq.
         */
        public function __construct () {
            parent::__construct();

            $this->load->driver('RabbitMQ');
        }

        /**
         * Consumes messages from the 'invoices' queue until no consumer
         * callbacks remain on the channel.
         *
         * Each message body is JSON-decoded, appended to RabbitMQReceive.txt
         * and then acknowledged.
         */
        public function receive () {
            // The consumer loop blocks, so lift PHP's execution time limit.
            set_time_limit(0);

            $this->rabbitmq->connect();

            $exchange_name = 'customers';
            $queue_name = 'invoices';

            // exchange_declare($exchange, $type, $passive, $durable, $auto_delete)
            $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);

            // queue_declare($queue, $passive, $durable, $exclusive, $auto_delete)
            // Declare the very queue that is bound and consumed below, with the
            // same flags send() uses, so binding cannot fail on a missing queue.
            $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false);
            $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name);

            $callback = function ($msg) {
                $datas = json_decode($msg->body, true);

                // Append in a single call so no file handle is left open.
                file_put_contents('RabbitMQReceive.txt', print_r($datas, true), FILE_APPEND);

                // Simulated work: one second per '.' found in the body.
                sleep(substr_count($msg->body, '.'));

                // Acknowledge so the broker removes the message from the queue.
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };

            // basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback)
            // $no_ack is false, which is why the callback acknowledges explicitly.
            $this->rabbitmq->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

            // wait() blocks for the next AMQP method and dispatches it to the callback.
            while (count($this->rabbitmq->channel->callbacks)) {
                $this->rabbitmq->channel->wait();
            }

            // Close the channel and the connection through the driver.
            $this->rabbitmq->disconnect();
        }

        /**
         * Publishes one JSON test message to the 'customers' exchange and
         * echoes the payload.
         */
        public function send () {
            $this->rabbitmq->connect();

            $exchange_name = 'customers';
            $queue_name = 'invoices';

            // exchange_declare($exchange, $type, $passive, $durable, $auto_delete)
            $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);

            // queue_declare($queue, $passive, $durable, $exclusive, $auto_delete)
            $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false);

            // Bind here as well: without a binding the exchange has nowhere to
            // route a message that is published before receive() has ever run.
            $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name);

            $datas = [
                'rand' => rand(0, 100),
                // 'i' is minutes; 'm' would repeat the month number.
                'time' => date('Y-m-d H:i:s'),
            ];

            // Wrap the payload in a persistent application/json message.
            $msg = $this->rabbitmq->setMessageJson($datas);

            // basic_publish($msg, $exchange, $routing_key)
            $this->rabbitmq->channel->basic_publish($msg, $exchange_name, $queue_name);

            echo '<br><pre>' . print_r($datas, true), '</pre>';

            $this->rabbitmq->disconnect();
        }

    }
    
  4. ทดสอบโดยเรียก http://localhost/CodeIgniter-3.1.3/messaging/send และ http://localhost/CodeIgniter-3.1.3/messaging/receive