Category Archive Database

Byphunsanit

PHP: ใช้ RabbitMQ / AMQP โดย php-amqplib

ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสะดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย

สร้างส่วน connect กับ RabbitMQ ก่อน

<?php

/* define('AMQP_DEBUG', true); */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

ค่าคงที่ define(‘AMQP_DEBUG’, true); ใช้เปิด debug mode ของตัว class php-amqplib

การส่งข้อมูล

<?php

include 'RabbitMQConnection.php';

use PhpAmqpLib\Message\AMQPMessage;

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
list($queueName, $message_count, $consumer_count) = $channel->queue_declare($queue_name, false, true, false, false);

$datas = [
    'rand' => rand(0, 100),
    'time' => date('Y-m-d H:m:s'),
];

$msg_body = json_encode($datas);

$properties = [
    'content_type' => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

$msg = new AMQPMessage($msg_body, $properties);

/**
 * Publishes a message
 *
 * @param AMQPMessage $msg
 * @param string $exchange
 * @param string $routing_key
 * @param bool $mandatory
 * @param bool $immediate
 * @param int $ticket
 */
$channel->basic_publish($msg, $exchange_name, $queue_name);

echo '<br>' . $msg_body;

$channel->close();
$connection->close();

จากนั้นรับ message

<?php

include 'RabbitMQConnection.php';

set_time_limit(0);

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
list($queueName, $message_count, $consumer_count) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, $exchange_name);

$callback = function ($msg) {

    $datas = json_decode($msg->body, true);
    fwrite(fopen('RabbitMQReceive.txt', 'a+'), print_r($datas, true));

    sleep(substr_count($msg->body, '.'));

    /* delete message */
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

/**
 * Starts a queue consumer
 *
 * @param string $queue_name
 * @param string $consumer_tag
 * @param bool $no_local
 * @param bool $no_ack
 * @param bool $exclusive
 * @param bool $nowait
 * @param callback|null $callback
 * @param int|null $ticket
 * @param array $arguments
 * @return mixed|string
 */
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    /**
     * Wait for some expected AMQP methods and dispatch to them.
     * Unexpected methods are queued up for later calls to this PHP
     * method.
     *
     * @param array $allowed_methods
     * @param bool $non_blocking
     * @param int $timeout
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @return mixed
     */

    $channel->wait();
}

$channel->close();
$connection->close();

ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info

หลักการการทำงานคือ

  1. ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
  2. message จะถูกเก็บใน queue ที่ชื่อ “invoices”
  3. เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป

อ่านเพิ่มเติม