PlusMagi's Blog By Pitt Phunsanit

PHP: ใช้ RabbitMQ / AMQP โดย php-amqplib

ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสะดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย

สร้างส่วน connect กับ RabbitMQ ก่อน

<?php

/* define ('AMQP_DEBUG', true) ; */

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection ('localhost', 5672, 'guest', 'guest') ;
$channel = $connection->channel () ;

ค่าคงที่ define (‘AMQP_DEBUG’, true) ; ใช้เปิด debug mode ของตัว class php-amqplib

การส่งข้อมูล

<?php

include 'RabbitMQConnection.php';

use PhpAmqpLib\Message\AMQPMessage;

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare ($exchange_name, 'fanout', false, true, false) ;

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
list ($queueName, $message_count, $consumer_count) = $channel->queue_declare ($queue_name, false, true, false, false) ;

$datas = [
 'rand' => rand (0, 100) ,
 'time' => date ('Y-m-d H:m:s') ,
];

$msg_body = json_encode ($datas) ;

$properties = [
 'content_type' => 'application/json',
 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

$msg = new AMQPMessage ($msg_body, $properties) ;

/**
 * Publishes a message
 *
 * @param AMQPMessage $msg
 * @param string $exchange
 * @param string $routing_key
 * @param bool $mandatory
 * @param bool $immediate
 * @param int $ticket
 */
$channel->basic_publish ($msg, $exchange_name, $queue_name) ;

echo '<br>' . $msg_body;

$channel->close () ;
$connection->close () ;

จากนั้นรับ message

<?php

include 'RabbitMQConnection.php';

set_time_limit (0) ;

$exchange_name = 'customers';
$queue_name = 'invoices';

/**
 * Declares exchange
 *
 * @param string $exchange_name
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
$channel->exchange_declare ($exchange_name, 'fanout', false, true, false) ;

/**
 * Declares queue, creates if needed
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array $arguments
 * @param int $ticket
 * @return mixed|null
 */
list ($queueName, $message_count, $consumer_count) = $channel->queue_declare ('', false, false, true, false) ;
$channel->queue_bind ($queue_name, $exchange_name) ;

$callback = function ($msg) {

 $datas = json_decode ($msg->body, true) ;
 fwrite (fopen ('RabbitMQReceive.txt', 'a+') , print_r ($datas, true)) ;

 sleep (substr_count ($msg->body, '.')) ;

 /* delete message */
 $msg->delivery_info['channel']->basic_ack ($msg->delivery_info['delivery_tag']) ;
};

/**
 * Starts a queue consumer
 *
 * @param string $queue_name
 * @param string $consumer_tag
 * @param bool $no_local
 * @param bool $no_ack
 * @param bool $exclusive
 * @param bool $nowait
 * @param callback|null $callback
 * @param int|null $ticket
 * @param array $arguments
 * @return mixed|string
 */
$channel->basic_consume ($queue_name, '', false, false, false, false, $callback) ;

while (count ($channel->callbacks)) {
 /**
 * Wait for some expected AMQP methods and dispatch to them.
 * Unexpected methods are queued up for later calls to this PHP
 * method.
 *
 * @param array $allowed_methods
 * @param bool $non_blocking
 * @param int $timeout
 * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
 * @return mixed
 */

 $channel->wait () ;
}

$channel->close () ;
$connection->close () ;

ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info

หลักการการทำงานคือ

  1. ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
  2. message จะถูกเก็บใน queue ที่ชื่อ “invoices”
  3. เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป

อ่านเพิ่มเติม

Exit mobile version