MCE::Step(3)          User Contributed Perl Documentation         MCE::Step(3)

NAME

       MCE::Step - Parallel step model for building creative steps

VERSION

       This document describes MCE::Step version 1.862

DESCRIPTION

       MCE::Step is similar to MCE::Flow for writing custom apps. The main
       difference comes from the transparent use of queues between sub-tasks.
       MCE 1.7 adds mce_enq, mce_enqp, and mce_await methods described under
       QUEUE-LIKE FEATURES below.

       It is trivial to parallelize with mce_stream, shown below.

        ## Native map function
        my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

        ## Same as with MCE::Stream (processing from right to left)
        @a = mce_stream
             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

        ## Pass an array reference to have writes occur simultaneously
        mce_stream \@a,
             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

       However, let's have MCE::Step compute the same in parallel. Unlike the
       example in MCE::Flow, the use of MCE::Queue is totally transparent.
       This calls for preserving output order, which MCE::Candy provides.

        use MCE::Step;
        use MCE::Candy;

       Next are the 3 sub-tasks. Compare them with the equivalent sub-tasks
       described in MCE::Flow. The call to MCE->step simplifies the passing of
       data to the subsequent sub-task.

        sub task_a {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 2 } @{ $chunk_ref };
           MCE->step(\@ans, $chunk_id);
        }

        sub task_b {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 3 } @{ $chunk_ref };
           MCE->step(\@ans, $chunk_id);
        }

        sub task_c {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 4 } @{ $chunk_ref };
           MCE->gather($chunk_id, \@ans);
        }

       In summary, MCE::Step builds out an MCE instance behind the scenes and
       starts running. The task_name (shown), max_workers, and use_threads
       options can take an anonymous array for specifying the values uniquely
       for each sub-task.

       The task_name option is required to use ->enq, ->enqp, and ->await.

        my @a;

        mce_step {
           task_name => [ 'a', 'b', 'c' ],
           gather => MCE::Candy::out_iter_array(\@a)

        }, \&task_a, \&task_b, \&task_c, 1..10000;

        print "@a\n";

STEP DEMO

       In the demonstration below, one may call ->gather or ->step any number
       of times, although ->step is not allowed in the last sub-block. Data
       is gathered to @arr, which will likely be out of order. Gathering data
       is optional. All sub-blocks receive $mce as the first argument.

       First, define 3 sub-tasks.

        use MCE::Step;

        sub task_a {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           if ($_ % 2 == 0) {
              MCE->gather($_);
            # MCE->gather($_ * 4);        ## Ok to gather multiple times
           }
           else {
              MCE->print("a step: $_, $_ * $_\n");
              MCE->step($_, $_ * $_);
            # MCE->step($_, $_ * 4 );     ## Ok to step multiple times
           }
        }

        sub task_b {
           my ($mce, $arg1, $arg2) = @_;

           MCE->print("b args: $arg1, $arg2\n");

           if ($_ % 3 == 0) {             ## $_ is the same as $arg1
              MCE->gather($_);
           }
           else {
              MCE->print("b step: $_ * $_\n");
              MCE->step($_ * $_);
           }
        }

        sub task_c {
           my ($mce, $arg1) = @_;

           MCE->print("c: $_\n");
           MCE->gather($_);
        }

       Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
       parallel. Notice how max_workers and use_threads can take an anonymous
       array, similar to task_name.

        my @arr = mce_step {
           task_name   => [ 'a', 'b', 'c' ],
           max_workers => [  2,   2,   2  ],
           use_threads => [  0,   0,   0  ],
           chunk_size  => 1

        }, \&task_a, \&task_b, \&task_c, 1..10;

       Finally, sort the array and display its contents.

        @arr = sort { $a <=> $b } @arr;

        print "\n@arr\n\n";

        -- Output

        a step: 1, 1 * 1
        a step: 3, 3 * 3
        a step: 5, 5 * 5
        a step: 7, 7 * 7
        a step: 9, 9 * 9
        b args: 1, 1
        b step: 1 * 1
        b args: 3, 9
        b args: 7, 49
        b step: 7 * 7
        b args: 5, 25
        b step: 5 * 5
        b args: 9, 81
        c: 1
        c: 49
        c: 25

        1 2 3 4 6 8 9 10 25 49

SYNOPSIS when CHUNK_SIZE EQUALS 1

       Although MCE::Loop may be preferred for running a single code block,
       the text below also applies to this module, particularly for the
       first block.

       All models in MCE default to 'auto' for chunk_size. The arguments for
       the block are the same as writing a user_func block using the Core API.

       Beginning with MCE 1.5, the next input item is placed into the input
       scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
       $chunk_ref containing many items. Basically, line 2 below may be
       omitted from your code when using $_. One can call MCE->chunk_id to
       obtain the current chunk id.

        line 1:  user_func => sub {
        line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
        line 3:
        line 4:     $_ points to $chunk_ref->[0]
        line 5:        in MCE 1.5 when chunk_size == 1
        line 6:
        line 7:     $_ points to $chunk_ref
        line 8:        in MCE 1.5 when chunk_size  > 1
        line 9:  }

       Follow this synopsis when chunk_size equals one. Looping is not
       required from inside the first block. Hence, the block is called once
       per item.

        ## Exports mce_step, mce_step_f, and mce_step_s
        use MCE::Step;

        MCE::Step::init {
           chunk_size => 1
        };

        ## Array or array_ref
        mce_step sub { do_work($_) }, 1..10000;
        mce_step sub { do_work($_) }, \@list;

        ## Important; pass an array_ref for deeply nested input data
        mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) }, \@deeply_list;

        ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
        ## Workers read directly and do not involve the manager process
        mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient

        ## Involves the manager process, therefore slower
        mce_step_f sub { chomp; do_work($_) }, $file_handle;
        mce_step_f sub { chomp; do_work($_) }, $io;
        mce_step_f sub { chomp; do_work($_) }, \$scalar;

        ## Sequence of numbers (begin, end [, step, format])
        mce_step_s sub { do_work($_) }, 1, 10000, 5;
        mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];

        mce_step_s sub { do_work($_) }, {
           begin => 1, end => 10000, step => 5, format => undef
        };
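
       As a minimal sketch of the chunk_size => 1 behavior described in this
       section, the following compares $_ with $chunk_ref->[0] and records
       the chunk id for each item. The input list, the max_workers value,
       and the %seen hash are illustrative choices, not part of the module.

```perl
use strict;
use warnings;
use MCE::Step;

MCE::Step::init {
   chunk_size => 1, max_workers => 2   # illustrative values
};

# The block runs once per item. Gather chunk_id => [ $_, first element ].
my %seen = mce_step sub {
   my ($mce, $chunk_ref, $chunk_id) = @_;
   MCE->gather($chunk_id, [ $_, $chunk_ref->[0] ]);
}, 'a' .. 'e';

MCE::Step::finish;

# Print in chunk_id order; both columns should hold the same item.
for my $id (sort { $a <=> $b } keys %seen) {
   printf "%d: %s %s\n", $id, @{ $seen{$id} };
}
```

       Gathering to a hash keyed by chunk id keeps the result independent of
       the order in which workers finish.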

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

       Follow this synopsis when chunk_size equals 'auto' or greater than 1.
       This means having to loop through the chunk from inside the first
       block.

        use MCE::Step;

        MCE::Step::init {          ## Chunk_size defaults to 'auto' when
           chunk_size => 'auto'    ## not specified. Therefore, the init
        };                         ## function may be omitted.

        ## Syntax is shown for mce_step for demonstration purposes.
        ## Looping inside the block is the same for mce_step_f and
        ## mce_step_s.

        ## Array or array_ref
        mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
        mce_step sub { do_work($_) for (@{ $_ }) }, \@list;

        ## Important; pass an array_ref for deeply nested input data
        mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;

        ## Resembles code using the core MCE API
        mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           for (@{ $chunk_ref }) {
              do_work($_);
           }

        }, 1..10000;

       Chunking reduces the number of IPC calls behind the scenes. Think in
       terms of chunks whenever processing a large amount of data. For
       relatively small data, choosing 1 for chunk_size is fine.
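
       To make the effect of chunking concrete, the sketch below counts how
       many times the first block is invoked for the same 1000 items under
       two chunk sizes. The count_calls helper, the input range, and the
       chosen chunk sizes are hypothetical and serve only as illustration.

```perl
use strict;
use warnings;
use MCE::Step;

# Hypothetical helper: returns how many times the block ran for 1..1000.
sub count_calls {
   my ($chunk_size) = @_;

   my @sizes = mce_step {
      chunk_size => $chunk_size, max_workers => 2
   },
   sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      MCE->gather(scalar @{ $chunk_ref });   # one gather per invocation
   }, 1 .. 1000;

   MCE::Step::finish;

   return scalar @sizes;
}

my $calls_small = count_calls(1);
my $calls_large = count_calls(250);

print "chunk_size 1:   $calls_small calls\n";
print "chunk_size 250: $calls_large calls\n";
```

       Each invocation corresponds to one chunk handed to a worker, so fewer
       invocations should mean fewer round trips between the workers and the
       manager process.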

OVERRIDING DEFAULTS

       The following lists options that may be overridden when loading the
       module.

        use Sereal qw( encode_sereal decode_sereal );
        use CBOR::XS qw( encode_cbor decode_cbor );
        use JSON::XS qw( encode_json decode_json );

        use MCE::Step
            max_workers => 8,                # Default 'auto'
            chunk_size => 500,               # Default 'auto'
            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
            freeze => \&encode_sereal,       # \&Storable::freeze
            thaw => \&decode_sereal,         # \&Storable::thaw
            fast => 1                        # Default 0 (fast dequeue)
        ;

       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
       available. Specify "Sereal => 0" to use Storable instead.

        use MCE::Step Sereal => 0;

CUSTOMIZING MCE

       MCE::Step->init ( options )
       MCE::Step::init { options }

       The init function accepts a hash of MCE options. Unlike with
       MCE::Stream, both gather and bounds_only options may be specified when
       calling init (not shown below).

        use MCE::Step;

        MCE::Step::init {
           chunk_size => 1, max_workers => 4,

           user_begin => sub {
              print "## ", MCE->wid, " started\n";
           },

           user_end => sub {
              print "## ", MCE->wid, " completed\n";
           }
        };

        my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;

        print "\n", "@a{1..100}", "\n";

        -- Output

        ## 3 started
        ## 1 started
        ## 4 started
        ## 2 started
        ## 3 completed
        ## 4 completed
        ## 1 completed
        ## 2 completed

        1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
        400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
        1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
        2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
        3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
        5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
        7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
        10000
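
       The gather option, which init also accepts, might be used as in the
       following sketch. It assumes a plain code reference as the gather
       callback; the %squares hash and the input range are illustrative.

```perl
use strict;
use warnings;
use MCE::Step;

my %squares;

MCE::Step::init {
   chunk_size => 1, max_workers => 4,

   # Runs in the manager process for every MCE->gather call.
   gather => sub {
      my ($key, $value) = @_;
      $squares{$key} = $value;
   }
};

mce_step sub { MCE->gather($_, $_ * $_) }, 1..10;

MCE::Step::finish;

my $line = join ' ', map { $squares{$_} } 1..10;
print "$line\n";
```

       Because the callback stores each value under its own key, the final
       line does not depend on the order in which workers report back.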

       Like with MCE::Step::init above, MCE options may be specified using an
       anonymous hash for the first argument. Notice how task_name,
       max_workers, and use_threads can take an anonymous array for setting
       values uniquely for each code block.

       Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
       with the first code block, thus processing from left-to-right.

       The following takes 9 seconds to complete. The 9 seconds come from
       having only 2 workers assigned for the last sub-task and waiting 1 or 2
       seconds initially before calling MCE->step.

       Removing both calls to MCE->step will cause the script to complete in
       just 1 second. The reason is that the 2nd and subsequent sub-tasks
       await data from an internal queue. Workers terminate upon receiving
       an undef.

        use threads;
        use MCE::Step;

        my @a = mce_step {
           task_name   => [ 'a', 'b', 'c' ],
           max_workers => [  3,   4,   2, ],
           use_threads => [  1,   0,   0, ],

           user_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              MCE->print("$task_id - $task_name completed\n");
           },

           task_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              MCE->print("$task_id - $task_name ended\n");
           }
        },
        sub { sleep 1; MCE->step(""); },   ## 3 workers, named a
        sub { sleep 2; MCE->step(""); },   ## 4 workers, named b
        sub { sleep 3;                };   ## 2 workers, named c

        -- Output

        0 - a completed
        0 - a completed
        0 - a completed
        0 - a ended
        1 - b completed
        1 - b completed
        1 - b completed
        1 - b completed
        1 - b ended
        2 - c completed
        2 - c completed
        2 - c ended
API DOCUMENTATION

       Although input data is optional for MCE::Step, the following assumes
       chunk_size equals 1 in order to demonstrate all the possibilities for
       providing input data.

       MCE::Step->run ( sub { code }, list )
       mce_step sub { code }, list

       Input data may be defined using a list, an array ref, or a hash ref.

       Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Step
       takes a "sub { ... }" or a code reference. The other difference is that
       the comma is needed after the block.

        # $_ contains the item when chunk_size => 1

        mce_step sub { do_work($_) }, 1..1000;
        mce_step sub { do_work($_) }, \@list;

        # Important; pass an array_ref for deeply nested input data

        mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) }, \@deeply_list;

        # Chunking; any chunk_size => 1 or greater

        my %res = mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my %ret;
           for my $item (@{ $chunk_ref }) {
              $ret{$item} = $item * 2;
           }
           MCE->gather(%ret);
        },
        \@list;

        # Input hash; current API available since 1.828

        my %res = mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my %ret;
           for my $key (keys %{ $chunk_ref }) {
              $ret{$key} = $chunk_ref->{$key} * 2;
           }
           MCE->gather(%ret);
        },
        \%hash;

        # Unlike MCE::Loop, MCE::Step doesn't need input to run

        mce_step { max_workers => 4 }, sub {
           MCE->say( MCE->wid );
        };

        # ... and can run multiple tasks

        mce_step {
           max_workers => [  1,   3  ],
           task_name   => [ 'p', 'c' ]
        },
        sub {
           # 1 producer
           MCE->say( "producer: ", MCE->wid );
        },
        sub {
           # 3 consumers
           MCE->say( "consumer: ", MCE->wid );
        };

        # Here, options are specified via init

        MCE::Step::init {
           max_workers => [  1,   3  ],
           task_name   => [ 'p', 'c' ]
        };

        mce_step \&producer, \&consumers;

       MCE::Step->run_file ( sub { code }, file )
       mce_step_f sub { code }, file

       The fastest of these is the /path/to/file. Workers communicate the next
       offset position among themselves with zero interaction by the manager
       process.

       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.

        # $_ contains the line when chunk_size => 1

        mce_step_f sub { $_ }, "/path/to/file";  # faster
        mce_step_f sub { $_ }, $file_handle;
        mce_step_f sub { $_ }, $io;              # IO::All
        mce_step_f sub { $_ }, \$scalar;

        # chunking, any chunk_size => 1 or greater

        my %res = mce_step_f sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my $buf = '';
           for my $line (@{ $chunk_ref }) {
              $buf .= $line;
           }
           MCE->gather($chunk_id, $buf);
        },
        "/path/to/file";

       MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
       mce_step_s sub { code }, $beg, $end [, $step, $fmt ]

       Sequence may be defined as a list, an array reference, or a hash
       reference. The functions require both begin and end values to run.
       Step and format are optional. The format is passed to sprintf (% may be
       omitted below).

        my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

        # $_ contains the sequence number when chunk_size => 1

        mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
        mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];

        mce_step_s sub { $_ }, {
           begin => $beg, end => $end,
           step => $step, format => $fmt
        };

        # chunking, any chunk_size => 1 or greater

        my %res = mce_step_s sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my $buf = '';
           for my $seq (@{ $chunk_ref }) {
              $buf .= "$seq\n";
           }
           MCE->gather($chunk_id, $buf);
        },
        [ $beg, $end ];
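
       As a small sketch of the optional step and format arguments, the
       following gathers a short formatted sequence. The begin, end, step,
       and format values are illustrative, and the results are sorted
       because gather order is not guaranteed.

```perl
use strict;
use warnings;
use MCE::Step;

MCE::Step::init {
   chunk_size => 1, max_workers => 2   # illustrative values
};

# begin 1, end 2, step 0.5, each number formatted through sprintf
my @seq = mce_step_s sub { MCE->gather($_) }, 1, 2, 0.5, "%.1f";

MCE::Step::finish;

my $line = join ' ', sort { $a <=> $b } @seq;
print "$line\n";
```

       With these values the script should print the three formatted numbers
       from 1.0 through 2.0 in ascending order.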

       With the bounds_only option, the sequence engine computes only the
       'begin' and 'end' items for the chunk, and not the items in between
       (hence boundaries only). This option applies to sequence only and has
       no effect when chunk_size equals 1.

       The time to run is 0.006s below. This becomes 0.827s without the
       bounds_only option due to computing all items in between, thus creating
       a very large array. Basically, specify bounds_only => 1 when boundaries
       are all you need for looping inside the block; e.g. Monte Carlo
       simulations.

       Time was measured using 1 worker to emphasize the difference.

        use MCE::Step;

        MCE::Step::init {
           max_workers => 1, chunk_size => 1_250_000,
           bounds_only => 1
        };

        # Typically, the input scalar $_ contains the sequence number
        # when chunk_size => 1, unless the bounds_only option is set
        # which is the case here. Thus, $_ points to $chunk_ref.

        mce_step_s sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           # $chunk_ref contains 2 items, not 1_250_000
           # my ( $begin, $end ) = ( $_->[0], $_->[1] );

           my $begin = $chunk_ref->[0];
           my $end   = $chunk_ref->[1];

           # for my $seq ( $begin .. $end ) {
           #    ...
           # }

           MCE->printf("%7d .. %8d\n", $begin, $end);
        },
        [ 1, 10_000_000 ];

        -- Output

              1 ..  1250000
        1250001 ..  2500000
        2500001 ..  3750000
        3750001 ..  5000000
        5000001 ..  6250000
        6250001 ..  7500000
        7500001 ..  8750000
        8750001 .. 10000000
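
       The looping pattern that bounds_only is meant for could look like the
       sketch below, where each worker sums its own range from the two
       boundary values. The range, chunk size, worker count, and the use of
       List::Util are illustrative assumptions.

```perl
use strict;
use warnings;
use MCE::Step;
use List::Util 'sum';

MCE::Step::init {
   max_workers => 2, chunk_size => 250_000,   # illustrative values
   bounds_only => 1
};

my @partial = mce_step_s sub {
   my ($mce, $chunk_ref, $chunk_id) = @_;

   # Only the boundaries are provided; loop over the range locally.
   my ($begin, $end) = @{ $chunk_ref }[0, 1];
   my $sum = 0;
   $sum += $_ for $begin .. $end;

   MCE->gather($sum);   # one partial sum per chunk
}, 1, 1_000_000;

MCE::Step::finish;

my $total = sum(@partial);
print "$total\n";
```

       Addition is order-independent, so the partial sums can be gathered in
       any order and combined in the manager process.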

       MCE::Step->run ( { input_data => iterator }, sub { code } )
       mce_step { input_data => iterator }, sub { code }

       An iterator reference may be specified for input_data, either in the
       options hash shown above or via MCE::Step::init. Passing it this way
       prevents MCE::Step from treating the iterator reference as another
       user task, which would not work.

       Iterators are described under section "SYNTAX for INPUT_DATA" in
       MCE::Core.

        MCE::Step::init {
           input_data => iterator
        };

        mce_step sub { $_ };
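
       A concrete iterator might look like the following sketch. The
       make_iter closure is hypothetical; it assumes chunk_size => 1, returns
       one item per call, and returns an empty list once the input is
       exhausted.

```perl
use strict;
use warnings;
use MCE::Step;

# Hypothetical iterator factory: yields $n, $n + 1, ... up to $max.
sub make_iter {
   my ($n, $max) = @_;

   return sub {
      return if $n > $max;   # nothing left, signals end of input
      return $n++;
   };
}

my @out = mce_step {
   chunk_size => 1, max_workers => 2,
   input_data => make_iter(1, 5)
},
sub { MCE->gather($_ * 10) };

MCE::Step::finish;

my $line = join ' ', sort { $a <=> $b } @out;
print "$line\n";
```

       The iterator is called in the manager process, so the closure state
       in $n is shared across all requests from workers.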

QUEUE-LIKE FEATURES

       MCE->step ( item )
       MCE->step ( arg1, arg2, argN )

       The ->step method is the simplest form for passing elements into the
       next sub-task.

        use MCE::Step;

        sub provider {
           MCE->step( $_, rand ) for 10 .. 19;
        }

        sub consumer {
           my ( $mce, @args ) = @_;
           MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
        }

        MCE::Step::init {
           task_name   => [ 'p', 'c' ],
           max_workers => [  1 ,  4  ]
        };

        mce_step \&provider, \&consumer;

        -- Output

        2: 10, 0.583551
        4: 11, 0.175319
        3: 12, 0.843662
        4: 15, 0.748302
        2: 14, 0.591752
        3: 16, 0.357858
        5: 13, 0.953528
        4: 17, 0.698907
        2: 18, 0.985448
        3: 19, 0.146548

       MCE->enq ( task_name, item )
       MCE->enq ( task_name, [ arg1, arg2, argN ] )
       MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
       MCE->enqp ( task_name, priority, item )
       MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
       MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )

       The MCE 1.7 release enables finer control. Unlike ->step, which takes
       multiple arguments, the ->enq and ->enqp methods push items at the end
       of the array internally. Passing multiple arguments is possible by
       enclosing the arguments inside an anonymous array.

       The direction of flow is forward only. Thus, stepping to itself or
       backwards will cause an error.

        use MCE::Step;

        sub provider {
           if ( MCE->wid % 2 == 0 ) {
              MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
           } else {
              MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
           }
        }

        sub consumer_c {
           my ( $mce, $args ) = @_;
           MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        }

        sub consumer_d {
           my ( $mce, $args ) = @_;
           MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        }

        MCE::Step::init {
           task_name   => [ 'p', 'c', 'd' ],
           max_workers => [  2 ,  3 ,  3  ]
        };

        mce_step \&provider, \&consumer_c, \&consumer_d;

        -- Output

        C4: 10, 0.527531
        D6: 20, 0.420108
        C5: 11, 0.839770
        D8: 21, 0.386414
        C3: 12, 0.834645
        C4: 13, 0.191014
        D6: 23, 0.924027
        C5: 14, 0.899357
        D8: 24, 0.706186
        C4: 15, 0.083823
        D7: 22, 0.479708
        D6: 25, 0.073882
        C3: 16, 0.207446
        D8: 26, 0.560755
        C5: 17, 0.198157
        D7: 27, 0.324909
        C4: 18, 0.147505
        C5: 19, 0.318371
        D6: 28, 0.220465
        D8: 29, 0.630111
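
       The multi-item form of ->enq listed above could be exercised as in
       this sketch, where a single call queues three items and each consumer
       invocation receives one of them as an array reference. The task
       names, worker counts, and item values are illustrative.

```perl
use strict;
use warnings;
use MCE::Step;

MCE::Step::init {
   task_name   => [ 'p', 'c' ],
   max_workers => [  1 ,  2  ]
};

my @got = mce_step
   sub {
      # One call, three items; each anonymous array is a separate item.
      MCE->enq( 'c', [ 1, 'one' ], [ 2, 'two' ], [ 3, 'three' ] );
   },
   sub {
      my ( $mce, $args ) = @_;
      MCE->gather( "$args->[0]=$args->[1]" );
   };

MCE::Step::finish;

# Sort before printing because consumers may finish in any order.
my $line = join ' ', sort @got;
print "$line\n";
```

       Wrapping each pair in its own anonymous array keeps the two values
       together when they reach the consumer.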

       MCE->await ( task_name, pending_threshold )

       Providers may sometimes run faster than consumers, thus increasing
       memory consumption. MCE 1.7 adds the ->await method for pausing
       momentarily until the receiving sub-task reaches the minimum threshold
       for the number of items pending in its queue.

        use MCE::Step;
        use Time::HiRes 'sleep';

        sub provider {
           for ( 10 .. 29 ) {
              # wait until 10 or fewer items pending
              MCE->await( 'c', 10 );
              # forward item to a later sub-task ( 'c' comes after 'p' )
              MCE->enq( 'c', [ $_, rand ] );
           }
        }

        sub consumer {
           my ($mce, $args) = @_;
           MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
           sleep 0.05;
        }

        MCE::Step::init {
           task_name   => [ 'p', 'c' ],
           max_workers => [  1 ,  4  ]
        };

        mce_step \&provider, \&consumer;

        -- Output

        3: 10, 0.527307
        2: 11, 0.036193
        5: 12, 0.987168
        4: 13, 0.998140
        5: 14, 0.219526
        4: 15, 0.061609
        2: 16, 0.557664
        3: 17, 0.658684
        4: 18, 0.240932
        3: 19, 0.241042
        5: 20, 0.884830
        2: 21, 0.902223
        4: 22, 0.699223
        3: 23, 0.208270
        5: 24, 0.438919
        2: 25, 0.268854
        4: 26, 0.596425
        5: 27, 0.979818
        2: 28, 0.918173
        3: 29, 0.358266

GATHERING DATA

       Unlike MCE::Map where gather and output order are done for you
       automatically, the gather method is used to have results sent back to
       the manager process.

        use MCE::Step chunk_size => 1;

        ## Output order is not guaranteed.
        my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
        print "@a\n\n";

        ## Outputs to a hash instead (key, value).
        my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
        print "@h1{1..100}\n\n";

        ## This does the same thing due to chunk_id starting at one.
        my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
        print "@h2{1..100}\n\n";

       The gather method may be called multiple times within the block, unlike
       return which would leave the block. Therefore, think of gather as
       yielding results immediately to the manager process without actually
       leaving the block.

        use MCE::Step chunk_size => 1, max_workers => 3;

        my @hosts = qw(
           hosta hostb hostc hostd hoste
        );

        my %h3 = mce_step sub {
           my ($output, $error, $status); my $host = $_;

           ## Do something with $host;
           $output = "Worker ". MCE->wid .": Hello from $host";

           if (MCE->chunk_id % 3 == 0) {
              ## Simulating an error condition
              local $? = 1; $status = $?;
              $error = "Error from $host"
           }
           else {
              $status = 0;
           }

           ## Ensure unique keys (key, value) when gathering to
           ## a hash.
           MCE->gather("$host.out", $output);
           MCE->gather("$host.err", $error) if (defined $error);
           MCE->gather("$host.sta", $status);

        }, @hosts;

        foreach my $host (@hosts) {
           print $h3{"$host.out"}, "\n";
           print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
           print "Exit status: ", $h3{"$host.sta"}, "\n\n";
        }

        -- Output

        Worker 3: Hello from hosta
        Exit status: 0

        Worker 2: Hello from hostb
        Exit status: 0

        Worker 1: Hello from hostc
        Error from hostc
        Exit status: 1

        Worker 3: Hello from hostd
        Exit status: 0

        Worker 2: Hello from hoste
        Exit status: 0

       The following uses an anonymous array containing 3 elements when
       gathering data. Serialization is automatic behind the scenes.

        my %h3 = mce_step sub {
           ...

           MCE->gather($host, [$output, $error, $status]);

        }, @hosts;

        foreach my $host (@hosts) {
           print $h3{$host}->[0], "\n";
           print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
           print "Exit status: ", $h3{$host}->[2], "\n\n";
        }
       Although MCE::Map comes to mind, one may want additional control when
       gathering data such as retaining output order.

        use MCE::Step;

        sub preserve_order {
           my %tmp; my $order_id = 1; my $gather_ref = $_[0];

           return sub {
              $tmp{ (shift) } = \@_;

              while (1) {
                 last unless exists $tmp{$order_id};
                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
              }

              return;
           };
        }

        ## Workers persist for the most part after running, though this is
        ## not always the case and depends on Perl. Pass a reference to a
        ## subroutine if workers must persist; e.g.
        ## mce_step { ... }, \&foo, 1..100000.

        MCE::Step::init {
           chunk_size => 'auto', max_workers => 'auto'
        };

        for (1..2) {
           my @m2;

           mce_step {
              gather => preserve_order(\@m2)
           },
           sub {
              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

              ## Compute the entire chunk data at once.
              push @a, map { $_ * 2 } @{ $chunk_ref };

              ## Afterwards, invoke the gather feature, which
              ## will direct the data to the callback function.
              MCE->gather(MCE->chunk_id, @a);

           }, 1..100000;

           print scalar @m2, "\n";
        }

        MCE::Step::finish;

       All 6 models support 'auto' for chunk_size unlike the Core API. Think
       of the models as the basis for providing JIT for MCE. They create the
       instance, tune max_workers, and tune chunk_size automatically
       regardless of the hardware.

       The following does the same thing using the Core API. Workers persist
       after running.

        use MCE;

        sub preserve_order {
           ...
        }

        my $mce = MCE->new(
           max_workers => 'auto', chunk_size => 8000,

           user_func => sub {
              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

              ## Compute the entire chunk data at once.
              push @a, map { $_ * 2 } @{ $chunk_ref };

              ## Afterwards, invoke the gather feature, which
              ## will direct the data to the callback function.
              MCE->gather(MCE->chunk_id, @a);
           }
        );

        for (1..2) {
           my @m2;

           $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);

           print scalar @m2, "\n";
        }

        $mce->shutdown;

MANUAL SHUTDOWN

       MCE::Step->finish
       MCE::Step::finish

       Workers remain persistent as much as possible after running. Shutdown
       occurs automatically when the script terminates. Call finish when
       workers are no longer needed.

        use MCE::Step;

        MCE::Step::init {
           chunk_size => 20, max_workers => 'auto'
        };

        mce_step sub { ... }, 1..100;

        MCE::Step::finish;

INDEX

       MCE, MCE::Core

AUTHOR

       Mario E. Roy, <marioeroy AT gmail DOT com>



perl v5.30.0                      2019-09-19                      MCE::Step(3)