1JobQueue(3) User Contributed Perl Documentation JobQueue(3)
2
3
4
6 POE::Component::JobQueue - a component to manage queues and worker
7 pools
8
10 use POE qw(Component::JobQueue);
11
12 # Passive queue waits for enqueue events.
13 POE::Component::JobQueue->spawn
14 ( Alias => 'passive', # defaults to 'queuer'
15 WorkerLimit => 16, # defaults to 8
16 Worker => \&spawn_a_worker, # code which will start a session
17 Passive =>
18 { Prioritizer => \&job_comparer, # defaults to sub { 1 } # FIFO
19 },
20 );
21
22 # Active queue fetches jobs and spawns workers.
23 POE::Component::JobQueue->spawn
24 ( Alias => 'active', # defaults to 'queuer'
25 WorkerLimit => 32, # defaults to 8
26 Worker => \&fetch_and_spawn, # fetch a job and start a session
27 Active =>
28 { PollInterval => 1, # defaults to undef (no polling)
29 AckAlias => 'respondee', # defaults to undef (no respondee)
30 AckState => 'response', # defaults to undef
31 },
32 );
33
34 # Enqueuing a job in a passive queue.
35 $kernel->post( 'passive', # post to 'passive' alias
36 'enqueue', # 'enqueue' a job
37 'postback', # which of our states is notified when it's done
38 @job_params, # job parameters
39 );
40
41 # Passive worker function.
42 sub spawn_a_worker {
43 my ($postback, @job_params) = @_; # same parameters as posted
44 POE::Session->create
45 ( inline_states => \%inline_states, # handwaving over details here
46 args => [ $postback, # $postback->(@results) to return
47 @job_params, # parameters of this job
48 ],
49 );
50 }
51
52 # Active worker function.
53 sub fetch_and_spawn {
54 my $meta_postback = shift; # called to create a postback
55 my @job_params = &fetch_next_job(); # fetch the next job's parameters
56 if (@job_params) { # if there's a job to do...
57 my $postback = $meta_postback->(@job_params); # ... create a postback
58 POE::Session->create # ... create a session
59 ( inline_states => \%inline_states, # handwaving over details here
60 args => [ $postback, # $postback->(@results) to return
61 @job_params, # parameters of this job
62 ],
63 );
64 }
65 }
66
67 # Invoke a postback to acknowledge that a job is done.
68 $postback->( @job_results );
69
70 # This is the sub which is called when a postback is invoked.
71 sub postback_handler {
72 my ($request_packet, $response_packet) = @_[ARG0, ARG1];
73
74 my @original_job_params = @{$request_packet}; # original post/fetch
75 my @job_results = @{$response_packet}; # passed to the postback
76
77 print "original job parameters: (@original_job_params)\n";
78 print "results of finished job: (@job_results)\n";
79 }
80
81 # Stop a running queue
82 $kernel->call( 'active' => 'stop' );
83
85 POE::Component::JobQueue manages a finite pool of worker sessions as
86 they handle an arbitrarily large number of tasks. It often is used as
87 a form of flow control, preventing a large group of tasks from
88 exhausting some sort of resource.
89
90 PoCo::JobQueue implements two kinds of queue: active and passive. Both
91 kinds of queue use a Worker coderef to spawn sessions that process
92 jobs, but how they use the Worker differs between them.
93
94 Active queues' Worker code fetches a new job from a resource that must
95 be polled. For example, it may read a new line from a file. Passive
96 queues, on the other hand, are given jobs with 'enqueue' events. Their
97 Worker functions are passed the next job as parameters.
98
99 JobQueue components are not proper objects. Instead of being created,
100 as most objects are, they are "spawned" as separate sessions. To avoid
101 confusion (and hopefully not cause other confusion), they must be
102 spawned wich a "spawn" method, not created anew with a "new" one.
103
104 POE::Component::JobQueue's "spawn" method takes different parameters
105 depending whether it's going to be an active or a passive session.
106 Regardless, there are a few parameters which are the same for both:
107
108 Alias => $session_alias
109 "Alias" sets the name by which the session will be known. If no
110 alias is given, the component defaults to "queuer". The alias lets
111 several sessions interact with job queues without keeping (or even
112 knowing) hard references to them. It's possible to spawn several
113 queues with different aliases.
114
115 WorkerLimit => $worker_count
116 "WorkerLimit" sets the limit on the number of worker sessions which
117 will run in parallel. It defaults arbitrarily to 8. No more than
118 this number of workers will be active at once.
119
120 Worker => \&worker
121 "Worker" is a coderef which is called whenever it's time to spawn a
122 new session. What it receives as parameters and what it's expected
123 to do are slightly different for active and passive sessions.
124
125 Active workers receive just one parameter: a meta-postback. This is
126 used to build a postback once the next job's parameters are known.
127 They're expected to actively fetch the next job's parameters and
128 spawn a new session if necessary.
129
130 See "sub fetch_and_spawn" in the SYNOPSIS for an example of an active
131 worker function.>
132
133 Passive workers' arguments include a pre-built postback and the next
134 job's parameters. Since the JobQueue component already knows what
135 the job parameters are, it's done most of the work for the worker.
136 All that's left is to spawn the session that will process the job.
137
138 See "sub spawn_a_worker" in the SYNOPSIS for an example of a passive
139 worker function.
140
141 When a postback is called, it posts its parameters (plus the
142 parameters passed when it was created) to the session it belongs to.
143 Postbacks are discussed in the POE::Session manpage.
144
145 These parameters are unique to passive queues:
146
147 Passive => \%passive_parameters
148 "Passive" contains a hashref of passive queue parameters. The
149 "Passive" parameter block's presence indicates that the queue will be
150 passive, but its contents may be empty since all its parameters are
151 optional:
152
153 Passive => { }, # all passive parameters take default values
154
155 A queue can't be both active and passive at the same time.
156
157 The "Passive" block takes up to one parameter.
158
159 Prioritizer => \&prioritizer_function
160 "Prioritizer" holds a function that defines how a job queue will be
161 ordered. The prioritizer function receives references to two jobs,
162 and it returns a value which tells the JobQueue component which job
163 should be dealt with first.
164
165 In the Unix tradition, lower priorities go first. This transforms
166 the prioritizer into a simple sort function, which it has been
167 modelled after. Like sort's sorter sub, the prioritizer returns -1
168 if the first job goes before the second one; 0 if both jobs have
169 the same priority; and 1 if the first job goes after the second.
170 It's easier to write an example than to describe it:
171
172 sub low_priorities_first {
173 my ($first_job, $second_job) = @_;
174 return $first_job->{priority} <=> $second_job->{priority};
175 }
176
177 The first argument always refers to the new job being enqueued.
178
179 The default prioritizer always returns 1. Since the first argument
180 always refers to the new job being enqueued, this effects a FIFO
181 queue. Replacing it with a prioritizer that always returns -1 will
182 turn the JobQueue into a stack (last in, first out).
183
184 These parameters are unique to active queues:
185
186 Active => \%active_parameters
187 "Active" contains a hashref of active queue parameters. The
188 "Active" parameter block's presence indicates that the queue will
189 be active, but its contens may be empty since all its parameters
190 are optional.
191
192 Active => { }, # all active parameters take default values
193
194 A queue can't be both active and passive at the same time.
195
196 The "Active" block takes up to three parameters.
197
198 PollInterval => $seconds
199 Active "Worker" functions indicate that they've run out of jobs
200 by failing to spawn new sessions. When this happens, an active
201 queue may go into "polling" mode. In this mode, the "Worker" is
202 called periodically to see if new jobs have appeared in whatever
203 it's getting them from.
204
205 "PollInterval", if present, tells the job queue how often to call
206 "Worker" in the absence of new sessions. If it's omitted, the
207 active queue stops after the first time it runs out of jobs.
208
209 AckAlias => $alias
210 AckState => $state
211 "AckAlias" and "AckState" tell the active job queue where to send
212 acknowledgements of jobs which have been completed. If one is
213 specified, then both must be.
214
215 Sessions communicate asynchronously with passive JobQueue components.
216 They post "enqueue" requests to it, and it posts job results back.
217
218 Requests are posted to the component's "enqueue" state. They include
219 the name of a state to post responses back to, and a list of job
220 parameters. For example:
221
222 $kernel->post( 'queue', 'enqueue', # queuer session alias & state
223 'job_results', # my state to receive responses
224 @job_parameters, # parameters of the job
225 );
226
227 Once the job is completed, the handler for 'job_results' will be
228 called with the job parameters and results. See "sub
229 postback_handler" in the SYNOPSIS for an example results handler.
230
231 Active JobQueue components act as event generators. They don't
232 receive jobs from the outside; instead, they poll for them and post
233 acknowledgements as they're completed.
234
235 Running queues can be stopped by posting a "stop" state to the
236 component. Any currently running workers will be allowed to complete,
237 but no new workers will be started.
238
239 $kernel->call( 'queue' => 'stop' ); # Stop the running queue
240
242 This component is built upon and POE. Please see its source code and
243 the documentation for its foundation modules to learn more.
244
245 Also see the test program, t/01_queues.t, in the
246 POE::Component::JobQueue distribution.
247
249 https://rt.cpan.org/Dist/Display.html?Status=Active&Queue=POE-Component-JobQueue
250
252 http://thirdlobe.com/svn/poco-jobqueue/
253
255 http://search.cpan.org/dist/POE-Component-JobQueue/
256
258 POE::Component::JobQueue is Copyright 1999-2009 by Rocco Caputo. All
259 rights are reserved. POE::Component::JobQueue is free software; you
260 may redistribute it and/or modify it under the same terms as Perl
261 itself.
262
264 Hey! The above document had some coding errors, which are explained
265 below:
266
267 Around line 597:
268 You forgot a '=back' before '=head1'
269
270
271
272perl v5.34.0 2022-01-21 JobQueue(3)