1Directory::Queue::NormaUls(e3r)Contributed Perl DocumentDaitrieocntory::Queue::Normal(3)
2
3
4
6 Directory::Queue::Normal - object oriented interface to a normal
7 directory based queue
8
10 use Directory::Queue::Normal;
11
12 #
13 # simple schema:
14 # - there must be a "body" which is a string
15 # - there can be a "header" which is a table/hash
16 #
17
18 $schema = { "body" => "string", "header" => "table?" };
19 $queuedir = "/tmp/test";
20
21 #
22 # sample producer
23 #
24
25 $dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
26 foreach $count (1 .. 100) {
27 $name = $dirq->add(body => "element $count\n", header => \%ENV);
28 printf("# added element %d as %s\n", $count, $name);
29 }
30
31 #
32 # sample consumer (one pass only)
33 #
34
35 $dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
36 for ($name = $dirq->first(); $name; $name = $dirq->next()) {
37 next unless $dirq->lock($name);
38 printf("# reading element %s\n", $name);
39 %data = $dirq->get($name);
40 # one can use $data{body} and $data{header} here...
41 # one could use $dirq->unlock($name) to only browse the queue...
42 $dirq->remove($name);
43 }
44
45 #
46 # looping consumer (sleeping to avoid using all CPU time)
47 #
48
49 $dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
50 while (1) {
51 sleep(1) unless $dirq->count();
52 for ($name = $dirq->first(); $name; $name = $dirq->next()) {
53 ... same as above ...
54 }
55 }
56
58 The goal of this module is to offer a "normal" (as opposed to "simple")
59 queue system using the underlying filesystem for storage, security and
60 to prevent race conditions via atomic operations.
61
62 It allows arbitrary data to be stored (see the "SCHEMA" section for
63 more information) but it has a significant disk space and speed
64 overhead.
65
66 Please refer to Directory::Queue for general information about
67 directory queues.
68
70 The new() method can be used to create a Directory::Queue::Normal
71 object that will later be used to interact with the queue. The
72 following attributes are supported:
73
74 path
75 the queue toplevel directory (mandatory)
76
77 rndhex
78 the "random" hexadecimal digit to use in element names (aka R) as a
79 number between 0 and 15 (default: randomly generated)
80
81 umask
82 the umask to use when creating files and directories (default: use
83 the running process' umask)
84
85 maxelts
86 the maximum number of elements that an intermediate directory can
87 hold (default: 16,000)
88
89 maxlock
90 default maximum time for a locked element (in seconds, default 600)
91 as used by the purge() method
92
93 maxtemp
94 default maximum time for a temporary element (in seconds, default
95 300) as used by the purge() method
96
97 nlink
98 flag indicating that the "nlink optimization" (faster but only
99 working on some filesystems) will be used
100
101 schema
102 the schema defining how to interpret user supplied data (mandatory
103 if elements are added or read)
104
106 The schema defines how user supplied data is stored in the queue. It is
107 only required by the add() and get() methods.
108
109 The schema must be a reference to a hash containing key/value pairs.
110
111 The key must contain only alphanumerical characters. It identifies the
112 piece of data and will be used as file name when storing the data
113 inside the element directory.
114
115 The value represents the type of the given piece of data. It can be:
116
117 binary
118 the data is a binary string (i.e. a sequence of bytes), it will be
119 stored directly in a plain file with no further encoding
120
121 string
122 the data is a text string (i.e. a sequence of characters), it will
123 be UTF-8 encoded before being stored in a file
124
125 table
126 the data is a reference to a hash of text strings, it will be
127 serialized and UTF-8 encoded before being stored in a file
128
129 By default, all pieces of data are mandatory. If you append a question
130 mark to the type, this piece of data will be marked as optional. See
131 the comments in the "SYNOPSIS" section for an example.
132
133 By default, string or binary data is used directly. If you append an
134 asterisk to the type, the data that you add or get will be by
135 reference. This can be useful to avoid string copies of large amounts
136 of data.
137
139 The following methods are available:
140
141 new()
142 return a new Directory::Queue::Normal object (class method)
143
144 copy()
145 return a copy of the object; this can be useful to have independent
146 iterators on the same queue
147
148 path()
149 return the queue toplevel path
150
151 id()
152 return a unique identifier for the queue
153
154 count()
155 return the number of elements in the queue
156
157 first()
158 return the first element in the queue, resetting the iterator;
159 return an empty string if the queue is empty
160
161 next()
162 return the next element in the queue, incrementing the iterator;
163 return an empty string if there is no next element
164
165 add(DATA)
166 add the given data (a hash or hash reference) to the queue and
167 return the corresponding element name; the schema must be known and
168 the data must conform to it
169
170 lock(ELEMENT[, PERMISSIVE])
171 attempt to lock the given element and return true on success; if
172 the PERMISSIVE option is true (which is the default), it is not a
173 fatal error if the element cannot be locked and false is returned
174
175 unlock(ELEMENT[, PERMISSIVE])
176 attempt to unlock the given element and return true on success; if
177 the PERMISSIVE option is true (which is not the default), it is not
178 a fatal error if the element cannot be unlocked and false is
179 returned
180
181 touch(ELEMENT)
182 update the access and modification times on the element's directory
183 to indicate that it is still being used; this is useful for
184 elements that are locked for long periods of time (see the purge()
185 method)
186
187 remove(ELEMENT)
188 remove the given element (which must be locked) from the queue
189
190 get(ELEMENT)
191 get the data from the given element (which must be locked) and
192 return basically the same hash as what add() got (in list context,
193 the hash is returned directly while in scalar context, the hash
194 reference is returned instead); the schema must be knownand the
195 data must conform to it
196
197 purge([OPTIONS])
198 purge the queue by removing unused intermediate directories,
199 removing too old temporary elements and unlocking too old locked
200 elements (aka staled locks); note: this can take a long time on
201 queues with many elements; OPTIONS can be:
202
203 maxtemp
204 maximum time for a temporary element (in seconds); if set to 0,
205 temporary elements will not be removed
206
207 maxlock
208 maximum time for a locked element (in seconds); if set to 0,
209 locked elements will not be unlocked
210
212 All the directories holding the elements and all the files holding the
213 data pieces are located under the queue toplevel directory. This
214 directory can contain:
215
216 temporary
217 the directory holding temporary elements, i.e. the elements being
218 added
219
220 obsolete
221 the directory holding obsolete elements, i.e. the elements being
222 removed
223
224 NNNNNNNN
225 an intermediate directory holding elements; NNNNNNNN is an 8-digits
226 long hexadecimal number
227
228 In any of the above directories, an element is stored as a single
229 directory with a 14-digits long hexadecimal name SSSSSSSSMMMMMR where:
230
231 SSSSSSSS
232 represents the number of seconds since the Epoch
233
234 MMMMM
235 represents the microsecond part of the time since the Epoch
236
237 R is a random hexadecimal digit used to reduce name collisions
238
239 Finally, inside an element directory, the different pieces of data are
240 stored into different files, named according to the schema. A locked
241 element contains in addition a directory named "locked".
242
244 Directory::Queue.
245
247 Lionel Cons <http://cern.ch/lionel.cons>
248
249 Copyright (C) CERN 2010-2022
250
251
252
253perl v5.36.0 2023-01-20 Directory::Queue::Normal(3)