MCE::Step(3)          User Contributed Perl Documentation         MCE::Step(3)

NAME

       MCE::Step - Parallel step model for building creative steps

VERSION

       This document describes MCE::Step version 1.879

DESCRIPTION

       MCE::Step is similar to MCE::Flow for writing custom apps. The main
       difference comes from the transparent use of queues between sub-tasks.
       MCE 1.7 adds mce_enq, mce_enqp, and mce_await methods described under
       QUEUE-LIKE FEATURES below.

       It is trivial to parallelize with mce_stream shown below.

        ## Native map function
        my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

        ## Same as with MCE::Stream (processing from right to left)
        @a = mce_stream
             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

        ## Pass an array reference to have writes occur simultaneously
        mce_stream \@a,
             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

       However, let's have MCE::Step compute the same in parallel. Unlike the
       example in MCE::Flow, the use of MCE::Queue is totally transparent.
       Preserving output order here relies on MCE::Candy.

        use MCE::Step;
        use MCE::Candy;

       Next are the 3 sub-tasks. Compare them with the same sub-tasks
       described in MCE::Flow. The call to MCE->step simplifies passing data
       to the subsequent sub-task.

        sub task_a {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 2 } @{ $chunk_ref };
           MCE->step(\@ans, $chunk_id);
        }

        sub task_b {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 3 } @{ $chunk_ref };
           MCE->step(\@ans, $chunk_id);
        }

        sub task_c {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 4 } @{ $chunk_ref };
           MCE->gather($chunk_id, \@ans);
        }

       In summary, MCE::Step builds an MCE instance behind the scenes and
       starts running. The task_name (shown), max_workers, and use_threads
       options can take an anonymous array for specifying values individually
       for each sub-task.

       The task_name option is required to use ->enq, ->enqp, and ->await.

        my @a;

        mce_step {
           task_name => [ 'a', 'b', 'c' ],
           gather => MCE::Candy::out_iter_array(\@a)

        }, \&task_a, \&task_b, \&task_c, 1..10000;

        print "@a\n";

STEP DEMO

       In the demonstration below, one may call ->gather or ->step any number
       of times, although ->step is not allowed in the last sub-block. Data is
       gathered to @arr and will likely be out of order. Gathering data is
       optional. All sub-blocks receive $mce as the first argument.

       First, define 3 sub-tasks.

        use MCE::Step;

        sub task_a {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           if ($_ % 2 == 0) {
              MCE->gather($_);
            # MCE->gather($_ * 4);        ## Ok to gather multiple times
           }
           else {
              MCE->print("a step: $_, $_ * $_\n");
              MCE->step($_, $_ * $_);
            # MCE->step($_, $_ * 4 );     ## Ok to step multiple times
           }
        }

        sub task_b {
           my ($mce, $arg1, $arg2) = @_;

           MCE->print("b args: $arg1, $arg2\n");

           if ($_ % 3 == 0) {             ## $_ is the same as $arg1
              MCE->gather($_);
           }
           else {
              MCE->print("b step: $_ * $_\n");
              MCE->step($_ * $_);
           }
        }

        sub task_c {
           my ($mce, $arg1) = @_;

           MCE->print("c: $_\n");
           MCE->gather($_);
        }

       Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
       parallel. Notice how max_workers and use_threads can take an anonymous
       array, similar to task_name.

        my @arr = mce_step {
           task_name   => [ 'a', 'b', 'c' ],
           max_workers => [  2,   2,   2  ],
           use_threads => [  0,   0,   0  ],
           chunk_size  => 1

        }, \&task_a, \&task_b, \&task_c, 1..10;

       Finally, sort the array and display its contents.

        @arr = sort { $a <=> $b } @arr;

        print "\n@arr\n\n";

        -- Output

        a step: 1, 1 * 1
        a step: 3, 3 * 3
        a step: 5, 5 * 5
        a step: 7, 7 * 7
        a step: 9, 9 * 9
        b args: 1, 1
        b step: 1 * 1
        b args: 3, 9
        b args: 7, 49
        b step: 7 * 7
        b args: 5, 25
        b step: 5 * 5
        b args: 9, 81
        c: 1
        c: 49
        c: 25

        1 2 3 4 6 8 9 10 25 49

SYNOPSIS when CHUNK_SIZE EQUALS 1

       Although MCE::Loop may be preferred for running a single code block,
       the text below also applies to this module, particularly for the
       first block.

       All models in MCE default to 'auto' for chunk_size. The arguments for
       the block are the same as when writing a user_func block using the
       Core API.

       Beginning with MCE 1.5, the next input item is placed into the input
       scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
       $chunk_ref containing many items. Basically, line 2 below may be
       omitted from your code when using $_. One can call MCE->chunk_id to
       obtain the current chunk id.

        line 1:  user_func => sub {
        line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
        line 3:
        line 4:     $_ points to $chunk_ref->[0]
        line 5:        in MCE 1.5 when chunk_size == 1
        line 6:
        line 7:     $_ points to $chunk_ref
        line 8:        in MCE 1.5 when chunk_size  > 1
        line 9:  }

       Follow this synopsis when chunk_size equals one. Looping is not
       required inside the first block; the block is called once per item.

        ## Exports mce_step, mce_step_f, and mce_step_s
        use MCE::Step;

        MCE::Step->init(
           chunk_size => 1
        );

        ## Array or array_ref
        mce_step sub { do_work($_) }, 1..10000;
        mce_step sub { do_work($_) }, \@list;

        ## Important; pass an array_ref for deep (nested) input data
        mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) }, \@deeply_list;

        ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
        ## Workers read directly without involving the manager process
        mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient

        ## Involves the manager process, therefore slower
        mce_step_f sub { chomp; do_work($_) }, $file_handle;
        mce_step_f sub { chomp; do_work($_) }, $io;
        mce_step_f sub { chomp; do_work($_) }, \$scalar;

        ## Sequence of numbers (begin, end [, step, format])
        mce_step_s sub { do_work($_) }, 1, 10000, 5;
        mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];

        mce_step_s sub { do_work($_) }, {
           begin => 1, end => 10000, step => 5, format => undef
        };

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

       Follow this synopsis when chunk_size equals 'auto' or is greater than
       1. This means having to loop through the chunk from inside the first
       block.

        use MCE::Step;

        MCE::Step->init(           ## Chunk_size defaults to 'auto' when
           chunk_size => 'auto'    ## not specified. Therefore, the init
        );                         ## function may be omitted.

        ## Syntax is shown for mce_step for demonstration purposes.
        ## Looping inside the block is the same for mce_step_f and
        ## mce_step_s.

        ## Array or array_ref
        mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
        mce_step sub { do_work($_) for (@{ $_ }) }, \@list;

        ## Important; pass an array_ref for deep (nested) input data
        mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;

        ## Resembles code using the core MCE API
        mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           for (@{ $chunk_ref }) {
              do_work($_);
           }

        }, 1..10000;

       Chunking reduces the number of IPC calls behind the scenes. Think in
       terms of chunks whenever processing a large amount of data. For
       relatively small data, choosing 1 for chunk_size is fine.

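       As a rough illustration of why chunking lowers overhead, the sketch
       below uses core Perl only (no MCE involved) to count how many chunks
       cover 10,000 items at two chunk sizes. The helper name count_chunks is
       hypothetical, and treating each chunk as one hand-off is a simplifying
       assumption, not a description of MCE internals.

```perl
use strict;
use warnings;

# Hypothetical helper: number of chunks needed to cover $n_items
# when each chunk holds at most $chunk_size items (ceiling division).
sub count_chunks {
   my ($n_items, $chunk_size) = @_;
   return int( ($n_items + $chunk_size - 1) / $chunk_size );
}

my $items = 10_000;

for my $size (1, 500) {
   printf "chunk_size %3d -> %5d chunks\n", $size, count_chunks($items, $size);
}
```

       Under that assumption, chunk_size 1 implies 10,000 hand-offs while
       chunk_size 500 implies 20.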

OVERRIDING DEFAULTS

       The following lists options which may be overridden when loading the
       module. The fast option is obsolete from 1.867 onwards and is ignored
       if specified.

        use Sereal qw( encode_sereal decode_sereal );
        use CBOR::XS qw( encode_cbor decode_cbor );
        use JSON::XS qw( encode_json decode_json );

        use MCE::Step
            max_workers => 8,                # Default 'auto'
            chunk_size => 500,               # Default 'auto'
            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
            freeze => \&encode_sereal,       # \&Storable::freeze
            thaw => \&decode_sereal,         # \&Storable::thaw
            fast => 1                        # Obsolete since 1.867; ignored
        ;

       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
       available. Specify "Sereal => 0" to use Storable instead.

        use MCE::Step Sereal => 0;

CUSTOMIZING MCE

       MCE::Step->init ( options )
       MCE::Step::init { options }

       The init function accepts a hash of MCE options. Unlike with
       MCE::Stream, both gather and bounds_only options may be specified when
       calling init (not shown below).

        use MCE::Step;

        MCE::Step->init(
           chunk_size => 1, max_workers => 4,

           user_begin => sub {
              print "## ", MCE->wid, " started\n";
           },

           user_end => sub {
              print "## ", MCE->wid, " completed\n";
           }
        );

        my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;

        print "\n", "@a{1..100}", "\n";

        -- Output

        ## 3 started
        ## 1 started
        ## 4 started
        ## 2 started
        ## 3 completed
        ## 4 completed
        ## 1 completed
        ## 2 completed

        1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
        400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
        1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
        2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
        3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
        5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
        7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
        10000

       As with MCE::Step->init above, MCE options may be specified using an
       anonymous hash for the first argument. Notice how task_name,
       max_workers, and use_threads can take an anonymous array to set values
       individually for each code block.

       Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
       with the first code block, thus processing from left-to-right.

       The following takes 9 seconds to complete. The 9 seconds comes from
       having only 2 workers assigned to the last sub-task and from waiting 1
       or 2 seconds initially before calling MCE->step.

       Removing both calls to MCE->step will cause the script to complete in
       just 1 second. The reason is that the 2nd and subsequent sub-tasks
       await data from an internal queue. Workers terminate upon receiving
       an undef.

        use threads;
        use MCE::Step;

        my @a = mce_step {
           task_name   => [ 'a', 'b', 'c' ],
           max_workers => [  3,   4,   2, ],
           use_threads => [  1,   0,   0, ],

           user_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              MCE->print("$task_id - $task_name completed\n");
           },

           task_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              MCE->print("$task_id - $task_name ended\n");
           }
        },
        sub { sleep 1; MCE->step(""); },   ## 3 workers, named a
        sub { sleep 2; MCE->step(""); },   ## 4 workers, named b
        sub { sleep 3;                };   ## 2 workers, named c

        -- Output

        0 - a completed
        0 - a completed
        0 - a completed
        0 - a ended
        1 - b completed
        1 - b completed
        1 - b completed
        1 - b completed
        1 - b ended
        2 - c completed
        2 - c completed
        2 - c ended

API DOCUMENTATION

       Although input data is optional for MCE::Step, the following assumes
       chunk_size equals 1 in order to demonstrate all the possibilities for
       providing input data.

       MCE::Step->run ( sub { code }, list )
       mce_step sub { code }, list

       Input data may be defined using a list, an array ref, or a hash ref.

       Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Step
       takes a "sub { ... }" or a code reference. The other difference is that
       a comma is needed after the block.

        # $_ contains the item when chunk_size => 1

        mce_step sub { do_work($_) }, 1..1000;
        mce_step sub { do_work($_) }, \@list;

        # Important; pass an array_ref for deep (nested) input data

        mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) }, \@deeply_list;

        # Chunking; any chunk_size => 1 or greater

        my %res = mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my %ret;
           for my $item (@{ $chunk_ref }) {
              $ret{$item} = $item * 2;
           }
           MCE->gather(%ret);
        },
        \@list;

        # Input hash; current API available since 1.828

        my %res = mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my %ret;
           for my $key (keys %{ $chunk_ref }) {
              $ret{$key} = $chunk_ref->{$key} * 2;
           }
           MCE->gather(%ret);
        },
        \%hash;

        # Unlike MCE::Loop, MCE::Step doesn't need input to run

        mce_step { max_workers => 4 }, sub {
           MCE->say( MCE->wid );
        };

        # ... and can run multiple tasks

        mce_step {
           max_workers => [  1,   3  ],
           task_name   => [ 'p', 'c' ]
        },
        sub {
           # 1 producer
           MCE->say( "producer: ", MCE->wid );
        },
        sub {
           # 3 consumers
           MCE->say( "consumer: ", MCE->wid );
        };

        # Here, options are specified via init

        MCE::Step->init(
           max_workers => [  1,   3  ],
           task_name   => [ 'p', 'c' ]
        );

        mce_step \&producer, \&consumers;

       MCE::Step->run_file ( sub { code }, file )
       mce_step_f sub { code }, file

       The fastest of these is the /path/to/file. Workers communicate the next
       offset position among themselves with zero interaction by the manager
       process.

       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.

        # $_ contains the line when chunk_size => 1

        mce_step_f sub { $_ }, "/path/to/file";  # faster
        mce_step_f sub { $_ }, $file_handle;
        mce_step_f sub { $_ }, $io;              # IO::All
        mce_step_f sub { $_ }, \$scalar;

        # chunking, any chunk_size => 1 or greater

        my %res = mce_step_f sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my $buf = '';
           for my $line (@{ $chunk_ref }) {
              $buf .= $line;
           }
           MCE->gather($chunk_id, $buf);
        },
        "/path/to/file";

       MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
       mce_step_s sub { code }, $beg, $end [, $step, $fmt ]

       A sequence may be defined as a list, an array reference, or a hash
       reference. The functions require both begin and end values to run.
       Step and format are optional. The format is passed to sprintf (% may be
       omitted below).

        my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

        # $_ contains the sequence number when chunk_size => 1

        mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
        mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];

        mce_step_s sub { $_ }, {
           begin => $beg, end => $end,
           step => $step, format => $fmt
        };

        # chunking, any chunk_size => 1 or greater

        my %res = mce_step_s sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my $buf = '';
           for my $seq (@{ $chunk_ref }) {
              $buf .= "$seq\n";
           }
           MCE->gather($chunk_id, $buf);
        },
        [ $beg, $end ];

       With the bounds_only option, the sequence engine computes only the
       'begin' and 'end' items for the chunk, and not the items in between
       (hence boundaries only). This option applies to sequences only and has
       no effect when chunk_size equals 1.

       The time to run is 0.006s below. This becomes 0.827s without the
       bounds_only option due to computing all items in between, thus creating
       a very large array. Basically, specify bounds_only => 1 when boundaries
       are all you need for looping inside the block; e.g. Monte Carlo
       simulations.

       Time was measured using 1 worker to emphasize the difference.

        use MCE::Step;

        MCE::Step->init(
           max_workers => 1, chunk_size => 1_250_000,
           bounds_only => 1
        );

        # Typically, the input scalar $_ contains the sequence number
        # when chunk_size => 1, unless the bounds_only option is set
        # which is the case here. Thus, $_ points to $chunk_ref.

        mce_step_s sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           # $chunk_ref contains 2 items, not 1_250_000
           # my ( $begin, $end ) = ( $_->[0], $_->[1] );

           my $begin = $chunk_ref->[0];
           my $end   = $chunk_ref->[1];

           # for my $seq ( $begin .. $end ) {
           #    ...
           # }

           MCE->printf("%7d .. %8d\n", $begin, $end);
        },
        [ 1, 10_000_000 ];

        -- Output

              1 ..  1250000
        1250001 ..  2500000
        2500001 ..  3750000
        3750001 ..  5000000
        5000001 ..  6250000
        6250001 ..  7500000
        7500001 ..  8750000
        8750001 .. 10000000

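       The boundaries listed in the output above can be reproduced without
       MCE. The following is a minimal core-Perl sketch, assuming a step of 1
       and the same chunk_size; the helper name chunk_bounds is hypothetical
       and this is not how MCE computes bounds internally.

```perl
use strict;
use warnings;

# Hypothetical helper: return [begin, end] pairs covering $beg..$end
# in strides of $chunk_size, without creating the items in between.
sub chunk_bounds {
   my ($beg, $end, $chunk_size) = @_;
   my @bounds;

   for (my $lo = $beg; $lo <= $end; $lo += $chunk_size) {
      my $hi = $lo + $chunk_size - 1;
      $hi = $end if $hi > $end;        # clamp the final chunk
      push @bounds, [ $lo, $hi ];
   }

   return @bounds;
}

printf "%7d .. %8d\n", @{ $_ } for chunk_bounds(1, 10_000_000, 1_250_000);
```

       Only 8 pairs are held in memory, which is the reason a block that
       loops from begin to end itself has no need for the expanded array.
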
       MCE::Step->run ( { input_data => iterator }, sub { code } )
       mce_step { input_data => iterator }, sub { code }

       An iterator reference may be specified for input_data, either in the
       options hash as shown above or via MCE::Step->init. This prevents
       MCE::Step from configuring the iterator reference as another user
       task, which would not work.

       Iterators are described under section "SYNTAX for INPUT_DATA" at
       MCE::Core.

        MCE::Step->init(
           input_data => iterator
        );

        mce_step sub { $_ };
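
       The placeholder "iterator" above stands for a code reference. Below is
       a minimal core-Perl sketch of such a closure, drained by hand rather
       than by MCE. The factory name make_iter is hypothetical, and the
       convention assumed here (one item per call, an empty return at the end
       of input) should be checked against "SYNTAX for INPUT_DATA" in
       MCE::Core before relying on it.

```perl
use strict;
use warnings;

# Hypothetical factory: yields $first, $first + $step, ... up to $last.
sub make_iter {
   my ($first, $last, $step) = @_;
   my $next = $first;

   return sub {
      return if $next > $last;   # nothing left: signal end of input
      my $item = $next;
      $next += $step;
      return $item;
   };
}

my $iter = make_iter(10, 30, 5);
my @seen;

# Drain the iterator manually to show what it produces.
while ( defined( my $item = $iter->() ) ) {
   push @seen, $item;
}

print "@seen\n";
```

       Because the state lives in the closure, the same pattern can wrap a
       database cursor or any other source that produces items on demand.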

QUEUE-LIKE FEATURES

       MCE->step ( item )
       MCE->step ( arg1, arg2, argN )

       The ->step method is the simplest form for passing elements into the
       next sub-task.

        use MCE::Step;

        sub provider {
           MCE->step( $_, rand ) for 10 .. 19;
        }

        sub consumer {
           my ( $mce, @args ) = @_;
           MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
        }

        MCE::Step->init(
           task_name   => [ 'p', 'c' ],
           max_workers => [  1 ,  4  ]
        );

        mce_step \&provider, \&consumer;

        -- Output

        2: 10, 0.583551
        4: 11, 0.175319
        3: 12, 0.843662
        4: 15, 0.748302
        2: 14, 0.591752
        3: 16, 0.357858
        5: 13, 0.953528
        4: 17, 0.698907
        2: 18, 0.985448
        3: 19, 0.146548

       MCE->enq ( task_name, item )
       MCE->enq ( task_name, [ arg1, arg2, argN ] )
       MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
       MCE->enqp ( task_name, priority, item )
       MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
       MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )

       The MCE 1.7 release enables finer control. Unlike ->step, which takes
       multiple arguments, the ->enq and ->enqp methods push items at the end
       of the array internally. Passing multiple arguments is possible by
       enclosing the arguments inside an anonymous array.

       The direction of flow is forward only. Thus, stepping to itself or
       backwards will cause an error.

        use MCE::Step;

        sub provider {
           if ( MCE->wid % 2 == 0 ) {
              MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
           } else {
              MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
           }
        }

        sub consumer_c {
           my ( $mce, $args ) = @_;
           MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        }

        sub consumer_d {
           my ( $mce, $args ) = @_;
           MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        }

        MCE::Step->init(
           task_name   => [ 'p', 'c', 'd' ],
           max_workers => [  2 ,  3 ,  3  ]
        );

        mce_step \&provider, \&consumer_c, \&consumer_d;

        -- Output

        C4: 10, 0.527531
        D6: 20, 0.420108
        C5: 11, 0.839770
        D8: 21, 0.386414
        C3: 12, 0.834645
        C4: 13, 0.191014
        D6: 23, 0.924027
        C5: 14, 0.899357
        D8: 24, 0.706186
        C4: 15, 0.083823
        D7: 22, 0.479708
        D6: 25, 0.073882
        C3: 16, 0.207446
        D8: 26, 0.560755
        C5: 17, 0.198157
        D7: 27, 0.324909
        C4: 18, 0.147505
        C5: 19, 0.318371
        D6: 28, 0.220465
        D8: 29, 0.630111

       MCE->await ( task_name, pending_threshold )

       Providers may sometimes run faster than consumers, thus increasing
       memory consumption. MCE 1.7 adds the ->await method for pausing
       momentarily until the receiving sub-task reaches the minimum threshold
       for the number of items pending in its queue.

        use MCE::Step;
        use Time::HiRes 'sleep';

        sub provider {
           for ( 10 .. 29 ) {
              # wait until 10 or fewer items are pending
              MCE->await( 'c', 10 );
              # forward item to a later sub-task ( 'c' comes after 'p' )
              MCE->enq( 'c', [ $_, rand ] );
           }
        }

        sub consumer {
           my ($mce, $args) = @_;
           MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
           sleep 0.05;
        }

        MCE::Step->init(
           task_name   => [ 'p', 'c' ],
           max_workers => [  1 ,  4  ]
        );

        mce_step \&provider, \&consumer;

        -- Output

        3: 10, 0.527307
        2: 11, 0.036193
        5: 12, 0.987168
        4: 13, 0.998140
        5: 14, 0.219526
        4: 15, 0.061609
        2: 16, 0.557664
        3: 17, 0.658684
        4: 18, 0.240932
        3: 19, 0.241042
        5: 20, 0.884830
        2: 21, 0.902223
        4: 22, 0.699223
        3: 23, 0.208270
        5: 24, 0.438919
        2: 25, 0.268854
        4: 26, 0.596425
        5: 27, 0.979818
        2: 28, 0.918173
        3: 29, 0.358266

GATHERING DATA

       Unlike MCE::Map, where gathering and output order are handled for you
       automatically, MCE::Step relies on the gather method to have results
       sent back to the manager process.

        use MCE::Step chunk_size => 1;

        ## Output order is not guaranteed.
        my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
        print "@a\n\n";

        ## Outputs to a hash instead (key, value).
        my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
        print "@h1{1..100}\n\n";

        ## This does the same thing due to chunk_id starting at one.
        my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
        print "@h2{1..100}\n\n";

       The gather method may be called multiple times within the block,
       unlike return, which would leave the block. Therefore, think of gather
       as yielding results immediately to the manager process without actually
       leaving the block.

        use MCE::Step chunk_size => 1, max_workers => 3;

        my @hosts = qw(
           hosta hostb hostc hostd hoste
        );

        my %h3 = mce_step sub {
           my ($output, $error, $status); my $host = $_;

           ## Do something with $host;
           $output = "Worker ". MCE->wid .": Hello from $host";

           if (MCE->chunk_id % 3 == 0) {
              ## Simulating an error condition
              local $? = 1; $status = $?;
              $error = "Error from $host"
           }
           else {
              $status = 0;
           }

           ## Ensure unique keys (key, value) when gathering to
           ## a hash.
           MCE->gather("$host.out", $output);
           MCE->gather("$host.err", $error) if (defined $error);
           MCE->gather("$host.sta", $status);

        }, @hosts;

        foreach my $host (@hosts) {
           print $h3{"$host.out"}, "\n";
           print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
           print "Exit status: ", $h3{"$host.sta"}, "\n\n";
        }

        -- Output

        Worker 3: Hello from hosta
        Exit status: 0

        Worker 2: Hello from hostb
        Exit status: 0

        Worker 1: Hello from hostc
        Error from hostc
        Exit status: 1

        Worker 3: Hello from hostd
        Exit status: 0

        Worker 2: Hello from hoste
        Exit status: 0

       The following uses an anonymous array containing 3 elements when
       gathering data. Serialization is automatic behind the scenes.

        my %h3 = mce_step sub {
           ...

           MCE->gather($host, [$output, $error, $status]);

        }, @hosts;

        foreach my $host (@hosts) {
           print $h3{$host}->[0], "\n";
           print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
           print "Exit status: ", $h3{$host}->[2], "\n\n";
        }

       Although MCE::Map comes to mind, one may want additional control when
       gathering data such as retaining output order.

        use MCE::Step;

        sub preserve_order {
           my %tmp; my $order_id = 1; my $gather_ref = $_[0];

           return sub {
              $tmp{ (shift) } = \@_;

              while (1) {
                 last unless exists $tmp{$order_id};
                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
              }

              return;
           };
        }

        ## Workers persist for the most part after running, though this is
        ## not always the case and depends on Perl. Pass a reference to a
        ## subroutine if workers must persist; e.g.
        ## mce_step { ... }, \&foo, 1..100000.

        MCE::Step->init(
           chunk_size => 'auto', max_workers => 'auto'
        );

        for (1..2) {
           my @m2;

           mce_step {
              gather => preserve_order(\@m2)
           },
           sub {
              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

              ## Compute the entire chunk data at once.
              push @a, map { $_ * 2 } @{ $chunk_ref };

              ## Afterwards, invoke the gather feature, which
              ## will direct the data to the callback function.
              MCE->gather(MCE->chunk_id, @a);

           }, 1..100000;

           print scalar @m2, "\n";
        }

        MCE::Step->finish;

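       To see how the preserve_order callback reorders results, the sketch
       below calls the same closure directly from core Perl, with no MCE
       workers involved. The chunk ids and letters are made-up sample data;
       each call stands in for one MCE->gather(chunk_id, items) arriving at
       the manager process.

```perl
use strict;
use warnings;

# Same closure as in the example: buffer chunks by id and release
# them to the target array strictly in ascending chunk-id order.
sub preserve_order {
   my %tmp; my $order_id = 1; my $gather_ref = $_[0];

   return sub {
      $tmp{ (shift) } = \@_;

      while (1) {
         last unless exists $tmp{$order_id};
         push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
      }

      return;
   };
}

my @out;
my $gather = preserve_order(\@out);

# Simulate chunks 3, 1, 2 arriving out of order.
$gather->(3, 'e', 'f');   # held back: chunk 1 has not arrived yet
$gather->(1, 'a', 'b');   # released immediately
$gather->(2, 'c', 'd');   # releases chunk 2, then the buffered chunk 3

print "@out\n";
```

       The hash only holds chunks that arrived ahead of their turn, so its
       size stays bounded by how far out of order the workers complete.
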
       All 6 models support 'auto' for chunk_size unlike the Core API. Think
       of the models as the basis for providing JIT for MCE. They create the
       instance, tune max_workers, and tune chunk_size automatically
       regardless of the hardware.

       The following does the same thing using the Core API. Workers persist
       after running.

        use MCE;

        sub preserve_order {
           ...
        }

        my $mce = MCE->new(
           max_workers => 'auto', chunk_size => 8000,

           user_func => sub {
              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

              ## Compute the entire chunk data at once.
              push @a, map { $_ * 2 } @{ $chunk_ref };

              ## Afterwards, invoke the gather feature, which
              ## will direct the data to the callback function.
              MCE->gather(MCE->chunk_id, @a);
           }
        );

        for (1..2) {
           my @m2;

           $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);

           print scalar @m2, "\n";
        }

        $mce->shutdown;

MANUAL SHUTDOWN

       MCE::Step->finish
       MCE::Step::finish

       Workers remain persistent as much as possible after running. Shutdown
       occurs automatically when the script terminates. Call finish when
       workers are no longer needed.

        use MCE::Step;

        MCE::Step->init(
           chunk_size => 20, max_workers => 'auto'
        );

        mce_step sub { ... }, 1..100;

        MCE::Step->finish;

INDEX

       MCE, MCE::Core

AUTHOR

       Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.36.0                      2022-07-22                      MCE::Step(3)