1MCE::Hobo(3)          User Contributed Perl Documentation         MCE::Hobo(3)
2
3
4

NAME

6       MCE::Hobo - A threads-like parallelization module
7

VERSION

9       This document describes MCE::Hobo version 1.840
10

SYNOPSIS

12        use MCE::Hobo;
13
14        MCE::Hobo->init(
15            max_workers => 'auto',   # default undef, unlimited
16            hobo_timeout => 20,      # default undef, no timeout
17            posix_exit => 1,         # default undef, CORE::exit
18            on_start => sub {
19                my ( $pid, $ident ) = @_;
20                ...
21            },
22            on_finish => sub {
23                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
24                ...
25            }
26        );
27
28        MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
29
30        sub parallel {
31            my ($arg1) = @_;
32            print "Hello again, $arg1\n" if defined($arg1);
33            print "Hello again, $_\n"; # same thing
34        }
35
36        MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
37
38        my @hobos    = MCE::Hobo->list();
39        my @running  = MCE::Hobo->list_running();
40        my @joinable = MCE::Hobo->list_joinable();
41        my @count    = MCE::Hobo->pending();
42
43        # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
44        $_->join() for @hobos;
45
46        # Joining occurs immediately as hobo(s) complete execution.
47        1 while MCE::Hobo->wait_one();
48
49        my $hobo = mce_async { foreach (@files) { ... } };
50
51        $hobo->join();
52
53        if ( my $err = $hobo->error() ) {
54            warn "Hobo error: $err\n";
55        }
56
57        # Get a hobo's object
58        $hobo = MCE::Hobo->self();
59
60        # Get a hobo's ID
61        $pid = MCE::Hobo->pid();  # $$
62        $pid = $hobo->pid();
63        $pid = MCE::Hobo->tid();  # tid is an alias for pid
64        $pid = $hobo->tid();
65
66        # Test hobo objects
67        if ( $hobo1 == $hobo2 ) {
68            ...
69        }
70
71        # Give other hobos a chance to run
72        MCE::Hobo->yield();
73        MCE::Hobo->yield(0.05);
74
75        # Return context, wantarray aware
76        my ($value1, $value2) = $hobo->join();
77        my $value = $hobo->join();
78
79        # Check hobo's state
80        if ( $hobo->is_running() ) {
81            sleep 1;
82        }
83        if ( $hobo->is_joinable() ) {
84            $hobo->join();
85        }
86
87        # Send a signal to a hobo
88        $hobo->kill('SIGUSR1');
89
90        # Exit a hobo
91        MCE::Hobo->exit(0);
92        MCE::Hobo->exit(0, @ret);  # MCE::Hobo 1.827+
93

DESCRIPTION

95       A Hobo is a migratory worker inside the machine that carries the
96       asynchronous gene. Hobos are equipped with "threads"-like capability
97       for running code asynchronously. Unlike threads, each hobo is a unique
98       process to the underlying OS. The IPC is managed by "MCE::Shared",
99       which runs on all the major platforms including Cygwin.
100
101       An exception was made on the Windows platform to spawn threads versus
102       children in "MCE::Hobo" 1.807 until 1.816. For consistency, the 1.817
103       release reverts back to spawning children on all supported platforms.
104
105       "MCE::Hobo" may be used as a standalone or together with "MCE"
106       including running alongside "threads".
107
108        use MCE::Hobo;
109        use MCE::Shared;
110
111        # synopsis: head -20 file.txt | perl script.pl
112
113        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
114        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
115        my $ary = MCE::Shared->array();
116
117        sub parallel_task {
118            my ( $id ) = @_;
119            while ( <$ifh> ) {
120                printf {$ofh} "[ %4d ] %s", $., $_;
121              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n" );  # dereferencing
122                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
123            }
124        }
125
126        my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
127        my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
128        my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
129
130        $_->join for MCE::Hobo->list();  # ditto: MCE::Hobo->wait_all();
131
132        # search array (total one round-trip via IPC)
133        my @vals = $ary->vals( "val =~ / ID 2 /" );
134
135        print {*STDERR} join("", @vals);
136

API DOCUMENTATION

138       $hobo = MCE::Hobo->create( FUNCTION, ARGS )
139       $hobo = MCE::Hobo->new( FUNCTION, ARGS )
140          This will create a new hobo that will begin execution with function
141          as the entry point, and optionally ARGS for list of parameters. It
142          will return the corresponding MCE::Hobo object, or undef if hobo
143          creation failed.
144
145          FUNCTION may either be the name of a function, an anonymous
146          subroutine, or a code ref.
147
148           my $hobo = MCE::Hobo->create( "func_name", ... );
149               # or
150           my $hobo = MCE::Hobo->create( sub { ... }, ... );
151               # or
152           my $hobo = MCE::Hobo->create( \&func, ... );
153
154       $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
155       $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
156          Options, excluding "ident", may be specified globally via the "init"
157          function.  Otherwise, "ident", "hobo_timeout", and "posix_exit" may
158          be set uniquely.
159
160          The "ident" option, available since 1.827, is used by callback
161          functions "on_start" and "on_finish", for identifying the started
162          and finished process respectively.
163
164           my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
165               ...
166           } );
167
168           $hobo1->join;
169
170           my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
171               sleep 1 for ( 1 .. 9 );
172           } );
173
174           $hobo2->join;
175
176           if ( $hobo2->error() eq "Hobo timed out\n" ) {
177               ...
178           }
179
180          The "new()" method is an alias for "create()".
181
182       mce_async { BLOCK } ARGS;
183       mce_async { BLOCK };
184          "mce_async" runs the block asynchronously similarly to
185          "MCE::Hobo-"create()>.  It returns the hobo object, or undef if hobo
186          creation failed.
187
188           my $hobo = mce_async { foreach (@files) { ... } };
189
190           $hobo->join();
191
192           if ( my $err = $hobo->error() ) {
193               warn("Hobo error: $err\n");
194           }
195
196       $hobo->join()
197          This will wait for the corresponding hobo to complete its execution.
198          In non-voided context, "join()" will return the value(s) of the
199          entry point function.
200
201          The context (void, scalar or list) for the return value(s) for
202          "join" is determined at the time of joining and mostly "wantarray"
203          aware.
204
205           my $hobo1 = MCE::Hobo->create( sub {
206               my @res = qw(foo bar baz);
207               return (@res);
208           });
209
210           my @res1 = $hobo1->join();  # ( foo, bar, baz )
211           my $res1 = $hobo1->join();  #   baz
212
213           my $hobo2 = MCE::Hobo->create( sub {
214               return 'foo';
215           });
216
217           my @res2 = $hobo2->join();  # ( foo )
218           my $res2 = $hobo2->join();  #   foo
219
220       $hobo1->equal( $hobo2 )
221          Tests if two hobo objects are the same hobo or not. Hobo comparison
222          is based on process IDs. This is overloaded to the more natural
223          forms.
224
225           if ( $hobo1 == $hobo2 ) {
226               print("Hobos are the same\n");
227           }
228           # or
229           if ( $hobo1 != $hobo2 ) {
230               print("Hobos differ\n");
231           }
232
233       $hobo->error()
234          Hobos are executed in an "eval" context. This method will return
235          "undef" if the hobo terminates normally. Otherwise, it returns the
236          value of $@ associated with the hobo's execution status in its
237          "eval" context.
238
239       $hobo->exit()
240          This sends 'SIGQUIT' to the hobo object, notifying hobo to exit. It
241          returns the hobo object to allow for method chaining. It is
242          important to join later if not immediately to not leave a zombie or
243          defunct process.
244
245           $hobo->exit()->join();
246           ...
247
248           $hobo->join();  # later
249
250       MCE::Hobo->exit( 0 )
251       MCE::Hobo->exit( 0, @ret )
252          A hobo can exit at any time by calling "MCE::Hobo-"exit()>.
253          Otherwise, the behavior is the same as "exit(status)" when called
254          from the main process.  Current since 1.827, a worker may optionally
255          return data, to be transmitted to the parent process.
256
257       MCE::Hobo->finish()
258          This class method is called automatically by "END", but may be
259          called explicitly. An error is emitted via croak if there are active
260          hobos not yet joined.
261
262           MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
263           $_->join for MCE::Hobo->list();
264
265           MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
266           $_->join for MCE::Hobo->list();
267
268           MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
269           $_->join for MCE::Hobo->list();
270
271           MCE::Hobo->finish();
272
273       MCE::Hobo->init( options )
274          The init function accepts a list of MCE::Hobo options.
275
276           MCE::Hobo->init(
277               max_workers => 'auto',   # default undef, unlimited
278               hobo_timeout => 20,      # default undef, no timeout
279               posix_exit => 1,         # default undef, CORE::exit
280               on_start => sub {
281                   my ( $pid, $ident ) = @_;
282                   ...
283               },
284               on_finish => sub {
285                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
286                   ...
287               }
288           );
289
290           # Identification given as option or 1st argument.
291           # Current API available since 1.827.
292
293           for my $key ( 'aa' .. 'zz' ) {
294               MCE::Hobo->create( { ident => $key }, sub { ... } );
295               MCE::Hobo->create( $key, sub { ... } );
296           }
297
298           MCE::Hobo->wait_all;
299
300          Set "max_workers" if you want to limit the number of workers by
301          waiting automatically for an available slot. Specify "auto" to
302          obtain the number of logical cores via "MCE::Util::get_ncpu()".
303
304          Set "hobo_timeout", in number of seconds, if you want the hobo
305          process to terminate after some time. The default is 0 for no
306          timeout.
307
308          Set "posix_exit" to avoid all END and destructor processing.
309          Constructing MCE::Hobo inside a thread implies 1 or if present CGI,
310          FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
311          Mojo::IOLoop, Prima, STFL, Tk, Wx, or Win32::GUI.
312
313          The callback options "on_start" and "on_finish" are called in the
314          parent process after starting a Hobo and later when terminated. The
315          arguments for the subroutines were inspired by
316          Parallel::ForkManager.
317
318          The parameters for "on_start" are the following:
319
320           - pid of the process
321           - identification (ident option or 1st arg to create)
322
323          The parameters for "on_finish" are the following:
324
325           - pid of the process
326           - program exit code
327           - identification (ident option or 1st arg to create)
328           - exit signal id
329           - error message from eval inside MCE::Hobo
330           - returned data
331
332       $hobo->is_running()
333          Returns true if a hobo is still running.
334
335       $hobo->is_joinable()
336          Returns true if the hobo has finished running and not yet joined.
337
338       $hobo->kill( 'SIG...' )
339          Sends the specified signal to the hobo. Returns the hobo object to
340          allow for method chaining. As with "exit", it is important to join
341          eventually if not immediately to not leave a zombie or defunct
342          process.
343
344           $hobo->kill('SIG...')->join();
345
346          The following is a parallel demonstration comparing "MCE::Shared"
347          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
348          after all workers have been notified to quit.
349
350           use Time::HiRes qw(time);
351
352           use Redis;
353           use Redis::Fast;
354
355           use MCE::Hobo;
356           use MCE::Shared;
357
358           my $redis = Redis->new();
359           my $rfast = Redis::Fast->new();
360           my $array = MCE::Shared->array();
361
362           sub parallel_redis {
363               my ($_redis) = @_;
364               my ($count, $quit, $len) = (0, 0);
365
366               # instead, use a flag to exit loop
367               $SIG{'QUIT'} = sub { $quit = 1 };
368
369               while () {
370                   $len = $_redis->rpush('list', $count++);
371                   last if $quit;
372               }
373
374               $count;
375           }
376
377           sub parallel_array {
378               my ($count, $quit, $len) = (0, 0);
379
380               # do not exit from inside handler
381               $SIG{'QUIT'} = sub { $quit = 1 };
382
383               while () {
384                   $len = $array->push($count++);
385                   last if $quit;
386               }
387
388               $count;
389           }
390
391           sub benchmark_this {
392               my ($desc, $num_hobos, $timeout, $code, @args) = @_;
393               my ($start, $total) = (time(), 0);
394
395               MCE::Hobo->new($code, @args) for 1..$num_hobos;
396               sleep $timeout;
397
398               # joining is not immediate; ok
399               $_->kill('QUIT') for MCE::Hobo->list();
400
401               # joining later; ok
402               $total += $_->join() for MCE::Hobo->list();
403
404               printf "$desc <> duration: %0.03f secs, count: $total\n",
405                   time() - $start;
406
407               sleep 0.2;
408           }
409
410           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
411           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
412           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
413
414       MCE::Hobo->list()
415          Returns a list of all hobos not yet joined.
416
417           @hobos = MCE::Hobo->list();
418
419       MCE::Hobo->list_running()
420          Returns a list of all hobos that are still running.
421
422           @hobos = MCE::Hobo->list_running();
423
424       MCE::Hobo->list_joinable()
425          Returns a list of all hobos that have completed running. Thus, ready
426          to be joined without blocking.
427
428           @hobos = MCE::Hobo->list_joinable();
429
430       MCE::Hobo->max_workers([ N ])
431          Getter and setter for max_workers. Specify a number or 'auto' to
432          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
433          false value to set back to no limit.
434
435          API available since 1.835.
436
437       MCE::Hobo->pending()
438          Returns a count of all hobos not yet joined.
439
440           $count = MCE::Hobo->pending();
441
442       $hobo->result()
443          Returns the result obtained by "join", "wait_one", or "wait_all". If
444          the process has not yet exited, waits for the corresponding hobo to
445          complete its execution.
446
447           use MCE::Hobo;
448           use Time::HiRes qw(sleep);
449
450           sub task {
451               my ($id) = @_;
452               sleep $id * 0.333;
453               return $id;
454           }
455
456           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
457
458           # 1 while MCE::Hobo->wait_one();
459
460           while ( my $hobo = MCE::Hobo->wait_one() ) {
461               my $err = $hobo->error() // 'no error';
462               my $res = $hobo->result();
463               my $pid = $hobo->pid();
464
465               print "[$pid] $err : $res\n";
466           }
467
468          Like "join" described above, the context (void, scalar or list) for
469          the return value(s) is determined at the time "result" is called and
470          mostly "wantarray" aware.
471
472           my $hobo1 = MCE::Hobo->create( sub {
473               my @res = qw(foo bar baz);
474               return (@res);
475           });
476
477           my @res1 = $hobo1->result();  # ( foo, bar, baz )
478           my $res1 = $hobo1->result();  #   baz
479
480           my $hobo2 = MCE::Hobo->create( sub {
481               return 'foo';
482           });
483
484           my @res2 = $hobo2->result();  # ( foo )
485           my $res2 = $hobo2->result();  #   foo
486
487       MCE::Hobo->self()
488          Class method that allows a hobo to obtain it's own MCE::Hobo object.
489
490       $hobo->pid()
491       $hobo->tid()
492          Returns the ID of the hobo.
493
494           pid: $$  process id
495           tid: $$  alias for pid
496
497       MCE::Hobo->pid()
498       MCE::Hobo->tid()
499          Class methods that allows a hobo to obtain its own ID.
500
501           pid: $$  process id
502           tid: $$  alias for pid
503
504       MCE::Hobo->wait_one()
505       MCE::Hobo->wait_all()
506          Meaningful for the manager process only, waits for one or all hobos
507          to complete execution. Afterwards, returns the corresponding
508          hobo(s).  If a hobo doesn't exist, returns the "undef" value or an
509          empty list for "wait_one" and "wait_all" respectively.
510
511          The "waitone" and "waitall" methods are aliases since 1.827 for
512          backwards compatibility.
513
514           use MCE::Hobo;
515           use Time::HiRes qw(sleep);
516
517           sub task {
518               my $id = shift;
519               sleep $id * 0.333;
520               return $id;
521           }
522
523           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
524
525           # join, traditional use case
526           $_->join() for MCE::Hobo->list();
527
528           # wait_one, simplistic use case
529           1 while MCE::Hobo->wait_one();
530
531           # wait_one
532           while ( my $hobo = MCE::Hobo->wait_one() ) {
533               my $err = $hobo->error() // 'no error';
534               my $res = $hobo->result();
535               my $pid = $hobo->pid();
536
537               print "[$pid] $err : $res\n";
538           }
539
540           # wait_all
541           my @hobos = MCE::Hobo->wait_all();
542
543           for ( @hobos ) {
544               my $err = $_->error() // 'no error';
545               my $res = $_->result();
546               my $pid = $_->pid();
547
548               print "[$pid] $err : $res\n";
549           }
550
551       MCE::Hobo->yield( [ floating_seconds ] )
552          Prior API till 1.826.
553
554          Let this hobo yield CPU time to other hobos. By default, the class
555          method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
556          including Cygwin.
557
558           MCE::Hobo->yield();
559           MCE::Hobo->yield(0.05);
560
561           # total run time: 0.25 seconds, sleep occurs in parallel
562
563           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
564           MCE::Hobo->wait_all();
565
566          Current API available since 1.827.
567
568          Give other hobos a chance to run, optionally for given time. Yield
569          behaves similarly to MCE's interval option. It throttles hobos from
570          running too fast.  A demonstration is provided in the next section
571          for fetching URLs in parallel.
572
573           # total run time: 1.00 second
574
575           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
576           MCE::Hobo->wait_all();
577

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

579       This demonstration constructs two queues, two handles, starts the
580       shared-manager process if needed, and spawns four hobo workers.  For
581       this demonstration, am chunking 64 URLs per job. In reality, one may
582       run with 200 workers and chunk 300 URLs on a 24-way box.
583
584        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
585        # perl demo.pl              -- all output
586        # perl demo.pl  >/dev/null  -- mngr/hobo output
587        # perl demo.pl 2>/dev/null  -- show results only
588        #
589        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
590
591        use strict;
592        use warnings;
593
594        use AnyEvent;
595        use AnyEvent::HTTP;
596        use Time::HiRes qw( time );
597
598        use MCE::Hobo;
599        use MCE::Shared;
600
601        # Construct two queues, input and return.
602
603        my $que = MCE::Shared->queue();
604        my $ret = MCE::Shared->queue();
605
606        # Construct shared handles to serialize output from many hobos
607        # writing simultaneously. This prevents garbled output.
608
609        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
610        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
611
612        # Spawn workers early for minimum memory consumption.
613
614        MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
615
616        # Obtain or generate input data for hobos to process.
617
618        my ( $count, @urls ) = ( 0 );
619
620        push @urls, map { "http://127.0.0.$_/"   } 1..254;
621        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
622
623        while ( @urls ) {
624            my @chunk = splice(@urls, 0, 64);
625            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
626        }
627
628        # So that workers leave the loop after consuming the queue.
629
630        $que->end();
631
632        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
633        # Loop for the manager process. The manager may do other work if
634        # need be and periodically check $ret->pending() not shown here.
635        #
636        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
637
638        my $start = time;
639
640        printf {$ERR} "Mngr - entering loop\n";
641
642        while ( $count ) {
643            my ( $result, $failed ) = $ret->dequeue( 2 );
644
645            # Remove ID from result, so not treated as a URL item.
646
647            printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
648
649            # Display the URL and the size captured.
650
651            foreach my $url ( keys %{ $result } ) {
652                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
653                    if $result->{$url};  # url has content
654            }
655
656            # Display URLs the hobo worker could not reach.
657
658            if ( @{ $failed } ) {
659                foreach my $url ( @{ $failed } ) {
660                    print {$OUT} "Failed: $url\n";
661                }
662            }
663
664            # Decrement the count.
665
666            $count--;
667        }
668
669        MCE::Hobo->wait_all();
670
671        printf {$ERR} "Mngr - exiting loop\n\n";
672        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
673
674        exit;
675
676        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
677        # Hobos enqueue two items ( $result and $failed ) per each job
678        # for the manager process. Likewise, the manager process dequeues
679        # two items above. Optionally, Hobos add ID to result.
680        #
681        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
682
683        sub task {
684            my ( $id ) = @_;
685            printf {$ERR} "Hobo $id entering loop\n";
686
687            while ( my $job = $que->dequeue() ) {
688                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
689
690                # Walk URLs, provide a hash and array refs for data.
691
692                printf {$ERR} "Hobo $id running  job $job->{ID}\n";
693                walk( $job, $result, $failed );
694
695                # Send results to the manager process.
696
697                $ret->enqueue( $result, $failed );
698            }
699
700            printf {$ERR} "Hobo $id exiting loop\n";
701        }
702
703        sub walk {
704            my ( $job, $result, $failed ) = @_;
705
706            # Yielding is critical when running an event loop in parallel.
707            # Not doing so means that the app may reach contention points
708            # with the firewall and likely impose unnecessary hardship at
709            # the OS level. The idea here is not to have multiple workers
710            # initiate HTTP requests to a batch of URLs at the same time.
711            # Yielding in 1.827+ behaves more like scatter for the worker
712            # to run solo in a fraction of time.
713
714            MCE::Hobo->yield( 0.03 );   # MCE::Hobo 1.827
715
716            my $cv = AnyEvent->condvar();
717
718            # Populate the hash ref for URLs it could reach.
719            # Do not mix AnyEvent timeout and Hobo timeout.
720            # Choose to do the event timeout if available.
721
722            foreach my $url ( @{ $job->{INPUT} } ) {
723                $cv->begin();
724                http_get $url, timeout => 2, sub {
725                    my ( $data, $headers ) = @_;
726                    $result->{$url} = $data;
727                    $cv->end();
728                };
729            }
730
731            $cv->recv();
732
733            # Populate the array ref for URLs it could not reach.
734
735            foreach my $url ( @{ $job->{INPUT} } ) {
736                push @{ $failed }, $url unless (exists $result->{ $url });
737            }
738
739            return;
740        }
741
742        __END__
743
744        $ perl demo.pl
745
746        Hobo 1 entering loop
747        Hobo 2 entering loop
748        Hobo 3 entering loop
749        Mngr - entering loop
750        Hobo 2 running  job 2
751        Hobo 3 running  job 3
752        Hobo 1 running  job 1
753        Hobo 4 entering loop
754        Hobo 4 running  job 4
755        Hobo 2 running  job 5
756        Mngr - received job 2
757        Hobo 3 running  job 6
758        Mngr - received job 3
759        Hobo 1 running  job 7
760        Mngr - received job 1
761        Hobo 4 running  job 8
762        Mngr - received job 4
763        http://192.168.0.1/: 3729
764        Hobo 2 exiting loop
765        Mngr - received job 5
766        Hobo 3 exiting loop
767        Mngr - received job 6
768        Hobo 1 exiting loop
769        Mngr - received job 7
770        Hobo 4 exiting loop
771        Mngr - received job 8
772        Mngr - exiting loop
773
774        Duration: 4.131 seconds
775

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

777       Making an executable is possible with the PAR::Packer module.  On the
778       Windows platform, threads, threads::shared, and exiting via threads are
779       necessary for the binary to exit successfully.
780
781        # https://metacpan.org/pod/PAR::Packer
782        # https://metacpan.org/pod/pp
783        #
784        #   pp -o demo.exe demo.pl
785        #   ./demo.exe
786
787        use strict;
788        use warnings;
789
790        use if $^O eq "MSWin32", "threads";
791        use if $^O eq "MSWin32", "threads::shared";
792
793        # Include minimum dependencies for MCE::Hobo.
794        # Add other modules required by your application here.
795
796        use Storable ();
797        use Time::HiRes ();
798
799        # use IO::FDPass ();  # optional: for condvar, handle, queue
800        # use Sereal ();      # optional: for faster serialization
801
802        use MCE::Hobo;
803        use MCE::Shared;
804
805        # For PAR to work on the Windows platform, one must include manually
806        # any shared modules used by the application.
807
808        # use MCE::Shared::Array;    # for MCE::Shared->array
809        # use MCE::Shared::Cache;    # for MCE::Shared->cache
810        # use MCE::Shared::Condvar;  # for MCE::Shared->condvar
811        # use MCE::Shared::Handle;   # for MCE::Shared->handle, mce_open
812        # use MCE::Shared::Hash;     # for MCE::Shared->hash
813        # use MCE::Shared::Minidb;   # for MCE::Shared->minidb
814        # use MCE::Shared::Ordhash;  # for MCE::Shared->ordhash
815        # use MCE::Shared::Queue;    # for MCE::Shared->queue
816        # use MCE::Shared::Scalar;   # for MCE::Shared->scalar
817
818        # Et cetera. Only load modules needed for your application.
819
820        use MCE::Shared::Sequence;   # for MCE::Shared->sequence
821
822        my $seq = MCE::Shared->sequence( 1, 9 );
823
824        sub task {
825            my ( $id ) = @_;
826            while ( defined ( my $num = $seq->next() ) ) {
827                print "$id: $num\n";
828                sleep 1;
829            }
830        }
831
832        sub main {
833            MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
834            MCE::Hobo->wait_all();
835        }
836
837        # Main must run inside a thread on the Windows platform or workers
838        # will fail duing exiting, causing the exe to crash. The reason is
839        # that PAR or a dependency isn't multi-process safe.
840
841        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
842
843        threads->exit(0) if $INC{"threads.pm"};
844

CREDITS

846       The inspiration for "MCE::Hobo" comes from wanting "threads"-like
847       behavior for processes. Both can run side-by-side including safe-use by
848       MCE workers.  Likewise, the documentation resembles "threads".
849
850       The inspiration for "wait_all" and "wait_one" comes from the
851       "Parallel::WorkUnit" module.
852

SEE ALSO

854       ·  forks
855
856       ·  forks::BerkeleyDB
857
858       ·  Parallel::ForkManager
859
860       ·  Parallel::Loops
861
862       ·  Parallel::Prefork
863
864       ·  Parallel::WorkUnit
865
866       ·  Proc::Fork
867
868       ·  Thread::Tie
869
870       ·  threads
871

INDEX

873       MCE, MCE::Shared
874

AUTHOR

876       Mario E. Roy, <marioeroy AT gmail DOT com>
877
878
879
880perl v5.28.1                      2019-01-04                      MCE::Hobo(3)
Impressum