1MCE::Hobo(3) User Contributed Perl Documentation MCE::Hobo(3)
2
3
4
6 MCE::Hobo - A threads-like parallelization module
7
9 This document describes MCE::Hobo version 1.864
10
12 use MCE::Hobo;
13
14 MCE::Hobo->init(
15 max_workers => 'auto', # default undef, unlimited
16 hobo_timeout => 20, # default undef, no timeout
17 posix_exit => 1, # default undef, CORE::exit
18 void_context => 1, # default undef
19 on_start => sub {
20 my ( $pid, $ident ) = @_;
21 ...
22 },
23 on_finish => sub {
24 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
25 ...
26 }
27 );
28
29 MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
30
31 sub parallel {
32 my ($arg1) = @_;
33 print "Hello again, $arg1\n" if defined($arg1);
34 print "Hello again, $_\n"; # same thing
35 }
36
37 MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
38
39 my @hobos = MCE::Hobo->list();
40 my @pids = MCE::Hobo->list_pids();
41 my @running = MCE::Hobo->list_running();
42 my @joinable = MCE::Hobo->list_joinable();
43 my $count = MCE::Hobo->pending();
44
45 # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
46 $_->join() for @hobos; # (or)
47 $_->join() for @joinable;
48
49 # Joining occurs immediately as hobo processes complete execution.
50 1 while MCE::Hobo->wait_one();
51
52 my $hobo = mce_async { foreach (@files) { ... } };
53
54 $hobo->join();
55
56 if ( my $err = $hobo->error() ) {
57 warn "Hobo error: $err\n";
58 }
59
60 # Get a hobo's object
61 $hobo = MCE::Hobo->self();
62
63 # Get a hobo's ID
64 $pid = MCE::Hobo->pid(); # $$
65 $pid = $hobo->pid();
66 $pid = MCE::Hobo->tid(); # tid is an alias for pid
67 $pid = $hobo->tid();
68
69 # Test hobo objects
70 if ( $hobo1 == $hobo2 ) {
71 ...
72 }
73
74 # Give other workers a chance to run
75 MCE::Hobo->yield();
76 MCE::Hobo->yield(0.05);
77
78 # Return context, wantarray aware
79 my ($value1, $value2) = $hobo->join();
80 my $value = $hobo->join();
81
82 # Check hobo's state
83 if ( $hobo->is_running() ) {
84 sleep 1;
85 }
86 if ( $hobo->is_joinable() ) {
87 $hobo->join();
88 }
89
90 # Send a signal to a hobo
91 $hobo->kill('SIGUSR1');
92
93 # Exit a hobo
94 MCE::Hobo->exit(0);
95 MCE::Hobo->exit(0, @ret); # MCE::Hobo 1.827+
96
98 A hobo is a migratory worker inside the machine that carries the
99 asynchronous gene. Hobo processes are equipped with "threads"-like
100 capability for running code asynchronously. Unlike threads, each hobo
101 is a unique process to the underlying OS. The IPC is managed by
102 "MCE::Shared", which runs on all the major platforms including Cygwin
103 and Strawberry Perl.
104
105 An exception was made on the Windows platform to spawn threads versus
106 children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
107 release reverts back to spawning children on all supported platforms.
108
109 "MCE::Hobo" may be used as a standalone or together with "MCE"
110 including running alongside "threads".
111
112 use MCE::Hobo;
113 use MCE::Shared;
114
115 # synopsis: head -20 file.txt | perl script.pl
116
117 my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
118 my $ofh = MCE::Shared->handle( ">", \*STDOUT );
119 my $ary = MCE::Shared->array();
120
121 sub parallel_task {
122 my ( $id ) = @_;
123 while ( <$ifh> ) {
124 printf {$ofh} "[ %4d ] %s", $., $_;
125 # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
126 $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
127 }
128 }
129
130 my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
131 my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
132 my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
133
134 $_->join for MCE::Hobo->list(); # ditto: MCE::Hobo->wait_all();
135
136 # search array (total one round-trip via IPC)
137 my @vals = $ary->vals( "val =~ / ID 2 /" );
138
139 print {*STDERR} join("", @vals);
140
142 $hobo = MCE::Hobo->create( FUNCTION, ARGS )
143 $hobo = MCE::Hobo->new( FUNCTION, ARGS )
144 This will create a new hobo process that will begin execution with
145 function as the entry point, and optionally ARGS for list of
146 parameters. It will return the corresponding MCE::Hobo object, or
147 undef if hobo creation failed.
148
149 FUNCTION may either be the name of a function, an anonymous
150 subroutine, or a code ref.
151
152 my $hobo = MCE::Hobo->create( "func_name", ... );
153 # or
154 my $hobo = MCE::Hobo->create( sub { ... }, ... );
155 # or
156 my $hobo = MCE::Hobo->create( \&func, ... );
157
158 $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
159 $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
160 Options, excluding "ident", may be specified globally via the "init"
161 function. Otherwise, "ident", "hobo_timeout", "posix_exit", and
162 "void_context" may be set uniquely.
163
164 The "ident" option, available since 1.827, is used by callback
165 functions "on_start" and "on_finish" for identifying the started and
166 finished hobo process respectively.
167
168 my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
169 ...
170 } );
171
172 $hobo1->join;
173
174 my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
175 sleep 1 for ( 1 .. 9 );
176 } );
177
178 $hobo2->join;
179
180 if ( $hobo2->error() eq "Hobo timed out\n" ) {
181 ...
182 }
183
184 The "new()" method is an alias for "create()".
185
186 mce_async { BLOCK } ARGS;
187 mce_async { BLOCK };
188 "mce_async" runs the block asynchronously similarly to
189 "MCE::Hobo->create()". It returns the hobo object, or undef if hobo
190 creation failed.
191
192 my $hobo = mce_async { foreach (@files) { ... } };
193
194 $hobo->join();
195
196 if ( my $err = $hobo->error() ) {
197 warn("Hobo error: $err\n");
198 }
199
200 $hobo->join()
201 This will wait for the corresponding hobo process to complete its
202 execution. In non-voided context, "join()" will return the value(s)
203 of the entry point function.
204
205 The context (void, scalar or list) for the return value(s) for
206 "join" is determined at the time of joining and mostly "wantarray"
207 aware.
208
209 my $hobo1 = MCE::Hobo->create( sub {
210 my @res = qw(foo bar baz);
211 return (@res);
212 });
213
214 my @res1 = $hobo1->join(); # ( foo, bar, baz )
215 my $res1 = $hobo1->join(); # baz
216
217 my $hobo2 = MCE::Hobo->create( sub {
218 return 'foo';
219 });
220
221 my @res2 = $hobo2->join(); # ( foo )
222 my $res2 = $hobo2->join(); # foo
223
224 $hobo1->equal( $hobo2 )
225 Tests if two hobo objects are the same hobo or not. Hobo comparison
226 is based on process IDs. This is overloaded to the more natural
227 forms.
228
229 if ( $hobo1 == $hobo2 ) {
230 print("Hobo objects are the same\n");
231 }
232 # or
233 if ( $hobo1 != $hobo2 ) {
234 print("Hobo objects differ\n");
235 }
236
237 $hobo->error()
238 Hobo processes are executed in an "eval" context. This method will
239 return "undef" if the hobo terminates normally. Otherwise, it
240 returns the value of $@ associated with the hobo's execution status
241 in its "eval" context.
242
243 $hobo->exit()
244 This sends 'SIGQUIT' to the hobo process, notifying the hobo to
245 exit. It returns the hobo object to allow for method chaining. It
246 is important to join later if not immediately to not leave a zombie
247 or defunct process.
248
249 $hobo->exit()->join();
250 ...
251
252 $hobo->join(); # later
253
254 MCE::Hobo->exit( 0 )
255 MCE::Hobo->exit( 0, @ret )
256 A hobo can exit at any time by calling "MCE::Hobo->exit()".
257 Otherwise, the behavior is the same as "exit(status)" when called
258 from the main process. Current since 1.827, the hobo process may
259 optionally return data, to be sent via IPC.
260
261 MCE::Hobo->finish()
262 This class method is called automatically by "END", but may be
263 called explicitly. An error is emitted via croak if there are active
264 hobo processes not yet joined.
265
266 MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
267 $_->join for MCE::Hobo->list();
268
269 MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
270 $_->join for MCE::Hobo->list();
271
272 MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
273 $_->join for MCE::Hobo->list();
274
275 MCE::Hobo->finish();
276
277 MCE::Hobo->init( options )
278 The init function accepts a list of MCE::Hobo options.
279
280 MCE::Hobo->init(
281 max_workers => 'auto', # default undef, unlimited
282 hobo_timeout => 20, # default undef, no timeout
283 posix_exit => 1, # default undef, CORE::exit
284 void_context => 1, # default undef
285 on_start => sub {
286 my ( $pid, $ident ) = @_;
287 ...
288 },
289 on_finish => sub {
290 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
291 ...
292 }
293 );
294
295 # Identification given as an option or the 1st argument.
296 # Current API available since 1.827.
297
298 for my $key ( 'aa' .. 'zz' ) {
299 MCE::Hobo->create( { ident => $key }, sub { ... } );
300 MCE::Hobo->create( $key, sub { ... } );
301 }
302
303 MCE::Hobo->wait_all;
304
305 Set "max_workers" if you want to limit the number of workers by
306 waiting automatically for an available slot. Specify "auto" to
307 obtain the number of logical cores via "MCE::Util::get_ncpu()".
308
309 Set "hobo_timeout", in number of seconds, if you want the hobo
310 process to terminate after some time. The default is 0 for no
311 timeout.
312
313 Set "posix_exit" to avoid all END and destructor processing.
314 Constructing MCE::Hobo inside a thread implies 1 or if present CGI,
315 FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
316 Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
317
318 Set "void_context" to create the hobo process in void context for
319 the return value. Otherwise, the return context is wantarray-aware
320 for "join()" and "result()" and determined when retrieving the data.
321
322 The callback options "on_start" and "on_finish" are called in the
323 parent process after starting the worker and later when terminated.
324 The arguments for the subroutines were inspired by
325 Parallel::ForkManager.
326
327 The parameters for "on_start" are the following:
328
329 - pid of the hobo process
330 - identification (ident option or 1st arg to create)
331
332 The parameters for "on_finish" are the following:
333
334 - pid of the hobo process
335 - program exit code
336 - identification (ident option or 1st arg to create)
337 - exit signal id
338 - error message from eval inside MCE::Hobo
339 - returned data
340
341 $hobo->is_running()
342 Returns true if a hobo is still running.
343
344 $hobo->is_joinable()
345 Returns true if the hobo has finished running and not yet joined.
346
347 $hobo->kill( 'SIG...' )
348 Sends the specified signal to the hobo. Returns the hobo object to
349 allow for method chaining. As with "exit", it is important to join
350 eventually if not immediately to not leave a zombie or defunct
351 process.
352
353 $hobo->kill('SIG...')->join();
354
355 The following is a parallel demonstration comparing "MCE::Shared"
356 against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
357 after all workers have been notified to quit.
358
359 use Time::HiRes qw(time);
360
361 use Redis;
362 use Redis::Fast;
363
364 use MCE::Hobo;
365 use MCE::Shared;
366
367 my $redis = Redis->new();
368 my $rfast = Redis::Fast->new();
369 my $array = MCE::Shared->array();
370
371 sub parallel_redis {
372 my ($_redis) = @_;
373 my ($count, $quit, $len) = (0, 0);
374
375 # instead, use a flag to exit loop
376 $SIG{'QUIT'} = sub { $quit = 1 };
377
378 while () {
379 $len = $_redis->rpush('list', $count++);
380 last if $quit;
381 }
382
383 $count;
384 }
385
386 sub parallel_array {
387 my ($count, $quit, $len) = (0, 0);
388
389 # do not exit from inside handler
390 $SIG{'QUIT'} = sub { $quit = 1 };
391
392 while () {
393 $len = $array->push($count++);
394 last if $quit;
395 }
396
397 $count;
398 }
399
400 sub benchmark_this {
401 my ($desc, $num_procs, $timeout, $code, @args) = @_;
402 my ($start, $total) = (time(), 0);
403
404 MCE::Hobo->new($code, @args) for 1..$num_procs;
405 sleep $timeout;
406
407 # joining is not immediate; ok
408 $_->kill('QUIT') for MCE::Hobo->list();
409
410 # joining later; ok
411 $total += $_->join() for MCE::Hobo->list();
412
413 printf "$desc <> duration: %0.03f secs, count: $total\n",
414 time() - $start;
415
416 sleep 0.2;
417 }
418
419 benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
420 benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
421 benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
422
423 MCE::Hobo->list()
424 Returns a list of all hobo objects not yet joined.
425
426 @hobos = MCE::Hobo->list();
427
428 MCE::Hobo->list_pids()
429 Returns a list of all hobo pids not yet joined (available since
430 1.849).
431
432 @pids = MCE::Hobo->list_pids();
433
434 $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
435 # Signal workers and the shared manager all at once
436 CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
437 exec('reset');
438 };
439
440 MCE::Hobo->list_running()
441 Returns a list of all hobo objects that are still running.
442
443 @hobos = MCE::Hobo->list_running();
444
445 MCE::Hobo->list_joinable()
446 Returns a list of all hobo objects that have completed running.
447 Thus, ready to be joined without blocking.
448
449 @hobos = MCE::Hobo->list_joinable();
450
451 MCE::Hobo->max_workers([ N ])
452 Getter and setter for max_workers. Specify a number or 'auto' to
453 acquire the total number of cores via MCE::Util::get_ncpu. Specify a
454 false value to set back to no limit.
455
456 API available since 1.835.
457
458 MCE::Hobo->pending()
459 Returns a count of all hobo objects not yet joined.
460
461 $count = MCE::Hobo->pending();
462
463 $hobo->result()
464 Returns the result obtained by "join", "wait_one", or "wait_all". If
465 the process has not yet exited, waits for the corresponding hobo to
466 complete its execution.
467
468 use MCE::Hobo;
469 use Time::HiRes qw(sleep);
470
471 sub task {
472 my ($id) = @_;
473 sleep $id * 0.333;
474 return $id;
475 }
476
477 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
478
479 # 1 while MCE::Hobo->wait_one();
480
481 while ( my $hobo = MCE::Hobo->wait_one() ) {
482 my $err = $hobo->error() || 'no error';
483 my $res = $hobo->result();
484 my $pid = $hobo->pid();
485
486 print "[$pid] $err : $res\n";
487 }
488
489 Like "join" described above, the context (void, scalar or list) for
490 the return value(s) is determined at the time "result" is called and
491 mostly "wantarray" aware.
492
493 my $hobo1 = MCE::Hobo->create( sub {
494 my @res = qw(foo bar baz);
495 return (@res);
496 });
497
498 my @res1 = $hobo1->result(); # ( foo, bar, baz )
499 my $res1 = $hobo1->result(); # baz
500
501 my $hobo2 = MCE::Hobo->create( sub {
502 return 'foo';
503 });
504
505 my @res2 = $hobo2->result(); # ( foo )
506 my $res2 = $hobo2->result(); # foo
507
508 MCE::Hobo->self()
509 Class method that allows a hobo to obtain its own MCE::Hobo object.
510
511 $hobo->pid()
512 $hobo->tid()
513 Returns the ID of the hobo.
514
515 pid: $$ process id
516 tid: $$ alias for pid
517
518 MCE::Hobo->pid()
519 MCE::Hobo->tid()
520 Class methods that allow a hobo to obtain its own ID.
521
522 pid: $$ process id
523 tid: $$ alias for pid
524
525 MCE::Hobo->wait_one()
526 MCE::Hobo->waitone()
527 MCE::Hobo->wait_all()
528 MCE::Hobo->waitall()
529 Meaningful for the manager process only, waits for one or all hobo
530 processes to complete execution. Afterwards, returns the
531 corresponding hobo objects. If a hobo doesn't exist, returns the
532 "undef" value or an empty list for "wait_one" and "wait_all"
533 respectively.
534
535 The "waitone" and "waitall" methods are aliases since 1.827 for
536 backwards compatibility.
537
538 use MCE::Hobo;
539 use Time::HiRes qw(sleep);
540
541 sub task {
542 my $id = shift;
543 sleep $id * 0.333;
544 return $id;
545 }
546
547 MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
548
549 # join, traditional use case
550 $_->join() for MCE::Hobo->list();
551
552 # wait_one, simplistic use case
553 1 while MCE::Hobo->wait_one();
554
555 # wait_one
556 while ( my $hobo = MCE::Hobo->wait_one() ) {
557 my $err = $hobo->error() || 'no error';
558 my $res = $hobo->result();
559 my $pid = $hobo->pid();
560
561 print "[$pid] $err : $res\n";
562 }
563
564 # wait_all
565 my @hobos = MCE::Hobo->wait_all();
566
567 for ( @hobos ) {
568 my $err = $_->error() || 'no error';
569 my $res = $_->result();
570 my $pid = $_->pid();
571
572 print "[$pid] $err : $res\n";
573 }
574
575 MCE::Hobo->yield( [ floating_seconds ] )
576 Prior API till 1.826.
577
578 Let this hobo yield CPU time to other workers. By default, the class
579 method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
580 including Cygwin.
581
582 MCE::Hobo->yield();
583 MCE::Hobo->yield(0.05);
584
585 # total run time: 0.25 seconds, sleep occurring in parallel
586
587 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
588 MCE::Hobo->wait_all();
589
590 Current API available since 1.827.
591
592 Give other workers a chance to run, optionally for given time. Yield
593 behaves similarly to MCE's interval option. It throttles workers
594 from running too fast. A demonstration is provided in the next
595 section for fetching URLs in parallel.
596
597 The default "floating_seconds" is 0.008 and 0.015 on UNIX and
598 Windows, respectively. Pass 0 if you want to give other workers a
599 chance to run.
600
601 # total run time: 1.00 second
602
603 MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
604 MCE::Hobo->wait_all();
605
607 MCE::Hobo behaves similarly to threads for the most part. It also
608 provides Parallel::ForkManager-like capabilities. The
609 "Parallel::ForkManager" example is shown first followed by a version
610 using "MCE::Hobo".
611
612 Parallel::ForkManager
613 use strict;
614 use warnings;
615
616 use Parallel::ForkManager;
617 use Time::HiRes 'time';
618
619 my $start = time;
620
621 my $pm = Parallel::ForkManager->new(10);
622 $pm->set_waitpid_blocking_sleep(0);
623
624 $pm->run_on_finish( sub {
625 my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
626 print "child $pid completed: $ident => ", $resp->[0], "\n";
627 });
628
629 DATA_LOOP:
630 foreach my $data ( 1..2000 ) {
631 # forks and returns the pid for the child
632 my $pid = $pm->start($data) and next DATA_LOOP;
633 my $ret = [ $data * 2 ];
634
635 $pm->finish(0, $ret);
636 }
637
638 $pm->wait_all_children;
639
640 printf STDERR "duration: %0.03f seconds\n", time - $start;
641
642 MCE::Hobo
643 use strict;
644 use warnings;
645
646 use MCE::Hobo 1.843;
647 use Time::HiRes 'time';
648
649 my $start = time;
650
651 MCE::Hobo->init(
652 max_workers => 10,
653 on_finish => sub {
654 my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
655 print "child $pid completed: $ident => ", $resp->[0], "\n";
656 }
657 );
658
659 foreach my $data ( 1..2000 ) {
660 MCE::Hobo->create( $data, sub {
661 [ $data * 2 ];
662 });
663 }
664
665 MCE::Hobo->wait_all;
666
667 printf STDERR "duration: %0.03f seconds\n", time - $start;
668
669 Time to spin 2,000 workers and obtain results (in seconds).
670 Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
671 Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
672 with Moo loaded at the top of the script.
673
674 MCE::Hobo uses MCE::Shared to retrieve data during reaping.
675 MCE::Child uses MCE::Channel, no shared-manager.
676
677 Version Cygwin Windows Linux macOS FreeBSD
678
679 MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
680 MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
681 P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
682
683 MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
684 MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
685 P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
686
687 Set posix_exit to avoid all END and destructor processing.
688 This is helpful for reducing overhead when workers exit. Ditto if
689 using a Perl module not parallel safe. The option is ignored on
690 Windows "$^O eq 'MSWin32'".
691
692 MCE::Child->init( posix_exit => 1, ... );
693 MCE::Hobo->init( posix_exit => 1, ... );
694
695 Version Cygwin Windows Linux macOS FreeBSD
696
697 MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
698 MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
699
701 This demonstration constructs two queues, two handles, starts the
702 shared-manager process if needed, and spawns four workers. For this
703 demonstration, I am chunking 64 URLs per job. In reality, one may run
704 with 200 workers and chunk 300 URLs on a 24-way box.
705
706 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
707 # perl demo.pl -- all output
708 # perl demo.pl >/dev/null -- mngr/hobo output
709 # perl demo.pl 2>/dev/null -- show results only
710 #
711 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
712
713 use strict;
714 use warnings;
715
716 use AnyEvent;
717 use AnyEvent::HTTP;
718 use Time::HiRes qw( time );
719
720 use MCE::Hobo;
721 use MCE::Shared;
722
723 # Construct two queues, input and return.
724
725 my $que = MCE::Shared->queue();
726 my $ret = MCE::Shared->queue();
727
728 # Construct shared handles for serializing output from many workers
729 # writing simultaneously. This prevents garbled output.
730
731 mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
732 mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
733
734 # Spawn workers early for minimum memory consumption.
735
736 MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
737
738 # Obtain or generate input data for workers to process.
739
740 my ( $count, @urls ) = ( 0 );
741
742 push @urls, map { "http://127.0.0.$_/" } 1..254;
743 push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
744
745 while ( @urls ) {
746 my @chunk = splice(@urls, 0, 64);
747 $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
748 }
749
750 # So that workers leave the loop after consuming the queue.
751
752 $que->end();
753
754 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
755 # Loop for the manager process. The manager may do other work if
756 # need be and periodically check $ret->pending() not shown here.
757 #
758 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
759
760 my $start = time;
761
762 printf {$ERR} "Mngr - entering loop\n";
763
764 while ( $count ) {
765 my ( $result, $failed ) = $ret->dequeue( 2 );
766
767 # Remove ID from result, so not treated as a URL item.
768
769 printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
770
771 # Display the URL and the size captured.
772
773 foreach my $url ( keys %{ $result } ) {
774 printf {$OUT} "%s: %d\n", $url, length($result->{$url})
775 if $result->{$url}; # url has content
776 }
777
778 # Display URLs it could not reach.
779
780 if ( @{ $failed } ) {
781 foreach my $url ( @{ $failed } ) {
782 print {$OUT} "Failed: $url\n";
783 }
784 }
785
786 # Decrement the count.
787
788 $count--;
789 }
790
791 MCE::Hobo->wait_all();
792
793 printf {$ERR} "Mngr - exiting loop\n\n";
794 printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
795
796 exit;
797
798 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
799 # Hobo processes enqueue two items ( $result and $failed ) per each
800 # job for the manager process. Likewise, the manager process dequeues
801 # two items above. Optionally, hobo processes may include the ID in
802 # the result.
803 #
804 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
805
806 sub task {
807 my ( $id ) = @_;
808 printf {$ERR} "Hobo $id entering loop\n";
809
810 while ( my $job = $que->dequeue() ) {
811 my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
812
813 # Walk URLs, provide a hash and array refs for data.
814
815 printf {$ERR} "Hobo $id running job $job->{ID}\n";
816 walk( $job, $result, $failed );
817
818 # Send results to the manager process.
819
820 $ret->enqueue( $result, $failed );
821 }
822
823 printf {$ERR} "Hobo $id exiting loop\n";
824 }
825
826 sub walk {
827 my ( $job, $result, $failed ) = @_;
828
829 # Yielding is critical when running an event loop in parallel.
830 # Not doing so means that the app may reach contention points
831 # with the firewall and likely impose unnecessary hardship at
832 # the OS level. The idea here is not to have multiple workers
833 # initiate HTTP requests to a batch of URLs at the same time.
834 # Yielding in 1.827+ behaves similarly to scatter to have
835 # the hobo process run solo for a fraction of time.
836
837 MCE::Hobo->yield( 0.03 ); # MCE::Hobo 1.827+
838
839 my $cv = AnyEvent->condvar();
840
841 # Populate the hash ref for the URLs it could reach.
842 # Do not mix AnyEvent timeout with hobo timeout.
843 # Therefore, choose event timeout when available.
844
845 foreach my $url ( @{ $job->{INPUT} } ) {
846 $cv->begin();
847 http_get $url, timeout => 2, sub {
848 my ( $data, $headers ) = @_;
849 $result->{$url} = $data;
850 $cv->end();
851 };
852 }
853
854 $cv->recv();
855
856 # Populate the array ref for URLs it could not reach.
857
858 foreach my $url ( @{ $job->{INPUT} } ) {
859 push @{ $failed }, $url unless (exists $result->{ $url });
860 }
861
862 return;
863 }
864
865 __END__
866
867 $ perl demo.pl
868
869 Hobo 1 entering loop
870 Hobo 2 entering loop
871 Hobo 3 entering loop
872 Mngr - entering loop
873 Hobo 2 running job 2
874 Hobo 3 running job 3
875 Hobo 1 running job 1
876 Hobo 4 entering loop
877 Hobo 4 running job 4
878 Hobo 2 running job 5
879 Mngr - received job 2
880 Hobo 3 running job 6
881 Mngr - received job 3
882 Hobo 1 running job 7
883 Mngr - received job 1
884 Hobo 4 running job 8
885 Mngr - received job 4
886 http://192.168.0.1/: 3729
887 Hobo 2 exiting loop
888 Mngr - received job 5
889 Hobo 3 exiting loop
890 Mngr - received job 6
891 Hobo 1 exiting loop
892 Mngr - received job 7
893 Hobo 4 exiting loop
894 Mngr - received job 8
895 Mngr - exiting loop
896
897 Duration: 4.131 seconds
898
900 Making an executable is possible with the PAR::Packer module. On the
901 Windows platform, threads, threads::shared, and exiting via threads are
902 necessary for the binary to exit successfully.
903
904 # https://metacpan.org/pod/PAR::Packer
905 # https://metacpan.org/pod/pp
906 #
907 # pp -o demo.exe demo.pl
908 # ./demo.exe
909
910 use strict;
911 use warnings;
912
913 use if $^O eq "MSWin32", "threads";
914 use if $^O eq "MSWin32", "threads::shared";
915
916 # Include minimum dependencies for MCE::Hobo.
917 # Add other modules required by your application here.
918
919 use Storable ();
920 use Time::HiRes ();
921
922 # use IO::FDPass (); # optional: for condvar, handle, queue
923 # use Sereal (); # optional: for faster serialization
924
925 use MCE::Hobo;
926 use MCE::Shared;
927
928 # For PAR to work on the Windows platform, one must include manually
929 # any shared modules used by the application.
930
931 # use MCE::Shared::Array; # if using MCE::Shared->array
932 # use MCE::Shared::Cache; # if using MCE::Shared->cache
933 # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
934 # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
935 # use MCE::Shared::Hash; # if using MCE::Shared->hash
936 # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
937 # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
938 # use MCE::Shared::Queue; # if using MCE::Shared->queue
939 # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
940
941 # Et cetera. Only load modules needed for your application.
942
943 use MCE::Shared::Sequence; # if using MCE::Shared->sequence
944
945 my $seq = MCE::Shared->sequence( 1, 9 );
946
947 sub task {
948 my ( $id ) = @_;
949 while ( defined ( my $num = $seq->next() ) ) {
950 print "$id: $num\n";
951 sleep 1;
952 }
953 }
954
955 sub main {
956 MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
957 MCE::Hobo->wait_all();
958 }
959
960 # Main must run inside a thread on the Windows platform or workers
961 # will fail during exiting, causing the exe to crash. The reason is
962 # that PAR or a dependency isn't multi-process safe.
963
964 ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
965
966 threads->exit(0) if $INC{"threads.pm"};
967
969 The inspiration for "MCE::Hobo" comes from wanting "threads"-like
970 behavior for processes. Both can run side-by-side including safe-use by
971 MCE workers. Likewise, the documentation resembles "threads".
972
973 The inspiration for "wait_all" and "wait_one" comes from the
974 "Parallel::WorkUnit" module.
975
977 · forks
978
979 · forks::BerkeleyDB
980
981 · MCE::Child
982
983 · Parallel::ForkManager
984
985 · Parallel::Loops
986
987 · Parallel::Prefork
988
989 · Parallel::WorkUnit
990
991 · Proc::Fork
992
993 · Thread::Tie
994
995 · threads
996
998 MCE, MCE::Channel, MCE::Shared
999
1001 Mario E. Roy, <marioeroy AT gmail DOT com>
1002
1003
1004
1005perl v5.30.1 2020-01-30 MCE::Hobo(3)