ถึงจะติดตั้ง extension amqp ไว้แล้วก็จริงแต่เพราะว่าการติดตั้งอะไรเพิ่มให้ระบบเซิร์ฟเวอร์บริษัทจำเป็นต้องมีการขออนุมัติจากคณะกรรมการซะก่อน เพื่อความสะดวกจึงใช้ php-amqplib/php-amqplib แทนโดยสามารถติดตั้งได้โดยใช้ composer ได้เลย
สร้างส่วน connect กับ RabbitMQ ก่อน
1 2 3 4 5 6 7 8 9 10 | <?php /* define('AMQP_DEBUG', true); */ require_once __DIR__ . '/vendor/autoload.php' ; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection( 'localhost' , 5672, 'guest' , 'guest' ); $channel = $connection ->channel(); |
ค่าคงที่ define(‘AMQP_DEBUG’, true); ใช้เปิด debug mode ของตัว class php-amqplib
การส่งข้อมูล
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | <?php include 'RabbitMQConnection.php' ; use PhpAmqpLib\Message\AMQPMessage; $exchange_name = 'customers' ; $queue_name = 'invoices' ; /** * Declares exchange * * @param string $exchange_name * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ $channel ->exchange_declare( $exchange_name , 'fanout' , false, true, false); /** * Declares queue, creates if needed * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ list( $queueName , $message_count , $consumer_count ) = $channel ->queue_declare( $queue_name , false, true, false, false); $datas = [ 'rand' => rand(0, 100), 'time' => date ( 'Y-m-d H:m:s' ), ]; $msg_body = json_encode( $datas ); $properties = [ 'content_type' => 'application/json' , 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]; $msg = new AMQPMessage( $msg_body , $properties ); /** * Publishes a message * * @param AMQPMessage $msg * @param string $exchange * @param string $routing_key * @param bool $mandatory * @param bool $immediate * @param int $ticket */ $channel ->basic_publish( $msg , $exchange_name , $queue_name ); echo '<br>' . $msg_body ; $channel ->close(); $connection ->close(); |
จากนั้นรับ message
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | <?php include 'RabbitMQConnection.php' ; set_time_limit(0); $exchange_name = 'customers' ; $queue_name = 'invoices' ; /** * Declares exchange * * @param string $exchange_name * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ $channel ->exchange_declare( $exchange_name , 'fanout' , false, true, false); /** * Declares queue, creates if needed * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ list( $queueName , $message_count , $consumer_count ) = $channel ->queue_declare( '' , false, false, true, false); $channel ->queue_bind( $queue_name , $exchange_name ); $callback = function ( $msg ) { $datas = json_decode( $msg ->body, true); fwrite( fopen ( 'RabbitMQReceive.txt' , 'a+' ), print_r( $datas , true)); sleep(substr_count( $msg ->body, '.' )); /* delete message */ $msg ->delivery_info[ 'channel' ]->basic_ack( $msg ->delivery_info[ 'delivery_tag' ]); }; /** * Starts a queue consumer * * @param string $queue_name * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack * @param bool $exclusive * @param bool $nowait * @param callback|null $callback * @param int|null $ticket * @param array $arguments * @return mixed|string */ $channel ->basic_consume( $queue_name , '' , false, false, false, false, $callback ); while ( count ( $channel ->callbacks)) { /** * Wait for some expected AMQP methods and dispatch to them. * Unexpected methods are queued up for later calls to this PHP * method. * * @param array $allowed_methods * @param bool $non_blocking * @param int $timeout * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException * @throws \PhpAmqpLib\Exception\AMQPRuntimeException * @return mixed */ $channel ->wait(); } $channel ->close(); $connection ->close(); |
ข้อมูลจะถูกดำเนินการโดย function callback .ในตัวอย่างจะถูกเขียนไว้ใน RabbitMQReceive.txt และถูกลบออกไปโดย การกำหนดค่าให้ delivery_info
หลักการการทำงานคือ
- ย้าย process ที่ไม่ต้องทำทันทีไปทำงานทีหลังโดยใช้ การส่ง ตัวแปรที่จำเป็นไปในรูปแบบ message ให้ rabbitmq ทำให้ user ไม่จำเป็นต้องรอและรู้สึกว่าเว็บทำงานให้อย่างรวดเร็ว
- message จะถูกเก็บใน queue ที่ชื่อ “invoices”
- เมื่อ server ว่าง ก็เรียก message ใน queue ที่เก็บไว้ ดึงข้อมูลออกมา เพื่อนำไปทำงานต่อไป
อ่านเพิ่มเติม