JobQueue(3)           User Contributed Perl Documentation          JobQueue(3)

NAME

       POE::Component::JobQueue - a component to manage queues and worker
       pools

SYNOPSIS

         use POE qw(Component::JobQueue);

         # Passive queue waits for enqueue events.
         POE::Component::JobQueue->spawn
           ( Alias         => 'passive',         # defaults to 'queuer'
             WorkerLimit   => 16,                # defaults to 8
             Worker        => \&spawn_a_worker,  # code which will start a session
             Passive       =>
             { Prioritizer => \&job_comparer,    # defaults to sub { 1 } # FIFO
             },
           );

         # Active queue fetches jobs and spawns workers.
         POE::Component::JobQueue->spawn
           ( Alias          => 'active',          # defaults to 'queuer'
             WorkerLimit    => 32,                # defaults to 8
             Worker         => \&fetch_and_spawn, # fetch a job and start a session
             Active         =>
             { PollInterval => 1,                 # defaults to undef (no polling)
               AckAlias     => 'respondee',       # defaults to undef (no respondee)
               AckState     => 'response',        # defaults to undef
             },
           );

         # Enqueuing a job in a passive queue.
         $kernel->post( 'passive',   # post to 'passive' alias
                        'enqueue',   # 'enqueue' a job
                        'postback',  # which of our states is notified when it's done
                        @job_params, # job parameters
                      );

         # Passive worker function.
         sub spawn_a_worker {
           my ($postback, @job_params) = @_;     # same parameters as posted
           POE::Session->create
             ( inline_states => \%inline_states, # handwaving over details here
               args          => [ $postback,     # $postback->(@results) to return
                                  @job_params,   # parameters of this job
                                ],
             );
         }

         # Active worker function.
         sub fetch_and_spawn {
           my $meta_postback = shift;               # called to create a postback
           my @job_params = fetch_next_job();       # fetch the next job's parameters
           if (@job_params) {                       # if there's a job to do...
             my $postback = $meta_postback->(@job_params); # ... create a postback
             POE::Session->create                          # ... create a session
               ( inline_states => \%inline_states,  # handwaving over details here
                 args          => [ $postback,      # $postback->(@results) to return
                                    @job_params,    # parameters of this job
                                  ],
               );
           }
         }

         # Invoke a postback to acknowledge that a job is done.
         $postback->( @job_results );

         # This is the sub which is called when a postback is invoked.
         sub postback_handler {
           my ($request_packet, $response_packet) = @_[ARG0, ARG1];

           my @original_job_params = @{$request_packet};  # original post/fetch
           my @job_results         = @{$response_packet}; # passed to the postback

           print "original job parameters: (@original_job_params)\n";
           print "results of finished job: (@job_results)\n";
         }

         # Stop a running queue
         $kernel->call( 'active' => 'stop' );

DESCRIPTION

       POE::Component::JobQueue manages a finite pool of worker sessions as
       they handle an arbitrarily large number of tasks.  It is often used as
       a form of flow control, preventing a large group of tasks from
       exhausting some sort of resource.

       PoCo::JobQueue implements two kinds of queue: active and passive.  Both
       kinds of queue use a Worker coderef to spawn sessions that process
       jobs, but how they use the Worker differs between them.

       Active queues' Worker code fetches a new job from a resource that must
       be polled.  For example, it may read a new line from a file.  Passive
       queues, on the other hand, are given jobs with 'enqueue' events.  Their
       Worker functions are passed the next job as parameters.

       JobQueue components are not proper objects.  Instead of being created,
       as most objects are, they are "spawned" as separate sessions.  To avoid
       confusion (and hopefully not cause other confusion), they must be
       spawned with a "spawn" method, not created anew with a "new" one.

       POE::Component::JobQueue's "spawn" method takes different parameters
       depending on whether it's going to be an active or a passive queue.
       Regardless, there are a few parameters which are the same for both:

       Alias => $session_alias
         "Alias" sets the name by which the session will be known.  If no
         alias is given, the component defaults to "queuer".  The alias lets
         several sessions interact with job queues without keeping (or even
         knowing) hard references to them.  It's possible to spawn several
         queues with different aliases.

       WorkerLimit => $worker_count
         "WorkerLimit" sets the limit on the number of worker sessions which
         will run in parallel.  It defaults arbitrarily to 8.  No more than
         this number of workers will be active at once.

       Worker => \&worker
         "Worker" is a coderef which is called whenever it's time to spawn a
         new session.  What it receives as parameters and what it's expected
         to do are slightly different for active and passive queues.

         Active workers receive just one parameter: a meta-postback.  This is
         used to build a postback once the next job's parameters are known.
         They're expected to actively fetch the next job's parameters and
         spawn a new session if necessary.

         See "sub fetch_and_spawn" in the SYNOPSIS for an example of an active
         worker function.

         Passive workers' arguments include a pre-built postback and the next
         job's parameters.  Since the JobQueue component already knows what
         the job parameters are, it's done most of the work for the worker.
         All that's left is to spawn the session that will process the job.

         See "sub spawn_a_worker" in the SYNOPSIS for an example of a passive
         worker function.

         When a postback is called, it posts its parameters (plus the
         parameters passed when it was created) to the session it belongs to.
         Postbacks are discussed in the POE::Session manpage.
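
         The pairing of creation-time and call-time parameters can be
         illustrated without POE.  The following is only a sketch:
         "make_postback" and @delivered are hypothetical stand-ins for a
         meta-postback and for the receiving session's event queue, and are
         not part of the component's API.

```perl
use strict;
use warnings;

# Stand-in for the receiving session's event queue (an assumption for
# illustration; real postbacks post an event through POE::Kernel).
my @delivered;

# Hypothetical meta-postback: given a job's parameters, return a postback
# closure that remembers them.
sub make_postback {
  my @job_params = @_;
  return sub {
    my @job_results = @_;
    # Deliver two packets: the original parameters and the results.
    push @delivered, [ [ @job_params ], [ @job_results ] ];
    return;
  };
}

my $postback = make_postback('report.txt', 42);  # job parameters are known
$postback->('ok', 3);                             # the job has finished

my ($request_packet, $response_packet) = @{ $delivered[0] };
print "original job parameters: (@$request_packet)\n";
print "results of finished job: (@$response_packet)\n";
```

         The two packets correspond to the ARG0 and ARG1 values unpacked by
         "sub postback_handler" in the SYNOPSIS.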

       These parameters are unique to passive queues:

       Passive => \%passive_parameters
         "Passive" contains a hashref of passive queue parameters.  The
         "Passive" parameter block's presence indicates that the queue will be
         passive, but its contents may be empty since all its parameters are
         optional:

           Passive => { }, # all passive parameters take default values

         A queue can't be both active and passive at the same time.

         The "Passive" block takes at most one parameter.

         Prioritizer => \&prioritizer_function
           "Prioritizer" holds a function that defines how a job queue will be
           ordered.  The prioritizer function receives references to two jobs,
           and it returns a value which tells the JobQueue component which job
           should be dealt with first.

           In the Unix tradition, lower priorities go first.  This transforms
           the prioritizer into a simple sort function, which it has been
           modelled after.  Like sort's sorter sub, the prioritizer returns -1
           if the first job goes before the second one; 0 if both jobs have
           the same priority; and 1 if the first job goes after the second.
           It's easier to write an example than to describe it:

             sub low_priorities_first {
               my ($first_job, $second_job) = @_;
               return $first_job->{priority} <=> $second_job->{priority};
             }

           The first argument always refers to the new job being enqueued.

           The default prioritizer always returns 1.  Since the first argument
           always refers to the new job being enqueued, this effects a FIFO
           queue.  Replacing it with a prioritizer that always returns -1 will
           turn the JobQueue into a stack (last in, first out).
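
           The effect of the three kinds of prioritizer can be checked with a
           small simulation that does not need POE.  This is a sketch under
           stated assumptions: "insert_job" and "order_with" are hypothetical
           helpers, and the insertion rule used here (place the new job
           before the first queued job it sorts ahead of) is only one way to
           honour the contract described above, not necessarily what the
           component does internally.

```perl
use strict;
use warnings;

# Hypothetical helper: insert $new_job into @$queue.  The prioritizer is
# always called with the new job as its first argument.
sub insert_job {
  my ($queue, $prioritizer, $new_job) = @_;
  for my $index (0 .. $#$queue) {
    if ($prioritizer->($new_job, $queue->[$index]) < 0) {
      splice @$queue, $index, 0, $new_job;   # new job goes before this one
      return;
    }
  }
  push @$queue, $new_job;                    # new job goes after all others
}

# Hypothetical helper: enqueue jobs in order, return the resulting order.
sub order_with {
  my ($prioritizer, @jobs) = @_;
  my @queue;
  insert_job(\@queue, $prioritizer, $_) for @jobs;
  return join ' ', map { $_->{name} } @queue;
}

my @jobs = (
  { name => 'a', priority => 5 },
  { name => 'b', priority => 1 },
  { name => 'c', priority => 5 },
);

my $fifo        = order_with(sub { 1 },  @jobs);   # default behaviour
my $lifo        = order_with(sub { -1 }, @jobs);   # stack behaviour
my $by_priority = order_with(
  sub { $_[0]{priority} <=> $_[1]{priority} }, @jobs
);

print "FIFO:        $fifo\n";          # a b c
print "LIFO:        $lifo\n";          # c b a
print "By priority: $by_priority\n";   # b a c
```

           In this simulation, jobs of equal priority keep their arrival
           order, because a new job is only moved ahead of a queued job when
           the prioritizer returns a negative value.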

         These parameters are unique to active queues:

         Active => \%active_parameters
           "Active" contains a hashref of active queue parameters.  The
           "Active" parameter block's presence indicates that the queue will
           be active, but its contents may be empty since all its parameters
           are optional.

             Active => { }, # all active parameters take default values

           A queue can't be both active and passive at the same time.

           The "Active" block takes up to three parameters.

           PollInterval => $seconds
             Active "Worker" functions indicate that they've run out of jobs
             by failing to spawn new sessions.  When this happens, an active
             queue may go into "polling" mode.  In this mode, the "Worker" is
             called periodically to see if new jobs have appeared in whatever
             it's getting them from.

             "PollInterval", if present, tells the job queue how often to call
             "Worker" in the absence of new sessions.  If it's omitted, the
             active queue stops after the first time it runs out of jobs.

           AckAlias => $alias
           AckState => $state
             "AckAlias" and "AckState" tell the active job queue where to send
             acknowledgements of jobs which have been completed.  If one is
             specified, then both must be.

         Sessions communicate asynchronously with passive JobQueue components.
         They post "enqueue" requests to them, and the components post job
         results back.

         Requests are posted to the component's "enqueue" state.  They include
         the name of a state to post responses back to, and a list of job
         parameters.  For example:

           $kernel->post( 'queue', 'enqueue', # queuer session alias & state
                          'job_results',      # my state to receive responses
                          @job_parameters,    # parameters of the job
                        );

         Once the job is completed, the handler for 'job_results' will be
         called with the job parameters and results.  See "sub
         postback_handler" in the SYNOPSIS for an example results handler.
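
         The pieces above can be combined into a small complete program.  The
         following is a minimal sketch rather than a reference
         implementation: the 'squares' alias, the "spawn_square_worker"
         function and the 'compute' and 'job_results' state names are all
         illustrative choices, and it assumes POE and
         POE::Component::JobQueue are installed.

```perl
use strict;
use warnings;
use POE qw(Component::JobQueue);

my %squares;   # job parameter => result, filled in by the results handler

# Passive Worker: receives a pre-built postback plus the job's parameters,
# and only has to start the session that does the work.
sub spawn_square_worker {
  my ($postback, $number) = @_;
  POE::Session->create(
    inline_states => {
      _start => sub {
        $_[HEAP]{job} = [ @_[ARG0, ARG1] ];   # remember postback and number
        $_[KERNEL]->yield('compute');
      },
      compute => sub {
        my ($pb, $n) = @{ $_[HEAP]{job} };
        $pb->($n * $n);                       # hand the result back
      },
    },
    args => [ $postback, $number ],
  );
}

# Client session: spawns the queue, enqueues jobs, collects the results.
POE::Session->create(
  inline_states => {
    _start => sub {
      my ($kernel, $heap) = @_[KERNEL, HEAP];
      POE::Component::JobQueue->spawn(
        Alias       => 'squares',
        WorkerLimit => 2,
        Worker      => \&spawn_square_worker,
        Passive     => { },                   # default (FIFO) prioritizer
      );
      my @numbers = (2, 3, 4);
      $heap->{pending} = scalar @numbers;
      $kernel->post('squares', 'enqueue', 'job_results', $_) for @numbers;
    },
    job_results => sub {
      my ($kernel, $heap, $request, $response) = @_[KERNEL, HEAP, ARG0, ARG1];
      $squares{ $request->[0] } = $response->[0];
      # Stop the queue once every job has reported back.
      $kernel->call('squares' => 'stop') unless --$heap->{pending};
    },
  },
);

POE::Kernel->run();
print "$_ squared is $squares{$_}\n" for sort { $a <=> $b } keys %squares;
```

         With "WorkerLimit" set to 2, at most two of the three jobs are being
         processed at any moment; the third waits in the queue until a worker
         session ends.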

         Active JobQueue components act as event generators.  They don't
         receive jobs from the outside; instead, they poll for them and post
         acknowledgements as they're completed.
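
         An active queue can be sketched end to end in the same way.  This
         example makes several assumptions that should be checked against the
         component itself: acknowledgements are assumed to arrive as the same
         request and response packets that "sub postback_handler" in the
         SYNOPSIS unpacks, the @lines array is a stand-in for a polled
         resource, and the 'lines' alias, "fetch_and_measure" function and
         'measure' state are illustrative names.  "PollInterval" is omitted
         so that the queue stops by itself when the jobs run out.

```perl
use strict;
use warnings;
use POE qw(Component::JobQueue);

my @lines = ('alpha', 'beta', 'gamma');   # stand-in for a polled resource
my %lengths;                              # filled in by the ack handler

# Active Worker: fetches the next job itself and spawns nothing when no
# job is left, which tells the queue it has run out of work.
sub fetch_and_measure {
  my $meta_postback = shift;
  return unless @lines;
  my $line     = shift @lines;
  my $postback = $meta_postback->($line);   # build the postback for this job
  POE::Session->create(
    inline_states => {
      _start => sub {
        $_[HEAP]{job} = [ @_[ARG0, ARG1] ];
        $_[KERNEL]->yield('measure');
      },
      measure => sub {
        my ($pb, $text) = @{ $_[HEAP]{job} };
        $pb->(length $text);                # acknowledge with the result
      },
    },
    args => [ $postback, $line ],
  );
}

# Session that receives acknowledgements under the 'respondee' alias.
POE::Session->create(
  inline_states => {
    _start => sub {
      $_[KERNEL]->alias_set('respondee');
      POE::Component::JobQueue->spawn(
        Alias       => 'lines',
        WorkerLimit => 2,
        Worker      => \&fetch_and_measure,
        Active      => {
          AckAlias => 'respondee',
          AckState => 'response',
        },
      );
    },
    response => sub {
      my ($request, $response) = @_[ARG0, ARG1];
      $lengths{ $request->[0] } = $response->[0];
    },
  },
);

POE::Kernel->run();
print "$_ has length $lengths{$_}\n" for sort keys %lengths;
```

         Adding a "PollInterval" to the "Active" block would instead keep the
         queue alive after @lines is exhausted, calling the Worker again at
         that interval until the queue is stopped.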

         Running queues can be stopped by sending a "stop" event to the
         component.  Any currently running workers will be allowed to
         complete, but no new workers will be started.

           $kernel->call( 'queue' => 'stop' ); # Stop the running queue

SEE ALSO

       This component is built upon POE.  Please see its source code and
       the documentation for its foundation modules to learn more.

       Also see the test program, t/01_queues.t, in the
       POE::Component::JobQueue distribution.

BUG TRACKER

       https://rt.cpan.org/Dist/Display.html?Status=Active&Queue=POE-Component-JobQueue

REPOSITORY

       http://thirdlobe.com/svn/poco-jobqueue/

OTHER RESOURCES

       http://search.cpan.org/dist/POE-Component-JobQueue/

AUTHOR & COPYRIGHTS

       POE::Component::JobQueue is Copyright 1999-2009 by Rocco Caputo.  All
       rights are reserved.  POE::Component::JobQueue is free software; you
       may redistribute it and/or modify it under the same terms as Perl
       itself.

POD ERRORS

       Hey! The above document had some coding errors, which are explained
       below:

       Around line 597:
           You forgot a '=back' before '=head1'

perl v5.34.0                      2022-01-21                       JobQueue(3)