1MCE::Hobo(3) User Contributed Perl Documentation MCE::Hobo(3)
2
3
4
6 MCE::Hobo - A threads-like parallelization module
7
9 This document describes MCE::Hobo version 1.862
10
12 use MCE::Hobo;
13
14 MCE::Hobo->init(
15 max_workers => 'auto', # default undef, unlimited
16 hobo_timeout => 20, # default undef, no timeout
17 posix_exit => 1, # default undef, CORE::exit
18 void_context => 1, # default undef
19 on_start => sub {
20 my ( $pid, $ident ) = @_;
21 ...
22 },
23 on_finish => sub {
24 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
25 ...
26 }
27 );
28
29 MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
30
31 sub parallel {
32 my ($arg1) = @_;
33 print "Hello again, $arg1\n" if defined($arg1);
34 print "Hello again, $_\n"; # same thing
35 }
36
37 MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
38
39 my @hobos = MCE::Hobo->list();
40 my @pids = MCE::Hobo->list_pids();
41 my @running = MCE::Hobo->list_running();
42 my @joinable = MCE::Hobo->list_joinable();
43 my @count = MCE::Hobo->pending();
44
45 # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
46 $_->join() for @hobos; # (or)
47 $_->join() for @joinable;
48
49 # Joining occurs immediately as hobo processes complete execution.
50 1 while MCE::Hobo->wait_one();
51
52 my $hobo = mce_async { foreach (@files) { ... } };
53
54 $hobo->join();
55
56 if ( my $err = $hobo->error() ) {
57 warn "Hobo error: $err\n";
58 }
59
60 # Get a hobo's object
61 $hobo = MCE::Hobo->self();
62
63 # Get a hobo's ID
64 $pid = MCE::Hobo->pid(); # $$
65 $pid = $hobo->pid();
66 $pid = MCE::Hobo->tid(); # tid is an alias for pid
67 $pid = $hobo->tid();
68
69 # Test hobo objects
70 if ( $hobo1 == $hobo2 ) {
71 ...
72 }
73
74 # Give other workers a chance to run
75 MCE::Hobo->yield();
76 MCE::Hobo->yield(0.05);
77
78 # Return context, wantarray aware
79 my ($value1, $value2) = $hobo->join();
80 my $value = $hobo->join();
81
82 # Check hobo's state
83 if ( $hobo->is_running() ) {
84 sleep 1;
85 }
86 if ( $hobo->is_joinable() ) {
87 $hobo->join();
88 }
89
90 # Send a signal to a hobo
91 $hobo->kill('SIGUSR1');
92
93 # Exit a hobo
94 MCE::Hobo->exit(0);
95 MCE::Hobo->exit(0, @ret); # MCE::Hobo 1.827+
96
98 A hobo is a migratory worker inside the machine that carries the
99 asynchronous gene. Hobo processes are equipped with "threads"-like
100 capability for running code asynchronously. Unlike threads, each hobo
101 is a unique process to the underlying OS. The IPC is managed by
102 "MCE::Shared", which runs on all the major platforms including Cygwin
103 and Strawberry Perl.
104
105 An exception was made on the Windows platform to spawn threads versus
106 children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
107 release reverts back to spawning children on all supported platforms.
108
109 "MCE::Hobo" may be used as a standalone or together with "MCE"
110 including running alongside "threads".
111
112 use MCE::Hobo;
113 use MCE::Shared;
114
115 # synopsis: head -20 file.txt | perl script.pl
116
117 my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
118 my $ofh = MCE::Shared->handle( ">", \*STDOUT );
119 my $ary = MCE::Shared->array();
120
121 sub parallel_task {
122 my ( $id ) = @_;
123 while ( <$ifh> ) {
124 printf {$ofh} "[ %4d ] %s", $., $_;
125 # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
126 $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
127 }
128 }
129
130 my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
131 my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
132 my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
133
134 $_->join for MCE::Hobo->list(); # ditto: MCE::Hobo->wait_all();
135
136 # search array (total one round-trip via IPC)
137 my @vals = $ary->vals( "val =~ / ID 2 /" );
138
139 print {*STDERR} join("", @vals);
140
142 $hobo = MCE::Hobo->create( FUNCTION, ARGS )
143 $hobo = MCE::Hobo->new( FUNCTION, ARGS )
144 This will create a new hobo process that will begin execution with
145 function as the entry point, and optionally ARGS for list of
146 parameters. It will return the corresponding MCE::Hobo object, or
147 undef if hobo creation failed.
148
149 FUNCTION may either be the name of a function, an anonymous
150 subroutine, or a code ref.
151
152 my $hobo = MCE::Hobo->create( "func_name", ... );
153 # or
154 my $hobo = MCE::Hobo->create( sub { ... }, ... );
155 # or
156 my $hobo = MCE::Hobo->create( \&func, ... );
157
158 $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
159 $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
160 Options, excluding "ident", may be specified globally via the "init"
161 function. Otherwise, "ident", "hobo_timeout", "posix_exit", and
162 "void_context" may be set uniquely.
163
164 The "ident" option, available since 1.827, is used by callback
165 functions "on_start" and "on_finish" for identifying the started and
166 finished hobo process respectively.
167
168 my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
169 ...
170 } );
171
172 $hobo1->join;
173
174 my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
175 sleep 1 for ( 1 .. 9 );
176 } );
177
178 $hobo2->join;
179
180 if ( $hobo2->error() eq "Hobo timed out\n" ) {
181 ...
182 }
183
184 The "new()" method is an alias for "create()".
185
186 mce_async { BLOCK } ARGS;
187 mce_async { BLOCK };
188 "mce_async" runs the block asynchronously similarly to
189 "MCE::Hobo->create()". It returns the hobo object, or undef if hobo
190 creation failed.
191
192 my $hobo = mce_async { foreach (@files) { ... } };
193
194 $hobo->join();
195
196 if ( my $err = $hobo->error() ) {
197 warn("Hobo error: $err\n");
198 }
199
200 $hobo->join()
201 This will wait for the corresponding hobo process to complete its
202 execution. In non-voided context, "join()" will return the value(s)
203 of the entry point function.
204
205 The context (void, scalar or list) for the return value(s) for
206 "join" is determined at the time of joining and mostly "wantarray"
207 aware.
208
209 my $hobo1 = MCE::Hobo->create( sub {
210 my @res = qw(foo bar baz);
211 return (@res);
212 });
213
214 my @res1 = $hobo1->join(); # ( foo, bar, baz )
215 my $res1 = $hobo1->join(); # baz
216
217 my $hobo2 = MCE::Hobo->create( sub {
218 return 'foo';
219 });
220
221 my @res2 = $hobo2->join(); # ( foo )
222 my $res2 = $hobo2->join(); # foo
223
224 $hobo1->equal( $hobo2 )
225 Tests if two hobo objects are the same hobo or not. Hobo comparison
226 is based on process IDs. This is overloaded to the more natural
227 forms.
228
229 if ( $hobo1 == $hobo2 ) {
230 print("Hobo objects are the same\n");
231 }
232 # or
233 if ( $hobo1 != $hobo2 ) {
234 print("Hobo objects differ\n");
235 }
236
237 $hobo->error()
238 Hobo processes are executed in an "eval" context. This method will
239 return "undef" if the hobo terminates normally. Otherwise, it
240 returns the value of $@ associated with the hobo's execution status
241 in its "eval" context.
242
243 $hobo->exit()
244 This sends 'SIGINT' to the hobo process, notifying the hobo to exit.
245 It returns the hobo object to allow for method chaining. It is
246 important to join later if not immediately to not leave a zombie or
247 defunct process.
248
249 $hobo->exit()->join();
250 ...
251
252 $hobo->join(); # later
253
254 MCE::Hobo->exit( 0 )
255 MCE::Hobo->exit( 0, @ret )
256 A hobo can exit at any time by calling "MCE::Hobo->exit()".
257 Otherwise, the behavior is the same as "exit(status)" when called
258 from the main process. Current since 1.827, the hobo process may
259 optionally return data, to be sent via IPC.
260
261 MCE::Hobo->finish()
262 This class method is called automatically by "END", but may be
263 called explicitly. An error is emitted via croak if there are active
264 hobo processes not yet joined.
265
266 MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
267 $_->join for MCE::Hobo->list();
268
269 MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
270 $_->join for MCE::Hobo->list();
271
272 MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
273 $_->join for MCE::Hobo->list();
274
275 MCE::Hobo->finish();
276
277 MCE::Hobo->init( options )
278 The init function accepts a list of MCE::Hobo options.
279
280 MCE::Hobo->init(
281 max_workers => 'auto', # default undef, unlimited
282 hobo_timeout => 20, # default undef, no timeout
283 posix_exit => 1, # default undef, CORE::exit
284 void_context => 1, # default undef
285 on_start => sub {
286 my ( $pid, $ident ) = @_;
287 ...
288 },
289 on_finish => sub {
290 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
291 ...
292 }
293 );
294
295 # Identification given as an option or the 1st argument.
296 # Current API available since 1.827.
297
298 for my $key ( 'aa' .. 'zz' ) {
299 MCE::Hobo->create( { ident => $key }, sub { ... } );
300 MCE::Hobo->create( $key, sub { ... } );
301 }
302
303 MCE::Hobo->wait_all;
304
305 Set "max_workers" if you want to limit the number of workers by
306 waiting automatically for an available slot. Specify "auto" to
307 obtain the number of logical cores via "MCE::Util::get_ncpu()".
308
309 Set "hobo_timeout", in number of seconds, if you want the hobo
310 process to terminate after some time. The default is 0 for no
311 timeout.
312
313 Set "posix_exit" to avoid all END and destructor processing.
314 Constructing MCE::Hobo inside a thread implies 1, as does the presence
315 of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
316 Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
317
318 Set "void_context" to create the hobo process in void context for
319 the return value. Otherwise, the return context is wantarray-aware
320 for "join()" and "result()" and determined when retrieving the data.
321
322 The callback options "on_start" and "on_finish" are called in the
323 parent process after starting the worker and later when terminated.
324 The arguments for the subroutines were inspired by
325 Parallel::ForkManager.
326
327 The parameters for "on_start" are the following:
328
329 - pid of the hobo process
330 - identification (ident option or 1st arg to create)
331
332 The parameters for "on_finish" are the following:
333
334 - pid of the hobo process
335 - program exit code
336 - identification (ident option or 1st arg to create)
337 - exit signal id
338 - error message from eval inside MCE::Hobo
339 - returned data
340
341 $hobo->is_running()
342 Returns true if a hobo is still running.
343
344 $hobo->is_joinable()
345 Returns true if the hobo has finished running and not yet joined.
346
347 $hobo->kill( 'SIG...' )
348 Sends the specified signal to the hobo. Returns the hobo object to
349 allow for method chaining. As with "exit", it is important to join
350 eventually if not immediately to not leave a zombie or defunct
351 process.
352
353 $hobo->kill('SIG...')->join();
354
355 The following is a parallel demonstration comparing "MCE::Shared"
356 against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
357 after all workers have been notified to quit.
358
359 use Time::HiRes qw(time);
360
361 use Redis;
362 use Redis::Fast;
363
364 use MCE::Hobo;
365 use MCE::Shared;
366
367 my $redis = Redis->new();
368 my $rfast = Redis::Fast->new();
369 my $array = MCE::Shared->array();
370
371 sub parallel_redis {
372 my ($_redis) = @_;
373 my ($count, $quit, $len) = (0, 0);
374
375 # instead, use a flag to exit loop
376 $SIG{'QUIT'} = sub { $quit = 1 };
377
378 while () {
379 $len = $_redis->rpush('list', $count++);
380 last if $quit;
381 }
382
383 $count;
384 }
385
386 sub parallel_array {
387 my ($count, $quit, $len) = (0, 0);
388
389 # do not exit from inside handler
390 $SIG{'QUIT'} = sub { $quit = 1 };
391
392 while () {
393 $len = $array->push($count++);
394 last if $quit;
395 }
396
397 $count;
398 }
399
400 sub benchmark_this {
401 my ($desc, $num_procs, $timeout, $code, @args) = @_;
402 my ($start, $total) = (time(), 0);
403
404 MCE::Hobo->new($code, @args) for 1..$num_procs;
405 sleep $timeout;
406
407 # joining is not immediate; ok
408 $_->kill('QUIT') for MCE::Hobo->list();
409
410 # joining later; ok
411 $total += $_->join() for MCE::Hobo->list();
412
413 printf "$desc <> duration: %0.03f secs, count: $total\n",
414 time() - $start;
415
416 sleep 0.2;
417 }
418
419 benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
420 benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
421 benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
422
423 MCE::Hobo->list()
424 Returns a list of all hobo objects not yet joined.
425
426 @hobos = MCE::Hobo->list();
427
428 MCE::Hobo->list_pids()
429 Returns a list of all hobo pids not yet joined (available since
430 1.849).
431
432 @pids = MCE::Hobo->list_pids();
433
434 $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
435 # Signal workers and the shared manager all at once
436 CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
437 exec('reset');
438 };
439
440 MCE::Hobo->list_running()
441 Returns a list of all hobo objects that are still running.
442
443 @hobos = MCE::Hobo->list_running();
444
445 MCE::Hobo->list_joinable()
446 Returns a list of all hobo objects that have completed running.
447 Thus, ready to be joined without blocking.
448
449 @hobos = MCE::Hobo->list_joinable();
450
451 MCE::Hobo->max_workers([ N ])
452 Getter and setter for max_workers. Specify a number or 'auto' to
453 acquire the total number of cores via MCE::Util::get_ncpu. Specify a
454 false value to set back to no limit.
455
456 API available since 1.835.
457
458 MCE::Hobo->pending()
459 Returns a count of all hobo objects not yet joined.
460
461 $count = MCE::Hobo->pending();
462
463 $hobo->result()
464 Returns the result obtained by "join", "wait_one", or "wait_all". If
465 the process has not yet exited, waits for the corresponding hobo to
466 complete its execution.
467
468 use MCE::Hobo;
469 use Time::HiRes qw(sleep);
470
471 sub task {
472 my ($id) = @_;
473 sleep $id * 0.333;
474 return $id;
475 }
476
477 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
478
479 # 1 while MCE::Hobo->wait_one();
480
481 while ( my $hobo = MCE::Hobo->wait_one() ) {
482 my $err = $hobo->error() || 'no error';
483 my $res = $hobo->result();
484 my $pid = $hobo->pid();
485
486 print "[$pid] $err : $res\n";
487 }
488
489 Like "join" described above, the context (void, scalar or list) for
490 the return value(s) is determined at the time "result" is called and
491 mostly "wantarray" aware.
492
493 my $hobo1 = MCE::Hobo->create( sub {
494 my @res = qw(foo bar baz);
495 return (@res);
496 });
497
498 my @res1 = $hobo1->result(); # ( foo, bar, baz )
499 my $res1 = $hobo1->result(); # baz
500
501 my $hobo2 = MCE::Hobo->create( sub {
502 return 'foo';
503 });
504
505 my @res2 = $hobo2->result(); # ( foo )
506 my $res2 = $hobo2->result(); # foo
507
508 MCE::Hobo->self()
509 Class method that allows a hobo to obtain its own MCE::Hobo object.
510
511 $hobo->pid()
512 $hobo->tid()
513 Returns the ID of the hobo.
514
515 pid: $$ process id
516 tid: $$ alias for pid
517
518 MCE::Hobo->pid()
519 MCE::Hobo->tid()
520 Class methods that allow a hobo to obtain its own ID.
521
522 pid: $$ process id
523 tid: $$ alias for pid
524
525 MCE::Hobo->wait_one()
526 MCE::Hobo->waitone()
527 MCE::Hobo->wait_all()
528 MCE::Hobo->waitall()
529 Meaningful for the manager process only, waits for one or all hobo
530 processes to complete execution. Afterwards, returns the
531 corresponding hobo objects. If a hobo doesn't exist, returns the
532 "undef" value or an empty list for "wait_one" and "wait_all"
533 respectively.
534
535 The "waitone" and "waitall" methods are aliases since 1.827 for
536 backwards compatibility.
537
538 use MCE::Hobo;
539 use Time::HiRes qw(sleep);
540
541 sub task {
542 my $id = shift;
543 sleep $id * 0.333;
544 return $id;
545 }
546
547 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
548
549 # join, traditional use case
550 $_->join() for MCE::Hobo->list();
551
552 # wait_one, simplistic use case
553 1 while MCE::Hobo->wait_one();
554
555 # wait_one
556 while ( my $hobo = MCE::Hobo->wait_one() ) {
557 my $err = $hobo->error() || 'no error';
558 my $res = $hobo->result();
559 my $pid = $hobo->pid();
560
561 print "[$pid] $err : $res\n";
562 }
563
564 # wait_all
565 my @hobos = MCE::Hobo->wait_all();
566
567 for ( @hobos ) {
568 my $err = $_->error() || 'no error';
569 my $res = $_->result();
570 my $pid = $_->pid();
571
572 print "[$pid] $err : $res\n";
573 }
574
575 MCE::Hobo->yield( [ floating_seconds ] )
576 Prior API till 1.826.
577
578 Let this hobo yield CPU time to other workers. By default, the class
579 method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
580 including Cygwin.
581
582 MCE::Hobo->yield();
583 MCE::Hobo->yield(0.05);
584
585 # total run time: 0.25 seconds, sleep occurring in parallel
586
587 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
588 MCE::Hobo->wait_all();
589
590 Current API available since 1.827.
591
592 Give other workers a chance to run, optionally for given time. Yield
593 behaves similarly to MCE's interval option. It throttles workers
594 from running too fast. A demonstration is provided in the next
595 section for fetching URLs in parallel.
596
597 # total run time: 1.00 second
598
599 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
600 MCE::Hobo->wait_all();
601
603 MCE::Hobo behaves similarly to threads for the most part. It also
604 provides Parallel::ForkManager-like capabilities. The
605 "Parallel::ForkManager" example is shown first followed by a version
606 using "MCE::Hobo".
607
608 Parallel::ForkManager
609 use strict;
610 use warnings;
611
612 use Parallel::ForkManager;
613 use Time::HiRes 'time';
614
615 my $start = time;
616
617 my $pm = Parallel::ForkManager->new(10);
618 $pm->set_waitpid_blocking_sleep(0);
619
620 $pm->run_on_finish( sub {
621 my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
622 print "child $pid completed: $ident => ", $resp->[0], "\n";
623 });
624
625 DATA_LOOP:
626 foreach my $data ( 1..2000 ) {
627 # forks and returns the pid for the child
628 my $pid = $pm->start($data) and next DATA_LOOP;
629 my $ret = [ $data * 2 ];
630
631 $pm->finish(0, $ret);
632 }
633
634 $pm->wait_all_children;
635
636 printf STDERR "duration: %0.03f seconds\n", time - $start;
637
638 MCE::Hobo
639 use strict;
640 use warnings;
641
642 use MCE::Hobo 1.843;
643 use Time::HiRes 'time';
644
645 my $start = time;
646
647 MCE::Hobo->init(
648 max_workers => 10,
649 on_finish => sub {
650 my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
651 print "child $pid completed: $ident => ", $resp->[0], "\n";
652 }
653 );
654
655 foreach my $data ( 1..2000 ) {
656 MCE::Hobo->create( $data, sub {
657 [ $data * 2 ];
658 });
659 }
660
661 MCE::Hobo->wait_all;
662
663 printf STDERR "duration: %0.03f seconds\n", time - $start;
664
665 Time to spin 2,000 workers and obtain results (in seconds).
666 Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
667 Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
668 with Moo loaded at the top of the script.
669
670 MCE::Hobo uses MCE::Shared to retrieve data during reaping.
671 MCE::Child uses MCE::Channel, no shared-manager.
672
673 Version Cygwin Windows Linux macOS FreeBSD
674
675 MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
676 MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
677 P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
678
679 MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
680 MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
681 P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
682
683 Set posix_exit to avoid all END and destructor processing.
684 This is helpful for reducing overhead when workers exit. Ditto if
685 using a Perl module not parallel safe. The option is ignored on
686 Windows "$^O eq 'MSWin32'".
687
688 MCE::Child->init( posix_exit => 1, ... );
689 MCE::Hobo->init( posix_exit => 1, ... );
690
691 Version Cygwin Windows Linux macOS FreeBSD
692
693 MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
694 MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
695
697 This demonstration constructs two queues, two handles, starts the
698 shared-manager process if needed, and spawns four workers. For this
699 demonstration, I am chunking 64 URLs per job. In reality, one may run
700 with 200 workers and chunk 300 URLs on a 24-way box.
701
702 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
703 # perl demo.pl -- all output
704 # perl demo.pl >/dev/null -- mngr/hobo output
705 # perl demo.pl 2>/dev/null -- show results only
706 #
707 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
708
709 use strict;
710 use warnings;
711
712 use AnyEvent;
713 use AnyEvent::HTTP;
714 use Time::HiRes qw( time );
715
716 use MCE::Hobo;
717 use MCE::Shared;
718
719 # Construct two queues, input and return.
720
721 my $que = MCE::Shared->queue();
722 my $ret = MCE::Shared->queue();
723
724 # Construct shared handles for serializing output from many workers
725 # writing simultaneously. This prevents garbled output.
726
727 mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
728 mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
729
730 # Spawn workers early for minimum memory consumption.
731
732 MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
733
734 # Obtain or generate input data for workers to process.
735
736 my ( $count, @urls ) = ( 0 );
737
738 push @urls, map { "http://127.0.0.$_/" } 1..254;
739 push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
740
741 while ( @urls ) {
742 my @chunk = splice(@urls, 0, 64);
743 $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
744 }
745
746 # So that workers leave the loop after consuming the queue.
747
748 $que->end();
749
750 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
751 # Loop for the manager process. The manager may do other work if
752 # need be and periodically check $ret->pending() not shown here.
753 #
754 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
755
756 my $start = time;
757
758 printf {$ERR} "Mngr - entering loop\n";
759
760 while ( $count ) {
761 my ( $result, $failed ) = $ret->dequeue( 2 );
762
763 # Remove ID from result, so not treated as a URL item.
764
765 printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
766
767 # Display the URL and the size captured.
768
769 foreach my $url ( keys %{ $result } ) {
770 printf {$OUT} "%s: %d\n", $url, length($result->{$url})
771 if $result->{$url}; # url has content
772 }
773
774 # Display URLs it could not reach.
775
776 if ( @{ $failed } ) {
777 foreach my $url ( @{ $failed } ) {
778 print {$OUT} "Failed: $url\n";
779 }
780 }
781
782 # Decrement the count.
783
784 $count--;
785 }
786
787 MCE::Hobo->wait_all();
788
789 printf {$ERR} "Mngr - exiting loop\n\n";
790 printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
791
792 exit;
793
794 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
795 # Hobo processes enqueue two items ( $result and $failed ) per each
796 # job for the manager process. Likewise, the manager process dequeues
797 # two items above. Optionally, hobo processes may include the ID in
798 # the result.
799 #
800 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
801
802 sub task {
803 my ( $id ) = @_;
804 printf {$ERR} "Hobo $id entering loop\n";
805
806 while ( my $job = $que->dequeue() ) {
807 my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
808
809 # Walk URLs, provide a hash and array refs for data.
810
811 printf {$ERR} "Hobo $id running job $job->{ID}\n";
812 walk( $job, $result, $failed );
813
814 # Send results to the manager process.
815
816 $ret->enqueue( $result, $failed );
817 }
818
819 printf {$ERR} "Hobo $id exiting loop\n";
820 }
821
822 sub walk {
823 my ( $job, $result, $failed ) = @_;
824
825 # Yielding is critical when running an event loop in parallel.
826 # Not doing so means that the app may reach contention points
827 # with the firewall and likely impose unnecessary hardship at
828 # the OS level. The idea here is not to have multiple workers
829 # initiate HTTP requests to a batch of URLs at the same time.
830 # Yielding in 1.827+ behaves similarly to scatter to have
831 # the hobo process run solo for a fraction of time.
832
833 MCE::Hobo->yield( 0.03 ); # MCE::Hobo 1.827+
834
835 my $cv = AnyEvent->condvar();
836
837 # Populate the hash ref for the URLs it could reach.
838 # Do not mix AnyEvent timeout with hobo timeout.
839 # Therefore, choose event timeout when available.
840
841 foreach my $url ( @{ $job->{INPUT} } ) {
842 $cv->begin();
843 http_get $url, timeout => 2, sub {
844 my ( $data, $headers ) = @_;
845 $result->{$url} = $data;
846 $cv->end();
847 };
848 }
849
850 $cv->recv();
851
852 # Populate the array ref for URLs it could not reach.
853
854 foreach my $url ( @{ $job->{INPUT} } ) {
855 push @{ $failed }, $url unless (exists $result->{ $url });
856 }
857
858 return;
859 }
860
861 __END__
862
863 $ perl demo.pl
864
865 Hobo 1 entering loop
866 Hobo 2 entering loop
867 Hobo 3 entering loop
868 Mngr - entering loop
869 Hobo 2 running job 2
870 Hobo 3 running job 3
871 Hobo 1 running job 1
872 Hobo 4 entering loop
873 Hobo 4 running job 4
874 Hobo 2 running job 5
875 Mngr - received job 2
876 Hobo 3 running job 6
877 Mngr - received job 3
878 Hobo 1 running job 7
879 Mngr - received job 1
880 Hobo 4 running job 8
881 Mngr - received job 4
882 http://192.168.0.1/: 3729
883 Hobo 2 exiting loop
884 Mngr - received job 5
885 Hobo 3 exiting loop
886 Mngr - received job 6
887 Hobo 1 exiting loop
888 Mngr - received job 7
889 Hobo 4 exiting loop
890 Mngr - received job 8
891 Mngr - exiting loop
892
893 Duration: 4.131 seconds
894
896 Making an executable is possible with the PAR::Packer module. On the
897 Windows platform, threads, threads::shared, and exiting via threads are
898 necessary for the binary to exit successfully.
899
900 # https://metacpan.org/pod/PAR::Packer
901 # https://metacpan.org/pod/pp
902 #
903 # pp -o demo.exe demo.pl
904 # ./demo.exe
905
906 use strict;
907 use warnings;
908
909 use if $^O eq "MSWin32", "threads";
910 use if $^O eq "MSWin32", "threads::shared";
911
912 # Include minimum dependencies for MCE::Hobo.
913 # Add other modules required by your application here.
914
915 use Storable ();
916 use Time::HiRes ();
917
918 # use IO::FDPass (); # optional: for condvar, handle, queue
919 # use Sereal (); # optional: for faster serialization
920
921 use MCE::Hobo;
922 use MCE::Shared;
923
924 # For PAR to work on the Windows platform, one must include manually
925 # any shared modules used by the application.
926
927 # use MCE::Shared::Array; # if using MCE::Shared->array
928 # use MCE::Shared::Cache; # if using MCE::Shared->cache
929 # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
930 # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
931 # use MCE::Shared::Hash; # if using MCE::Shared->hash
932 # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
933 # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
934 # use MCE::Shared::Queue; # if using MCE::Shared->queue
935 # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
936
937 # Et cetera. Only load modules needed for your application.
938
939 use MCE::Shared::Sequence; # if using MCE::Shared->sequence
940
941 my $seq = MCE::Shared->sequence( 1, 9 );
942
943 sub task {
944 my ( $id ) = @_;
945 while ( defined ( my $num = $seq->next() ) ) {
946 print "$id: $num\n";
947 sleep 1;
948 }
949 }
950
951 sub main {
952 MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
953 MCE::Hobo->wait_all();
954 }
955
956 # Main must run inside a thread on the Windows platform or workers
957 # will fail during exiting, causing the exe to crash. The reason is
958 # that PAR or a dependency isn't multi-process safe.
959
960 ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
961
962 threads->exit(0) if $INC{"threads.pm"};
963
965 The inspiration for "MCE::Hobo" comes from wanting "threads"-like
966 behavior for processes. Both can run side-by-side including safe-use by
967 MCE workers. Likewise, the documentation resembles "threads".
968
969 The inspiration for "wait_all" and "wait_one" comes from the
970 "Parallel::WorkUnit" module.
971
973 · forks
974
975 · forks::BerkeleyDB
976
977 · MCE::Child
978
979 · Parallel::ForkManager
980
981 · Parallel::Loops
982
983 · Parallel::Prefork
984
985 · Parallel::WorkUnit
986
987 · Proc::Fork
988
989 · Thread::Tie
990
991 · threads
992
994 MCE, MCE::Channel, MCE::Shared
995
997 Mario E. Roy, <marioeroy AT gmail DOT com>
998
999
1000
1001perl v5.30.0 2019-09-19 MCE::Hobo(3)