ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย
สร้างส่วน connect กับ RabbitMQ ก่อน
<?php
/* define('AMQP_DEBUG', true); */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
ค่าคงที่ define(‘AMQP_DEBUG’, true); ใช้เปิด debug mode ของตัว class php-amqplib
การส่งข้อมูล
<?php
include 'RabbitMQConnection.php';
use PhpAmqpLib\Message\AMQPMessage;
$exchange_name = 'customers';
$queue_name = 'invoices';
/**
* Declares exchange
*
* @param string $exchange_name
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);
/**
* Declares queue, creates if needed
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
list($queueName, $message_count, $consumer_count) = $channel->queue_declare($queue_name, false, true, false, false);
$datas = [
'rand' => rand(0, 100),
'time' => date('Y-m-d H:m:s'),
];
$msg_body = json_encode($datas);
$properties = [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$msg = new AMQPMessage($msg_body, $properties);
/**
* Publishes a message
*
* @param AMQPMessage $msg
* @param string $exchange
* @param string $routing_key
* @param bool $mandatory
* @param bool $immediate
* @param int $ticket
*/
$channel->basic_publish($msg, $exchange_name, $queue_name);
echo '<br>' . $msg_body;
$channel->close();
$connection->close();
จากนั้นรับ message
<?php
include 'RabbitMQConnection.php';
set_time_limit(0);
$exchange_name = 'customers';
$queue_name = 'invoices';
/**
* Declares exchange
*
* @param string $exchange_name
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);
/**
* Declares queue, creates if needed
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
list($queueName, $message_count, $consumer_count) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, $exchange_name);
$callback = function ($msg) {
$datas = json_decode($msg->body, true);
fwrite(fopen('RabbitMQReceive.txt', 'a+'), print_r($datas, true));
sleep(substr_count($msg->body, '.'));
/* delete message */
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
/**
* Starts a queue consumer
*
* @param string $queue_name
* @param string $consumer_tag
* @param bool $no_local
* @param bool $no_ack
* @param bool $exclusive
* @param bool $nowait
* @param callback|null $callback
* @param int|null $ticket
* @param array $arguments
* @return mixed|string
*/
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
/**
* Wait for some expected AMQP methods and dispatch to them.
* Unexpected methods are queued up for later calls to this PHP
* method.
*
* @param array $allowed_methods
* @param bool $non_blocking
* @param int $timeout
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @return mixed
*/
$channel->wait();
}
$channel->close();
$connection->close();
ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info
หลักการการทำงานคือ
- ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
- message จะถูกเก็บใน queue ที่ชื่อ “invoices”
- เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป
อ่านเพิ่มเติม