Net::STOMP::Client::Tutorial(3)   User Contributed Perl Documentation   Net::STOMP::Client::Tutorial(3)

NAME
Net::STOMP::Client::Tutorial - Getting started with STOMP and
Net::STOMP::Client

DESCRIPTION
Here is a five-part tutorial to get started with Net::STOMP::Client.

A basic knowledge of STOMP is required. For this, you can read:

· <http://stomp.github.com/stomp-specification-1.1.html>

  the STOMP 1.1 protocol specification

· <http://fusesource.com/docs/broker/5.4/connectivity_guide/BHIJBDJH.html>

  the Fuse Message Broker STOMP Tutorial

PART 1
Net::STOMP::Client, like similar modules under the Net::* hierarchy,
provides an object-oriented interface to a network protocol.

CREATING AN OBJECT
In order to connect to a broker, you first have to create an object.
This object will later be used to interact with the broker. When the
module creates the object, it tries to connect to the broker using
either plain TCP or SSL. Nothing is done at the STOMP level.

To create the object, you need to specify where to connect. This can
be done either with a single "uri" parameter:

    $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");

or with the pair of "host" and "port" parameters:

    $stomp = Net::STOMP::Client->new(
        host => "mybroker",
        port => 6163,
    );

USING SSL
Using SSL is more complex since more parameters have to be given. Note:
IO::Socket::SSL is used behind the scenes, so you can refer to its
documentation for more information. Here is how this could be done:

    $stomp = Net::STOMP::Client->new(
        uri => "stomp+ssl://mybroker:6162",
        sockopts => {
            # path of the directory containing trusted certificates
            SSL_ca_path => "/etc/ssl/ca",
            # client certificate to present
            SSL_cert_file => "/etc/ssl/client-cert.pem",
            # client private key
            SSL_key_file => "/etc/ssl/client-key.pem",
            # passphrase of the client private key
            SSL_passwd_cb => sub { return("secret") },
        },
    );

GETTING PEER INFORMATION
Once connected at the TCP or SSL level, you can get information about
the broker using the "peer" method. For instance:

    $peer = $stomp->peer();
    printf("connected to broker %s (IP %s), port %d\n",
        $peer->host(), $peer->addr(), $peer->port());

CONNECTING
After creating the broker object, you must start a STOMP session by
sending a "CONNECT" frame. This is as simple as:

    $stomp->connect();

If authentication is required, you must pass extra information at this
stage. For instance:

    $stomp->connect(
        login => "guest",
        passcode => "welcome",
    );

At this point, the session has been established and you can send
and/or receive messages.

DISCONNECTING
When you are done with messaging, you should gracefully end the session
by sending a "DISCONNECT" frame with:

    $stomp->disconnect();

Note: STOMP does not support reconnection. Once the session has been
ended, the broker object cannot be used anymore.

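Since a session should be ended gracefully even when something goes
wrong in between, the connect/disconnect pair can be wrapped in a small
helper. The following is only a sketch: "with_session" is a hypothetical
name, not part of Net::STOMP::Client, and it assumes that failures in
your own code surface as exceptions (die).

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): run $code inside
# a STOMP session and always try to end the session afterwards.
sub with_session {
    my($stomp, $code, %connect_options) = @_;
    # send the CONNECT frame, optionally with login and passcode
    $stomp->connect(%connect_options);
    # run the caller's code, trapping any exception it throws
    my $ok = eval { $code->($stomp); 1 };
    my $error = $@;
    # send the DISCONNECT frame whether or not the code succeeded
    eval { $stomp->disconnect() };
    # re-throw the original error, if any
    die($error) unless $ok;
    return(1);
}

# Possible usage, with $stomp created as shown earlier:
#
#   with_session($stomp, sub {
#       my($s) = @_;
#       printf("session %s started\n", $s->session());
#   }, login => "guest", passcode => "welcome");
```

Because the broker object cannot be reused after the session has ended,
a new object has to be created for each call to such a helper.
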
WRAP UP
Putting all this together, here is a complete program that simply
connects, starts and ends a session, printing information along the
way.

    use Net::STOMP::Client;
    $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
    $peer = $stomp->peer();
    printf("connected to broker %s (IP %s), port %d\n",
        $peer->host(), $peer->addr(), $peer->port());
    $stomp->connect();
    printf("session %s started\n", $stomp->session());
    $stomp->disconnect();
    printf("session ended\n");

PART 2
SENDING MESSAGES
A message is made of a header (a list of key/value pairs) and a body.
Both are optional.

To send a message, you have to issue a "SEND" frame. For instance:

    $stomp->send(
        destination => "/queue/test",
        subject => "this is a test",
        time => time(),
        body => "Hello world!\n",
    );

USING RECEIPTS
By default, you do not get any confirmation that the message has indeed
been received by the broker. If you want such a confirmation, you have
to use receipts. The following code snippet sends two messages with a
"receipt" header containing a pseudo-unique id and waits for matching
"RECEIPT" frames coming from the broker. This is easy because the
Net::STOMP::Client module keeps track of which receipts are expected
and have not been received yet.

    $stomp->send(
        destination => "/queue/test",
        body => "Test of receipts 1...\n",
        receipt => $stomp->uuid(),
    );
    $stomp->send(
        destination => "/queue/test",
        body => "Test of receipts 2...\n",
        receipt => $stomp->uuid(),
    );
    # wait at most 10 seconds for all pending receipts
    $stomp->wait_for_receipts(timeout => 10);
    # complain if some receipts are still pending
    die("Receipt not received!\n") if $stomp->receipts();

Note: all STOMP frames can carry a "receipt" header so this is not
restricted to message sending.

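Since any frame can carry a "receipt" header, the same pattern can be
factored out. The sketch below uses a hypothetical helper named
"confirmed" (not part of Net::STOMP::Client) that adds a receipt to an
arbitrary method call and reports whether everything was confirmed. It
relies only on the "uuid", "wait_for_receipts" and "receipts" methods
shown above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): call the given
# method ("send", "subscribe", ...) with an added receipt header, then
# wait for the broker's confirmation.
sub confirmed {
    my($stomp, $method, $timeout, %options) = @_;
    # send the frame with a pseudo-unique receipt id
    $stomp->$method(%options, receipt => $stomp->uuid());
    # wait at most $timeout seconds for all pending receipts
    $stomp->wait_for_receipts(timeout => $timeout);
    # true only if no receipt is still pending
    return($stomp->receipts() ? 0 : 1);
}

# Possible usage:
#
#   confirmed($stomp, "send", 10,
#       destination => "/queue/test",
#       body        => "Test of receipts...\n",
#   ) or die("Receipt not received!\n");
```

Note that this waits for all pending receipts, not only the one just
requested, so it is best suited to code that sends one frame at a time.
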
USING TRANSACTIONS
In addition, you can use transactions to group the sending of several
messages so that either none or all of them get handled by the broker.

    # create a unique transaction id
    $tid = $stomp->uuid();
    # begin the transaction
    $stomp->begin(transaction => $tid);
    # send two messages as part of this transaction
    $stomp->send(
        destination => "/queue/test1",
        body => "message 1",
        transaction => $tid,
    );
    $stomp->send(
        destination => "/queue/test2",
        body => "message 2",
        transaction => $tid,
    );
    # either abort or commit
    # ($something_bad_happened is a placeholder for your own error check)
    if ($something_bad_happened) {
        # abort/rollback the transaction
        $stomp->abort(transaction => $tid);
        # no messages have been delivered to the broker
    } else {
        # commit the transaction
        $stomp->commit(transaction => $tid);
        # both messages have been delivered to the broker
    }

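The begin/commit/abort sequence above lends itself to a wrapper that
decides between commit and abort automatically. The following is a
minimal sketch: "in_transaction" is a hypothetical helper name, and
"something bad happened" is taken to mean that the supplied code died.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): run $code inside
# a transaction, commit if it succeeds, abort and re-throw if it dies.
sub in_transaction {
    my($stomp, $code) = @_;
    # create a unique transaction id and begin the transaction
    my $tid = $stomp->uuid();
    $stomp->begin(transaction => $tid);
    # the code receives the id so it can tag its frames with it
    if (eval { $code->($tid); 1 }) {
        $stomp->commit(transaction => $tid);
        return(1);
    }
    # remember the error before doing anything else, then roll back
    my $error = $@;
    $stomp->abort(transaction => $tid);
    die($error);
}

# Possible usage:
#
#   in_transaction($stomp, sub {
#       my($tid) = @_;
#       $stomp->send(destination => "/queue/test1",
#                    body => "message 1", transaction => $tid);
#       $stomp->send(destination => "/queue/test2",
#                    body => "message 2", transaction => $tid);
#   });
```
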
PART 3
USING SUBSCRIPTIONS
In order to receive frames, you first have to subscribe to one or more
destinations. This is as easy as:

    $stomp->subscribe(destination => "/queue/test");

When you are done, you simply unsubscribe with:

    $stomp->unsubscribe(destination => "/queue/test");

It is good practice to add an "id" header to uniquely identify the
subscription. All messages that are part of this subscription will have
a matching "subscription" header. This "id" can also be used to later
unsubscribe. For instance:

    $stomp->subscribe(
        destination => "/queue/test",
        id => "my-test-sub",
    );
    # received messages will contain: subscription:my-test-sub
    $stomp->unsubscribe(id => "my-test-sub");

RECEIVING FRAMES
While you are subscribed to some destinations, the broker may decide at
any time to send you "MESSAGE" frames. You can process these frames
with a simple loop:

    while ($frame = $stomp->wait_for_frames()) {
        # ... do something with the received frame ...
    }

The code above is blocking and will loop forever. You can add a
"timeout" option so that each call returns after a bounded time:

    while (1) {
        # wait at most one second for a new frame
        $frame = $stomp->wait_for_frames(timeout => 1);
        # do what is appropriate
        if ($frame) {
            # ... do something with the received frame ...
        } else {
            # nothing received
        }
    }

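The loop with a "timeout" option still runs forever. To bound the total
time as well, the loop can be given an overall deadline. The sketch
below uses a hypothetical helper named "collect_messages". It assumes
that "wait_for_frames" returns the received frame, or a false value
when the timeout expires, as in the loop above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): collect up to
# $wanted MESSAGE frames, giving up after roughly $max_seconds seconds.
sub collect_messages {
    my($stomp, $wanted, $max_seconds) = @_;
    my @messages;
    my $deadline = time() + $max_seconds;
    while (@messages < $wanted and time() < $deadline) {
        # wait at most one second for a new frame
        my $frame = $stomp->wait_for_frames(timeout => 1);
        # nothing received this time: check the deadline and retry
        next unless $frame;
        # keep only MESSAGE frames, silently skip all the others
        push(@messages, $frame) if $frame->command() eq "MESSAGE";
    }
    return(@messages);
}

# Possible usage:
#
#   @frames = collect_messages($stomp, 10, 60);
#   printf("got %d message(s)\n", scalar(@frames));
```
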
Because of the asynchronous nature of STOMP, receiving messages is a
bit tricky: you cannot know a priori which types of frames will be sent
when. For instance, you may want to send messages (with receipts)
while you are subscribed to some destinations, and you may receive a
"MESSAGE" frame while you are waiting for a "RECEIPT" frame, or vice
versa.

The "wait_for_frames" method described above will wait for any frame,
not only message frames. It is up to you to check whether what you
receive is a "MESSAGE" frame. This can be done with something like:

    if ($frame->command() eq "MESSAGE") {
        # ... do something with the received message ...
    } else {
        # something other than a message frame
    }

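When more than two frame types have to be told apart, the if/else test
can be replaced by a dispatch table keyed on the frame command. This is
only a sketch: the "%handler" table and the "handle_frame" function are
hypothetical names, and the handlers just build a description string.
The "receipt-id" header is the one that the STOMP specification defines
for "RECEIPT" frames.

```perl
use strict;
use warnings;

# Hypothetical dispatch table: one handler per frame command.
my %handler = (
    MESSAGE => sub {
        my($frame) = @_;
        return("message " . $frame->header("message-id"));
    },
    RECEIPT => sub {
        my($frame) = @_;
        return("receipt " . $frame->header("receipt-id"));
    },
    ERROR => sub {
        my($frame) = @_;
        return("error: " . $frame->body());
    },
);

# Look up the handler matching the frame command and call it.
sub handle_frame {
    my($frame) = @_;
    my $command = $frame->command();
    my $code = $handler{$command};
    return($code ? $code->($frame) : "ignored $command frame");
}

# Possible usage inside a receiving loop:
#
#   $frame = $stomp->wait_for_frames(timeout => 1);
#   print(handle_frame($frame), "\n") if $frame;
```
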
WRAP UP
Putting all this together, here is a complete program that receives ten
messages from "/queue/test":

    use Net::STOMP::Client;
    $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
    # the next line will be explained in the next part of the tutorial ;-)
    $stomp->message_callback(sub { return(1) });
    $stomp->connect();
    $sid = $stomp->uuid();
    $stomp->subscribe(
        destination => "/queue/test",
        # we use a subscription id
        id => $sid,
        # we want a receipt on our SUBSCRIBE frame
        receipt => $stomp->uuid(),
    );
    $count = 0;
    while ($count < 10) {
        $frame = $stomp->wait_for_frames(timeout => 1);
        if ($frame) {
            if ($frame->command() eq "MESSAGE") {
                $count++;
                printf("received message %d with id %s\n",
                    $count, $frame->header("message-id"));
            } else {
                # this will catch the RECEIPT frame
                printf("%s frame received\n", $frame->command());
            }
        } else {
            print("waiting for messages...\n");
        }
    }
    $stomp->unsubscribe(id => $sid);
    $stomp->disconnect();

PART 4
As seen in part 3, because of the asynchronous nature of STOMP, it is a
bit tricky to properly handle all the different types of frames that
can be received.

In order to simplify this, Net::STOMP::Client supports the use of
callbacks. They are pieces of code called in well-defined situations.
In fact, there are two levels of callbacks: global and local.

GLOBAL CALLBACKS
Global (per command) callbacks are called each time a frame is
received. Net::STOMP::Client has default callbacks that should be
sufficient for all types of frames, except for "MESSAGE" frames. For
these, it is really up to the coder to define what to do with the
received messages.

Here is an example with a message callback counting the messages
received:

    $stomp->message_callback(sub {
        my($self, $frame) = @_;
        $count++;
        return($self);
    });

These callbacks are in a sense global and it is good practice not to
change them during a session. If you do not need a global message
callback, you can supply the dummy:

    $stomp->message_callback(sub { return(1) });

Here is how to rewrite a simplified version of the inner part of the
receiving program of part 3 with a global callback:

    $count = 0;
    sub msg_cb ($$) {
        my($self, $frame) = @_;
        $count++;
        printf("received message %d with id %s\n",
            $count, $frame->header("message-id"));
        return($self);
    }
    $stomp->message_callback(\&msg_cb);
    $stomp->wait_for_frames() while $count < 10;

LOCAL CALLBACKS
Local (per invocation) callbacks are called by the "wait_for_frames"
method. Their return value controls what "wait_for_frames" does:

· "undef": an error occurred

· false: "wait_for_frames" should wait for more frames

· true: "wait_for_frames" can stop and return this value

Here is how to use "wait_for_frames" with a local callback to wait
until we receive a "MESSAGE" frame that contains "quit" in the body:

    sub msg_cb ($$) {
        my($self, $frame) = @_;
        return(0) unless $frame->command() eq "MESSAGE";
        return(0) unless $frame->body() =~ /quit/;
        return($frame);
    }
    $frame = $stomp->wait_for_frames(callback => \&msg_cb);

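Since local callbacks are meant to vary from one place of the program
to another, they can be generated on demand with a closure. In the
sketch below, "body_matcher" is a hypothetical helper that builds a
local callback for any regular expression. It follows the return value
convention listed above: false to keep waiting, true to stop.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): build a local
# callback that stops on the first MESSAGE frame whose body matches.
sub body_matcher {
    my($regexp) = @_;
    return sub {
        my($self, $frame) = @_;
        # false: keep waiting, this is not a message
        return(0) unless $frame->command() eq "MESSAGE";
        # false: keep waiting, the body does not match
        return(0) unless $frame->body() =~ $regexp;
        # true: stop waiting and hand the frame back to the caller
        return($frame);
    };
}

# Possible usage:
#
#   $frame = $stomp->wait_for_frames(callback => body_matcher(qr/quit/));
```
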
As you can see, you can put the logic either in the global callbacks or
in the local callbacks. The best practice is to have a single global
message callback that does not change throughout the execution of the
program and to optionally put in local callbacks what may change from
one place in the program to another.

WRAP UP
Here is how to rewrite the receiving program of part 3 with a global
callback only counting the number of messages and a local callback
printing information:

    use Net::STOMP::Client;
    $stomp = Net::STOMP::Client->new(uri => "stomp://mybroker:6163");
    $stomp->connect();
    sub msg_cb ($$) {
        my($self, $frame) = @_;
        my $cmd = $frame->command();
        if ($cmd eq "MESSAGE") {
            printf("received message %d with id %s\n",
                $count, $frame->header("message-id"));
        } else {
            printf("%s frame received\n", $cmd);
        }
        return($frame);
    }
    $stomp->message_callback(sub { $count++ });
    $sid = $stomp->uuid();
    $stomp->subscribe(
        destination => "/queue/test",
        id => $sid,
        receipt => $stomp->uuid(),
    );
    $count = 0;
    while ($count < 10) {
        $stomp->wait_for_frames(
            callback => \&msg_cb,
            timeout => 1,
        ) or print("waiting for messages...\n");
    }
    $stomp->unsubscribe(id => $sid);
    $stomp->disconnect();

PART 5
ACKNOWLEDGMENT MODES
Unless specified otherwise, subscriptions are made in "auto" mode,
meaning that a message is considered to be delivered by the broker as
soon as it sends the corresponding "MESSAGE" frame. The client may not
receive the frame or it could exit before processing it. This could
result in message loss.

In order to avoid message loss, one can change the subscription
acknowledgment mode to be "client" instead of "auto". This is an option
of the "SUBSCRIBE" frame:

    $stomp->subscribe(
        destination => "/queue/test",
        ack => "client",
    );

In "client" mode, the client must explicitly acknowledge the messages
it has successfully processed. This is achieved by sending an "ACK"
frame with a "message-id" header matching the one of the received
message:

    $stomp->ack("message-id" => $frame->header("message-id"));

In fact, you can directly pass the frame that holds the received
message and Net::STOMP::Client will extract the "message-id" for you:

    $stomp->ack(frame => $frame);

Messages that have not been acknowledged by the end of the session will
be resent by the broker.

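To benefit from "client" mode, a message should be acknowledged only
after it has been processed successfully. The sketch below shows one
way to enforce this ordering. "process_and_ack" is a hypothetical
helper name and processing failures are assumed to be signalled with
die.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::STOMP::Client): process a
# received message and acknowledge it only if the processing succeeded.
sub process_and_ack {
    my($stomp, $frame, $code) = @_;
    if (eval { $code->($frame); 1 }) {
        # processing went fine: tell the broker the message is handled
        $stomp->ack(frame => $frame);
        return(1);
    }
    # processing failed: send no ACK, so the message stays unacknowledged
    return(0);
}

# Possible usage, for a subscription made with ack => "client":
#
#   process_and_ack($stomp, $frame, sub {
#       my($f) = @_;
#       print($f->body());
#   }) or warn("message left unacknowledged\n");
```
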
SEE ALSO
IO::Socket::SSL, Net::STOMP::Client, Net::STOMP::Client::Debug,
Net::STOMP::Client::Error, Net::STOMP::Client::Frame,
Net::STOMP::Client::OO, Net::STOMP::Client::Peer.

AUTHOR
Lionel Cons <http://cern.ch/lionel.cons>

Copyright CERN 2010-2011

perl v5.12.3                      2011-04-01   Net::STOMP::Client::Tutorial(3)