MCE::Hobo(3)          User Contributed Perl Documentation         MCE::Hobo(3)
2
3
4

NAME

6       MCE::Hobo - A threads-like parallelization module
7

VERSION

9       This document describes MCE::Hobo version 1.864
10

SYNOPSIS

12        use MCE::Hobo;
13
14        MCE::Hobo->init(
15            max_workers => 'auto',   # default undef, unlimited
16            hobo_timeout => 20,      # default undef, no timeout
17            posix_exit => 1,         # default undef, CORE::exit
18            void_context => 1,       # default undef
19            on_start => sub {
20                my ( $pid, $ident ) = @_;
21                ...
22            },
23            on_finish => sub {
24                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
25                ...
26            }
27        );
28
29        MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
30
31        sub parallel {
32            my ($arg1) = @_;
33            print "Hello again, $arg1\n" if defined($arg1);
34            print "Hello again, $_\n"; # same thing
35        }
36
37        MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
38
39        my @hobos    = MCE::Hobo->list();
40        my @pids     = MCE::Hobo->list_pids();
41        my @running  = MCE::Hobo->list_running();
42        my @joinable = MCE::Hobo->list_joinable();
43        my $count    = MCE::Hobo->pending();
44
45        # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
46        $_->join() for @hobos;   # (or)
47        $_->join() for @joinable;
48
49        # Joining occurs immediately as hobo processes complete execution.
50        1 while MCE::Hobo->wait_one();
51
52        my $hobo = mce_async { foreach (@files) { ... } };
53
54        $hobo->join();
55
56        if ( my $err = $hobo->error() ) {
57            warn "Hobo error: $err\n";
58        }
59
60        # Get a hobo's object
61        $hobo = MCE::Hobo->self();
62
63        # Get a hobo's ID
64        $pid = MCE::Hobo->pid();  # $$
65        $pid = $hobo->pid();
66        $pid = MCE::Hobo->tid();  # tid is an alias for pid
67        $pid = $hobo->tid();
68
69        # Test hobo objects
70        if ( $hobo1 == $hobo2 ) {
71            ...
72        }
73
74        # Give other workers a chance to run
75        MCE::Hobo->yield();
76        MCE::Hobo->yield(0.05);
77
78        # Return context, wantarray aware
79        my ($value1, $value2) = $hobo->join();
80        my $value = $hobo->join();
81
82        # Check hobo's state
83        if ( $hobo->is_running() ) {
84            sleep 1;
85        }
86        if ( $hobo->is_joinable() ) {
87            $hobo->join();
88        }
89
90        # Send a signal to a hobo
91        $hobo->kill('SIGUSR1');
92
93        # Exit a hobo
94        MCE::Hobo->exit(0);
95        MCE::Hobo->exit(0, @ret);  # MCE::Hobo 1.827+
96

DESCRIPTION

98       A hobo is a migratory worker inside the machine that carries the
99       asynchronous gene. Hobo processes are equipped with "threads"-like
100       capability for running code asynchronously. Unlike threads, each hobo
101       is a unique process to the underlying OS. The IPC is managed by
102       "MCE::Shared", which runs on all the major platforms including Cygwin
103       and Strawberry Perl.
104
105       An exception was made on the Windows platform to spawn threads versus
106       children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
107       release reverts back to spawning children on all supported platforms.
108
109       "MCE::Hobo" may be used as a standalone or together with "MCE"
110       including running alongside "threads".
111
112        use MCE::Hobo;
113        use MCE::Shared;
114
115        # synopsis: head -20 file.txt | perl script.pl
116
117        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
118        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
119        my $ary = MCE::Shared->array();
120
121        sub parallel_task {
122            my ( $id ) = @_;
123            while ( <$ifh> ) {
124                printf {$ofh} "[ %4d ] %s", $., $_;
125              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";    # dereferencing
126                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
127            }
128        }
129
130        my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
131        my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
132        my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
133
134        $_->join for MCE::Hobo->list();  # ditto: MCE::Hobo->wait_all();
135
136        # search array (total one round-trip via IPC)
137        my @vals = $ary->vals( "val =~ / ID 2 /" );
138
139        print {*STDERR} join("", @vals);
140

API DOCUMENTATION

142       $hobo = MCE::Hobo->create( FUNCTION, ARGS )
143       $hobo = MCE::Hobo->new( FUNCTION, ARGS )
144          This will create a new hobo process that will begin execution with
145          FUNCTION as the entry point, and optionally ARGS as a list of
146          parameters. It will return the corresponding MCE::Hobo object, or
147          undef if hobo creation failed.
148
149          FUNCTION may either be the name of a function, an anonymous
150          subroutine, or a code ref.
151
152           my $hobo = MCE::Hobo->create( "func_name", ... );
153               # or
154           my $hobo = MCE::Hobo->create( sub { ... }, ... );
155               # or
156           my $hobo = MCE::Hobo->create( \&func, ... );
157
158       $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
159       $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
160          Options, excluding "ident", may be specified globally via the "init"
161          function.  Otherwise, "ident", "hobo_timeout", "posix_exit", and
162          "void_context" may be set uniquely.
163
164          The "ident" option, available since 1.827, is used by callback
165          functions "on_start" and "on_finish" for identifying the started and
166          finished hobo process respectively.
167
168           my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
169               ...
170           } );
171
172           $hobo1->join;
173
174           my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
175               sleep 1 for ( 1 .. 9 );
176           } );
177
178           $hobo2->join;
179
180           if ( $hobo2->error() eq "Hobo timed out\n" ) {
181               ...
182           }
183
184          The "new()" method is an alias for "create()".
185
186       mce_async { BLOCK } ARGS;
187       mce_async { BLOCK };
188          "mce_async" runs the block asynchronously similarly to
189          "MCE::Hobo->create()".  It returns the hobo object, or undef if hobo
190          creation failed.
191
192           my $hobo = mce_async { foreach (@files) { ... } };
193
194           $hobo->join();
195
196           if ( my $err = $hobo->error() ) {
197               warn("Hobo error: $err\n");
198           }
199
200       $hobo->join()
201          This will wait for the corresponding hobo process to complete its
202          execution.  In non-voided context, "join()" will return the value(s)
203          of the entry point function.
204
205          The context (void, scalar or list) for the return value(s) for
206          "join" is determined at the time of joining and mostly "wantarray"
207          aware.
208
209           my $hobo1 = MCE::Hobo->create( sub {
210               my @res = qw(foo bar baz);
211               return (@res);
212           });
213
214           my @res1 = $hobo1->join();  # ( foo, bar, baz )
215           my $res1 = $hobo1->join();  #   baz
216
217           my $hobo2 = MCE::Hobo->create( sub {
218               return 'foo';
219           });
220
221           my @res2 = $hobo2->join();  # ( foo )
222           my $res2 = $hobo2->join();  #   foo
223
224       $hobo1->equal( $hobo2 )
225          Tests if two hobo objects are the same hobo or not. Hobo comparison
226          is based on process IDs. This is overloaded to the more natural
227          forms.
228
229           if ( $hobo1 == $hobo2 ) {
230               print("Hobo objects are the same\n");
231           }
232           # or
233           if ( $hobo1 != $hobo2 ) {
234               print("Hobo objects differ\n");
235           }
236
237       $hobo->error()
238          Hobo processes are executed in an "eval" context. This method will
239          return "undef" if the hobo terminates normally. Otherwise, it
240          returns the value of $@ associated with the hobo's execution status
241          in its "eval" context.
242
243       $hobo->exit()
244          This sends 'SIGQUIT' to the hobo process, notifying the hobo to
245          exit.  It returns the hobo object to allow for method chaining. It
246          is important to join later if not immediately to not leave a zombie
247          or defunct process.
248
249           $hobo->exit()->join();
250           ...
251
252           $hobo->join();  # later
253
254       MCE::Hobo->exit( 0 )
255       MCE::Hobo->exit( 0, @ret )
256          A hobo can exit at any time by calling "MCE::Hobo->exit()".
257          Otherwise, the behavior is the same as "exit(status)" when called
258          from the main process. Current since 1.827, the hobo process may
259          optionally return data, to be sent via IPC.
260
261       MCE::Hobo->finish()
262          This class method is called automatically by "END", but may be
263          called explicitly. An error is emitted via croak if there are active
264          hobo processes not yet joined.
265
266           MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
267           $_->join for MCE::Hobo->list();
268
269           MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
270           $_->join for MCE::Hobo->list();
271
272           MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
273           $_->join for MCE::Hobo->list();
274
275           MCE::Hobo->finish();
276
277       MCE::Hobo->init( options )
278          The init function accepts a list of MCE::Hobo options.
279
280           MCE::Hobo->init(
281               max_workers => 'auto',   # default undef, unlimited
282               hobo_timeout => 20,      # default undef, no timeout
283               posix_exit => 1,         # default undef, CORE::exit
284               void_context => 1,       # default undef
285               on_start => sub {
286                   my ( $pid, $ident ) = @_;
287                   ...
288               },
289               on_finish => sub {
290                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
291                   ...
292               }
293           );
294
295           # Identification given as an option or the 1st argument.
296           # Current API available since 1.827.
297
298           for my $key ( 'aa' .. 'zz' ) {
299               MCE::Hobo->create( { ident => $key }, sub { ... } );
300               MCE::Hobo->create( $key, sub { ... } );
301           }
302
303           MCE::Hobo->wait_all;
304
305          Set "max_workers" if you want to limit the number of workers by
306          waiting automatically for an available slot. Specify "auto" to
307          obtain the number of logical cores via "MCE::Util::get_ncpu()".
308
309          Set "hobo_timeout", in number of seconds, if you want the hobo
310          process to terminate after some time. The default is 0 for no
311          timeout.
312
313          Set "posix_exit" to avoid all END and destructor processing.
314          Constructing MCE::Hobo inside a thread implies 1 or if present CGI,
315          FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
316          Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
317
318          Set "void_context" to create the hobo process in void context for
319          the return value. Otherwise, the return context is wantarray-aware
320          for "join()" and "result()" and determined when retrieving the data.
321
322          The callback options "on_start" and "on_finish" are called in the
323          parent process after starting the worker and later when terminated.
324          The arguments for the subroutines were inspired by
325          Parallel::ForkManager.
326
327          The parameters for "on_start" are the following:
328
329           - pid of the hobo process
330           - identification (ident option or 1st arg to create)
331
332          The parameters for "on_finish" are the following:
333
334           - pid of the hobo process
335           - program exit code
336           - identification (ident option or 1st arg to create)
337           - exit signal id
338           - error message from eval inside MCE::Hobo
339           - returned data
340
341       $hobo->is_running()
342          Returns true if a hobo is still running.
343
344       $hobo->is_joinable()
345          Returns true if the hobo has finished running and not yet joined.
346
347       $hobo->kill( 'SIG...' )
348          Sends the specified signal to the hobo. Returns the hobo object to
349          allow for method chaining. As with "exit", it is important to join
350          eventually if not immediately to not leave a zombie or defunct
351          process.
352
353           $hobo->kill('SIG...')->join();
354
355          The following is a parallel demonstration comparing "MCE::Shared"
356          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
357          after all workers have been notified to quit.
358
359           use Time::HiRes qw(time);
360
361           use Redis;
362           use Redis::Fast;
363
364           use MCE::Hobo;
365           use MCE::Shared;
366
367           my $redis = Redis->new();
368           my $rfast = Redis::Fast->new();
369           my $array = MCE::Shared->array();
370
371           sub parallel_redis {
372               my ($_redis) = @_;
373               my ($count, $quit, $len) = (0, 0);
374
375               # instead, use a flag to exit loop
376               $SIG{'QUIT'} = sub { $quit = 1 };
377
378               while () {
379                   $len = $_redis->rpush('list', $count++);
380                   last if $quit;
381               }
382
383               $count;
384           }
385
386           sub parallel_array {
387               my ($count, $quit, $len) = (0, 0);
388
389               # do not exit from inside handler
390               $SIG{'QUIT'} = sub { $quit = 1 };
391
392               while () {
393                   $len = $array->push($count++);
394                   last if $quit;
395               }
396
397               $count;
398           }
399
400           sub benchmark_this {
401               my ($desc, $num_procs, $timeout, $code, @args) = @_;
402               my ($start, $total) = (time(), 0);
403
404               MCE::Hobo->new($code, @args) for 1..$num_procs;
405               sleep $timeout;
406
407               # joining is not immediate; ok
408               $_->kill('QUIT') for MCE::Hobo->list();
409
410               # joining later; ok
411               $total += $_->join() for MCE::Hobo->list();
412
413               printf "$desc <> duration: %0.03f secs, count: $total\n",
414                   time() - $start;
415
416               sleep 0.2;
417           }
418
419           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
420           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
421           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
422
423       MCE::Hobo->list()
424          Returns a list of all hobo objects not yet joined.
425
426           @hobos = MCE::Hobo->list();
427
428       MCE::Hobo->list_pids()
429          Returns a list of all hobo pids not yet joined (available since
430          1.849).
431
432           @pids = MCE::Hobo->list_pids();
433
434           $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
435               # Signal workers and the shared manager all at once
436               CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
437               exec('reset');
438           };
439
440       MCE::Hobo->list_running()
441          Returns a list of all hobo objects that are still running.
442
443           @hobos = MCE::Hobo->list_running();
444
445       MCE::Hobo->list_joinable()
446          Returns a list of all hobo objects that have completed running.
447          Thus, ready to be joined without blocking.
448
449           @hobos = MCE::Hobo->list_joinable();
450
451       MCE::Hobo->max_workers([ N ])
452          Getter and setter for max_workers. Specify a number or 'auto' to
453          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
454          false value to set back to no limit.
455
456          API available since 1.835.
457
458       MCE::Hobo->pending()
459          Returns a count of all hobo objects not yet joined.
460
461           $count = MCE::Hobo->pending();
462
463       $hobo->result()
464          Returns the result obtained by "join", "wait_one", or "wait_all". If
465          the process has not yet exited, waits for the corresponding hobo to
466          complete its execution.
467
468           use MCE::Hobo;
469           use Time::HiRes qw(sleep);
470
471           sub task {
472               my ($id) = @_;
473               sleep $id * 0.333;
474               return $id;
475           }
476
477           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
478
479           # 1 while MCE::Hobo->wait_one();
480
481           while ( my $hobo = MCE::Hobo->wait_one() ) {
482               my $err = $hobo->error() || 'no error';
483               my $res = $hobo->result();
484               my $pid = $hobo->pid();
485
486               print "[$pid] $err : $res\n";
487           }
488
489          Like "join" described above, the context (void, scalar or list) for
490          the return value(s) is determined at the time "result" is called and
491          mostly "wantarray" aware.
492
493           my $hobo1 = MCE::Hobo->create( sub {
494               my @res = qw(foo bar baz);
495               return (@res);
496           });
497
498           my @res1 = $hobo1->result();  # ( foo, bar, baz )
499           my $res1 = $hobo1->result();  #   baz
500
501           my $hobo2 = MCE::Hobo->create( sub {
502               return 'foo';
503           });
504
505           my @res2 = $hobo2->result();  # ( foo )
506           my $res2 = $hobo2->result();  #   foo
507
508       MCE::Hobo->self()
509          Class method that allows a hobo to obtain it's own MCE::Hobo object.
510
511       $hobo->pid()
512       $hobo->tid()
513          Returns the ID of the hobo.
514
515           pid: $$  process id
516           tid: $$  alias for pid
517
518       MCE::Hobo->pid()
519       MCE::Hobo->tid()
520          Class methods that allows a hobo to obtain its own ID.
521
522           pid: $$  process id
523           tid: $$  alias for pid
524
525       MCE::Hobo->wait_one()
526       MCE::Hobo->waitone()
527       MCE::Hobo->wait_all()
528       MCE::Hobo->waitall()
529          Meaningful for the manager process only, waits for one or all hobo
530          processes to complete execution. Afterwards, returns the
531          corresponding hobo objects.  If a hobo doesn't exist, returns the
532          "undef" value or an empty list for "wait_one" and "wait_all"
533          respectively.
534
535          The "waitone" and "waitall" methods are aliases since 1.827 for
536          backwards compatibility.
537
538           use MCE::Hobo;
539           use Time::HiRes qw(sleep);
540
541           sub task {
542               my $id = shift;
543               sleep $id * 0.333;
544               return $id;
545           }
546
547           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
548
549           # join, traditional use case
550           $_->join() for MCE::Hobo->list();
551
552           # wait_one, simplistic use case
553           1 while MCE::Hobo->wait_one();
554
555           # wait_one
556           while ( my $hobo = MCE::Hobo->wait_one() ) {
557               my $err = $hobo->error() || 'no error';
558               my $res = $hobo->result();
559               my $pid = $hobo->pid();
560
561               print "[$pid] $err : $res\n";
562           }
563
564           # wait_all
565           my @hobos = MCE::Hobo->wait_all();
566
567           for ( @hobos ) {
568               my $err = $_->error() || 'no error';
569               my $res = $_->result();
570               my $pid = $_->pid();
571
572               print "[$pid] $err : $res\n";
573           }
574
575       MCE::Hobo->yield( [ floating_seconds ] )
576          Prior API till 1.826.
577
578          Let this hobo yield CPU time to other workers. By default, the class
579          method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
580          including Cygwin.
581
582           MCE::Hobo->yield();
583           MCE::Hobo->yield(0.05);
584
585           # total run time: 0.25 seconds, sleep occuring in parallel
586
587           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
588           MCE::Hobo->wait_all();
589
590          Current API available since 1.827.
591
592          Give other workers a chance to run, optionally for given time. Yield
593          behaves similarly to MCE's interval option. It throttles workers
594          from running too fast.  A demonstration is provided in the next
595          section for fetching URLs in parallel.
596
597          The default "floating_seconds" is 0.008 and 0.015 on UNIX and
598          Windows, respectively. Pass 0 if you want to give other workers a
599          chance to run.
600
601           # total run time: 1.00 second
602
603           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
604           MCE::Hobo->wait_all();
605

PARALLEL::FORKMANAGER-like DEMONSTRATION

607       MCE::Hobo behaves similarly to threads for the most part. It also
608       provides Parallel::ForkManager-like capabilities. The
609       "Parallel::ForkManager" example is shown first followed by a version
610       using "MCE::Hobo".
611
612       Parallel::ForkManager
613           use strict;
614           use warnings;
615
616           use Parallel::ForkManager;
617           use Time::HiRes 'time';
618
619           my $start = time;
620
621           my $pm = Parallel::ForkManager->new(10);
622           $pm->set_waitpid_blocking_sleep(0);
623
624           $pm->run_on_finish( sub {
625               my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
626               print "child $pid completed: $ident => ", $resp->[0], "\n";
627           });
628
629           DATA_LOOP:
630           foreach my $data ( 1..2000 ) {
631               # forks and returns the pid for the child
632               my $pid = $pm->start($data) and next DATA_LOOP;
633               my $ret = [ $data * 2 ];
634
635               $pm->finish(0, $ret);
636           }
637
638           $pm->wait_all_children;
639
640           printf STDERR "duration: %0.03f seconds\n", time - $start;
641
642       MCE::Hobo
643           use strict;
644           use warnings;
645
646           use MCE::Hobo 1.843;
647           use Time::HiRes 'time';
648
649           my $start = time;
650
651           MCE::Hobo->init(
652               max_workers => 10,
653               on_finish   => sub {
654                   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
655                   print "child $pid completed: $ident => ", $resp->[0], "\n";
656               }
657           );
658
659           foreach my $data ( 1..2000 ) {
660               MCE::Hobo->create( $data, sub {
661                   [ $data * 2 ];
662               });
663           }
664
665           MCE::Hobo->wait_all;
666
667           printf STDERR "duration: %0.03f seconds\n", time - $start;
668
669       Time to spin 2,000 workers and obtain results (in seconds).
670          Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
671          Boost).  Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
672          with Moo loaded at the top of the script.
673
674           MCE::Hobo uses MCE::Shared to retrieve data during reaping.
675           MCE::Child uses MCE::Channel, no shared-manager.
676
677                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
678
679           MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
680            MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
681                P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s
682
683           MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
684            MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
685                P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used
686
687       Set posix_exit to avoid all END and destructor processing.
688          This is helpful for reducing overhead when workers exit. Ditto if
689          using a Perl module not parallel safe. The option is ignored on
690          Windows "$^O eq 'MSWin32'".
691
692           MCE::Child->init( posix_exit => 1, ... );
693            MCE::Hobo->init( posix_exit => 1, ... );
694
695                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
696
697           MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
698            MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded
699

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

701       This demonstration constructs two queues, two handles, starts the
702       shared-manager process if needed, and spawns four workers.  For this
703       demonstration, am chunking 64 URLs per job. In reality, one may run
704       with 200 workers and chunk 300 URLs on a 24-way box.
705
706        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
707        # perl demo.pl              -- all output
708        # perl demo.pl  >/dev/null  -- mngr/hobo output
709        # perl demo.pl 2>/dev/null  -- show results only
710        #
711        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
712
713        use strict;
714        use warnings;
715
716        use AnyEvent;
717        use AnyEvent::HTTP;
718        use Time::HiRes qw( time );
719
720        use MCE::Hobo;
721        use MCE::Shared;
722
723        # Construct two queues, input and return.
724
725        my $que = MCE::Shared->queue();
726        my $ret = MCE::Shared->queue();
727
728        # Construct shared handles for serializing output from many workers
729        # writing simultaneously. This prevents garbled output.
730
731        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
732        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
733
734        # Spawn workers early for minimum memory consumption.
735
736        MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
737
738        # Obtain or generate input data for workers to process.
739
740        my ( $count, @urls ) = ( 0 );
741
742        push @urls, map { "http://127.0.0.$_/"   } 1..254;
743        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
744
745        while ( @urls ) {
746            my @chunk = splice(@urls, 0, 64);
747            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
748        }
749
750        # So that workers leave the loop after consuming the queue.
751
752        $que->end();
753
754        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
755        # Loop for the manager process. The manager may do other work if
756        # need be and periodically check $ret->pending() not shown here.
757        #
758        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
759
760        my $start = time;
761
762        printf {$ERR} "Mngr - entering loop\n";
763
764        while ( $count ) {
765            my ( $result, $failed ) = $ret->dequeue( 2 );
766
767            # Remove ID from result, so not treated as a URL item.
768
769            printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
770
771            # Display the URL and the size captured.
772
773            foreach my $url ( keys %{ $result } ) {
774                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
775                    if $result->{$url};  # url has content
776            }
777
778            # Display URLs could not reach.
779
780            if ( @{ $failed } ) {
781                foreach my $url ( @{ $failed } ) {
782                    print {$OUT} "Failed: $url\n";
783                }
784            }
785
786            # Decrement the count.
787
788            $count--;
789        }
790
791        MCE::Hobo->wait_all();
792
793        printf {$ERR} "Mngr - exiting loop\n\n";
794        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
795
796        exit;
797
798        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
799        # Hobo processes enqueue two items ( $result and $failed ) per each
800        # job for the manager process. Likewise, the manager process dequeues
801        # two items above. Optionally, hobo processes may include the ID in
802        # the result.
803        #
804        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
805
806        sub task {
807            my ( $id ) = @_;
808            printf {$ERR} "Hobo $id entering loop\n";
809
810            while ( my $job = $que->dequeue() ) {
811                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
812
813                # Walk URLs, provide a hash and array refs for data.
814
815                printf {$ERR} "Hobo $id running  job $job->{ID}\n";
816                walk( $job, $result, $failed );
817
818                # Send results to the manager process.
819
820                $ret->enqueue( $result, $failed );
821            }
822
823            printf {$ERR} "Hobo $id exiting loop\n";
824        }
825
826        sub walk {
827            my ( $job, $result, $failed ) = @_;
828
829            # Yielding is critical when running an event loop in parallel.
830            # Not doing so means that the app may reach contention points
831            # with the firewall and likely impose unnecessary hardship at
832            # the OS level. The idea here is not to have multiple workers
833            # initiate HTTP requests to a batch of URLs at the same time.
834            # Yielding in 1.827+ behaves similarly like scatter to have
835            # the hobo process run solo for a fraction of time.
836
837            MCE::Hobo->yield( 0.03 );   # MCE::Hobo 1.827+
838
839            my $cv = AnyEvent->condvar();
840
841            # Populate the hash ref for the URLs it could reach.
842            # Do not mix AnyEvent timeout with hobo timeout.
843            # Therefore, choose event timeout when available.
844
845            foreach my $url ( @{ $job->{INPUT} } ) {
846                $cv->begin();
847                http_get $url, timeout => 2, sub {
848                    my ( $data, $headers ) = @_;
849                    $result->{$url} = $data;
850                    $cv->end();
851                };
852            }
853
854            $cv->recv();
855
856            # Populate the array ref for URLs it could not reach.
857
858            foreach my $url ( @{ $job->{INPUT} } ) {
859                push @{ $failed }, $url unless (exists $result->{ $url });
860            }
861
862            return;
863        }
864
865        __END__
866
867        $ perl demo.pl
868
869        Hobo 1 entering loop
870        Hobo 2 entering loop
871        Hobo 3 entering loop
872        Mngr - entering loop
873        Hobo 2 running  job 2
874        Hobo 3 running  job 3
875        Hobo 1 running  job 1
876        Hobo 4 entering loop
877        Hobo 4 running  job 4
878        Hobo 2 running  job 5
879        Mngr - received job 2
880        Hobo 3 running  job 6
881        Mngr - received job 3
882        Hobo 1 running  job 7
883        Mngr - received job 1
884        Hobo 4 running  job 8
885        Mngr - received job 4
886        http://192.168.0.1/: 3729
887        Hobo 2 exiting loop
888        Mngr - received job 5
889        Hobo 3 exiting loop
890        Mngr - received job 6
891        Hobo 1 exiting loop
892        Mngr - received job 7
893        Hobo 4 exiting loop
894        Mngr - received job 8
895        Mngr - exiting loop
896
897        Duration: 4.131 seconds
898

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

900       Making an executable is possible with the PAR::Packer module.  On the
901       Windows platform, loading threads and threads::shared, and exiting via
902       threads, are necessary for the binary to exit successfully.
903
904        # https://metacpan.org/pod/PAR::Packer
905        # https://metacpan.org/pod/pp
906        #
907        #   pp -o demo.exe demo.pl
908        #   ./demo.exe
909
910        use strict;
911        use warnings;
912
913        use if $^O eq "MSWin32", "threads";
914        use if $^O eq "MSWin32", "threads::shared";
915
916        # Include minimum dependencies for MCE::Hobo.
917        # Add other modules required by your application here.
918
919        use Storable ();
920        use Time::HiRes ();
921
922        # use IO::FDPass ();  # optional: for condvar, handle, queue
923        # use Sereal ();      # optional: for faster serialization
924
925        use MCE::Hobo;
926        use MCE::Shared;
927
928        # For PAR to work on the Windows platform, one must manually include
929        # any shared modules used by the application.
930
931        # use MCE::Shared::Array;    # if using MCE::Shared->array
932        # use MCE::Shared::Cache;    # if using MCE::Shared->cache
933        # use MCE::Shared::Condvar;  # if using MCE::Shared->condvar
934        # use MCE::Shared::Handle;   # if using MCE::Shared->handle, mce_open
935        # use MCE::Shared::Hash;     # if using MCE::Shared->hash
936        # use MCE::Shared::Minidb;   # if using MCE::Shared->minidb
937        # use MCE::Shared::Ordhash;  # if using MCE::Shared->ordhash
938        # use MCE::Shared::Queue;    # if using MCE::Shared->queue
939        # use MCE::Shared::Scalar;   # if using MCE::Shared->scalar
940
941        # Et cetera. Only load modules needed for your application.
942
943        use MCE::Shared::Sequence;   # if using MCE::Shared->sequence
944
945        my $seq = MCE::Shared->sequence( 1, 9 );
946
947        sub task {   # print numbers from the shared sequence until exhausted
948            my ( $id ) = @_;
949            while ( defined ( my $num = $seq->next() ) ) {
950                print "$id: $num\n";
951                sleep 1;
952            }
953        }
954
955        sub main {   # spawn three hobos running task, then wait for all
956            MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
957            MCE::Hobo->wait_all();
958        }
959
960        # Main must run inside a thread on the Windows platform or workers
961        # will fail during exit, causing the exe to crash. The reason is
962        # that PAR or a dependency isn't multi-process safe.
963
964        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
965
966        threads->exit(0) if $INC{"threads.pm"};
967

CREDITS

969       The inspiration for "MCE::Hobo" comes from wanting "threads"-like
970       behavior for processes. Both can run side by side, including safe use
971       by MCE workers.  Likewise, the documentation resembles "threads".
972
973       The inspiration for "wait_all" and "wait_one" comes from the
974       "Parallel::WorkUnit" module.
975

SEE ALSO

977       ·  forks
978
979       ·  forks::BerkeleyDB
980
981       ·  MCE::Child
982
983       ·  Parallel::ForkManager
984
985       ·  Parallel::Loops
986
987       ·  Parallel::Prefork
988
989       ·  Parallel::WorkUnit
990
991       ·  Proc::Fork
992
993       ·  Thread::Tie
994
995       ·  threads
996

INDEX

998       MCE, MCE::Channel, MCE::Shared
999

AUTHOR

1001       Mario E. Roy, <marioeroy AT gmail DOT com>
1002
1003
1004
1005perl v5.30.1                      2020-01-30                      MCE::Hobo(3)
Impressum