1Net::STOMP::Client::TutUosreiralC(o3n)tributed Perl DocuNmeetn:t:aStTiOoMnP::Client::Tutorial(3)
2
3
4
6 Net::STOMP::Client::Tutorial - Getting started with STOMP and
7 Net::STOMP::Client
8
10 Here is a five-parts tutorial to get started with Net::STOMP::Client.
11
12 A basic knowledge of STOMP is required. For this, you can read:
13
14 • <http://stomp.github.com/stomp-specification-1.2.html>
15
16 the STOMP 1.2 protocol specification
17
18 • <http://fusesource.com/docs/broker/5.5/connectivity_guide/FMBConnectivityStompTelnet.html>
19
20 the Fuse Message Broker STOMP Tutorial
21
23 Net::STOMP::Client, like similar modules under the Net::* hierarchy,
24 provides an object oriented interface to a network protocol.
25
26 CREATING AN OBJECT
27 In order to connect to a broker, you first have to create an object.
28 This object will later be used to interact with the broker. When the
29 module creates the object, it tries to connect to the broker using
30 either plain TCP or SSL. Nothing is done at the STOMP level.
31
32 To create the object, you of course need to specify where to connect
33 to. This can be done either with a single "uri" parameter:
34
35 $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
36
37 or with the pair of "host" and "port" parameters:
38
39 $stomp = Net::STOMP::Client->new(
40 host => "mybroker",
41 port => 6163,
42 );
43
44 USING SSL
45 Using SSL is more complex since more parameters have to be given. Note:
46 IO::Socket::SSL is used behind the scene so you can refer to its
47 documentation for more information. Here is a complete example with
48 certificate based authentication:
49
50 $stomp = Net::STOMP::Client->new(
51 uri => "stomp+ssl://mybroker:6162",
52 sockopts => {
53 # path of the directory containing trusted certificates
54 SSL_ca_path => "/etc/ssl/ca",
55 # client certificate to present
56 SSL_cert_file => "/etc/ssl/client-cert.pem",
57 # client private key
58 SSL_key_file => "/etc/ssl/client-key.pem",
59 # passphrase of the client private key
60 SSL_passwd_cb => sub { return("secret") },
61 # verify mode (SSL_VERIFY_PEER)
62 SSL_verify_mode => 1,
63 },
64 );
65
66 Note: recent versions of IO::Socket::SSL issue a warning if the verify
67 mode is not explicitly set.
68
69 GETTING PEER INFORMATION
70 Once connected, at TCP or SSL level, you can get information about the
71 broker using the "peer" method. For instance:
72
73 $peer = $stomp->peer();
74 printf("connected to broker %s (IP %s), port %d\n",
75 $peer->host(), $peer->addr(), $peer->port());
76
77 CONNECTING
78 After creating the broker object, you must start a STOMP session by
79 sending a "CONNECT" frame. This is as simple as:
80
81 $stomp->connect();
82
83 If authentication is required, you must pass extra information at this
84 stage. For instance with:
85
86 $stomp->connect(
87 login => "guest",
88 passcode => "welcome",
89 );
90
91 In case the broker uses virtual hosts, you can add a "host" parameter
92 to the "connect" method:
93
94 $stomp->connect(
95 login => "guest",
96 passcode => "welcome",
97 host => "/",
98 );
99
100 If it is not given explicitly, the "host" parameter defaults to
101 whatever has been given in the "new" method, via the "host" or "uri"
102 parameter.
103
104 At this point, the session has been established and you can now send
105 and/or receive messages.
106
107 GETTING SERVER INFORMATION
108 Once connected at STOMP level, you can get information about the broker
109 using the "server" and "version" methods. For instance:
110
111 printf("speaking STOMP %s with server %s\n",
112 $stomp->version(), $stomp->server() || "UNKNOWN");
113
114 DISCONNECTING
115 When you are done with messaging, you should gracefully end the session
116 by sending a "DISCONNECT" frame with:
117
118 $stomp->disconnect();
119
120 Note: STOMP does not support reconnection. Once the session has been
121 ended, the broker object cannot be used anymore.
122
123 WRAP UP
124 Putting all this together, here is a complete program that simply
125 connects, starts and ends a session, printing information along the
126 way.
127
128 use Net::STOMP::Client;
129 $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
130 $peer = $stomp->peer();
131 printf("connected to broker %s (IP %s), port %d\n",
132 $peer->host(), $peer->addr(), $peer->port());
133 $stomp->connect();
134 printf("speaking STOMP %s with server %s\n",
135 $stomp->version(), $stomp->server() || "UNKNOWN");
136 printf("session %s started\n", $stomp->session());
137 $stomp->disconnect();
138 printf("session ended\n");
139
141 SENDING MESSAGES
142 A message is made of a header (a list of key/value pairs) and a body.
143 Both are optional.
144
145 To send a message, you have to issue a "SEND" frame. For instance:
146
147 $stomp->send(
148 destination => "/queue/test",
149 subject => "this is a test",
150 time => time(),
151 body => "Hello world!\n",
152 );
153
154 USING RECEIPTS
155 By default, you do not get any confirmation that the message has indeed
156 been received by the broker. If you want such a confirmation, you have
157 to use receipts. The following code snippet sends two messages with a
158 "receipt" header containing a pseudo-unique id and waits for matching
159 "RECEIPT" frames coming from the broker. This is easy because the
160 Net::STOMP::Client module keeps track of which receipts are expected
161 and have not been received yet.
162
163 $stomp->send(
164 destination => "/queue/test",
165 body => "Test of receipts 1...\n",
166 receipt => $stomp->uuid(),
167 );
168 $stomp->send(
169 destination => "/queue/test",
170 body => "Test of receipts 2...\n",
171 receipt => $stomp->uuid(),
172 );
173 # wait at most 10 seconds for all pending receipts
174 $stomp->wait_for_receipts(timeout => 10);
175 # complain if some receipts are still pending
176 die("Receipt not received!\n") if $stomp->receipts();
177
178 Note: all STOMP frames can carry a "receipt" header so this is not
179 restricted to message sending.
180
181 USING TRANSACTIONS
182 In addition, you can use transactions to group the sending of several
183 messages so that either none or all of them get handled by the broker.
184
185 # create a unique transaction id
186 $tid = $stomp->uuid();
187 # begin the transaction
188 $stomp->begin(transaction => $tid);
189 # send two messages as part of this transaction
190 $stomp->send(
191 destination => "/queue/test1",
192 body => "message 1",
193 transaction => $tid,
194 );
195 $stomp->send(
196 destination => "/queue/test2",
197 body => "message 2",
198 transaction => $tid,
199 );
200 # either abort or commit
201 if (... something bad happened...) {
202 # abort/rollback the transaction
203 $stomp->abort(transaction => $tid);
204 # no messages have been delivered to the broker
205 } else {
206 # commit the transaction
207 $stomp->commit(transaction => $tid);
208 # both messages have been delivered to the broker
209 }
210
212 USING SUBSCRIPTIONS
213 In order to receive frames, you first have to subscribe to one or more
214 destinations. This is as easy as:
215
216 $stomp->subscribe(destination => "/queue/test");
217
218 When you are done, you simply unsubscribe with:
219
220 $stomp->unsubscribe(destination => "/queue/test");
221
222 It is good practice to add an "id" header to uniquely identify the
223 subscription. All messages part of this subscription will have a
224 matching "subscription" header. This "id" can also be used to
225 unsubscribe.
226
227 In fact, the code above only works with STOMP 1.0. In STOMP 1.1 and
228 above, the "id" header has been made mandatory so you must use
229 something like:
230
231 $stomp->subscribe(
232 destination => "/queue/test",
233 id => "testsub",
234 );
235 # received messages will contain: subscription:testsub
236 $stomp->unsubscribe(id => "testsub");
237
238 RECEIVING FRAMES
239 While you are subscribed to some destinations, the broker may decide at
240 any time to send you "MESSAGE" frames. You can process these frames
241 with a simple loop:
242
243 while ($frame = $stomp->wait_for_frames()) {
244 # ... do something with the received frame ...
245 }
246
247 The code above is blocking and will loop forever. You can add a
248 "timeout" option to have a non-blocking loop:
249
250 while (1) {
251 # wait at most one second for a new frame
252 $frame = $stomp->wait_for_frames(timeout => 1);
253 # do what is appropriate
254 if ($frame) {
255 # ... do something with the received frame ...
256 } else {
257 # nothing received
258 }
259 }
260
261 Because of the asynchronous nature of STOMP, receiving messages is a
262 bit tricky: you cannot know a priori which types of frames will be sent
263 when. For instance, you may want to send messages (with receipts)
264 while you are subscribed to some destinations and you may receive a
265 "MESSAGE" frame while you would like to wait for a <RECEIPT> frame, or
266 vice versa.
267
268 The "wait_for_frames" method described above will wait for any frame,
269 not only message frames. It is up to you to check that what you receive
270 is a "MESSAGE" frame or not. This can be done with something like:
271
272 if ($frame->command() eq "MESSAGE") {
273 # ... do something with the received message ...
274 } else {
275 # something else than a message frame
276 }
277
278 WRAP UP
279 Putting all this together, here is a complete program that receives ten
280 messages from to "/queue/test":
281
282 use Net::STOMP::Client;
283 $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
284 # the next line will be explained in the next part of the tutorial ;-)
285 $stomp->message_callback(sub { return(1) });
286 $stomp->connect();
287 $sid = $stomp->uuid();
288 $stomp->subscribe(
289 destination => "/queue/test",
290 # we use the generated subscription id
291 id => $sid,
292 # we want a receipt on our SUBSCRIBE frame
293 receipt => $stomp->uuid(),
294 );
295 $count = 0;
296 while ($count < 10) {
297 $frame = $stomp->wait_for_frames(timeout => 1);
298 if ($frame) {
299 if ($frame->command() eq "MESSAGE") {
300 $count++;
301 printf("received message %d with id %s\n",
302 $count, $frame->header("message-id"));
303 } else {
304 # this will catch the RECEIPT frame
305 printf("%s frame received\n", $frame->command());
306 }
307 } else {
308 print("waiting for messages...\n");
309 }
310 }
311 $stomp->unsubscribe(id => $sid);
312 $stomp->disconnect();
313
315 As seen in part 3, because of the asynchronous nature of STOMP, it is a
316 bit tricky to properly handle all the different types of frames that
317 can be received.
318
319 In order to simplify this, Net::STOMP::Client supports the use of
320 callbacks. They are pieces of code called in well defined situations.
321 In fact, there are two levels of callbacks: global and local.
322
323 GLOBAL CALLBACKS
324 Global (per command) callbacks are called each time a frame is
325 received. Net::STOMP::Client has default callbacks that should be
326 sufficient for all types of frames, except for "MESSAGE" frames. For
327 these, it is really up to the coder to define what he wants to do with
328 the received messages.
329
330 Here is an example with a message callback counting the messages
331 received:
332
333 $stomp->message_callback(sub {
334 my($self, $frame) = @_;
335 $count++;
336 return($self);
337 });
338
339 These callbacks are somehow global and it is good practice not to
340 change them during a session. If you do not need a global message
341 callback, you can supply the dummy:
342
343 $stomp->message_callback(sub { return(1) });
344
345 Here is how to re-write a simplified version of the inner part of the
346 receiving program of part 3 with a global callback:
347
348 $count = 0;
349 sub msg_cb ($$) {
350 my($self, $frame) = @_;
351 $count++;
352 printf("received message %d with id %s\n",
353 $count, $frame->header("message-id"));
354 return($self);
355 }
356 $stomp->message_callback(\&msg_cb);
357 $stomp->wait_for_frames() while $count < 10;
358
359 LOCAL CALLBACKS
360 Local (per invocation) callbacks are called by the "wait_for_frame"
361 method. Their return value control what "wait_for_frame" does:
362
363 • false: "wait_for_frame" should wait for more frames
364
365 • true: "wait_for_frame" can stop and return this value
366
367 Here is how to use "wait_for_frames" with a local callback to wait
368 until we receive a "MESSAGE" frame that contains "quit" in the body:
369
370 sub msg_cb ($$) {
371 my($self, $frame) = @_;
372 return(0) unless $frame->command() eq "MESSAGE";
373 return(0) unless $frame->body() =~ /quit/;
374 return($frame);
375 }
376 $frame = $stomp->wait_for_frames(callback => \&msg_cb);
377
378 As you see, you can put the logic either in the global callbacks or in
379 the local callbacks. The best practice is to have a single global
380 message callback that does not change throughout the execution of the
381 program and to optionally put in local callbacks what may change from
382 one place of the program to another.
383
384 WRAP UP
385 Here is how to re-write the receiving program of part 3 with a global
386 callback only counting the number of messages and a local callback
387 printing information:
388
389 use Net::STOMP::Client;
390 $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
391 $stomp->connect();
392 sub msg_cb ($$) {
393 my($self, $frame) = @_;
394 my $cmd = $frame->command();
395 if ($cmd eq "MESSAGE") {
396 printf("received message %d with id %s\n",
397 $count, $frame->header("message-id"));
398 } else {
399 printf("%s frame received\n", $cmd);
400 }
401 return($frame);
402 }
403 $stomp->message_callback(sub { $count++ });
404 $sid = $stomp->uuid();
405 $stomp->subscribe(
406 destination => "/queue/test",
407 id => $sid,
408 receipt => $stomp->uuid(),
409 );
410 $count = 0;
411 while ($count < 10) {
412 $stomp->wait_for_frames(
413 callback => \&msg_cb,
414 timeout => 1,
415 ) or print("waiting for messages...\n");
416 }
417 $stomp->unsubscribe(id => $sid);
418 $stomp->disconnect();
419
421 ACKNOWLEDGMENT MODES
422 Unless specified otherwise, subscriptions are made in "auto" mode,
423 meaning that a message is considered to be delivered by the broker as
424 soon as it sends the corresponding "MESSAGE" frame. The client may not
425 receive the frame or it could exit before processing it. This could
426 result in message loss.
427
428 In order to avoid message loss, one can change the subscription
429 acknowledgment mode to be "client" instead of "auto". This is an option
430 of the "SUBSCRIBE" frame:
431
432 $stomp->subscribe(
433 destination => "/queue/test",
434 id => $sid,
435 ack => "client",
436 );
437
438 In "client" mode, the client must explicitly acknowledge the messages
439 it has successfully processed. This is achieved by sending an "ACK"
440 frame with a "message-id" header matching the one of the received
441 message:
442
443 $stomp->ack("message-id" => $frame->header("message-id"));
444
445 In practice, this is more complex since the exact way to acknowledge
446 messages changed between STOMP 1.0 (only "message-id" required), STOMP
447 1.1 (both "message-id" and "subscription" required) and STOMP 1.2 (only
448 "id" required).
449
450 Net::STOMP::Client has a unique feature to ease acknowledgments: you
451 can directly pass the frame that holds the received message and the
452 module will properly acknowledge, it regardless of the STOMP protocol
453 version used:
454
455 $stomp->ack(frame => $frame);
456
457 Messages that have not been acknowledged by the end of the session will
458 be resent by the broker.
459
460 Note that starting with STOMP 1.1 also has a "client-individual" mode.
461 Please consult the protocol specification for more details.
462
463 EFFICIENT I/O
464 The high-level methods handle one frame at a time. This can be
465 inefficient for small frames. For instance, the "send" method will
466 build a frame object, encode it and send it on the wire with at least
467 one call to "syswrite", maybe for very few bytes.
468
469 The low-level methods allow you to better control this and queue
470 messages in memory before sending them. This way, you group data and
471 use I/O more efficiently.
472
473 Here is how to queue ten messages and send them in one go.
474
475 foreach my $n (1 .. 10) {
476 $frame = Net::STOMP::Client::Frame->new(
477 command => "SEND",
478 headers => { destination => "/topic/test" },
479 body => "message $n",
480 );
481 # simply add the frame to the outgoing queue
482 $stomp->queue_frame($frame);
483 }
484 # no timeout given: block until all data has been sent
485 $stomp->send_data();
486
488 IO::Socket::SSL, Net::STOMP::Client, Net::STOMP::Client::Connection,
489 Net::STOMP::Client::Frame, Net::STOMP::Client::HeartBeat,
490 Net::STOMP::Client::Peer, Net::STOMP::Client::Receipt,
491 Net::STOMP::Client::Version.
492
494 Lionel Cons <http://cern.ch/lionel.cons>
495
496 Copyright (C) CERN 2010-2021
497
498
499
500perl v5.34.0 2022-01-21 Net::STOMP::Client::Tutorial(3)