1Mojo::RabbitMQ::Client(3pm) User Contributed Perl Documentation Mojo::RabbitMQ::Client(3pm)
2
3
4

NAME

6       Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client
7

SYNOPSIS

9         use Mojo::RabbitMQ::Client;
10
11         # Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
12         my $client = Mojo::RabbitMQ::Client->new(
13           url => 'amqp://guest:guest@127.0.0.1:5672/');
14
15         # Catch all client related errors
16         $client->catch(sub { warn "Some error caught in client"; });
17
18         # When connection is in Open state, open new channel
19         $client->on(
20           open => sub {
21             my ($client) = @_;
22
23             # Create a new channel with auto-assigned id
24             my $channel = Mojo::RabbitMQ::Client::Channel->new();
25
26             $channel->catch(sub { warn "Error on channel received"; });
27
28             $channel->on(
29               open => sub {
30                 my ($channel) = @_;
31                 $channel->qos(prefetch_count => 1)->deliver;
32
33                 # Publish some example message to test_queue
34                 my $publish = $channel->publish(
35                   exchange    => 'test',
36                   routing_key => 'test_queue',
37                   body        => 'Test message',
38                   mandatory   => 0,
39                   immediate   => 0,
40                   header      => {}
41                 );
42                 # Deliver this message to server
43                 $publish->deliver;
44
45                 # Start consuming messages from test_queue
46                 my $consumer = $channel->consume(queue => 'test_queue');
47                 $consumer->on(message => sub { say "Got a message" });
48                 $consumer->deliver;
49               }
50             );
51             $channel->on(close => sub { $log->error('Channel closed') });
52
53             $client->open_channel($channel);
54           }
55         );
56
57         # Start connection
58         $client->connect();
59
60         # Start Mojo::IOLoop if not running already
61         Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
62
63   CONSUMER
64         use Mojo::RabbitMQ::Client;
65         my $consumer = Mojo::RabbitMQ::Client->consumer(
66           url      => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
67           defaults => {
68             qos      => {prefetch_count => 1},
69             queue    => {durable        => 1},
70             consumer => {no_ack         => 0},
71           }
72         );
73
74         $consumer->catch(sub { die "Some error caught in Consumer" } );
75         $consumer->on('success' => sub { say "Consumer ready" });
76         $consumer->on(
77           'message' => sub {
78             my ($consumer, $message) = @_;
79
80             $consumer->channel->ack($message)->deliver;
81           }
82         );
83         $consumer->start();
84
85         Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
86
87   PUBLISHER
88         use Mojo::RabbitMQ::Client;
89         my $publisher = Mojo::RabbitMQ::Client->publisher(
90           url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
91         );
92
93         $publisher->publish('plain text');
94
95         $publisher->publish(
96           {encode => { to => 'json'}},
97           routing_key => 'mojo_mq'
98         )->then(sub {
99           say "Message published";
100         })->catch(sub {
101           die "Publishing failed"
102         })->wait;
103

DESCRIPTION

105       Mojo::RabbitMQ::Client is a rewrite of AnyEvent::RabbitMQ to work on
106       top of Mojo::IOLoop.
107

EVENTS

109       Mojo::RabbitMQ::Client inherits all events from Mojo::EventEmitter and
110       can emit the following new ones.
111
112   connect
113         $client->on(connect => sub {
114           my ($client, $stream) = @_;
115           ...
116         });
117
118       Emitted when TCP/IP connection with RabbitMQ server is established.
119
120   open
121         $client->on(open => sub {
122           my ($client) = @_;
123           ...
124         });
125
126       Emitted when AMQP protocol Connection.Open-Ok method is received.
127
128   close
129         $client->on(close => sub {
130           my ($client) = @_;
131           ...
132         });
133
134       Emitted on reception of Connection.Close-Ok method.
135
136   disconnect
137         $client->on(disconnect => sub {
138           my ($client) = @_;
139           ...
140         });
141
142       Emitted when TCP/IP connection gets disconnected.
143

ATTRIBUTES

145       Mojo::RabbitMQ::Client has the following attributes.
146
147   tls
148         my $tls = $client->tls;
149         $client = $client->tls(1)
150
151       Force secure connection. Default is disabled (0).
152
153   user
154         my $user = $client->user;
155         $client  = $client->user('guest')
156
157       Sets username for authorization, by default it's not defined.
158
159   pass
160         my $pass = $client->pass;
161         $client  = $client->pass('secret')
162
163       Sets user password for authorization, by default it's not defined.
164
165   host
166         my $host = $client->host;
167         $client  = $client->host('localhost')
168
169       Hostname or IP address of RabbitMQ server. Defaults to "localhost".
170
171   port
172         my $port = $client->port;
173         $client  = $client->port(1234)
174
175       Port on which RabbitMQ server listens for new connections.  Defaults to
176       5672, which is standard RabbitMQ server listen port.
177
178   vhost
179         my $vhost = $client->vhost;
180         $client  = $client->vhost('/')
181
182       RabbitMQ virtual server to use. Default is "/".
183
184   params
185         my $params = $client->params;
186         $client  = $client->params(Mojo::Parameters->new('verify=1'))
187
188       Sets additional parameters for connection. Default is not defined.
189
190       For list of supported parameters see "SUPPORTED QUERY PARAMETERS".
191
192   url
193         my $url = $client->url;
194         $client = $client->url('amqp://...');
195
196       Sets all connection parameters in one string, according to
197       specification from <https://www.rabbitmq.com/uri-spec.html>.
198
199         amqp_URI       = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ]
200
201         amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
202
203         amqp_userinfo  = username [ ":" password ]
204
205         username       = *( unreserved / pct-encoded / sub-delims )
206
207         password       = *( unreserved / pct-encoded / sub-delims )
208
209         vhost          = segment
210
211   heartbeat_timeout
212         my $timeout = $client->heartbeat_timeout;
213         $client     = $client->heartbeat_timeout(180);
214
215       Heartbeats are used to monitor peer reachability in AMQP.  Default value
216       is 60 seconds; if set to 0, no heartbeats will be sent.
217
218   connect_timeout
219         my $timeout = $client->connect_timeout;
220         $client     = $client->connect_timeout(5);
221
222       Connection timeout used by Mojo::IOLoop::Client.  Defaults to
223       environment variable "MOJO_CONNECT_TIMEOUT" or 10 seconds if nothing
224       else is set.
225
226   max_channels
227         my $max_channels = $client->max_channels;
228         $client          = $client->max_channels(10);
229
230       Maximum number of channels allowed to be active. Defaults to 0 which
231       means no implicit limit.
232
233       When you try to call "add_channel" over the limit, an "error" will be
234       emitted on the channel saying: Maximum number of channels reached.
235

STATIC METHODS

237   consumer
238         my $client = Mojo::RabbitMQ::Client->consumer(...)
239
240       Shortcut for creating Mojo::RabbitMQ::Client::Consumer.
241
242   publisher
243         my $client = Mojo::RabbitMQ::Client->publisher(...)
244
245       Shortcut for creating Mojo::RabbitMQ::Client::Publisher.
246

METHODS

248       Mojo::RabbitMQ::Client inherits all methods from Mojo::EventEmitter and
249       implements the following new ones.
250
251   connect
252         $client->connect();
253
254       Tries to connect to RabbitMQ server and negotiate AMQP protocol.
255
256   close
257         $client->close();
258
259   param
260         my $param = $client->param('name');
261         $client   = $client->param(name => 'value');
262
263   add_channel
264         my $channel = Mojo::RabbitMQ::Client::Channel->new();
265         ...
266         $channel    = $client->add_channel($channel);
267         $channel->open;
268
269   open_channel
270         my $channel = Mojo::RabbitMQ::Client::Channel->new();
271         ...
272         $client->open_channel($channel);
273
274   delete_channel
275         my $removed = $client->delete_channel($channel->id);
276

SUPPORTED QUERY PARAMETERS

278       There's no formal specification, nevertheless a list of common
279       parameters recognized by officially supported RabbitMQ clients is
280       maintained here: <https://www.rabbitmq.com/uri-query-parameters.html>.
281
282       Some shortcuts are also supported, you'll find them in parentheses.
283
284       Aliases are less significant, so when both are specified only primary
285       value will be used.
286
287   cacertfile (ca)
288       Path to Certificate Authority file for TLS.
289
290   certfile (cert)
291       Path to the client certificate file for TLS.
292
293   keyfile (key)
294       Path to the client certificate private key file for TLS.
295
296   fail_if_no_peer_cert (verify)
297       TLS verification mode, defaults to 0x01 on the client-side if a
298       certificate authority file has been provided, or 0x00 otherwise.
299
300   auth_mechanism
301       Sets the AMQP authentication mechanism. Defaults to AMQPLAIN. AMQPLAIN
302       and EXTERNAL are supported; EXTERNAL will only work if
303       Mojo::RabbitMQ::Client does not need to do anything beyond passing
304       along a username and password if specified.
305
306   heartbeat
307       Sets requested heartbeat timeout, just like "heartbeat_timeout"
308       attribute.
309
310   connection_timeout (timeout)
311       Sets connection timeout - see "connect_timeout" attribute.
312
313   channel_max
314       Sets maximum number of channels - see max_channels attribute.
315

SEE ALSO

317       Mojo::RabbitMQ::Client::Channel, Mojo::RabbitMQ::Client::Consumer,
318       Mojo::RabbitMQ::Client::Publisher
319

COPYRIGHT AND LICENSE

321       Copyright (C) 2015-2019, Sebastian Podjasek and others
322
323       Based on AnyEvent::RabbitMQ - Copyright (C) 2010 Masahito Ikuta,
324       maintained by "bobtfish@bobtfish.net"
325
326       This program is free software, you can redistribute it and/or modify it
327       under the terms of the Artistic License version 2.0.
328
329       Contains AMQP specification (shared/amqp0-9-1.stripped.extended.xml)
330       licensed under BSD-style license.
331
332
333
334perl v5.30.0                      2019-08-21       Mojo::RabbitMQ::Client(3pm)
Impressum