1MCE::Hobo(3)          User Contributed Perl Documentation         MCE::Hobo(3)
2
3
4

NAME

6       MCE::Hobo - A threads-like parallelization module
7

VERSION

9       This document describes MCE::Hobo version 1.880
10

SYNOPSIS

12        use MCE::Hobo;
13
14        MCE::Hobo->init(
15            max_workers => 'auto',   # default undef, unlimited
16
17            # Specify a percentage. MCE::Hobo 1.874+.
18            max_workers => '25%',    # 4 on HW with 16 lcores
19            max_workers => '50%',    # 8 on HW with 16 lcores
20
21            hobo_timeout => 20,      # default undef, no timeout
22            posix_exit => 1,         # default undef, CORE::exit
23            void_context => 1,       # default undef
24
25            on_start => sub {
26                my ( $pid, $ident ) = @_;
27                ...
28            },
29            on_finish => sub {
30                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
31                ...
32            }
33        );
34
35        MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
36
37        sub parallel {
38            my ($arg1) = @_;
39            print "Hello again, $arg1\n" if defined($arg1);
40            print "Hello again, $_\n"; # same thing
41        }
42
43        MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
44
45        my @hobos    = MCE::Hobo->list();
46        my @pids     = MCE::Hobo->list_pids();
47        my @running  = MCE::Hobo->list_running();
48        my @joinable = MCE::Hobo->list_joinable();
49        my $count    = MCE::Hobo->pending();
50
51        # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
52        $_->join() for @hobos;   # (or)
53        $_->join() for @joinable;
54
55        # Joining occurs immediately as hobo processes complete execution.
56        1 while MCE::Hobo->wait_one();
57
58        my $hobo = mce_async { foreach (@files) { ... } };
59
60        $hobo->join();
61
62        if ( my $err = $hobo->error() ) {
63            warn "Hobo error: $err\n";
64        }
65
66        # Get a hobo's object
67        $hobo = MCE::Hobo->self();
68
69        # Get a hobo's ID
70        $pid = MCE::Hobo->pid();  # $$
71        $pid = $hobo->pid();
72        $pid = MCE::Hobo->tid();  # tid is an alias for pid
73        $pid = $hobo->tid();
74
75        # Test hobo objects
76        if ( $hobo1 == $hobo2 ) {
77            ...
78        }
79
80        # Give other workers a chance to run
81        MCE::Hobo->yield();
82        MCE::Hobo->yield(0.05);
83
84        # Return context, wantarray aware
85        my ($value1, $value2) = $hobo->join();
86        my $value = $hobo->join();
87
88        # Check hobo's state
89        if ( $hobo->is_running() ) {
90            sleep 1;
91        }
92        if ( $hobo->is_joinable() ) {
93            $hobo->join();
94        }
95
96        # Send a signal to a hobo
97        $hobo->kill('SIGUSR1');
98
99        # Exit a hobo
100        MCE::Hobo->exit(0);
101        MCE::Hobo->exit(0, @ret);  # MCE::Hobo 1.827+
102

DESCRIPTION

104       A hobo is a migratory worker inside the machine that carries the
105       asynchronous gene. Hobo processes are equipped with "threads"-like
106       capability for running code asynchronously. Unlike threads, each hobo
107       is a unique process to the underlying OS. The IPC is managed by
108       "MCE::Shared", which runs on all the major platforms including Cygwin
109       and Strawberry Perl.
110
111       An exception was made on the Windows platform to spawn threads versus
112       children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
113       release reverts back to spawning children on all supported platforms.
114
115       "MCE::Hobo" may be used as a standalone or together with "MCE"
116       including running alongside "threads".
117
118        use MCE::Hobo;
119        use MCE::Shared;
120
121        # synopsis: head -20 file.txt | perl script.pl
122
123        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
124        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
125        my $ary = MCE::Shared->array();
126
127        sub parallel_task {
128            my ( $id ) = @_;
129            while ( <$ifh> ) {
130                printf {$ofh} "[ %4d ] %s", $., $_;
131              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";    # dereferencing
132                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
133            }
134        }
135
136        my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
137        my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
138        my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
139
140        $_->join for MCE::Hobo->list();  # ditto: MCE::Hobo->wait_all();
141
142        # search array (total one round-trip via IPC)
143        my @vals = $ary->vals( "val =~ / ID 2 /" );
144
145        print {*STDERR} join("", @vals);
146

API DOCUMENTATION

148       $hobo = MCE::Hobo->create( FUNCTION, ARGS )
149       $hobo = MCE::Hobo->new( FUNCTION, ARGS )
150          This will create a new hobo process that will begin execution with
151          function as the entry point, and optionally ARGS for list of
152          parameters. It will return the corresponding MCE::Hobo object, or
153          undef if hobo creation failed.
154
155          FUNCTION may either be the name of a function, an anonymous
156          subroutine, or a code ref.
157
158           my $hobo = MCE::Hobo->create( "func_name", ... );
159               # or
160           my $hobo = MCE::Hobo->create( sub { ... }, ... );
161               # or
162           my $hobo = MCE::Hobo->create( \&func, ... );
163
164       $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
165       $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
166          Options, excluding "ident", may be specified globally via the "init"
167          function.  Otherwise, "ident", "hobo_timeout", "posix_exit", and
168          "void_context" may be set uniquely.
169
170          The "ident" option, available since 1.827, is used by callback
171          functions "on_start" and "on_finish" for identifying the started and
172          finished hobo process respectively.
173
174           my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
175               ...
176           } );
177
178           $hobo1->join;
179
180           my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
181               sleep 1 for ( 1 .. 9 );
182           } );
183
184           $hobo2->join;
185
186           if ( $hobo2->error() eq "Hobo timed out\n" ) {
187               ...
188           }
189
190          The new() method is an alias for create().
191
192       mce_async { BLOCK } ARGS;
193       mce_async { BLOCK };
194          "mce_async" runs the block asynchronously similarly to
195          "MCE::Hobo->create()".  It returns the hobo object, or undef if hobo
196          creation failed.
197
198           my $hobo = mce_async { foreach (@files) { ... } };
199
200           $hobo->join();
201
202           if ( my $err = $hobo->error() ) {
203               warn("Hobo error: $err\n");
204           }
205
206       $hobo->join()
207          This will wait for the corresponding hobo process to complete its
208          execution.  In non-voided context, join() will return the value(s)
209          of the entry point function.
210
211          The context (void, scalar or list) for the return value(s) for
212          "join" is determined at the time of joining and mostly "wantarray"
213          aware.
214
215           my $hobo1 = MCE::Hobo->create( sub {
216               my @res = qw(foo bar baz);
217               return (@res);
218           });
219
220           my @res1 = $hobo1->join();  # ( foo, bar, baz )
221           my $res1 = $hobo1->join();  #   baz
222
223           my $hobo2 = MCE::Hobo->create( sub {
224               return 'foo';
225           });
226
227           my @res2 = $hobo2->join();  # ( foo )
228           my $res2 = $hobo2->join();  #   foo
229
230       $hobo1->equal( $hobo2 )
231          Tests if two hobo objects are the same hobo or not. Hobo comparison
232          is based on process IDs. This is overloaded to the more natural
233          forms.
234
235           if ( $hobo1 == $hobo2 ) {
236               print("Hobo objects are the same\n");
237           }
238           # or
239           if ( $hobo1 != $hobo2 ) {
240               print("Hobo objects differ\n");
241           }
242
243       $hobo->error()
244          Hobo processes are executed in an "eval" context. This method will
245          return "undef" if the hobo terminates normally. Otherwise, it
246          returns the value of $@ associated with the hobo's execution status
247          in its "eval" context.
248
249       $hobo->exit()
250          This sends 'SIGQUIT' to the hobo process, notifying the hobo to
251          exit.  It returns the hobo object to allow for method chaining. It
252          is important to join later if not immediately to not leave a zombie
253          or defunct process.
254
255           $hobo->exit()->join();
256           ...
257
258           $hobo->join();  # later
259
260       MCE::Hobo->exit( 0 )
261       MCE::Hobo->exit( 0, @ret )
262          A hobo can exit at any time by calling "MCE::Hobo->exit()".
263          Otherwise, the behavior is the same as exit(status) when called from
264          the main process. Current since 1.827, the hobo process may
265          optionally return data, to be sent via IPC.
266
267       MCE::Hobo->finish()
268          This class method is called automatically by "END", but may be
269          called explicitly. An error is emitted via croak if there are active
270          hobo processes not yet joined.
271
272           MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
273           $_->join for MCE::Hobo->list();
274
275           MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
276           $_->join for MCE::Hobo->list();
277
278           MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
279           $_->join for MCE::Hobo->list();
280
281           MCE::Hobo->finish();
282
283       MCE::Hobo->init( options )
284          The init function accepts a list of MCE::Hobo options.
285
286           MCE::Hobo->init(
287               max_workers => 'auto',   # default undef, unlimited
288
289               # Specify a percentage. MCE::Hobo 1.874+.
290               max_workers => '25%',    # 4 on HW with 16 lcores
291               max_workers => '50%',    # 8 on HW with 16 lcores
292
293               hobo_timeout => 20,      # default undef, no timeout
294               posix_exit => 1,         # default undef, CORE::exit
295               void_context => 1,       # default undef
296
297               on_start => sub {
298                   my ( $pid, $ident ) = @_;
299                   ...
300               },
301               on_finish => sub {
302                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
303                   ...
304               }
305           );
306
307           # Identification given as an option or the 1st argument.
308           # Current API available since 1.827.
309
310           for my $key ( 'aa' .. 'zz' ) {
311               MCE::Hobo->create( { ident => $key }, sub { ... } );
312               MCE::Hobo->create( $key, sub { ... } );
313           }
314
315           MCE::Hobo->wait_all;
316
317          Set "max_workers" if you want to limit the number of workers by
318          waiting automatically for an available slot. Specify a percentage or
319          "auto" to obtain the number of logical cores via
320          MCE::Util::get_ncpu().
321
322          Set "hobo_timeout", in number of seconds, if you want the hobo
323          process to terminate after some time. The default is 0 for no
324          timeout.
325
326          Set "posix_exit" to avoid all END and destructor processing. It is
327          implied (1) when constructing MCE::Hobo inside a thread, or when any
328          of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
329          Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI is present.
330
331          Set "void_context" to create the hobo process in void context for
332          the return value. Otherwise, the return context is wantarray-aware
333          for join() and result() and determined when retrieving the data.
334
335          The callback options "on_start" and "on_finish" are called in the
336          parent process after starting the worker and later when terminated.
337          The arguments for the subroutines were inspired by
338          Parallel::ForkManager.
339
340          The parameters for "on_start" are the following:
341
342           - pid of the hobo process
343           - identification (ident option or 1st arg to create)
344
345          The parameters for "on_finish" are the following:
346
347           - pid of the hobo process
348           - program exit code
349           - identification (ident option or 1st arg to create)
350           - exit signal id
351           - error message from eval inside MCE::Hobo
352           - returned data
353
354       $hobo->is_running()
355          Returns true if a hobo is still running.
356
357       $hobo->is_joinable()
358          Returns true if the hobo has finished running and not yet joined.
359
360       $hobo->kill( 'SIG...' )
361          Sends the specified signal to the hobo. Returns the hobo object to
362          allow for method chaining. As with "exit", it is important to join
363          eventually if not immediately to not leave a zombie or defunct
364          process.
365
366           $hobo->kill('SIG...')->join();
367
368          The following is a parallel demonstration comparing "MCE::Shared"
369          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
370          after all workers have been notified to quit.
371
372           use Time::HiRes qw(sleep time);
373
374           use Redis;
375           use Redis::Fast;
376
377           use MCE::Hobo;
378           use MCE::Shared;
379
380           my $redis = Redis->new();
381           my $rfast = Redis::Fast->new();
382           my $array = MCE::Shared->array();
383
384           sub parallel_redis {
385               my ($_redis) = @_;
386               my ($count, $quit, $len) = (0, 0);
387
388               # instead, use a flag to exit loop
389               $SIG{'QUIT'} = sub { $quit = 1 };
390
391               while () {
392                   $len = $_redis->rpush('list', $count++);
393                   last if $quit;
394               }
395
396               $count;
397           }
398
399           sub parallel_array {
400               my ($count, $quit, $len) = (0, 0);
401
402               # do not exit from inside handler
403               $SIG{'QUIT'} = sub { $quit = 1 };
404
405               while () {
406                   $len = $array->push($count++);
407                   last if $quit;
408               }
409
410               $count;
411           }
412
413           sub benchmark_this {
414               my ($desc, $num_procs, $timeout, $code, @args) = @_;
415               my ($start, $total) = (time(), 0);
416
417               MCE::Hobo->new($code, @args) for 1..$num_procs;
418               sleep $timeout;
419
420               # joining is not immediate; ok
421               $_->kill('QUIT') for MCE::Hobo->list();
422
423               # joining later; ok
424               $total += $_->join() for MCE::Hobo->list();
425
426               printf "$desc <> duration: %0.03f secs, count: $total\n",
427                   time() - $start;
428
429               sleep 0.2;
430           }
431
432           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
433           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
434           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
435
436       MCE::Hobo->list()
437          Returns a list of all hobo objects not yet joined.
438
439           @hobos = MCE::Hobo->list();
440
441       MCE::Hobo->list_pids()
442          Returns a list of all hobo pids not yet joined (available since
443          1.849).
444
445           @pids = MCE::Hobo->list_pids();
446
447           $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
448               # Signal workers and the shared manager all at once
449               CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
450               exec('reset');
451           };
452
453       MCE::Hobo->list_running()
454          Returns a list of all hobo objects that are still running.
455
456           @hobos = MCE::Hobo->list_running();
457
458       MCE::Hobo->list_joinable()
459          Returns a list of all hobo objects that have completed running.
460          Thus, ready to be joined without blocking.
461
462           @hobos = MCE::Hobo->list_joinable();
463
464       MCE::Hobo->max_workers([ N ])
465          Getter and setter for max_workers. Specify a number or 'auto' to
466          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
467          false value to set back to no limit.
468
469          API available since 1.835.
470
471       MCE::Hobo->pending()
472          Returns a count of all hobo objects not yet joined.
473
474           $count = MCE::Hobo->pending();
475
476       $hobo->result()
477          Returns the result obtained by "join", "wait_one", or "wait_all". If
478          the process has not yet exited, waits for the corresponding hobo to
479          complete its execution.
480
481           use MCE::Hobo;
482           use Time::HiRes qw(sleep);
483
484           sub task {
485               my ($id) = @_;
486               sleep $id * 0.333;
487               return $id;
488           }
489
490           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
491
492           # 1 while MCE::Hobo->wait_one();
493
494           while ( my $hobo = MCE::Hobo->wait_one() ) {
495               my $err = $hobo->error() || 'no error';
496               my $res = $hobo->result();
497               my $pid = $hobo->pid();
498
499               print "[$pid] $err : $res\n";
500           }
501
502          Like "join" described above, the context (void, scalar or list) for
503          the return value(s) is determined at the time "result" is called and
504          mostly "wantarray" aware.
505
506           my $hobo1 = MCE::Hobo->create( sub {
507               my @res = qw(foo bar baz);
508               return (@res);
509           });
510
511           my @res1 = $hobo1->result();  # ( foo, bar, baz )
512           my $res1 = $hobo1->result();  #   baz
513
514           my $hobo2 = MCE::Hobo->create( sub {
515               return 'foo';
516           });
517
518           my @res2 = $hobo2->result();  # ( foo )
519           my $res2 = $hobo2->result();  #   foo
520
521       MCE::Hobo->self()
522          Class method that allows a hobo to obtain its own MCE::Hobo object.
523
524       $hobo->pid()
525       $hobo->tid()
526          Returns the ID of the hobo.
527
528           pid: $$  process id
529           tid: $$  alias for pid
530
531       MCE::Hobo->pid()
532       MCE::Hobo->tid()
533          Class methods that allow a hobo to obtain its own ID.
534
535           pid: $$  process id
536           tid: $$  alias for pid
537
538       MCE::Hobo->wait_one()
539       MCE::Hobo->waitone()
540       MCE::Hobo->wait_all()
541       MCE::Hobo->waitall()
542          Meaningful for the manager process only, waits for one or all hobo
543          processes to complete execution. Afterwards, returns the
544          corresponding hobo objects.  If a hobo doesn't exist, returns the
545          "undef" value or an empty list for "wait_one" and "wait_all"
546          respectively.
547
548          The "waitone" and "waitall" methods are aliases since 1.827 for
549          backwards compatibility.
550
551           use MCE::Hobo;
552           use Time::HiRes qw(sleep);
553
554           sub task {
555               my $id = shift;
556               sleep $id * 0.333;
557               return $id;
558           }
559
560           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
561
562           # join, traditional use case
563           $_->join() for MCE::Hobo->list();
564
565           # wait_one, simplistic use case
566           1 while MCE::Hobo->wait_one();
567
568           # wait_one
569           while ( my $hobo = MCE::Hobo->wait_one() ) {
570               my $err = $hobo->error() || 'no error';
571               my $res = $hobo->result();
572               my $pid = $hobo->pid();
573
574               print "[$pid] $err : $res\n";
575           }
576
577           # wait_all
578           my @hobos = MCE::Hobo->wait_all();
579
580           for ( @hobos ) {
581               my $err = $_->error() || 'no error';
582               my $res = $_->result();
583               my $pid = $_->pid();
584
585               print "[$pid] $err : $res\n";
586           }
587
588       MCE::Hobo->yield( [ floating_seconds ] )
589          Prior API till 1.826.
590
591          Let this hobo yield CPU time to other workers. By default, the class
592          method calls sleep(0.008) on UNIX and sleep(0.015) on Windows
593          including Cygwin.
594
595           MCE::Hobo->yield();
596           MCE::Hobo->yield(0.05);
597
598           # total run time: 0.25 seconds, sleep occurring in parallel
599
600           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
601           MCE::Hobo->wait_all();
602
603          Current API available since 1.827.
604
605          Give other workers a chance to run, optionally for given time. Yield
606          behaves similarly to MCE's interval option. It throttles workers
607          from running too fast.  A demonstration is provided in the next
608          section for fetching URLs in parallel.
609
610          The default "floating_seconds" is 0.008 and 0.015 on UNIX and
611          Windows, respectively. Pass 0 if simply wanting to give other
612          workers a chance to run.
613
614           # total run time: 1.00 second
615
616           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
617           MCE::Hobo->wait_all();
618

THREADS-like DETACH CAPABILITY

620       Threads-like detach capability was added starting with the 1.867
621       release.
622
623       A threads example is shown first followed by the MCE::Hobo example. All
624       one needs to do is set the CHLD signal handler to IGNORE.
625       Unfortunately, this works on UNIX platforms only. The hobo process
626       restores the CHLD handler to default, so is able to deeply spin workers
627       and reap if desired.
628
629        use threads;
630
631        for ( 1 .. 8 ) {
632            async {
633                # do something
634            }->detach;
635        }
636
637        use MCE::Hobo;
638
639        # Have the OS reap workers automatically when exiting.
640        # The on_finish option is ignored if specified (no-op).
641        # Ensure not inside a thread on UNIX platforms.
642
643        $SIG{CHLD} = 'IGNORE';
644
645        for ( 1 .. 8 ) {
646            mce_async {
647                # do something
648            };
649        }
650
651        # Optionally, wait for any remaining workers before leaving.
652        # This is necessary if workers are consuming shared objects,
653        # constructed via MCE::Shared.
654
655        MCE::Hobo->wait_all;
656
657       The following is another way and works on Windows.  Here, the on_finish
658       handler works as usual.
659
660        use MCE::Hobo;
661
662        MCE::Hobo->init(
663            on_finish => sub {
664                ...
665            },
666        );
667
668        for ( 1 .. 8 ) {
669            $_->join for MCE::Hobo->list_joinable;
670            mce_async {
671                # do something
672            };
673        }
674
675        MCE::Hobo->wait_all;
676

PARALLEL::FORKMANAGER-like DEMONSTRATION

678       MCE::Hobo behaves similarly to threads for the most part. It also
679       provides Parallel::ForkManager-like capabilities. The
680       "Parallel::ForkManager" example is shown first followed by a version
681       using "MCE::Hobo".
682
683       Parallel::ForkManager
684           use strict;
685           use warnings;
686
687           use Parallel::ForkManager;
688           use Time::HiRes 'time';
689
690           my $start = time;
691
692           my $pm = Parallel::ForkManager->new(10);
693           $pm->set_waitpid_blocking_sleep(0);
694
695           $pm->run_on_finish( sub {
696               my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
697               print "child $pid completed: $ident => ", $resp->[0], "\n";
698           });
699
700           DATA_LOOP:
701           foreach my $data ( 1..2000 ) {
702               # forks and returns the pid for the child
703               my $pid = $pm->start($data) and next DATA_LOOP;
704               my $ret = [ $data * 2 ];
705
706               $pm->finish(0, $ret);
707           }
708
709           $pm->wait_all_children;
710
711           printf STDERR "duration: %0.03f seconds\n", time - $start;
712
713       MCE::Hobo
714           use strict;
715           use warnings;
716
717           use MCE::Hobo 1.843;
718           use Time::HiRes 'time';
719
720           my $start = time;
721
722           MCE::Hobo->init(
723               max_workers => 10,
724               on_finish   => sub {
725                   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
726                   print "child $pid completed: $ident => ", $resp->[0], "\n";
727               }
728           );
729
730           foreach my $data ( 1..2000 ) {
731               MCE::Hobo->create( $data, sub {
732                   [ $data * 2 ];
733               });
734           }
735
736           MCE::Hobo->wait_all;
737
738           printf STDERR "duration: %0.03f seconds\n", time - $start;
739
740       Time to spin 2,000 workers and obtain results (in seconds).
741          Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
742          Boost).  Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
743          with Moo loaded at the top of the script.
744
745           MCE::Hobo uses MCE::Shared to retrieve data during reaping.
746           MCE::Child uses MCE::Channel, no shared-manager.
747
748                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
749
750           MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
751            MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
752                P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s
753
754           MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
755            MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
756                P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used
757
758       Set posix_exit to avoid all END and destructor processing.
759          This is helpful for reducing overhead when workers exit. Ditto if
760          using a Perl module not parallel safe. The option is ignored on
761          Windows "$^O eq 'MSWin32'".
762
763           MCE::Child->init( posix_exit => 1, ... );
764            MCE::Hobo->init( posix_exit => 1, ... );
765
766                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
767
768           MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
769            MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded
770

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

772       This demonstration constructs two queues, two handles, starts the
773       shared-manager process if needed, and spawns four workers.  For this
774       demonstration, I am chunking 64 URLs per job. In reality, one may run
775       with 200 workers and chunk 300 URLs on a 24-way box.
776
777        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
778        # perl demo.pl              -- all output
779        # perl demo.pl  >/dev/null  -- mngr/hobo output
780        # perl demo.pl 2>/dev/null  -- show results only
781        #
782        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
783
784        use strict;
785        use warnings;
786
787        use AnyEvent;
788        use AnyEvent::HTTP;
789        use Time::HiRes qw( time );
790
791        use MCE::Hobo;
792        use MCE::Shared;
793
794        # Construct two queues, input and return.
795
796        my $que = MCE::Shared->queue();
797        my $ret = MCE::Shared->queue();
798
799        # Construct shared handles for serializing output from many workers
800        # writing simultaneously. This prevents garbled output.
801
802        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
803        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
804
805        # Spawn workers early for minimum memory consumption.
806
807        MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
808
809        # Obtain or generate input data for workers to process.
810
811        my ( $count, @urls ) = ( 0 );
812
813        push @urls, map { "http://127.0.0.$_/"   } 1..254;
814        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
815
816        while ( @urls ) {
817            my @chunk = splice(@urls, 0, 64);
818            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
819        }
820
821        # So that workers leave the loop after consuming the queue.
822
823        $que->end();
824
825        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
826        # Loop for the manager process. The manager may do other work if
827        # need be and periodically check $ret->pending() not shown here.
828        #
829        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
830
831        my $start = time;
832
833        printf {$ERR} "Mngr - entering loop\n";
834
835        while ( $count ) {
836            my ( $result, $failed ) = $ret->dequeue( 2 );
837
838            # Remove ID from result, so not treated as a URL item.
839
840            printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
841
842            # Display the URL and the size captured.
843
844            foreach my $url ( keys %{ $result } ) {
845                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
846                    if $result->{$url};  # url has content
847            }
848
849            # Display URLs that could not be reached.
850
851            if ( @{ $failed } ) {
852                foreach my $url ( @{ $failed } ) {
853                    print {$OUT} "Failed: $url\n";
854                }
855            }
856
857            # Decrement the count.
858
859            $count--;
860        }
861
862        MCE::Hobo->wait_all();
863
864        printf {$ERR} "Mngr - exiting loop\n\n";
865        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
866
867        exit;
868
869        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
870        # Hobo processes enqueue two items ( $result and $failed ) per each
871        # job for the manager process. Likewise, the manager process dequeues
872        # two items above. Optionally, hobo processes may include the ID in
873        # the result.
874        #
875        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
876
877        sub task {
878            my ( $id ) = @_;
879            printf {$ERR} "Hobo $id entering loop\n";
880
881            while ( my $job = $que->dequeue() ) {
882                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
883
884                # Walk URLs, provide a hash and array refs for data.
885
886                printf {$ERR} "Hobo $id running  job $job->{ID}\n";
887                walk( $job, $result, $failed );
888
889                # Send results to the manager process.
890
891                $ret->enqueue( $result, $failed );
892            }
893
894            printf {$ERR} "Hobo $id exiting loop\n";
895        }
896
897        sub walk {
898            my ( $job, $result, $failed ) = @_;
899
900            # Yielding is critical when running an event loop in parallel.
901            # Not doing so means that the app may reach contention points
902            # with the firewall and likely impose unnecessary hardship at
903            # the OS level. The idea here is not to have multiple workers
904            # initiate HTTP requests to a batch of URLs at the same time.
905            # Yielding in 1.827+ behaves similarly to scatter to have
906            # the hobo process run solo for a fraction of time.
907
908            MCE::Hobo->yield( 0.03 );   # MCE::Hobo 1.827+
909
910            my $cv = AnyEvent->condvar();
911
912            # Populate the hash ref for the URLs it could reach.
913            # Do not mix AnyEvent timeout with hobo timeout.
914            # Therefore, choose event timeout when available.
915
916            foreach my $url ( @{ $job->{INPUT} } ) {
917                $cv->begin();
918                http_get $url, timeout => 2, sub {
919                    my ( $data, $headers ) = @_;
920                    $result->{$url} = $data;
921                    $cv->end();
922                };
923            }
924
925            $cv->recv();
926
927            # Populate the array ref for URLs it could not reach.
928
929            foreach my $url ( @{ $job->{INPUT} } ) {
930                push @{ $failed }, $url unless (exists $result->{ $url });
931            }
932
933            return;
934        }
935
936        __END__
937
938        $ perl demo.pl
939
940        Hobo 1 entering loop
941        Hobo 2 entering loop
942        Hobo 3 entering loop
943        Mngr - entering loop
944        Hobo 2 running  job 2
945        Hobo 3 running  job 3
946        Hobo 1 running  job 1
947        Hobo 4 entering loop
948        Hobo 4 running  job 4
949        Hobo 2 running  job 5
950        Mngr - received job 2
951        Hobo 3 running  job 6
952        Mngr - received job 3
953        Hobo 1 running  job 7
954        Mngr - received job 1
955        Hobo 4 running  job 8
956        Mngr - received job 4
957        http://192.168.0.1/: 3729
958        Hobo 2 exiting loop
959        Mngr - received job 5
960        Hobo 3 exiting loop
961        Mngr - received job 6
962        Hobo 1 exiting loop
963        Mngr - received job 7
964        Hobo 4 exiting loop
965        Mngr - received job 8
966        Mngr - exiting loop
967
968        Duration: 4.131 seconds
969

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

971       Making an executable is possible with the PAR::Packer module.  On the
972       Windows platform, threads, threads::shared, and exiting via threads are
973       necessary for the binary to exit successfully.
974
975        # https://metacpan.org/pod/PAR::Packer
976        # https://metacpan.org/pod/pp
977        #
978        #   pp -o demo.exe demo.pl
979        #   ./demo.exe
980
981        use strict;
982        use warnings;
983
984        use if $^O eq "MSWin32", "threads";
985        use if $^O eq "MSWin32", "threads::shared";
986
987        # Include minimum dependencies for MCE::Hobo.
988        # Add other modules required by your application here.
989
990        use Storable ();
991        use Time::HiRes ();
992
993        # use IO::FDPass ();  # optional: for condvar, handle, queue
994        # use Sereal ();      # optional: for faster serialization
995
996        use MCE::Hobo;
997        use MCE::Shared;
998
999        # For PAR to work on the Windows platform, one must include manually
1000        # any shared modules used by the application.
1001
1002        # use MCE::Shared::Array;    # if using MCE::Shared->array
1003        # use MCE::Shared::Cache;    # if using MCE::Shared->cache
1004        # use MCE::Shared::Condvar;  # if using MCE::Shared->condvar
1005        # use MCE::Shared::Handle;   # if using MCE::Shared->handle, mce_open
1006        # use MCE::Shared::Hash;     # if using MCE::Shared->hash
1007        # use MCE::Shared::Minidb;   # if using MCE::Shared->minidb
1008        # use MCE::Shared::Ordhash;  # if using MCE::Shared->ordhash
1009        # use MCE::Shared::Queue;    # if using MCE::Shared->queue
1010        # use MCE::Shared::Scalar;   # if using MCE::Shared->scalar
1011
1012        # Et cetera. Only load modules needed for your application.
1013
1014        use MCE::Shared::Sequence;   # if using MCE::Shared->sequence
1015
1016        my $seq = MCE::Shared->sequence( 1, 9 );
1017
1018        sub task {
1019            my ( $id ) = @_;
1020            while ( defined ( my $num = $seq->next() ) ) {
1021                print "$id: $num\n";
1022                sleep 1;
1023            }
1024        }
1025
1026        sub main {
1027            MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
1028            MCE::Hobo->wait_all();
1029        }
1030
1031        # Main must run inside a thread on the Windows platform or workers
1032        # will fail during exiting, causing the exe to crash. The reason is
1033        # that PAR or a dependency isn't multi-process safe.
1034
1035        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1036
1037        threads->exit(0) if $INC{"threads.pm"};
1038

CREDITS

1040       The inspiration for "MCE::Hobo" comes from wanting "threads"-like
1041       behavior for processes. Both can run side-by-side including safe-use by
1042       MCE workers.  Likewise, the documentation resembles "threads".
1043
1044       The inspiration for "wait_all" and "wait_one" comes from the
1045       "Parallel::WorkUnit" module.
1046

SEE ALSO

1048       •  forks
1049
1050       •  forks::BerkeleyDB
1051
1052       •  MCE::Child
1053
1054       •  Parallel::ForkManager
1055
1056       •  Parallel::Loops
1057
1058       •  Parallel::Prefork
1059
1060       •  Parallel::WorkUnit
1061
1062       •  Proc::Fork
1063
1064       •  Thread::Tie
1065
1066       •  threads
1067

INDEX

1069       MCE, MCE::Channel, MCE::Shared
1070

AUTHOR

1072       Mario E. Roy, <marioeroy AT gmail DOT com>
1073
1074
1075
1076perl v5.36.0                      2023-01-20                      MCE::Hobo(3)
Impressum