IPC::DirQueue(3)      User Contributed Perl Documentation     IPC::DirQueue(3)

NAME

       IPC::DirQueue - disk-based many-to-many task queue

SYNOPSIS

           use IPC::DirQueue;

           # enqueueing process: add a file to the queue
           my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
           $dq->enqueue_file("filename");

           # dequeueing process (a separate script): pick up and process a job
           my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
           my $job = $dq->pickup_queued_job();
           if (!$job) { print "no jobs left\n"; exit; }
           # ...do something interesting with $job->get_data_path() ...
           $job->finish();

DESCRIPTION

       This module implements a FIFO queueing infrastructure, using a
       directory as the communications and storage medium.  No daemon process
       is required to manage the queue; all communication takes place via the
       filesystem.

       A common UNIX system design pattern is to use a tool like "lpr" as a
       task queueing system; for example,
       "http://patrick.wagstrom.net/old/weblog/archives/000128.html" describes
       the use of "lpr" as an MP3 jukebox.

       However, "lpr" isn't as efficient as it could be.  When used in this
       way, you have to restart each task processor for every new task.  If
       you have a lot of startup overhead, this can be very inefficient.
       With "IPC::DirQueue", a processing server can run persistently and
       cache data needed across multiple tasks efficiently; it will not be
       restarted unless you restart it.

       Multiple enqueueing and dequeueing processes on multiple hosts can run
       simultaneously, and safely, on the same queue (NFS-safe locking is
       used).

       Since multiple dequeuers can run simultaneously, this provides a good
       way to process a variable level of incoming tasks using a pre-defined
       number of worker processes.

       If you need more CPU power working on a queue, you can simply start
       another dequeuer to help out.  If you need less, kill off a few
       dequeuers.

       If you need to take down the server to perform some maintenance or
       upgrades, just kill the dequeuer processes, perform the work, and start
       up new ones.  Since there's no 'socket' or similar point of failure
       aside from the directory itself, the queue will just quietly fill with
       waiting jobs until the new dequeuer is ready.

       Arbitrary 'name = value' string-pair metadata can be transferred
       alongside data files.  In fact, in some cases, you may find it easier
       to send unused and empty data files, and just use the 'metadata' fields
       to transfer the details of what will be worked on.

METHODS

       $dq = IPC::DirQueue->new ($opts);
           Create a new queue object, suitable for either enqueueing jobs or
           picking up already-queued jobs for processing.

           $opts is a reference to a hash, which may contain the following
           options:

           dir => $path_to_directory (no default)
               Name the directory where the queue files are stored.  This is
               required.

           data_file_mode => $mode (default: 0666)
               The "chmod"-style file mode for data files.  This should be
               specified as a string with a leading 0.  It will be affected by
               the current process "umask".

           queue_file_mode => $mode (default: 0666)
               The "chmod"-style file mode for queue control files.  This
               should be specified as a string with a leading 0.  It will be
               affected by the current process "umask".

           ordered => { 0 | 1 } (default: 1)
               Whether the jobs should be processed in order of submission, or
               in no particular order.

           queue_fanout => { 0 | 1 } (default: 0)
               Whether the queue directory should be 'fanned out'.  This
               allows better scalability with NFS-shared queues with large
               numbers of pending files, but hurts performance otherwise.  It
               also implies ordered = 0. (This is strictly experimental, has
               overall poor performance, and is not recommended.)

           indexd_uri => $uri (default: undef)
               A URI of a "dq-indexd" daemon, used to maintain the list of
               waiting jobs.  The URI must be of the form
               "dq://hostname[:port]". (This is strictly experimental, and is
               not recommended.)

           buf_size => $number (default: 65536)
               The buffer size to use when copying files, in bytes.

           active_file_lifetime => $number (default: 600)
               The lifetime of an untouched active lockfile, in seconds.  See
               'STALE LOCKS AND SIGNAL HANDLING', below, for more details.

       $dq->enqueue_file ($filename [, $metadata [, $pri] ] );
           Enqueue a new job for processing. Returns 1 if the job was
           enqueued, or "undef" on failure.

           $filename is the path to the file to be enqueued.  Its contents
           will be read, and will be used as the contents of the data file
           available to dequeuers using "IPC::DirQueue::Job::get_data_path()".

           $metadata is an optional hash reference; every item of metadata
           will be available to worker processes on the "IPC::DirQueue::Job"
           object, in the "$job->{metadata}" hashref.  Note that using this
           channel for metadata brings with it several restrictions:

           1. it requires that the metadata be stored as 'name' => 'value'
           string pairs
           2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0)
           characters
           3. 'name' cannot contain colon (:) characters
           4. 'name' cannot start with a capital letter 'Q' and be 4
           characters in length

           If any of these restrictions is broken, die() will be called with
           the following error:

                 die "IPC::DirQueue: invalid metadatum: '$k'";

           This is a change added in release 0.06; prior to that, the
           offending metadatum would be silently dropped.

           An optional priority can be specified; jobs with lower priority
           numbers are run first.  Priorities range from 0 to 99, and 50 is
           the default.

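           The four metadata restrictions above can be checked before
           enqueueing, which avoids the die().  The following is only a
           sketch: "first_invalid_metadatum()" is a hypothetical helper
           written for this example, not part of the IPC::DirQueue API, and
           it simply restates the rules as listed.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of IPC::DirQueue: returns the first
# metadata key that breaks one of the four documented rules, or undef
# if the whole hash is acceptable.
sub first_invalid_metadatum {
    my ($metadata) = @_;
    for my $k (sort keys %$metadata) {
        my $v = $metadata->{$k};
        # rule 1: plain string pairs only (no undef, no references)
        return $k if !defined $v || ref $v;
        # rule 2: no newline or NUL in either name or value
        return $k if $k =~ /[\n\0]/ || $v =~ /[\n\0]/;
        # rule 3: no colon in the name
        return $k if $k =~ /:/;
        # rule 4: no four-character name starting with a capital 'Q'
        return $k if $k =~ /\AQ.{3}\z/s;
    }
    return;
}

my $bad = first_invalid_metadatum({ user => 'alice', QABC => 'x' });
print defined $bad ? "invalid metadatum: '$bad'\n" : "metadata ok\n";
```

           A caller could run this check on its metadata hash and only call
           "enqueue_file()" when it returns "undef".
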
       $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );
           Enqueue a new job for processing. Returns 1 if the job was
           enqueued, or "undef" on failure. $pri and $metadata are as
           described in "$dq->enqueue_file()".

           $filehandle is a perl file handle that must be open for reading.
           It will be closed on completion, regardless of success or failure.
           Its contents will be read, and will be used as the contents of the
           data file available to dequeuers using
           "IPC::DirQueue::Job::get_data_path()".

       $dq->enqueue_string ($string [, $metadata [, $pri] ] );
           Enqueue a new job for processing.  The job data is entirely read
           from $string. Returns 1 if the job was enqueued, or "undef" on
           failure.  $pri and $metadata are as described in
           "$dq->enqueue_file()".

       $dq->enqueue_sub ($subref [, $metadata [, $pri] ] );
           Enqueue a new job for processing. Returns 1 if the job was
           enqueued, or "undef" on failure. $pri and $metadata are as
           described in "$dq->enqueue_file()".

           $subref is a perl subroutine, which is expected to return one of
           the following each time it is called:

               - a string of data bytes to be appended to any existing data
                 (the string may be empty, '', in which case it's a no-op)

               - "undef" when the enqueued data has ended, i.e. EOF

               - die() if an error occurs.  The die() message will be
                 converted into a warning, and the enqueue_sub() call will
                 return "undef".

           (Tip: $subref can be a closure, so variables outside the
           subroutine can be accessed safely.)

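           As an illustration of the calling protocol above, here is a
           minimal sketch of a closure that hands out a list of chunks and
           then signals EOF.  "make_chunk_source()" is a hypothetical helper
           for this example; the closure is drained by hand here, and the
           call to "enqueue_sub()" appears only in a comment.

```perl
use strict;
use warnings;

# Hypothetical helper: wraps a list of chunks in a closure that follows
# the enqueue_sub() protocol (one string per call, then undef at EOF).
sub make_chunk_source {
    my @chunks = @_;
    return sub {
        return undef unless @chunks;   # EOF: no more data
        return shift @chunks;          # next piece (may be '')
    };
}

my $source = make_chunk_source("header\n", '', "body\n");

# With a real queue the closure would be handed over as:
#   $dq->enqueue_sub($source, { type => 'demo' });
# Here it is drained by hand to show what would be read.
my $data = '';
while (defined(my $piece = $source->())) {
    $data .= $piece;
}
print $data;
```

           The empty string in the middle is returned like any other chunk
           and adds nothing to the data, matching the no-op case described
           above.
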
       $job = $dq->pickup_queued_job( [ path => $path ] );
           Pick up the next job in the queue, so that it can be processed.

           If no job is available for processing, either because the queue is
           empty or because other worker processes are already working on
           every queued job, "undef" is returned; otherwise, a new instance
           of "IPC::DirQueue::Job" is returned.

           Note that the job is marked as active until "$job->finish()" is
           called.

           If the (optional) parameter "path" is used, its value indicates the
           path of the desired job's data file. By using this, it is possible
           to cancel not-yet-active items from anywhere in the queue, or pick
           up jobs out of sequence.  The data path must match the value of the
           pathqueue member of the "IPC::DirQueue::Job" object passed to the
           "visit_all_jobs()" callback.

       $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);
           Wait for a job to be queued within the next $timeout seconds.

           If there is already a job ready for processing, this will return
           immediately.  If one is not available, it will sleep, wake up
           periodically, check for job availability, and either carry on
           sleeping or return the new job if one is now available.

           If a job becomes available, a new instance of "IPC::DirQueue::Job"
           is returned. If the timeout is reached, "undef" is returned.

           If $timeout is not specified, or is less than 1, this function will
           wait indefinitely.

           The optional parameter $pollinterval indicates how frequently to
           wake up and check for new jobs.  It is specified in seconds, and
           floating-point precision is supported.  The default is 1.

           Note that if $timeout is not a round multiple of $pollinterval, the
           nearest round multiple of $pollinterval greater than $timeout will
           be used instead.  Also note that $timeout is used as an integer.

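           The rounding rule in the last paragraph can be worked through
           with a little arithmetic.  The sketch below is one reading of
           that paragraph, not code taken from the module;
           "effective_timeout()" is a hypothetical name used only here.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Illustrative arithmetic only: a reading of the documented rounding
# rule, not code from IPC::DirQueue.
sub effective_timeout {
    my ($timeout, $pollinterval) = @_;
    $pollinterval = 1 unless defined $pollinterval;   # documented default
    # no timeout, or less than 1: the call waits indefinitely
    return if !defined $timeout || $timeout < 1;
    # $timeout is used as an integer, then rounded up to a whole
    # number of poll intervals
    my $polls = ceil(int($timeout) / $pollinterval);
    return $polls * $pollinterval;
}

printf "%s\n", effective_timeout(10, 3);   # 4 polls of 3 seconds each
printf "%s\n", effective_timeout(10, 5);   # already a round multiple
```

           Under this reading, a 10-second timeout with a 3-second poll
           interval gives up after 12 seconds, while a 5-second poll
           interval leaves the timeout at 10 seconds.
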
       $dq->visit_all_jobs($visitor, $visitcontext);
           Visit all the jobs in the queue, in a read-only mode.  Used to list
           the entire queue.

           The callback function $visitor will be called for each job in the
           queue, like so:

             &$visitor ($visitcontext, $job);

           $visitcontext is whatever you pass in that variable above.  $job is
           a new, read-only instance of "IPC::DirQueue::Job" representing that
           job.

           If a job is active (being processed), the $job object also contains
           the following additional data:

             'active_host': the hostname on which the job is active
             'active_pid': the process ID of the process which picked up the job

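           A small listing routine built on this callback might look like
           the sketch below.  "collect_job()" and "list_queue()" are
           hypothetical names, and reading 'active_host' and 'active_pid' as
           hash keys on $job is an assumption based on the documented
           "$job->{metadata}" hashref.

```perl
use strict;
use warnings;

# Visitor callback, called as $visitor->($visitcontext, $job).
# Assumption: the job fields are readable as hash keys on $job.
sub collect_job {
    my ($rows, $job) = @_;
    my $state = defined $job->{active_pid}
        ? "active on $job->{active_host} (pid $job->{active_pid})"
        : "waiting";
    push @$rows, $state;
}

# Hypothetical helper: returns one status string per job in the queue.
# The array reference passed as $visitcontext collects the results.
sub list_queue {
    my ($dq) = @_;
    my @rows;
    $dq->visit_all_jobs(\&collect_job, \@rows);
    return @rows;
}

# Usage with a real queue:
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   print "$_\n" for list_queue($dq);
```

           Passing an array reference as $visitcontext keeps the callback
           free of global state, so the same visitor can be reused for
           several queues.
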

STALE LOCKS AND SIGNAL HANDLING

       If interrupted or terminated, dequeueing processes should be careful to
       either call "$job->finish()" or "$job->return_to_queue()" on any active
       tasks before exiting -- otherwise those jobs will remain marked active.

       Dequeueing processes can also call "$job->touch_active_lock()"
       periodically, while processing large tasks, to ensure that the task is
       still marked as active.

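       One way to follow this advice is to have signal handlers set a flag
       and let the worker loop settle the current job before it exits.  The
       sketch below assumes that approach; "run_worker()", its $handler
       callback, the 5-second timeout and the $max_jobs limit are all
       hypothetical choices for this example.

```perl
use strict;
use warnings;

# Hypothetical worker loop. $dq is an IPC::DirQueue object, $handler is
# your own per-job callback, and $max_jobs only keeps the sketch bounded.
sub run_worker {
    my ($dq, $handler, $max_jobs) = @_;
    my $stop = 0;

    # Signals only set a flag, so a job that is in progress is always
    # finished or returned before the loop exits.
    my $on_signal = sub { $stop = 1 };
    local $SIG{INT}  = $on_signal;
    local $SIG{TERM} = $on_signal;

    my $done = 0;
    while (!$stop && $done < $max_jobs) {
        # give up when no job arrives within 5 seconds
        my $job = $dq->wait_for_queued_job(5) or last;
        if (eval { $handler->($job); 1 }) {
            $job->finish();             # success: the job is complete
        } else {
            warn "job failed: $@";
            $job->return_to_queue();    # failure: make it available again
        }
        $done++;
    }
    return $done;
}

# Usage with a real queue:
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   run_worker($dq, sub { my ($job) = @_; ... }, 100);
```

       A handler that works on large tasks can call
       "$job->touch_active_lock()" from time to time.  Note that returning a
       failed job to the queue means it will be picked up again, so a real
       worker would probably want some limit on retries.
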
       Stale locks are normally dealt with automatically.  If a lock is still
       active after about 10 minutes of inactivity, the other dequeuers on
       that machine will probe the process ID listed in that lock file using
       kill(0).  If that process ID is no longer running, the lock is presumed
       likely to be stale. If a given timeout (10 minutes plus a random value
       between 0 and 256 seconds) has elapsed since the lock file was last
       modified, the lock file is deleted.

       This 10-minute default can be modified using the "active_file_lifetime"
       parameter to the "IPC::DirQueue" constructor.

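       The stale-lock rule can be summarised as a small predicate.  This is
       a sketch of the rule as described here, not the module's own code;
       "lock_is_stale()" and its named arguments are hypothetical, and
       treating a PID owned by another user as still running is a choice
       made for this example.

```perl
use strict;
use warnings;

# Sketch of the documented stale-lock rule; the function name and its
# arguments are illustrative and not part of the IPC::DirQueue API.
sub lock_is_stale {
    my (%arg) = @_;
    my $lifetime = defined $arg{lifetime} ? $arg{lifetime} : 600;
    my $jitter   = defined $arg{jitter}   ? $arg{jitter}   : int(rand(257));
    my $now      = defined $arg{now}      ? $arg{now}      : time;

    # Still fresh: the lock file was modified recently enough.
    return 0 if $now - $arg{mtime} <= $lifetime + $jitter;

    # kill(0) sends no signal; it only reports whether the PID exists.
    # EPERM means the process exists but belongs to another user.
    return 0 if kill(0, $arg{pid}) || $!{EPERM};
    return 1;
}

# Our own PID is certainly alive, so this lock is not stale even
# though it was last touched an hour ago.
print lock_is_stale(pid => $$, mtime => time - 3600, jitter => 0)
    ? "stale\n" : "active\n";
```

       Because kill(0) can only see processes on the local machine, a check
       like this is only meaningful on the host that created the lock.
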
       Note: this means that if the dequeueing processes are spread among
       multiple machines, and there is no longer a dequeuer running on the
       machine that initially 'locked' the task, it will never be unlocked,
       unless you delete the active file for that task.

QUEUE DIRECTORY STRUCTURE

       "IPC::DirQueue" maintains the following structure for a queue
       directory:

       queue directory
           The queue directory is used to store the queue control files.
           Queue control files determine what jobs are in the queue; if a job
           has a queue control file in this directory, it is listed in the
           queue.

           The filename format is as follows:

               50.20040909232529941258.HASH[.PID.RAND]

           The first two digits (50) are the priority of the job.  Lower
           priority numbers are run first.  20040909232529 is the current date
           and time when the enqueueing process was run, in "YYYYMMDDHHMMSS"
           format.  941258 is the time in microseconds, as returned by
           "gettimeofday()".  And finally, "HASH" is a variable-length hash of
           some semi-random data, used to increase the chance of uniqueness.

           If there is a collision, the timestamps are regenerated after a 250
           msec sleep, and further randomness will be added at the end of the
           string (namely, the current process ID and a random integer value).
           Up to 10 retries will be attempted.  Once the file is atomically
           moved into the queue directory without collision, the retries
           cease.

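           To make the filename layout concrete, here is a minimal sketch
           that splits a control-file name into its fields.
           "parse_control_name()" is a hypothetical helper, the sample hash
           value "a1b2c3" is made up, and the pattern assumes that the HASH
           part contains no '.' characters.

```perl
use strict;
use warnings;

# Hypothetical parser for the documented control-file name layout
#   PRI.YYYYMMDDHHMMSSuuuuuu.HASH[.PID.RAND]
# Assumption: HASH itself contains no '.' characters.
sub parse_control_name {
    my ($name) = @_;
    my ($pri, $stamp, $usec, $hash, $pid, $rand) =
        $name =~ /\A(\d{2})\.(\d{14})(\d{6})\.([^.]+)(?:\.(\d+)\.(\d+))?\z/
        or return;
    return {
        priority => $pri + 0,
        datetime => $stamp,    # YYYYMMDDHHMMSS
        usec     => $usec,     # microseconds from gettimeofday()
        hash     => $hash,
        pid      => $pid,      # undef unless the optional suffix is present
        rand     => $rand,
    };
}

my $info = parse_control_name("50.20040909232529941258.a1b2c3");
print "priority $info->{priority}, enqueued at $info->{datetime}\n";
```

           Since the priority and timestamp come first and have a fixed
           width, a plain string sort of such names orders them by priority
           and then by enqueue time.
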
           If queue_fanout was used in the "IPC::DirQueue" constructor, then
           the queue directory does not contain the queue control files
           directly; instead, there is an interposing set of 16 "fan-out"
           directories, named according to the hex digits from 0 to "f".

       active directory
           The active directory is used to store active queue control files.

           When a job becomes 'active' -- i.e. is picked up by
           "pickup_queued_job()" -- its control file is moved from the queue
           directory into the active directory while it is processed.

       data directory
           The data directory is used to store enqueued data files.

           It contains a two-level "fan-out" hashed directory structure; each
           data file is stored under a single-letter directory, which in turn
           is under a single-letter directory.  This increases the efficiency
           of directory lookups under many filesystems.

           The format of filenames here is similar to that used in the queue
           directory, except that the last two characters are removed and used
           instead for the "fan-out" directory names.

       tmp directory
           The tmp directory contains temporary work files that are in the
           process of enqueueing, and not yet ready for processing.

           The filename format here is similar to the above, with suffixes
           indicating the type of file (".ctrl", ".data").

       Atomic, NFS-safe renaming is used to avoid collisions, overwriting or
       other unsafe operations.

SEE ALSO

       "IPC::DirQueue::Job"

AUTHOR

       Justin Mason <dq /at/ jmason.org>

MAILING LIST

       The IPC::DirQueue mailing list is at <ipc-dirqueue-subscribe@perl.org>.

COPYRIGHT

       "IPC::DirQueue" is distributed under the same license as perl itself.

AVAILABILITY

       The latest version of this library is likely to be available from CPAN.

perl v5.28.1                      2008-04-18                  IPC::DirQueue(3)