IPC::DirQueue(3)      User Contributed Perl Documentation     IPC::DirQueue(3)

NAME
       IPC::DirQueue - disk-based many-to-many task queue

SYNOPSIS
         use IPC::DirQueue;

         # In an enqueueing process:
         my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
         $dq->enqueue_file("filename");

         # In a separate dequeueing process:
         my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
         my $job = $dq->pickup_queued_job();
         if (!$job) { print "no jobs left\n"; exit; }
         # ...do something interesting with $job->get_data_path() ...
         $job->finish();

DESCRIPTION
       This module implements a FIFO queueing infrastructure, using a
       directory as the communications and storage medium. No daemon process
       is required to manage the queue; all communication takes place via the
       filesystem.

       A common UNIX system design pattern is to use a tool like "lpr" as a
       task queueing system; for example,
       "http://patrick.wagstrom.net/old/weblog/archives/000128.html" describes
       the use of "lpr" as an MP3 jukebox.

       However, "lpr" isn't as efficient as it could be. When used in this
       way, you have to restart each task processor for every new task. If
       you have a lot of startup overhead, this can be very inefficient.
       With "IPC::DirQueue", a processing server can run persistently and
       cache data needed across multiple tasks efficiently; it will not be
       restarted unless you restart it.

       Multiple enqueueing and dequeueing processes on multiple hosts
       (NFS-safe locking is used) can run simultaneously, and safely, on the
       same queue.

       Since multiple dequeuers can run simultaneously, this provides a good
       way to process a variable level of incoming tasks using a pre-defined
       number of worker processes.

       If you need more CPU power working on a queue, you can simply start
       another dequeuer to help out. If you need less, kill off a few
       dequeuers.
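
A persistent dequeuer of the kind described above might look like the
following minimal sketch. The helper name run_worker() and its handler
argument are illustrative assumptions, not part of the module; only
wait_for_queued_job(), get_data_path() and finish() come from this
documentation. Error handling is left out to keep the loop short.

```perl
use strict;
use warnings;

# Hypothetical helper: process jobs until none arrives within $timeout
# seconds. $dq is expected to be an IPC::DirQueue object (any object with a
# compatible wait_for_queued_job() method will do for experiments).
sub run_worker {
    my ($dq, $handler, $timeout) = @_;
    my $done = 0;
    # wait_for_queued_job() returns undef once the timeout is reached
    while (my $job = $dq->wait_for_queued_job($timeout)) {
        $handler->($job->get_data_path(), $job);   # do the real work
        $job->finish();                            # job is complete
        $done++;
    }
    return $done;
}

# Typical use (assumes the module is installed and the directory exists):
#   use IPC::DirQueue;
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   run_worker($dq, sub { my ($path) = @_; print "processing $path\n" }, 30);
```

Starting a second copy of such a program against the same directory adds
a dequeuer; stopping one removes it.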

       If you need to take down the server to perform some maintenance or
       upgrades, just kill the dequeuer processes, perform the work, and start
       up new ones. Since there's no 'socket' or similar point of failure
       aside from the directory itself, the queue will just quietly fill with
       waiting jobs until the new dequeuer is ready.

       Arbitrary 'name = value' string-pair metadata can be transferred
       alongside data files. In fact, in some cases, you may find it easier
       to send unused and empty data files, and just use the 'metadata' fields
       to transfer the details of what will be worked on.

METHODS
       $dq->new ($opts);
           Create a new queue object, suitable for either enqueueing jobs or
           picking up already-queued jobs for processing.

           $opts is a reference to a hash, which may contain the following
           options:

           dir => $path_to_directory (no default)
               Name the directory where the queue files are stored. This is
               required.

           data_file_mode => $mode (default: 0666)
               The "chmod"-style file mode for data files. This should be
               specified as a string with a leading 0. It will be affected by
               the current process "umask".

           queue_file_mode => $mode (default: 0666)
               The "chmod"-style file mode for queue control files. This
               should be specified as a string with a leading 0. It will be
               affected by the current process "umask".

           ordered => { 0 | 1 } (default: 1)
               Whether the jobs should be processed in order of submission, or
               in no particular order.

           queue_fanout => { 0 | 1 } (default: 0)
               Whether the queue directory should be 'fanned out'. This
               allows better scalability with NFS-shared queues with large
               numbers of pending files, but hurts performance otherwise. It
               also implies ordered => 0. (This is strictly experimental, has
               overall poor performance, and is not recommended.)

           indexd_uri => $uri (default: undef)
               A URI of a "dq-indexd" daemon, used to maintain the list of
               waiting jobs. The URI must be of the form
               "dq://hostname[:port]". (This is strictly experimental, and is
               not recommended.)

           buf_size => $number (default: 65536)
               The buffer size to use when copying files, in bytes.

           active_file_lifetime => $number (default: 600)
               The lifetime of an untouched active lockfile, in seconds. See
               'STALE LOCKS AND SIGNAL HANDLING', below, for more details.
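
As an illustration of the options above, the sketch below builds an
options hash and shows how a requested mode string usually interacts with
the process umask. The helper effective_mode() is hypothetical, and the
"mode AND NOT umask" arithmetic is the ordinary UNIX rule rather than
something confirmed from the module's source.

```perl
use strict;
use warnings;

# Constructor options as documented above; the path and the chosen values
# are placeholders.
my %opts = (
    dir                  => "/path/to/queue",
    data_file_mode       => "0660",   # a string with a leading 0
    queue_file_mode      => "0660",
    ordered              => 1,
    buf_size             => 65536,
    active_file_lifetime => 600,
);

# Hypothetical helper: the permission bits a new file typically ends up
# with once the umask has been applied to the requested mode.
sub effective_mode {
    my ($mode_string, $umask) = @_;
    return oct($mode_string) & ~$umask;
}

printf "data files: %04o\n", effective_mode($opts{data_file_mode}, 022);
# prints "data files: 0640"

# With the module installed:
#   use IPC::DirQueue;
#   my $dq = IPC::DirQueue->new(\%opts);
```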

       $dq->enqueue_file ($filename [, $metadata [, $pri] ] );
           Enqueue a new job for processing. Returns 1 if the job was
           enqueued, or "undef" on failure.

           $filename is the path to the file to be enqueued. Its contents
           will be read, and will be used as the contents of the data file
           available to dequeuers using IPC::DirQueue::Job::get_data_path().

           $metadata is an optional hash reference; every item of metadata
           will be available to worker processes on the "IPC::DirQueue::Job"
           object, in the "$job->{metadata}" hashref. Note that using this
           channel for metadata brings with it several restrictions:

           1. it requires that the metadata be stored as 'name' => 'value'
              string pairs
           2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0)
              characters
           3. 'name' cannot contain colon (:) characters
           4. 'name' cannot be a 4-character string starting with a capital
              letter 'Q'

           If any of these restrictions is violated, die() will be called
           with the following error:

             die "IPC::DirQueue: invalid metadatum: '$k'";

           This is a change added in release 0.06; prior to that, the
           offending metadatum would be silently dropped.

           An optional priority can be specified; lower priorities are run
           first. Priorities range from 0 to 99, and 50 is the default.
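
Because a bad metadatum makes enqueueing die(), it can be convenient to
check a metadata hash first. The sketch below restates the four
restrictions as code; invalid_metadata_keys() is a hypothetical helper,
and treating undefined values and references as invalid is an assumed
reading of "string pairs".

```perl
use strict;
use warnings;

# Hypothetical pre-flight check mirroring the documented restrictions.
# Returns the sorted list of offending names (empty if all look fine).
sub invalid_metadata_keys {
    my ($metadata) = @_;
    my @bad;
    for my $k (sort keys %$metadata) {
        my $v = $metadata->{$k};
        my $ok = defined $v && !ref $v                # 1. plain string values
              && $k !~ /[\n\0]/                       # 2. no newline/NUL in name
              && $v !~ /[\n\0]/                       #    ... or in value
              && $k !~ /:/                            # 3. no colon in name
              && !($k =~ /^Q/ && length($k) == 4);    # 4. no 4-char 'Q...' name
        push @bad, $k unless $ok;
    }
    return @bad;
}

my @bad = invalid_metadata_keys({ type => "mp3", "a:b" => 1, QABC => 1 });
print "rejected: @bad\n";   # prints "rejected: QABC a:b"
```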

       $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );
           Enqueue a new job for processing. Returns 1 if the job was
           enqueued, or "undef" on failure. $pri and $metadata are as
           described in "$dq->enqueue_file()".

           $filehandle is a perl file handle that must be open for reading.
           It will be closed on completion, regardless of success or failure.
           Its contents will be read, and will be used as the contents of the
           data file available to dequeuers using
           IPC::DirQueue::Job::get_data_path().

       $dq->enqueue_string ($string [, $metadata [, $pri] ] );
           Enqueue a new job for processing. The job data is entirely read
           from $string. Returns 1 if the job was enqueued, or "undef" on
           failure. $pri and $metadata are as described in
           "$dq->enqueue_file()".

       $dq->enqueue_sub ($subref [, $metadata [, $pri] ] );
           Enqueue a new job for processing. Returns 1 if the job was
           enqueued, or "undef" on failure. $pri and $metadata are as
           described in "$dq->enqueue_file()".

           $subref is a perl subroutine, which is expected to do one of the
           following each time it is called:

           -   return a string of data bytes to be appended to any existing
               data. (The string may be empty, '', in which case it's a
               no-op.)

           -   return "undef" when the enqueued data has ended, i.e. EOF.

           -   call die() if an error occurs. The die() message will be
               converted into a warning, and the enqueue_sub() call will
               return "undef".

           (Tip: note that this is a closure, so variables outside the
           subroutine can be accessed safely.)
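
The callback protocol for enqueue_sub() can be sketched with a small
closure factory. make_chunk_source() is a hypothetical helper; it hands
out one chunk per call and then "undef", which is the end-of-data signal
described above.

```perl
use strict;
use warnings;

# Hypothetical helper: build a closure over a private list of chunks.
sub make_chunk_source {
    my @chunks = @_;
    return sub {
        return undef unless @chunks;   # no more data: signal EOF
        return shift @chunks;          # next piece of data ('' is a no-op)
    };
}

my $source = make_chunk_source("header\n", "", "body\n");

# With the module installed, the closure would be passed like this:
#   $dq->enqueue_sub($source, { type => "demo" }, 40);
```

The closure keeps its own copy of the chunk list, so the caller's data is
not modified while the job is being enqueued.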

       $job = $dq->pickup_queued_job( [ path => $path ] );
           Pick up the next job in the queue, so that it can be processed.

           If no job is available for processing, either because the queue is
           empty or because other worker processes are already working on
           them, "undef" is returned; otherwise, a new instance of
           "IPC::DirQueue::Job" is returned.

           Note that the job is marked as active until "$job->finish()" is
           called.

           If the (optional) parameter "path" is used, its value indicates the
           path of the desired job's data file. By using this, it is possible
           to cancel not-yet-active items from anywhere in the queue, or pick
           up jobs out of sequence. The data path must match the value of the
           pathqueue member of the "IPC::DirQueue::Job" object passed to the
           visit_all_jobs() callback.

       $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);
           Wait for a job to be queued within the next $timeout seconds.

           If there is already a job ready for processing, this will return
           immediately. If one is not available, it will sleep, wake up
           periodically, check for job availability, and either carry on
           sleeping or return the new job if one is now available.

           If a job becomes available, a new instance of "IPC::DirQueue::Job"
           is returned. If the timeout is reached, "undef" is returned.

           If $timeout is not specified, or is less than 1, this function will
           wait indefinitely.

           The optional parameter $pollinterval indicates how frequently to
           wake up and check for new jobs. It is specified in seconds, and
           floating-point precision is supported. The default is 1.

           Note that if $timeout is not a round multiple of $pollinterval, the
           nearest round multiple of $pollinterval greater than $timeout will
           be used instead. Also note that $timeout is used as an integer.
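
The rounding rule for $timeout and $pollinterval can be worked through
in a few lines. effective_timeout() is a hypothetical helper that only
restates the description above as arithmetic; it has not been checked
against the module's implementation.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper: the total wait implied by the documented rules.
# Returns undef when the call would wait indefinitely.
sub effective_timeout {
    my ($timeout, $pollinterval) = @_;
    $pollinterval = 1 unless defined $pollinterval;     # documented default
    return undef if !defined $timeout || $timeout < 1;  # waits forever
    $timeout = int($timeout);                           # used as an integer
    # round up to the next whole number of poll intervals
    return ceil($timeout / $pollinterval) * $pollinterval;
}

printf "%s\n", effective_timeout(10, 3);   # prints 12 (4 polls of 3 seconds)
```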

       $dq->visit_all_jobs($visitor, $visitcontext);
           Visit all the jobs in the queue, in a read-only mode. Used to list
           the entire queue.

           The callback function $visitor will be called for each job in the
           queue, like so:

             &$visitor ($visitcontext, $job);

           $visitcontext is whatever you pass in that variable above. $job is
           a new, read-only instance of "IPC::DirQueue::Job" representing that
           job.

           If a job is active (being processed), the $job object also contains
           the following additional data:

             'active_host': the hostname on which the job is active
             'active_pid': the process ID of the process which picked up the job
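
Combining visit_all_jobs() with the "path" parameter of
pickup_queued_job() gives one way to cancel waiting jobs. The sketch
below is an assumption-laden illustration: collect_waiting() and
cancel_waiting_jobs() are hypothetical names, the extra data is assumed to
be readable as hash members of $job, and "cancelling" is taken to mean
picking the job up and calling finish() on it straight away.

```perl
use strict;
use warnings;

# Visitor in the documented ($visitcontext, $job) form. Records the
# 'pathqueue' value of each job that is not active and that the
# caller-supplied test accepts.
sub collect_waiting {
    my ($context, $job) = @_;
    return if defined $job->{active_pid};          # skip active jobs
    return unless $context->{want}->($job);
    push @{ $context->{paths} }, $job->{pathqueue};
    return;
}

# Hypothetical helper: pick up each selected job by path and finish it.
# Returns the number of jobs cancelled.
sub cancel_waiting_jobs {
    my ($dq, $want) = @_;
    my %context = (want => $want, paths => []);
    $dq->visit_all_jobs(\&collect_waiting, \%context);
    my $cancelled = 0;
    for my $path (@{ $context{paths} }) {
        # another dequeuer may have taken the job in the meantime
        my $job = $dq->pickup_queued_job(path => $path) or next;
        $job->finish();
        $cancelled++;
    }
    return $cancelled;
}

# Example call, given an IPC::DirQueue object in $dq:
#   cancel_waiting_jobs($dq, sub { ($_[0]{metadata}{type} // '') eq 'old' });
```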

STALE LOCKS AND SIGNAL HANDLING
       If interrupted or terminated, dequeueing processes should be careful to
       call either "$job->finish()" or "$job->return_to_queue()" on any active
       tasks before exiting -- otherwise those jobs will remain marked active.

       Dequeueing processes can also call "$job->touch_active_lock()"
       periodically, while processing large tasks, to ensure that the task is
       still marked as active.
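
One way to follow both pieces of advice is sketched below. The helper
process_in_steps() is hypothetical; it assumes the work can be split into
code references, refreshes the lock after each one, and hands the job
back to the queue if a step dies or SIGINT/SIGTERM arrives.

```perl
use strict;
use warnings;

# Hypothetical helper: run a job as a series of steps. Returns 1 if the
# job was finished, 0 if it was returned to the queue.
sub process_in_steps {
    my ($job, @steps) = @_;
    # turn the usual termination signals into exceptions while we work
    local $SIG{INT}  = sub { die "interrupted\n" };
    local $SIG{TERM} = sub { die "terminated\n" };
    my $ok = eval {
        for my $step (@steps) {
            $step->($job);
            $job->touch_active_lock();   # keep the lock from looking stale
        }
        1;
    };
    if ($ok) {
        $job->finish();
    } else {
        warn "job abandoned: $@";
        $job->return_to_queue();         # do not leave it marked active
    }
    return $ok ? 1 : 0;
}
```

A signal that arrives outside the eval block is not covered by this
sketch; a long-running dequeuer would need its own top-level handling.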

       Stale locks are normally dealt with automatically. If a lock is still
       active after about 10 minutes of inactivity, the other dequeuers on
       that machine will probe the process ID listed in that lock file using
       kill(0). If that process ID is no longer running, the lock is presumed
       likely to be stale. If a given timeout (10 minutes plus a random value
       between 0 and 256 seconds) has elapsed since the lock file was last
       modified, the lock file is deleted.

       This 10-minute default can be modified using the "active_file_lifetime"
       parameter to the "IPC::DirQueue" constructor.
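
The stale-lock rule can be restated as a small decision function. This
is only a reading of the paragraph above, not the module's code:
lock_looks_stale() and its named arguments are hypothetical, and the
"alive" argument exists so the kill(0) probe can be replaced.

```perl
use strict;
use warnings;

# Hypothetical restatement of the rule. Arguments: now and mtime (epoch
# seconds), pid (from the lock file), and optionally lifetime, jitter and
# alive (a code ref taking a PID).
sub lock_looks_stale {
    my (%lock) = @_;
    my $lifetime = defined $lock{lifetime} ? $lock{lifetime} : 600;
    my $jitter   = defined $lock{jitter}   ? $lock{jitter}   : int(rand(257));
    # kill(0, $pid) sends no signal; it reports whether the process could
    # be signalled (a process owned by another user also yields false)
    my $alive    = $lock{alive} || sub { kill(0, $_[0]) ? 1 : 0 };
    my $age      = $lock{now} - $lock{mtime};

    return 0 if $age <= $lifetime;         # touched recently: leave alone
    return 0 if $alive->($lock{pid});      # owner still running on this host
    return $age > $lifetime + $jitter ? 1 : 0;
}

# Our own process is alive, so this lock is never reported as stale:
print lock_looks_stale(now => time, mtime => time - 3600, pid => $$), "\n";
# prints 0
```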

       Note: this means that if the dequeueing processes are spread among
       multiple machines, and there is no longer a dequeuer running on the
       machine that initially 'locked' the task, it will never be unlocked,
       unless you delete the active file for that task.

QUEUE DIRECTORY STRUCTURE
       "IPC::DirQueue" maintains the following structure for a queue
       directory:

       queue directory
           The queue directory is used to store the queue control files.
           Queue control files determine what jobs are in the queue; if a job
           has a queue control file in this directory, it is listed in the
           queue.

           The filename format is as follows:

             50.20040909232529941258.HASH[.PID.RAND]

           The first two digits (50) are the priority of the job. Lower
           priority numbers are run first. 20040909232529 is the current date
           and time when the enqueueing process was run, in "YYYYMMDDHHMMSS"
           format. 941258 is the time in microseconds, as returned by
           gettimeofday(). And finally, "HASH" is a variable-length hash of
           some semi-random data, used to increase the chance of uniqueness.

           If there is a collision, the timestamps are regenerated after a 250
           msec sleep, and further randomness will be added at the end of the
           string (namely, the current process ID and a random integer value).
           Up to 10 retries will be attempted. Once the file is atomically
           moved into the queue directory without collision, the retries
           cease.

           If queue_fanout was used in the "IPC::DirQueue" constructor, then
           the queue directory does not contain the queue control files
           directly; instead, there is an interposing set of 16 "fan-out"
           directories, named according to the hex digits from 0 to "f".
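
The control file name format can be taken apart with a regular
expression, as in the sketch below. parse_control_name() is a
hypothetical helper; the assumptions that "HASH" contains no dots and
that PID and RAND are decimal digits are not stated in this document.

```perl
use strict;
use warnings;

# Split a name of the form PP.YYYYMMDDHHMMSSuuuuuu.HASH[.PID.RAND] into
# its parts. Returns undef (in scalar context) if the name does not match.
sub parse_control_name {
    my ($name) = @_;
    my ($pri, $stamp, $usec, $hash, $pid, $rand) =
        $name =~ /^(\d{2})\.(\d{14})(\d{6})\.([^.]+)(?:\.(\d+)\.(\d+))?$/
        or return;
    return {
        priority => $pri + 0,
        datetime => $stamp,      # YYYYMMDDHHMMSS
        usec     => $usec + 0,
        hash     => $hash,
        pid      => $pid,        # undef unless a collision retry added it
        rand     => $rand,
    };
}

# The priority and timestamp fields are fixed-width, so a plain string
# sort puts the lowest priority number first and, within one priority,
# the oldest name first.
my @names = (
    "50.20040909232529941258.abc",
    "10.20040909232530000001.def",
    "50.20040909232529000007.ghi.1234.99",
);
print "$_\n" for sort @names;
```

The sorted output lists the priority-10 name first, followed by the two
priority-50 names in timestamp order.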

       active directory
           The active directory is used to store active queue control files.

           When a job becomes 'active' -- i.e. is picked up by
           pickup_queued_job() -- its control file is moved from the queue
           directory into the active directory while it is processed.

       data directory
           The data directory is used to store enqueued data files.

           It contains a two-level "fan-out" hashed directory structure; each
           data file is stored under a single-letter directory, which in turn
           is under a single-letter directory. This increases the efficiency
           of directory lookups under many filesystems.

           The format of filenames here is similar to that used in the queue
           directory, except that the last two characters are removed and used
           instead for the "fan-out" directory names.

       tmp directory
           The tmp directory contains temporary work files that are in the
           process of enqueueing, and not yet ready for processing.

           The filename format here is similar to the above, with suffixes
           indicating the type of file (".ctrl", ".data").

       Atomic, NFS-safe renaming is used to avoid collisions, overwriting or
       other unsafe operations.

SEE ALSO
       "IPC::DirQueue::Job"

AUTHOR
       Justin Mason <dq /at/ jmason.org>

MAILING LIST
       The IPC::DirQueue mailing list is at <ipc-dirqueue-subscribe@perl.org>.

COPYRIGHT
       "IPC::DirQueue" is distributed under the same license as perl itself.

AVAILABILITY
       The latest version of this library is likely to be available from CPAN.

perl v5.38.0                      2023-07-20                  IPC::DirQueue(3)