1Mojo::RabbitMQ::Client(3pm) User Contributed Perl Documentation Mojo::RabbitMQ::Client(3pm)
2
3
4
6 Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client
7
9 use Mojo::RabbitMQ::Client;
10
11 # Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
12 my $client = Mojo::RabbitMQ::Client->new(
13 url => 'amqp://guest:guest@127.0.0.1:5672/');
14
15 # Catch all client related errors
16 $client->catch(sub { warn "Some error caught in client"; });
17
18 # When connection is in Open state, open new channel
19 $client->on(
20 open => sub {
21 my ($client) = @_;
22
23 # Create a new channel with auto-assigned id
24 my $channel = Mojo::RabbitMQ::Client::Channel->new();
25
26 $channel->catch(sub { warn "Error on channel received"; });
27
28 $channel->on(
29 open => sub {
30 my ($channel) = @_;
31 $channel->qos(prefetch_count => 1)->deliver;
32
33 # Publish some example message to test_queue
34 my $publish = $channel->publish(
35 exchange => 'test',
36 routing_key => 'test_queue',
37 body => 'Test message',
38 mandatory => 0,
39 immediate => 0,
40 header => {}
41 );
42 # Deliver this message to server
43 $publish->deliver;
44
45 # Start consuming messages from test_queue
46 my $consumer = $channel->consume(queue => 'test_queue');
47 $consumer->on(message => sub { say "Got a message" });
48 $consumer->deliver;
49 }
50 );
51 $channel->on(close => sub { $log->error('Channel closed') });
52
53 $client->open_channel($channel);
54 }
55 );
56
57 # Start connection
58 $client->connect();
59
60 # Start Mojo::IOLoop if not running already
61 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
62
63 CONSUMER
64 use Mojo::RabbitMQ::Client;
65 my $consumer = Mojo::RabbitMQ::Client->consumer(
66 url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
67 defaults => {
68 qos => {prefetch_count => 1},
69 queue => {durable => 1},
70 consumer => {no_ack => 0},
71 }
72 );
73
74 $consumer->catch(sub { die "Some error caught in Consumer" } );
75 $consumer->on('success' => sub { say "Consumer ready" });
76 $consumer->on(
77 'message' => sub {
78 my ($consumer, $message) = @_;
79
80 $consumer->channel->ack($message)->deliver;
81 }
82 );
83 $consumer->start();
84
85 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
86
87 PUBLISHER
88 use Mojo::RabbitMQ::Client;
89 my $publisher = Mojo::RabbitMQ::Client->publisher(
90 url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
91 );
92
93 $publisher->publish('plain text');
94
95 $publisher->publish(
96 {encode => { to => 'json'}},
97 routing_key => 'mojo_mq'
98 )->then(sub {
99 say "Message published";
100 })->catch(sub {
101 die "Publishing failed"
102 })->wait;
103
105 Mojo::RabbitMQ::Client is a rewrite of AnyEvent::RabbitMQ to work on
106 top of Mojo::IOLoop.
107
109 Mojo::RabbitMQ::Client inherits all events from Mojo::EventEmitter and
110 can emit the following new ones.
111
112 connect
113 $client->on(connect => sub {
114 my ($client, $stream) = @_;
115 ...
116 });
117
118 Emitted when TCP/IP connection with RabbitMQ server is established.
119
120 open
121 $client->on(open => sub {
122 my ($client) = @_;
123 ...
124 });
125
126 Emitted when AMQP protocol Connection.Open-Ok method is received.
127
128 close
129 $client->on(close => sub {
130 my ($client) = @_;
131 ...
132 });
133
134 Emitted on reception of Connection.Close-Ok method.
135
136 disconnect
137 $client->on(disconnect => sub {
138 my ($client) = @_;
139 ...
140 });
141
142 Emitted when TCP/IP connection gets disconnected.
143
145 Mojo::RabbitMQ::Client has following attributes.
146
147 tls
148 my $tls = $client->tls;
149 $client = $client->tls(1)
150
151 Force secure connection. Default is disabled (0).
152
153 user
154 my $user = $client->user;
155 $client = $client->user('guest')
156
157 Sets username for authorization, by default it's not defined.
158
159 pass
160 my $pass = $client->pass;
161 $client = $client->pass('secret')
162
163 Sets user password for authorization, by default it's not defined.
164
165 host
166 my $host = $client->host;
167 $client = $client->host('localhost')
168
169 Hostname or IP address of RabbitMQ server. Defaults to "localhost".
170
171 port
172 my $port = $client->port;
173 $client = $client->port(1234)
174
175 Port on which RabbitMQ server listens for new connections. Defaults to
176 5672, which is standard RabbitMQ server listen port.
177
178 vhost
179 my $vhost = $client->vhost;
180 $client = $client->vhost('/')
181
182 RabbitMQ virtual server to use. Default is "/".
183
184 params
185 my $params = $client->params;
186 $client = $client->params(Mojo::Parameters->new('verify=1'))
187
188 Sets additional parameters for connection. Default is not defined.
189
190 For list of supported parameters see "SUPPORTED QUERY PARAMETERS".
191
192 url
193 my $url = $client->url;
194 $client = $client->url('amqp://...');
195
196 Sets all connection parameters in one string, according to
197 specification from <https://www.rabbitmq.com/uri-spec.html>.
198
199 amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ]
200
201 amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
202
203 amqp_userinfo = username [ ":" password ]
204
205 username = *( unreserved / pct-encoded / sub-delims )
206
207 password = *( unreserved / pct-encoded / sub-delims )
208
209 vhost = segment
210
211 heartbeat_timeout
212 my $timeout = $client->heartbeat_timeout;
213 $client = $client->heartbeat_timeout(180);
214
215 Heartbeats are used to monitor peer reachability in AMQP. Default value
216 is 60 seconds, if set to 0 no heartbeats will be sent.
217
218 connect_timeout
219 my $timeout = $client->connect_timeout;
220 $client = $client->connect_timeout(5);
221
222 Connection timeout used by Mojo::IOLoop::Client. Defaults to
223 environment variable "MOJO_CONNECT_TIMEOUT" or 10 seconds if nothing
224 else is set.
225
226 max_channels
227 my $max_channels = $client->max_channels;
228 $client = $client->max_channels(10);
229
230 Maximum number of channels allowed to be active. Defaults to 0 which
231 means no implicit limit.
232
233 When you try to call "add_channel" over the limit, an "error" will be
234 emitted on the channel saying: Maximum number of channels reached.
235
237 consumer
238 my $client = Mojo::RabbitMQ::Client->consumer(...)
239
240 Shortcut for creating Mojo::RabbitMQ::Client::Consumer.
241
242 publisher
243 my $client = Mojo::RabbitMQ::Client->publisher(...)
244
245 Shortcut for creating Mojo::RabbitMQ::Client::Publisher.
246
248 Mojo::RabbitMQ::Client inherits all methods from Mojo::EventEmitter and
249 implements the following new ones.
250
251 connect
252 $client->connect();
253
254 Tries to connect to RabbitMQ server and negotiate AMQP protocol.
255
256 close
257 $client->close();
258
259 param
260 my $param = $client->param('name');
261 $client = $client->param(name => 'value');
262
263 add_channel
264 my $channel = Mojo::RabbitMQ::Client::Channel->new();
265 ...
266 $channel = $client->add_channel($channel);
267 $channel->open;
268
269 open_channel
270 my $channel = Mojo::RabbitMQ::Client::Channel->new();
271 ...
272 $client->open_channel($channel);
273
274 delete_channel
275 my $removed = $client->delete_channel($channel->id);
276
278 There's no formal specification, nevertheless a list of common
279 parameters recognized by officially supported RabbitMQ clients is
280 maintained here: <https://www.rabbitmq.com/uri-query-parameters.html>.
281
282 Some shortcuts are also supported, you'll find them in parentheses.
283
284 Aliases are less significant, so when both are specified only primary
285 value will be used.
286
287 cacertfile (ca)
288 Path to Certificate Authority file for TLS.
289
290 certfile (cert)
291 Path to the client certificate file for TLS.
292
293 keyfile (key)
294 Path to the client certificate private key file for TLS.
295
296 fail_if_no_peer_cert (verify)
297 TLS verification mode, defaults to 0x01 on the client-side if a
298 certificate authority file has been provided, or 0x00 otherwise.
299
300 auth_mechanism
301 Sets the AMQP authentication mechanism. Defaults to AMQPLAIN. AMQPLAIN
302 and EXTERNAL are supported; EXTERNAL will only work if
303 Mojo::RabbitMQ::Client does not need to do anything beyond passing
304 along a username and password if specified.
305
306 heartbeat
307 Sets requested heartbeat timeout, just like "heartbeat_timeout"
308 attribute.
309
310 connection_timeout (timeout)
311 Sets connection timeout - see "connect_timeout" attribute.
312
313 channel_max
314 Sets maximum number of channels - see max_channels attribute.
315
317 Mojo::RabbitMQ::Client::Channel, Mojo::RabbitMQ::Client::Consumer,
318 Mojo::RabbitMQ::Client::Publisher
319
321 Copyright (C) 2015-2019, Sebastian Podjasek and others
322
323 Based on AnyEvent::RabbitMQ - Copyright (C) 2010 Masahito Ikuta,
324 maintained by "bobtfish@bobtfish.net"
325
326 This program is free software, you can redistribute it and/or modify it
327 under the terms of the Artistic License version 2.0.
328
329 Contains AMQP specification (shared/amqp0-9-1.stripped.extended.xml)
330 licensed under BSD-style license.
331
332
333
334perl v5.38.0 2023-07-20 Mojo::RabbitMQ::Client(3pm)