MCE::Step(3)          User Contributed Perl Documentation          MCE::Step(3)

NAME
MCE::Step - Parallel step model for building creative steps

VERSION
This document describes MCE::Step version 1.874

DESCRIPTION
MCE::Step is similar to MCE::Flow for writing custom apps. The main
difference comes from the transparent use of queues between sub-tasks.
MCE 1.7 adds mce_enq, mce_enqp, and mce_await methods described under
QUEUE-LIKE FEATURES below.

Parallelizing a chain of map operations is trivial with mce_stream, as
shown below.
18
19 ## Native map function
20 my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
21
22 ## Same as with MCE::Stream (processing from right to left)
23 @a = mce_stream
24 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
25
26 ## Pass an array reference to have writes occur simultaneously
27 mce_stream \@a,
28 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
29
Now let MCE::Step compute the same result in parallel. Unlike the
example in MCE::Flow, the use of MCE::Queue is entirely transparent.
Because workers finish chunks in no particular order, output order is
preserved with the help of MCE::Candy.
33
34 use MCE::Step;
35 use MCE::Candy;
36
Next are the 3 sub-tasks. Compare them with the ones described in
MCE::Flow. The call to MCE->step simplifies passing data to the
subsequent sub-task.
40
41 sub task_a {
42 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
43 push @ans, map { $_ * 2 } @{ $chunk_ref };
44 MCE->step(\@ans, $chunk_id);
45 }
46
47 sub task_b {
48 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
49 push @ans, map { $_ * 3 } @{ $chunk_ref };
50 MCE->step(\@ans, $chunk_id);
51 }
52
53 sub task_c {
54 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
55 push @ans, map { $_ * 4 } @{ $chunk_ref };
56 MCE->gather($chunk_id, \@ans);
57 }
58
In summary, MCE::Step builds an MCE instance behind the scenes and
starts running. The task_name (shown), max_workers, and use_threads
options can each take an anonymous array to specify a separate value
for each sub-task.

The task_name option is required to use ->enq, ->enqp, and ->await.
65
66 my @a;
67
68 mce_step {
69 task_name => [ 'a', 'b', 'c' ],
70 gather => MCE::Candy::out_iter_array(\@a)
71
72 }, \&task_a, \&task_b, \&task_c, 1..10000;
73
74 print "@a\n";
75
STEP DEMO
In the demonstration below, one may call ->gather or ->step any number
of times, although ->step is not allowed in the last sub-block. Data is
gathered to @arr and will likely arrive out of order. Gathering data is
optional. All sub-blocks receive $mce as the first argument.

First, define the 3 sub-tasks.
83
84 use MCE::Step;
85
86 sub task_a {
87 my ($mce, $chunk_ref, $chunk_id) = @_;
88
89 if ($_ % 2 == 0) {
90 MCE->gather($_);
91 # MCE->gather($_ * 4); ## Ok to gather multiple times
92 }
93 else {
94 MCE->print("a step: $_, $_ * $_\n");
95 MCE->step($_, $_ * $_);
96 # MCE->step($_, $_ * 4 ); ## Ok to step multiple times
97 }
98 }
99
100 sub task_b {
101 my ($mce, $arg1, $arg2) = @_;
102
103 MCE->print("b args: $arg1, $arg2\n");
104
105 if ($_ % 3 == 0) { ## $_ is the same as $arg1
106 MCE->gather($_);
107 }
108 else {
109 MCE->print("b step: $_ * $_\n");
110 MCE->step($_ * $_);
111 }
112 }
113
114 sub task_c {
115 my ($mce, $arg1) = @_;
116
117 MCE->print("c: $_\n");
118 MCE->gather($_);
119 }
120
Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
parallel. Notice how max_workers and use_threads can take an anonymous
array, just like task_name.
124
125 my @arr = mce_step {
126 task_name => [ 'a', 'b', 'c' ],
127 max_workers => [ 2, 2, 2 ],
128 use_threads => [ 0, 0, 0 ],
129 chunk_size => 1
130
131 }, \&task_a, \&task_b, \&task_c, 1..10;
132
133 Finally, sort the array and display its contents.
134
135 @arr = sort { $a <=> $b } @arr;
136
137 print "\n@arr\n\n";
138
139 -- Output
140
141 a step: 1, 1 * 1
142 a step: 3, 3 * 3
143 a step: 5, 5 * 5
144 a step: 7, 7 * 7
145 a step: 9, 9 * 9
146 b args: 1, 1
147 b step: 1 * 1
148 b args: 3, 9
149 b args: 7, 49
150 b step: 7 * 7
151 b args: 5, 25
152 b step: 5 * 5
153 b args: 9, 81
154 c: 1
155 c: 49
156 c: 25
157
158 1 2 3 4 6 8 9 10 25 49
159
SYNOPSIS when CHUNK_SIZE EQUALS 1
Although MCE::Loop may be preferred for running a single code block,
the text below also applies to this module, particularly to the first
block.

All models in MCE default to 'auto' for chunk_size. The arguments for
the block are the same as for a user_func block written against the
Core API.

Beginning with MCE 1.5, the next input item is placed into the input
scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
$chunk_ref containing many items. In other words, line 2 below may be
omitted from your code when using $_. One can call MCE->chunk_id to
obtain the current chunk id.
173
174 line 1: user_func => sub {
175 line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
176 line 3:
177 line 4: $_ points to $chunk_ref->[0]
178 line 5: in MCE 1.5 when chunk_size == 1
179 line 6:
180 line 7: $_ points to $chunk_ref
181 line 8: in MCE 1.5 when chunk_size > 1
182 line 9: }
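
The rule above can be checked with a short script. This is a minimal
sketch that assumes MCE is installed; the input range 11 .. 14 and the
%seen hash are illustrative only. With chunk_size set to 1, the block
should see the same item through $_ and through $chunk_ref->[0].

```perl
use strict;
use warnings;
use MCE::Step;

# chunk_size => 1: the block runs once per input item.
my %seen = mce_step { chunk_size => 1, max_workers => 2 }, sub {
    my ($mce, $chunk_ref, $chunk_id) = @_;

    # Gather both views of the item, keyed by chunk id.
    MCE->gather( $chunk_id, [ $_, $chunk_ref->[0] ] );
}, 11 .. 14;

MCE::Step->finish;

for my $id ( sort { $a <=> $b } keys %seen ) {
    my ($via_topic, $via_ref) = @{ $seen{$id} };
    print "chunk $id: \$_ = $via_topic, \$chunk_ref->[0] = $via_ref\n";
}
```

Gathering by chunk id keeps the report stable even though workers may
finish in any order.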
183
Follow this synopsis when chunk_size equals 1. Looping is not required
inside the first block because the block is called once per item.
187
188 ## Exports mce_step, mce_step_f, and mce_step_s
189 use MCE::Step;
190
191 MCE::Step->init(
192 chunk_size => 1
193 );
194
195 ## Array or array_ref
196 mce_step sub { do_work($_) }, 1..10000;
197 mce_step sub { do_work($_) }, \@list;
198
## Important; pass an array_ref for deeply nested input data
200 mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
201 mce_step sub { do_work($_) }, \@deeply_list;
202
203 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
## Workers read directly without involving the manager process
205 mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
206
207 ## Involves the manager process, therefore slower
208 mce_step_f sub { chomp; do_work($_) }, $file_handle;
209 mce_step_f sub { chomp; do_work($_) }, $io;
210 mce_step_f sub { chomp; do_work($_) }, \$scalar;
211
212 ## Sequence of numbers (begin, end [, step, format])
213 mce_step_s sub { do_work($_) }, 1, 10000, 5;
214 mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];
215
216 mce_step_s sub { do_work($_) }, {
217 begin => 1, end => 10000, step => 5, format => undef
218 };
219
SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
Follow this synopsis when chunk_size equals 'auto' or is greater than
1. The first block must then loop through the chunk itself.
224
225 use MCE::Step;
226
227 MCE::Step->init( ## Chunk_size defaults to 'auto' when
228 chunk_size => 'auto' ## not specified. Therefore, the init
229 ); ## function may be omitted.
230
231 ## Syntax is shown for mce_step for demonstration purposes.
232 ## Looping inside the block is the same for mce_step_f and
233 ## mce_step_s.
234
235 ## Array or array_ref
236 mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
237 mce_step sub { do_work($_) for (@{ $_ }) }, \@list;
238
## Important; pass an array_ref for deeply nested input data
240 mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
241 mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
242
243 ## Resembles code using the core MCE API
244 mce_step sub {
245 my ($mce, $chunk_ref, $chunk_id) = @_;
246
247 for (@{ $chunk_ref }) {
248 do_work($_);
249 }
250
251 }, 1..10000;
252
Chunking reduces the number of IPC calls behind the scenes. Think in
terms of chunks whenever processing a large amount of data. For
relatively small data, choosing 1 for chunk_size is fine.
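
To make the trade-off concrete, the sketch below counts how many chunks
are needed to cover a given number of input items. It is plain Perl and
does not use MCE; chunk_count is a hypothetical helper, and the number
of chunks is only a rough proxy for the number of exchanges between
workers and the manager process.

```perl
use strict;
use warnings;

# Hypothetical helper: chunks needed to cover $items inputs when each
# chunk holds at most $chunk_size items (integer ceiling division).
sub chunk_count {
    my ($items, $chunk_size) = @_;
    return int( ($items + $chunk_size - 1) / $chunk_size );
}

printf "%-10s %s\n", 'chunk_size', 'chunks';
printf "%-10d %d\n", $_, chunk_count(10_000, $_) for 1, 100, 500;
```

For 10,000 items, a chunk_size of 1 means 10,000 chunks, whereas 500
means only 20.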
256
OVERRIDING DEFAULTS
The following lists the options that may be overridden when loading
the module. The fast option is obsolete from 1.867 onwards and is
ignored if specified.
261
262 use Sereal qw( encode_sereal decode_sereal );
263 use CBOR::XS qw( encode_cbor decode_cbor );
264 use JSON::XS qw( encode_json decode_json );
265
266 use MCE::Step
267 max_workers => 8, # Default 'auto'
268 chunk_size => 500, # Default 'auto'
269 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
270 freeze => \&encode_sereal, # \&Storable::freeze
271 thaw => \&decode_sereal, # \&Storable::thaw
272 fast => 1 # Default 0 (fast dequeue)
273 ;
274
275 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
276 available. Specify "Sereal => 0" to use Storable instead.
277
278 use MCE::Step Sereal => 0;
279
CUSTOMIZING MCE
281 MCE::Step->init ( options )
282 MCE::Step::init { options }
283
284 The init function accepts a hash of MCE options. Unlike with
285 MCE::Stream, both gather and bounds_only options may be specified when
286 calling init (not shown below).
287
288 use MCE::Step;
289
290 MCE::Step->init(
291 chunk_size => 1, max_workers => 4,
292
293 user_begin => sub {
294 print "## ", MCE->wid, " started\n";
295 },
296
297 user_end => sub {
298 print "## ", MCE->wid, " completed\n";
299 }
300 );
301
302 my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;
303
304 print "\n", "@a{1..100}", "\n";
305
306 -- Output
307
308 ## 3 started
309 ## 1 started
310 ## 4 started
311 ## 2 started
312 ## 3 completed
313 ## 4 completed
314 ## 1 completed
315 ## 2 completed
316
317 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
318 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
319 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
320 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
321 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
322 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
323 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
324 10000
325
As with MCE::Step->init above, MCE options may be specified using an
anonymous hash as the first argument. Notice how task_name,
max_workers, and use_threads can each take an anonymous array to set a
separate value for each code block.

Unlike MCE::Stream, which processes from right to left, MCE::Step
begins with the first code block, thus processing from left to right.
333
The following takes 9 seconds to complete. The 9 seconds come from
having only 2 workers assigned to the last sub-task and from waiting 1
or 2 seconds before calling MCE->step in the earlier sub-tasks.

Removing both calls to MCE->step will cause the script to complete in
just 1 second. The reason is that the 2nd and subsequent sub-tasks wait
for data from an internal queue. With nothing queued, their workers
receive an undef and terminate.
342
343 use threads;
344 use MCE::Step;
345
346 my @a = mce_step {
347 task_name => [ 'a', 'b', 'c' ],
348 max_workers => [ 3, 4, 2, ],
349 use_threads => [ 1, 0, 0, ],
350
351 user_end => sub {
352 my ($mce, $task_id, $task_name) = @_;
353 MCE->print("$task_id - $task_name completed\n");
354 },
355
356 task_end => sub {
357 my ($mce, $task_id, $task_name) = @_;
358 MCE->print("$task_id - $task_name ended\n");
359 }
360 },
361 sub { sleep 1; MCE->step(""); }, ## 3 workers, named a
362 sub { sleep 2; MCE->step(""); }, ## 4 workers, named b
363 sub { sleep 3; }; ## 2 workers, named c
364
365 -- Output
366
367 0 - a completed
368 0 - a completed
369 0 - a completed
370 0 - a ended
371 1 - b completed
372 1 - b completed
373 1 - b completed
374 1 - b completed
375 1 - b ended
376 2 - c completed
377 2 - c completed
378 2 - c ended
379
API DOCUMENTATION
381 Although input data is optional for MCE::Step, the following assumes
382 chunk_size equals 1 in order to demonstrate all the possibilities for
383 providing input data.
384
385 MCE::Step->run ( sub { code }, list )
386 mce_step sub { code }, list
387
388 Input data may be defined using a list, an array ref, or a hash ref.
389
390 Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Step
391 takes a "sub { ... }" or a code reference. The other difference is that
392 the comma is needed after the block.
393
394 # $_ contains the item when chunk_size => 1
395
396 mce_step sub { do_work($_) }, 1..1000;
397 mce_step sub { do_work($_) }, \@list;
398
# Important; pass an array_ref for deeply nested input data
400
401 mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
402 mce_step sub { do_work($_) }, \@deeply_list;
403
404 # Chunking; any chunk_size => 1 or greater
405
406 my %res = mce_step sub {
407 my ($mce, $chunk_ref, $chunk_id) = @_;
408 my %ret;
409 for my $item (@{ $chunk_ref }) {
410 $ret{$item} = $item * 2;
411 }
412 MCE->gather(%ret);
413 },
414 \@list;
415
416 # Input hash; current API available since 1.828
417
418 my %res = mce_step sub {
419 my ($mce, $chunk_ref, $chunk_id) = @_;
420 my %ret;
421 for my $key (keys %{ $chunk_ref }) {
422 $ret{$key} = $chunk_ref->{$key} * 2;
423 }
424 MCE->gather(%ret);
425 },
426 \%hash;
427
428 # Unlike MCE::Loop, MCE::Step doesn't need input to run
429
430 mce_step { max_workers => 4 }, sub {
431 MCE->say( MCE->wid );
432 };
433
434 # ... and can run multiple tasks
435
436 mce_step {
437 max_workers => [ 1, 3 ],
438 task_name => [ 'p', 'c' ]
439 },
440 sub {
441 # 1 producer
442 MCE->say( "producer: ", MCE->wid );
443 },
444 sub {
445 # 3 consumers
446 MCE->say( "consumer: ", MCE->wid );
447 };
448
449 # Here, options are specified via init
450
451 MCE::Step->init(
452 max_workers => [ 1, 3 ],
453 task_name => [ 'p', 'c' ]
454 );
455
456 mce_step \&producer, \&consumers;
457
458 MCE::Step->run_file ( sub { code }, file )
459 mce_step_f sub { code }, file
460
The fastest of these is passing a file path. Workers communicate the
next offset position among themselves with zero interaction by the
manager process.
464
465 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
466
467 # $_ contains the line when chunk_size => 1
468
469 mce_step_f sub { $_ }, "/path/to/file"; # faster
470 mce_step_f sub { $_ }, $file_handle;
471 mce_step_f sub { $_ }, $io; # IO::All
472 mce_step_f sub { $_ }, \$scalar;
473
474 # chunking, any chunk_size => 1 or greater
475
476 my %res = mce_step_f sub {
477 my ($mce, $chunk_ref, $chunk_id) = @_;
478 my $buf = '';
479 for my $line (@{ $chunk_ref }) {
480 $buf .= $line;
481 }
482 MCE->gather($chunk_id, $buf);
483 },
484 "/path/to/file";
485
486 MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
487 mce_step_s sub { code }, $beg, $end [, $step, $fmt ]
488
489 Sequence may be defined as a list, an array reference, or a hash
490 reference. The functions require both begin and end values to run.
491 Step and format are optional. The format is passed to sprintf (% may be
492 omitted below).
493
494 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
495
496 # $_ contains the sequence number when chunk_size => 1
497
498 mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
499 mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];
500
501 mce_step_s sub { $_ }, {
502 begin => $beg, end => $end,
503 step => $step, format => $fmt
504 };
505
506 # chunking, any chunk_size => 1 or greater
507
508 my %res = mce_step_s sub {
509 my ($mce, $chunk_ref, $chunk_id) = @_;
510 my $buf = '';
511 for my $seq (@{ $chunk_ref }) {
512 $buf .= "$seq\n";
513 }
514 MCE->gather($chunk_id, $buf);
515 },
516 [ $beg, $end ];
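
The following plain-Perl sketch illustrates what a sequence of
(begin, end, step, format) expands to. It does not use MCE, and MCE's
internal computation may differ; seq_items is a hypothetical helper, a
positive step is assumed, and prepending a missing "%" is an assumption
about how an omitted "%" is handled.

```perl
use strict;
use warnings;

# Hypothetical helper: expand (begin, end [, step, format]) into a list.
sub seq_items {
    my ($beg, $end, $step, $fmt) = @_;
    $step = 1 unless defined $step;

    # Assumption: a format given without the leading % gets one prepended.
    $fmt = "%$fmt" if defined $fmt && $fmt !~ /^%/;

    # Number of whole steps that fit; the small epsilon guards against
    # floating-point error in the division.
    my $steps = int( ($end - $beg) / $step + 1e-9 );

    my @items;
    for my $i ( 0 .. $steps ) {
        my $value = $beg + $i * $step;
        push @items, defined $fmt ? sprintf($fmt, $value) : $value;
    }
    return @items;
}

my @seq = seq_items(10, 11, 0.25, "5.2f");
print "@seq\n";
```

Each value is computed as begin + i * step rather than by repeated
addition, which avoids accumulating rounding error over long sequences.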
517
With the bounds_only option, the sequence engine computes only the
'begin' and 'end' items for each chunk, and not the items in between
(hence boundaries only). This option applies to sequences only and has
no effect when chunk_size equals 1.

The time to run is 0.006s below. This becomes 0.827s without the
bounds_only option due to computing all items in between, thus creating
a very large array. In short, specify bounds_only => 1 when the
boundaries are all you need for looping inside the block; e.g. Monte
Carlo simulations.
528
529 Time was measured using 1 worker to emphasize the difference.
530
531 use MCE::Step;
532
533 MCE::Step->init(
534 max_workers => 1, chunk_size => 1_250_000,
535 bounds_only => 1
536 );
537
538 # Typically, the input scalar $_ contains the sequence number
539 # when chunk_size => 1, unless the bounds_only option is set
540 # which is the case here. Thus, $_ points to $chunk_ref.
541
542 mce_step_s sub {
543 my ($mce, $chunk_ref, $chunk_id) = @_;
544
545 # $chunk_ref contains 2 items, not 1_250_000
546 # my ( $begin, $end ) = ( $_->[0], $_->[1] );
547
548 my $begin = $chunk_ref->[0];
549 my $end = $chunk_ref->[1];
550
551 # for my $seq ( $begin .. $end ) {
552 # ...
553 # }
554
555 MCE->printf("%7d .. %8d\n", $begin, $end);
556 },
557 [ 1, 10_000_000 ];
558
559 -- Output
560
561 1 .. 1250000
562 1250001 .. 2500000
563 2500001 .. 3750000
564 3750001 .. 5000000
565 5000001 .. 6250000
566 6250001 .. 7500000
567 7500001 .. 8750000
568 8750001 .. 10000000
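
The boundaries listed above follow from simple arithmetic. The sketch
below reproduces them in plain Perl without MCE; chunk_bounds is a
hypothetical helper and assumes a step of 1.

```perl
use strict;
use warnings;

# Hypothetical helper: split begin..end into [ low, high ] pairs, one
# pair per chunk, without generating the items in between.
sub chunk_bounds {
    my ($begin, $end, $chunk_size) = @_;
    my @bounds;

    for ( my $low = $begin; $low <= $end; $low += $chunk_size ) {
        my $high = $low + $chunk_size - 1;
        $high = $end if $high > $end;    # the last chunk may be short
        push @bounds, [ $low, $high ];
    }
    return @bounds;
}

printf "%7d .. %8d\n", @{ $_ } for chunk_bounds(1, 10_000_000, 1_250_000);
```

Only 8 pairs are produced for 10 million sequence numbers, which is why
skipping the in-between items saves both time and memory.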
569
570 MCE::Step->run ( { input_data => iterator }, sub { code } )
571 mce_step { input_data => iterator }, sub { code }
572
An iterator reference may be specified for input_data inside the
options hash. The only other way is to specify input_data via
MCE::Step->init. Either form prevents MCE::Step from treating the
iterator reference as another user task, which would not work.
577
578 Iterators are described under section "SYNTAX for INPUT_DATA" at
579 MCE::Core.
580
581 MCE::Step->init(
582 input_data => iterator
583 );
584
585 mce_step sub { $_ };
586
QUEUE-LIKE FEATURES
588 MCE->step ( item )
589 MCE->step ( arg1, arg2, argN )
590
591 The ->step method is the simplest form for passing elements into the
592 next sub-task.
593
594 use MCE::Step;
595
596 sub provider {
597 MCE->step( $_, rand ) for 10 .. 19;
598 }
599
600 sub consumer {
601 my ( $mce, @args ) = @_;
602 MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
603 }
604
605 MCE::Step->init(
606 task_name => [ 'p', 'c' ],
607 max_workers => [ 1 , 4 ]
608 );
609
610 mce_step \&provider, \&consumer;
611
612 -- Output
613
614 2: 10, 0.583551
615 4: 11, 0.175319
616 3: 12, 0.843662
617 4: 15, 0.748302
618 2: 14, 0.591752
619 3: 16, 0.357858
620 5: 13, 0.953528
621 4: 17, 0.698907
622 2: 18, 0.985448
623 3: 19, 0.146548
624
625 MCE->enq ( task_name, item )
626 MCE->enq ( task_name, [ arg1, arg2, argN ] )
627 MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
628 MCE->enqp ( task_name, priority, item )
629 MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
630 MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )
631
The MCE 1.7 release enables finer control. Unlike ->step, which takes
multiple arguments, the ->enq and ->enqp methods treat each argument as
a separate item and push it onto the end of an internal array. To pass
several arguments as one item, enclose them in an anonymous array.

The direction of flow is forward only. Thus, a sub-task that enqueues
to itself or to an earlier sub-task will cause an error.
639
640 use MCE::Step;
641
642 sub provider {
643 if ( MCE->wid % 2 == 0 ) {
644 MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
645 } else {
646 MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
647 }
648 }
649
650 sub consumer_c {
651 my ( $mce, $args ) = @_;
652 MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
653 }
654
655 sub consumer_d {
656 my ( $mce, $args ) = @_;
657 MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
658 }
659
660 MCE::Step->init(
661 task_name => [ 'p', 'c', 'd' ],
662 max_workers => [ 2 , 3 , 3 ]
663 );
664
665 mce_step \&provider, \&consumer_c, \&consumer_d;
666
667 -- Output
668
669 C4: 10, 0.527531
670 D6: 20, 0.420108
671 C5: 11, 0.839770
672 D8: 21, 0.386414
673 C3: 12, 0.834645
674 C4: 13, 0.191014
675 D6: 23, 0.924027
676 C5: 14, 0.899357
677 D8: 24, 0.706186
678 C4: 15, 0.083823
679 D7: 22, 0.479708
680 D6: 25, 0.073882
681 C3: 16, 0.207446
682 D8: 26, 0.560755
683 C5: 17, 0.198157
684 D7: 27, 0.324909
685 C4: 18, 0.147505
686 C5: 19, 0.318371
687 D6: 28, 0.220465
688 D8: 29, 0.630111
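
The forward-only rule can be pictured as a comparison of positions in
the task_name list. The sketch below is plain Perl and is not how MCE
implements the check; is_forward is a hypothetical helper used only to
illustrate which targets are acceptable.

```perl
use strict;
use warnings;

# Sub-task names in pipeline order, as they would be given to task_name.
my @task_name = ( 'p', 'c', 'd' );

my %position;
@position{@task_name} = 0 .. $#task_name;

# Hypothetical check (not part of MCE): true only when $to is a known
# sub-task that comes after $from in the pipeline.
sub is_forward {
    my ($from, $to) = @_;
    return 0 unless exists $position{$from} && exists $position{$to};
    return $position{$to} > $position{$from} ? 1 : 0;
}

for my $pair ( [ 'p', 'c' ], [ 'p', 'd' ], [ 'c', 'c' ], [ 'd', 'p' ] ) {
    my ($from, $to) = @{ $pair };
    printf "%s -> %s: %s\n", $from, $to,
        is_forward($from, $to) ? 'allowed' : 'error';
}
```

Here 'p' may enqueue to 'c' or 'd', while 'c' enqueuing to itself and
'd' enqueuing to 'p' are both rejected.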
689
690 MCE->await ( task_name, pending_threshold )
691
Providers may sometimes run faster than consumers, which increases
memory consumption. MCE 1.7 adds the ->await method for pausing
momentarily until the number of items pending in the receiving
sub-task's queue drops to the given threshold or below.
696
697 use MCE::Step;
698 use Time::HiRes 'sleep';
699
700 sub provider {
701 for ( 10 .. 29 ) {
# wait until 10 or fewer items are pending
703 MCE->await( 'c', 10 );
704 # forward item to a later sub-task ( 'c' comes after 'p' )
705 MCE->enq( 'c', [ $_, rand ] );
706 }
707 }
708
709 sub consumer {
710 my ($mce, $args) = @_;
711 MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
712 sleep 0.05;
713 }
714
715 MCE::Step->init(
716 task_name => [ 'p', 'c' ],
717 max_workers => [ 1 , 4 ]
718 );
719
720 mce_step \&provider, \&consumer;
721
722 -- Output
723
724 3: 10, 0.527307
725 2: 11, 0.036193
726 5: 12, 0.987168
727 4: 13, 0.998140
728 5: 14, 0.219526
729 4: 15, 0.061609
730 2: 16, 0.557664
731 3: 17, 0.658684
732 4: 18, 0.240932
733 3: 19, 0.241042
734 5: 20, 0.884830
735 2: 21, 0.902223
736 4: 22, 0.699223
737 3: 23, 0.208270
738 5: 24, 0.438919
739 2: 25, 0.268854
740 4: 26, 0.596425
741 5: 27, 0.979818
742 2: 28, 0.918173
743 3: 29, 0.358266
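
The effect of the threshold on memory can be seen in a single-process
simulation. This sketch is plain Perl and does not use MCE: the array
@queue stands in for the consumer's pending queue, and removing items
from its front stands in for consumers making progress while the
provider waits. All names are illustrative.

```perl
use strict;
use warnings;

my $threshold = 10;    # same value as passed to ->await above
my @queue;             # stand-in for the consumer's pending queue
my $peak = 0;          # largest queue length observed

for my $item ( 10 .. 29 ) {
    # Simulated "await": consumers drain the queue until no more than
    # $threshold items are pending.
    shift @queue while @queue > $threshold;

    # Simulated "enq": hand the next item to the consumers.
    push @queue, $item;

    $peak = @queue if @queue > $peak;
}

print "peak pending items: $peak\n";
```

Because the provider enqueues one item after each wait, the queue never
holds more than the threshold plus one item.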
744
GATHERING DATA
Unlike MCE::Map, where gathering and output order are handled
automatically, MCE::Step requires calling the gather method to send
results back to the manager process.
749
750 use MCE::Step chunk_size => 1;
751
752 ## Output order is not guaranteed.
753 my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
754 print "@a\n\n";
755
756 ## Outputs to a hash instead (key, value).
757 my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
758 print "@h1{1..100}\n\n";
759
760 ## This does the same thing due to chunk_id starting at one.
761 my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
762 print "@h2{1..100}\n\n";
763
The gather method may be called multiple times within the block,
unlike return, which would leave the block. Therefore, think of gather
as yielding results immediately to the manager process without actually
leaving the block.
768
769 use MCE::Step chunk_size => 1, max_workers => 3;
770
771 my @hosts = qw(
772 hosta hostb hostc hostd hoste
773 );
774
775 my %h3 = mce_step sub {
776 my ($output, $error, $status); my $host = $_;
777
778 ## Do something with $host;
779 $output = "Worker ". MCE->wid .": Hello from $host";
780
781 if (MCE->chunk_id % 3 == 0) {
782 ## Simulating an error condition
783 local $? = 1; $status = $?;
784 $error = "Error from $host"
785 }
786 else {
787 $status = 0;
788 }
789
790 ## Ensure unique keys (key, value) when gathering to
791 ## a hash.
792 MCE->gather("$host.out", $output);
793 MCE->gather("$host.err", $error) if (defined $error);
794 MCE->gather("$host.sta", $status);
795
796 }, @hosts;
797
798 foreach my $host (@hosts) {
799 print $h3{"$host.out"}, "\n";
800 print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
801 print "Exit status: ", $h3{"$host.sta"}, "\n\n";
802 }
803
804 -- Output
805
806 Worker 3: Hello from hosta
807 Exit status: 0
808
809 Worker 2: Hello from hostb
810 Exit status: 0
811
812 Worker 1: Hello from hostc
813 Error from hostc
814 Exit status: 1
815
816 Worker 3: Hello from hostd
817 Exit status: 0
818
819 Worker 2: Hello from hoste
820 Exit status: 0
821
The following uses an anonymous array containing 3 elements when
gathering data. Serialization is automatic behind the scenes.
824
825 my %h3 = mce_step sub {
826 ...
827
828 MCE->gather($host, [$output, $error, $status]);
829
830 }, @hosts;
831
832 foreach my $host (@hosts) {
833 print $h3{$host}->[0], "\n";
834 print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
835 print "Exit status: ", $h3{$host}->[2], "\n\n";
836 }
837
838 Although MCE::Map comes to mind, one may want additional control when
839 gathering data such as retaining output order.
840
841 use MCE::Step;
842
843 sub preserve_order {
844 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
845
846 return sub {
847 $tmp{ (shift) } = \@_;
848
849 while (1) {
850 last unless exists $tmp{$order_id};
851 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
852 }
853
854 return;
855 };
856 }
857
## Workers persist for the most part after running, although this is
## not always the case and depends on Perl. Pass a reference to a
## subroutine if workers must persist; e.g. mce_step { ... }, \&foo, 1..100000.
861
862 MCE::Step->init(
863 chunk_size => 'auto', max_workers => 'auto'
864 );
865
866 for (1..2) {
867 my @m2;
868
869 mce_step {
870 gather => preserve_order(\@m2)
871 },
872 sub {
873 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
874
875 ## Compute the entire chunk data at once.
876 push @a, map { $_ * 2 } @{ $chunk_ref };
877
878 ## Afterwards, invoke the gather feature, which
879 ## will direct the data to the callback function.
880 MCE->gather(MCE->chunk_id, @a);
881
882 }, 1..100000;
883
884 print scalar @m2, "\n";
885 }
886
887 MCE::Step->finish;
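
To see how the preserve_order callback reorders results, it can be
driven by hand without MCE. The sketch below repeats the callback from
above and feeds it chunks in a deliberately scrambled order; the chunk
ids and letters are made-up test data.

```perl
use strict;
use warnings;

sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];

    return sub {
        # First argument is the chunk id, the rest is that chunk's data.
        $tmp{ (shift) } = \@_;

        # Flush every chunk that is next in line; hold back the others.
        while (1) {
            last unless exists $tmp{$order_id};
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }

        return;
    };
}

my @out;
my $gather = preserve_order(\@out);

$gather->( 2, 'c', 'd' );    # held: chunk 1 has not arrived yet
$gather->( 3, 'e' );         # held as well
$gather->( 1, 'a', 'b' );    # releases chunks 1, 2, and 3 in order

print "@out\n";
```

Chunks that arrive early wait in %tmp, so the memory held by the
callback grows only with the number of chunks that are out of order.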
888
889 All 6 models support 'auto' for chunk_size unlike the Core API. Think
890 of the models as the basis for providing JIT for MCE. They create the
891 instance, tune max_workers, and tune chunk_size automatically
892 regardless of the hardware.
893
894 The following does the same thing using the Core API. Workers persist
895 after running.
896
897 use MCE;
898
899 sub preserve_order {
900 ...
901 }
902
903 my $mce = MCE->new(
904 max_workers => 'auto', chunk_size => 8000,
905
906 user_func => sub {
907 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
908
909 ## Compute the entire chunk data at once.
910 push @a, map { $_ * 2 } @{ $chunk_ref };
911
912 ## Afterwards, invoke the gather feature, which
913 ## will direct the data to the callback function.
914 MCE->gather(MCE->chunk_id, @a);
915 }
916 );
917
918 for (1..2) {
919 my @m2;
920
921 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
922
923 print scalar @m2, "\n";
924 }
925
926 $mce->shutdown;
927
MANUAL SHUTDOWN
929 MCE::Step->finish
930 MCE::Step::finish
931
932 Workers remain persistent as much as possible after running. Shutdown
933 occurs automatically when the script terminates. Call finish when
934 workers are no longer needed.
935
936 use MCE::Step;
937
938 MCE::Step->init(
939 chunk_size => 20, max_workers => 'auto'
940 );
941
942 mce_step sub { ... }, 1..100;
943
944 MCE::Step->finish;
945
INDEX
MCE, MCE::Core

AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.32.1                      2021-01-27                      MCE::Step(3)