Minion(3) User Contributed Perl Documentation Minion(3)

6 Minion - Job queue
7
  use Minion;

  # Connect to backend
  my $minion = Minion->new(Pg => 'postgresql://postgres@/test');

  # Add tasks
  $minion->add_task(something_slow => sub {
    my ($job, @args) = @_;
    sleep 5;
    say 'This is a background worker process.';
  });

  # Enqueue jobs
  $minion->enqueue(something_slow => ['foo', 'bar']);
  $minion->enqueue(something_slow => [1, 2, 3] => {priority => 5});

  # Perform jobs for testing
  $minion->enqueue(something_slow => ['foo', 'bar']);
  $minion->perform_jobs;

  # Start a worker to perform up to 12 jobs concurrently
  my $worker = $minion->worker;
  $worker->status->{jobs} = 12;
  $worker->run;
33
35 Minion is a high performance job queue for the Perl programming
36 language, with support for multiple named queues, priorities, delayed
37 jobs, job dependencies, job progress, job results, retries with
38 backoff, rate limiting, unique jobs, statistics, distributed workers,
39 parallel processing, autoscaling, remote control, Mojolicious
40 <https://mojolicious.org> admin ui, resource leak protection and
41 multiple backends (such as PostgreSQL <https://www.postgresql.org>).
42
43 Job queues allow you to process time and/or computationally intensive
44 tasks in background processes, outside of the request/response
45 lifecycle of web applications. Among those tasks you'll commonly find
46 image resizing, spam filtering, HTTP downloads, building tarballs,
47 warming caches and basically everything else you can imagine that's not
48 super fast.
49
51 You can use Minion as a standalone job queue or integrate it into
52 Mojolicious applications with the plugin Mojolicious::Plugin::Minion.
53
  use Mojolicious::Lite;

  plugin Minion => {Pg => 'postgresql://sri:s3cret@localhost/test'};

  # Slow task
  app->minion->add_task(poke_mojo => sub {
    my $job = shift;
    $job->app->ua->get('mojolicious.org');
    $job->app->log->debug('We have poked mojolicious.org for a visitor');
  });

  # Perform job in a background worker process
  get '/' => sub {
    my $c = shift;
    $c->minion->enqueue('poke_mojo');
    $c->render(text => 'We will poke mojolicious.org for you soon.');
  };

  app->start;
73
74 Background worker processes are usually started with the command
75 Minion::Command::minion::worker, which becomes automatically available
76 when an application loads Mojolicious::Plugin::Minion.
77
78 $ ./myapp.pl minion worker
79
80 The worker process will fork a new process for every job that is being
81 processed. This allows for resources such as memory to be returned to
82 the operating system once a job is finished. Perl fork is very fast, so
83 don't worry about the overhead.
84
85 Minion::Worker
86 |- Minion::Job [1]
87 |- Minion::Job [2]
88 +- ...
89
90 By default up to four jobs will be processed in parallel, but that can
91 be changed with configuration options or on demand with signals.
92
93 $ ./myapp.pl minion worker -j 12
94
95 Jobs can be managed right from the command line with
96 Minion::Command::minion::job.
97
98 $ ./myapp.pl minion job
99
You can also add an admin UI to your application by loading the plugin
Mojolicious::Plugin::Minion::Admin. Just make sure to secure access
before making your application publicly accessible.
103
104 # Make admin ui available under "/minion"
105 plugin 'Minion::Admin';
106
107 To manage background worker processes with systemd, you can use a unit
108 configuration file like this.
109
  [Unit]
  Description=My Mojolicious application workers
  After=postgresql.service

  [Service]
  Type=simple
  ExecStart=/home/sri/myapp/myapp.pl minion worker -m production
  KillMode=process

  [Install]
  WantedBy=multi-user.target
121
Every job can fail or succeed, but it cannot get lost. The system is
eventually consistent and will preserve job results for as long as you
like, depending on "remove_after". While individual workers can fail in
the middle of processing a job, the system will detect this and ensure
that no job is left in an uncertain state, depending on
"missing_after".
128
130 And as your application grows, you can move tasks into application
131 specific plugins.
132
  package MyApp::Task::PokeMojo;
  use Mojo::Base 'Mojolicious::Plugin';

  sub register {
    my ($self, $app) = @_;
    $app->minion->add_task(poke_mojo => sub {
      my $job = shift;
      $job->app->ua->get('mojolicious.org');
      $job->app->log->debug('We have poked mojolicious.org for a visitor');
    });
  }

  1;
146
These plugins are loaded from your application like any other plugin.
148
149 # Mojolicious
150 $app->plugin('MyApp::Task::PokeMojo');
151
152 # Mojolicious::Lite
153 plugin 'MyApp::Task::PokeMojo';
154
156 This distribution also contains a great example application you can use
157 for inspiration. The link checker
158 <https://github.com/mojolicious/minion/tree/master/examples/linkcheck>
159 will show you how to integrate background jobs into well-structured
160 Mojolicious applications.
161
163 Minion inherits all events from Mojo::EventEmitter and can emit the
164 following new ones.
165
166 enqueue
167 $minion->on(enqueue => sub {
168 my ($minion, $id) = @_;
169 ...
170 });
171
172 Emitted after a job has been enqueued, in the process that enqueued it.
173
174 $minion->on(enqueue => sub {
175 my ($minion, $id) = @_;
176 say "Job $id has been enqueued.";
177 });
178
179 worker
180 $minion->on(worker => sub {
181 my ($minion, $worker) = @_;
182 ...
183 });
184
185 Emitted in the worker process after it has been created.
186
187 $minion->on(worker => sub {
188 my ($minion, $worker) = @_;
189 say "Worker $$ started.";
190 });
191
193 Minion implements the following attributes.
194
195 app
196 my $app = $minion->app;
197 $minion = $minion->app(MyApp->new);
198
199 Application for job queue, defaults to a Mojo::HelloWorld object. Note
200 that this attribute is weakened.
201
202 backend
203 my $backend = $minion->backend;
204 $minion = $minion->backend(Minion::Backend::Pg->new);
205
206 Backend, usually a Minion::Backend::Pg object.
207
208 backoff
209 my $cb = $minion->backoff;
210 $minion = $minion->backoff(sub {...});
211
212 A callback used to calculate the delay for automatically retried jobs,
213 defaults to "(retries ** 4) + 15" (15, 16, 31, 96, 271, 640...), which
214 means that roughly 25 attempts can be made in 21 days.
215
216 $minion->backoff(sub {
217 my $retries = shift;
218 return ($retries ** 4) + 15 + int(rand 30);
219 });
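
To make the arithmetic behind the default concrete, the following
standalone sketch tabulates the default formula and adds up the waiting
time. It does not use Minion itself. The helper names "default_backoff"
and "cumulative_delay" are hypothetical, and the assumption that the
callback receives 0 for the first retry is inferred from the sequence
quoted above.

```perl
use strict;
use warnings;

# Default formula quoted above: (retries ** 4) + 15
sub default_backoff {
  my $retries = shift;
  return ($retries ** 4) + 15;
}

# Total seconds spent waiting across the first $count delays
# (retries 0 .. $count - 1), ignoring the time the job itself runs
sub cumulative_delay {
  my $count = shift;
  my $total = 0;
  $total += default_backoff($_) for 0 .. $count - 1;
  return $total;
}

# First six delays: 15, 16, 31, 96, 271, 640
print join(', ', map { default_backoff($_) } 0 .. 5), "\n";

# Waiting time for the first 25 delays, in days
printf "%.1f days\n", cumulative_delay(25) / 86400;
```

Because the delay grows with the fourth power of the retry count, the
last few retries dominate the total, which is worth keeping in mind
when choosing a value for "attempts".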
220
221 missing_after
222 my $after = $minion->missing_after;
223 $minion = $minion->missing_after(172800);
224
225 Amount of time in seconds after which workers without a heartbeat will
226 be considered missing and removed from the registry by "repair",
227 defaults to 1800 (30 minutes).
228
229 remove_after
230 my $after = $minion->remove_after;
231 $minion = $minion->remove_after(86400);
232
233 Amount of time in seconds after which jobs that have reached the state
234 "finished" and have no unresolved dependencies will be removed
235 automatically by "repair", defaults to 172800 (2 days). It is not
236 recommended to set this value below 2 days.
237
238 tasks
239 my $tasks = $minion->tasks;
240 $minion = $minion->tasks({foo => sub {...}});
241
242 Registered tasks.
243
245 Minion inherits all methods from Mojo::EventEmitter and implements the
246 following new ones.
247
248 add_task
249 $minion = $minion->add_task(foo => sub {...});
250
251 Register a task.
252
253 # Job with result
254 $minion->add_task(add => sub {
255 my ($job, $first, $second) = @_;
256 $job->finish($first + $second);
257 });
258 my $id = $minion->enqueue(add => [1, 1]);
259 my $result = $minion->job($id)->info->{result};
260
261 broadcast
262 my $bool = $minion->broadcast('some_command');
263 my $bool = $minion->broadcast('some_command', [@args]);
264 my $bool = $minion->broadcast('some_command', [@args], [$id1, $id2, $id3]);
265
266 Broadcast remote control command to one or more workers.
267
268 # Broadcast "stop" command to all workers to kill job 10025
269 $minion->broadcast('stop', [10025]);
270
271 # Broadcast "kill" command to all workers to interrupt job 10026
272 $minion->broadcast('kill', ['INT', 10026]);
273
274 # Broadcast "jobs" command to pause worker 23
275 $minion->broadcast('jobs', [0], [23]);
276
277 enqueue
278 my $id = $minion->enqueue('foo');
279 my $id = $minion->enqueue(foo => [@args]);
280 my $id = $minion->enqueue(foo => [@args] => {priority => 1});
281
Enqueue a new job with "inactive" state. Arguments get serialized by
the "backend" (often with Mojo::JSON), so you shouldn't send objects
and should be careful with binary data. Nested data structures with
hash and array references are fine though.
286
287 These options are currently available:
288
289 attempts
290 attempts => 25
291
292 Number of times performing this job will be attempted, with a delay
293 based on "backoff" after the first attempt, defaults to 1.
294
295 delay
296 delay => 10
297
298 Delay job for this many seconds (from now), defaults to 0.
299
300 notes
301 notes => {foo => 'bar', baz => [1, 2, 3]}
302
Hash reference with arbitrary metadata for this job that gets
serialized by the "backend" (often with Mojo::JSON), so you shouldn't
send objects and should be careful with binary data. Nested data
structures with hash and array references are fine though.
307
308 parents
309 parents => [$id1, $id2, $id3]
310
311 One or more existing jobs this job depends on, and that need to have
312 transitioned to the state "finished" before it can be processed.
313
314 priority
315 priority => 5
316
317 Job priority, defaults to 0. Jobs with a higher priority get
318 performed first.
319
320 queue
321 queue => 'important'
322
323 Queue to put job in, defaults to "default".
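
The options above can be combined in a single call. The sketch below
assumes that $minion is an already connected Minion object and that
tasks named "fetch_report" and "send_report" have been registered with
"add_task". Both task names, the queue name and the helper
"enqueue_report_pipeline" are hypothetical and only serve as an
illustration.

```perl
use strict;
use warnings;

# Enqueue a two-step pipeline: the second job can only be processed once
# the first has transitioned to the "finished" state
sub enqueue_report_pipeline {
  my ($minion, $url) = @_;

  # Up to 5 attempts, in the "important" queue, ahead of priority 0 jobs
  my $fetch_id = $minion->enqueue(
    fetch_report => [$url] => {attempts => 5, priority => 5, queue => 'important'}
  );

  # Depends on the first job, delayed by 60 seconds, with some metadata
  my $send_id = $minion->enqueue(
    send_report => [] => {
      parents => [$fetch_id],
      delay   => 60,
      notes   => {source => $url},
    }
  );

  return ($fetch_id, $send_id);
}
```

Since the second job is placed in the "default" queue and the first in
"important", a worker would presumably need to dequeue from both queues
for the whole pipeline to complete.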
324
325 foreground
326 my $bool = $minion->foreground($id);
327
328 Retry job in "minion_foreground" queue, then perform it right away with
329 a temporary worker in this process, very useful for debugging.
330
331 guard
332 my $guard = $minion->guard('foo', 3600);
333 my $guard = $minion->guard('foo', 3600, {limit => 20});
334
Same as "lock", but returns a scope guard object that automatically
releases the lock as soon as the object is destroyed, or "undef" if
acquiring the lock failed.
338
  # Only one job should run at a time (unique job)
  $minion->add_task(do_unique_stuff => sub {
    my ($job, @args) = @_;
    return $job->finish('Previous job is still active')
      unless my $guard = $minion->guard('fragile_backend_service', 7200);
    ...
  });

  # Only five jobs should run at a time and we try again later if necessary
  $minion->add_task(do_concurrent_stuff => sub {
    my ($job, @args) = @_;
    return $job->retry({delay => 30})
      unless my $guard = $minion->guard('some_web_service', 60, {limit => 5});
    ...
  });
354
355 history
356 my $history = $minion->history;
357
358 Get history information for job queue.
359
360 These fields are currently available:
361
362 daily
363 daily => [{epoch => 12345, finished_jobs => 95, failed_jobs => 2}, ...]
364
365 Hourly counts for processed jobs from the past day.
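
As a minimal sketch of how the "daily" field might be consumed, the
hypothetical helper "summarize_history" below adds up the hourly
buckets. It relies only on the field names shown above, and the sample
data is made up for illustration.

```perl
use strict;
use warnings;

# Sum the hourly buckets of the "daily" field into totals for the past day
sub summarize_history {
  my $history = shift;
  my %total = (finished_jobs => 0, failed_jobs => 0);
  for my $hour (@{$history->{daily} // []}) {
    $total{$_} += $hour->{$_} // 0 for keys %total;
  }
  return \%total;
}

# Made-up data in the shape described for $minion->history
my $sample = {daily => [
  {epoch => 12345, finished_jobs => 95, failed_jobs => 2},
  {epoch => 15945, finished_jobs => 5,  failed_jobs => 0},
]};
my $total = summarize_history($sample);
print "finished: $total->{finished_jobs}, failed: $total->{failed_jobs}\n";
```

With a real queue the argument would be the return value of
$minion->history instead of the sample structure.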
366
367 job
368 my $job = $minion->job($id);
369
370 Get Minion::Job object without making any changes to the actual job or
371 return "undef" if job does not exist.
372
  # Check job state
  my $state = $minion->job($id)->info->{state};

  # Get job metadata
  my $progress = $minion->job($id)->info->{notes}{progress};

  # Get job result
  my $result = $minion->job($id)->info->{result};
381
382 jobs
383 my $jobs = $minion->jobs;
384 my $jobs = $minion->jobs({states => ['inactive']});
385
386 Return Minion::Iterator object to safely iterate through job
387 information. Note that this method is EXPERIMENTAL and might change
388 without warning!
389
  # Iterate through jobs for two tasks
  my $jobs = $minion->jobs({tasks => ['foo', 'bar']});
  while (my $info = $jobs->next) {
    say "$info->{id}: $info->{state}";
  }

  # Remove all failed jobs from a named queue
  my $jobs = $minion->jobs({states => ['failed'], queues => ['unimportant']});
  while (my $info = $jobs->next) {
    $minion->job($info->{id})->remove;
  }

  # Count failed jobs for a task
  say $minion->jobs({states => ['failed'], tasks => ['foo']})->total;
404
405 These options are currently available:
406
407 ids
408 ids => ['23', '24']
409
410 List only jobs with these ids.
411
412 notes
413 notes => ['foo', 'bar']
414
415 List only jobs with one of these notes. Note that this option is
416 EXPERIMENTAL and might change without warning!
417
418 queues
419 queues => ['important', 'unimportant']
420
421 List only jobs in these queues.
422
423 states
424 states => ['inactive', 'active']
425
426 List only jobs in these states.
427
428 tasks
429 tasks => ['foo', 'bar']
430
431 List only jobs for these tasks.
432
433 These fields are currently available:
434
435 args
436 args => ['foo', 'bar']
437
438 Job arguments.
439
440 attempts
441 attempts => 25
442
443 Number of times performing this job will be attempted.
444
445 children
446 children => ['10026', '10027', '10028']
447
448 Jobs depending on this job.
449
450 created
451 created => 784111777
452
453 Epoch time job was created.
454
455 delayed
456 delayed => 784111777
457
458 Epoch time job was delayed to.
459
460 finished
461 finished => 784111777
462
463 Epoch time job was finished.
464
465 id
466 id => 10025
467
468 Job id.
469
470 notes
471 notes => {foo => 'bar', baz => [1, 2, 3]}
472
473 Hash reference with arbitrary metadata for this job.
474
475 parents
476 parents => ['10023', '10024', '10025']
477
478 Jobs this job depends on.
479
480 priority
481 priority => 3
482
483 Job priority.
484
485 queue
486 queue => 'important'
487
488 Queue name.
489
490 result
491 result => 'All went well!'
492
493 Job result.
494
495 retried
496 retried => 784111777
497
498 Epoch time job has been retried.
499
500 retries
501 retries => 3
502
503 Number of times job has been retried.
504
505 started
506 started => 784111777
507
508 Epoch time job was started.
509
510 state
511 state => 'inactive'
512
513 Current job state, usually "active", "failed", "finished" or
514 "inactive".
515
516 task
517 task => 'foo'
518
519 Task name.
520
521 time
  time => 784111777
523
524 Server time.
525
526 worker
527 worker => '154'
528
529 Id of worker that is processing the job.
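
The epoch fields can be combined to derive simple metrics. The sketch
below computes how long a finished job took to process. The helper
"job_runtime" is hypothetical, and restricting it to the "finished"
state is a cautious assumption, since the text above does not say which
timestamps are set in other states.

```perl
use strict;
use warnings;

# Seconds between "started" and "finished" for a finished job,
# or undef if the job is in another state or a timestamp is missing
sub job_runtime {
  my $info = shift;
  return undef unless ($info->{state} // '') eq 'finished';
  return undef unless defined $info->{started} && defined $info->{finished};
  return $info->{finished} - $info->{started};
}

# Made-up job information in the shape described above
my $info = {id => 10025, state => 'finished', started => 784111777, finished => 784111782};
print "Job $info->{id} took ", job_runtime($info), " seconds\n";
```

With a real queue the hash reference would come from the iterator
returned by "jobs", for example inside a loop over
$minion->jobs({states => ['finished']}).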
530
531 lock
532 my $bool = $minion->lock('foo', 3600);
533 my $bool = $minion->lock('foo', 3600, {limit => 20});
534
535 Try to acquire a named lock that will expire automatically after the
536 given amount of time in seconds. You can release the lock manually with
537 "unlock" to limit concurrency, or let it expire for rate limiting. For
538 convenience you can also use "guard" to release the lock automatically,
539 even if the job failed.
540
  # Only one job should run at a time (unique job)
  $minion->add_task(do_unique_stuff => sub {
    my ($job, @args) = @_;
    return $job->finish('Previous job is still active')
      unless $minion->lock('fragile_backend_service', 7200);
    ...
    $minion->unlock('fragile_backend_service');
  });

  # Only five jobs should run at a time and we wait for our turn
  $minion->add_task(do_concurrent_stuff => sub {
    my ($job, @args) = @_;
    sleep 1 until $minion->lock('some_web_service', 60, {limit => 5});
    ...
    $minion->unlock('some_web_service');
  });

  # Only a hundred jobs should run per hour and we try again later if necessary
  $minion->add_task(do_rate_limited_stuff => sub {
    my ($job, @args) = @_;
    return $job->retry({delay => 3600})
      unless $minion->lock('another_web_service', 3600, {limit => 100});
    ...
  });
565
566 An expiration time of 0 can be used to check if a named lock already
567 exists without creating one.
568
569 # Check if the lock "foo" already exists
570 say 'Lock exists' unless $minion->lock('foo', 0);
571
572 These options are currently available:
573
574 limit
575 limit => 20
576
577 Number of shared locks with the same name that can be active at the
578 same time, defaults to 1.
579
580 new
581 my $minion = Minion->new(Pg => 'postgresql://postgres@/test');
582 my $minion = Minion->new(Pg => Mojo::Pg->new);
583
584 Construct a new Minion object.
585
586 perform_jobs
587 $minion->perform_jobs;
588 $minion->perform_jobs({queues => ['important']});
589
590 Perform all jobs with a temporary worker, very useful for testing.
591
592 # Longer version
593 my $worker = $minion->worker;
594 while (my $job = $worker->register->dequeue(0)) { $job->perform }
595 $worker->unregister;
596
597 These options are currently available:
598
599 queues
600 queues => ['important']
601
602 One or more queues to dequeue jobs from, defaults to "default".
603
604 repair
605 $minion = $minion->repair;
606
607 Repair worker registry and job queue if necessary.
608
609 reset
610 $minion = $minion->reset({all => 1});
611
612 Reset job queue.
613
614 These options are currently available:
615
616 all
617 all => 1
618
619 Reset everything.
620
621 locks
622 locks => 1
623
624 Reset only locks.
625
626 result_p
627 my $promise = $minion->result_p($id);
628 my $promise = $minion->result_p($id, {interval => 5});
629
Return a Mojo::Promise object for the result of a job. The state
"finished" will result in the promise being "fulfilled", and the state
"failed" in the promise being "rejected". This operation can be
cancelled by resolving the promise manually at any time.
634
  # Enqueue job and receive the result at some point in the future
  my $id = $minion->enqueue('foo');
  $minion->result_p($id)->then(sub {
    my $info = shift;
    my $result = ref $info ? $info->{result} : 'Job already removed';
    say "Finished: $result";
  })->catch(sub {
    my $info = shift;
    say "Failed: $info->{result}";
  })->wait;
645
646 These options are currently available:
647
648 interval
649 interval => 5
650
651 Polling interval in seconds for checking if the state of the job has
652 changed, defaults to 3.
653
654 stats
655 my $stats = $minion->stats;
656
657 Get statistics for the job queue.
658
659 # Check idle workers
660 my $idle = $minion->stats->{inactive_workers};
661
662 These fields are currently available:
663
664 active_jobs
665 active_jobs => 100
666
667 Number of jobs in "active" state.
668
669 active_locks
670 active_locks => 100
671
672 Number of active named locks.
673
674 active_workers
675 active_workers => 100
676
677 Number of workers that are currently processing a job.
678
679 delayed_jobs
680 delayed_jobs => 100
681
Number of jobs in "inactive" state that are scheduled to run at a
specific time in the future or have unresolved dependencies.
684
685 enqueued_jobs
686 enqueued_jobs => 100000
687
688 Rough estimate of how many jobs have ever been enqueued. Note that
689 this field is EXPERIMENTAL and might change without warning!
690
691 failed_jobs
692 failed_jobs => 100
693
694 Number of jobs in "failed" state.
695
696 finished_jobs
697 finished_jobs => 100
698
699 Number of jobs in "finished" state.
700
701 inactive_jobs
702 inactive_jobs => 100
703
704 Number of jobs in "inactive" state.
705
706 inactive_workers
707 inactive_workers => 100
708
709 Number of workers that are currently not processing a job.
710
711 uptime
712 uptime => 1000
713
714 Uptime in seconds.
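
These counters lend themselves to simple health checks. As a sketch,
the hypothetical helper "worker_utilization" below derives the share of
busy workers from "active_workers" and "inactive_workers". The sample
numbers are made up.

```perl
use strict;
use warnings;

# Fraction of registered workers that are currently processing a job,
# or 0 if no workers are registered at all
sub worker_utilization {
  my $stats  = shift;
  my $active = $stats->{active_workers} // 0;
  my $total  = $active + ($stats->{inactive_workers} // 0);
  return $total ? $active / $total : 0;
}

# Made-up data in the shape described for $minion->stats
my $stats = {active_workers => 3, inactive_workers => 1, inactive_jobs => 40};
printf "%.0f%% of workers are busy\n", 100 * worker_utilization($stats);
```

Note that this measures busy workers, not busy job slots, since a
single worker can process several jobs in parallel.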
715
716 unlock
717 my $bool = $minion->unlock('foo');
718
719 Release a named lock that has been previously acquired with "lock".
720
721 worker
722 my $worker = $minion->worker;
723
724 Build Minion::Worker object. Note that this method should only be used
725 to implement custom workers.
726
  # Use the standard worker with all its features
  my $worker = $minion->worker;
  $worker->status->{jobs} = 12;
  $worker->status->{queues} = ['important'];
  $worker->run;

  # Perform one job manually in a separate process
  my $worker = $minion->repair->worker->register;
  my $job = $worker->dequeue(5);
  $job->perform;
  $worker->unregister;

  # Perform one job manually in this process
  my $worker = $minion->repair->worker->register;
  my $job = $worker->dequeue(5);
  if (my $err = $job->execute) { $job->fail($err) }
  else                         { $job->finish }
  $worker->unregister;

  # Build a custom worker performing multiple jobs at the same time
  my %jobs;
  my $worker = $minion->repair->worker->register;
  do {
    for my $id (keys %jobs) {
      delete $jobs{$id} if $jobs{$id}->is_finished;
    }
    if (keys %jobs >= 4) { sleep 5 }
    else {
      my $job = $worker->dequeue(5);
      $jobs{$job->id} = $job->start if $job;
    }
  } while keys %jobs;
  $worker->unregister;
760
761 workers
762 my $workers = $minion->workers;
763 my $workers = $minion->workers({ids => [2, 3]});
764
765 Return Minion::Iterator object to safely iterate through worker
766 information. Note that this method is EXPERIMENTAL and might change
767 without warning!
768
769 # Iterate through workers
770 my $workers = $minion->workers;
771 while (my $info = $workers->next) {
772 say "$info->{id}: $info->{host}";
773 }
774
775 These options are currently available:
776
777 ids
778 ids => ['23', '24']
779
780 List only workers with these ids.
781
782 These fields are currently available:
783
784 id
785 id => 22
786
787 Worker id.
788
789 host
790 host => 'localhost'
791
792 Worker host.
793
794 jobs
795 jobs => ['10023', '10024', '10025', '10029']
796
797 Ids of jobs the worker is currently processing.
798
799 notified
800 notified => 784111777
801
802 Epoch time worker sent the last heartbeat.
803
804 pid
805 pid => 12345
806
807 Process id of worker.
808
809 started
810 started => 784111777
811
812 Epoch time worker was started.
813
814 status
815 status => {queues => ['default', 'important']}
816
817 Hash reference with whatever status information the worker would like
818 to share.
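
The "notified" field can be compared against "missing_after" to spot
workers that have stopped sending heartbeats. The sketch below is only
an approximation of that idea, not a description of how "repair" is
implemented. The helper "stale_workers" is hypothetical, and it uses
the local clock unless a reference time is passed in, which may differ
from the time on the backend server.

```perl
use strict;
use warnings;

# Collect the ids of workers whose last heartbeat is older than
# $missing_after seconds. $workers is any object with a "next" method
# returning worker information hashes, such as the iterator from "workers".
sub stale_workers {
  my ($workers, $missing_after, $now) = @_;
  $now //= time;
  my @stale;
  while (my $info = $workers->next) {
    push @stale, $info->{id} if $now - $info->{notified} > $missing_after;
  }
  return \@stale;
}

# Possible usage with a connected Minion object (not executed here):
#   my $stale = stale_workers($minion->workers, $minion->missing_after);
#   say "Worker $_ looks stale" for @$stale;
```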
819
821 This is the class hierarchy of the Minion distribution.
822
823 · Minion
824
825 · Minion::Backend
826
827 · Minion::Backend::Pg
828
829 · Minion::Command::minion
830
831 · Minion::Command::minion::job
832
833 · Minion::Command::minion::worker
834
835 · Minion::Job
836
837 · Minion::Worker
838
839 · Mojolicious::Plugin::Minion
840
841 · Mojolicious::Plugin::Minion::Admin
842
844 The Minion distribution includes a few files with different licenses
845 that have been bundled for internal use.
846
847 Minion Artwork
848 Copyright (C) 2017, Sebastian Riedel.
849
Licensed under the CC BY-SA License, Version 4.0
851 <http://creativecommons.org/licenses/by-sa/4.0>.
852
853 Bootstrap
854 Copyright (C) 2011-2018 The Bootstrap Authors.
855
856 Licensed under the MIT License,
857 <http://creativecommons.org/licenses/MIT>.
858
859 D3.js
860 Copyright (C) 2010-2016, Michael Bostock.
861
862 Licensed under the 3-Clause BSD License,
863 <https://opensource.org/licenses/BSD-3-Clause>.
864
865 epoch.js
866 Copyright (C) 2014 Fastly, Inc.
867
868 Licensed under the MIT License,
869 <http://creativecommons.org/licenses/MIT>.
870
871 Font Awesome
872 Copyright (C) Dave Gandy.
873
874 Licensed under the MIT License,
875 <http://creativecommons.org/licenses/MIT>, and the SIL OFL 1.1,
876 <http://scripts.sil.org/OFL>.
877
878 moment.js
879 Copyright (C) JS Foundation and other contributors.
880
881 Licensed under the MIT License,
882 <http://creativecommons.org/licenses/MIT>.
883
884 popper.js
885 Copyright (C) Federico Zivolo 2017.
886
887 Licensed under the MIT License,
888 <http://creativecommons.org/licenses/MIT>.
889
891 Sebastian Riedel, "sri@cpan.org".
892
894 In alphabetical order:
895
896 Andrey Khozov
897
898 Andrii Nikitin
899
900 Brian Medley
901
902 Franz Skale
903
904 Hubert "depesz" Lubaczewski
905
906 Joel Berger
907
908 Paul Williams
909
910 Stefan Adams
911
913 Copyright (C) 2014-2020, Sebastian Riedel and others.
914
915 This program is free software, you can redistribute it and/or modify it
916 under the terms of the Artistic License version 2.0.
917
919 <https://github.com/mojolicious/minion>, Mojolicious::Guides,
920 <https://mojolicious.org>.
921

perl v5.30.1 2020-02-02 Minion(3)