PHP: php-amqplib แบบต่อเนื่อง

By phunsanit

PHP: php-amqplib แบบต่อเนื่อง

ในบาง page เราอาจจะต้องการที่จะใช้ queuing หลายตัว แทนที่จะ connect / disconnect หลาย ๆ ครั้ง เราสามารถใช้ batch / bulk ในการส่งข้อมูลให้ RabbitMQ ในครั้งเดียวได้

RabbitMQSendBatch.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
<?php

/**
 * Publishes 1000 persistent JSON messages to RabbitMQ as a single batch.
 *
 * Messages are queued client-side with batch_basic_publish() and sent to the
 * broker together by publish_batch(), rather than one network write per
 * message.
 *
 * NOTE(review): $channel and $connection are not defined in this file; they
 * are presumably created by RabbitMQConnection.php — confirm.
 */

include 'RabbitMQConnection.php';

use PhpAmqpLib\Message\AMQPMessage;

$exchange_name = 'customers';
$queue_name = 'invoices';

// Durable fanout exchange: passive = false, durable = true, auto_delete = false.
$channel->exchange_declare($exchange_name, 'fanout', false, true, false);

// Durable, non-exclusive queue that is not auto-deleted; created if missing.
$channel->queue_declare($queue_name, false, true, false, false);

// An exchange only delivers to queues bound to it. Without this binding the
// messages published below would have nowhere to go. Re-binding an existing
// binding is harmless.
$channel->queue_bind($queue_name, $exchange_name);

$properties = [
    'content_type' => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

for ($a = 1; $a <= 1000; $a++) {
    $datas = [
        'id' => str_pad((string) $a, 4, '0', STR_PAD_LEFT),
        'rand' => rand(0, 100),
        // 'i' is minutes; 'm' would repeat the month number.
        'time' => date('Y-m-d H:i:s'),
    ];

    $msg_body = json_encode($datas);

    $msg = new AMQPMessage($msg_body, $properties);

    // Add the message to the channel's pending batch exactly once. It is not
    // sent until publish_batch() is called. The routing key is kept from the
    // original call even though the exchange is declared as fanout.
    $channel->batch_basic_publish($msg, $exchange_name, $queue_name);

    echo '<br>' . $msg_body;
}

// Send every message queued by batch_basic_publish() in one go.
$channel->publish_batch();

$channel->close();
$connection->close();

อ่านเพิ่มเติม

About the author

phunsanit administrator

Leave a Reply