ลองเขียน custom driver สำหรับติดต่อกับ RabbitMQ / AMQP ยังไม่สมบูรณ์แต่ก็ดีกว่าการที่ต้องมา connect ผ่าน php-amqplib ทุกครั้งที่จะใช้ตามปกติ
- config file ก่อนตามระเบียบ
\application\config\RabbitMQ.php 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657<?php
defined(
'BASEPATH'
)
or
exit
(
'No direct script access allowed'
);
/*
| -------------------------------------------------------------------
| RABBITMQ CONNECTIVITY SETTINGS
| -------------------------------------------------------------------
| This file will contain the settings needed to access your RabbitMQ.
|
| For complete instructions please consult the 'RabbitMQ Connection'
| page of the User Guide.
|
| -------------------------------------------------------------------
| EXPLANATION OF VARIABLES
| -------------------------------------------------------------------
|
| ['connection_timeout'] => 3.0,
| ['context'] => null,
| ['heartbeat'] => 0
| ['host'],
| ['insist'] => false,
| ['keepalive'] => false,
| ['locale'] => 'en_US',
| ['login_method'] => 'AMQPLAIN',
| ['login_response'] => null,
| ['password'],
| ['port'],
| ['read_write_timeout'] => 3.0,
| ['user'],
| ['vhost'] => '/',
*/
$config
[
'RabbitMQ'
][
'connects'
] = [
'default'
=> [
'connection_timeout'
=> 3.0,
'context'
=> null,
'heartbeat'
=> 0,
'host'
=>
'127.0.0.1'
,
'insist'
=> false,
'keepalive'
=> false,
'locale'
=>
'en_US'
,
'login_method'
=>
'AMQPLAIN'
,
'login_response'
=> null,
'password'
=>
'guest'
,
'port'
=> 5672,
'read_write_timeout'
=> 3.0,
'user'
=>
'guest'
,
'vhost'
=>
'/'
,
],
];
/*
| -------------------------------------------------------------------
| RABBITMQ DEBUG SETTINGS
| -------------------------------------------------------------------
|
*/
$config
[
'RabbitMQ'
][
'debug'
] = false;
- ถึงคิวไฟล์ driver ที่จะเป็นตัวกลางระหว่าง codeigniter และ php-amqplib
\application\libraries\RabbitMQ\RabbitMQ.php 123456789101112131415161718192021222324252627282930313233343536373839404142434445<?php
use
PhpAmqpLib\Connection\AMQPStreamConnection;
use
PhpAmqpLib\Message\AMQPMessage;
class
RabbitMQ
extends
AMQPStreamConnection
{
public
function
__construct(
$config
=
array
())
{
$this
->CI = &get_instance();
$this
->config =
$config
[
'RabbitMQ'
];
if
(
$this
->config[
'debug'
] == true) {
define(
'AMQP_DEBUG'
, true);
}
}
public
function
connect(
$dsnId
=
'default'
) {
$dsn
=
$this
->config[
'connects'
][
$dsnId
];
$this
->connection =
new
AMQPStreamConnection(
$dsn
[
'host'
],
$dsn
[
'port'
],
$dsn
[
'user'
],
$dsn
[
'password'
],
$dsn
[
'vhost'
],
$dsn
[
'insist'
],
$dsn
[
'login_method'
],
$dsn
[
'login_response'
],
$dsn
[
'locale'
],
$dsn
[
'connection_timeout'
],
$dsn
[
'read_write_timeout'
],
$dsn
[
'context'
],
$dsn
[
'keepalive'
],
$dsn
[
'heartbeat'
]);
$this
->channel =
$this
->connection->channel();
}
public
function
disconnect()
{
$this
->channel->close();
$this
->connection->close();
}
public
function
setMessageJson(
$datas
)
{
$msg_body
= json_encode(
$datas
);
$properties
= [
'content_type'
=>
'application/json'
,
'delivery_mode'
=> AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
return
new
AMQPMessage(
$msg_body
,
$properties
);
}
}
- ที่นี้ก็ controller ที่จะทำงานให้
\application\controllers\Messaging.php 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170<?php
defined(
'BASEPATH'
)
or
exit
(
'No direct script access allowed'
);
class
Messaging
extends
CI_Controller
{
public
function
__construct()
{
parent::__construct();
$this
->load->driver(
'RabbitMQ'
);
}
public
function
receive()
{
set_time_limit(0);
$this
->rabbitmq->connect();
$exchange_name
=
'customers'
;
$queue_name
=
'invoices'
;
/**
* Declares exchange
*
* @param string $exchange_name
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
$this
->rabbitmq->channel->exchange_declare(
$exchange_name
,
'fanout'
, false, true, false);
/**
* Declares queue, creates if needed
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
list(
$queueName
,
$message_count
,
$consumer_count
) =
$this
->rabbitmq->channel->queue_declare(
''
, false, false, true, false);
$this
->rabbitmq->channel->queue_bind(
$queue_name
,
$exchange_name
);
$callback
=
function
(
$msg
) {
$datas
= json_decode(
$msg
->body, true);
fwrite(
fopen
(
'RabbitMQReceive.txt'
,
'a+'
), print_r(
$datas
, true));
sleep(substr_count(
$msg
->body,
'.'
));
/* delete message */
$msg
->delivery_info[
'channel'
]->basic_ack(
$msg
->delivery_info[
'delivery_tag'
]);
};
/**
* Starts a queue consumer
*
* @param string $queue_name
* @param string $consumer_tag
* @param bool $no_local
* @param bool $no_ack
* @param bool $exclusive
* @param bool $nowait
* @param callback|null $callback
* @param int|null $ticket
* @param array $arguments
* @return mixed|string
*/
$this
->rabbitmq->channel->basic_consume(
$queue_name
,
''
, false, false, false, false,
$callback
);
while
(
count
(
$this
->rabbitmq->channel->callbacks)) {
/**
* Wait for some expected AMQP methods and dispatch to them.
* Unexpected methods are queued up for later calls to this PHP
* method.
*
* @param array $allowed_methods
* @param bool $non_blocking
* @param int $timeout
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @return mixed
*/
$this
->rabbitmq->channel->wait();
}
$this
->rabbitmq->channel->close();
$connection
->close();
}
public
function
send()
{
$this
->rabbitmq->connect();
$exchange_name
=
'customers'
;
$queue_name
=
'invoices'
;
/**
* Declares exchange
*
* @param string $exchange_name
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
$this
->rabbitmq->channel->exchange_declare(
$exchange_name
,
'fanout'
, false, true, false);
/**
* Declares queue, creates if needed
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
list(
$queueName
,
$message_count
,
$consumer_count
) =
$this
->rabbitmq->channel->queue_declare(
$queue_name
, false, true, false, false);
$datas
= [
'rand'
=> rand(0, 100),
'time'
=>
date
(
'Y-m-d H:m:s'
),
];
/**
* Declares message
*
* @param array $datas
* @return string
*/
$msg
=
$this
->rabbitmq->setMessageJson(
$datas
);
/**
* Publishes a message
*
* @param AMQPMessage $msg
* @param string $exchange
* @param string $routing_key
* @param bool $mandatory
* @param bool $immediate
* @param int $ticket
*/
$this
->rabbitmq->channel->basic_publish(
$msg
,
$exchange_name
,
$queue_name
);
echo
'<br><pre>'
. print_r(
$datas
, true),
'</pre>'
;
$this
->rabbitmq->disconnect();
}
}
- ทดสอบโดยเรียก http://localhost/CodeIgniter-3.1.3/messaging/send และ http://localhost/CodeIgniter-3.1.3/messaging/receive