Mojo::RabbitMQ::Client::Channel(3pm)  User Contributed Perl Documentation  Mojo::RabbitMQ::Client::Channel(3pm)

NAME
Mojo::RabbitMQ::Client::Channel - handles all channel-related methods
7
SYNOPSIS
use Mojo::RabbitMQ::Client::Channel;

my $channel = Mojo::RabbitMQ::Client::Channel->new();

$channel->catch(sub { warn "Some channel error occurred: " . $_[1] });

$channel->on(
  open => sub {
    my ($channel) = @_;
    ...
  }
);
$channel->on(close => sub { warn "Channel closed" });

# $client is assumed to be an already connected Mojo::RabbitMQ::Client
$client->open_channel($channel);
24
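DESCRIPTION
Mojo::RabbitMQ::Client::Channel gives access to all channel-related
methods: declaring and binding exchanges and queues, publishing,
consuming, acknowledging, and transactions.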
28
EVENTS
Mojo::RabbitMQ::Client::Channel inherits all events from
Mojo::EventEmitter and can emit the following new ones.

open
$channel->on(open => sub {
  my ($channel) = @_;
  ...
});

Emitted when the channel receives Open-Ok.

close
$channel->on(close => sub {
  my ($channel, $frame) = @_;
  ...
});

Emitted when the channel gets closed; $frame contains the close reason.
48
ATTRIBUTES
Mojo::RabbitMQ::Client::Channel has the following attributes.

id
my $id = $channel->id;
$channel->id(20810);

If not set, Mojo::RabbitMQ::Client sets it to the next free number when
the channel is opened.

is_open
$channel->is_open ? "Channel is open" : "Channel is closed";

is_active
$channel->is_active ? "Channel is active" : "Channel is not active";

This can be modified on reception of Channel-Flow.

client
my $client = $channel->client;
$channel->client($client);
70
METHODS
Mojo::RabbitMQ::Client::Channel inherits all methods from
Mojo::EventEmitter and implements the following new ones.

close
$channel->close;

Cancels all consumers and closes the channel afterwards.
79
80 declare_exchange
81 my $exchange = $channel->declare_exchange(
82 exchange => 'mojo',
83 type => 'fanout',
84 durable => 1,
85 ...
86 )->deliver;
87
88 Verify exchange exists, create if needed.
89
90 This method creates an exchange if it does not already exist, and if
91 the exchange exists, verifies that it is of the correct and expected
92 class.
93
The following arguments are accepted:
95
96 exchange
97 Unique exchange name
98
99 type
100 Each exchange belongs to one of a set of exchange types implemented
101 by the server. The exchange types define the functionality of the
102 exchange - i.e. how messages are routed through it. It is not valid
103 or meaningful to attempt to change the type of an existing exchange.
104
105 passive
106 If set, the server will reply with Declare-Ok if the exchange already
107 exists with the same name, and raise an error if not. The client can
108 use this to check whether an exchange exists without modifying the
109 server state. When set, all other method fields except name and no-
110 wait are ignored. A declare with both passive and no-wait has no
111 effect. Arguments are compared for semantic equivalence.
112
113 durable
114 If set when creating a new exchange, the exchange will be marked as
115 durable. Durable exchanges remain active when a server restarts. Non-
116 durable exchanges (transient exchanges) are purged if/when a server
117 restarts.
118
119 auto_delete
120 If set, the exchange is deleted when all queues have finished using
121 it.
122
123 internal
124 If set, the exchange may not be used directly by publishers, but only
125 when bound to other exchanges. Internal exchanges are used to
126 construct wiring that is not visible to applications.
127
128 declare_exchange_p
129 Same as declare_exchange but auto-delivers method and returns a
130 Mojo::Promise object.
131
132 $channel->declare_exchange_p(
133 exchange => 'mojo',
134 type => 'fanout',
135 durable => 1,
136 ...
137 )->then(sub {
138 say "Exchange declared...";
139 })->catch(sub {
140 my $err = shift;
141 warn "Exchange declaration error: $err";
142 })->wait;
143
144 delete_exchange
145 $channel->delete_exchange(exchange => 'mojo')->deliver;
146
147 Delete an exchange.
148
149 This method deletes an exchange. When an exchange is deleted all queue
150 bindings on the exchange are cancelled.
151
The following arguments are accepted:
153
154 exchange
155 Exchange name.
156
157 if_unused
158 If set, the server will only delete the exchange if it has no queue
159 bindings. If the exchange has queue bindings the server does not
160 delete it but raises a channel exception instead.
161
162 delete_exchange_p
163 Same as delete_exchange but auto-delivers method and returns a
164 Mojo::Promise object.
165
166 $channel->delete_exchange_p(
167 exchange => 'mojo'
168 )->then(sub {
169 say "Exchange deleted...";
170 })->catch(sub {
171 my $err = shift;
172 warn "Exchange removal error: $err";
173 })->wait;
174
175 declare_queue
my $queue = $channel->declare_queue(queue => 'mq', durable => 1)->deliver;
177
178 Declare queue, create if needed.
179
180 This method creates or checks a queue. When creating a new queue the
181 client can specify various properties that control the durability of
182 the queue and its contents, and the level of sharing for the queue.
183
The following arguments are accepted:
185
186 queue
187 The queue name MAY be empty, in which case the server MUST create a
188 new queue with a unique generated name and return this to the client
189 in the Declare-Ok method.
190
191 passive
192 If set, the server will reply with Declare-Ok if the queue already
193 exists with the same name, and raise an error if not. The client can
194 use this to check whether a queue exists without modifying the server
195 state. When set, all other method fields except name and no-wait are
196 ignored. A declare with both passive and no-wait has no effect.
197 Arguments are compared for semantic equivalence.
198
199 durable
200 If set when creating a new queue, the queue will be marked as
201 durable. Durable queues remain active when a server restarts. Non-
202 durable queues (transient queues) are purged if/when a server
203 restarts. Note that durable queues do not necessarily hold persistent
204 messages, although it does not make sense to send persistent messages
205 to a transient queue.
206
exclusive
Exclusive queues may only be accessed by the current connection, and
are deleted when that connection closes. Passive declaration of an
exclusive queue by other connections is not allowed.
211
212 auto_delete
213 If set, the queue is deleted when all consumers have finished using
214 it. The last consumer can be cancelled either explicitly or because
215 its channel is closed. If there was no consumer ever on the queue, it
216 won't be deleted. Applications can explicitly delete auto-delete
217 queues using the Delete method as normal.
218
219 declare_queue_p
220 Same as declare_queue but auto-delivers method and returns a
221 Mojo::Promise object.
222
223 $channel->declare_queue_p(
224 queue => 'mq',
225 durable => 1
226 )->then(sub {
227 say "Queue declared...";
228 })->catch(sub {
229 my $err = shift;
230 warn "Queue declaration error: $err";
231 })->wait;
232
233 bind_queue
234 $channel->bind_queue(
235 exchange => 'mojo',
236 queue => 'mq',
237 routing_key => ''
238 )->deliver;
239
240 Bind queue to an exchange.
241
242 This method binds a queue to an exchange. Until a queue is bound it
243 will not receive any messages. In a classic messaging model, store-and-
244 forward queues are bound to a direct exchange and subscription queues
245 are bound to a topic exchange.
246
The following arguments are accepted:
248
249 queue
250 Specifies the name of the queue to bind.
251
252 exchange
253 Name of the exchange to bind to.
254
255 routing_key
256 Specifies the routing key for the binding. The routing key is used
257 for routing messages depending on the exchange configuration. Not all
258 exchanges use a routing key - refer to the specific exchange
259 documentation. If the queue name is empty, the server uses the last
260 queue declared on the channel. If the routing key is also empty, the
261 server uses this queue name for the routing key as well. If the queue
262 name is provided but the routing key is empty, the server does the
263 binding with that empty routing key. The meaning of empty routing
264 keys depends on the exchange implementation.
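
The fallback rules for empty queue names and routing keys are easy to misread, so here is a minimal sketch that models them in plain Perl. The helper name resolve_binding and its last_declared argument are hypothetical, and raising an error when no queue has been declared yet is an assumption of this sketch. The code only restates the rules described above and does not talk to a server.

```perl
use strict;
use warnings;

# Hypothetical helper: models how the server resolves an empty queue name
# and routing key for a bind request, following the rules described above.
# last_declared is the name of the last queue declared on the channel.
sub resolve_binding {
    my (%args) = @_;
    my $queue       = $args{queue}       // '';
    my $routing_key = $args{routing_key} // '';
    my $last        = $args{last_declared};

    if ($queue eq '') {
        # Assumption of this sketch: fail if nothing was declared yet.
        die "no queue declared on this channel yet\n"
            unless defined $last && $last ne '';

        # Empty queue name: fall back to the last declared queue ...
        $queue = $last;

        # ... and, if the routing key is empty too, reuse the queue name.
        $routing_key = $queue if $routing_key eq '';
    }

    # A named queue with an empty routing key keeps the empty key.
    return (queue => $queue, routing_key => $routing_key);
}

my %b = resolve_binding(queue => '', routing_key => '', last_declared => 'mq');
print "$b{queue} / '$b{routing_key}'\n";    # prints: mq / 'mq'
```

With queue => 'mq' and an empty routing key the same helper returns the empty key unchanged, which matches the bind_queue example above.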
265
266 bind_queue_p
267 Same as bind_queue but auto-delivers method and returns a Mojo::Promise
268 object.
269
270 $channel->bind_queue_p(
271 exchange => 'mojo',
272 queue => 'mq',
273 routing_key => ''
274 )->then(sub {
275 say "Queue bound...";
276 })->catch(sub {
277 my $err = shift;
278 warn "Queue binding error: $err";
279 })->wait;
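
Because every _p method returns a Mojo::Promise, the declare and bind steps documented so far can be chained into one setup routine. The sketch below assumes $channel is an already opened Mojo::RabbitMQ::Client::Channel; the helper name setup_topology is hypothetical, and only the _p methods documented above are called.

```perl
use strict;
use warnings;

# Hypothetical helper: declare an exchange and a queue, then bind them.
# Assumes $channel is an open Mojo::RabbitMQ::Client::Channel.
# Returns the promise of the final bind step.
sub setup_topology {
    my ($channel, %names) = @_;

    return $channel->declare_exchange_p(
        exchange => $names{exchange},
        type     => 'fanout',
        durable  => 1,
    )->then(sub {
        # Runs only after the exchange declaration succeeded.
        return $channel->declare_queue_p(
            queue   => $names{queue},
            durable => 1,
        );
    })->then(sub {
        # Runs only after the queue declaration succeeded.
        return $channel->bind_queue_p(
            exchange    => $names{exchange},
            queue       => $names{queue},
            routing_key => '',
        );
    });
}
```

A caller would typically attach then and catch handlers to the returned promise, in the same way as the single-step examples above, so that a failure in any of the three steps ends up in one place.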
280
281 unbind_queue
282 $channel->unbind_queue(
283 exchange => 'mojo',
284 queue => 'mq',
285 routing_key => ''
286 )->deliver;
287
288 Unbind a queue from an exchange.
289
290 This method unbinds a queue from an exchange.
291
The following arguments are accepted:
293
294 queue
295 Specifies the name of the queue to unbind.
296
297 exchange
298 The name of the exchange to unbind from.
299
300 routing_key
301 Specifies the routing key of the binding to unbind.
302
303 unbind_queue_p
304 Same as unbind_queue but auto-delivers method and returns a
305 Mojo::Promise object.
306
307 $channel->unbind_queue_p(
308 exchange => 'mojo',
309 queue => 'mq',
310 routing_key => ''
311 )->then(sub {
312 say "Queue unbound...";
313 })->catch(sub {
314 my $err = shift;
315 warn "Queue unbinding error: $err";
316 })->wait;
317
318 purge_queue
319 $channel->purge_queue(queue => 'mq')->deliver;
320
321 Purge a queue.
322
323 This method removes all messages from a queue which are not awaiting
324 acknowledgment.
325
The following arguments are accepted:
327
328 queue
329 Specifies the name of the queue to purge.
330
331 purge_queue_p
332 Same as purge_queue but auto-delivers method and returns a
333 Mojo::Promise object.
334
335 $channel->purge_queue_p(
336 queue => 'mq',
337 )->then(sub {
338 say "Queue purged...";
339 })->catch(sub {
340 my $err = shift;
341 warn "Queue purging error: $err";
342 })->wait;
343
344 delete_queue
345 $channel->delete_queue(queue => 'mq', if_empty => 1)->deliver;
346
347 Delete a queue.
348
349 This method deletes a queue. When a queue is deleted any pending
350 messages are sent to a dead-letter queue if this is defined in the
351 server configuration, and all consumers on the queue are cancelled.
352
The following arguments are accepted:
354
355 queue
356 Specifies the name of the queue to delete.
357
if_unused
If set, the server will only delete the queue if it has no consumers.
If the queue has consumers the server does not delete it but
raises a channel exception instead.
362
363 if_empty
364 If set, the server will only delete the queue if it has no messages.
365
366 delete_queue_p
367 Same as delete_queue but auto-delivers method and returns a
368 Mojo::Promise object.
369
370 $channel->delete_queue_p(
371 queue => 'mq',
372 if_empty => 1
373 )->then(sub {
374 say "Queue removed...";
375 })->catch(sub {
376 my $err = shift;
377 warn "Queue removal error: $err";
378 })->wait;
379
380 publish
381 my $message = $channel->publish(
382 exchange => 'mojo',
383 routing_key => 'mq',
384 body => 'simple text body',
385 );
386 $message->deliver();
387
388 Publish a message.
389
390 This method publishes a message to a specific exchange. The message
391 will be routed to queues as defined by the exchange configuration and
392 distributed to any active consumers when the transaction, if any, is
393 committed.
394
The following arguments are accepted:
396
397 exchange
398 Specifies the name of the exchange to publish to. The exchange name
399 can be empty, meaning the default exchange. If the exchange name is
400 specified, and that exchange does not exist, the server will raise a
401 channel exception.
402
403 routing_key
404 Specifies the routing key for the message. The routing key is used
405 for routing messages depending on the exchange configuration.
406
407 mandatory
408 This flag tells the server how to react if the message cannot be
409 routed to a queue. If this flag is set, the server will return an
410 unroutable message with a Return method. If this flag is zero, the
411 server silently drops the message.
412
All rejections are emitted as a "reject" event.
414
415 $message->on(reject => sub {
416 my $message = shift;
417 my $frame = shift;
418 my $method_frame = $frame->method_frame;
419
420 my $reply_code = $method_frame->reply_code;
421 my $reply_text = $method_frame->reply_text;
422 });
423
424 immediate
425 This flag tells the server how to react if the message cannot be
426 routed to a queue consumer immediately. If this flag is set, the
427 server will return an undeliverable message with a Return method. If
428 this flag is zero, the server will queue the message, but with no
429 guarantee that it will ever be consumed.
430
As noted above, all rejections are emitted as a "reject" event.
432
433 $message->on(reject => sub { ... });
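
The mandatory and immediate flags interact with two separate conditions (can the message be routed, and is a consumer ready), so a small decision model may help. The sketch below is plain Perl that restates the rules above; the helper name publish_outcome and the routable and consumer_ready inputs are hypothetical and stand in for server-side state. A 'returned' outcome corresponds to the server's Return method, which the client sees as the "reject" event.

```perl
use strict;
use warnings;

# Hypothetical model of the server-side decision described above.
# mandatory, immediate: the flags passed to publish.
# routable:       the exchange can route the message to at least one queue.
# consumer_ready: a consumer can take the message right away.
sub publish_outcome {
    my (%f) = @_;

    # Unroutable: "mandatory" decides between Return and silent drop.
    return $f{mandatory} ? 'returned' : 'dropped' unless $f{routable};

    # Routable but no consumer available: "immediate" forces a Return.
    return 'returned' if $f{immediate} && !$f{consumer_ready};

    # Otherwise the message is queued, with no guarantee of consumption.
    return 'queued';
}

for my $case (
    { mandatory => 1, routable => 0 },
    { mandatory => 0, routable => 0 },
    { immediate => 1, routable => 1, consumer_ready => 0 },
    { immediate => 0, routable => 1, consumer_ready => 0 },
) {
    print publish_outcome(%$case), "\n";
}
# prints, one per line: returned, dropped, returned, queued
```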
434
435 consume
436 my $consumer = $channel->consume(queue => 'mq');
437 $consumer->on(message => sub { ... });
438 $consumer->deliver;
439
440 This method asks the server to start a "consumer", which is a transient
441 request for messages from a specific queue. Consumers last as long as
442 the channel they were declared on, or until the client cancels them.
443
The following arguments are accepted:
445
446 queue
447 Specifies the name of the queue to consume from.
448
449 consumer_tag
450 Specifies the identifier for the consumer. The consumer tag is local
451 to a channel, so two clients can use the same consumer tags. If this
452 field is empty the server will generate a unique tag.
453
454 $consumer->on(success => sub {
455 my $consumer = shift;
456 my $frame = shift;
457
458 my $consumer_tag = $frame->method_frame->consumer_tag;
459 });
460
461 no_local (not implemented in RabbitMQ!)
462 If the no-local field is set the server will not send messages to the
463 connection that published them.
464
465 See RabbitMQ Compatibility and Conformance
466 <https://www.rabbitmq.com/specification.html>
467
468 no_ack
469 If this field is set the server does not expect acknowledgements for
470 messages. That is, when a message is delivered to the client the
471 server assumes the delivery will succeed and immediately dequeues it.
472 This functionality may increase performance but at the cost of
473 reliability. Messages can get lost if a client dies before they are
474 delivered to the application.
475
476 exclusive
477 Request exclusive consumer access, meaning only this consumer can
478 access the queue.
479
480 cancel
481 $channel->cancel(consumer_tag => 'amq.ctag....')->deliver;
482
483 End a queue consumer.
484
485 This method cancels a consumer. This does not affect already delivered
486 messages, but it does mean the server will not send any more messages
487 for that consumer. The client may receive an arbitrary number of
488 messages in between sending the cancel method and receiving the cancel-
489 ok reply.
490
The following arguments are accepted:
492
493 consumer_tag
494 Holds the consumer tag specified by the client or provided by the
495 server.
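
The consume and cancel methods are usually used together: the consumer tag reported in the "success" event is what cancel needs later. The following is a minimal sketch under stated assumptions: $channel is an open Mojo::RabbitMQ::Client::Channel, the "message" callback receives the consumer followed by the message (the argument list is not spelled out above, so this is an assumption), and the helper name consume_n is hypothetical. Acknowledgement is left out for brevity.

```perl
use strict;
use warnings;

# Hypothetical helper: consume from $queue and cancel the consumer once
# $limit messages have been handled. Assumes $channel is an open
# Mojo::RabbitMQ::Client::Channel.
sub consume_n {
    my ($channel, $queue, $limit, $handler) = @_;
    my ($tag, $seen) = (undef, 0);

    my $consumer = $channel->consume(queue => $queue);

    # Remember the consumer tag generated by the server.
    $consumer->on(success => sub {
        my ($consumer, $frame) = @_;
        $tag = $frame->method_frame->consumer_tag;
    });

    # Assumed callback arguments: ($consumer, $message).
    $consumer->on(message => sub {
        my ($consumer, $message) = @_;
        $handler->($message);

        # Cancel exactly once, when the limit is reached.
        return unless ++$seen == $limit && defined $tag;
        $channel->cancel(consumer_tag => $tag)->deliver;
    });

    $consumer->deliver;
    return $consumer;
}
```

As noted above, messages may still arrive between sending the cancel and receiving the cancel-ok reply, so the handler in this sketch can be called more than $limit times.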
496
497 get
my $get = $channel->get(queue => 'mq');
$get->deliver;
500
501 Direct access to a queue.
502
503 This method provides a direct access to the messages in a queue using a
504 synchronous dialogue that is designed for specific types of application
505 where synchronous functionality is more important than performance.
506
The returned object is a simple event emitter to which you have to
subscribe. It can emit:
509
510 message
511 Provide client with a message.
512
513 This method delivers a message to the client following a get method.
514 A message delivered by 'get-ok' must be acknowledged unless the no-
515 ack option was set in the get method.
516
517 You can access all get-ok reply parameters as below:
518
519 $get->on(message => sub {
520 my $get = shift;
521 my $get_ok = shift;
522 my $message = shift;
523
524 say "Still got: " . $get_ok->method_frame->message_count;
525 });
526
527 empty
528 Indicate no messages available.
529
530 This method tells the client that the queue has no messages available
531 for the client.
532
The following arguments are accepted:
534
535 queue
536 Specifies the name of the queue to get a message from.
537
538 no_ack
539 If this field is set the server does not expect acknowledgements for
540 messages. That is, when a message is delivered to the client the
541 server assumes the delivery will succeed and immediately dequeues it.
542 This functionality may increase performance but at the cost of
543 reliability. Messages can get lost if a client dies before they are
544 delivered to the application.
545
546 ack
547 $channel->ack(delivery_tag => 1);
548
549 Acknowledge one or more messages.
550
551 When sent by the client, this method acknowledges one or more messages
552 delivered via the Deliver or Get-Ok methods. When sent by server, this
553 method acknowledges one or more messages published with the Publish
554 method on a channel in confirm mode. The acknowledgement can be for a
555 single message or a set of messages up to and including a specific
556 message.
557
The following arguments are accepted:
559
560 delivery_tag
561 Server assigned delivery tag that was received with a message.
562
563 multiple
564 If set to 1, the delivery tag is treated as "up to and including", so
565 that multiple messages can be acknowledged with a single method. If
566 set to zero, the delivery tag refers to a single message. If the
567 multiple field is 1, and the delivery tag is zero, this indicates
568 acknowledgement of all outstanding messages.
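
To make the effect of the multiple flag concrete, here is a small plain-Perl model of which outstanding delivery tags a given acknowledgement covers. The helper name acked_tags is hypothetical; the code only restates the rules above and does not send anything to a server.

```perl
use strict;
use warnings;

# Hypothetical helper: given the delivery tags still awaiting
# acknowledgement, return those covered by an ack with these arguments.
sub acked_tags {
    my ($outstanding, %args) = @_;
    my $tag      = $args{delivery_tag} // 0;
    my $multiple = $args{multiple}     // 0;

    my @hit;
    if (!$multiple) {
        # Single-message acknowledgement.
        @hit = grep { $_ == $tag } @$outstanding;
    }
    elsif ($tag == 0) {
        # multiple set and tag zero: all outstanding messages.
        @hit = @$outstanding;
    }
    else {
        # multiple set: every tag up to and including $tag.
        @hit = grep { $_ <= $tag } @$outstanding;
    }
    return sort { $a <=> $b } @hit;
}

my @pending = (1, 2, 3, 4, 5);
print join(',', acked_tags(\@pending, delivery_tag => 3, multiple => 1)), "\n";    # 1,2,3
print join(',', acked_tags(\@pending, delivery_tag => 3, multiple => 0)), "\n";    # 3
print join(',', acked_tags(\@pending, delivery_tag => 0, multiple => 1)), "\n";    # 1,2,3,4,5
```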
569
570 qos
571 $channel->qos(prefetch_count => 1)->deliver;
572
Sets the specified Quality of Service for the channel, or for the entire
connection. The following arguments are accepted:
575
576 prefetch_size
577 Prefetch window size in octets.
578
579 prefetch_count
580 Prefetch window in complete messages.
581
582 global
If set, all settings will be applied connection-wide.
584
585 recover
586 $channel->recover(requeue => 0)->deliver;
587
588 Redeliver unacknowledged messages.
589
590 This method asks the server to redeliver all unacknowledged messages on
591 a specified channel. Zero or more messages may be redelivered.
592
593 requeue
594 If this field is zero, the message will be redelivered to the
595 original recipient. If this bit is 1, the server will attempt to
596 requeue the message, potentially then delivering it to an alternative
597 subscriber.
598
599 reject
600 $channel->reject(delivery_tag => 1, requeue => 0)->deliver;
601
602 Reject an incoming message.
603
604 This method allows a client to reject a message. It can be used to
605 interrupt and cancel large incoming messages, or return untreatable
606 messages to their original queue.
607
The following arguments are accepted:
609
610 delivery_tag
611 Server assigned delivery tag that was received with a message.
612
613 requeue
614 If requeue is true, the server will attempt to requeue the message.
615 If requeue is false or the requeue attempt fails the messages are
616 discarded or dead-lettered.
617
618 select_tx
619 Work with transactions.
620
621 The Tx class allows publish and ack operations to be batched into
622 atomic units of work. The intention is that all publish and ack
623 requests issued within a transaction will complete successfully or none
624 of them will. Servers SHOULD implement atomic transactions at least
625 where all publish or ack requests affect a single queue. Transactions
626 that cover multiple queues may be non-atomic, given that queues can be
627 created and destroyed asynchronously, and such events do not form part
628 of any transaction. Further, the behaviour of transactions with
629 respect to the immediate and mandatory flags on Basic.Publish methods
630 is not defined.
631
632 $channel->select_tx()->deliver;
633
634 Select standard transaction mode.
635
636 This method sets the channel to use standard transactions. The client
637 must use this method at least once on a channel before using the Commit
638 or Rollback methods.
639
640 commit_tx
641 $channel->commit_tx()->deliver;
642
643 Commit the current transaction.
644
645 This method commits all message publications and acknowledgments
646 performed in the current transaction. A new transaction starts
647 immediately after a commit.
648
649 rollback_tx
650 $channel->rollback_tx()->deliver;
651
652 Abandon the current transaction.
653
654 This method abandons all message publications and acknowledgments
655 performed in the current transaction. A new transaction starts
656 immediately after a rollback. Note that unacked messages will not be
657 automatically redelivered by rollback; if that is required an explicit
658 recover call should be issued.
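
Putting the three transaction methods together, a publish inside a transaction might look like the sketch below. It rests on several assumptions: $channel is an open Mojo::RabbitMQ::Client::Channel, the objects returned by select_tx and commit_tx emit a "success" event once the server has confirmed them (by analogy with the consume example above, not confirmed by this page), and the helper name publish_in_tx is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: publish one message inside a transaction.
# Assumes $channel is an open Mojo::RabbitMQ::Client::Channel and that
# select_tx/commit_tx return objects emitting "success" (an assumption).
sub publish_in_tx {
    my ($channel, %message) = @_;

    my $select = $channel->select_tx();
    $select->on(success => sub {
        # The channel is now transactional: publish, then commit.
        $channel->publish(%message)->deliver;

        my $commit = $channel->commit_tx();
        $commit->on(success => sub { print "Transaction committed\n" });
        $commit->deliver;
    });
    $select->deliver;
    return;
}
```

Replacing the commit step with $channel->rollback_tx()->deliver would abandon the publication instead; as noted above, a rollback does not redeliver unacked messages by itself.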
659
SEE ALSO
Mojo::RabbitMQ::Client, Mojo::RabbitMQ::Client::Method,
Net::AMQP::Protocol::v0_8
663
COPYRIGHT AND LICENSE
Copyright (C) 2015-2017, Sebastian Podjasek and others

Based on AnyEvent::RabbitMQ - Copyright (C) 2010 Masahito Ikuta,
maintained by "bobtfish@bobtfish.net"

This program is free software, you can redistribute it and/or modify it
under the terms of the Artistic License version 2.0.
672
673
674
perl v5.36.0    2023-01-20    Mojo::RabbitMQ::Client::Channel(3pm)