1Net::Stomp(3) User Contributed Perl Documentation Net::Stomp(3)
2
3
4
6 Net::Stomp - A Streaming Text Orientated Messaging Protocol Client
7
9 # send a message to the queue 'foo'
10 use Net::Stomp;
11 my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
12 $stomp->connect( { login => 'hello', passcode => 'there' } );
13 $stomp->send(
14 { destination => '/queue/foo', body => 'test message' } );
15 $stomp->disconnect;
16
17 # subscribe to messages from the queue 'foo'
18 use Net::Stomp;
19 my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
20 $stomp->connect( { login => 'hello', passcode => 'there' } );
21 $stomp->subscribe(
22 { destination => '/queue/foo',
23 'ack' => 'client',
24 'activemq.prefetchSize' => 1
25 }
26 );
27 while (1) {
28 my $frame = $stomp->receive_frame;
29 if (!defined $frame) {
30 # maybe log connection problems
31 next; # will reconnect automatically
32 }
33 warn $frame->body; # do something here
34 $stomp->ack( { frame => $frame } );
35 }
36 $stomp->disconnect;
37
38 # write your own frame
39 my $frame = Net::Stomp::Frame->new(
40 { command => $command, headers => $conf, body => $body } );
41 $self->send_frame($frame);
42
43 # connect with failover supporting similar URI to ActiveMQ
44 $stomp = Net::Stomp->new({ failover => "failover://tcp://primary:61616" })
45 # "?randomize=..." and other parameters are ignored currently
46 $stomp = Net::Stomp->new({ failover => "failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false" })
47
48 # Or in a more natural perl way
49 $stomp = Net::Stomp->new({ hosts => [
50 { hostname => 'primary', port => 61616 },
51 { hostname => 'secondary', port => 61616 },
52 ] });
53
55 This module allows you to write a Stomp client. Stomp is the Streaming
56 Text Orientated Messaging Protocol (or the Protocol Briefly Known as
57 TTMP and Represented by the symbol :ttmp). It's a simple and easy to
58 implement protocol for working with Message Orientated Middleware from
59 any language. Net::Stomp is useful for talking to Apache ActiveMQ, an
60 open source (Apache 2.0 licensed) Java Message Service 1.1 (JMS)
61 message broker packed with many enterprise features.
62
63 A Stomp frame consists of a command, a series of headers and a body -
64 see Net::Stomp::Frame for more details.
65
66 For details on the protocol see <https://stomp.github.io/>.
67
68 In long-lived processes, you can use a new "Net::Stomp" object to send
69 each message, but it's more polite to the broker to keep a single
70 object around and re-use it for multiple messages; this reduce the
71 number of TCP connections that have to be established. "Net::Stomp"
72 tries very hard to re-connect whenever something goes wrong.
73
74 ActiveMQ-specific suggestions
75 To enable the ActiveMQ Broker for Stomp add the following to the
76 activemq.xml configuration inside the <transportConnectors> section:
77
78 <transportConnector name="stomp" uri="stomp://localhost:61613"/>
79
80 To enable the ActiveMQ Broker for Stomp and SSL add the following
81 inside the <transportConnectors> section:
82
83 <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
84
85 For details on Stomp in ActiveMQ See
86 <http://activemq.apache.org/stomp.html>.
87
89 "new"
90 The constructor creates a new object. You must pass in a hostname and a
91 port or set a failover configuration:
92
93 my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
94
95 If you want to use SSL, make sure you have IO::Socket::SSL and pass in
96 the SSL flag:
97
98 my $stomp = Net::Stomp->new( {
99 hostname => 'localhost',
100 port => '61612',
101 ssl => 1,
102 } );
103
104 If you want to pass in IO::Socket::SSL options:
105
106 my $stomp = Net::Stomp->new( {
107 hostname => 'localhost',
108 port => '61612',
109 ssl => 1,
110 ssl_options => { SSL_cipher_list => 'ALL:!EXPORT' },
111 } );
112
113 You can pass a logger object, for example a Log::Log4perl logger:
114
115 my $stomp = Net::Stomp->new({
116 hostname => 'localhost',
117 port => '61613',
118 logger => Log::Log4perl->get_logger('stomp'),
119 });
120
121 Warnings and errors will be logged instead of written to "STDERR".
122
123 Failover
124
125 There is some failover support in "Net::Stomp". You can specify
126 ""failover"" in a similar manner to ActiveMQ
127 (<http://activemq.apache.org/failover-transport-reference.html>) for
128 similarity with Java configs or using a more natural method to Perl of
129 passing in an array-of-hashrefs in the "hosts" parameter.
130
131 When "Net::Stomp" connects the first time, upon construction, it will
132 simply try each host in the list, stopping at the first one that
133 accepts the connection, dying if no connection attempt is successful.
134 You can set ""initial_reconnect_attempts"" to 0 to mean "keep looping
135 forever", or to an integer value to mean "only go through the list of
136 hosts this many times" (the default value is therefore 1).
137
138 When "Net::Stomp" notices that the connection has been lost (inside
139 ""send_frame"" or ""receive_frame""), it will try to re-connect. In
140 this case, the number of connection attempts will be limited by
141 ""reconnect_attempts"", which defaults to 0, meaning "keep trying
142 forever".
143
144 Reconnect on "fork"
145
146 By default Net::Stomp will reconnect, using a different socket, if the
147 process "fork"s. This avoids problems when parent & child write to the
148 socket at the same time. If, for whatever reason, you don't want this
149 to happen, set ""reconnect_on_fork"" to 0 (either as a constructor
150 parameter, or by calling the method).
151
153 These can be passed as constructor parameters, or used as read/write
154 accessors.
155
156 "hostname"
157 If you want to connect to a single broker, you can specify its hostname
158 here. If you modify this value during the lifetime of the object, the
159 new value will be used for the subsequent reconnect attempts.
160
161 "port"
162 If you want to connect to a single broker, you can specify its port
163 here. If you modify this value during the lifetime of the object, the
164 new value will be used for the subsequent reconnect attempts.
165
166 "socket_options"
167 Optional hashref, it will be passed to the IO::Socket::IP,
168 IO::Socket::SSL, or IO::Socket::INET constructor every time we need to
169 get a socket.
170
171 In addition to the various options supported by those classes, you can
172 set "keep_alive" to a true value, which will enable TCP-level keep-
173 alive on the socket (see the TCP Keepalive HOWTO
174 <http://www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/> for some
175 information on that feature).
176
177 "ssl"
178 Boolean, defaults to false, whether we should use SSL to talk to the
179 single broker. If you modify this value during the lifetime of the
180 object, the new value will be used for the subsequent reconnect
181 attempts.
182
183 "ssl_options"
184 Options to pass to IO::Socket::SSL when connecting via SSL to the
185 single broker. If you modify this value during the lifetime of the
186 object, the new value will be used for the subsequent reconnect
187 attempts.
188
189 "failover"
190 Modifying this attribute after the object has been constructed has no
191 effect. Pass this as a constructor parameter only. Its value must be a
192 URL (as a string) in the form:
193
194 failover://(tcp://$hostname1:$port1,tcp://$hostname2:$port,...)
195
196 This is equivalent to setting ""hosts"" to:
197
198 [ { hostname => $hostname1, port => $port1 },
199 { hostname => $hostname2, port => $port2 } ]
200
201 "hosts"
202 Arrayref of hashrefs, each having a "hostname" key and a "port" key,
203 and optionall "ssl" and "ssl_options". Connections will be attempted in
204 order, looping around if necessary, depending on the values of
205 ""initial_reconnect_attempts"" and ""reconnect_attempts"".
206
207 "current_host"
208 If using multiple hosts, this is the index (inside the ""hosts"" array)
209 of the one we're currently connected to.
210
211 "logger"
212 Optional logger object, the default one just logs to "STDERR" (see
213 Net::Stomp::StupidLogger). You can pass in any object that implements
214 (at least) the "warn" and "fatal" methods. They will be passed a string
215 to log.
216
217 "reconnect_on_fork"
218 Boolean, defaults to true. Reconnect if a method is being invoked from
219 a different process than the one that created the object. Don't change
220 this unless you really know what you're doing.
221
222 "initial_reconnect_attempts"
223 Integer, how many times to loop through the ""hosts"" trying to
224 connect, before giving up and throwing an exception, during the
225 construction of the object. Defaults to 1. 0 means "keep trying
226 forever". Between each connection attempt there will be a sleep of
227 ""connect_delay"" seconds.
228
229 "reconnect_attempts"
230 Integer, how many times to loop through the ""hosts"" trying to
231 connect, before giving up and throwing an exception, during
232 ""send_frame"" or ""receive_frame"". Defaults to 0, meaning "keep
233 trying forever". Between each connection attempt there will be a sleep
234 of ""connect_delay"" seconds.
235
236 "connect_delay"
237 Integer, defaults to 5. How many seconds to sleep between connection
238 attempts to brokers.
239
240 "timeout"
241 Integer, in seconds, defaults to "undef". The default timeout for read
242 operations. "undef" means "wait forever".
243
244 "receipt_timeout"
245 Integer, in seconds, defaults to "undef". The default timeout while
246 waiting for a receipt (in ""send_with_receipt"" and
247 ""send_transactional""). If "undef", the global ""timeout"" is used.
248
250 "connect"
251 This starts the Stomp session with the Stomp server. You may pass in a
252 "login" and "passcode" options, plus whatever other headers you may
253 need (e.g. "client-id", "host").
254
255 $stomp->connect( { login => 'hello', passcode => 'there' } );
256
257 Returns the frame that the server responded with (or "undef" if the
258 connection was lost). If that frame's command is not "CONNECTED",
259 something went wrong.
260
261 "send"
262 This sends a message to a queue or topic. You must pass in a
263 destination and a body (which must be a string of bytes). You can also
264 pass whatever other headers you may need (e.g. "transaction").
265
266 $stomp->send( { destination => '/queue/foo', body => 'test message' } );
267
268 It's probably a good idea to pass a "content-length" corresponding to
269 the byte length of the "body"; this is necessary if the "body" contains
270 a byte 0.
271
272 Always returns a true value. It automatically reconnects if writing to
273 the socket fails.
274
275 "send_with_receipt"
276 This sends a message asking for a receipt, and returns false if the
277 receipt of the message is not acknowledged by the server:
278
279 $stomp->send_with_receipt(
280 { destination => '/queue/foo', body => 'test message' }
281 ) or die "Couldn't send the message!";
282
283 If using ActiveMQ, you might also want to make the message persistent:
284
285 $stomp->send_transactional(
286 { destination => '/queue/foo', body => 'test message', persistent => 'true' }
287 ) or die "Couldn't send the message!";
288
289 The actual frame sequence for a successful sending is:
290
291 -> SEND
292 <- RECEIPT
293
294 The actual frame sequence for a failed sending is:
295
296 -> SEND
297 <- anything but RECEIPT
298
299 If you are using this connection only to send (i.e. you've never called
300 ""subscribe""), the only thing that could be received instead of a
301 "RECEIPT" is an "ERROR" frame, but if you subscribed, the broker may
302 well send a "MESSAGE" before sending the "RECEIPT". DO NOT use this
303 method on a connection used for receiving.
304
305 If you want to see the "RECEIPT" or "ERROR" frame, pass a scalar as a
306 second parameter to the method, and it will be set to the received
307 frame:
308
309 my $success = $stomp->send_transactional(
310 { destination => '/queue/foo', body => 'test message' },
311 $received_frame,
312 );
313 if (not $success) { warn $received_frame->as_string }
314
315 You can specify a "timeout" in the parametrs, just like for
316 ""received_frame"". This function will wait for that timeout, or for
317 ""receipt_timeout"", or for ""timeout"", whichever is defined, or
318 forever, if none is defined.
319
320 "send_transactional"
321 This sends a message in transactional mode and returns false if the
322 receipt of the message is not acknowledged by the server:
323
324 $stomp->send_transactional(
325 { destination => '/queue/foo', body => 'test message' }
326 ) or die "Couldn't send the message!";
327
328 If using ActiveMQ, you might also want to make the message persistent:
329
330 $stomp->send_transactional(
331 { destination => '/queue/foo', body => 'test message', persistent => 'true' }
332 ) or die "Couldn't send the message!";
333
334 "send_transactional" just wraps "send_with_receipt" in a STOMP
335 transaction.
336
337 The actual frame sequence for a successful sending is:
338
339 -> BEGIN
340 -> SEND
341 <- RECEIPT
342 -> COMMIT
343
344 The actual frame sequence for a failed sending is:
345
346 -> BEGIN
347 -> SEND
348 <- anything but RECEIPT
349 -> ABORT
350
351 If you are using this connection only to send (i.e. you've never called
352 ""subscribe""), the only thing that could be received instead of a
353 "RECEIPT" is an "ERROR" frame, but if you subscribed, the broker may
354 well send a "MESSAGE" before sending the "RECEIPT". DO NOT use this
355 method on a connection used for receiving.
356
357 If you want to see the "RECEIPT" or "ERROR" frame, pass a scalar as a
358 second parameter to the method, and it will be set to the received
359 frame:
360
361 my $success = $stomp->send_transactional(
362 { destination => '/queue/foo', body => 'test message' },
363 $received_frame,
364 );
365 if (not $success) { warn $received_frame->as_string }
366
367 You can specify a "timeout" in the parametrs, just like for
368 ""received_frame"". This function will wait for that timeout, or for
369 ""receipt_timeout"", or for ""timeout"", whichever is defined, or
370 forever, if none is defined.
371
372 "disconnect"
373 This disconnects from the Stomp server:
374
375 $stomp->disconnect;
376
377 If you call any other method after this, a new connection will be
378 established automatically (to the next failover host, if there's more
379 than one).
380
381 Always returns a true value.
382
383 "subscribe"
384 This subscribes you to a queue or topic. You must pass in a
385 "destination".
386
387 Always returns a true value.
388
389 The acknowledge mode (header "ack") defaults to "auto", which means
390 that frames will be considered delivered after they have been sent to a
391 client. The other option is "client", which means that messages will
392 only be considered delivered after the client specifically acknowledges
393 them with an ACK frame (see ""ack"").
394
395 When "Net::Stomp" reconnects after a failure, all subscriptions will be
396 re-instated, each with its own options.
397
398 Other options:
399
400 "selector"
401 Specifies a JMS Selector using SQL 92 syntax as specified in the
402 JMS 1.1 specification. This allows a filter to be applied to each
403 message as part of the subscription.
404
405 "id"
406 A unique identifier for this subscription. Very useful if you
407 subscribe to the same destination more than once (e.g. with
408 different selectors), so that messages arriving will have a
409 "subscription" header with this value if they arrived because of
410 this subscription.
411
412 "activemq.dispatchAsync"
413 Should messages be dispatched synchronously or asynchronously from
414 the producer thread for non-durable topics in the broker. For fast
415 consumers set this to false. For slow consumers set it to true so
416 that dispatching will not block fast consumers.
417
418 "activemq.exclusive"
419 Would I like to be an Exclusive Consumer on a queue.
420
421 "activemq.maximumPendingMessageLimit"
422 For Slow Consumer Handling on non-durable topics by dropping old
423 messages - we can set a maximum pending limit which once a slow
424 consumer backs up to this high water mark we begin to discard old
425 messages.
426
427 "activemq.noLocal"
428 Specifies whether or not locally sent messages should be ignored
429 for subscriptions. Set to true to filter out locally sent messages.
430
431 "activemq.prefetchSize"
432 Specifies the maximum number of pending messages that will be
433 dispatched to the client. Once this maximum is reached no more
434 messages are dispatched until the client acknowledges a message.
435 Set to 1 for very fair distribution of messages across consumers
436 where processing messages can be slow.
437
438 "activemq.priority"
439 Sets the priority of the consumer so that dispatching can be
440 weighted in priority order.
441
442 "activemq.retroactive"
443 For non-durable topics do you wish this subscription to the
444 retroactive.
445
446 "activemq.subscriptionName"
447 For durable topic subscriptions you must specify the same
448 ""client-id"" on the connection and ""subscriptionName"" on the
449 subscribe.
450
451 $stomp->subscribe(
452 { destination => '/queue/foo',
453 'ack' => 'client',
454 'activemq.prefetchSize' => 1
455 }
456 );
457
458 "unsubscribe"
459 This unsubscribes you to a queue or topic. You must pass in a
460 "destination" or an "id":
461
462 $stomp->unsubcribe({ destination => '/queue/foo' });
463
464 Always returns a true value.
465
466 "receive_frame"
467 This blocks and returns you the next Stomp frame, or "undef" if there
468 was a connection problem.
469
470 my $frame = $stomp->receive_frame;
471 warn $frame->body; # do something here
472
473 By default this method will block until a frame can be returned, or for
474 however long the "timeout" attribue says. If you wish to wait for a
475 specified time pass a "timeout" argument:
476
477 # Wait half a second for a frame, else return undef
478 $stomp->receive_frame({ timeout => 0.5 })
479
480 "can_read"
481 This returns whether there is new data waiting to be read from the
482 STOMP server. Optionally takes a timeout in seconds:
483
484 my $can_read = $stomp->can_read;
485 my $can_read = $stomp->can_read({ timeout => '0.1' });
486
487 "undef" says block until something can be read, 0 says to poll and
488 return immediately. This method ignores the value of the "timeout"
489 attribute.
490
491 "ack"
492 This acknowledges that you have received and processed a frame and all
493 frames before it (if you are using client acknowledgements):
494
495 $stomp->ack( { frame => $frame } );
496
497 Always returns a true value.
498
499 "nack"
500 This informs the remote end that you have been unable to process a
501 received frame (if you are using client acknowledgements) (See
502 individual stomp server documentation for information about additional
503 fields that can be passed to alter NACK behavior):
504
505 $stomp->nack( { frame => $frame } );
506
507 Always returns a true value.
508
509 "send_frame"
510 If this module does not provide enough help for sending frames, you may
511 construct your own frame and send it:
512
513 # write your own frame
514 my $frame = Net::Stomp::Frame->new(
515 { command => $command, headers => $conf, body => $body } );
516 $self->send_frame($frame);
517
518 This is the method used by all the other methods that send frames. It
519 will keep trying to send the frame as hard as it can, reconnecting if
520 the connection breaks (limited by ""reconnect_attempts""). If no
521 connection can be established, and ""reconnect_attempts"" is not 0,
522 this method will "die".
523
524 Always returns an empty list.
525
527 Net::Stomp::Frame.
528
530 https://github.com/dakkar/Net-Stomp
531
533 Leon Brocard <acme@astray.com>, Thom May <thom.may@betfair.com>,
534 Michael S. Fischer <michael@dynamine.net>, Ash Berlin
535 <ash_github@firemirror.com>
536
538 Paul Driver <frodwith@cpan.org>, Andreas Faafeng <aff@cpan.org>, Vigith
539 Maurice <vigith@yahoo-inc.com>, Stephen Fralich <sjf4@uw.edu>, Squeeks
540 <squeek@cpan.org>, Chisel Wright <chisel@chizography.net>, Gianni
541 Ceccarelli <dakkar@thenautilus.net>
542
544 Copyright (C) 2006-9, Leon Brocard Copyright (C) 2009, Thom May,
545 Betfair.com Copyright (C) 2010, Ash Berlin, Net-a-Porter.com Copyright
546 (C) 2010, Michael S. Fischer
547
548 This module is free software; you can redistribute it or modify it
549 under the same terms as Perl itself.
550
551
552
553perl v5.28.0 2018-07-15 Net::Stomp(3)