ลองเขียน custom driver สำหรับติดต่อกับ RabbitMQ / AMQP ยังไม่สมบูรณ์แต่ก็ดีกว่าการที่ต้องมา connect ผ่าน php-amqplib ทุกครั้งที่จะใช้ตามปกติ

  1. config file ก่อนตามระเบียบ[code language="php" title="\application\config\RabbitMQ.php"]<?php
    // Abort unless BASEPATH has been defined, i.e. the file is not being requested directly.
    defined('BASEPATH') or exit('No direct script access allowed');

    /*
    | -------------------------------------------------------------------
    | RABBITMQ CONNECTIVITY SETTINGS
    | -------------------------------------------------------------------
    | This file contains the settings needed to access your RabbitMQ.
    |
    | Every entry of $config['RabbitMQ']['connects'] is a named connection.
    | The RabbitMQ library's connect($dsnId) looks an entry up by its key and
    | hands the values to the AMQPStreamConnection constructor.
    |
    | -------------------------------------------------------------------
    | EXPLANATION OF VARIABLES
    | -------------------------------------------------------------------
    | Keys listed without a value have no default and must be supplied.
    |
    | ['connection_timeout'] => 3.0,
    | ['context']            => null,
    | ['heartbeat']          => 0,
    | ['host'],
    | ['insist']             => false,
    | ['keepalive']          => false,
    | ['locale']             => 'en_US',
    | ['login_method']       => 'AMQPLAIN',
    | ['login_response']     => null,
    | ['password'],
    | ['port'],
    | ['read_write_timeout'] => 3.0,
    | ['user'],
    | ['vhost']              => '/',
    */
    $config['RabbitMQ']['connects'] = [
        'default' => [
            'connection_timeout' => 3.0,
            'context' => null,
            'heartbeat' => 0,
            'host' => '127.0.0.1',
            'insist' => false,
            'keepalive' => false,
            'locale' => 'en_US',
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'password' => 'guest',
            'port' => 5672,
            'read_write_timeout' => 3.0,
            'user' => 'guest',
            'vhost' => '/',
        ],
    ];

    /*
    | -------------------------------------------------------------------
    | RABBITMQ DEBUG SETTINGS
    | -------------------------------------------------------------------
    | When true, the RabbitMQ library defines the AMQP_DEBUG constant in its
    | constructor.
    */
    $config['RabbitMQ']['debug'] = false;
    [/code]

  2. ถึงคิวไฟล์ driver ที่จะเป็นตัวกลางระหว่าง CodeIgniter และ php-amqplib[code language="php" title="\application\libraries\RabbitMQ\RabbitMQ.php"]<?php
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    class RabbitMQ extends AMQPStreamConnection
    {

    /**
     * Stores the library settings and optionally switches on AMQP debugging.
     *
     * The parent constructor is not called here; no connection is opened
     * until connect() is invoked.
     *
     * @param array $config Configuration array; the settings are read from
     *                      its 'RabbitMQ' key ('connects' and 'debug').
     */
    public function __construct($config = array())
    {
        $this->CI = &get_instance();

        // Fall back to an empty configuration instead of raising an
        // undefined-index notice when the 'RabbitMQ' key is absent.
        $this->config = isset($config['RabbitMQ']) ? $config['RabbitMQ'] : array();

        // A constant can only be defined once, so skip it when the library
        // is instantiated a second time within the same request.
        if (!empty($this->config['debug']) && !defined('AMQP_DEBUG')) {
            define('AMQP_DEBUG', true);
        }
    }

    /**
     * Opens the connection described by a named entry of the 'connects'
     * configuration and opens a channel on it.
     *
     * Afterwards the connection is stored in $this->connection and the
     * channel in $this->channel.
     *
     * @param string $dsnId Key of the connection settings to use.
     *
     * @throws InvalidArgumentException When no settings exist under $dsnId.
     */
    public function connect($dsnId = 'default')
    {
        if (!isset($this->config['connects'][$dsnId])) {
            throw new InvalidArgumentException(
                'RabbitMQ connection "' . $dsnId . '" is not configured.'
            );
        }

        // Optional keys fall back to the defaults documented in the config
        // file, so an entry only has to supply host, port, user and password.
        $dsn = array_merge(
            [
                'vhost' => '/',
                'insist' => false,
                'login_method' => 'AMQPLAIN',
                'login_response' => null,
                'locale' => 'en_US',
                'connection_timeout' => 3.0,
                'read_write_timeout' => 3.0,
                'context' => null,
                'keepalive' => false,
                'heartbeat' => 0,
            ],
            $this->config['connects'][$dsnId]
        );

        // The constructor takes its arguments positionally, in this order.
        $this->connection = new AMQPStreamConnection(
            $dsn['host'],
            $dsn['port'],
            $dsn['user'],
            $dsn['password'],
            $dsn['vhost'],
            $dsn['insist'],
            $dsn['login_method'],
            $dsn['login_response'],
            $dsn['locale'],
            $dsn['connection_timeout'],
            $dsn['read_write_timeout'],
            $dsn['context'],
            $dsn['keepalive'],
            $dsn['heartbeat']
        );

        $this->channel = $this->connection->channel();
    }

    /**
     * Closes the channel and then the connection opened by connect().
     *
     * Whichever of the two was never opened is skipped, so calling this
     * before connect() does not fail.
     */
    public function disconnect()
    {
        if (isset($this->channel)) {
            $this->channel->close();
        }

        if (isset($this->connection)) {
            $this->connection->close();
        }
    }

    /**
     * Builds a persistent AMQP message whose body is the JSON encoding of $datas.
     *
     * @param mixed $datas Value to encode with json_encode().
     *
     * @return AMQPMessage Message with content type application/json and
     *                     persistent delivery mode.
     *
     * @throws InvalidArgumentException When $datas cannot be encoded as JSON.
     */
    public function setMessageJson($datas)
    {
        $msg_body = json_encode($datas);

        // json_encode() returns false on failure; refuse to publish that as a body.
        if ($msg_body === false) {
            throw new InvalidArgumentException(
                'Unable to encode the message as JSON (json_last_error: ' . json_last_error() . ').'
            );
        }

        $properties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];

        return new AMQPMessage($msg_body, $properties);
    }

    }[/code]

  3. ทีนี้ก็ controller ที่จะทำงานให้[code language="php" title="\application\controllers\Messaging.php"]<?php
    // Abort unless BASEPATH has been defined, i.e. the file is not being requested directly.
    defined('BASEPATH') or exit('No direct script access allowed');

    class Messaging extends CI_Controller
    {
    /**
     * Runs the parent controller constructor and loads the RabbitMQ driver,
     * which the other methods of this controller use as $this->rabbitmq.
     */
    public function __construct()
    {
        parent::__construct();

        $this->load->driver('RabbitMQ');
    }

    /**
     * Consumes messages from the 'invoices' queue for as long as the channel
     * has a registered consumer callback.
     *
     * Declares the durable fanout exchange 'customers' and the durable queue
     * 'invoices', binds the queue to the exchange, and then, for every
     * message, appends the decoded JSON payload to RabbitMQReceive.txt and
     * acknowledges the message.
     */
    public function receive()
    {
        // The consumer loop below is long-running, so remove PHP's execution time limit.
        set_time_limit(0);

        $this->rabbitmq->connect();

        $exchange_name = 'customers';
        $queue_name = 'invoices';

        // exchange_declare($exchange, $type, $passive, $durable, $auto_delete)
        $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);

        // queue_declare($queue, $passive, $durable, $exclusive, $auto_delete)
        // Declare the very queue that is bound and consumed below, with the
        // same flags send() uses, so that this method also works when it is
        // the first one to run.
        $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false);
        $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name);

        $callback = function ($msg) {
            $datas = json_decode($msg->body, true);

            // Appends and closes the file in a single call, so no handle is
            // left open for each message processed.
            file_put_contents('RabbitMQReceive.txt', print_r($datas, true), FILE_APPEND);

            // Pauses one second for every '.' found in the message body.
            sleep(substr_count($msg->body, '.'));

            // Acknowledge the delivery so the broker can delete the message.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };

        // basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback)
        // $no_ack is false: every message has to be acknowledged by the callback.
        $this->rabbitmq->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

        // wait() waits for incoming AMQP methods and dispatches them to the callback.
        while (count($this->rabbitmq->channel->callbacks)) {
            $this->rabbitmq->channel->wait();
        }

        // Close the channel and the connection through the library.
        $this->rabbitmq->disconnect();
    }

    /**
     * Publishes one persistent JSON test message to the 'customers' exchange
     * and echoes the payload that was sent.
     *
     * The payload holds a random integer between 0 and 100 and the current
     * date and time.
     */
    public function send()
    {
        $this->rabbitmq->connect();

        $exchange_name = 'customers';
        $queue_name = 'invoices';

        // exchange_declare($exchange, $type, $passive, $durable, $auto_delete)
        $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);

        // queue_declare($queue, $passive, $durable, $exclusive, $auto_delete)
        $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false);

        // Bind the queue before publishing: an exchange only delivers to
        // queues bound to it, so without this a message sent before any
        // receiver has created the binding would be dropped.
        $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name);

        $datas = [
            'rand' => rand(0, 100),
            // 'i' is minutes; 'm' would print the month a second time.
            'time' => date('Y-m-d H:i:s'),
        ];

        $msg = $this->rabbitmq->setMessageJson($datas);

        // basic_publish($msg, $exchange, $routing_key)
        $this->rabbitmq->channel->basic_publish($msg, $exchange_name, $queue_name);

        echo '<br><pre>' . print_r($datas, true), '</pre>';

        $this->rabbitmq->disconnect();
    }

    }
    [/code]

  4. ทดสอบโดยเรียก http://localhost/CodeIgniter-3.1.3/messaging/send และ http://localhost/CodeIgniter-3.1.3/messaging/receive