1Net::Stomp(3) User Contributed Perl Documentation Net::Stomp(3)
2
3
4
6 Net::Stomp - A Streaming Text Orientated Messaging Protocol Client
7
9 # send a message to the queue 'foo'
10 use Net::Stomp;
11 my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
12 $stomp->connect( { login => 'hello', passcode => 'there' } );
13 $stomp->send(
14 { destination => '/queue/foo', body => 'test message' } );
15 $stomp->disconnect;
16
17 # subscribe to messages from the queue 'foo'
18 use Net::Stomp;
19 my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
20 $stomp->connect( { login => 'hello', passcode => 'there' } );
21 $stomp->subscribe(
22 { destination => '/queue/foo',
23 'ack' => 'client',
24 'activemq.prefetchSize' => 1
25 }
26 );
27 while (1) {
28 my $frame = $stomp->receive_frame;
29 if (!defined $frame) {
30 # maybe log connection problems
31 next; # will reconnect automatically
32 }
33 warn $frame->body; # do something here
34 $stomp->ack( { frame => $frame } );
35 }
36 $stomp->disconnect;
37
38 # write your own frame
39 my $frame = Net::Stomp::Frame->new(
40 { command => $command, headers => $conf, body => $body } );
41 $self->send_frame($frame);
42
43 # connect with failover supporting similar URI to ActiveMQ
44 $stomp = Net::Stomp->new({ failover => "failover://tcp://primary:61616" })
45 # "?randomize=..." and other parameters are ignored currently
46 $stomp = Net::Stomp->new({ failover => "failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false" })
47
48 # Or in a more natural perl way
49 $stomp = Net::Stomp->new({ hosts => [
50 { hostname => 'primary', port => 61616 },
51 { hostname => 'secondary', port => 61616 },
52 ] });
53
55 This module allows you to write a Stomp client. Stomp is the Streaming
56 Text Orientated Messaging Protocol (or the Protocol Briefly Known as
57 TTMP and Represented by the symbol :ttmp). It's a simple and easy to
58 implement protocol for working with Message Orientated Middleware from
59 any language. Net::Stomp is useful for talking to Apache ActiveMQ, an
60 open source (Apache 2.0 licensed) Java Message Service 1.1 (JMS)
61 message broker packed with many enterprise features.
62
63 A Stomp frame consists of a command, a series of headers and a body -
64 see Net::Stomp::Frame for more details.
65
66 For details on the protocol see <https://stomp.github.io/>.
67
68 In long-lived processes, you can use a new "Net::Stomp" object to send
69 each message, but it's more polite to the broker to keep a single
70 object around and re-use it for multiple messages; this reduce the
71 number of TCP connections that have to be established. "Net::Stomp"
72 tries very hard to re-connect whenever something goes wrong.
73
74 ActiveMQ-specific suggestions
75 To enable the ActiveMQ Broker for Stomp add the following to the
76 activemq.xml configuration inside the <transportConnectors> section:
77
78 <transportConnector name="stomp" uri="stomp://localhost:61613"/>
79
80 To enable the ActiveMQ Broker for Stomp and SSL add the following
81 inside the <transportConnectors> section:
82
83 <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
84
85 For details on Stomp in ActiveMQ See
86 <http://activemq.apache.org/stomp.html>.
87
89 "new"
90 The constructor creates a new object. You must pass in a hostname and a
91 port or set a failover configuration:
92
93 my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
94
95 If you want to use SSL, make sure you have IO::Socket::SSL and pass in
96 the SSL flag:
97
98 my $stomp = Net::Stomp->new( {
99 hostname => 'localhost',
100 port => '61612',
101 ssl => 1,
102 } );
103
104 If you want to pass in IO::Socket::SSL options:
105
106 my $stomp = Net::Stomp->new( {
107 hostname => 'localhost',
108 port => '61612',
109 ssl => 1,
110 ssl_options => { SSL_cipher_list => 'ALL:!EXPORT' },
111 } );
112
113 Failover
114
115 There is some failover support in "Net::Stomp". You can specify
116 ""failover"" in a similar manner to ActiveMQ
117 (<http://activemq.apache.org/failover-transport-reference.html>) for
118 similarity with Java configs or using a more natural method to Perl of
119 passing in an array-of-hashrefs in the "hosts" parameter.
120
121 When "Net::Stomp" connects the first time, upon construction, it will
122 simply try each host in the list, stopping at the first one that
123 accepts the connection, dying if no connection attempt is successful.
124 You can set ""initial_reconnect_attempts"" to 0 to mean "keep looping
125 forever", or to an integer value to mean "only go through the list of
126 hosts this many times" (the default value is therefore 1).
127
128 When "Net::Stomp" notices that the connection has been lost (inside
129 ""send_frame"" or ""receive_frame""), it will try to re-connect. In
130 this case, the number of connection attempts will be limited by
131 ""reconnect_attempts"", which defaults to 0, meaning "keep trying
132 forever".
133
134 Reconnect on "fork"
135
136 By default Net::Stomp will reconnect, using a different socket, if the
137 process "fork"s. This avoids problems when parent & child write to the
138 socket at the same time. If, for whatever reason, you don't want this
139 to happen, set ""reconnect_on_fork"" to 0 (either as a constructor
140 parameter, or by calling the method).
141
143 These can be passed as constructor parameters, or used as read/write
144 accessors.
145
146 "hostname"
147 If you want to connect to a single broker, you can specify its hostname
148 here. If you modify this value during the lifetime of the object, the
149 new value will be used for the subsequent reconnect attempts.
150
151 "port"
152 If you want to connect to a single broker, you can specify its port
153 here. If you modify this value during the lifetime of the object, the
154 new value will be used for the subsequent reconnect attempts.
155
156 "socket_options"
157 Optional hashref, it will be passed to the IO::Socket::IP,
158 IO::Socket::SSL, or IO::Socket::INET constructor every time we need to
159 get a socket.
160
161 In addition to the various options supported by those classes, you can
162 set "keep_alive" to a true value, which will enable TCP-level keep-
163 alive on the socket (see the TCP Keepalive HOWTO
164 <http://www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/> for some
165 information on that feature).
166
167 "ssl"
168 Boolean, defaults to false, whether we should use SSL to talk to the
169 single broker. If you modify this value during the lifetime of the
170 object, the new value will be used for the subsequent reconnect
171 attempts.
172
173 "ssl_options"
174 Options to pass to IO::Socket::SSL when connecting via SSL to the
175 single broker. If you modify this value during the lifetime of the
176 object, the new value will be used for the subsequent reconnect
177 attempts.
178
179 "failover"
180 Modifying this attribute after the object has been constructed has no
181 effect. Pass this as a constructor parameter only. Its value must be a
182 URL (as a string) in the form:
183
184 failover://(tcp://$hostname1:$port1,tcp://$hostname2:$port,...)
185
186 This is equivalent to setting ""hosts"" to:
187
188 [ { hostname => $hostname1, port => $port1 },
189 { hostname => $hostname2, port => $port2 } ]
190
191 "hosts"
192 Arrayref of hashrefs, each having a "hostname" key and a "port" key,
193 and optionall "ssl" and "ssl_options". Connections will be attempted in
194 order, looping around if necessary, depending on the values of
195 ""initial_reconnect_attempts"" and ""reconnect_attempts"".
196
197 "current_host"
198 If using multiple hosts, this is the index (inside the ""hosts"" array)
199 of the one we're currently connected to.
200
201 "logger"
202 Optional logger object, the default one is a Log::Any logger. You can
203 pass in any object with the same API, or configure Log::Any::Adapter to
204 route the messages to whatever logging system you need.
205
206 "reconnect_on_fork"
207 Boolean, defaults to true. Reconnect if a method is being invoked from
208 a different process than the one that created the object. Don't change
209 this unless you really know what you're doing.
210
211 "initial_reconnect_attempts"
212 Integer, how many times to loop through the ""hosts"" trying to
213 connect, before giving up and throwing an exception, during the
214 construction of the object. Defaults to 1. 0 means "keep trying
215 forever". Between each connection attempt there will be a sleep of
216 ""connect_delay"" seconds.
217
218 "reconnect_attempts"
219 Integer, how many times to loop through the ""hosts"" trying to
220 connect, before giving up and throwing an exception, during
221 ""send_frame"" or ""receive_frame"". Defaults to 0, meaning "keep
222 trying forever". Between each connection attempt there will be a sleep
223 of ""connect_delay"" seconds.
224
225 "connect_delay"
226 Integer, defaults to 5. How many seconds to sleep between connection
227 attempts to brokers.
228
229 "timeout"
230 Integer, in seconds, defaults to "undef". The default timeout for read
231 operations. "undef" means "wait forever".
232
233 "receipt_timeout"
234 Integer, in seconds, defaults to "undef". The default timeout while
235 waiting for a receipt (in ""send_with_receipt"" and
236 ""send_transactional""). If "undef", the global ""timeout"" is used.
237
239 "connect"
240 This starts the Stomp session with the Stomp server. You may pass in a
241 "login" and "passcode" options, plus whatever other headers you may
242 need (e.g. "client-id", "host").
243
244 $stomp->connect( { login => 'hello', passcode => 'there' } );
245
246 Returns the frame that the server responded with (or "undef" if the
247 connection was lost). If that frame's command is not "CONNECTED",
248 something went wrong.
249
250 "send"
251 This sends a message to a queue or topic. You must pass in a
252 destination and a body (which must be a string of bytes). You can also
253 pass whatever other headers you may need (e.g. "transaction").
254
255 $stomp->send( { destination => '/queue/foo', body => 'test message' } );
256
257 It's probably a good idea to pass a "content-length" corresponding to
258 the byte length of the "body"; this is necessary if the "body" contains
259 a byte 0.
260
261 Always returns a true value. It automatically reconnects if writing to
262 the socket fails.
263
264 "send_with_receipt"
265 This sends a message asking for a receipt, and returns false if the
266 receipt of the message is not acknowledged by the server:
267
268 $stomp->send_with_receipt(
269 { destination => '/queue/foo', body => 'test message' }
270 ) or die "Couldn't send the message!";
271
272 If using ActiveMQ, you might also want to make the message persistent:
273
274 $stomp->send_transactional(
275 { destination => '/queue/foo', body => 'test message', persistent => 'true' }
276 ) or die "Couldn't send the message!";
277
278 The actual frame sequence for a successful sending is:
279
280 -> SEND
281 <- RECEIPT
282
283 The actual frame sequence for a failed sending is:
284
285 -> SEND
286 <- anything but RECEIPT
287
288 If you are using this connection only to send (i.e. you've never called
289 ""subscribe""), the only thing that could be received instead of a
290 "RECEIPT" is an "ERROR" frame, but if you subscribed, the broker may
291 well send a "MESSAGE" before sending the "RECEIPT". DO NOT use this
292 method on a connection used for receiving.
293
294 If you want to see the "RECEIPT" or "ERROR" frame, pass a scalar as a
295 second parameter to the method, and it will be set to the received
296 frame:
297
298 my $success = $stomp->send_transactional(
299 { destination => '/queue/foo', body => 'test message' },
300 $received_frame,
301 );
302 if (not $success) { warn $received_frame->as_string }
303
304 You can specify a "timeout" in the parametrs, just like for
305 ""received_frame"". This function will wait for that timeout, or for
306 ""receipt_timeout"", or for ""timeout"", whichever is defined, or
307 forever, if none is defined.
308
309 "send_transactional"
310 This sends a message in transactional mode and returns false if the
311 receipt of the message is not acknowledged by the server:
312
313 $stomp->send_transactional(
314 { destination => '/queue/foo', body => 'test message' }
315 ) or die "Couldn't send the message!";
316
317 If using ActiveMQ, you might also want to make the message persistent:
318
319 $stomp->send_transactional(
320 { destination => '/queue/foo', body => 'test message', persistent => 'true' }
321 ) or die "Couldn't send the message!";
322
323 "send_transactional" just wraps "send_with_receipt" in a STOMP
324 transaction.
325
326 The actual frame sequence for a successful sending is:
327
328 -> BEGIN
329 -> SEND
330 <- RECEIPT
331 -> COMMIT
332
333 The actual frame sequence for a failed sending is:
334
335 -> BEGIN
336 -> SEND
337 <- anything but RECEIPT
338 -> ABORT
339
340 If you are using this connection only to send (i.e. you've never called
341 ""subscribe""), the only thing that could be received instead of a
342 "RECEIPT" is an "ERROR" frame, but if you subscribed, the broker may
343 well send a "MESSAGE" before sending the "RECEIPT". DO NOT use this
344 method on a connection used for receiving.
345
346 If you want to see the "RECEIPT" or "ERROR" frame, pass a scalar as a
347 second parameter to the method, and it will be set to the received
348 frame:
349
350 my $success = $stomp->send_transactional(
351 { destination => '/queue/foo', body => 'test message' },
352 $received_frame,
353 );
354 if (not $success) { warn $received_frame->as_string }
355
356 You can specify a "timeout" in the parametrs, just like for
357 ""received_frame"". This function will wait for that timeout, or for
358 ""receipt_timeout"", or for ""timeout"", whichever is defined, or
359 forever, if none is defined.
360
361 "disconnect"
362 This disconnects from the Stomp server:
363
364 $stomp->disconnect;
365
366 If you call any other method after this, a new connection will be
367 established automatically (to the next failover host, if there's more
368 than one).
369
370 Always returns a true value.
371
372 "subscribe"
373 This subscribes you to a queue or topic. You must pass in a
374 "destination".
375
376 Always returns a true value.
377
378 The acknowledge mode (header "ack") defaults to "auto", which means
379 that frames will be considered delivered after they have been sent to a
380 client. The other option is "client", which means that messages will
381 only be considered delivered after the client specifically acknowledges
382 them with an ACK frame (see ""ack"").
383
384 When "Net::Stomp" reconnects after a failure, all subscriptions will be
385 re-instated, each with its own options.
386
387 Other options:
388
389 "selector"
390 Specifies a JMS Selector using SQL 92 syntax as specified in the
391 JMS 1.1 specification. This allows a filter to be applied to each
392 message as part of the subscription.
393
394 "id"
395 A unique identifier for this subscription. Very useful if you
396 subscribe to the same destination more than once (e.g. with
397 different selectors), so that messages arriving will have a
398 "subscription" header with this value if they arrived because of
399 this subscription.
400
401 "activemq.dispatchAsync"
402 Should messages be dispatched synchronously or asynchronously from
403 the producer thread for non-durable topics in the broker. For fast
404 consumers set this to false. For slow consumers set it to true so
405 that dispatching will not block fast consumers.
406
407 "activemq.exclusive"
408 Would I like to be an Exclusive Consumer on a queue.
409
410 "activemq.maximumPendingMessageLimit"
411 For Slow Consumer Handling on non-durable topics by dropping old
412 messages - we can set a maximum pending limit which once a slow
413 consumer backs up to this high water mark we begin to discard old
414 messages.
415
416 "activemq.noLocal"
417 Specifies whether or not locally sent messages should be ignored
418 for subscriptions. Set to true to filter out locally sent messages.
419
420 "activemq.prefetchSize"
421 Specifies the maximum number of pending messages that will be
422 dispatched to the client. Once this maximum is reached no more
423 messages are dispatched until the client acknowledges a message.
424 Set to 1 for very fair distribution of messages across consumers
425 where processing messages can be slow.
426
427 "activemq.priority"
428 Sets the priority of the consumer so that dispatching can be
429 weighted in priority order.
430
431 "activemq.retroactive"
432 For non-durable topics do you wish this subscription to the
433 retroactive.
434
435 "activemq.subscriptionName"
436 For durable topic subscriptions you must specify the same
437 ""client-id"" on the connection and ""subscriptionName"" on the
438 subscribe.
439
440 $stomp->subscribe(
441 { destination => '/queue/foo',
442 'ack' => 'client',
443 'activemq.prefetchSize' => 1
444 }
445 );
446
447 "unsubscribe"
448 This unsubscribes you to a queue or topic. You must pass in a
449 "destination" or an "id":
450
451 $stomp->unsubcribe({ destination => '/queue/foo' });
452
453 Always returns a true value.
454
455 "receive_frame"
456 This blocks and returns you the next Stomp frame, or "undef" if there
457 was a connection problem.
458
459 my $frame = $stomp->receive_frame;
460 warn $frame->body; # do something here
461
462 By default this method will block until a frame can be returned, or for
463 however long the "timeout" attribue says. If you wish to wait for a
464 specified time pass a "timeout" argument:
465
466 # Wait half a second for a frame, else return undef
467 $stomp->receive_frame({ timeout => 0.5 })
468
469 "can_read"
470 This returns whether there is new data waiting to be read from the
471 STOMP server. Optionally takes a timeout in seconds:
472
473 my $can_read = $stomp->can_read;
474 my $can_read = $stomp->can_read({ timeout => '0.1' });
475
476 "undef" says block until something can be read, 0 says to poll and
477 return immediately. This method ignores the value of the "timeout"
478 attribute.
479
480 "ack"
481 This acknowledges that you have received and processed a frame and all
482 frames before it (if you are using client acknowledgements):
483
484 $stomp->ack( { frame => $frame } );
485
486 Always returns a true value.
487
488 "nack"
489 This informs the remote end that you have been unable to process a
490 received frame (if you are using client acknowledgements) (See
491 individual stomp server documentation for information about additional
492 fields that can be passed to alter NACK behavior):
493
494 $stomp->nack( { frame => $frame } );
495
496 Always returns a true value.
497
498 "send_frame"
499 If this module does not provide enough help for sending frames, you may
500 construct your own frame and send it:
501
502 # write your own frame
503 my $frame = Net::Stomp::Frame->new(
504 { command => $command, headers => $conf, body => $body } );
505 $self->send_frame($frame);
506
507 This is the method used by all the other methods that send frames. It
508 will keep trying to send the frame as hard as it can, reconnecting if
509 the connection breaks (limited by ""reconnect_attempts""). If no
510 connection can be established, and ""reconnect_attempts"" is not 0,
511 this method will "die".
512
513 Always returns an empty list.
514
516 Net::Stomp::Frame.
517
519 https://github.com/dakkar/Net-Stomp
520
522 Leon Brocard <acme@astray.com>, Thom May <thom.may@betfair.com>,
523 Michael S. Fischer <michael@dynamine.net>, Ash Berlin
524 <ash_github@firemirror.com>
525
527 Paul Driver <frodwith@cpan.org>, Andreas Faafeng <aff@cpan.org>, Vigith
528 Maurice <vigith@yahoo-inc.com>, Stephen Fralich <sjf4@uw.edu>, Squeeks
529 <squeek@cpan.org>, Chisel Wright <chisel@chizography.net>, Gianni
530 Ceccarelli <dakkar@thenautilus.net>
531
533 Copyright (C) 2006-9, Leon Brocard Copyright (C) 2009, Thom May,
534 Betfair.com Copyright (C) 2010, Ash Berlin, Net-a-Porter.com Copyright
535 (C) 2010, Michael S. Fischer
536
537 This module is free software; you can redistribute it or modify it
538 under the same terms as Perl itself.
539
540
541
542perl v5.36.0 2022-07-22 Net::Stomp(3)