1MCE::Hobo(3)          User Contributed Perl Documentation         MCE::Hobo(3)
2
3
4

NAME

6       MCE::Hobo - A threads-like parallelization module
7

VERSION

9       This document describes MCE::Hobo version 1.862
10

SYNOPSIS

12        use MCE::Hobo;
13
14        MCE::Hobo->init(
15            max_workers => 'auto',   # default undef, unlimited
16            hobo_timeout => 20,      # default undef, no timeout
17            posix_exit => 1,         # default undef, CORE::exit
18            void_context => 1,       # default undef
19            on_start => sub {
20                my ( $pid, $ident ) = @_;
21                ...
22            },
23            on_finish => sub {
24                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
25                ...
26            }
27        );
28
29        MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
30
31        sub parallel {
32            my ($arg1) = @_;
33            print "Hello again, $arg1\n" if defined($arg1);
34            print "Hello again, $_\n"; # same thing
35        }
36
37        MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
38
39        my @hobos    = MCE::Hobo->list();
40        my @pids     = MCE::Hobo->list_pids();
41        my @running  = MCE::Hobo->list_running();
42        my @joinable = MCE::Hobo->list_joinable();
43        my $count    = MCE::Hobo->pending();
44
45        # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
46        $_->join() for @hobos;   # (or)
47        $_->join() for @joinable;
48
49        # Joining occurs immediately as hobo processes complete execution.
50        1 while MCE::Hobo->wait_one();
51
52        my $hobo = mce_async { foreach (@files) { ... } };
53
54        $hobo->join();
55
56        if ( my $err = $hobo->error() ) {
57            warn "Hobo error: $err\n";
58        }
59
60        # Get a hobo's object
61        $hobo = MCE::Hobo->self();
62
63        # Get a hobo's ID
64        $pid = MCE::Hobo->pid();  # $$
65        $pid = $hobo->pid();
66        $pid = MCE::Hobo->tid();  # tid is an alias for pid
67        $pid = $hobo->tid();
68
69        # Test hobo objects
70        if ( $hobo1 == $hobo2 ) {
71            ...
72        }
73
74        # Give other workers a chance to run
75        MCE::Hobo->yield();
76        MCE::Hobo->yield(0.05);
77
78        # Return context, wantarray aware
79        my ($value1, $value2) = $hobo->join();
80        my $value = $hobo->join();
81
82        # Check hobo's state
83        if ( $hobo->is_running() ) {
84            sleep 1;
85        }
86        if ( $hobo->is_joinable() ) {
87            $hobo->join();
88        }
89
90        # Send a signal to a hobo
91        $hobo->kill('SIGUSR1');
92
93        # Exit a hobo
94        MCE::Hobo->exit(0);
95        MCE::Hobo->exit(0, @ret);  # MCE::Hobo 1.827+
96

DESCRIPTION

98       A hobo is a migratory worker inside the machine that carries the
99       asynchronous gene. Hobo processes are equipped with "threads"-like
100       capability for running code asynchronously. Unlike threads, each hobo
101       is a unique process to the underlying OS. The IPC is managed by
102       "MCE::Shared", which runs on all the major platforms including Cygwin
103       and Strawberry Perl.
104
105       An exception was made on the Windows platform to spawn threads instead of
106       children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
107       release reverts to spawning children on all supported platforms.
108
109       "MCE::Hobo" may be used as a standalone or together with "MCE"
110       including running alongside "threads".
111
112        use MCE::Hobo;
113        use MCE::Shared;
114
115        # synopsis: head -20 file.txt | perl script.pl
116
117        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
118        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
119        my $ary = MCE::Shared->array();
120
121        sub parallel_task {
122            my ( $id ) = @_;
123            while ( <$ifh> ) {
124                printf {$ofh} "[ %4d ] %s", $., $_;
125              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";    # dereferencing
126                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
127            }
128        }
129
130        my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
131        my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
132        my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
133
134        $_->join for MCE::Hobo->list();  # ditto: MCE::Hobo->wait_all();
135
136        # search array (total one round-trip via IPC)
137        my @vals = $ary->vals( "val =~ / ID 2 /" );
138
139        print {*STDERR} join("", @vals);
140

API DOCUMENTATION

142       $hobo = MCE::Hobo->create( FUNCTION, ARGS )
143       $hobo = MCE::Hobo->new( FUNCTION, ARGS )
144          This will create a new hobo process that will begin execution with
145          FUNCTION as the entry point and, optionally, ARGS as its list of
146          parameters. It will return the corresponding MCE::Hobo object, or
147          undef if hobo creation failed.
148
149          FUNCTION may either be the name of a function, an anonymous
150          subroutine, or a code ref.
151
152           my $hobo = MCE::Hobo->create( "func_name", ... );
153               # or
154           my $hobo = MCE::Hobo->create( sub { ... }, ... );
155               # or
156           my $hobo = MCE::Hobo->create( \&func, ... );
157
158       $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
159       $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
160          Options, excluding "ident", may be specified globally via the "init"
161          function.  Otherwise, "ident", "hobo_timeout", "posix_exit", and
162          "void_context" may be set uniquely.
163
164          The "ident" option, available since 1.827, is used by callback
165          functions "on_start" and "on_finish" for identifying the started and
166          finished hobo process respectively.
167
168           my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
169               ...
170           } );
171
172           $hobo1->join;
173
174           my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
175               sleep 1 for ( 1 .. 9 );
176           } );
177
178           $hobo2->join;
179
180           if ( $hobo2->error() eq "Hobo timed out\n" ) {
181               ...
182           }
183
184          The "new()" method is an alias for "create()".
185
186       mce_async { BLOCK } ARGS;
187       mce_async { BLOCK };
188          "mce_async" runs the block asynchronously similarly to
189          "MCE::Hobo->create()".  It returns the hobo object, or undef if hobo
190          creation failed.
191
192           my $hobo = mce_async { foreach (@files) { ... } };
193
194           $hobo->join();
195
196           if ( my $err = $hobo->error() ) {
197               warn("Hobo error: $err\n");
198           }
199
200       $hobo->join()
201          This will wait for the corresponding hobo process to complete its
202          execution.  In non-voided context, "join()" will return the value(s)
203          of the entry point function.
204
205          The context (void, scalar or list) for the return value(s) for
206          "join" is determined at the time of joining and mostly "wantarray"
207          aware.
208
209           my $hobo1 = MCE::Hobo->create( sub {
210               my @res = qw(foo bar baz);
211               return (@res);
212           });
213
214           my @res1 = $hobo1->join();  # ( foo, bar, baz )
215           my $res1 = $hobo1->join();  #   baz
216
217           my $hobo2 = MCE::Hobo->create( sub {
218               return 'foo';
219           });
220
221           my @res2 = $hobo2->join();  # ( foo )
222           my $res2 = $hobo2->join();  #   foo
223
224       $hobo1->equal( $hobo2 )
225          Tests if two hobo objects are the same hobo or not. Hobo comparison
226          is based on process IDs. This is overloaded to the more natural
227          forms.
228
229           if ( $hobo1 == $hobo2 ) {
230               print("Hobo objects are the same\n");
231           }
232           # or
233           if ( $hobo1 != $hobo2 ) {
234               print("Hobo objects differ\n");
235           }
236
237       $hobo->error()
238          Hobo processes are executed in an "eval" context. This method will
239          return "undef" if the hobo terminates normally. Otherwise, it
240          returns the value of $@ associated with the hobo's execution status
241          in its "eval" context.
242
243       $hobo->exit()
244          This sends 'SIGINT' to the hobo process, notifying the hobo to exit.
245          It returns the hobo object to allow for method chaining. It is
246          important to join later, if not immediately, so as not to leave a
247          zombie or defunct process.
248
249           $hobo->exit()->join();
250           ...
251
252           $hobo->join();  # later
253
254       MCE::Hobo->exit( 0 )
255       MCE::Hobo->exit( 0, @ret )
256          A hobo can exit at any time by calling "MCE::Hobo->exit()".
257          Otherwise, the behavior is the same as "exit(status)" when called
258          from the main process. Since 1.827, the hobo process may also
259          optionally return data, which is sent via IPC.
260
261       MCE::Hobo->finish()
262          This class method is called automatically by "END", but may be
263          called explicitly. An error is emitted via croak if there are active
264          hobo processes not yet joined.
265
266           MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
267           $_->join for MCE::Hobo->list();
268
269           MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
270           $_->join for MCE::Hobo->list();
271
272           MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
273           $_->join for MCE::Hobo->list();
274
275           MCE::Hobo->finish();
276
277       MCE::Hobo->init( options )
278          The init function accepts a list of MCE::Hobo options.
279
280           MCE::Hobo->init(
281               max_workers => 'auto',   # default undef, unlimited
282               hobo_timeout => 20,      # default undef, no timeout
283               posix_exit => 1,         # default undef, CORE::exit
284               void_context => 1,       # default undef
285               on_start => sub {
286                   my ( $pid, $ident ) = @_;
287                   ...
288               },
289               on_finish => sub {
290                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
291                   ...
292               }
293           );
294
295           # Identification given as an option or the 1st argument.
296           # Current API available since 1.827.
297
298           for my $key ( 'aa' .. 'zz' ) {
299               MCE::Hobo->create( { ident => $key }, sub { ... } );
300               MCE::Hobo->create( $key, sub { ... } );
301           }
302
303           MCE::Hobo->wait_all;
304
305          Set "max_workers" if you want to limit the number of workers by
306          waiting automatically for an available slot. Specify "auto" to
307          obtain the number of logical cores via "MCE::Util::get_ncpu()".
308
309          Set "hobo_timeout", in number of seconds, if you want the hobo
310          process to terminate after some time. The default is 0 for no
311          timeout.
312
313          Set "posix_exit" to avoid all END and destructor processing.
314          Constructing MCE::Hobo inside a thread implies 1, as does the
315          presence of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS,
316          LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
317
318          Set "void_context" to create the hobo process in void context for
319          the return value. Otherwise, the return context is wantarray-aware
320          for "join()" and "result()" and determined when retrieving the data.
321
322          The callback options "on_start" and "on_finish" are called in the
323          parent process after starting the worker and later when terminated.
324          The arguments for the subroutines were inspired by
325          Parallel::ForkManager.
326
327          The parameters for "on_start" are the following:
328
329           - pid of the hobo process
330           - identification (ident option or 1st arg to create)
331
332          The parameters for "on_finish" are the following:
333
334           - pid of the hobo process
335           - program exit code
336           - identification (ident option or 1st arg to create)
337           - exit signal id
338           - error message from eval inside MCE::Hobo
339           - returned data
340
341       $hobo->is_running()
342          Returns true if a hobo is still running.
343
344       $hobo->is_joinable()
345          Returns true if the hobo has finished running and not yet joined.
346
347       $hobo->kill( 'SIG...' )
348          Sends the specified signal to the hobo. Returns the hobo object to
349          allow for method chaining. As with "exit", it is important to join
350          eventually, if not immediately, so as not to leave a zombie or
351          defunct process.
352
353           $hobo->kill('SIG...')->join();
354
355          The following is a parallel demonstration comparing "MCE::Shared"
356          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
357          after all workers have been notified to quit.
358
359           use Time::HiRes qw(time);
360
361           use Redis;
362           use Redis::Fast;
363
364           use MCE::Hobo;
365           use MCE::Shared;
366
367           my $redis = Redis->new();
368           my $rfast = Redis::Fast->new();
369           my $array = MCE::Shared->array();
370
371           sub parallel_redis {
372               my ($_redis) = @_;
373               my ($count, $quit, $len) = (0, 0);
374
375               # instead, use a flag to exit loop
376               $SIG{'QUIT'} = sub { $quit = 1 };
377
378               while () {
379                   $len = $_redis->rpush('list', $count++);
380                   last if $quit;
381               }
382
383               $count;
384           }
385
386           sub parallel_array {
387               my ($count, $quit, $len) = (0, 0);
388
389               # do not exit from inside handler
390               $SIG{'QUIT'} = sub { $quit = 1 };
391
392               while () {
393                   $len = $array->push($count++);
394                   last if $quit;
395               }
396
397               $count;
398           }
399
400           sub benchmark_this {
401               my ($desc, $num_procs, $timeout, $code, @args) = @_;
402               my ($start, $total) = (time(), 0);
403
404               MCE::Hobo->new($code, @args) for 1..$num_procs;
405               sleep $timeout;
406
407               # joining is not immediate; ok
408               $_->kill('QUIT') for MCE::Hobo->list();
409
410               # joining later; ok
411               $total += $_->join() for MCE::Hobo->list();
412
413               printf "$desc <> duration: %0.03f secs, count: $total\n",
414                   time() - $start;
415
416               sleep 0.2;
417           }
418
419           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
420           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
421           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
422
423       MCE::Hobo->list()
424          Returns a list of all hobo objects not yet joined.
425
426           @hobos = MCE::Hobo->list();
427
428       MCE::Hobo->list_pids()
429          Returns a list of all hobo pids not yet joined (available since
430          1.849).
431
432           @pids = MCE::Hobo->list_pids();
433
434           $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
435               # Signal workers and the shared manager all at once
436               CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
437               exec('reset');
438           };
439
440       MCE::Hobo->list_running()
441          Returns a list of all hobo objects that are still running.
442
443           @hobos = MCE::Hobo->list_running();
444
445       MCE::Hobo->list_joinable()
446          Returns a list of all hobo objects that have completed running
447          and are thus ready to be joined without blocking.
448
449           @hobos = MCE::Hobo->list_joinable();
450
451       MCE::Hobo->max_workers([ N ])
452          Getter and setter for max_workers. Specify a number or 'auto' to
453          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
454          false value to set back to no limit.
455
456          API available since 1.835.
457
458       MCE::Hobo->pending()
459          Returns a count of all hobo objects not yet joined.
460
461           $count = MCE::Hobo->pending();
462
463       $hobo->result()
464          Returns the result obtained by "join", "wait_one", or "wait_all". If
465          the process has not yet exited, waits for the corresponding hobo to
466          complete its execution.
467
468           use MCE::Hobo;
469           use Time::HiRes qw(sleep);
470
471           sub task {
472               my ($id) = @_;
473               sleep $id * 0.333;
474               return $id;
475           }
476
477           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
478
479           # 1 while MCE::Hobo->wait_one();
480
481           while ( my $hobo = MCE::Hobo->wait_one() ) {
482               my $err = $hobo->error() || 'no error';
483               my $res = $hobo->result();
484               my $pid = $hobo->pid();
485
486               print "[$pid] $err : $res\n";
487           }
488
489          Like "join" described above, the context (void, scalar or list) for
490          the return value(s) is determined at the time "result" is called and
491          mostly "wantarray" aware.
492
493           my $hobo1 = MCE::Hobo->create( sub {
494               my @res = qw(foo bar baz);
495               return (@res);
496           });
497
498           my @res1 = $hobo1->result();  # ( foo, bar, baz )
499           my $res1 = $hobo1->result();  #   baz
500
501           my $hobo2 = MCE::Hobo->create( sub {
502               return 'foo';
503           });
504
505           my @res2 = $hobo2->result();  # ( foo )
506           my $res2 = $hobo2->result();  #   foo
507
508       MCE::Hobo->self()
509          Class method that allows a hobo to obtain its own MCE::Hobo object.
510
511       $hobo->pid()
512       $hobo->tid()
513          Returns the ID of the hobo.
514
515           pid: $$  process id
516           tid: $$  alias for pid
517
518       MCE::Hobo->pid()
519       MCE::Hobo->tid()
520          Class methods that allow a hobo to obtain its own ID.
521
522           pid: $$  process id
523           tid: $$  alias for pid
524
525       MCE::Hobo->wait_one()
526       MCE::Hobo->waitone()
527       MCE::Hobo->wait_all()
528       MCE::Hobo->waitall()
529          Meaningful for the manager process only, waits for one or all hobo
530          processes to complete execution. Afterwards, returns the
531          corresponding hobo objects.  If a hobo doesn't exist, returns the
532          "undef" value or an empty list for "wait_one" and "wait_all"
533          respectively.
534
535          The "waitone" and "waitall" methods are aliases since 1.827 for
536          backwards compatibility.
537
538           use MCE::Hobo;
539           use Time::HiRes qw(sleep);
540
541           sub task {
542               my $id = shift;
543               sleep $id * 0.333;
544               return $id;
545           }
546
547           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
548
549           # join, traditional use case
550           $_->join() for MCE::Hobo->list();
551
552           # wait_one, simplistic use case
553           1 while MCE::Hobo->wait_one();
554
555           # wait_one
556           while ( my $hobo = MCE::Hobo->wait_one() ) {
557               my $err = $hobo->error() || 'no error';
558               my $res = $hobo->result();
559               my $pid = $hobo->pid();
560
561               print "[$pid] $err : $res\n";
562           }
563
564           # wait_all
565           my @hobos = MCE::Hobo->wait_all();
566
567           for ( @hobos ) {
568               my $err = $_->error() || 'no error';
569               my $res = $_->result();
570               my $pid = $_->pid();
571
572               print "[$pid] $err : $res\n";
573           }
574
575       MCE::Hobo->yield( [ floating_seconds ] )
576          Prior API till 1.826.
577
578          Let this hobo yield CPU time to other workers. By default, the class
579          method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
580          including Cygwin.
581
582           MCE::Hobo->yield();
583           MCE::Hobo->yield(0.05);
584
585           # total run time: 0.25 seconds, sleep occurring in parallel
586
587           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
588           MCE::Hobo->wait_all();
589
590          Current API available since 1.827.
591
592          Give other workers a chance to run, optionally for given time. Yield
593          behaves similarly to MCE's interval option. It throttles workers
594          from running too fast.  A demonstration is provided in the next
595          section for fetching URLs in parallel.
596
597           # total run time: 1.00 second
598
599           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
600           MCE::Hobo->wait_all();
601

PARALLEL::FORKMANAGER-like DEMONSTRATION

603       MCE::Hobo behaves similarly to threads for the most part. It also
604       provides Parallel::ForkManager-like capabilities. The
605       "Parallel::ForkManager" example is shown first followed by a version
606       using "MCE::Hobo".
607
608       Parallel::ForkManager
609           use strict;
610           use warnings;
611
612           use Parallel::ForkManager;
613           use Time::HiRes 'time';
614
615           my $start = time;
616
617           my $pm = Parallel::ForkManager->new(10);
618           $pm->set_waitpid_blocking_sleep(0);
619
620           $pm->run_on_finish( sub {
621               my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
622               print "child $pid completed: $ident => ", $resp->[0], "\n";
623           });
624
625           DATA_LOOP:
626           foreach my $data ( 1..2000 ) {
627               # forks and returns the pid for the child
628               my $pid = $pm->start($data) and next DATA_LOOP;
629               my $ret = [ $data * 2 ];
630
631               $pm->finish(0, $ret);
632           }
633
634           $pm->wait_all_children;
635
636           printf STDERR "duration: %0.03f seconds\n", time - $start;
637
638       MCE::Hobo
639           use strict;
640           use warnings;
641
642           use MCE::Hobo 1.843;
643           use Time::HiRes 'time';
644
645           my $start = time;
646
647           MCE::Hobo->init(
648               max_workers => 10,
649               on_finish   => sub {
650                   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
651                   print "child $pid completed: $ident => ", $resp->[0], "\n";
652               }
653           );
654
655           foreach my $data ( 1..2000 ) {
656               MCE::Hobo->create( $data, sub {
657                   [ $data * 2 ];
658               });
659           }
660
661           MCE::Hobo->wait_all;
662
663           printf STDERR "duration: %0.03f seconds\n", time - $start;
664
665       Time to spin up 2,000 workers and obtain results (in seconds).
666          Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
667          Boost).  Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
668          with Moo loaded at the top of the script.
669
670           MCE::Hobo uses MCE::Shared to retrieve data during reaping.
671           MCE::Child uses MCE::Channel, no shared-manager.
672
673                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
674
675           MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
676            MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
677                P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s
678
679           MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
680            MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
681                P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used
682
683       Set posix_exit to avoid all END and destructor processing.
684          This is helpful for reducing overhead when workers exit. It also
685          helps when using a Perl module that is not parallel safe. The option
686          is ignored on Windows "$^O eq 'MSWin32'".
687
688           MCE::Child->init( posix_exit => 1, ... );
689            MCE::Hobo->init( posix_exit => 1, ... );
690
691                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
692
693           MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
694            MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded
695

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

697       This demonstration constructs two queues, two handles, starts the
698       shared-manager process if needed, and spawns four workers.  This
699       demonstration chunks 64 URLs per job. In reality, one may run
700       with 200 workers and chunk 300 URLs on a 24-way box.
701
702        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
703        # perl demo.pl              -- all output
704        # perl demo.pl  >/dev/null  -- mngr/hobo output
705        # perl demo.pl 2>/dev/null  -- show results only
706        #
707        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
708
709        use strict;
710        use warnings;
711
712        use AnyEvent;
713        use AnyEvent::HTTP;
714        use Time::HiRes qw( time );
715
716        use MCE::Hobo;
717        use MCE::Shared;
718
719        # Construct two queues, input and return.
720
721        my $que = MCE::Shared->queue();
722        my $ret = MCE::Shared->queue();
723
724        # Construct shared handles for serializing output from many workers
725        # writing simultaneously. This prevents garbled output.
726
727        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
728        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
729
730        # Spawn workers early for minimum memory consumption.
731
732        MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
733
734        # Obtain or generate input data for workers to process.
735
736        my ( $count, @urls ) = ( 0 );
737
738        push @urls, map { "http://127.0.0.$_/"   } 1..254;
739        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
740
741        while ( @urls ) {
742            my @chunk = splice(@urls, 0, 64);
743            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
744        }
745
746        # So that workers leave the loop after consuming the queue.
747
748        $que->end();
749
750        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
751        # Loop for the manager process. The manager may do other work if
752        # need be and periodically check $ret->pending() not shown here.
753        #
754        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
755
756        my $start = time;
757
758        printf {$ERR} "Mngr - entering loop\n";
759
760        while ( $count ) {
761            my ( $result, $failed ) = $ret->dequeue( 2 );
762
763            # Remove ID from result, so not treated as a URL item.
764
765            printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
766
767            # Display the URL and the size captured.
768
769            foreach my $url ( keys %{ $result } ) {
770                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
771                    if $result->{$url};  # url has content
772            }
773
774            # Display URLs could not reach.
775
776            if ( @{ $failed } ) {
777                foreach my $url ( @{ $failed } ) {
778                    print {$OUT} "Failed: $url\n";
779                }
780            }
781
782            # Decrement the count.
783
784            $count--;
785        }
786
787        MCE::Hobo->wait_all();
788
789        printf {$ERR} "Mngr - exiting loop\n\n";
790        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
791
792        exit;
793
794        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
795        # Hobo processes enqueue two items ( $result and $failed ) per
796        # job for the manager process. Likewise, the manager process dequeues
797        # two items above. Optionally, hobo processes may include the ID in
798        # the result.
799        #
800        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
801
802        sub task {
803            my ( $id ) = @_;
804            printf {$ERR} "Hobo $id entering loop\n";
805
806            while ( my $job = $que->dequeue() ) {
807                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
808
809                # Walk URLs, provide a hash and array refs for data.
810
811                printf {$ERR} "Hobo $id running  job $job->{ID}\n";
812                walk( $job, $result, $failed );
813
814                # Send results to the manager process.
815
816                $ret->enqueue( $result, $failed );
817            }
818
819            printf {$ERR} "Hobo $id exiting loop\n";
820        }
821
822        sub walk {
823            my ( $job, $result, $failed ) = @_;
824
825            # Yielding is critical when running an event loop in parallel.
826            # Not doing so means that the app may reach contention points
827            # with the firewall and likely impose unnecessary hardship at
828            # the OS level. The idea here is not to have multiple workers
829            # initiate HTTP requests to a batch of URLs at the same time.
830            # Yielding in 1.827+ behaves similarly to scatter, letting
831            # the hobo process run solo for a fraction of time.
832
833            MCE::Hobo->yield( 0.03 );   # MCE::Hobo 1.827+
834
835            my $cv = AnyEvent->condvar();
836
837            # Populate the hash ref for the URLs it could reach.
838            # Do not mix AnyEvent timeout with hobo timeout.
839            # Therefore, choose event timeout when available.
840
841            foreach my $url ( @{ $job->{INPUT} } ) {
842                $cv->begin();
843                http_get $url, timeout => 2, sub {
844                    my ( $data, $headers ) = @_;
845                    $result->{$url} = $data;
846                    $cv->end();
847                };
848            }
849
850            $cv->recv();
851
852            # Populate the array ref for URLs it could not reach (undef data).
853
854            foreach my $url ( @{ $job->{INPUT} } ) {
855                push @{ $failed }, $url unless defined $result->{ $url };
856            }
857
858            return;
859        }
860
861        __END__
862
863        $ perl demo.pl
864
865        Hobo 1 entering loop
866        Hobo 2 entering loop
867        Hobo 3 entering loop
868        Mngr - entering loop
869        Hobo 2 running  job 2
870        Hobo 3 running  job 3
871        Hobo 1 running  job 1
872        Hobo 4 entering loop
873        Hobo 4 running  job 4
874        Hobo 2 running  job 5
875        Mngr - received job 2
876        Hobo 3 running  job 6
877        Mngr - received job 3
878        Hobo 1 running  job 7
879        Mngr - received job 1
880        Hobo 4 running  job 8
881        Mngr - received job 4
882        http://192.168.0.1/: 3729
883        Hobo 2 exiting loop
884        Mngr - received job 5
885        Hobo 3 exiting loop
886        Mngr - received job 6
887        Hobo 1 exiting loop
888        Mngr - received job 7
889        Hobo 4 exiting loop
890        Mngr - received job 8
891        Mngr - exiting loop
892
893        Duration: 4.131 seconds
894

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

896       Making an executable is possible with the PAR::Packer module.  On the
897       Windows platform, threads, threads::shared, and exiting via threads are
898       necessary for the binary to exit successfully.
899
900        # https://metacpan.org/pod/PAR::Packer
901        # https://metacpan.org/pod/pp
902        #
903        #   pp -o demo.exe demo.pl
904        #   ./demo.exe
905
906        use strict;
907        use warnings;
908
909        use if $^O eq "MSWin32", "threads";
910        use if $^O eq "MSWin32", "threads::shared";
911
912        # Include minimum dependencies for MCE::Hobo.
913        # Add other modules required by your application here.
914
915        use Storable ();
916        use Time::HiRes ();
917
918        # use IO::FDPass ();  # optional: for condvar, handle, queue
919        # use Sereal ();      # optional: for faster serialization
920
921        use MCE::Hobo;
922        use MCE::Shared;
923
924        # For PAR to work on the Windows platform, one must manually include
925        # any shared modules used by the application.
926
927        # use MCE::Shared::Array;    # if using MCE::Shared->array
928        # use MCE::Shared::Cache;    # if using MCE::Shared->cache
929        # use MCE::Shared::Condvar;  # if using MCE::Shared->condvar
930        # use MCE::Shared::Handle;   # if using MCE::Shared->handle, mce_open
931        # use MCE::Shared::Hash;     # if using MCE::Shared->hash
932        # use MCE::Shared::Minidb;   # if using MCE::Shared->minidb
933        # use MCE::Shared::Ordhash;  # if using MCE::Shared->ordhash
934        # use MCE::Shared::Queue;    # if using MCE::Shared->queue
935        # use MCE::Shared::Scalar;   # if using MCE::Shared->scalar
936
937        # Et cetera. Only load modules needed for your application.
938
939        use MCE::Shared::Sequence;   # if using MCE::Shared->sequence
940
941        my $seq = MCE::Shared->sequence( 1, 9 );
942
943        sub task {
944            my ( $id ) = @_;
945            while ( defined ( my $num = $seq->next() ) ) {
946                print "$id: $num\n";
947                sleep 1;
948            }
949        }
950
951        sub main {
952            MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
953            MCE::Hobo->wait_all();
954        }
955
956        # Main must run inside a thread on the Windows platform or workers
957        # will fail during exit, causing the exe to crash. The reason is
958        # that PAR or a dependency isn't multi-process safe.
959
960        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
961
962        threads->exit(0) if $INC{"threads.pm"};
963

CREDITS

965       The inspiration for "MCE::Hobo" comes from wanting "threads"-like
966       behavior for processes. Both can run side by side, including safe use by
967       MCE workers.  Likewise, the documentation resembles "threads".
968
969       The inspiration for "wait_all" and "wait_one" comes from the
970       "Parallel::WorkUnit" module.
971

SEE ALSO

973       ·  forks
974
975       ·  forks::BerkeleyDB
976
977       ·  MCE::Child
978
979       ·  Parallel::ForkManager
980
981       ·  Parallel::Loops
982
983       ·  Parallel::Prefork
984
985       ·  Parallel::WorkUnit
986
987       ·  Proc::Fork
988
989       ·  Thread::Tie
990
991       ·  threads
992

INDEX

994       MCE, MCE::Channel, MCE::Shared
995

AUTHOR

997       Mario E. Roy, <marioeroy AT gmail DOT com>
998
999
1000
1001perl v5.30.0                      2019-09-19                      MCE::Hobo(3)
Impressum