MCE::Step(3) User Contributed Perl Documentation MCE::Step(3)
2
3
4
6 MCE::Step - Parallel step model for building creative steps
7
9 This document describes MCE::Step version 1.837
10
MCE::Step is similar to MCE::Flow for writing custom apps. The main
difference is the transparent use of queues between sub-tasks. MCE 1.7
adds the mce_enq, mce_enqp, and mce_await methods, described under
QUEUE-LIKE FEATURES below.
16
17 It is trivial to parallelize with mce_stream shown below.
18
19 ## Native map function
20 my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
21
22 ## Same as with MCE::Stream (processing from right to left)
23 @a = mce_stream
24 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
25
26 ## Pass an array reference to have writes occur simultaneously
27 mce_stream \@a,
28 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
29
Next, let's have MCE::Step compute the same in parallel. Unlike the
example in MCE::Flow, the use of MCE::Queue is entirely transparent.
Because workers finish chunks in no fixed order, output order is
preserved with the help of MCE::Candy.
33
34 use MCE::Step;
35 use MCE::Candy;
36
Next are the 3 sub-tasks. Compare them with the equivalent sub-tasks
described in MCE::Flow. The call to MCE->step simplifies passing data
to the subsequent sub-task.
40
41 sub task_a {
42 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
43 push @ans, map { $_ * 2 } @{ $chunk_ref };
44 MCE->step(\@ans, $chunk_id);
45 }
46
47 sub task_b {
48 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
49 push @ans, map { $_ * 3 } @{ $chunk_ref };
50 MCE->step(\@ans, $chunk_id);
51 }
52
53 sub task_c {
54 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
55 push @ans, map { $_ * 4 } @{ $chunk_ref };
56 MCE->gather($chunk_id, \@ans);
57 }
58
In summary, MCE::Step builds an MCE instance behind the scenes and
starts running. The task_name (shown), max_workers, and use_threads
options can each take an anonymous array to specify a value per
sub-task.

The task_name option is required in order to use ->enq, ->enqp, and
->await.
65
66 my @a;
67
68 mce_step {
69 task_name => [ 'a', 'b', 'c' ],
70 gather => MCE::Candy::out_iter_array(\@a)
71
72 }, \&task_a, \&task_b, \&task_c, 1..10000;
73
74 print "@a\n";
75
In the demonstration below, one may call ->gather or ->step any number
of times, although ->step is not allowed in the last sub-block. Data is
gathered to @arr and is likely to arrive out of order. Gathering data
is optional. All sub-blocks receive $mce as the first argument.
81
82 First, defining 3 sub-tasks.
83
84 use MCE::Step;
85
86 sub task_a {
87 my ($mce, $chunk_ref, $chunk_id) = @_;
88
89 if ($_ % 2 == 0) {
90 MCE->gather($_);
91 # MCE->gather($_ * 4); ## Ok to gather multiple times
92 }
93 else {
94 MCE->print("a step: $_, $_ * $_\n");
95 MCE->step($_, $_ * $_);
96 # MCE->step($_, $_ * 4 ); ## Ok to step multiple times
97 }
98 }
99
100 sub task_b {
101 my ($mce, $arg1, $arg2) = @_;
102
103 MCE->print("b args: $arg1, $arg2\n");
104
105 if ($_ % 3 == 0) { ## $_ is the same as $arg1
106 MCE->gather($_);
107 }
108 else {
109 MCE->print("b step: $_ * $_\n");
110 MCE->step($_ * $_);
111 }
112 }
113
114 sub task_c {
115 my ($mce, $arg1) = @_;
116
117 MCE->print("c: $_\n");
118 MCE->gather($_);
119 }
120
121 Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
122 parallel. Notice how max_workers and use_threads can take an anonymous
123 array, similarly to task_name.
124
125 my @arr = mce_step {
126 task_name => [ 'a', 'b', 'c' ],
127 max_workers => [ 2, 2, 2 ],
128 use_threads => [ 0, 0, 0 ],
129 chunk_size => 1
130
131 }, \&task_a, \&task_b, \&task_c, 1..10;
132
133 Finally, sort the array and display its contents.
134
135 @arr = sort { $a <=> $b } @arr;
136
137 print "\n@arr\n\n";
138
139 -- Output
140
141 a step: 1, 1 * 1
142 a step: 3, 3 * 3
143 a step: 5, 5 * 5
144 a step: 7, 7 * 7
145 a step: 9, 9 * 9
146 b args: 1, 1
147 b step: 1 * 1
148 b args: 3, 9
149 b args: 7, 49
150 b step: 7 * 7
151 b args: 5, 25
152 b step: 5 * 5
153 b args: 9, 81
154 c: 1
155 c: 49
156 c: 25
157
158 1 2 3 4 6 8 9 10 25 49
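
To see why the sorted result is what it is, the routing can be traced
without any workers. The following is a serial sketch in plain Perl, not
MCE code: the hypothetical helpers route_a, route_b, and route_c mimic
the gather and step decisions of task_a, task_b, and task_c above.

```perl
use strict;
use warnings;

my @gathered;   # stands in for the array filled by MCE->gather

# Mimics task_c: everything reaching the last sub-task is gathered.
sub route_c {
    my ($arg1) = @_;
    push @gathered, $arg1;
}

# Mimics task_b: multiples of 3 are gathered, the rest step forward.
sub route_b {
    my ($arg1, $arg2) = @_;
    if ($arg1 % 3 == 0) { push @gathered, $arg1 }
    else                { route_c($arg1 * $arg1) }
}

# Mimics task_a: even items are gathered, odd items step forward.
sub route_a {
    my ($item) = @_;
    if ($item % 2 == 0) { push @gathered, $item }
    else                { route_b($item, $item * $item) }
}

route_a($_) for 1 .. 10;

my @sorted = sort { $a <=> $b } @gathered;
print "@sorted\n";
```

Items 3 and 9 stop at the second sub-task, which is why only 1, 25, and
49 reach the third.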
159
161 Although MCE::Loop may be preferred for running using a single code
162 block, the text below also applies to this module, particularly for the
163 first block.
164
165 All models in MCE default to 'auto' for chunk_size. The arguments for
166 the block are the same as writing a user_func block using the Core API.
167
168 Beginning with MCE 1.5, the next input item is placed into the input
169 scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
170 $chunk_ref containing many items. Basically, line 2 below may be
171 omitted from your code when using $_. One can call MCE->chunk_id to
172 obtain the current chunk id.
173
174 line 1: user_func => sub {
175 line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
176 line 3:
177 line 4: $_ points to $chunk_ref->[0]
178 line 5: in MCE 1.5 when chunk_size == 1
179 line 6:
180 line 7: $_ points to $chunk_ref
181 line 8: in MCE 1.5 when chunk_size > 1
182 line 9: }
183
Follow this synopsis when chunk_size equals one. Looping is not
required inside the first block because the block is called once per
item.
187
188 ## Exports mce_step, mce_step_f, and mce_step_s
189 use MCE::Step;
190
191 MCE::Step::init {
192 chunk_size => 1
193 };
194
195 ## Array or array_ref
196 mce_step sub { do_work($_) }, 1..10000;
197 mce_step sub { do_work($_) }, [ 1..10000 ];
198
199 ## File_path, glob_ref, or scalar_ref
200 mce_step_f sub { chomp; do_work($_) }, "/path/to/file";
201 mce_step_f sub { chomp; do_work($_) }, $file_handle;
202 mce_step_f sub { chomp; do_work($_) }, \$scalar;
203
204 ## Sequence of numbers (begin, end [, step, format])
205 mce_step_s sub { do_work($_) }, 1, 10000, 5;
206 mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];
207
208 mce_step_s sub { do_work($_) }, {
209 begin => 1, end => 10000, step => 5, format => undef
210 };
211
213 Follow this synopsis when chunk_size equals 'auto' or greater than 1.
214 This means having to loop through the chunk from inside the first
215 block.
216
217 use MCE::Step;
218
219 MCE::Step::init { ## Chunk_size defaults to 'auto' when
220 chunk_size => 'auto' ## not specified. Therefore, the init
221 }; ## function may be omitted.
222
223 ## Syntax is shown for mce_step for demonstration purposes.
224 ## Looping inside the block is the same for mce_step_f and
225 ## mce_step_s.
226
227 mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
228
229 ## Same as above, resembles code using the Core API.
230
231 mce_step sub {
232 my ($mce, $chunk_ref, $chunk_id) = @_;
233
234 for (@{ $chunk_ref }) {
235 do_work($_);
236 }
237
238 }, 1..10000;
239
Chunking reduces the number of IPC calls behind the scenes. Think in
terms of chunks whenever processing a large amount of data. For
relatively small data, choosing 1 for chunk_size is fine.
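
As a rough illustration of the point about IPC, the number of chunks
handed out is the item count divided by chunk_size, rounded up. The
sketch below is plain Perl arithmetic, not MCE internals; chunk_count is
a hypothetical helper, and it assumes one request per chunk.

```perl
use strict;
use warnings;
use POSIX qw( ceil );

# Number of chunks needed to cover $items items, $chunk_size at a time.
sub chunk_count {
    my ($items, $chunk_size) = @_;
    return ceil($items / $chunk_size);
}

printf "chunk_size %4d: %5d chunks\n", $_, chunk_count(10_000, $_)
    for 1, 100, 500;
```

For 10,000 items, moving from a chunk_size of 1 to 500 reduces the
count from 10,000 chunks to 20.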
243
The following lists the options which may be overridden when loading
the module.
247
248 use Sereal qw( encode_sereal decode_sereal );
249 use CBOR::XS qw( encode_cbor decode_cbor );
250 use JSON::XS qw( encode_json decode_json );
251
252 use MCE::Step
253 max_workers => 8, # Default 'auto'
254 chunk_size => 500, # Default 'auto'
255 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
256 freeze => \&encode_sereal, # \&Storable::freeze
257 thaw => \&decode_sereal, # \&Storable::thaw
258 fast => 1 # Default 0 (fast dequeue)
259 ;
260
From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
available. Specify "Sereal => 0" to use Storable instead.
263
264 use MCE::Step Sereal => 0;
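
The freeze and thaw options expect a pair of functions that round-trip
a data structure through a string. A minimal sketch using Storable, the
default named above, shows that contract; the variable names are
illustrative only.

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

my $data   = { host => 'hosta', values => [ 1, 2, 3 ] };
my $frozen = freeze($data);    # serialized byte string
my $copy   = thaw($frozen);    # an independent deep copy of $data

print "host: $copy->{host}, values: @{ $copy->{values} }\n";
```

Any replacement pair, such as the Sereal functions shown earlier, needs
to satisfy the same round trip.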
265
267 MCE::Step->init ( options )
268 MCE::Step::init { options }
269 The init function accepts a hash of MCE options. Unlike with
270 MCE::Stream, both gather and bounds_only options may be specified
271 when calling init (not shown below).
272
273 use MCE::Step;
274
275 MCE::Step::init {
276 chunk_size => 1, max_workers => 4,
277
278 user_begin => sub {
279 print "## ", MCE->wid, " started\n";
280 },
281
282 user_end => sub {
283 print "## ", MCE->wid, " completed\n";
284 }
285 };
286
287 my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;
288
289 print "\n", "@a{1..100}", "\n";
290
291 -- Output
292
293 ## 3 started
294 ## 1 started
295 ## 4 started
296 ## 2 started
297 ## 3 completed
298 ## 4 completed
299 ## 1 completed
300 ## 2 completed
301
302 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
303 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
304 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
305 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
306 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
307 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
308 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
309 10000
310
311 Like with MCE::Step::init above, MCE options may be specified using an
312 anonymous hash for the first argument. Notice how task_name,
313 max_workers, and use_threads can take an anonymous array for setting
314 uniquely per each code block.
315
316 Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
317 with the first code block, thus processing from left-to-right.
318
The following takes 9 seconds to complete. Each of the first two
sub-tasks waits 1 or 2 seconds before calling MCE->step, and the last
sub-task has only 2 workers to process 3 items at 3 seconds each.

Removing both calls to MCE->step causes the script to complete in just
1 second. The 2nd and subsequent sub-tasks await data from an internal
queue, and their workers terminate upon receiving an undef.
327
328 use threads;
329 use MCE::Step;
330
331 my @a = mce_step {
332 task_name => [ 'a', 'b', 'c' ],
333 max_workers => [ 3, 4, 2, ],
334 use_threads => [ 1, 0, 0, ],
335
336 user_end => sub {
337 my ($mce, $task_id, $task_name) = @_;
338 MCE->print("$task_id - $task_name completed\n");
339 },
340
341 task_end => sub {
342 my ($mce, $task_id, $task_name) = @_;
343 MCE->print("$task_id - $task_name ended\n");
344 }
345 },
346 sub { sleep 1; MCE->step(""); }, ## 3 workers, named a
347 sub { sleep 2; MCE->step(""); }, ## 4 workers, named b
348 sub { sleep 3; }; ## 2 workers, named c
349
350 -- Output
351
352 0 - a completed
353 0 - a completed
354 0 - a completed
355 0 - a ended
356 1 - b completed
357 1 - b completed
358 1 - b completed
359 1 - b completed
360 1 - b ended
361 2 - c completed
362 2 - c completed
363 2 - c ended
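
The 9 seconds can be checked with a back-of-the-envelope model. The
sketch below is plain Perl arithmetic under a simplifying assumption:
each sub-task starts only after the previous one has passed on all of
its items, which happens to hold for this example. stage_seconds is a
hypothetical helper, not part of MCE.

```perl
use strict;
use warnings;
use POSIX qw( ceil );

# Seconds for $workers workers to handle $items items, $sleep seconds each.
sub stage_seconds {
    my ($items, $workers, $sleep) = @_;
    return ceil($items / $workers) * $sleep;
}

my $t_a = stage_seconds(3, 3, 1);   # 3 workers run the block once each
my $t_b = stage_seconds(3, 4, 2);   # 3 items from 'a', 4 workers
my $t_c = stage_seconds(3, 2, 3);   # 3 items from 'b', only 2 workers

printf "%d seconds\n", $t_a + $t_b + $t_c;   # 1 + 2 + 6
```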
364
366 Although input data is optional for MCE::Step, the following assumes
367 chunk_size equals 1 in order to demonstrate all the possibilities for
368 providing input data.
369
370 MCE::Step->run ( sub { code }, list )
371 mce_step sub { code }, list
372 Input data may be defined using a list, an array ref, or a hash ref.
373
374 Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }",
375 Step takes a "sub { ... }" or a code reference. The other difference
376 is that the comma is needed after the block.
377
378 # $_ contains the item when chunk_size => 1
379
380 mce_step sub { $_ }, 1..1000;
381 mce_step sub { $_ }, \@list;
382
383 # chunking, any chunk_size => 1 or higher
384
385 my %res = mce_step sub {
386 my ($mce, $chunk_ref, $chunk_id) = @_;
387 my %ret;
388 for my $item (@{ $chunk_ref }) {
389 $ret{$item} = $item * 2;
390 }
391 MCE->gather(%ret);
392 },
393 \@list;
394
395 # input hash, current API available since 1.828
396
397 my %res = mce_step sub {
398 my ($mce, $chunk_ref, $chunk_id) = @_;
399 my %ret;
400 for my $key (keys %{ $chunk_ref }) {
401 $ret{$key} = $chunk_ref->{$key} * 2;
402 }
403 MCE->gather(%ret);
404 },
405 \%hash;
406
407 # unlike MCE::Loop, MCE::Step doesn't need input to run
408
409 mce_step { max_workers => 4 }, sub {
410 MCE->say( MCE->wid );
411 };
412
413 # ... and can run multiple tasks
414
415 mce_step {
416 max_workers => [ 1, 3 ],
417 task_name => [ 'p', 'c' ]
418 },
419 sub {
420 # 1 producer
421 MCE->say( "producer: ", MCE->wid );
422 },
423 sub {
424 # 3 consumers
425 MCE->say( "consumer: ", MCE->wid );
426 };
427
428 # here, options are specified via init
429
430 MCE::Step::init {
431 max_workers => [ 1, 3 ],
432 task_name => [ 'p', 'c' ]
433 };
434
435 mce_step \&producer, \&consumers;
436
437 MCE::Step->run_file ( sub { code }, file )
438 mce_step_f sub { code }, file
Of the three input types shown below, a file path is the fastest.
Workers communicate the next offset position among themselves with
zero interaction by the manager process.
442
443 # $_ contains the line when chunk_size => 1
444
445 mce_step_f sub { $_ }, "/path/to/file"; # faster
446 mce_step_f sub { $_ }, $file_handle;
447 mce_step_f sub { $_ }, \$scalar;
448
449 # chunking, any chunk_size => 1 or higher
450
451 my %res = mce_step_f sub {
452 my ($mce, $chunk_ref, $chunk_id) = @_;
453 my $buf = '';
454 for my $line (@{ $chunk_ref }) {
455 $buf .= $line;
456 }
457 MCE->gather($chunk_id, $buf);
458 },
459 "/path/to/file";
460
461 MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
462 mce_step_s sub { code }, $beg, $end [, $step, $fmt ]
463 Sequence may be defined as a list, an array reference, or a hash
464 reference. The functions require both begin and end values to run.
465 Step and format are optional. The format is passed to sprintf (% may
466 be omitted below).
467
468 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
469
470 # $_ contains the sequence number when chunk_size => 1
471
472 mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
473 mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];
474
475 mce_step_s sub { $_ }, {
476 begin => $beg, end => $end,
477 step => $step, format => $fmt
478 };
479
480 # chunking, any chunk_size => 1 or higher
481
482 my %res = mce_step_s sub {
483 my ($mce, $chunk_ref, $chunk_id) = @_;
484 my $buf = '';
485 for my $seq (@{ $chunk_ref }) {
486 $buf .= "$seq\n";
487 }
488 MCE->gather($chunk_id, $buf);
489 },
490 [ $beg, $end ];
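
The effect of step and format can be previewed without MCE. The sketch
below is an approximation in plain Perl, not the sequence engine itself:
seq_items is a hypothetical helper that computes each value from its
index and passes it through sprintf.

```perl
use strict;
use warnings;

# Returns the formatted items from $beg to $end in increments of $step.
sub seq_items {
    my ($beg, $end, $step, $fmt) = @_;
    # Round to the nearest integer to sidestep floating-point drift.
    my $count = int( ($end - $beg) / $step + 0.5 );
    return map { sprintf($fmt, $beg + $_ * $step) } 0 .. $count;
}

my @items = seq_items(10, 20, 0.1, "%4.1f");
print scalar(@items), " items: $items[0] .. $items[-1]\n";
```

Computing each value from its index, rather than adding the step
repeatedly, keeps rounding error from accumulating across the sequence.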
491
With the bounds_only option, the sequence engine computes only the
'begin' and 'end' items for each chunk and not the items in between
(hence boundaries only). This option applies to sequences only and has
no effect when chunk_size equals 1.
496
497 The time to run is 0.006s below. This becomes 0.827s without the
498 bounds_only option due to computing all items in between, thus
499 creating a very large array. Basically, specify bounds_only => 1
500 when boundaries is all you need for looping inside the block; e.g.
501 Monte Carlo simulations.
502
503 Time was measured using 1 worker to emphasize the difference.
504
505 use MCE::Step;
506
507 MCE::Step::init {
508 max_workers => 1, chunk_size => 1_250_000,
509 bounds_only => 1
510 };
511
512 # Typically, the input scalar $_ contains the sequence number
513 # when chunk_size => 1, unless the bounds_only option is set
514 # which is the case here. Thus, $_ points to $chunk_ref.
515
516 mce_step_s sub {
517 my ($mce, $chunk_ref, $chunk_id) = @_;
518
519 # $chunk_ref contains 2 items, not 1_250_000
520 # my ( $begin, $end ) = ( $_->[0], $_->[1] );
521
522 my $begin = $chunk_ref->[0];
523 my $end = $chunk_ref->[1];
524
525 # for my $seq ( $begin .. $end ) {
526 # ...
527 # }
528
529 MCE->printf("%7d .. %8d\n", $begin, $end);
530 },
531 [ 1, 10_000_000 ];
532
533 -- Output
534
535 1 .. 1250000
536 1250001 .. 2500000
537 2500001 .. 3750000
538 3750001 .. 5000000
539 5000001 .. 6250000
540 6250001 .. 7500000
541 7500001 .. 8750000
542 8750001 .. 10000000
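
The boundaries in the output above follow directly from chunk_size. A
sketch in plain Perl, using a hypothetical chunk_bounds helper rather
than MCE's sequence engine, reproduces the same pairs for a step of 1.

```perl
use strict;
use warnings;

# Returns [ begin, end ] pairs covering $beg .. $end, $chunk_size at a time.
sub chunk_bounds {
    my ($beg, $end, $chunk_size) = @_;
    my @bounds;
    for (my $lo = $beg; $lo <= $end; $lo += $chunk_size) {
        my $hi = $lo + $chunk_size - 1;
        $hi = $end if $hi > $end;       # clamp the final chunk
        push @bounds, [ $lo, $hi ];
    }
    return @bounds;
}

my @pairs = chunk_bounds(1, 10_000_000, 1_250_000);
printf "%7d .. %8d\n", @$_ for @pairs;
```

Each worker receives only such a pair, so the cost per chunk stays
constant regardless of how many items the chunk spans.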
543
544 MCE::Step->run ( { input_data => iterator }, sub { code } )
545 mce_step { input_data => iterator }, sub { code }
An iterator reference may be specified for input_data, either in
the options hash as shown here or via MCE::Step::init. Passing it
through input_data prevents MCE::Step from configuring the iterator
reference as another user task, which would not work.
550
551 Iterators are described under section "SYNTAX for INPUT_DATA" at
552 MCE::Core.
553
554 MCE::Step::init {
555 input_data => iterator
556 };
557
558 mce_step sub { $_ };
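
For reference, an iterator here is a closure that hands out the next
item on each call and returns nothing once the input is exhausted. The
sketch below assumes that convention, as described in MCE::Core;
make_iter is a hypothetical name, and any arguments passed to the
closure are ignored.

```perl
use strict;
use warnings;

# Returns a closure yielding $n, $n + $step, ... up to $max.
sub make_iter {
    my ($n, $max, $step) = @_;
    return sub {
        return if $n > $max;      # exhausted: return nothing
        my $current = $n;
        $n += $step;
        return $current;
    };
}

my $iter = make_iter(10, 30, 10);
while ( defined( my $item = $iter->() ) ) {
    print "$item\n";
}

# With MCE::Step, the closure would be passed via the input_data option:
#   mce_step { input_data => make_iter(10, 30, 10) }, sub { ... };
```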

QUEUE-LIKE FEATURES
561 MCE->step ( item )
562 MCE->step ( arg1, arg2, argN )
563 The ->step method is the simplest form for passing elements into the
564 next sub-task.
565
566 use MCE::Step;
567
568 sub provider {
569 MCE->step( $_, rand ) for 10 .. 19;
570 }
571
572 sub consumer {
573 my ( $mce, @args ) = @_;
574 MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
575 }
576
577 MCE::Step::init {
578 task_name => [ 'p', 'c' ],
579 max_workers => [ 1 , 4 ]
580 };
581
582 mce_step \&provider, \&consumer;
583
584 -- Output
585
586 2: 10, 0.583551
587 4: 11, 0.175319
588 3: 12, 0.843662
589 4: 15, 0.748302
590 2: 14, 0.591752
591 3: 16, 0.357858
592 5: 13, 0.953528
593 4: 17, 0.698907
594 2: 18, 0.985448
595 3: 19, 0.146548
596
597 MCE->enq ( task_name, item )
598 MCE->enq ( task_name, [ arg1, arg2, argN ] )
599 MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
600 MCE->enqp ( task_name, priority, item )
601 MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
602 MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )
The MCE 1.7 release enables finer control. Unlike ->step, which takes
multiple arguments, the ->enq and ->enqp methods append each item to
the end of an internal array. Passing multiple arguments as one item
is possible by enclosing them inside an anonymous array.

The direction of flow is forward only. Thus, stepping to itself or
backwards will cause an error.
610
611 use MCE::Step;
612
613 sub provider {
614 if ( MCE->wid % 2 == 0 ) {
615 MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
616 } else {
617 MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
618 }
619 }
620
621 sub consumer_c {
622 my ( $mce, $args ) = @_;
623 MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
624 }
625
626 sub consumer_d {
627 my ( $mce, $args ) = @_;
628 MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
629 }
630
631 MCE::Step::init {
632 task_name => [ 'p', 'c', 'd' ],
633 max_workers => [ 2 , 3 , 3 ]
634 };
635
636 mce_step \&provider, \&consumer_c, \&consumer_d;
637
638 -- Output
639
640 C4: 10, 0.527531
641 D6: 20, 0.420108
642 C5: 11, 0.839770
643 D8: 21, 0.386414
644 C3: 12, 0.834645
645 C4: 13, 0.191014
646 D6: 23, 0.924027
647 C5: 14, 0.899357
648 D8: 24, 0.706186
649 C4: 15, 0.083823
650 D7: 22, 0.479708
651 D6: 25, 0.073882
652 C3: 16, 0.207446
653 D8: 26, 0.560755
654 C5: 17, 0.198157
655 D7: 27, 0.324909
656 C4: 18, 0.147505
657 C5: 19, 0.318371
658 D6: 28, 0.220465
659 D8: 29, 0.630111
660
661 MCE->await ( task_name, pending_threshold )
Providers may sometimes run faster than consumers, thus increasing
memory consumption. MCE 1.7 adds the ->await method for pausing
momentarily until the number of items pending in the receiving
sub-task's queue drops to the given threshold or below.
666
    use MCE::Step;
    use Time::HiRes 'sleep';

    sub provider {
        for ( 10 .. 29 ) {
            # wait until 10 or fewer items are pending
            MCE->await( 'c', 10 );
            # forward item to a later sub-task ( 'c' comes after 'p' )
            MCE->enq( 'c', [ $_, rand ] );
        }
    }

    sub consumer {
        my ($mce, $args) = @_;
        MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        sleep 0.05;
    }

    MCE::Step::init {
        task_name   => [ 'p', 'c' ],
        max_workers => [  1 ,  4  ]
    };

    mce_step \&provider, \&consumer;
691
692 -- Output
693
694 3: 10, 0.527307
695 2: 11, 0.036193
696 5: 12, 0.987168
697 4: 13, 0.998140
698 5: 14, 0.219526
699 4: 15, 0.061609
700 2: 16, 0.557664
701 3: 17, 0.658684
702 4: 18, 0.240932
703 3: 19, 0.241042
704 5: 20, 0.884830
705 2: 21, 0.902223
706 4: 22, 0.699223
707 3: 23, 0.208270
708 5: 24, 0.438919
709 2: 25, 0.268854
710 4: 26, 0.596425
711 5: 27, 0.979818
712 2: 28, 0.918173
713 3: 29, 0.358266
714
716 Unlike MCE::Map where gather and output order are done for you
717 automatically, the gather method is used to have results sent back to
718 the manager process.
719
720 use MCE::Step chunk_size => 1;
721
722 ## Output order is not guaranteed.
723 my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
724 print "@a\n\n";
725
726 ## Outputs to a hash instead (key, value).
727 my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
728 print "@h1{1..100}\n\n";
729
730 ## This does the same thing due to chunk_id starting at one.
731 my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
732 print "@h2{1..100}\n\n";
733
734 The gather method may be called multiple times within the block unlike
735 return which would leave the block. Therefore, think of gather as
736 yielding results immediately to the manager process without actually
737 leaving the block.
738
739 use MCE::Step chunk_size => 1, max_workers => 3;
740
741 my @hosts = qw(
742 hosta hostb hostc hostd hoste
743 );
744
745 my %h3 = mce_step sub {
746 my ($output, $error, $status); my $host = $_;
747
748 ## Do something with $host;
749 $output = "Worker ". MCE->wid .": Hello from $host";
750
751 if (MCE->chunk_id % 3 == 0) {
752 ## Simulating an error condition
753 local $? = 1; $status = $?;
754 $error = "Error from $host"
755 }
756 else {
757 $status = 0;
758 }
759
760 ## Ensure unique keys (key, value) when gathering to
761 ## a hash.
762 MCE->gather("$host.out", $output);
763 MCE->gather("$host.err", $error) if (defined $error);
764 MCE->gather("$host.sta", $status);
765
766 }, @hosts;
767
768 foreach my $host (@hosts) {
769 print $h3{"$host.out"}, "\n";
770 print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
771 print "Exit status: ", $h3{"$host.sta"}, "\n\n";
772 }
773
774 -- Output
775
776 Worker 3: Hello from hosta
777 Exit status: 0
778
779 Worker 2: Hello from hostb
780 Exit status: 0
781
782 Worker 1: Hello from hostc
783 Error from hostc
784 Exit status: 1
785
786 Worker 3: Hello from hostd
787 Exit status: 0
788
789 Worker 2: Hello from hoste
790 Exit status: 0
791
The following uses an anonymous array containing 3 elements when
gathering data. Serialization is automatic behind the scenes.
794
795 my %h3 = mce_step sub {
796 ...
797
798 MCE->gather($host, [$output, $error, $status]);
799
800 }, @hosts;
801
802 foreach my $host (@hosts) {
803 print $h3{$host}->[0], "\n";
804 print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
805 print "Exit status: ", $h3{$host}->[2], "\n\n";
806 }
807
808 Although MCE::Map comes to mind, one may want additional control when
809 gathering data such as retaining output order.
810
811 use MCE::Step;
812
813 sub preserve_order {
814 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
815
816 return sub {
817 $tmp{ (shift) } = \@_;
818
819 while (1) {
820 last unless exists $tmp{$order_id};
821 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
822 }
823
824 return;
825 };
826 }
827
## Workers persist for the most part after running, though this is not
## always the case and depends on Perl. Pass a reference to a subroutine
## if workers must persist; e.g. mce_step { ... }, \&foo, 1..100000.
831
832 MCE::Step::init {
833 chunk_size => 'auto', max_workers => 'auto'
834 };
835
836 for (1..2) {
837 my @m2;
838
839 mce_step {
840 gather => preserve_order(\@m2)
841 },
842 sub {
843 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
844
845 ## Compute the entire chunk data at once.
846 push @a, map { $_ * 2 } @{ $chunk_ref };
847
848 ## Afterwards, invoke the gather feature, which
849 ## will direct the data to the callback function.
850 MCE->gather(MCE->chunk_id, @a);
851
852 }, 1..100000;
853
854 print scalar @m2, "\n";
855 }
856
857 MCE::Step::finish;
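
The callback returned by preserve_order can be exercised on its own,
which makes the buffering easy to follow. The sketch below restates the
function from the example above (storing a copy of the arguments rather
than a reference to @_) and feeds it chunks out of order by hand. In
real use, MCE invokes the callback once per MCE->gather call.

```perl
use strict;
use warnings;

sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];

    return sub {
        # First argument is the chunk id; the rest is that chunk's data.
        my $chunk_id = shift;
        $tmp{$chunk_id} = [ @_ ];

        # Flush every chunk that is next in line.
        while ( exists $tmp{$order_id} ) {
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }

        return;
    };
}

my @out;
my $cb = preserve_order(\@out);

$cb->(2, 'c', 'd');   # chunk 2 arrives first and is held back
$cb->(3, 'e');        # chunk 3 is held back as well
$cb->(1, 'a', 'b');   # chunk 1 arrives; chunks 1, 2, 3 are flushed in order

print "@out\n";
```

Nothing is appended to @out until chunk 1 shows up, so memory held in
the temporary hash grows with how far ahead the fastest workers run.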
858
859 All 6 models support 'auto' for chunk_size unlike the Core API. Think
860 of the models as the basis for providing JIT for MCE. They create the
861 instance, tune max_workers, and tune chunk_size automatically
862 regardless of the hardware.
863
864 The following does the same thing using the Core API. Workers persist
865 after running.
866
867 use MCE;
868
869 sub preserve_order {
870 ...
871 }
872
873 my $mce = MCE->new(
874 max_workers => 'auto', chunk_size => 8000,
875
876 user_func => sub {
877 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
878
879 ## Compute the entire chunk data at once.
880 push @a, map { $_ * 2 } @{ $chunk_ref };
881
882 ## Afterwards, invoke the gather feature, which
883 ## will direct the data to the callback function.
884 MCE->gather(MCE->chunk_id, @a);
885 }
886 );
887
888 for (1..2) {
889 my @m2;
890
891 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
892
893 print scalar @m2, "\n";
894 }
895
896 $mce->shutdown;
897
899 MCE::Step->finish
900 MCE::Step::finish
901 Workers remain persistent as much as possible after running.
902 Shutdown occurs automatically when the script terminates. Call
903 finish when workers are no longer needed.
904
905 use MCE::Step;
906
907 MCE::Step::init {
908 chunk_size => 20, max_workers => 'auto'
909 };
910
911 mce_step sub { ... }, 1..100;
912
913 MCE::Step::finish;
914
916 MCE, MCE::Core
917
919 Mario E. Roy, <marioeroy AT gmail DOT com>
920
921
922
perl v5.28.0 2018-08-25 MCE::Step(3)