ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสะดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย
สร้างส่วน connect กับ RabbitMQ ก่อน
<?php /* define('AMQP_DEBUG', true); */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel();
ค่าคงที่ define(‘AMQP_DEBUG’, true); ใช้เปิด debug mode ของตัว class php-amqplib
การส่งข้อมูล
<?php include 'RabbitMQConnection.php'; use PhpAmqpLib\Message\AMQPMessage; $exchange_name = 'customers'; $queue_name = 'invoices'; /** * Declares exchange * * @param string $exchange_name * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ $channel->exchange_declare($exchange_name, 'fanout', false, true, false); /** * Declares queue, creates if needed * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ list($queueName, $message_count, $consumer_count) = $channel->queue_declare($queue_name, false, true, false, false); $datas = [ 'rand' => rand(0, 100), 'time' => date('Y-m-d H:m:s'), ]; $msg_body = json_encode($datas); $properties = [ 'content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; $msg = new AMQPMessage($msg_body, $properties); /** * Publishes a message * * @param AMQPMessage $msg * @param string $exchange * @param string $routing_key * @param bool $mandatory * @param bool $immediate * @param int $ticket */ $channel->basic_publish($msg, $exchange_name, $queue_name); echo '<br>' . $msg_body; $channel->close(); $connection->close();
จากนั้นรับ message
<?php include 'RabbitMQConnection.php'; set_time_limit(0); $exchange_name = 'customers'; $queue_name = 'invoices'; /** * Declares exchange * * @param string $exchange_name * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ $channel->exchange_declare($exchange_name, 'fanout', false, true, false); /** * Declares queue, creates if needed * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ list($queueName, $message_count, $consumer_count) = $channel->queue_declare('', false, false, true, false); $channel->queue_bind($queue_name, $exchange_name); $callback = function ($msg) { $datas = json_decode($msg->body, true); fwrite(fopen('RabbitMQReceive.txt', 'a+'), print_r($datas, true)); sleep(substr_count($msg->body, '.')); /* delete message */ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; /** * Starts a queue consumer * * @param string $queue_name * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack * @param bool $exclusive * @param bool $nowait * @param callback|null $callback * @param int|null $ticket * @param array $arguments * @return mixed|string */ $channel->basic_consume($queue_name, '', false, false, false, false, $callback); while (count($channel->callbacks)) { /** * Wait for some expected AMQP methods and dispatch to them. * Unexpected methods are queued up for later calls to this PHP * method. * * @param array $allowed_methods * @param bool $non_blocking * @param int $timeout * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException * @throws \PhpAmqpLib\Exception\AMQPRuntimeException * @return mixed */ $channel->wait(); } $channel->close(); $connection->close();
ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info
หลักการการทำงานคือ
- ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
- message จะถูกเก็บใน queue ที่ชื่อ “invoices”
- เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป
อ่านเพิ่มเติม