1TheSchwartz(3pm) User Contributed Perl Documentation TheSchwartz(3pm)
2
3
4
6 TheSchwartz - reliable job queue
7
9 # MyApp.pm
10 package MyApp;
11
12 sub work_asynchronously {
13 my %args = @_;
14
15 my $client = TheSchwartz->new( databases => $DATABASE_INFO );
16 $client->insert('MyWorker', \%args);
17 }
18
19
20 # myworker.pl
21 package MyWorker;
22 use base qw( TheSchwartz::Worker );
23
24 sub work {
25 my $class = shift;
26 my TheSchwartz::Job $job = shift;
27
28 print "Workin' hard or hardly workin'? Hyuk!!\n";
29
30 $job->completed();
31 }
32
33 package main;
34
35 my $client = TheSchwartz->new( databases => $DATABASE_INFO );
36 $client->can_do('MyWorker');
37 $client->work();
38
40 TheSchwartz is a reliable job queue system. Your application can put
41 jobs into the system, and your worker processes can pull jobs from the
42 queue atomically to perform. Failed jobs can be left in the queue to
43 retry later.
44
45 Abilities specify what jobs a worker process can perform. Abilities are
46 the names of "TheSchwartz::Worker" sub-classes, as in the synopsis: the
47 "MyWorker" class name is used to specify that the worker script can
48 perform the job. When using the "TheSchwartz" client's "work"
49 functions, the class-ability duality is used to automatically dispatch
50 to the proper class to do the actual work.
51
52 TheSchwartz clients will also prefer to do jobs for unused abilities
53 before reusing a particular ability, to avoid exhausting the supply of
54 one kind of job while jobs of other types stack up.
55
56 Some jobs with high setup times can be performed more efficiently if a
57 group of related jobs are performed together. TheSchwartz offers a
58 facility to coalesce jobs into groups, which a properly constructed
59 worker can find and perform at once. For example, if your worker were
60 delivering email, you might store the domain name from the recipient's
61 address as the coalescing value. The worker that grabs that job could
62 then batch deliver all the mail for that domain once it connects to
63 that domain's mail server.
64
66 "TheSchwartz->new( %args )"
67 Optional members of %args are:
68
69 • "databases"
70
71 An arrayref of database information. TheSchwartz workers can use
72 multiple databases, such that if any of them are unavailable, the
73 worker will search for appropriate jobs in the other databases
74 automatically.
75
76 Each member of the "databases" value should be a hashref containing
77 either:
78
79 • "dsn"
80
81 The database DSN for this database.
82
83 • "user"
84
85 The user name to use when connecting to this database.
86
87 • "pass"
88
89 The password to use when connecting to this database.
90
91 or
92
93 • "driver"
94
95 A "Data::ObjectDriver::Driver::DBI" object.
96
97 See note below.
98
99 • "verbose"
100
101 A value indicating whether to log debug messages. If "verbose" is a
102 coderef, it is called to log debug messages. If "verbose" is not a
103 coderef but is some other true value, debug messages will be sent
104 to "STDERR". Otherwise, debug messages will not be logged.
105
106 • "prioritize"
107
108 A value indicating whether to utilize the job 'priority' field when
109 selecting jobs to be processed. If unspecified, jobs will always be
110 executed in a randomized order.
111
112 • "floor"
113
114 A value indicating the minimum priority a job needs to be for this
115 worker to perform. If unspecified all jobs are considered.
116
117 • "batch_size"
118
119 A value indicating how many jobs should be fetched from the DB for
120 consideration.
121
122 • "driver_cache_expiration"
123
124 Optional value to control how long database connections are cached
125 for in seconds. By default, connections are not cached. To re-use
126 the same database connection for five minutes, pass
127 driver_cache_expiration => 300 to the constructor. Improves job
128 throughput in cases where the work to process a job is small
129 compared to the database connection set-up and tear-down time.
130
131 • "retry_seconds"
132
133 The number of seconds after which to try reconnecting to apparently
134 dead databases. If not given, TheSchwartz will retry connecting to
135 databases after 30 seconds.
136
137 • "strict_remove_ability"
138
139 By default when work_once does not find a job it will reset
140 current_abilities to all_abilities and look for a job. Setting this
141 option will prevent work_once from resetting abilities if it can't
142 find a job for the current capabilities.
143
144 "$client->list_jobs( %args )"
145 Returns a list of "TheSchwartz::Job" objects matching the given
146 arguments. The required members of %args are:
147
148 • "funcname"
149
150 the name of the function or a reference to an array of functions
151
152 • "run_after"
153
154 the value you want to check <= against on the run_after column
155
156 • "grabbed_until"
157
158 the value you want to check <= against on the grabbed_until column
159
160 • "coalesce_op"
161
162 defaults to '=', set it to whatever you want to compare the
163 coalesce field too if you want to search, you can use 'LIKE'
164
165 • "coalesce"
166
167 coalesce value to search for, if you set op to 'LIKE' you can use
168 '%' here, do remember that '%' searches anchored at the beginning
169 of the string are much faster since it is can do a btree index
170 lookup
171
172 • "want_handle"
173
174 if you want all your jobs to be set up using a handle. defaults to
175 true. this option might be removed, as you should always have this
176 on a Job object.
177
178 • "jobid"
179
180 if you want a specific job you can pass in it's ID and if it's
181 available it will be listed.
182
183 It is important to remember that this function does not lock anything,
184 it just returns as many jobs as there is up to amount of databases *
185 $client->{batch_size}
186
187 "$client->lookup_job( $handle_id )"
188 Returns a "TheSchwartz::Job" corresponding to the given handle ID.
189
190 "$client->set_verbose( $verbose )"
191 Sets the current logging function to $verbose if it's a coderef. If not
192 a coderef, enables debug logging to "STDERR" if $verbose is true;
193 otherwise, disables logging.
194
196 The methods of TheSchwartz clients used by applications posting jobs to
197 the queue are:
198
199 "$client->insert( $job )"
200 Adds the given "TheSchwartz::Job" to one of the client's job databases.
201
202 "$client->insert( $funcname, $arg )"
203 Adds a new job with function name $funcname and arguments $arg to the
204 queue.
205
206 "$client->insert_jobs( @jobs )"
207 Adds the given "TheSchwartz::Job" objects to one of the client's job
208 databases. All the given jobs are recorded in one job database.
209
210 "$client->set_prioritize( $prioritize )"
211 Set the "prioritize" value as described in the constructor.
212
213 "$client->set_floor( $floor )"
214 Set the "floor<gt" value as described in the constructor.
215
216 "$client->set_batch_size( $batch_size )"
217 Set the "batch_size<gt" value as described in the constructor.
218
219 "$client->set_strict_remove_ability( $strict_remove_ability )"
220 Set the "strict_remove_ability<gt" value as described in the
221 constructor.
222
224 The methods of TheSchwartz clients for use in worker processes are:
225
226 "$client->can_do( $ability )"
227 Adds $ability to the list of abilities $client is capable of
228 performing. Subsequent calls to that client's "work" methods will find
229 jobs requiring the given ability.
230
231 "$client->work_once()"
232 Find and perform one job $client can do.
233
234 "$client->work_until_done()"
235 Find and perform jobs $client can do until no more such jobs are found
236 in any of the client's job databases.
237
238 "$client->work( [$delay] )"
239 Find and perform any jobs $client can do, forever. When no job is
240 available, the working process will sleep for $delay seconds (or 5, if
241 not specified) before looking again.
242
243 "$client->work_on($handle)"
244 Given a job handle (a scalar string) $handle, runs the job, then
245 returns.
246
247 "$client->grab_and_work_on($handle)"
248 Similar to $client->work_on($handle), except that the job will be
249 grabbed before being run. It guarantees that only one worker will work
250 on it (at least in the "grab_for" interval).
251
252 Returns false if the worker could not grab the job, and true if the
253 worker worked on it.
254
255 "$client->find_job_for_workers( [$abilities] )"
256 Returns a "TheSchwartz::Job" for a random job that the client can do.
257 If specified, the job returned matches one of the abilities in the
258 arrayref $abilities, rather than $client's abilities.
259
260 "$client->find_job_with_coalescing_value( $ability, $coval )"
261 Returns a "TheSchwartz::Job" for a random job for a worker capable of
262 $ability and with a coalescing value of $coval.
263
264 "$client->find_job_with_coalescing_prefix( $ability, $coval )"
265 Returns a "TheSchwartz::Job" for a random job for a worker capable of
266 $ability and with a coalescing value beginning with $coval.
267
268 Note the "TheSchwartz" implementation of this function uses a "LIKE"
269 query to find matching jobs, with all the attendant performance
270 implications for your job databases.
271
272 "$client->get_server_time( $driver )"
273 Given an open driver $driver to a database, gets the current server
274 time from the database.
275
277 The scoreboards can be used to monitor what the TheSchwartz::Worker
278 sub-classes are currently working on. Once the scoreboard has been
279 enabled in the workers with "set_scoreboard" method the "thetop"
280 utility (shipped with TheSchwartz distribution in the "extras"
281 directory) can be used to list all current jobs being worked on.
282
283 "$client->set_scoreboard( $dir )"
284 Enables the scoreboard. Setting this to 1 or "on" will cause
285 TheSchwartz to create a scoreboard file in a location it determines is
286 optimal.
287
288 Passing in any other option sets the directory the TheSchwartz
289 scoreboard directory should be created in. For example, if you set
290 this to "/tmp" then this would create a directory called
291 "/tmp/theschwartz" and a scoreboard file
292 "/tmp/theschwartz/scoreboard.pid" in it (where pid is the current
293 process pid.)
294
295 "$client->scoreboard()"
296 Returns the path to the current scoreboard file.
297
298 "$client->start_scoreboard()"
299 Writes the current job information to the scoreboard file (called by
300 the worker in work_safely before it actually starts working)
301
302 "$client->end_scoreboard()"
303 Appends the current job duration to the end of the scoreboard file
304 (called by the worker in work_safely once work has been completed)
305
306 "$client->clean_scoreboard()"
307 Removes the scoreboard file (but not the scoreboard directory.)
308 Automatically called by TheSchwartz during object destruction (i.e.
309 when the instance goes out of scope)
310
312 You can pass in a existing "Data::Object::Driver::DBI" object which
313 also allows you to reuse exist Database handles like so:
314
315 my $dbh = DBI->connect( $dsn, "root", "", {
316 RaiseError => 1,
317 PrintError => 0,
318 AutoCommit => 1,
319 } ) or die $DBI::errstr;
320 my $driver = Data::ObjectDriver::Driver::DBI->new( dbh => $dbh);
321 return TheSchwartz->new(databases => [{ driver => $driver }]);
322
323 Note: it's important that the "RaiseError" and "AutoCommit" flags are
324 set on the handle for various bits of functionality to work.
325
327 This software is Copyright 2007, Six Apart Ltd, cpan@sixapart.com. All
328 rights reserved.
329
330 TheSchwartz is free software; you may redistribute it and/or modify it
331 under the same terms as Perl itself.
332
333 TheSchwartz comes with no warranty of any kind.
334
335
336
337perl v5.34.0 2021-07-25 TheSchwartz(3pm)