1MCE::Hobo(3) User Contributed Perl Documentation MCE::Hobo(3)
2
3
4
6 MCE::Hobo - A threads-like parallelization module
7
9 This document describes MCE::Hobo version 1.876
10
12 use MCE::Hobo;
13
14 MCE::Hobo->init(
15 max_workers => 'auto', # default undef, unlimited
16
17 # Specify a percentage. MCE::Hobo 1.874+.
18 max_workers => '25%', # 4 on HW with 16 lcores
19 max_workers => '50%', # 8 on HW with 16 lcores
20
21 hobo_timeout => 20, # default undef, no timeout
22 posix_exit => 1, # default undef, CORE::exit
23 void_context => 1, # default undef
24
25 on_start => sub {
26 my ( $pid, $ident ) = @_;
27 ...
28 },
29 on_finish => sub {
30 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
31 ...
32 }
33 );
34
35 MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
36
37 sub parallel {
38 my ($arg1) = @_;
39 print "Hello again, $arg1\n" if defined($arg1);
40 print "Hello again, $_\n"; # same thing
41 }
42
43 MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
44
45 my @hobos = MCE::Hobo->list();
46 my @pids = MCE::Hobo->list_pids();
47 my @running = MCE::Hobo->list_running();
48 my @joinable = MCE::Hobo->list_joinable();
49 my $count = MCE::Hobo->pending();
50
51 # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
52 $_->join() for @hobos; # (or)
53 $_->join() for @joinable;
54
55 # Joining occurs immediately as hobo processes complete execution.
56 1 while MCE::Hobo->wait_one();
57
58 my $hobo = mce_async { foreach (@files) { ... } };
59
60 $hobo->join();
61
62 if ( my $err = $hobo->error() ) {
63 warn "Hobo error: $err\n";
64 }
65
66 # Get a hobo's object
67 $hobo = MCE::Hobo->self();
68
69 # Get a hobo's ID
70 $pid = MCE::Hobo->pid(); # $$
71 $pid = $hobo->pid();
72 $pid = MCE::Hobo->tid(); # tid is an alias for pid
73 $pid = $hobo->tid();
74
75 # Test hobo objects
76 if ( $hobo1 == $hobo2 ) {
77 ...
78 }
79
80 # Give other workers a chance to run
81 MCE::Hobo->yield();
82 MCE::Hobo->yield(0.05);
83
84 # Return context, wantarray aware
85 my ($value1, $value2) = $hobo->join();
86 my $value = $hobo->join();
87
88 # Check hobo's state
89 if ( $hobo->is_running() ) {
90 sleep 1;
91 }
92 if ( $hobo->is_joinable() ) {
93 $hobo->join();
94 }
95
96 # Send a signal to a hobo
97 $hobo->kill('SIGUSR1');
98
99 # Exit a hobo
100 MCE::Hobo->exit(0);
101 MCE::Hobo->exit(0, @ret); # MCE::Hobo 1.827+
102
104 A hobo is a migratory worker inside the machine that carries the
105 asynchronous gene. Hobo processes are equipped with "threads"-like
106 capability for running code asynchronously. Unlike threads, each hobo
107 is a unique process to the underlying OS. The IPC is managed by
108 "MCE::Shared", which runs on all the major platforms including Cygwin
109 and Strawberry Perl.
110
111 An exception was made on the Windows platform to spawn threads versus
112 children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
113 release reverts back to spawning children on all supported platforms.
114
115 "MCE::Hobo" may be used as a standalone or together with "MCE"
116 including running alongside "threads".
117
118 use MCE::Hobo;
119 use MCE::Shared;
120
121 # synopsis: head -20 file.txt | perl script.pl
122
123 my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
124 my $ofh = MCE::Shared->handle( ">", \*STDOUT );
125 my $ary = MCE::Shared->array();
126
127 sub parallel_task {
128 my ( $id ) = @_;
129 while ( <$ifh> ) {
130 printf {$ofh} "[ %4d ] %s", $., $_;
131 # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
132 $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
133 }
134 }
135
136 my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
137 my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
138 my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
139
140 $_->join for MCE::Hobo->list(); # ditto: MCE::Hobo->wait_all();
141
142 # search array (total one round-trip via IPC)
143 my @vals = $ary->vals( "val =~ / ID 2 /" );
144
145 print {*STDERR} join("", @vals);
146
148 $hobo = MCE::Hobo->create( FUNCTION, ARGS )
149 $hobo = MCE::Hobo->new( FUNCTION, ARGS )
150 This will create a new hobo process that will begin execution with
151 function as the entry point, and optionally ARGS for list of
152 parameters. It will return the corresponding MCE::Hobo object, or
153 undef if hobo creation failed.
154
155 FUNCTION may either be the name of a function, an anonymous
156 subroutine, or a code ref.
157
158 my $hobo = MCE::Hobo->create( "func_name", ... );
159 # or
160 my $hobo = MCE::Hobo->create( sub { ... }, ... );
161 # or
162 my $hobo = MCE::Hobo->create( \&func, ... );
163
164 $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
165 $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
166 Options, excluding "ident", may be specified globally via the "init"
167 function. Otherwise, "ident", "hobo_timeout", "posix_exit", and
168 "void_context" may be set uniquely.
169
170 The "ident" option, available since 1.827, is used by callback
171 functions "on_start" and "on_finish" for identifying the started and
172 finished hobo process respectively.
173
174 my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
175 ...
176 } );
177
178 $hobo1->join;
179
180 my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
181 sleep 1 for ( 1 .. 9 );
182 } );
183
184 $hobo2->join;
185
186 if ( $hobo2->error() eq "Hobo timed out\n" ) {
187 ...
188 }
189
190 The "new()" method is an alias for "create()".
191
192 mce_async { BLOCK } ARGS;
193 mce_async { BLOCK };
194 "mce_async" runs the block asynchronously similarly to
195 "MCE::Hobo->create()". It returns the hobo object, or undef if hobo
196 creation failed.
197
198 my $hobo = mce_async { foreach (@files) { ... } };
199
200 $hobo->join();
201
202 if ( my $err = $hobo->error() ) {
203 warn("Hobo error: $err\n");
204 }
205
206 $hobo->join()
207 This will wait for the corresponding hobo process to complete its
208 execution. In non-voided context, "join()" will return the value(s)
209 of the entry point function.
210
211 The context (void, scalar or list) for the return value(s) for
212 "join" is determined at the time of joining and mostly "wantarray"
213 aware.
214
215 my $hobo1 = MCE::Hobo->create( sub {
216 my @res = qw(foo bar baz);
217 return (@res);
218 });
219
220 my @res1 = $hobo1->join(); # ( foo, bar, baz )
221 my $res1 = $hobo1->join(); # baz
222
223 my $hobo2 = MCE::Hobo->create( sub {
224 return 'foo';
225 });
226
227 my @res2 = $hobo2->join(); # ( foo )
228 my $res2 = $hobo2->join(); # foo
229
230 $hobo1->equal( $hobo2 )
231 Tests if two hobo objects are the same hobo or not. Hobo comparison
232 is based on process IDs. This is overloaded to the more natural
233 forms.
234
235 if ( $hobo1 == $hobo2 ) {
236 print("Hobo objects are the same\n");
237 }
238 # or
239 if ( $hobo1 != $hobo2 ) {
240 print("Hobo objects differ\n");
241 }
242
243 $hobo->error()
244 Hobo processes are executed in an "eval" context. This method will
245 return "undef" if the hobo terminates normally. Otherwise, it
246 returns the value of $@ associated with the hobo's execution status
247 in its "eval" context.
248
249 $hobo->exit()
250 This sends 'SIGQUIT' to the hobo process, notifying the hobo to
251 exit. It returns the hobo object to allow for method chaining. It
252 is important to join later, if not immediately, to avoid leaving a
253 zombie or defunct process.
254
255 $hobo->exit()->join();
256 ...
257
258 $hobo->join(); # later
259
260 MCE::Hobo->exit( 0 )
261 MCE::Hobo->exit( 0, @ret )
262 A hobo can exit at any time by calling "MCE::Hobo->exit()".
263 Otherwise, the behavior is the same as "exit(status)" when called
264 from the main process. Since 1.827, the hobo process may
265 optionally return data, to be sent via IPC.
266
267 MCE::Hobo->finish()
268 This class method is called automatically by "END", but may be
269 called explicitly. An error is emitted via croak if there are active
270 hobo processes not yet joined.
271
272 MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
273 $_->join for MCE::Hobo->list();
274
275 MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
276 $_->join for MCE::Hobo->list();
277
278 MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
279 $_->join for MCE::Hobo->list();
280
281 MCE::Hobo->finish();
282
283 MCE::Hobo->init( options )
284 The init function accepts a list of MCE::Hobo options.
285
286 MCE::Hobo->init(
287 max_workers => 'auto', # default undef, unlimited
288
289 # Specify a percentage. MCE::Hobo 1.874+.
290 max_workers => '25%', # 4 on HW with 16 lcores
291 max_workers => '50%', # 8 on HW with 16 lcores
292
293 hobo_timeout => 20, # default undef, no timeout
294 posix_exit => 1, # default undef, CORE::exit
295 void_context => 1, # default undef
296
297 on_start => sub {
298 my ( $pid, $ident ) = @_;
299 ...
300 },
301 on_finish => sub {
302 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
303 ...
304 }
305 );
306
307 # Identification given as an option or the 1st argument.
308 # Current API available since 1.827.
309
310 for my $key ( 'aa' .. 'zz' ) {
311 MCE::Hobo->create( { ident => $key }, sub { ... } );
312 MCE::Hobo->create( $key, sub { ... } );
313 }
314
315 MCE::Hobo->wait_all;
316
317 Set "max_workers" if you want to limit the number of workers by
318 waiting automatically for an available slot. Specify a percentage or
319 "auto" to obtain the number of logical cores via
320 "MCE::Util::get_ncpu()".
321
322 Set "hobo_timeout", in number of seconds, if you want the hobo
323 process to terminate after some time. The default is undef for no
324 timeout.
325
326 Set "posix_exit" to avoid all END and destructor processing.
327 Constructing MCE::Hobo inside a thread implies 1, as does the presence
328 of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
329 Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
330
331 Set "void_context" to create the hobo process in void context for
332 the return value. Otherwise, the return context is wantarray-aware
333 for "join()" and "result()" and determined when retrieving the data.
334
335 The callback options "on_start" and "on_finish" are called in the
336 parent process after starting the worker and later when terminated.
337 The arguments for the subroutines were inspired by
338 Parallel::ForkManager.
339
340 The parameters for "on_start" are the following:
341
342 - pid of the hobo process
343 - identification (ident option or 1st arg to create)
344
345 The parameters for "on_finish" are the following:
346
347 - pid of the hobo process
348 - program exit code
349 - identification (ident option or 1st arg to create)
350 - exit signal id
351 - error message from eval inside MCE::Hobo
352 - returned data
353
354 $hobo->is_running()
355 Returns true if a hobo is still running.
356
357 $hobo->is_joinable()
358 Returns true if the hobo has finished running and not yet joined.
359
360 $hobo->kill( 'SIG...' )
361 Sends the specified signal to the hobo. Returns the hobo object to
362 allow for method chaining. As with "exit", it is important to join
363 eventually, if not immediately, to avoid leaving a zombie or defunct
364 process.
365
366 $hobo->kill('SIG...')->join();
367
368 The following is a parallel demonstration comparing "MCE::Shared"
369 against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
370 after all workers have been notified to quit.
371
372 use Time::HiRes qw(time);
373
374 use Redis;
375 use Redis::Fast;
376
377 use MCE::Hobo;
378 use MCE::Shared;
379
380 my $redis = Redis->new();
381 my $rfast = Redis::Fast->new();
382 my $array = MCE::Shared->array();
383
384 sub parallel_redis {
385 my ($_redis) = @_;
386 my ($count, $quit, $len) = (0, 0);
387
388 # instead, use a flag to exit loop
389 $SIG{'QUIT'} = sub { $quit = 1 };
390
391 while () {
392 $len = $_redis->rpush('list', $count++);
393 last if $quit;
394 }
395
396 $count;
397 }
398
399 sub parallel_array {
400 my ($count, $quit, $len) = (0, 0);
401
402 # do not exit from inside handler
403 $SIG{'QUIT'} = sub { $quit = 1 };
404
405 while () {
406 $len = $array->push($count++);
407 last if $quit;
408 }
409
410 $count;
411 }
412
413 sub benchmark_this {
414 my ($desc, $num_procs, $timeout, $code, @args) = @_;
415 my ($start, $total) = (time(), 0);
416
417 MCE::Hobo->new($code, @args) for 1..$num_procs;
418 sleep $timeout;
419
420 # joining is not immediate; ok
421 $_->kill('QUIT') for MCE::Hobo->list();
422
423 # joining later; ok
424 $total += $_->join() for MCE::Hobo->list();
425
426 printf "$desc <> duration: %0.03f secs, count: $total\n",
427 time() - $start;
428
429 sleep 0.2;
430 }
431
432 benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
433 benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
434 benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
435
436 MCE::Hobo->list()
437 Returns a list of all hobo objects not yet joined.
438
439 @hobos = MCE::Hobo->list();
440
441 MCE::Hobo->list_pids()
442 Returns a list of all hobo pids not yet joined (available since
443 1.849).
444
445 @pids = MCE::Hobo->list_pids();
446
447 $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
448 # Signal workers and the shared manager all at once
449 CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
450 exec('reset');
451 };
452
453 MCE::Hobo->list_running()
454 Returns a list of all hobo objects that are still running.
455
456 @hobos = MCE::Hobo->list_running();
457
458 MCE::Hobo->list_joinable()
459 Returns a list of all hobo objects that have completed running.
460 These are ready to be joined without blocking.
461
462 @hobos = MCE::Hobo->list_joinable();
463
464 MCE::Hobo->max_workers([ N ])
465 Getter and setter for max_workers. Specify a number or 'auto' to
466 acquire the total number of cores via MCE::Util::get_ncpu. Specify a
467 false value to set back to no limit.
468
469 API available since 1.835.
470
471 MCE::Hobo->pending()
472 Returns a count of all hobo objects not yet joined.
473
474 $count = MCE::Hobo->pending();
475
476 $hobo->result()
477 Returns the result obtained by "join", "wait_one", or "wait_all". If
478 the process has not yet exited, waits for the corresponding hobo to
479 complete its execution.
480
481 use MCE::Hobo;
482 use Time::HiRes qw(sleep);
483
484 sub task {
485 my ($id) = @_;
486 sleep $id * 0.333;
487 return $id;
488 }
489
490 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
491
492 # 1 while MCE::Hobo->wait_one();
493
494 while ( my $hobo = MCE::Hobo->wait_one() ) {
495 my $err = $hobo->error() || 'no error';
496 my $res = $hobo->result();
497 my $pid = $hobo->pid();
498
499 print "[$pid] $err : $res\n";
500 }
501
502 Like "join" described above, the context (void, scalar or list) for
503 the return value(s) is determined at the time "result" is called and
504 mostly "wantarray" aware.
505
506 my $hobo1 = MCE::Hobo->create( sub {
507 my @res = qw(foo bar baz);
508 return (@res);
509 });
510
511 my @res1 = $hobo1->result(); # ( foo, bar, baz )
512 my $res1 = $hobo1->result(); # baz
513
514 my $hobo2 = MCE::Hobo->create( sub {
515 return 'foo';
516 });
517
518 my @res2 = $hobo2->result(); # ( foo )
519 my $res2 = $hobo2->result(); # foo
520
521 MCE::Hobo->self()
522 Class method that allows a hobo to obtain its own MCE::Hobo object.
523
524 $hobo->pid()
525 $hobo->tid()
526 Returns the ID of the hobo.
527
528 pid: $$ process id
529 tid: $$ alias for pid
530
531 MCE::Hobo->pid()
532 MCE::Hobo->tid()
533 Class methods that allow a hobo to obtain its own ID.
534
535 pid: $$ process id
536 tid: $$ alias for pid
537
538 MCE::Hobo->wait_one()
539 MCE::Hobo->waitone()
540 MCE::Hobo->wait_all()
541 MCE::Hobo->waitall()
542 Meaningful for the manager process only, waits for one or all hobo
543 processes to complete execution. Afterwards, returns the
544 corresponding hobo objects. If a hobo doesn't exist, returns the
545 "undef" value or an empty list for "wait_one" and "wait_all"
546 respectively.
547
548 The "waitone" and "waitall" methods are aliases since 1.827 for
549 backwards compatibility.
550
551 use MCE::Hobo;
552 use Time::HiRes qw(sleep);
553
554 sub task {
555 my $id = shift;
556 sleep $id * 0.333;
557 return $id;
558 }
559
560 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
561
562 # join, traditional use case
563 $_->join() for MCE::Hobo->list();
564
565 # wait_one, simplistic use case
566 1 while MCE::Hobo->wait_one();
567
568 # wait_one
569 while ( my $hobo = MCE::Hobo->wait_one() ) {
570 my $err = $hobo->error() || 'no error';
571 my $res = $hobo->result();
572 my $pid = $hobo->pid();
573
574 print "[$pid] $err : $res\n";
575 }
576
577 # wait_all
578 my @hobos = MCE::Hobo->wait_all();
579
580 for ( @hobos ) {
581 my $err = $_->error() || 'no error';
582 my $res = $_->result();
583 my $pid = $_->pid();
584
585 print "[$pid] $err : $res\n";
586 }
587
588 MCE::Hobo->yield( [ floating_seconds ] )
589 Prior API till 1.826.
590
591 Let this hobo yield CPU time to other workers. By default, the class
592 method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
593 including Cygwin.
594
595 MCE::Hobo->yield();
596 MCE::Hobo->yield(0.05);
597
598 # total run time: 0.25 seconds, sleep occurring in parallel
599
600 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
601 MCE::Hobo->wait_all();
602
603 Current API available since 1.827.
604
605 Give other workers a chance to run, optionally for given time. Yield
606 behaves similarly to MCE's interval option. It throttles workers
607 from running too fast. A demonstration is provided in the next
608 section for fetching URLs in parallel.
609
610 The default "floating_seconds" is 0.008 and 0.015 on UNIX and
611 Windows, respectively. Pass 0 if simply wanting to give other
612 workers a chance to run.
613
614 # total run time: 1.00 second
615
616 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
617 MCE::Hobo->wait_all();
618
620 Threads-like detach capability was added starting with the 1.867
621 release.
622
623 A threads example is shown first followed by the MCE::Hobo example. All
624 one needs to do is set the CHLD signal handler to IGNORE.
625 Unfortunately, this works on UNIX platforms only. The hobo process
626 restores the CHLD handler to default, so it is able to spawn nested
627 workers and reap them if desired.
628
629 use threads;
630
631 for ( 1 .. 8 ) {
632 async {
633 # do something
634 }->detach;
635 }
636
637 use MCE::Hobo;
638
639 # Have the OS reap workers automatically when exiting.
640 # The on_finish option is ignored if specified (no-op).
641 # Ensure not inside a thread on UNIX platforms.
642
643 $SIG{CHLD} = 'IGNORE';
644
645 for ( 1 .. 8 ) {
646 mce_async {
647 # do something
648 };
649 }
650
651 # Optionally, wait for any remaining workers before leaving.
652 # This is necessary if workers are consuming shared objects,
653 # constructed via MCE::Shared.
654
655 MCE::Hobo->wait_all;
656
657 The following is another way and works on Windows. Here, the on_finish
658 handler works as usual.
659
660 use MCE::Hobo;
661
662 MCE::Hobo->init(
663 on_finish => sub {
664 ...
665 },
666 );
667
668 for ( 1 .. 8 ) {
669 $_->join for MCE::Hobo->list_joinable;
670 mce_async {
671 # do something
672 };
673 }
674
675 MCE::Hobo->wait_all;
676
678 MCE::Hobo behaves similarly to threads for the most part. It also
679 provides Parallel::ForkManager-like capabilities. The
680 "Parallel::ForkManager" example is shown first followed by a version
681 using "MCE::Hobo".
682
683 Parallel::ForkManager
684 use strict;
685 use warnings;
686
687 use Parallel::ForkManager;
688 use Time::HiRes 'time';
689
690 my $start = time;
691
692 my $pm = Parallel::ForkManager->new(10);
693 $pm->set_waitpid_blocking_sleep(0);
694
695 $pm->run_on_finish( sub {
696 my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
697 print "child $pid completed: $ident => ", $resp->[0], "\n";
698 });
699
700 DATA_LOOP:
701 foreach my $data ( 1..2000 ) {
702 # forks and returns the pid for the child
703 my $pid = $pm->start($data) and next DATA_LOOP;
704 my $ret = [ $data * 2 ];
705
706 $pm->finish(0, $ret);
707 }
708
709 $pm->wait_all_children;
710
711 printf STDERR "duration: %0.03f seconds\n", time - $start;
712
713 MCE::Hobo
714 use strict;
715 use warnings;
716
717 use MCE::Hobo 1.843;
718 use Time::HiRes 'time';
719
720 my $start = time;
721
722 MCE::Hobo->init(
723 max_workers => 10,
724 on_finish => sub {
725 my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
726 print "child $pid completed: $ident => ", $resp->[0], "\n";
727 }
728 );
729
730 foreach my $data ( 1..2000 ) {
731 MCE::Hobo->create( $data, sub {
732 [ $data * 2 ];
733 });
734 }
735
736 MCE::Hobo->wait_all;
737
738 printf STDERR "duration: %0.03f seconds\n", time - $start;
739
740 Time to spin 2,000 workers and obtain results (in seconds).
741 Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
742 Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
743 with Moo loaded at the top of the script.
744
745 MCE::Hobo uses MCE::Shared to retrieve data during reaping.
746 MCE::Child uses MCE::Channel, no shared-manager.
747
748 Version Cygwin Windows Linux macOS FreeBSD
749
750 MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
751 MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
752 P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
753
754 MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
755 MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
756 P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
757
758 Set posix_exit to avoid all END and destructor processing.
759 This is helpful for reducing overhead when workers exit. Likewise if
760 using a Perl module that is not parallel safe. The option is ignored on
761 Windows "$^O eq 'MSWin32'".
762
763 MCE::Child->init( posix_exit => 1, ... );
764 MCE::Hobo->init( posix_exit => 1, ... );
765
766 Version Cygwin Windows Linux macOS FreeBSD
767
768 MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
769 MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
770
772 This demonstration constructs two queues, two handles, starts the
773 shared-manager process if needed, and spawns four workers. For this
774 demonstration, 64 URLs are chunked per job. In reality, one may run
775 with 200 workers and chunk 300 URLs on a 24-way box.
776
777 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
778 # perl demo.pl -- all output
779 # perl demo.pl >/dev/null -- mngr/hobo output
780 # perl demo.pl 2>/dev/null -- show results only
781 #
782 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
783
784 use strict;
785 use warnings;
786
787 use AnyEvent;
788 use AnyEvent::HTTP;
789 use Time::HiRes qw( time );
790
791 use MCE::Hobo;
792 use MCE::Shared;
793
794 # Construct two queues, input and return.
795
796 my $que = MCE::Shared->queue();
797 my $ret = MCE::Shared->queue();
798
799 # Construct shared handles for serializing output from many workers
800 # writing simultaneously. This prevents garbled output.
801
802 mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
803 mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
804
805 # Spawn workers early for minimum memory consumption.
806
807 MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
808
809 # Obtain or generate input data for workers to process.
810
811 my ( $count, @urls ) = ( 0 );
812
813 push @urls, map { "http://127.0.0.$_/" } 1..254;
814 push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
815
816 while ( @urls ) {
817 my @chunk = splice(@urls, 0, 64);
818 $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
819 }
820
821 # So that workers leave the loop after consuming the queue.
822
823 $que->end();
824
825 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
826 # Loop for the manager process. The manager may do other work if
827 # need be and periodically check $ret->pending() not shown here.
828 #
829 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
830
831 my $start = time;
832
833 printf {$ERR} "Mngr - entering loop\n";
834
835 while ( $count ) {
836 my ( $result, $failed ) = $ret->dequeue( 2 );
837
838 # Remove ID from result, so not treated as a URL item.
839
840 printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
841
842 # Display the URL and the size captured.
843
844 foreach my $url ( keys %{ $result } ) {
845 printf {$OUT} "%s: %d\n", $url, length($result->{$url})
846 if $result->{$url}; # url has content
847 }
848
849 # Display URLs could not reach.
850
851 if ( @{ $failed } ) {
852 foreach my $url ( @{ $failed } ) {
853 print {$OUT} "Failed: $url\n";
854 }
855 }
856
857 # Decrement the count.
858
859 $count--;
860 }
861
862 MCE::Hobo->wait_all();
863
864 printf {$ERR} "Mngr - exiting loop\n\n";
865 printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
866
867 exit;
868
869 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
870 # Hobo processes enqueue two items ( $result and $failed ) for each
871 # job for the manager process. Likewise, the manager process dequeues
872 # two items above. Optionally, hobo processes may include the ID in
873 # the result.
874 #
875 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
876
877 sub task {
878 my ( $id ) = @_;
879 printf {$ERR} "Hobo $id entering loop\n";
880
881 while ( my $job = $que->dequeue() ) {
882 my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
883
884 # Walk URLs, provide a hash and array refs for data.
885
886 printf {$ERR} "Hobo $id running job $job->{ID}\n";
887 walk( $job, $result, $failed );
888
889 # Send results to the manager process.
890
891 $ret->enqueue( $result, $failed );
892 }
893
894 printf {$ERR} "Hobo $id exiting loop\n";
895 }
896
897 sub walk {
898 my ( $job, $result, $failed ) = @_;
899
900 # Yielding is critical when running an event loop in parallel.
901 # Not doing so means that the app may reach contention points
902 # with the firewall and likely impose unnecessary hardship at
903 # the OS level. The idea here is not to have multiple workers
904 # initiate HTTP requests to a batch of URLs at the same time.
905 # Yielding in 1.827+ behaves similarly to scatter, having
906 # the hobo process run solo for a fraction of time.
907
908 MCE::Hobo->yield( 0.03 ); # MCE::Hobo 1.827+
909
910 my $cv = AnyEvent->condvar();
911
912 # Populate the hash ref for the URLs it could reach.
913 # Do not mix AnyEvent timeout with hobo timeout.
914 # Therefore, choose event timeout when available.
915
916 foreach my $url ( @{ $job->{INPUT} } ) {
917 $cv->begin();
918 http_get $url, timeout => 2, sub {
919 my ( $data, $headers ) = @_;
920 $result->{$url} = $data;
921 $cv->end();
922 };
923 }
924
925 $cv->recv();
926
927 # Populate the array ref for URLs it could not reach.
928
929 foreach my $url ( @{ $job->{INPUT} } ) {
930 push @{ $failed }, $url unless (exists $result->{ $url });
931 }
932
933 return;
934 }
935
936 __END__
937
938 $ perl demo.pl
939
940 Hobo 1 entering loop
941 Hobo 2 entering loop
942 Hobo 3 entering loop
943 Mngr - entering loop
944 Hobo 2 running job 2
945 Hobo 3 running job 3
946 Hobo 1 running job 1
947 Hobo 4 entering loop
948 Hobo 4 running job 4
949 Hobo 2 running job 5
950 Mngr - received job 2
951 Hobo 3 running job 6
952 Mngr - received job 3
953 Hobo 1 running job 7
954 Mngr - received job 1
955 Hobo 4 running job 8
956 Mngr - received job 4
957 http://192.168.0.1/: 3729
958 Hobo 2 exiting loop
959 Mngr - received job 5
960 Hobo 3 exiting loop
961 Mngr - received job 6
962 Hobo 1 exiting loop
963 Mngr - received job 7
964 Hobo 4 exiting loop
965 Mngr - received job 8
966 Mngr - exiting loop
967
968 Duration: 4.131 seconds
969
971 Making an executable is possible with the PAR::Packer module. On the
972 Windows platform, threads, threads::shared, and exiting via threads are
973 necessary for the binary to exit successfully.
974
975 # https://metacpan.org/pod/PAR::Packer
976 # https://metacpan.org/pod/pp
977 #
978 # pp -o demo.exe demo.pl
979 # ./demo.exe
980
981 use strict;
982 use warnings;
983
984 use if $^O eq "MSWin32", "threads";
985 use if $^O eq "MSWin32", "threads::shared";
986
987 # Include minimum dependencies for MCE::Hobo.
988 # Add other modules required by your application here.
989
990 use Storable ();
991 use Time::HiRes ();
992
993 # use IO::FDPass (); # optional: for condvar, handle, queue
994 # use Sereal (); # optional: for faster serialization
995
996 use MCE::Hobo;
997 use MCE::Shared;
998
999 # For PAR to work on the Windows platform, one must include manually
1000 # any shared modules used by the application.
1001
1002 # use MCE::Shared::Array; # if using MCE::Shared->array
1003 # use MCE::Shared::Cache; # if using MCE::Shared->cache
1004 # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
1005 # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
1006 # use MCE::Shared::Hash; # if using MCE::Shared->hash
1007 # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
1008 # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
1009 # use MCE::Shared::Queue; # if using MCE::Shared->queue
1010 # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
1011
1012 # Et cetera. Only load modules needed for your application.
1013
1014 use MCE::Shared::Sequence; # if using MCE::Shared->sequence
1015
1016 my $seq = MCE::Shared->sequence( 1, 9 );
1017
1018 sub task {
1019 my ( $id ) = @_;
1020 while ( defined ( my $num = $seq->next() ) ) {
1021 print "$id: $num\n";
1022 sleep 1;
1023 }
1024 }
1025
1026 sub main {
1027 MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
1028 MCE::Hobo->wait_all();
1029 }
1030
1031 # Main must run inside a thread on the Windows platform or workers
1032 # will fail during exit, causing the exe to crash. The reason is
1033 # that PAR or a dependency isn't multi-process safe.
1034
1035 ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1036
1037 threads->exit(0) if $INC{"threads.pm"};
1038
1040 The inspiration for "MCE::Hobo" comes from wanting "threads"-like
1041 behavior for processes. Both can run side-by-side including safe-use by
1042 MCE workers. Likewise, the documentation resembles "threads".
1043
1044 The inspiration for "wait_all" and "wait_one" comes from the
1045 "Parallel::WorkUnit" module.
1046
1048 • forks
1049
1050 • forks::BerkeleyDB
1051
1052 • MCE::Child
1053
1054 • Parallel::ForkManager
1055
1056 • Parallel::Loops
1057
1058 • Parallel::Prefork
1059
1060 • Parallel::WorkUnit
1061
1062 • Proc::Fork
1063
1064 • Thread::Tie
1065
1066 • threads
1067
1069 MCE, MCE::Channel, MCE::Shared
1070
1072 Mario E. Roy, <marioeroy AT gmail DOT com>
1073
1074
1075
1076perl v5.34.0 2022-02-20 MCE::Hobo(3)