ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสะดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย
สร้างส่วน connect กับ RabbitMQ ก่อน
<?php
/* define ('AMQP_DEBUG', true) ; */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection ('localhost', 5672, 'guest', 'guest') ;
$channel = $connection->channel () ;
ค่าคงที่ define (‘AMQP_DEBUG’, true) ; ใช้เปิด debug mode ของตัว class php-amqplib
การส่งข้อมูล
<?php
include 'RabbitMQConnection.php';
use PhpAmqpLib\Message\AMQPMessage;
$exchange_name = 'customers';
$queue_name = 'invoices';
/**
* Declares exchange
*
* @param string $exchange_name
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
$channel->exchange_declare ($exchange_name, 'fanout', false, true, false) ;
/**
* Declares queue, creates if needed
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
list ($queueName, $message_count, $consumer_count) = $channel->queue_declare ($queue_name, false, true, false, false) ;
$datas = [
'rand' => rand (0, 100) ,
'time' => date ('Y-m-d H:m:s') ,
];
$msg_body = json_encode ($datas) ;
$properties = [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$msg = new AMQPMessage ($msg_body, $properties) ;
/**
* Publishes a message
*
* @param AMQPMessage $msg
* @param string $exchange
* @param string $routing_key
* @param bool $mandatory
* @param bool $immediate
* @param int $ticket
*/
$channel->basic_publish ($msg, $exchange_name, $queue_name) ;
echo '<br>' . $msg_body;
$channel->close () ;
$connection->close () ;
จากนั้นรับ message
<?php
include 'RabbitMQConnection.php';
set_time_limit (0) ;
$exchange_name = 'customers';
$queue_name = 'invoices';
/**
* Declares exchange
*
* @param string $exchange_name
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
$channel->exchange_declare ($exchange_name, 'fanout', false, true, false) ;
/**
* Declares queue, creates if needed
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
list ($queueName, $message_count, $consumer_count) = $channel->queue_declare ('', false, false, true, false) ;
$channel->queue_bind ($queue_name, $exchange_name) ;
$callback = function ($msg) {
$datas = json_decode ($msg->body, true) ;
fwrite (fopen ('RabbitMQReceive.txt', 'a+') , print_r ($datas, true)) ;
sleep (substr_count ($msg->body, '.')) ;
/* delete message */
$msg->delivery_info['channel']->basic_ack ($msg->delivery_info['delivery_tag']) ;
};
/**
* Starts a queue consumer
*
* @param string $queue_name
* @param string $consumer_tag
* @param bool $no_local
* @param bool $no_ack
* @param bool $exclusive
* @param bool $nowait
* @param callback|null $callback
* @param int|null $ticket
* @param array $arguments
* @return mixed|string
*/
$channel->basic_consume ($queue_name, '', false, false, false, false, $callback) ;
while (count ($channel->callbacks)) {
/**
* Wait for some expected AMQP methods and dispatch to them.
* Unexpected methods are queued up for later calls to this PHP
* method.
*
* @param array $allowed_methods
* @param bool $non_blocking
* @param int $timeout
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @return mixed
*/
$channel->wait () ;
}
$channel->close () ;
$connection->close () ;
ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info
หลักการการทำงานคือ
- ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
- message จะถูกเก็บใน queue ที่ชื่อ “invoices”
- เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป
อ่านเพิ่มเติม