MCE::Step(3)          User Contributed Perl Documentation         MCE::Step(3)

NAME

       MCE::Step - Parallel step model for building creative steps

VERSION

       This document describes MCE::Step version 1.884

DESCRIPTION

       MCE::Step is similar to MCE::Flow for writing custom apps. The main
       difference comes from the transparent use of queues between sub-tasks.
       MCE 1.7 adds the MCE->enq, MCE->enqp, and MCE->await methods described
       under QUEUE-LIKE FEATURES below.

       It is trivial to parallelize with mce_stream shown below.

        use MCE::Stream;

        ## Native map function
        my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

        ## Same as with MCE::Stream (processing from right to left)
        @a = mce_stream
             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

        ## Pass an array reference to have writes occur simultaneously
        mce_stream \@a,
             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

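       The right-to-left order of a map chain is easy to miss when every
       block multiplies, because multiplication commutes. The following is
       a minimal plain-Perl sketch (no MCE involved) that uses blocks which
       do not commute, so the order becomes visible. The variable names are
       illustrative only.

```perl
use strict;
use warnings;

# Chained maps apply the rightmost block first, then move left.
# Doubling and adding one do not commute, so the order is visible.
my @right_to_left = map { $_ + 1 } map { $_ * 2 } 1 .. 3;

# Equivalent single pass: double first, then add one.
my @single_pass = map { ($_ * 2) + 1 } 1 .. 3;

print "@right_to_left\n";
print "@single_pass\n";
```

       Both lists hold the same values, which shows that the rightmost
       block runs first.
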
       However, let's have MCE::Step compute the same in parallel. Unlike the
       example in MCE::Flow, the use of MCE::Queue is totally transparent.
       Preserving output order is handled by MCE::Candy.

        use MCE::Step;
        use MCE::Candy;

       Next are the 3 sub-tasks. Compare them with the equivalent sub-tasks
       described in MCE::Flow. The call to MCE->step simplifies the passing of
       data to the subsequent sub-task.

        sub task_a {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 2 } @{ $chunk_ref };
           MCE->step(\@ans, $chunk_id);
        }

        sub task_b {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 3 } @{ $chunk_ref };
           MCE->step(\@ans, $chunk_id);
        }

        sub task_c {
           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
           push @ans, map { $_ * 4 } @{ $chunk_ref };
           MCE->gather($chunk_id, \@ans);
        }

       In summary, MCE::Step builds an MCE instance behind the scenes and
       starts running. The task_name (shown), max_workers, and use_threads
       options can take an anonymous array for specifying a unique value for
       each sub-task.

       The task_name option is required to use ->enq, ->enqp, and ->await.

        my @a;

        mce_step {
           task_name => [ 'a', 'b', 'c' ],
           gather => MCE::Candy::out_iter_array(\@a)

        }, \&task_a, \&task_b, \&task_c, 1..10000;

        print "@a\n";

STEP DEMO

       In the demonstration below, one may call ->gather or ->step any number
       of times, although ->step is not allowed in the last sub-block. Data is
       gathered to @arr and will likely arrive out of order. Gathering data is
       optional. All sub-blocks receive $mce as the first argument.

       First, define 3 sub-tasks.

        use MCE::Step;

        sub task_a {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           if ($_ % 2 == 0) {
              MCE->gather($_);
            # MCE->gather($_ * 4);        ## Ok to gather multiple times
           }
           else {
              MCE->print("a step: $_, $_ * $_\n");
              MCE->step($_, $_ * $_);
            # MCE->step($_, $_ * 4 );     ## Ok to step multiple times
           }
        }

        sub task_b {
           my ($mce, $arg1, $arg2) = @_;

           MCE->print("b args: $arg1, $arg2\n");

           if ($_ % 3 == 0) {             ## $_ is the same as $arg1
              MCE->gather($_);
           }
           else {
              MCE->print("b step: $_ * $_\n");
              MCE->step($_ * $_);
           }
        }

        sub task_c {
           my ($mce, $arg1) = @_;

           MCE->print("c: $_\n");
           MCE->gather($_);
        }

       Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
       parallel. Notice how max_workers and use_threads can take an anonymous
       array, similar to task_name.

        my @arr = mce_step {
           task_name   => [ 'a', 'b', 'c' ],
           max_workers => [  2,   2,   2  ],
           use_threads => [  0,   0,   0  ],
           chunk_size  => 1

        }, \&task_a, \&task_b, \&task_c, 1..10;

       Finally, sort the array and display its contents.

        @arr = sort { $a <=> $b } @arr;

        print "\n@arr\n\n";

        -- Output

        a step: 1, 1 * 1
        a step: 3, 3 * 3
        a step: 5, 5 * 5
        a step: 7, 7 * 7
        a step: 9, 9 * 9
        b args: 1, 1
        b step: 1 * 1
        b args: 3, 9
        b args: 7, 49
        b step: 7 * 7
        b args: 5, 25
        b step: 5 * 5
        b args: 9, 81
        c: 1
        c: 49
        c: 25

        1 2 3 4 6 8 9 10 25 49
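
       To see why the sorted result contains exactly those ten values, the
       three sub-tasks can be traced sequentially. The following is a
       plain-Perl sketch that does not use MCE at all: the hypothetical
       stage_a, stage_b, and stage_c subs return the argument list that the
       real sub-task would pass to MCE->step, or an empty list when it
       gathers instead, and @gathered stands in for what MCE->gather
       collects.

```perl
use strict;
use warnings;

my @gathered;    # stands in for the data collected by MCE->gather

# Each stage returns what it would pass to MCE->step, or an empty
# list when it gathers instead.
sub stage_a {
    my ($n) = @_;
    if ($n % 2 == 0) { push @gathered, $n; return }
    return ($n, $n * $n);
}

sub stage_b {
    my ($arg1, $arg2) = @_;
    if ($arg1 % 3 == 0) { push @gathered, $arg1; return }
    return ($arg1 * $arg1);
}

sub stage_c {
    my ($arg1) = @_;
    push @gathered, $arg1;
    return;
}

for my $n (1 .. 10) {
    my @to_b = stage_a($n)    or next;   # nothing stepped: item is done
    my @to_c = stage_b(@to_b) or next;
    stage_c(@to_c);
}

my @sorted = sort { $a <=> $b } @gathered;
print "@sorted\n";
```

       Even inputs are gathered by the first stage, odd multiples of 3 by
       the second, and the squares of the remaining odd numbers by the
       third.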

SYNOPSIS when CHUNK_SIZE EQUALS 1

       Although MCE::Loop may be preferred when running a single code block,
       the text below also applies to this module, particularly for the
       first block.

       All models in MCE default to 'auto' for chunk_size. The arguments for
       the block are the same as writing a user_func block using the Core API.

       Beginning with MCE 1.5, the next input item is placed into the input
       scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
       $chunk_ref containing many items. Basically, line 2 below may be
       omitted from your code when using $_. One can call MCE->chunk_id to
       obtain the current chunk id.

        line 1:  user_func => sub {
        line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
        line 3:
        line 4:     $_ points to $chunk_ref->[0]
        line 5:        in MCE 1.5 when chunk_size == 1
        line 6:
        line 7:     $_ points to $chunk_ref
        line 8:        in MCE 1.5 when chunk_size  > 1
        line 9:  }

       Follow this synopsis when chunk_size equals one. Looping is not
       required from inside the first block. Hence, the block is called once
       per item.

        ## Exports mce_step, mce_step_f, and mce_step_s
        use MCE::Step;

        MCE::Step->init(
           chunk_size => 1
        );

        ## Array or array_ref
        mce_step sub { do_work($_) }, 1..10000;
        mce_step sub { do_work($_) }, \@list;

        ## Important; pass an array_ref for deeply nested input data
        mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) }, \@deeply_list;

        ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
        ## Workers read directly without involving the manager process
        mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient

        ## Involves the manager process, therefore slower
        mce_step_f sub { chomp; do_work($_) }, $file_handle;
        mce_step_f sub { chomp; do_work($_) }, $io;
        mce_step_f sub { chomp; do_work($_) }, \$scalar;

        ## Sequence of numbers (begin, end [, step, format])
        mce_step_s sub { do_work($_) }, 1, 10000, 5;
        mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];

        mce_step_s sub { do_work($_) }, {
           begin => 1, end => 10000, step => 5, format => undef
        };

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

       Follow this synopsis when chunk_size equals 'auto' or is greater than
       1. This means having to loop through the chunk from inside the first
       block.

        use MCE::Step;

        MCE::Step->init(           ## Chunk_size defaults to 'auto' when
           chunk_size => 'auto'    ## not specified. Therefore, the init
        );                         ## function may be omitted.

        ## Syntax is shown for mce_step for demonstration purposes.
        ## Looping inside the block is the same for mce_step_f and
        ## mce_step_s.

        ## Array or array_ref
        mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
        mce_step sub { do_work($_) for (@{ $_ }) }, \@list;

        ## Important; pass an array_ref for deeply nested input data
        mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;

        ## Resembles code using the core MCE API
        mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           for (@{ $chunk_ref }) {
              do_work($_);
           }

        }, 1..10000;

       Chunking reduces the number of IPC calls behind the scenes. Think in
       terms of chunks whenever processing a large amount of data. For
       relatively small data, choosing 1 for chunk_size is fine.
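
       The effect of chunking on the number of block invocations can be
       sketched in plain Perl. The run_in_chunks helper below is
       hypothetical and is not part of MCE; it only imitates the idea of
       handing the block one array reference per chunk together with a
       chunk id that starts at 1. In MCE, each invocation also involves
       IPC, which is why fewer and larger chunks tend to cost less.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of MCE: split the input into chunks
# and call $block once per chunk with ($chunk_ref, $chunk_id).
sub run_in_chunks {
    my ($chunk_size, $block, $input) = @_;
    my ($chunk_id, $calls) = (0, 0);
    my @items = @{ $input };

    while (my @chunk = splice(@items, 0, $chunk_size)) {
        $block->(\@chunk, ++$chunk_id);
        $calls++;
    }

    return $calls;
}

my @seen;
my $block = sub { my ($chunk_ref, $chunk_id) = @_; push @seen, @{ $chunk_ref } };

my $calls_one  = run_in_chunks(1, $block, [ 1 .. 10 ]);
my $calls_four = run_in_chunks(4, $block, [ 1 .. 10 ]);

print "chunk_size 1: $calls_one calls\n";
print "chunk_size 4: $calls_four calls\n";
```

       With ten items, a chunk size of 1 invokes the block ten times, while
       a chunk size of 4 invokes it three times (chunks of 4, 4, and 2
       items).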

OVERRIDING DEFAULTS

       The following lists options which may be overridden when loading the
       module. The fast option is obsolete from 1.867 onwards and is ignored
       if specified.

        use Sereal qw( encode_sereal decode_sereal );
        use CBOR::XS qw( encode_cbor decode_cbor );
        use JSON::XS qw( encode_json decode_json );

        use MCE::Step
            max_workers => 8,                # Default 'auto'
            chunk_size => 500,               # Default 'auto'
            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
            freeze => \&encode_sereal,       # \&Storable::freeze
            thaw => \&decode_sereal,         # \&Storable::thaw
            init_relay => 0,                 # Default undef; MCE 1.882+
            use_threads => 0,                # Default undef; MCE 1.882+
        ;

       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
       available. Specify "Sereal => 0" to use Storable instead.

        use MCE::Step Sereal => 0;

CUSTOMIZING MCE

       MCE::Step->init ( options )
       MCE::Step::init { options }

       The init function accepts a hash of MCE options. Unlike with
       MCE::Stream, both gather and bounds_only options may be specified when
       calling init (not shown below).

        use MCE::Step;

        MCE::Step->init(
           chunk_size => 1, max_workers => 4,

           user_begin => sub {
              print "## ", MCE->wid, " started\n";
           },

           user_end => sub {
              print "## ", MCE->wid, " completed\n";
           }
        );

        my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;

        print "\n", "@a{1..100}", "\n";

        -- Output

        ## 3 started
        ## 1 started
        ## 4 started
        ## 2 started
        ## 3 completed
        ## 4 completed
        ## 1 completed
        ## 2 completed

        1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
        400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
        1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
        2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
        3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
        5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
        7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
        10000

       As with MCE::Step->init above, MCE options may be specified using an
       anonymous hash for the first argument. Notice how task_name,
       max_workers, and use_threads can take an anonymous array for setting
       a unique value per code block.

       Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
       with the first code block, thus processing from left-to-right.

       The following takes 9 seconds to complete. The 9 seconds come from
       having only 2 workers assigned to the last sub-task and from waiting
       1 or 2 seconds initially before calling MCE->step.

       Removing both calls to MCE->step will cause the script to complete in
       just 1 second. The reason is that the 2nd and subsequent sub-tasks
       await data from an internal queue. Workers terminate upon receiving
       an undef.

        use threads;
        use MCE::Step;

        my @a = mce_step {
           task_name   => [ 'a', 'b', 'c' ],
           max_workers => [  3,   4,   2, ],
           use_threads => [  1,   0,   0, ],

           user_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              MCE->print("$task_id - $task_name completed\n");
           },

           task_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              MCE->print("$task_id - $task_name ended\n");
           }
        },
        sub { sleep 1; MCE->step(""); },   ## 3 workers, named a
        sub { sleep 2; MCE->step(""); },   ## 4 workers, named b
        sub { sleep 3;                };   ## 2 workers, named c

        -- Output

        0 - a completed
        0 - a completed
        0 - a completed
        0 - a ended
        1 - b completed
        1 - b completed
        1 - b completed
        1 - b completed
        1 - b ended
        2 - c completed
        2 - c completed
        2 - c ended

API DOCUMENTATION

       Although input data is optional for MCE::Step, the following assumes
       chunk_size equals 1 in order to demonstrate all the possibilities for
       providing input data.

       MCE::Step->run ( sub { code }, list )
       mce_step sub { code }, list

       Input data may be defined using a list, an array ref, or a hash ref.

       Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Step
       takes a "sub { ... }" or a code reference. The other difference is that
       the comma is needed after the block.

        # $_ contains the item when chunk_size => 1

        mce_step sub { do_work($_) }, 1..1000;
        mce_step sub { do_work($_) }, \@list;

        # Important; pass an array_ref for deeply nested input data

        mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
        mce_step sub { do_work($_) }, \@deeply_list;

        # Chunking; any chunk_size => 1 or greater

        my %res = mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my %ret;
           for my $item (@{ $chunk_ref }) {
              $ret{$item} = $item * 2;
           }
           MCE->gather(%ret);
        },
        \@list;

        # Input hash; current API available since 1.828

        my %res = mce_step sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my %ret;
           for my $key (keys %{ $chunk_ref }) {
              $ret{$key} = $chunk_ref->{$key} * 2;
           }
           MCE->gather(%ret);
        },
        \%hash;

        # Unlike MCE::Loop, MCE::Step doesn't need input to run

        mce_step { max_workers => 4 }, sub {
           MCE->say( MCE->wid );
        };

        # ... and can run multiple tasks

        mce_step {
           max_workers => [  1,   3  ],
           task_name   => [ 'p', 'c' ]
        },
        sub {
           # 1 producer
           MCE->say( "producer: ", MCE->wid );
        },
        sub {
           # 3 consumers
           MCE->say( "consumer: ", MCE->wid );
        };

        # Here, options are specified via init

        MCE::Step->init(
           max_workers => [  1,   3  ],
           task_name   => [ 'p', 'c' ]
        );

        mce_step \&producer, \&consumers;

441        sub {
442           # 1 producer
443           MCE->say( "producer: ", MCE->wid );
444        },
445        sub {
446           # 3 consumers
447           MCE->say( "consumer: ", MCE->wid );
448        };
449
450        # Here, options are specified via init
451
452        MCE::Step->init(
453           max_workers => [  1,   3  ],
454           task_name   => [ 'p', 'c' ]
455        );
456
457        mce_step \&producer, \&consumers;
458
459       MCE::Step->run_file ( sub { code }, file )
460       mce_step_f sub { code }, file
461
462       The fastest of these is the /path/to/file. Workers communicate the next
463       offset position among themselves with zero interaction by the manager
464       process.
465
466       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
467
468        # $_ contains the line when chunk_size => 1
469
470        mce_step_f sub { $_ }, "/path/to/file";  # faster
471        mce_step_f sub { $_ }, $file_handle;
472        mce_step_f sub { $_ }, $io;              # IO::All
473        mce_step_f sub { $_ }, \$scalar;
474
475        # chunking, any chunk_size => 1 or greater
476
477        my %res = mce_step_f sub {
478           my ($mce, $chunk_ref, $chunk_id) = @_;
479           my $buf = '';
480           for my $line (@{ $chunk_ref }) {
481              $buf .= $line;
482           }
483           MCE->gather($chunk_id, $buf);
484        },
485        "/path/to/file";
486
       MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
       mce_step_s sub { code }, $beg, $end [, $step, $fmt ]

       Sequence may be defined as a list, an array reference, or a hash
       reference. The functions require both begin and end values to run.
       Step and format are optional. The format is passed to sprintf (% may be
       omitted below).

        my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

        # $_ contains the sequence number when chunk_size => 1

        mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
        mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];

        mce_step_s sub { $_ }, {
           begin => $beg, end => $end,
           step => $step, format => $fmt
        };

        # chunking, any chunk_size => 1 or greater

        my %res = mce_step_s sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;
           my $buf = '';
           for my $seq (@{ $chunk_ref }) {
              $buf .= "$seq\n";
           }
           MCE->gather($chunk_id, $buf);
        },
        [ $beg, $end ];

       With the bounds_only option, the sequence engine computes only the
       'begin' and 'end' items for each chunk, and not the items in between
       (hence boundaries only). This option applies to sequence only and has
       no effect when chunk_size equals 1.

       The time to run is 0.006s below. This becomes 0.827s without the
       bounds_only option due to computing all items in between, thus creating
       a very large array. Basically, specify bounds_only => 1 when the
       boundaries are all you need for looping inside the block; e.g. Monte
       Carlo simulations.

       Time was measured using 1 worker to emphasize the difference.

        use MCE::Step;

        MCE::Step->init(
           max_workers => 1, chunk_size => 1_250_000,
           bounds_only => 1
        );

        # Typically, the input scalar $_ contains the sequence number
        # when chunk_size => 1, unless the bounds_only option is set
        # which is the case here. Thus, $_ points to $chunk_ref.

        mce_step_s sub {
           my ($mce, $chunk_ref, $chunk_id) = @_;

           # $chunk_ref contains 2 items, not 1_250_000
           # my ( $begin, $end ) = ( $_->[0], $_->[1] );

           my $begin = $chunk_ref->[0];
           my $end   = $chunk_ref->[1];

           # for my $seq ( $begin .. $end ) {
           #    ...
           # }

           MCE->printf("%7d .. %8d\n", $begin, $end);
        },
        [ 1, 10_000_000 ];

        -- Output

              1 ..  1250000
        1250001 ..  2500000
        2500001 ..  3750000
        3750001 ..  5000000
        5000001 ..  6250000
        6250001 ..  7500000
        7500001 ..  8750000
        8750001 .. 10000000

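       The boundary pairs in the output above follow from simple arithmetic
       on the sequence and the chunk size. The following plain-Perl sketch
       reproduces them without MCE. The chunk_bounds helper is hypothetical
       and only illustrates the idea of computing a begin and end value per
       chunk without building the items in between; it is not how MCE
       implements the option internally.

```perl
use strict;
use warnings;

# Hypothetical helper: return [begin, end] pairs covering
# $first .. $last in steps of $chunk_size, without materializing
# the items in between.
sub chunk_bounds {
    my ($first, $last, $chunk_size) = @_;
    my @bounds;

    for (my $begin = $first; $begin <= $last; $begin += $chunk_size) {
        my $end = $begin + $chunk_size - 1;
        $end = $last if $end > $last;       # clamp the final chunk
        push @bounds, [ $begin, $end ];
    }

    return @bounds;
}

my @bounds = chunk_bounds(1, 10_000_000, 1_250_000);

printf "%7d .. %8d\n", @{ $_ } for @bounds;
```

       Eight pairs are produced, each holding 2 values rather than
       1,250,000 items, which is where the time saving comes from.
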
       MCE::Step->run ( { input_data => iterator }, sub { code } )
       mce_step { input_data => iterator }, sub { code }

       An iterator reference may be specified for input_data. The only other
       way is to specify input_data via MCE::Step->init. This prevents
       MCE::Step from configuring the iterator reference as another user task
       which will not work.

       Iterators are described under section "SYNTAX for INPUT_DATA" at
       MCE::Core.

        MCE::Step->init(
           input_data => iterator
        );

        mce_step sub { $_ };

QUEUE-LIKE FEATURES

       MCE->step ( item )
       MCE->step ( arg1, arg2, argN )

       The ->step method is the simplest form for passing elements into the
       next sub-task.

        use MCE::Step;

        sub provider {
           MCE->step( $_, rand ) for 10 .. 19;
        }

        sub consumer {
           my ( $mce, @args ) = @_;
           MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
        }

        MCE::Step->init(
           task_name   => [ 'p', 'c' ],
           max_workers => [  1 ,  4  ]
        );

        mce_step \&provider, \&consumer;

        -- Output

        2: 10, 0.583551
        4: 11, 0.175319
        3: 12, 0.843662
        4: 15, 0.748302
        2: 14, 0.591752
        3: 16, 0.357858
        5: 13, 0.953528
        4: 17, 0.698907
        2: 18, 0.985448
        3: 19, 0.146548

       MCE->enq ( task_name, item )
       MCE->enq ( task_name, [ arg1, arg2, argN ] )
       MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
       MCE->enqp ( task_name, priority, item )
       MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
       MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )

       The MCE 1.7 release enables finer control. Unlike ->step, which takes
       multiple arguments, the ->enq and ->enqp methods push items at the end
       of the array internally. Passing multiple arguments is possible by
       enclosing the arguments inside an anonymous array.

       The direction of flow is forward only. Thus, stepping to itself or
       backwards will cause an error.

        use MCE::Step;

        sub provider {
           if ( MCE->wid % 2 == 0 ) {
              MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
           } else {
              MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
           }
        }

        sub consumer_c {
           my ( $mce, $args ) = @_;
           MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        }

        sub consumer_d {
           my ( $mce, $args ) = @_;
           MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        }

        MCE::Step->init(
           task_name   => [ 'p', 'c', 'd' ],
           max_workers => [  2 ,  3 ,  3  ]
        );

        mce_step \&provider, \&consumer_c, \&consumer_d;

        -- Output

        C4: 10, 0.527531
        D6: 20, 0.420108
        C5: 11, 0.839770
        D8: 21, 0.386414
        C3: 12, 0.834645
        C4: 13, 0.191014
        D6: 23, 0.924027
        C5: 14, 0.899357
        D8: 24, 0.706186
        C4: 15, 0.083823
        D7: 22, 0.479708
        D6: 25, 0.073882
        C3: 16, 0.207446
        D8: 26, 0.560755
        C5: 17, 0.198157
        D7: 27, 0.324909
        C4: 18, 0.147505
        C5: 19, 0.318371
        D6: 28, 0.220465
        D8: 29, 0.630111

       MCE->await ( task_name, pending_threshold )

       Providers may sometimes run faster than consumers, thus increasing
       memory consumption. MCE 1.7 adds the ->await method for pausing
       momentarily until the number of items pending in the receiving
       sub-task's queue drops to the given threshold or below.

        use MCE::Step;
        use Time::HiRes 'sleep';

        sub provider {
           for ( 10 .. 29 ) {
              # wait until 10 or fewer items are pending
              MCE->await( 'c', 10 );
              # forward item to a later sub-task ( 'c' comes after 'p' )
              MCE->enq( 'c', [ $_, rand ] );
           }
        }

        sub consumer {
           my ($mce, $args) = @_;
           MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
           sleep 0.05;
        }

        MCE::Step->init(
           task_name   => [ 'p', 'c' ],
           max_workers => [  1 ,  4  ]
        );

        mce_step \&provider, \&consumer;

        -- Output

        3: 10, 0.527307
        2: 11, 0.036193
        5: 12, 0.987168
        4: 13, 0.998140
        5: 14, 0.219526
        4: 15, 0.061609
        2: 16, 0.557664
        3: 17, 0.658684
        4: 18, 0.240932
        3: 19, 0.241042
        5: 20, 0.884830
        2: 21, 0.902223
        4: 22, 0.699223
        3: 23, 0.208270
        5: 24, 0.438919
        2: 25, 0.268854
        4: 26, 0.596425
        5: 27, 0.979818
        2: 28, 0.918173
        3: 29, 0.358266
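
       The back-pressure idea behind ->await can be illustrated with a
       single-process simulation. The sketch below uses no MCE calls: a
       plain array stands in for the consumer's queue, and the "await" step
       is modelled, as an assumption for illustration, by letting the
       consumer drain items until the backlog is at or below the threshold.
       Real workers run concurrently, so actual timing will differ.

```perl
use strict;
use warnings;

my $threshold = 10;     # same value passed to MCE->await in the example
my (@queue, @consumed);
my $max_pending = 0;

for my $item (10 .. 29) {
    # "await": block until the backlog is at or below the threshold;
    # here the consumer is simulated by draining the queue in place
    push @consumed, shift(@queue) while @queue > $threshold;

    # "enq": hand the item to the consumer's queue
    push @queue, $item;
    $max_pending = @queue if @queue > $max_pending;
}

# the consumer finishes whatever is left
push @consumed, shift(@queue) while @queue;

print "max pending: $max_pending\n";
print "consumed:    @consumed\n";
```

       The backlog never grows beyond the threshold plus the one item being
       enqueued, which is the memory bound the method is meant to provide.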

GATHERING DATA

       Unlike MCE::Map, where gathering and output order are handled for you
       automatically, the gather method is used to have results sent back to
       the manager process.

        use MCE::Step chunk_size => 1;

        ## Output order is not guaranteed.
        my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
        print "@a\n\n";

        ## Outputs to a hash instead (key, value).
        my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
        print "@h1{1..100}\n\n";

        ## This does the same thing due to chunk_id starting at one.
        my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
        print "@h2{1..100}\n\n";

       The gather method may be called multiple times within the block,
       unlike return which would leave the block. Therefore, think of gather
       as yielding results immediately to the manager process without
       actually leaving the block.

        use MCE::Step chunk_size => 1, max_workers => 3;

        my @hosts = qw(
           hosta hostb hostc hostd hoste
        );

        my %h3 = mce_step sub {
           my ($output, $error, $status); my $host = $_;

           ## Do something with $host;
           $output = "Worker ". MCE->wid .": Hello from $host";

           if (MCE->chunk_id % 3 == 0) {
              ## Simulating an error condition
              local $? = 1; $status = $?;
              $error = "Error from $host";
           }
           else {
              $status = 0;
           }

           ## Ensure unique keys (key, value) when gathering to
           ## a hash.
           MCE->gather("$host.out", $output);
           MCE->gather("$host.err", $error) if (defined $error);
           MCE->gather("$host.sta", $status);

        }, @hosts;

        foreach my $host (@hosts) {
           print $h3{"$host.out"}, "\n";
           print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
           print "Exit status: ", $h3{"$host.sta"}, "\n\n";
        }

        -- Output

        Worker 3: Hello from hosta
        Exit status: 0

        Worker 2: Hello from hostb
        Exit status: 0

        Worker 1: Hello from hostc
        Error from hostc
        Exit status: 1

        Worker 3: Hello from hostd
        Exit status: 0

        Worker 2: Hello from hoste
        Exit status: 0

       The following uses an anonymous array containing 3 elements when
       gathering data. Serialization is automatic behind the scenes.

        my %h3 = mce_step sub {
           ...

           MCE->gather($host, [$output, $error, $status]);

        }, @hosts;

        foreach my $host (@hosts) {
           print $h3{$host}->[0], "\n";
           print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
           print "Exit status: ", $h3{$host}->[2], "\n\n";
        }

       Although MCE::Map comes to mind, one may want additional control when
       gathering data such as retaining output order.

        use MCE::Step;

        sub preserve_order {
           my %tmp; my $order_id = 1; my $gather_ref = $_[0];

           return sub {
              $tmp{ (shift) } = \@_;

              while (1) {
                 last unless exists $tmp{$order_id};
                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
              }

              return;
           };
        }

        ## Workers persist for the most part after running, although this
        ## is not always the case and depends on Perl. Pass a reference to
        ## a subroutine if workers must persist; e.g.
        ## mce_step { ... }, \&foo, 1..100000.

        MCE::Step->init(
           chunk_size => 'auto', max_workers => 'auto'
        );

        for (1..2) {
           my @m2;

           mce_step {
              gather => preserve_order(\@m2)
           },
           sub {
              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

              ## Compute the entire chunk data at once.
              push @a, map { $_ * 2 } @{ $chunk_ref };

              ## Afterwards, invoke the gather feature, which
              ## will direct the data to the callback function.
              MCE->gather(MCE->chunk_id, @a);

           }, 1..100000;

           print scalar @m2, "\n";
        }

        MCE::Step->finish;

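       The ordering callback can be exercised on its own, without starting
       any workers. The sketch below copies preserve_order from the example
       above and calls the returned closure by hand with chunk ids arriving
       out of order, which is an assumed stand-in for what the manager
       process does when workers gather. The sample values are made up for
       illustration.

```perl
use strict;
use warnings;

# Same ordering callback as in the example above.
sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];

    return sub {
        # first argument is the chunk id, the rest is the chunk data
        $tmp{ (shift) } = \@_;

        # flush every chunk that is next in line
        while (1) {
            last unless exists $tmp{$order_id};
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }

        return;
    };
}

my @ordered;
my $gather = preserve_order(\@ordered);

# Simulate chunks 3, 1, and 2 arriving in that order.
$gather->(3, 50, 60);
$gather->(1, 10, 20);
$gather->(2, 30, 40);

print "@ordered\n";
```

       Chunk 3 is held back in %tmp until chunks 1 and 2 have been flushed,
       so the array ends up in chunk order.
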
       All 6 models support 'auto' for chunk_size unlike the Core API. Think
       of the models as the basis for providing JIT for MCE. They create the
       instance, tune max_workers, and tune chunk_size automatically
       regardless of the hardware.

       The following does the same thing using the Core API. Workers persist
       after running.

        use MCE;

        sub preserve_order {
           ...
        }

        my $mce = MCE->new(
           max_workers => 'auto', chunk_size => 8000,

           user_func => sub {
              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

              ## Compute the entire chunk data at once.
              push @a, map { $_ * 2 } @{ $chunk_ref };

              ## Afterwards, invoke the gather feature, which
              ## will direct the data to the callback function.
              MCE->gather(MCE->chunk_id, @a);
           }
        );

        for (1..2) {
           my @m2;

           $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);

           print scalar @m2, "\n";
        }

        $mce->shutdown;

MANUAL SHUTDOWN

       MCE::Step->finish
       MCE::Step::finish

       Workers remain persistent as much as possible after running. Shutdown
       occurs automatically when the script terminates. Call finish when
       workers are no longer needed.

        use MCE::Step;

        MCE::Step->init(
           chunk_size => 20, max_workers => 'auto'
        );

        mce_step sub { ... }, 1..100;

        MCE::Step->finish;

INDEX

       MCE, MCE::Core

AUTHOR

       Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.36.0                      2023-01-20                      MCE::Step(3)