Directory::Queue::Normal(3)  User Contributed Perl Documentation  Directory::Queue::Normal(3)

NAME
       Directory::Queue::Normal - object oriented interface to a normal
       directory based queue

SYNOPSIS
           use Directory::Queue::Normal;

           #
           # simple schema:
           #  - there must be a "body" which is a string
           #  - there can be a "header" which is a table/hash
           #

           $schema = { "body" => "string", "header" => "table?" };
           $queuedir = "/tmp/test";

           #
           # sample producer
           #

           $dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
           foreach $count (1 .. 100) {
               $name = $dirq->add(body => "element $count\n", header => \%ENV);
               printf("# added element %d as %s\n", $count, $name);
           }

           #
           # sample consumer (one pass only)
           #

           $dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
           for ($name = $dirq->first(); $name; $name = $dirq->next()) {
               next unless $dirq->lock($name);
               printf("# reading element %s\n", $name);
               %data = $dirq->get($name);
               # one can use $data{body} and $data{header} here...
               # one could use $dirq->unlock($name) to only browse the queue...
               $dirq->remove($name);
           }

           #
           # looping consumer (sleeping to avoid using all CPU time)
           #

           $dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
           while (1) {
               sleep(1) unless $dirq->count();
               for ($name = $dirq->first(); $name; $name = $dirq->next()) {
                   # ... same as above ...
               }
           }

DESCRIPTION
       The goal of this module is to offer a "normal" (as opposed to "simple")
       queue system using the underlying filesystem for storage and security,
       and to prevent race conditions via atomic operations.

       It allows arbitrary data to be stored (see the "SCHEMA" section for
       more information) but it has a significant disk space and speed
       overhead.

       Please refer to Directory::Queue for general information about
       directory queues.

CONSTRUCTOR
       The new() method can be used to create a Directory::Queue::Normal
       object that will later be used to interact with the queue. The
       following attributes are supported:

       path
           the queue toplevel directory (mandatory)

       rndhex
           the "random" hexadecimal digit to use in element names (aka R) as a
           number between 0 and 15 (default: randomly generated)

       umask
           the umask to use when creating files and directories (default: use
           the running process' umask)

       maxelts
           the maximum number of elements that an intermediate directory can
           hold (default: 16,000)

       maxlock
           default maximum time for a locked element (in seconds, default 600)
           as used by the purge() method

       maxtemp
           default maximum time for a temporary element (in seconds, default
           300) as used by the purge() method

       schema
           the schema defining how to interpret user supplied data (mandatory
           if elements are added or read)
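
       As an illustration of the attributes above, here is a minimal sketch
       of a constructor call that overrides several defaults. The temporary
       queue directory and the chosen values are assumptions made for the
       example, not recommendations from this manual.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use Directory::Queue::Normal;

# Hypothetical queue location: a throwaway directory, not a path from the manual.
my $queuedir = tempdir(CLEANUP => 1);

my $dirq = Directory::Queue::Normal->new(
    path    => $queuedir,              # mandatory
    schema  => { body => "string" },   # needed because we call add() below
    umask   => 077,                    # octal literal: owner-only access
    maxelts => 1000,                   # smaller intermediate directories
    maxlock => 120,                    # default later used by purge()
    maxtemp => 60,                     # default later used by purge()
);

my $name = $dirq->add(body => "hello\n");
printf("added %s, queue now holds %d element(s)\n", $name, $dirq->count());
```

       The schema is only given here because the sketch adds an element; a
       process that only counts or purges could presumably omit it.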

SCHEMA
       The schema defines how user supplied data is stored in the queue. It is
       only required by the add() and get() methods.

       The schema must be a reference to a hash containing key/value pairs.

       The key must contain only alphanumerical characters. It identifies the
       piece of data and will be used as file name when storing the data
       inside the element directory.

       The value represents the type of the given piece of data. It can be:

       binary
           the data is a binary string (i.e. a sequence of bytes), it will be
           stored directly in a plain file with no further encoding

       string
           the data is a text string (i.e. a sequence of characters), it will
           be UTF-8 encoded before being stored in a file

       table
           the data is a reference to a hash of text strings, it will be
           serialized and UTF-8 encoded before being stored in a file

       By default, all pieces of data are mandatory. If you append a question
       mark to the type, this piece of data will be marked as optional. See
       the comments in the "SYNOPSIS" section for an example.

       By default, string or binary data is used directly. If you append an
       asterisk to the type, the data that you add or get will be by
       reference. This can be useful to avoid string copies of large amounts
       of data.
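
       The type modifiers can be combined in one schema. The following is a
       sketch only: the key names (title, payload, meta), the temporary queue
       directory and the sample data are assumptions made for the example.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use Directory::Queue::Normal;

my $schema = {
    title   => "string",    # mandatory text, UTF-8 encoded on disk
    payload => "binary*",   # mandatory bytes, passed by reference
    meta    => "table?",    # optional hash of text strings
};
my $dirq = Directory::Queue::Normal->new(
    path   => tempdir(CLEANUP => 1),   # hypothetical throwaway location
    schema => $schema,
);

my $bytes = pack("C*", 0 .. 255);      # stand-in for a large binary blob
my $name  = $dirq->add(
    title   => "caf\x{e9}",            # a character string, not bytes
    payload => \$bytes,                # by reference because of the asterisk
);                                     # "meta" is omitted: it is optional

$dirq->lock($name) or die("cannot lock $name\n");
my %data = $dirq->get($name);
printf("payload holds %d bytes\n", length(${ $data{payload} }));
printf("meta is %s\n", exists($data{meta}) ? "present" : "absent");
$dirq->remove($name);
```

       Because of the asterisk, get() hands back a scalar reference for
       payload, so it has to be dereferenced before use.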

METHODS
       The following methods are available:

       new()
           return a new Directory::Queue::Normal object (class method)

       copy()
           return a copy of the object; this can be useful to have independent
           iterators on the same queue

       path()
           return the queue toplevel path

       id()
           return a unique identifier for the queue

       count()
           return the number of elements in the queue

       first()
           return the first element in the queue, resetting the iterator;
           return an empty string if the queue is empty

       next()
           return the next element in the queue, incrementing the iterator;
           return an empty string if there is no next element

       add(DATA)
           add the given data (a hash or hash reference) to the queue and
           return the corresponding element name; the schema must be known and
           the data must conform to it

       lock(ELEMENT[, PERMISSIVE])
           attempt to lock the given element and return true on success; if
           the PERMISSIVE option is true (which is the default), it is not a
           fatal error if the element cannot be locked and false is returned

       unlock(ELEMENT[, PERMISSIVE])
           attempt to unlock the given element and return true on success; if
           the PERMISSIVE option is true (which is not the default), it is not
           a fatal error if the element cannot be unlocked and false is
           returned

       touch(ELEMENT)
           update the access and modification times on the element's directory
           to indicate that it is still being used; this is useful for
           elements that are locked for long periods of time (see the purge()
           method)

       remove(ELEMENT)
           remove the given element (which must be locked) from the queue

       get(ELEMENT)
           get the data from the given element (which must be locked) and
           return basically the same hash as what add() got (in list context,
           the hash is returned directly while in scalar context, the hash
           reference is returned instead); the schema must be known and the
           data must conform to it

       purge([OPTIONS])
           purge the queue by removing unused intermediate directories,
           removing temporary elements that are too old and unlocking locked
           elements that are too old (aka stale locks); note: this can take a
           long time on queues with many elements; OPTIONS can be:

           maxtemp
               maximum time for a temporary element (in seconds); if set to 0,
               temporary elements will not be removed

           maxlock
               maximum time for a locked element (in seconds); if set to 0,
               locked elements will not be unlocked
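
       Several of the methods above can be combined to browse a queue without
       consuming it. This is a sketch under stated assumptions: the temporary
       queue directory, the three sample elements and the purge limits are
       made up for the example.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use Directory::Queue::Normal;

my $dirq = Directory::Queue::Normal->new(
    path   => tempdir(CLEANUP => 1),   # hypothetical throwaway location
    schema => { body => "string" },
);
$dirq->add(body => "element $_\n") foreach 1 .. 3;

# Browse with an independent iterator so $dirq's own position is untouched.
my $browser = $dirq->copy();
my @seen;
for (my $name = $browser->first(); $name; $name = $browser->next()) {
    next unless $browser->lock($name);   # permissive: false if already locked
    my $data = $browser->get($name);     # scalar context: hash reference
    push(@seen, $data->{body});
    $browser->touch($name);              # only matters for long processing
    $browser->unlock($name);             # leave the element in the queue
}
printf("browsed %d element(s), %d still queued\n",
       scalar(@seen), $dirq->count());

# Housekeeping with explicit limits instead of the constructor defaults.
$dirq->purge(maxtemp => 60, maxlock => 120);
```

       Calling remove() instead of unlock() would turn the browsing loop into
       the one-pass consumer shown in the "SYNOPSIS" section.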

DIRECTORY STRUCTURE
       All the directories holding the elements and all the files holding the
       data pieces are located under the queue toplevel directory. This
       directory can contain:

       temporary
           the directory holding temporary elements, i.e. the elements being
           added

       obsolete
           the directory holding obsolete elements, i.e. the elements being
           removed

       NNNNNNNN
           an intermediate directory holding elements; NNNNNNNN is an 8-digit
           hexadecimal number

       In any of the above directories, an element is stored as a single
       directory with a 14-digit hexadecimal name SSSSSSSSMMMMMR where:

       SSSSSSSS
           represents the number of seconds since the Epoch

       MMMMM
           represents the microsecond part of the time since the Epoch

       R   is a random hexadecimal digit used to reduce name collisions

       Finally, inside an element directory, the different pieces of data are
       stored into different files, named according to the schema. A locked
       element contains in addition a directory named "locked".
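
       The SSSSSSSSMMMMMR naming scheme can be illustrated with core Perl
       only. The helper names build_name() and parse_name() are hypothetical
       and are not part of this module; the sketch simply follows the field
       widths described above.

```perl
use strict;
use warnings;
use Time::HiRes qw(gettimeofday);

# Build a 14-digit name: 8 hex digits of seconds, 5 of microseconds, 1 random.
sub build_name {
    my ($seconds, $microseconds, $rndhex) = @_;
    return sprintf("%08x%05x%01x", $seconds, $microseconds, $rndhex);
}

# Split a name back into its three numeric fields.
sub parse_name {
    my ($name) = @_;
    $name =~ /^([0-9a-f]{8})([0-9a-f]{5})([0-9a-f])$/
        or die("unexpected element name: $name\n");
    return (hex($1), hex($2), hex($3));
}

my ($seconds, $microseconds) = gettimeofday();
my $name = build_name($seconds, $microseconds, int(rand(16)));
my ($s, $m, $r) = parse_name($name);
printf("%s -> %s UTC + %d us, R=%x\n", $name, scalar(gmtime($s)), $m, $r);
```

       Five hexadecimal digits are enough for the microsecond field because
       its largest value, 999999, is f423f in hexadecimal.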

SEE ALSO
       Directory::Queue.

AUTHOR
       Lionel Cons <http://cern.ch/lionel.cons>

       Copyright (C) CERN 2010-2018

perl v5.30.1                      2020-01-29       Directory::Queue::Normal(3)