Directory::Queue(3) User Contributed Perl Documentation Directory::Queue(3)

NAME
Directory::Queue - object oriented interface to a directory based queue

SYNOPSIS
    use strict;
    use warnings;
    use Directory::Queue;

    #
    # simple schema:
    #  - there must be a "body" which is a string
    #  - there can be a "header" which is a table/hash
    #

    my $schema = { "body" => "string", "header" => "table?" };
    my $queuedir = "/tmp/test";
    my ($dirq, $name);

    #
    # sample producer
    #

    $dirq = Directory::Queue->new(path => $queuedir, schema => $schema);
    foreach my $count (1 .. 100) {
        $name = $dirq->add(body => "element $count\n", header => \%ENV);
        printf("# added element %d as %s\n", $count, $name);
    }

    #
    # sample consumer
    #

    $dirq = Directory::Queue->new(path => $queuedir, schema => $schema);
    for ($name = $dirq->first(); $name; $name = $dirq->next()) {
        next unless $dirq->lock($name);
        printf("# reading element %s\n", $name);
        my %data = $dirq->get($name);
        # one can use $data{body} and $data{header} here...
        # one could use $dirq->unlock($name) to only browse the queue...
        $dirq->remove($name);
    }

DESCRIPTION
The goal of this module is to offer a simple queue system that relies on
the underlying filesystem for storage and security, and on its atomic
operations to prevent race conditions. It focuses on simplicity,
robustness and scalability.

This module allows multiple concurrent readers and writers to interact
with the same queue. A Python implementation of the same algorithm is
available at <http://code.google.com/p/dirq>, so readers and writers can
even be written in different languages.

A queue has no notion of priority. If multiple priorities are needed,
multiple queues should be used.

TERMINOLOGY
An element is something that contains one or more pieces of data. A
simple string may be an element, but more complex schemas can also be
used; see the "SCHEMA" section for more information.

A queue is a "best effort FIFO" collection of elements.

It is very hard to guarantee pure FIFO behavior with multiple writers
using the same queue. Consider for instance:

· Writer1: calls the add() method

· Writer2: calls the add() method

· Writer2: the add() method returns

· Writer1: the add() method returns

Who should be first in the queue, Writer1 or Writer2?

For simplicity, this implementation provides only "best effort FIFO",
i.e. there is a very high probability that elements are processed in
FIFO order, but this is not guaranteed. This is achieved by using a
high-resolution time function and sorting elements by the time at which
each element's final directory gets created.

LOCKING
Adding an element is not a problem because the add() method is atomic.
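
This page does not spell out how add() achieves atomicity. A common technique, consistent with the "temporary" directory of the directory layout described below, is to build the element under a private name and then publish it with a single rename(), which is atomic within one POSIX filesystem. The sketch below assumes that approach; publish_element() is a hypothetical helper, not part of the Directory::Queue API.

```perl
use strict;
use warnings;
use Errno ();
use File::Spec;
use File::Temp qw(tempdir);

# Hypothetical helper (an assumption, not the module's code): build an
# element privately under "temporary", then publish it with one rename().
sub publish_element {
    my ($queue, $interdir, $name, %files) = @_;
    my $tmpdir = File::Spec->catdir($queue, "temporary");
    my $dstdir = File::Spec->catdir($queue, $interdir);
    # create the parent directories if needed (another process may win)
    for my $dir ($tmpdir, $dstdir) {
        mkdir($dir) or $!{EEXIST} or die "cannot mkdir $dir: $!\n";
    }
    my $tmp = File::Spec->catdir($tmpdir, $name);
    mkdir($tmp) or die "cannot mkdir $tmp: $!\n";
    # write every piece of data while the element is still invisible
    for my $file (sort keys %files) {
        my $path = File::Spec->catfile($tmp, $file);
        open(my $fh, ">:raw", $path) or die "cannot open $path: $!\n";
        print {$fh} $files{$file};
        close($fh) or die "cannot close $path: $!\n";
    }
    # a single atomic step makes the complete element visible to readers
    my $dst = File::Spec->catdir($dstdir, $name);
    rename($tmp, $dst) or die "cannot rename $tmp to $dst: $!\n";
    return "$interdir/$name";
}

my $queue = tempdir(CLEANUP => 1);
my $elt = publish_element($queue, "00000000", "4dbe8c2a0b1c23", body => "hello\n");
print "published $elt\n";
```

Readers scanning the intermediate directory therefore never observe a half-written element: before the rename() it does not exist there, after it the element is complete.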

In order to support multiple processes interacting with the same queue,
advisory locking is used. Processes should first lock an element before
working with it. In fact, the get() and remove() methods report a fatal
error if they are called on unlocked elements.

If the process that created the lock dies without unlocking the
element, we end up with a stale lock. The purge() method can be used
to remove these stale locks.
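
One way to recognize a stale lock, assumed here rather than taken from the module's source, is to compare the modification time of the element's "locked" directory with the maxlock limit accepted by purge(); temporary elements can be aged the same way against maxtemp. is_stale() is a hypothetical helper that follows the documented rule that a limit of 0 disables the check.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: a lock (or temporary element) is treated as stale
# when its mtime is older than $maxage seconds; this mirrors the purge()
# options maxlock/maxtemp, where 0 disables the check. Assumption only.
sub is_stale {
    my ($path, $maxage, $now) = @_;
    return 0 unless $maxage;               # 0 or undef: never stale
    my @st = stat($path) or return 0;      # already gone: nothing to purge
    return (($now // time()) - $st[9] > $maxage) ? 1 : 0;
}

my $elt = tempdir(CLEANUP => 1);
mkdir("$elt/locked") or die "cannot mkdir $elt/locked: $!\n";
my $mtime = (stat("$elt/locked"))[9];
printf("after 601s: %s\n", is_stale("$elt/locked", 600, $mtime + 601) ? "stale" : "fresh");  # stale
printf("after 600s: %s\n", is_stale("$elt/locked", 600, $mtime + 600) ? "stale" : "fresh");  # fresh
```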

An element can be in only one of two states: locked or unlocked.

A newly created element is unlocked, since a writer usually does not
need to do anything more with an element once it has been dropped in
the queue.

Iterators return all the elements, regardless of their states.

There is no method to get an element's state, as this information is
usually useless: it may change at any time. Instead, programs should
directly try to lock elements to make sure they are indeed locked.
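
Trying to lock is itself a cheap, race-free test when the lock is a directory, because mkdir() either creates it or fails atomically. The sketch below assumes the layout described below, where a locked element contains a "locked" subdirectory; try_lock() and try_unlock() are hypothetical helpers and not the module's lock() and unlock() methods.

```perl
use strict;
use warnings;
use Errno ();
use File::Temp qw(tempdir);

# Hypothetical helpers (assumption: a lock is a "locked" subdirectory).
# mkdir() is atomic: it either creates the directory or fails, so two
# processes racing for the same element cannot both succeed.
sub try_lock {
    my ($eltdir) = @_;
    return 1 if mkdir("$eltdir/locked");
    # already locked by someone else, or element removed meanwhile
    return 0 if $!{EEXIST} || $!{ENOENT};
    die "cannot lock $eltdir: $!\n";
}

sub try_unlock {
    my ($eltdir) = @_;
    return rmdir("$eltdir/locked") ? 1 : 0;
}

my $elt = tempdir(CLEANUP => 1);
print try_lock($elt) ? "locked\n" : "busy\n";    # locked
print try_lock($elt) ? "locked\n" : "busy\n";    # busy
try_unlock($elt);
```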

CONSTRUCTOR
The new() method can be used to create a Directory::Queue object that
will later be used to interact with the queue. The following attributes
are supported:

path
    the queue toplevel directory (mandatory)

umask
    the umask to use when creating files and directories (default: use
    the running process's umask)

maxelts
    the maximum number of elements that an intermediate directory can
    hold (default: 16,000)

schema
    the schema defining how to interpret user supplied data (mandatory
    if elements are added or read)
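
An explicit umask can be honored by switching the process umask around each creation and restoring it afterwards. mkdir_with_umask() is a hypothetical helper illustrating that idea with Perl's built-in umask() and mkdir(); it does not claim to reproduce the module's internals.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: create a directory under a given umask, as the
# "umask" attribute describes (undef keeps the process umask). Because
# umask() is process-wide, this is not safe in threaded programs.
sub mkdir_with_umask {
    my ($path, $umask) = @_;
    my $old = defined($umask) ? umask($umask) : undef;
    my $ok  = mkdir($path);
    my $err = "$!";
    umask($old) if defined($old);    # always restore the previous umask
    die "cannot mkdir $path: $err\n" unless $ok;
    return $path;
}

my $top = tempdir(CLEANUP => 1);
mkdir_with_umask("$top/private", 077);
printf("%04o\n", (stat("$top/private"))[2] & 0777);    # 0700
```

With umask 0, as suggested for multi-user queues, the same call would create a world-writable directory, leaving access control to the toplevel directory's own permissions.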

SCHEMA
The schema defines how user supplied data is stored in the queue. It is
only required by the add() and get() methods.

The schema must be a reference to a hash containing key/value pairs.

The key must contain only alphanumeric characters. It identifies the
piece of data and will be used as the file name when storing the data
inside the element directory.

The value represents the type of the given piece of data. It can be:

binary
    the data is a sequence of binary bytes; it will be stored directly
    in a plain file with no further encoding

string
    the data is a text string (i.e. a sequence of characters); it will
    be UTF-8 encoded

table
    the data is a reference to a hash of text strings; it will be
    serialized and UTF-8 encoded before being stored in a file
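
The exact on-disk encoding of a table is not described here. As an illustration only, the sketch below assumes a simple line-oriented format (one key/value pair per line, separated by a tab, with backslash, tab and newline escaped) and uses the core Encode module for the UTF-8 step; table_to_bytes() and bytes_to_table() are hypothetical names.

```perl
use strict;
use warnings;
use Encode qw(encode decode);

# Hypothetical serialization (an assumption, not the module's documented
# format): one "key<TAB>value" line per entry, with backslash, tab and
# newline escaped, then the whole text is UTF-8 encoded.
my %esc   = ("\\" => "\\\\", "\t" => "\\t", "\n" => "\\n");
my %unesc = reverse %esc;

sub _escape   { my ($s) = @_; $s =~ s/([\\\t\n])/$esc{$1}/g;    return $s }
sub _unescape { my ($s) = @_; $s =~ s/(\\[\\tn])/$unesc{$1}/g; return $s }

# hash reference of text strings -> UTF-8 encoded bytes
sub table_to_bytes {
    my ($table) = @_;
    my $text = join("", map { _escape($_) . "\t" . _escape($table->{$_}) . "\n" }
                        sort keys %$table);
    return encode("UTF-8", $text);
}

# UTF-8 encoded bytes -> hash reference of text strings
sub bytes_to_table {
    my ($bytes) = @_;
    my %table;
    for my $line (split(/\n/, decode("UTF-8", $bytes))) {
        my ($key, $value) = split(/\t/, $line, 2);
        $table{_unescape($key)} = _unescape($value);
    }
    return \%table;
}

my %header = (USER => "nobody", NOTE => "caf\x{e9}\tline1\nline2");
my $copy = bytes_to_table(table_to_bytes(\%header));
print $copy->{NOTE} eq $header{NOTE} ? "round trip ok\n" : "mismatch\n";
```

Escaping the separators keeps each entry on exactly one line, so arbitrary text (including tabs and newlines) survives the round trip.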

By default, all pieces of data are mandatory. If you append a question
mark to the type, this piece of data will be marked as optional. See
the comments in the "SYNOPSIS" section for more information.
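
These rules are easy to check mechanically. parse_schema() and check_data() are hypothetical helpers that enforce them on a schema such as the one in the "SYNOPSIS" section; the module's own checks and error messages may differ.

```perl
use strict;
use warnings;

# Hypothetical helpers enforcing the schema rules described above; the
# module's own checks and error messages may differ.
my %known_type = map { $_ => 1 } qw(binary string table);

# returns { name => [type, optional] } or dies on an invalid schema
sub parse_schema {
    my ($schema) = @_;
    die "schema must be a hash reference\n" unless ref($schema) eq "HASH";
    my %parsed;
    for my $name (sort keys %$schema) {
        die "invalid schema name: $name\n" unless $name =~ /^[a-zA-Z0-9]+\z/;
        my ($type, $opt) = ($schema->{$name} // "") =~ /^(\w+)(\?)?\z/
            or die "invalid schema type for $name\n";
        die "unknown schema type: $type\n" unless $known_type{$type};
        $parsed{$name} = [$type, defined($opt) ? 1 : 0];
    }
    return \%parsed;
}

# dies unless %data matches the parsed schema
sub check_data {
    my ($parsed, %data) = @_;
    for my $name (sort keys %data) {
        die "unexpected data: $name\n" unless $parsed->{$name};
        my $is_ref   = ref($data{$name}) eq "HASH";
        my $want_ref = $parsed->{$name}[0] eq "table";
        die "wrong kind of data for $name\n" unless $is_ref == $want_ref;
    }
    for my $name (sort keys %$parsed) {
        next if exists($data{$name}) || $parsed->{$name}[1];
        die "missing data: $name\n";
    }
    return 1;
}

my $parsed = parse_schema({ body => "string", header => "table?" });
check_data($parsed, body => "element 1\n", header => { USER => "nobody" });
check_data($parsed, body => "element 2\n");     # header is optional
print "both elements conform to the schema\n";
```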

METHODS
The following methods are available:

new()
    return a new Directory::Queue object (class method)

copy()
    return a copy of the object; this can be useful to have independent
    iterators on the same queue

path()
    return the queue toplevel path

id()
    return a unique identifier for the queue

count()
    return the number of elements in the queue

first()
    return the first element in the queue, resetting the iterator;
    return an empty string if the queue is empty

next()
    return the next element in the queue, incrementing the iterator;
    return an empty string if there is no next element

add(DATA)
    add the given data (a hash) to the queue and return the
    corresponding element name; the schema must be known and the data
    must conform to it

lock(ELEMENT[, PERMISSIVE])
    attempt to lock the given element and return true on success; if
    PERMISSIVE is true (the default), failing to lock the element is not
    a fatal error and false is returned instead

unlock(ELEMENT[, PERMISSIVE])
    attempt to unlock the given element and return true on success; if
    PERMISSIVE is true (it is false by default), failing to unlock the
    element is not a fatal error and false is returned instead

remove(ELEMENT)
    remove the given element (which must be locked) from the queue

get(ELEMENT)
    get the data from the given element (which must be locked) and
    return essentially the same hash that was given to add(); the schema
    must be known

purge([OPTIONS])
    purge the queue by removing unused intermediate directories,
    removing temporary elements that are too old and unlocking locked
    elements that are too old (i.e. stale locks); note: this can take a
    long time on queues with many elements; OPTIONS can be:

    maxtemp
        maximum time for a temporary element (in seconds, default 300);
        if set to 0, temporary elements will not be removed

    maxlock
        maximum time for a locked element (in seconds, default 600); if
        set to 0, locked elements will not be unlocked
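
first() and next() behave like a cursor that returns an empty string once exhausted, which is why the consumer loop in the "SYNOPSIS" section can simply test $name for truth. The sketch below assumes the directory layout described below and a simple snapshot strategy (rescan on first(), then walk the list); SketchIterator is a hypothetical class, and the real implementation may scan lazily instead.

```perl
use strict;
use warnings;
use File::Path qw(make_path);
use File::Temp qw(tempdir);

# Hypothetical iterator (an assumption, not the module's code): first()
# rescans the queue and returns its oldest element, next() walks that
# snapshot, and both return "" when nothing is left.
package SketchIterator;

sub new {
    my ($class, $path) = @_;
    return bless({ path => $path, elts => [] }, $class);
}

# sorted directory entries matching $pattern (empty list if unreadable)
sub _list {
    my ($dir, $pattern) = @_;
    opendir(my $dh, $dir) or return ();
    my @names = sort { $a cmp $b } grep { $_ =~ $pattern } readdir($dh);
    closedir($dh);
    return @names;
}

sub first {
    my ($self) = @_;
    my @elts;
    # fixed-width hexadecimal names: lexical order is chronological order
    for my $inter (_list($self->{path}, qr/^[0-9a-f]{8}\z/)) {
        push(@elts, map { "$inter/$_" }
                    _list("$self->{path}/$inter", qr/^[0-9a-f]{14}\z/));
    }
    $self->{elts} = \@elts;
    return $self->next();
}

sub next {
    my ($self) = @_;
    return shift(@{ $self->{elts} }) // "";
}

package main;

my $top = tempdir(CLEANUP => 1);
make_path("$top/00000001/4dbe8c2a0b1c20", "$top/00000000/4dbe8c290a0000");
my $it = SketchIterator->new($top);
for (my $name = $it->first(); $name ne ""; $name = $it->next()) {
    print "$name\n";
}
```

Because each object keeps its own snapshot, two objects give independent iterators over the same queue, which is the use case the copy() method addresses.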

DIRECTORY STRUCTURE
All the directories holding the elements and all the files holding the
data pieces are located under the queue toplevel directory. This
directory can contain:

temporary
    the directory holding temporary elements, i.e. the elements being
    added

obsolete
    the directory holding obsolete elements, i.e. the elements being
    removed

NNNNNNNN
    an intermediate directory holding elements; NNNNNNNN is an 8-digit
    hexadecimal number
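
The maxelts constructor attribute bounds how many elements an intermediate directory holds (default: 16,000). A plausible selection rule, assumed here for illustration, is to keep filling the highest-numbered NNNNNNNN directory and move on to the next hexadecimal number once it is full; pick_intermediate() is a hypothetical helper.

```perl
use strict;
use warnings;
use File::Path qw(make_path);
use File::Temp qw(tempdir);

# Hypothetical helper (an assumption, not the module's code): reuse the
# highest-numbered intermediate directory until it holds $maxelts
# elements, then switch to the next 8-digit hexadecimal number.
sub pick_intermediate {
    my ($top, $maxelts) = @_;
    opendir(my $dh, $top) or die "cannot opendir $top: $!\n";
    # "temporary" and "obsolete" do not match the 8-hex-digit pattern
    my @inter = sort { $a cmp $b } grep { /^[0-9a-f]{8}\z/ } readdir($dh);
    closedir($dh);
    return sprintf("%08x", 0) unless @inter;
    my $last = $inter[-1];
    opendir($dh, "$top/$last") or die "cannot opendir $top/$last: $!\n";
    my $count = grep { /^[0-9a-f]{14}\z/ } readdir($dh);   # element count
    closedir($dh);
    return $count < $maxelts ? $last : sprintf("%08x", hex($last) + 1);
}

my $top = tempdir(CLEANUP => 1);
print pick_intermediate($top, 2), "\n";                # 00000000
make_path("$top/0000000f/00000000000001", "$top/0000000f/00000000000002");
print pick_intermediate($top, 2), "\n";                # 00000010
print pick_intermediate($top, 3), "\n";                # 0000000f
```

Capping directory sizes keeps each readdir() short, which matters on filesystems where very large directories are slow to scan.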

In any of the above directories, an element is stored as a single
directory with a 14-digit hexadecimal name SSSSSSSSMMMMMR where:

SSSSSSSS
    represents the number of seconds since the Epoch

MMMMM
    represents the microsecond part of the time since the Epoch

R
    is a random digit used to reduce name collisions
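
The SSSSSSSSMMMMMR layout can be produced with Time::HiRes::gettimeofday(). Because every field is fixed-width lowercase hexadecimal, a plain string sort of element names is also a chronological sort, which is what makes the "best effort FIFO" ordering cheap (8 hex digits of seconds last until the year 2106). element_name() and element_time() are hypothetical helpers, and using a random hexadecimal digit for R is an assumption.

```perl
use strict;
use warnings;
use Time::HiRes qw(gettimeofday);

# Hypothetical helpers for the SSSSSSSSMMMMMR naming scheme: 8 hex digits
# of seconds, 5 hex digits of microseconds (at most 999999 = 0xf423f) and
# one random hex digit (assumed) to reduce collisions.
sub element_name {
    my ($sec, $usec) = @_ ? @_ : gettimeofday();
    return sprintf("%08x%05x%01x", $sec, $usec, int(rand(16)));
}

# decode an element name back into (seconds, microseconds)
sub element_time {
    my ($name) = @_;
    my ($sec, $usec) = $name =~ /^([0-9a-f]{8})([0-9a-f]{5})[0-9a-f]\z/
        or die "invalid element name: $name\n";
    return (hex($sec), hex($usec));
}

my $name = element_name();
my ($sec, $usec) = element_time($name);
printf("%s was created at %d.%06d\n", $name, $sec, $usec);
```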

Finally, inside an element directory, the different pieces of data are
stored in different files, named according to the schema. A locked
element additionally contains a directory named "locked".

SECURITY
There are no specific security mechanisms in this module.

The elements are stored as plain files and directories. The filesystem
security features (owner, group, permissions, ACLs...) should be used
to adequately protect the data.

By default, the process's umask is respected. See the umask attribute
of the constructor if you want another behavior.

If multiple readers and writers with different uids are expected, the
easiest solution is to have all the files and directories inside the
toplevel directory world-writable (i.e. umask=0). Then, the permissions
of the toplevel directory itself (e.g. group-writable) are enough to
control who can access the queue.

AUTHOR
Lionel Cons <http://cern.ch/lionel.cons>

Copyright CERN 2010-2011

perl v5.12.3 2011-05-02 Directory::Queue(3)