Category Archive RabbitMQ

Byphunsanit

CodeIgniter: php-amqplib

ลองเขียน custom driver สำหรับติดต่อกับ RabbitMQ / AMQP ยังไม่สมบูรณ์แต่ก็ดีกว่าการที่ต้องมา connect ผ่าน php-amqplib ทุกครั้งที่จะใช้ตามปกติ

  1. config file ก่อนตามระเบียบ
    <?php
    defined('BASEPATH') or exit('No direct script access allowed');
    
    /*
    | -------------------------------------------------------------------
    | RABBITMQ CONNECTIVITY SETTINGS
    | -------------------------------------------------------------------
    | This file will contain the settings needed to access your RabbitMQ.
    |
    | For complete instructions please consult the 'RabbitMQ Connection'
    | page of the User Guide.
    |
    | -------------------------------------------------------------------
    | EXPLANATION OF VARIABLES
    | -------------------------------------------------------------------
    |
    |    ['connection_timeout'] => 3.0,
    |    ['context'] => null,
    |    ['heartbeat'] => 0
    |    ['host'],
    |    ['insist'] => false,
    |    ['keepalive'] => false,
    |    ['locale'] => 'en_US',
    |    ['login_method'] => 'AMQPLAIN',
    |    ['login_response'] => null,
    |    ['password'],
    |    ['port'],
    |    ['read_write_timeout'] => 3.0,
    |    ['user'],
    |    ['vhost'] => '/',
     */
    $config['RabbitMQ']['connects'] = [
        'default' => [
            'connection_timeout' => 3.0,
            'context' => null,
            'heartbeat' => 0,
            'host' => '127.0.0.1',
            'insist' => false,
            'keepalive' => false,
            'locale' => 'en_US',
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'password' => 'guest',
            'port' => 5672,
            'read_write_timeout' => 3.0,
            'user' => 'guest',
            'vhost' => '/',
        ],
    ];
    
    /*
    | -------------------------------------------------------------------
    | RABBITMQ DEBUG SETTINGS
    | -------------------------------------------------------------------
    |
     */
    $config['RabbitMQ']['debug'] = false;
    
  2. ถึงคิวไฟล์ driver ที่จะเป็นตัวกลางระหว่าง codeigniter และ php-amqplib
    <?php
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    class RabbitMQ extends AMQPStreamConnection
    {
    
        public function __construct($config = array())
        {
            $this->CI = &get_instance();
    
            $this->config = $config['RabbitMQ'];
    
            if ($this->config['debug'] == true) {
                define('AMQP_DEBUG', true);
            }
        }
    
        public function connect($dsnId = 'default') {
            $dsn = $this->config['connects'][$dsnId];
    
            $this->connection = new AMQPStreamConnection($dsn['host'], $dsn['port'], $dsn['user'], $dsn['password'], $dsn['vhost'], $dsn['insist'], $dsn['login_method'], $dsn['login_response'], $dsn['locale'], $dsn['connection_timeout'], $dsn['read_write_timeout'], $dsn['context'], $dsn['keepalive'], $dsn['heartbeat']);
    
            $this->channel = $this->connection->channel();
        }
    
        public function disconnect()
        {
            $this->channel->close();
            $this->connection->close();
        }
    
        public function setMessageJson($datas)
        {
            $msg_body = json_encode($datas);
    
            $properties = [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ];
    
            return new AMQPMessage($msg_body, $properties);
        }
    
    }
  3. ที่นี้ก็ controller ที่จะทำงานให้
    <?php
    defined('BASEPATH') or exit('No direct script access allowed');
    
    class Messaging extends CI_Controller
    {
        public function __construct()
        {
            parent::__construct();
    
            $this->load->driver('RabbitMQ');
        }
    
        public function receive()
        {
            set_time_limit(0);
    
            $this->rabbitmq->connect();
    
            $exchange_name = 'customers';
            $queue_name = 'invoices';
    
    /**
     * Declares exchange
     *
     * @param string $exchange_name
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @return mixed|null
     */
            $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);
    
    /**
     * Declares queue, creates if needed
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @return mixed|null
     */
            list($queueName, $message_count, $consumer_count) = $this->rabbitmq->channel->queue_declare('', false, false, true, false);
            $this->rabbitmq->channel->queue_bind($queue_name, $exchange_name);
    
            $callback = function ($msg) {
    
                $datas = json_decode($msg->body, true);
                fwrite(fopen('RabbitMQReceive.txt', 'a+'), print_r($datas, true));
    
                sleep(substr_count($msg->body, '.'));
    
                /* delete message */
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };
    
    /**
     * Starts a queue consumer
     *
     * @param string $queue_name
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callback|null $callback
     * @param int|null $ticket
     * @param array $arguments
     * @return mixed|string
     */
            $this->rabbitmq->channel->basic_consume($queue_name, '', false, false, false, false, $callback);
    
            while (count($this->rabbitmq->channel->callbacks)) {
    /**
     * Wait for some expected AMQP methods and dispatch to them.
     * Unexpected methods are queued up for later calls to this PHP
     * method.
     *
     * @param array $allowed_methods
     * @param bool $non_blocking
     * @param int $timeout
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @return mixed
     */
                $this->rabbitmq->channel->wait();
            }
    
            $this->rabbitmq->channel->close();
            $connection->close();
        }
    
        public function send()
        {
            $this->rabbitmq->connect();
    
            $exchange_name = 'customers';
            $queue_name = 'invoices';
    
    /**
     * Declares exchange
     *
     * @param string $exchange_name
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @return mixed|null
     */
            $this->rabbitmq->channel->exchange_declare($exchange_name, 'fanout', false, true, false);
    
    /**
     * Declares queue, creates if needed
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @return mixed|null
     */
            list($queueName, $message_count, $consumer_count) = $this->rabbitmq->channel->queue_declare($queue_name, false, true, false, false);
    
            $datas = [
                'rand' => rand(0, 100),
                'time' => date('Y-m-d H:m:s'),
            ];
    
    /**
     * Declares message
     *
     * @param array $datas
     * @return string
     */
            $msg = $this->rabbitmq->setMessageJson($datas);
    
    /**
     * Publishes a message
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     */
            $this->rabbitmq->channel->basic_publish($msg, $exchange_name, $queue_name);
    
            echo '<br><pre>' . print_r($datas, true), '</pre>';
    
            $this->rabbitmq->disconnect();
    
        }
    
    }
    
  4. ทดสอบโดยเรียก http://localhost/CodeIgniter-3.1.3/messaging/send และ http://localhost/CodeIgniter-3.1.3/messaging/receive
Byphunsanit

PHP: php-amqplib แบบต่อเนื่อง

ในบาง page เราอาจจะต้องการที่จะใช้ queuing หลายตัว แทนที่จะ connect / disconnect หลายๆครั้ง เราสามารถใช้ batch / bulk ในการส่งช้อมูลให้ rabbitmq ในครั้งเดียวได้

<?php

include 'RabbitMQConnection.php';

use PhpAmqpLib\Message\AMQPMessage;

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
 list($queueName, $message_count, $consumer_count) = $channel->queue_declare($queue_name, false, true, false, false);

$properties = [
    'content_type' => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

for ($a = 1; $a <= 1000; $a++) {
    $datas = [
        'id' => str_pad($a, 4, '0', STR_PAD_LEFT),
        'rand' => rand(0, 100),
        'time' => date('Y-m-d H:m:s'),
    ];

	$msg_body = json_encode($datas);

	$msg = new AMQPMessage($msg_body, $properties);

	/**
	 * Publishes a message
	 *
	 * @param AMQPMessage $msg
	 * @param string $exchange
	 * @param string $routing_key
	 * @param bool $mandatory
	 * @param bool $immediate
	 * @param int $ticket
	 */
	$channel->basic_publish($msg, $exchange_name, $queue_name);

    /**
     * Publishes a message
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     */
    $channel->basic_publish($msg, $exchange_name, $queue_name);

    echo '<br>' . $msg_body;
}

/**
 * Publish batch
 *
 * @return void
 */
$channel->publish_batch();

$channel->close();
$connection->close();

อ่านเพิ่มเติม

Byphunsanit

PHP: ใช้ RabbitMQ / AMQP โดย php-amqplib

ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย

สร้างส่วน connect กับ RabbitMQ ก่อน

<?php

/* define('AMQP_DEBUG', true); */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

ค่าคงที่ define(‘AMQP_DEBUG’, true); ใช้เปิด debug mode ของตัว class php-amqplib

การส่งข้อมูล

<?php

include 'RabbitMQConnection.php';

use PhpAmqpLib\Message\AMQPMessage;

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
list($queueName, $message_count, $consumer_count) = $channel->queue_declare($queue_name, false, true, false, false);

$datas = [
    'rand' => rand(0, 100),
    'time' => date('Y-m-d H:m:s'),
];

$msg_body = json_encode($datas);

$properties = [
    'content_type' => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

$msg = new AMQPMessage($msg_body, $properties);

/**
 * Publishes a message
 *
 * @param AMQPMessage $msg
 * @param string $exchange
 * @param string $routing_key
 * @param bool $mandatory
 * @param bool $immediate
 * @param int $ticket
 */
$channel->basic_publish($msg, $exchange_name, $queue_name);

echo '<br>' . $msg_body;

$channel->close();
$connection->close();

จากนั้นรับ message

<?php

include 'RabbitMQConnection.php';

set_time_limit(0);

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
list($queueName, $message_count, $consumer_count) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, $exchange_name);

$callback = function ($msg) {

    $datas = json_decode($msg->body, true);
    fwrite(fopen('RabbitMQReceive.txt', 'a+'), print_r($datas, true));

    sleep(substr_count($msg->body, '.'));

    /* delete message */
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

/**
 * Starts a queue consumer
 *
 * @param string $queue_name
 * @param string $consumer_tag
 * @param bool $no_local
 * @param bool $no_ack
 * @param bool $exclusive
 * @param bool $nowait
 * @param callback|null $callback
 * @param int|null $ticket
 * @param array $arguments
 * @return mixed|string
 */
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    /**
     * Wait for some expected AMQP methods and dispatch to them.
     * Unexpected methods are queued up for later calls to this PHP
     * method.
     *
     * @param array $allowed_methods
     * @param bool $non_blocking
     * @param int $timeout
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @return mixed
     */

    $channel->wait();
}

$channel->close();
$connection->close();

ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info

หลักการการทำงานคือ

  1. ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
  2. message จะถูกเก็บใน queue ที่ชื่อ “invoices”
  3. เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป

อ่านเพิ่มเติม

Byphunsanit

PHP: ติดตั้ง RabbitMQ / AMQP Driver

ที่ทำงานนำ RabbitMQ มาทำระบบ messaging middleware เพื่อที่จะไม่ต้องให้ลูกค้ารอการทำงานเบื้องหลังบางอย่าง โดยเก็บงานที่ไม่เร่งด่วนไว้ทำในภายหลัง

ตัว RabbitMQ เป็นหนึ่งในโปรแกรม Messaging ที่ใช้มาตราฐาน AMQP: Advanced Message Queuing Protocol หรือ ISO/IEC 19464:2014 โปรแกรมตัวอื่นๆ เช่น Apache Qpid และ OpenAMQP

การติดตั้ง

  1. download ตัว driver มาจากเว็บ PECL: amqp โดยเลือกให้ตรงกับ version ของ php
  2. แตกไฟล์ php_amqp.dll ไปไว้ที่ extension_dir เช่น C:\wamp64\bin\php\php5.6.25\ext
  3. แตกไฟล์ rabbitmq.1.dllไปไว้ที่ C:\Windows\System ระวัง! ไม่ใช่ System32 นะครับ
  4. แก้ไฟล์ C:\wamp64\bin\apache\apache2.4.23\bin\php.ini เพิ่ม extension=php_amqp.dll
  5. restart apache ใหม่
  6. เปิด phpinfo ขึ้นมาหา amqp ถ้ามีก็แสดงว่าติดตั้ง extension amqp สำเร็จ

อ่านเพิ่มเติม