1Queue::DBI(3) User Contributed Perl Documentation Queue::DBI(3)
2
3
4
6 Queue::DBI - A queueing module with an emphasis on safety, using DBI as
7 a storage system for queued data.
8
10 Version 2.7.0
11
13 This module allows you to safely use a queueing system by preventing
14 backtracking, infinite loops and data loss.
15
16 An emphasis of this distribution is to provide an extremely reliable
17 dequeueing mechanism without having to use transactions.
18
19 use Queue::DBI;
20 my $queue = Queue::DBI->new(
21 'queue_name' => $queue_name,
22 'database_handle' => $dbh,
23 'cleanup_timeout' => 3600,
24 'verbose' => 1,
25 );
26
27 # Store a complex data structure.
28 $queue->enqueue(
29 {
30 values => [ 1, 2, 3 ],
31 data => { key1 => 1, key2 => 2 },
32 }
33 );
34
35 # Store a scalar, which must be passed by reference.
36 $queue->enqueue( \"Lorem ipsum dolor sit amet" );
37
38 # Process the queued elements one by one.
39 while ( my $queue_element = $queue->next() )
40 {
41 # Skip elements that cannot be locked.
42 next
43 unless $queue_element->lock();
44
45 eval {
46 # Do some work
47 process( $queue_element->{'email'} );
48 };
49 if ( $@ )
50 {
51 # Something failed, we clear the lock but don't delete the record in the
52 # queue so that we can try again next time
53 $queue_element->requeue();
54 }
55 else
56 {
57 # All good, remove definitively the element
58 $queue_element->success();
59 }
60 }
61
62 # Requeue items that have been locked for more than 6 hours
63 $queue->cleanup( 6 * 3600 );
64
66 This distribution currently supports:
67
68 • SQLite
69
70 • MySQL
71
72 • PostgreSQL
73
74 Please contact me if you need support for another database type, I'm
75 always glad to add extensions if you can help me with testing.
76
78 new()
79 Create a new Queue::DBI object.
80
81 my $queue = Queue::DBI->new(
82 'queue_name' => $queue_name,
83 'database_handle' => $dbh,
84 'cleanup_timeout' => 3600,
85 'verbose' => 1,
86 'max_requeue_count' => 5,
87 );
88
89 # Custom table names (optional).
90 my $queue = Queue::DBI->new(
91 'queue_name' => $queue_name,
92 'database_handle' => $dbh,
93 'cleanup_timeout' => 3600,
94 'verbose' => 1,
95 'max_requeue_count' => 5,
96 'queues_table_name' => $custom_queues_table_name,
97 'queue_elements_table_name' => $custom_queue_elements_table_name,
98 );
99
100 Parameters:
101
102 • 'queue_name'
103
104 Mandatory, the name of the queue elements will be added to /
105 removed from.
106
107 • 'database handle'
108
109 Mandatory, a DBI object.
110
111 • 'cleanup_timeout'
112
113 Optional, if set to an integer representing a time in seconds, the
114 module will automatically make available again elements that have
115 been locked longuer than that time.
116
117 • 'verbose'
118
119 Optional, control the verbosity of the warnings in the code. 0 will
120 not display any warning; 1 will only give one line warnings about
121 the current operation; 2 will also usually output the SQL queries
122 performed.
123
124 • 'max_requeue_count'
125
126 By default, Queue:::DBI will retrieve again the queue elements that
127 were requeued without limit to the number of times they have been
128 requeued. Use this option to specify how many times an element can
129 be requeued before it is ignored when retrieving elements.
130
131 • 'queues_table_name'
132
133 By default, Queue::DBI uses a table named 'queues' to store the
134 queue definitions. This allows using your own name, if you want to
135 support separate queuing systems or legacy systems.
136
137 • 'queue_elements_table_name'
138
139 By default, Queue::DBI uses a table named 'queue_elements' to store
140 the queued data. This allows using your own name, if you want to
141 support separate queuing systems or legacy systems.
142
143 • 'lifetime'
144
145 By default, Queue:::DBI will fetch elements regardless of how old
146 they are. Use this option to specify how old (in seconds) an
147 element can be and still be retrieved for processing.
148
149 get_queue_id()
150 Returns the queue ID corresponding to the current queue object.
151
152 my $queue_id = $queue->get_queue_id();
153
154 count()
155 Returns the number of elements in the queue.
156
157 my $elements_count = $queue->count();
158
159 Optional parameter:
160
161 • exclude_locked_elements
162
163 Exclude locked elements from the count. Default 0.
164
165 my $unlocked_elements_count = $queue->count(
166 exclude_locked_elements => 1
167 );
168
169 enqueue()
170 Adds a new element at the end of the current queue.
171
172 # Store a scalar by passing its reference.
173 my $queue_element_id = $queue->enqueue( \$string );
174 my $queue_element_id = $queue->enqueue( \"string" );
175
176 # Store an array reference.
177 my $queue_element_id = $queue->enqueue( [ 1, 2, 3 ] );
178
179 # Store a hash reference.
180 my $queue_element_id = $queue->enqueue( { key => 123 } );
181
182 # Store a complex datastructure.
183 my $queue_element_id = $queue->enqueue(
184 {
185 values => [ 1, 2, 3 ],
186 data => { key1 => 1, key2 => 2 },
187 }
188 );
189
190 The data passed should be a reference to a scalar or a reference to a
191 complex data structure, but you cannot pass a scalar directly. There is
192 otherwise no limitation on the type of data that can be stored as it is
193 serialized for storage in the database.
194
195 next()
196 Retrieves the next element from the queue and returns it in the form of
197 a Queue::DBI::Element object.
198
199 my $queue_element = $queue->next();
200
201 while ( my $queue_element = $queue->next() )
202 {
203 # [...]
204 }
205
206 Additionally, for testing purposes, a list of IDs to use when trying to
207 retrieve elements can be specified using 'search_in_ids':
208
209 my $queue_item = $queue->next( 'search_in_ids' => [ 123, 124, 125 ] );
210
211 retrieve_batch()
212 Retrieves a batch of elements from the queue and returns them in an
213 arrayref.
214
215 This method requires an integer to be passed as parameter to indicate
216 the maximum size of the batch to be retrieved.
217
218 my $queue_elements = $queue->retrieve_batch( 500 );
219
220 foreach ( @$queue_elements )
221 {
222 # [...]
223 }
224
225 Additionally, for testing purposes, a list of IDs to use when trying to
226 retrieve elements can be specified using 'search_in_ids':
227
228 my $queue_items = $queue->retrieve_batch(
229 10,
230 'search_in_ids' => [ 123, 124, 125 ],
231 );
232
233 get_element_by_id()
234 Retrieves a queue element using a queue element ID, ignoring any lock
235 placed on that element.
236
237 This method is mostly useful when doing a lock on an element and then
238 calling success/requeue asynchroneously.
239
240 This method requires a queue element ID to be passed as parameter.
241
242 my $queue_element = $queue->get_element_by_id( 123456 );
243
244 cleanup()
245 Requeue items that have been locked for more than the time in seconds
246 specified as parameter.
247
248 Returns the items requeued so that a specific action can be taken on
249 them.
250
251 my $elements = $queue->cleanup( $time_in_seconds );
252 foreach my $element ( @$elements )
253 {
254 # $element is a Queue::DBI::Element object
255 }
256
257 purge()
258 Remove (permanently, caveat emptor!) queue elements based on how many
259 times they've been requeued or how old they are, and return the number
260 of elements deleted.
261
262 # Remove permanently elements that have been requeued more than 10 times.
263 my $deleted_elements_count = $queue->purge( max_requeue_count => 10 );
264
265 # Remove permanently elements that were created over an hour ago.
266 my $deleted_elements_count = $queue->purge( lifetime => 3600 );
267
268 Important: locked elements are not purged even if they match the
269 criteria, as they are presumed to be currently in process and purging
270 them would create unexpected failures in the application processing
271 them.
272
273 Also note that max_requeue_count and lifetime cannot be combined.
274
276 get_max_requeue_count()
277 Return how many times an element can be requeued before it is ignored
278 when retrieving elements.
279
280 my $max_requeue_count = $queue->get_max_requeue_count();
281
282 set_max_requeue_count()
283 Set the number of time an element can be requeued before it is ignored
284 when retrieving elements. Set it to "undef" to disable the limit.
285
286 # Don't keep pulling the element if it has been requeued more than 5 times.
287 $queue->set_max_requeue_count( 5 );+
288
289 # Retry without limit.
290 $queue->set_max_requeue_count( undef );
291
292 get_lifetime()
293 Return how old an element can be before it is ignored when retrieving
294 elements.
295
296 # Find how old an element can be before the queue will stop retrieving it.
297 my $lifetime = $queue->get_lifetime();
298
299 set_lifetime()
300 Set how old an element can be before it is ignored when retrieving
301 elements.
302
303 Set it to "undef" to reset Queue::DBI back to its default behavior of
304 retrieving elements without time limit.
305
306 # Don't pull queue elements that are more than an hour old.
307 $queue->set_lifetime( 3600 );
308
309 # Pull elements without time limit.
310 $queue->set_lifetime( undef );
311
312 get_verbose()
313 Return the verbosity level, which is used in the module to determine
314 when and what type of debugging statements / information should be
315 warned out.
316
317 See "set_verbose()" for the possible values this function can return.
318
319 warn 'Verbose' if $queue->get_verbose();
320
321 warn 'Very verbose' if $queue->get_verbose() > 1;
322
323 set_verbose()
324 Control the verbosity of the warnings in the code:
325
326 • 0 will not display any warning;
327
328 • 1 will only give one line warnings about the current operation;
329
330 • 2 will also usually output the SQL queries performed.
331
332 $queue->set_verbose(1); # turn on verbose information
333
334 $queue->set_verbose(2); # be extra verbose
335
336 $queue->set_verbose(0); # quiet now!
337
339 freeze()
340 Serialize an element to store it in a SQL "text" column.
341
342 my $frozen_data = $queue->freeze( $data );
343
344 thaw()
345 Deserialize an element which was stored a SQL "text" column.
346
347 my $thawed_data = $queue->thaw( $frozen_data );
348
350 create_tables()
351 Please use "create_tables()" in Queue::DBI::Admin instead.
352
353 Here is an example that shows how to refactor your call to this
354 deprecated function:
355
356 # Load the admin module.
357 use Queue::DBI::Admin;
358
359 # Create the object which will allow managing the queues.
360 my $queues_admin = Queue::DBI::Admin->new(
361 database_handle => $dbh,
362 );
363
364 # Create the tables required by Queue::DBI to store the queues and data.
365 $queues_admin->create_tables(
366 drop_if_exist => $boolean,
367 );
368
369 lifetime()
370 Please use "get_lifetime()" and "set_lifetime()" instead.
371
372 verbose()
373 Please use "get_verbose()" and "set_verbose()" instead.
374
375 max_requeue_count()
376 Please use "get_max_requeue_count()" and "set_max_requeue_count()"
377 instead.
378
380 get_dbh()
381 Returns the database handle used for this queue.
382
383 my $dbh = $queue->get_dbh();
384
385 get_queues_table_name()
386 Returns the name of the table used to store queue definitions.
387
388 my $queues_table_name = $queue->get_queues_table_name();
389
390 get_queue_elements_table_name()
391 Returns the name of the table used to store queue definitions.
392
393 my $queue_elements_table_name = $queue->get_queue_elements_table_name();
394
396 Please report any bugs or feature requests through the web interface at
397 <https://github.com/guillaumeaubert/Queue-DBI/issues/new>. I will be
398 notified, and then you'll automatically be notified of progress on your
399 bug as I make changes.
400
402 You can find documentation for this module with the perldoc command.
403
404 perldoc Queue::DBI
405
406 You can also look for information at:
407
408 • GitHub's request tracker
409
410 <https://github.com/guillaumeaubert/Queue-DBI/issues>
411
412 • AnnoCPAN: Annotated CPAN documentation
413
414 <http://annocpan.org/dist/Queue-DBI>
415
416 • CPAN Ratings
417
418 <http://cpanratings.perl.org/d/Queue-DBI>
419
420 • MetaCPAN
421
422 <https://metacpan.org/release/Queue-DBI>
423
425 Guillaume Aubert <https://metacpan.org/author/AUBERTG>, "<aubertg at
426 cpan.org>".
427
429 I originally developed this project for ThinkGeek
430 (<http://www.thinkgeek.com/>). Thanks for allowing me to open-source
431 it!
432
434 Copyright 2009-2017 Guillaume Aubert.
435
436 This code is free software; you can redistribute it and/or modify it
437 under the same terms as Perl 5 itself.
438
439 This program is distributed in the hope that it will be useful, but
440 WITHOUT ANY WARRANTY; without even the implied warranty of
441 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the LICENSE
442 file for more details.
443
444
445
446perl v5.34.0 2022-01-21 Queue::DBI(3)