1MCE::Hobo(3) User Contributed Perl Documentation MCE::Hobo(3)
2
3
4
6 MCE::Hobo - A threads-like parallelization module
7
9 This document describes MCE::Hobo version 1.873
10
12 use MCE::Hobo;
13
14 MCE::Hobo->init(
15 max_workers => 'auto', # default undef, unlimited
16 hobo_timeout => 20, # default undef, no timeout
17 posix_exit => 1, # default undef, CORE::exit
18 void_context => 1, # default undef
19 on_start => sub {
20 my ( $pid, $ident ) = @_;
21 ...
22 },
23 on_finish => sub {
24 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
25 ...
26 }
27 );
28
29 MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
30
31 sub parallel {
32 my ($arg1) = @_;
33 print "Hello again, $arg1\n" if defined($arg1);
34 print "Hello again, $_\n"; # same thing
35 }
36
37 MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
38
39 my @hobos = MCE::Hobo->list();
40 my @pids = MCE::Hobo->list_pids();
41 my @running = MCE::Hobo->list_running();
42 my @joinable = MCE::Hobo->list_joinable();
43 my $count = MCE::Hobo->pending();
44
45 # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
46 $_->join() for @hobos; # (or)
47 $_->join() for @joinable;
48
49 # Joining occurs immediately as hobo processes complete execution.
50 1 while MCE::Hobo->wait_one();
51
52 my $hobo = mce_async { foreach (@files) { ... } };
53
54 $hobo->join();
55
56 if ( my $err = $hobo->error() ) {
57 warn "Hobo error: $err\n";
58 }
59
60 # Get a hobo's object
61 $hobo = MCE::Hobo->self();
62
63 # Get a hobo's ID
64 $pid = MCE::Hobo->pid(); # $$
65 $pid = $hobo->pid();
66 $pid = MCE::Hobo->tid(); # tid is an alias for pid
67 $pid = $hobo->tid();
68
69 # Test hobo objects
70 if ( $hobo1 == $hobo2 ) {
71 ...
72 }
73
74 # Give other workers a chance to run
75 MCE::Hobo->yield();
76 MCE::Hobo->yield(0.05);
77
78 # Return context, wantarray aware
79 my ($value1, $value2) = $hobo->join();
80 my $value = $hobo->join();
81
82 # Check hobo's state
83 if ( $hobo->is_running() ) {
84 sleep 1;
85 }
86 if ( $hobo->is_joinable() ) {
87 $hobo->join();
88 }
89
90 # Send a signal to a hobo
91 $hobo->kill('SIGUSR1');
92
93 # Exit a hobo
94 MCE::Hobo->exit(0);
95 MCE::Hobo->exit(0, @ret); # MCE::Hobo 1.827+
96
98 A hobo is a migratory worker inside the machine that carries the
99 asynchronous gene. Hobo processes are equipped with "threads"-like
100 capability for running code asynchronously. Unlike threads, each hobo
101 is a unique process to the underlying OS. The IPC is managed by
102 "MCE::Shared", which runs on all the major platforms including Cygwin
103 and Strawberry Perl.
104
105 An exception was made on the Windows platform to spawn threads versus
106 children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
107 release reverts back to spawning children on all supported platforms.
108
109 "MCE::Hobo" may be used as a standalone or together with "MCE"
110 including running alongside "threads".
111
112 use MCE::Hobo;
113 use MCE::Shared;
114
115 # synopsis: head -20 file.txt | perl script.pl
116
117 my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
118 my $ofh = MCE::Shared->handle( ">", \*STDOUT );
119 my $ary = MCE::Shared->array();
120
121 sub parallel_task {
122 my ( $id ) = @_;
123 while ( <$ifh> ) {
124 printf {$ofh} "[ %4d ] %s", $., $_;
125 # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
126 $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
127 }
128 }
129
130 my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
131 my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
132 my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
133
134 $_->join for MCE::Hobo->list(); # ditto: MCE::Hobo->wait_all();
135
136 # search array (total one round-trip via IPC)
137 my @vals = $ary->vals( "val =~ / ID 2 /" );
138
139 print {*STDERR} join("", @vals);
140
142 $hobo = MCE::Hobo->create( FUNCTION, ARGS )
143 $hobo = MCE::Hobo->new( FUNCTION, ARGS )
144 This will create a new hobo process that will begin execution with
145 function as the entry point, and optionally ARGS for list of
146 parameters. It will return the corresponding MCE::Hobo object, or
147 undef if hobo creation failed.
148
149 FUNCTION may either be the name of a function, an anonymous
150 subroutine, or a code ref.
151
152 my $hobo = MCE::Hobo->create( "func_name", ... );
153 # or
154 my $hobo = MCE::Hobo->create( sub { ... }, ... );
155 # or
156 my $hobo = MCE::Hobo->create( \&func, ... );
157
158 $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
159 $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
160 Options, excluding "ident", may be specified globally via the "init"
161 function. Otherwise, "ident", "hobo_timeout", "posix_exit", and
162 "void_context" may be set uniquely.
163
164 The "ident" option, available since 1.827, is used by callback
165 functions "on_start" and "on_finish" for identifying the started and
166 finished hobo process respectively.
167
168 my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
169 ...
170 } );
171
172 $hobo1->join;
173
174 my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
175 sleep 1 for ( 1 .. 9 );
176 } );
177
178 $hobo2->join;
179
180 if ( $hobo2->error() eq "Hobo timed out\n" ) {
181 ...
182 }
183
184 The "new()" method is an alias for "create()".
185
186 mce_async { BLOCK } ARGS;
187 mce_async { BLOCK };
188 "mce_async" runs the block asynchronously similarly to
189 "MCE::Hobo->create()". It returns the hobo object, or undef if hobo
190 creation failed.
191
192 my $hobo = mce_async { foreach (@files) { ... } };
193
194 $hobo->join();
195
196 if ( my $err = $hobo->error() ) {
197 warn("Hobo error: $err\n");
198 }
199
200 $hobo->join()
201 This will wait for the corresponding hobo process to complete its
202 execution. In non-voided context, "join()" will return the value(s)
203 of the entry point function.
204
205 The context (void, scalar or list) for the return value(s) for
206 "join" is determined at the time of joining and mostly "wantarray"
207 aware.
208
209 my $hobo1 = MCE::Hobo->create( sub {
210 my @res = qw(foo bar baz);
211 return (@res);
212 });
213
214 my @res1 = $hobo1->join(); # ( foo, bar, baz )
215 my $res1 = $hobo1->join(); # baz
216
217 my $hobo2 = MCE::Hobo->create( sub {
218 return 'foo';
219 });
220
221 my @res2 = $hobo2->join(); # ( foo )
222 my $res2 = $hobo2->join(); # foo
223
224 $hobo1->equal( $hobo2 )
225 Tests if two hobo objects are the same hobo or not. Hobo comparison
226 is based on process IDs. This is overloaded to the more natural
227 forms.
228
229 if ( $hobo1 == $hobo2 ) {
230 print("Hobo objects are the same\n");
231 }
232 # or
233 if ( $hobo1 != $hobo2 ) {
234 print("Hobo objects differ\n");
235 }
236
237 $hobo->error()
238 Hobo processes are executed in an "eval" context. This method will
239 return "undef" if the hobo terminates normally. Otherwise, it
240 returns the value of $@ associated with the hobo's execution status
241 in its "eval" context.
242
243 $hobo->exit()
244 This sends 'SIGQUIT' to the hobo process, notifying the hobo to
245 exit. It returns the hobo object to allow for method chaining. It
246 is important to join later if not immediately to not leave a zombie
247 or defunct process.
248
249 $hobo->exit()->join();
250 ...
251
252 $hobo->join(); # later
253
254 MCE::Hobo->exit( 0 )
255 MCE::Hobo->exit( 0, @ret )
256 A hobo can exit at any time by calling "MCE::Hobo->exit()".
257 Otherwise, the behavior is the same as "exit(status)" when called
258 from the main process. Current since 1.827, the hobo process may
259 optionally return data, to be sent via IPC.
260
261 MCE::Hobo->finish()
262 This class method is called automatically by "END", but may be
263 called explicitly. An error is emitted via croak if there are active
264 hobo processes not yet joined.
265
266 MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
267 $_->join for MCE::Hobo->list();
268
269 MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
270 $_->join for MCE::Hobo->list();
271
272 MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
273 $_->join for MCE::Hobo->list();
274
275 MCE::Hobo->finish();
276
277 MCE::Hobo->init( options )
278 The init function accepts a list of MCE::Hobo options.
279
280 MCE::Hobo->init(
281 max_workers => 'auto', # default undef, unlimited
282 hobo_timeout => 20, # default undef, no timeout
283 posix_exit => 1, # default undef, CORE::exit
284 void_context => 1, # default undef
285 on_start => sub {
286 my ( $pid, $ident ) = @_;
287 ...
288 },
289 on_finish => sub {
290 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
291 ...
292 }
293 );
294
295 # Identification given as an option or the 1st argument.
296 # Current API available since 1.827.
297
298 for my $key ( 'aa' .. 'zz' ) {
299 MCE::Hobo->create( { ident => $key }, sub { ... } );
300 MCE::Hobo->create( $key, sub { ... } );
301 }
302
303 MCE::Hobo->wait_all;
304
305 Set "max_workers" if you want to limit the number of workers by
306 waiting automatically for an available slot. Specify "auto" to
307 obtain the number of logical cores via "MCE::Util::get_ncpu()".
308
309 Set "hobo_timeout", in number of seconds, if you want the hobo
310 process to terminate after some time. The default is 0 for no
311 timeout.
312
313 Set "posix_exit" to avoid all END and destructor processing.
314 Constructing MCE::Hobo inside a thread implies 1 or if present CGI,
315 FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
316 Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
317
318 Set "void_context" to create the hobo process in void context for
319 the return value. Otherwise, the return context is wantarray-aware
320 for "join()" and "result()" and determined when retrieving the data.
321
322 The callback options "on_start" and "on_finish" are called in the
323 parent process after starting the worker and later when terminated.
324 The arguments for the subroutines were inspired by
325 Parallel::ForkManager.
326
327 The parameters for "on_start" are the following:
328
329 - pid of the hobo process
330 - identification (ident option or 1st arg to create)
331
332 The parameters for "on_finish" are the following:
333
334 - pid of the hobo process
335 - program exit code
336 - identification (ident option or 1st arg to create)
337 - exit signal id
338 - error message from eval inside MCE::Hobo
339 - returned data
340
341 $hobo->is_running()
342 Returns true if a hobo is still running.
343
344 $hobo->is_joinable()
345 Returns true if the hobo has finished running and not yet joined.
346
347 $hobo->kill( 'SIG...' )
348 Sends the specified signal to the hobo. Returns the hobo object to
349 allow for method chaining. As with "exit", it is important to join
350 eventually if not immediately to not leave a zombie or defunct
351 process.
352
353 $hobo->kill('SIG...')->join();
354
355 The following is a parallel demonstration comparing "MCE::Shared"
356 against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
357 after all workers have been notified to quit.
358
359 use Time::HiRes qw(time);
360
361 use Redis;
362 use Redis::Fast;
363
364 use MCE::Hobo;
365 use MCE::Shared;
366
367 my $redis = Redis->new();
368 my $rfast = Redis::Fast->new();
369 my $array = MCE::Shared->array();
370
371 sub parallel_redis {
372 my ($_redis) = @_;
373 my ($count, $quit, $len) = (0, 0);
374
375 # instead, use a flag to exit loop
376 $SIG{'QUIT'} = sub { $quit = 1 };
377
378 while () {
379 $len = $_redis->rpush('list', $count++);
380 last if $quit;
381 }
382
383 $count;
384 }
385
386 sub parallel_array {
387 my ($count, $quit, $len) = (0, 0);
388
389 # do not exit from inside handler
390 $SIG{'QUIT'} = sub { $quit = 1 };
391
392 while () {
393 $len = $array->push($count++);
394 last if $quit;
395 }
396
397 $count;
398 }
399
400 sub benchmark_this {
401 my ($desc, $num_procs, $timeout, $code, @args) = @_;
402 my ($start, $total) = (time(), 0);
403
404 MCE::Hobo->new($code, @args) for 1..$num_procs;
405 sleep $timeout;
406
407 # joining is not immediate; ok
408 $_->kill('QUIT') for MCE::Hobo->list();
409
410 # joining later; ok
411 $total += $_->join() for MCE::Hobo->list();
412
413 printf "$desc <> duration: %0.03f secs, count: $total\n",
414 time() - $start;
415
416 sleep 0.2;
417 }
418
419 benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
420 benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
421 benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
422
423 MCE::Hobo->list()
424 Returns a list of all hobo objects not yet joined.
425
426 @hobos = MCE::Hobo->list();
427
428 MCE::Hobo->list_pids()
429 Returns a list of all hobo pids not yet joined (available since
430 1.849).
431
432 @pids = MCE::Hobo->list_pids();
433
434 $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
435 # Signal workers and the shared manager all at once
436 CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
437 exec('reset');
438 };
439
440 MCE::Hobo->list_running()
441 Returns a list of all hobo objects that are still running.
442
443 @hobos = MCE::Hobo->list_running();
444
445 MCE::Hobo->list_joinable()
446 Returns a list of all hobo objects that have completed running.
447 Thus, ready to be joined without blocking.
448
449 @hobos = MCE::Hobo->list_joinable();
450
451 MCE::Hobo->max_workers([ N ])
452 Getter and setter for max_workers. Specify a number or 'auto' to
453 acquire the total number of cores via MCE::Util::get_ncpu. Specify a
454 false value to set back to no limit.
455
456 API available since 1.835.
457
458 MCE::Hobo->pending()
459 Returns a count of all hobo objects not yet joined.
460
461 $count = MCE::Hobo->pending();
462
463 $hobo->result()
464 Returns the result obtained by "join", "wait_one", or "wait_all". If
465 the process has not yet exited, waits for the corresponding hobo to
466 complete its execution.
467
468 use MCE::Hobo;
469 use Time::HiRes qw(sleep);
470
471 sub task {
472 my ($id) = @_;
473 sleep $id * 0.333;
474 return $id;
475 }
476
477 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
478
479 # 1 while MCE::Hobo->wait_one();
480
481 while ( my $hobo = MCE::Hobo->wait_one() ) {
482 my $err = $hobo->error() || 'no error';
483 my $res = $hobo->result();
484 my $pid = $hobo->pid();
485
486 print "[$pid] $err : $res\n";
487 }
488
489 Like "join" described above, the context (void, scalar or list) for
490 the return value(s) is determined at the time "result" is called and
491 mostly "wantarray" aware.
492
493 my $hobo1 = MCE::Hobo->create( sub {
494 my @res = qw(foo bar baz);
495 return (@res);
496 });
497
498 my @res1 = $hobo1->result(); # ( foo, bar, baz )
499 my $res1 = $hobo1->result(); # baz
500
501 my $hobo2 = MCE::Hobo->create( sub {
502 return 'foo';
503 });
504
505 my @res2 = $hobo2->result(); # ( foo )
506 my $res2 = $hobo2->result(); # foo
507
508 MCE::Hobo->self()
509 Class method that allows a hobo to obtain its own MCE::Hobo object.
510
511 $hobo->pid()
512 $hobo->tid()
513 Returns the ID of the hobo.
514
515 pid: $$ process id
516 tid: $$ alias for pid
517
518 MCE::Hobo->pid()
519 MCE::Hobo->tid()
520 Class methods that allow a hobo to obtain its own ID.
521
522 pid: $$ process id
523 tid: $$ alias for pid
524
525 MCE::Hobo->wait_one()
526 MCE::Hobo->waitone()
527 MCE::Hobo->wait_all()
528 MCE::Hobo->waitall()
529 Meaningful for the manager process only, waits for one or all hobo
530 processes to complete execution. Afterwards, returns the
531 corresponding hobo objects. If a hobo doesn't exist, returns the
532 "undef" value or an empty list for "wait_one" and "wait_all"
533 respectively.
534
535 The "waitone" and "waitall" methods are aliases since 1.827 for
536 backwards compatibility.
537
538 use MCE::Hobo;
539 use Time::HiRes qw(sleep);
540
541 sub task {
542 my $id = shift;
543 sleep $id * 0.333;
544 return $id;
545 }
546
547 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
548
549 # join, traditional use case
550 $_->join() for MCE::Hobo->list();
551
552 # wait_one, simplistic use case
553 1 while MCE::Hobo->wait_one();
554
555 # wait_one
556 while ( my $hobo = MCE::Hobo->wait_one() ) {
557 my $err = $hobo->error() || 'no error';
558 my $res = $hobo->result();
559 my $pid = $hobo->pid();
560
561 print "[$pid] $err : $res\n";
562 }
563
564 # wait_all
565 my @hobos = MCE::Hobo->wait_all();
566
567 for ( @hobos ) {
568 my $err = $_->error() || 'no error';
569 my $res = $_->result();
570 my $pid = $_->pid();
571
572 print "[$pid] $err : $res\n";
573 }
574
575 MCE::Hobo->yield( [ floating_seconds ] )
576 Prior API till 1.826.
577
578 Let this hobo yield CPU time to other workers. By default, the class
579 method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
580 including Cygwin.
581
582 MCE::Hobo->yield();
583 MCE::Hobo->yield(0.05);
584
585 # total run time: 0.25 seconds, sleep occurring in parallel
586
587 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
588 MCE::Hobo->wait_all();
589
590 Current API available since 1.827.
591
592 Give other workers a chance to run, optionally for given time. Yield
593 behaves similarly to MCE's interval option. It throttles workers
594 from running too fast. A demonstration is provided in the next
595 section for fetching URLs in parallel.
596
597 The default "floating_seconds" is 0.008 and 0.015 on UNIX and
598 Windows, respectively. Pass 0 if simply wanting to give other
599 workers a chance to run.
600
601 # total run time: 1.00 second
602
603 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
604 MCE::Hobo->wait_all();
605
607 Threads-like detach capability was added starting with the 1.867
608 release.
609
610 A threads example is shown first followed by the MCE::Hobo example. All
611 one needs to do is set the CHLD signal handler to IGNORE.
612 Unfortunately, this works on UNIX platforms only. The hobo process
613 restores the CHLD handler to default, so is able to deeply spin workers
614 and reap if desired.
615
616 use threads;
617
618 for ( 1 .. 8 ) {
619 async {
620 # do something
621 }->detach;
622 }
623
624 use MCE::Hobo;
625
626 # Have the OS reap workers automatically when exiting.
627 # The on_finish option is ignored if specified (no-op).
628 # Ensure not inside a thread on UNIX platforms.
629
630 $SIG{CHLD} = 'IGNORE';
631
632 for ( 1 .. 8 ) {
633 mce_async {
634 # do something
635 };
636 }
637
638 # Optionally, wait for any remaining workers before leaving.
639 # This is necessary if workers are consuming shared objects,
640 # constructed via MCE::Shared.
641
642 MCE::Hobo->wait_all;
643
644 The following is another way and works on Windows. Here, the on_finish
645 handler works as usual.
646
647 use MCE::Hobo;
648
649 MCE::Hobo->init(
650 on_finish => sub {
651 ...
652 },
653 );
654
655 for ( 1 .. 8 ) {
656 $_->join for MCE::Hobo->list_joinable;
657 mce_async {
658 # do something
659 };
660 }
661
662 MCE::Hobo->wait_all;
663
665 MCE::Hobo behaves similarly to threads for the most part. It also
666 provides Parallel::ForkManager-like capabilities. The
667 "Parallel::ForkManager" example is shown first followed by a version
668 using "MCE::Hobo".
669
670 Parallel::ForkManager
671 use strict;
672 use warnings;
673
674 use Parallel::ForkManager;
675 use Time::HiRes 'time';
676
677 my $start = time;
678
679 my $pm = Parallel::ForkManager->new(10);
680 $pm->set_waitpid_blocking_sleep(0);
681
682 $pm->run_on_finish( sub {
683 my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
684 print "child $pid completed: $ident => ", $resp->[0], "\n";
685 });
686
687 DATA_LOOP:
688 foreach my $data ( 1..2000 ) {
689 # forks and returns the pid for the child
690 my $pid = $pm->start($data) and next DATA_LOOP;
691 my $ret = [ $data * 2 ];
692
693 $pm->finish(0, $ret);
694 }
695
696 $pm->wait_all_children;
697
698 printf STDERR "duration: %0.03f seconds\n", time - $start;
699
700 MCE::Hobo
701 use strict;
702 use warnings;
703
704 use MCE::Hobo 1.843;
705 use Time::HiRes 'time';
706
707 my $start = time;
708
709 MCE::Hobo->init(
710 max_workers => 10,
711 on_finish => sub {
712 my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
713 print "child $pid completed: $ident => ", $resp->[0], "\n";
714 }
715 );
716
717 foreach my $data ( 1..2000 ) {
718 MCE::Hobo->create( $data, sub {
719 [ $data * 2 ];
720 });
721 }
722
723 MCE::Hobo->wait_all;
724
725 printf STDERR "duration: %0.03f seconds\n", time - $start;
726
727 Time to spin 2,000 workers and obtain results (in seconds).
728 Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
729 Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
730 with Moo loaded at the top of the script.
731
732 MCE::Hobo uses MCE::Shared to retrieve data during reaping.
733 MCE::Child uses MCE::Channel, no shared-manager.
734
735 Version Cygwin Windows Linux macOS FreeBSD
736
737 MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
738 MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
739 P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
740
741 MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
742 MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
743 P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
744
745 Set posix_exit to avoid all END and destructor processing.
746 This is helpful for reducing overhead when workers exit. Ditto if
747 using a Perl module not parallel safe. The option is ignored on
748 Windows "$^O eq 'MSWin32'".
749
750 MCE::Child->init( posix_exit => 1, ... );
751 MCE::Hobo->init( posix_exit => 1, ... );
752
753 Version Cygwin Windows Linux macOS FreeBSD
754
755 MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
756 MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
757
759 This demonstration constructs two queues, two handles, starts the
760 shared-manager process if needed, and spawns four workers. For this
761 demonstration, I am chunking 64 URLs per job. In reality, one may run
762 with 200 workers and chunk 300 URLs on a 24-way box.
763
764 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
765 # perl demo.pl -- all output
766 # perl demo.pl >/dev/null -- mngr/hobo output
767 # perl demo.pl 2>/dev/null -- show results only
768 #
769 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
770
771 use strict;
772 use warnings;
773
774 use AnyEvent;
775 use AnyEvent::HTTP;
776 use Time::HiRes qw( time );
777
778 use MCE::Hobo;
779 use MCE::Shared;
780
781 # Construct two queues, input and return.
782
783 my $que = MCE::Shared->queue();
784 my $ret = MCE::Shared->queue();
785
786 # Construct shared handles for serializing output from many workers
787 # writing simultaneously. This prevents garbled output.
788
789 mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
790 mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
791
792 # Spawn workers early for minimum memory consumption.
793
794 MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
795
796 # Obtain or generate input data for workers to process.
797
798 my ( $count, @urls ) = ( 0 );
799
800 push @urls, map { "http://127.0.0.$_/" } 1..254;
801 push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
802
803 while ( @urls ) {
804 my @chunk = splice(@urls, 0, 64);
805 $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
806 }
807
808 # So that workers leave the loop after consuming the queue.
809
810 $que->end();
811
812 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
813 # Loop for the manager process. The manager may do other work if
814 # need be and periodically check $ret->pending() not shown here.
815 #
816 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
817
818 my $start = time;
819
820 printf {$ERR} "Mngr - entering loop\n";
821
822 while ( $count ) {
823 my ( $result, $failed ) = $ret->dequeue( 2 );
824
825 # Remove ID from result, so not treated as a URL item.
826
827 printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
828
829 # Display the URL and the size captured.
830
831 foreach my $url ( keys %{ $result } ) {
832 printf {$OUT} "%s: %d\n", $url, length($result->{$url})
833 if $result->{$url}; # url has content
834 }
835
836 # Display URLs it could not reach.
837
838 if ( @{ $failed } ) {
839 foreach my $url ( @{ $failed } ) {
840 print {$OUT} "Failed: $url\n";
841 }
842 }
843
844 # Decrement the count.
845
846 $count--;
847 }
848
849 MCE::Hobo->wait_all();
850
851 printf {$ERR} "Mngr - exiting loop\n\n";
852 printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
853
854 exit;
855
856 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
857 # Hobo processes enqueue two items ( $result and $failed ) per each
858 # job for the manager process. Likewise, the manager process dequeues
859 # two items above. Optionally, hobo processes may include the ID in
860 # the result.
861 #
862 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
863
864 sub task {
865 my ( $id ) = @_;
866 printf {$ERR} "Hobo $id entering loop\n";
867
868 while ( my $job = $que->dequeue() ) {
869 my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
870
871 # Walk URLs, provide a hash and array refs for data.
872
873 printf {$ERR} "Hobo $id running job $job->{ID}\n";
874 walk( $job, $result, $failed );
875
876 # Send results to the manager process.
877
878 $ret->enqueue( $result, $failed );
879 }
880
881 printf {$ERR} "Hobo $id exiting loop\n";
882 }
883
884 sub walk {
885 my ( $job, $result, $failed ) = @_;
886
887 # Yielding is critical when running an event loop in parallel.
888 # Not doing so means that the app may reach contention points
889 # with the firewall and likely impose unnecessary hardship at
890 # the OS level. The idea here is not to have multiple workers
891 # initiate HTTP requests to a batch of URLs at the same time.
892 # Yielding in 1.827+ behaves similarly to scatter to have
893 # the hobo process run solo for a fraction of time.
894
895 MCE::Hobo->yield( 0.03 ); # MCE::Hobo 1.827+
896
897 my $cv = AnyEvent->condvar();
898
899 # Populate the hash ref for the URLs it could reach.
900 # Do not mix AnyEvent timeout with hobo timeout.
901 # Therefore, choose event timeout when available.
902
903 foreach my $url ( @{ $job->{INPUT} } ) {
904 $cv->begin();
905 http_get $url, timeout => 2, sub {
906 my ( $data, $headers ) = @_;
907 $result->{$url} = $data;
908 $cv->end();
909 };
910 }
911
912 $cv->recv();
913
914 # Populate the array ref for URLs it could not reach.
915
916 foreach my $url ( @{ $job->{INPUT} } ) {
917 push @{ $failed }, $url unless (exists $result->{ $url });
918 }
919
920 return;
921 }
922
923 __END__
924
925 $ perl demo.pl
926
927 Hobo 1 entering loop
928 Hobo 2 entering loop
929 Hobo 3 entering loop
930 Mngr - entering loop
931 Hobo 2 running job 2
932 Hobo 3 running job 3
933 Hobo 1 running job 1
934 Hobo 4 entering loop
935 Hobo 4 running job 4
936 Hobo 2 running job 5
937 Mngr - received job 2
938 Hobo 3 running job 6
939 Mngr - received job 3
940 Hobo 1 running job 7
941 Mngr - received job 1
942 Hobo 4 running job 8
943 Mngr - received job 4
944 http://192.168.0.1/: 3729
945 Hobo 2 exiting loop
946 Mngr - received job 5
947 Hobo 3 exiting loop
948 Mngr - received job 6
949 Hobo 1 exiting loop
950 Mngr - received job 7
951 Hobo 4 exiting loop
952 Mngr - received job 8
953 Mngr - exiting loop
954
955 Duration: 4.131 seconds
956
958 Making an executable is possible with the PAR::Packer module. On the
959 Windows platform, threads, threads::shared, and exiting via threads are
960 necessary for the binary to exit successfully.
961
962 # https://metacpan.org/pod/PAR::Packer
963 # https://metacpan.org/pod/pp
964 #
965 # pp -o demo.exe demo.pl
966 # ./demo.exe
967
968 use strict;
969 use warnings;
970
971 use if $^O eq "MSWin32", "threads";
972 use if $^O eq "MSWin32", "threads::shared";
973
974 # Include minimum dependencies for MCE::Hobo.
975 # Add other modules required by your application here.
976
977 use Storable ();
978 use Time::HiRes ();
979
980 # use IO::FDPass (); # optional: for condvar, handle, queue
981 # use Sereal (); # optional: for faster serialization
982
983 use MCE::Hobo;
984 use MCE::Shared;
985
986 # For PAR to work on the Windows platform, one must include manually
987 # any shared modules used by the application.
988
989 # use MCE::Shared::Array; # if using MCE::Shared->array
990 # use MCE::Shared::Cache; # if using MCE::Shared->cache
991 # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
992 # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
993 # use MCE::Shared::Hash; # if using MCE::Shared->hash
994 # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
995 # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
996 # use MCE::Shared::Queue; # if using MCE::Shared->queue
997 # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
998
999 # Et cetera. Only load modules needed for your application.
1000
1001 use MCE::Shared::Sequence; # if using MCE::Shared->sequence
1002
1003 my $seq = MCE::Shared->sequence( 1, 9 );
1004
1005 sub task {
1006 my ( $id ) = @_;
1007 while ( defined ( my $num = $seq->next() ) ) {
1008 print "$id: $num\n";
1009 sleep 1;
1010 }
1011 }
1012
1013 sub main {
1014 MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
1015 MCE::Hobo->wait_all();
1016 }
1017
1018 # Main must run inside a thread on the Windows platform or workers
1019 # will fail during exiting, causing the exe to crash. The reason is
1020 # that PAR or a dependency isn't multi-process safe.
1021
1022 ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1023
1024 threads->exit(0) if $INC{"threads.pm"};
1025
1027 The inspiration for "MCE::Hobo" comes from wanting "threads"-like
1028 behavior for processes. Both can run side-by-side including safe-use by
1029 MCE workers. Likewise, the documentation resembles "threads".
1030
1031 The inspiration for "wait_all" and "wait_one" comes from the
1032 "Parallel::WorkUnit" module.
1033
1035 · forks
1036
1037 · forks::BerkeleyDB
1038
1039 · MCE::Child
1040
1041 · Parallel::ForkManager
1042
1043 · Parallel::Loops
1044
1045 · Parallel::Prefork
1046
1047 · Parallel::WorkUnit
1048
1049 · Proc::Fork
1050
1051 · Thread::Tie
1052
1053 · threads
1054
1056 MCE, MCE::Channel, MCE::Shared
1057
1059 Mario E. Roy, <marioeroy AT gmail DOT com>
1060
1061
1062
1063perl v5.32.0 2020-08-02 MCE::Hobo(3)