MCE::Step(3)          User Contributed Perl Documentation         MCE::Step(3)

NAME
MCE::Step - Parallel step model for building creative steps

VERSION
This document describes MCE::Step version 1.866

DESCRIPTION
MCE::Step is similar to MCE::Flow for writing custom apps. The main
difference comes from the transparent use of queues between sub-tasks.
MCE 1.7 adds mce_enq, mce_enqp, and mce_await methods described under
QUEUE-LIKE FEATURES below.

It is trivial to parallelize with mce_stream, as shown below.

    ## Native map function
    my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

    ## Same as with MCE::Stream (processing from right to left)
    @a = mce_stream
        sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

    ## Pass an array reference to have writes occur simultaneously
    mce_stream \@a,
        sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

However, let's have MCE::Step compute the same in parallel. Unlike the
example in MCE::Flow, the use of MCE::Queue is totally transparent.
Preserving output order is handled by MCE::Candy.

    use MCE::Step;
    use MCE::Candy;

Next are the 3 sub-tasks. Compare them with the equivalent sub-tasks
described in MCE::Flow. The call to MCE->step simplifies passing data
to the subsequent sub-task.

    sub task_a {
        my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
        push @ans, map { $_ * 2 } @{ $chunk_ref };
        MCE->step(\@ans, $chunk_id);
    }

    sub task_b {
        my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
        push @ans, map { $_ * 3 } @{ $chunk_ref };
        MCE->step(\@ans, $chunk_id);
    }

    sub task_c {
        my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
        push @ans, map { $_ * 4 } @{ $chunk_ref };
        MCE->gather($chunk_id, \@ans);
    }

In summary, MCE::Step builds an MCE instance behind the scenes and
starts running. The task_name (shown), max_workers, and use_threads
options can each take an anonymous array to specify a distinct value
for each sub-task.

The task_name option is required to use ->enq, ->enqp, and ->await.

    my @a;

    mce_step {
        task_name => [ 'a', 'b', 'c' ],
        gather => MCE::Candy::out_iter_array(\@a)

    }, \&task_a, \&task_b, \&task_c, 1..10000;

    print "@a\n";

STEP DEMO
In the demonstration below, one may call ->gather or ->step any number
of times, although ->step is not allowed in the last sub-block. Data is
gathered to @arr and will likely arrive out of order. Gathering data is
optional. All sub-blocks receive $mce as the first argument.

First, define the 3 sub-tasks.

    use MCE::Step;

    sub task_a {
        my ($mce, $chunk_ref, $chunk_id) = @_;

        if ($_ % 2 == 0) {
            MCE->gather($_);
          # MCE->gather($_ * 4);        ## Ok to gather multiple times
        }
        else {
            MCE->print("a step: $_, $_ * $_\n");
            MCE->step($_, $_ * $_);
          # MCE->step($_, $_ * 4 );     ## Ok to step multiple times
        }
    }

    sub task_b {
        my ($mce, $arg1, $arg2) = @_;

        MCE->print("b args: $arg1, $arg2\n");

        if ($_ % 3 == 0) {              ## $_ is the same as $arg1
            MCE->gather($_);
        }
        else {
            MCE->print("b step: $_ * $_\n");
            MCE->step($_ * $_);
        }
    }

    sub task_c {
        my ($mce, $arg1) = @_;

        MCE->print("c: $_\n");
        MCE->gather($_);
    }

Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
parallel. Notice how max_workers and use_threads can take an anonymous
array, just like task_name.

    my @arr = mce_step {
        task_name   => [ 'a', 'b', 'c' ],
        max_workers => [  2,   2,   2  ],
        use_threads => [  0,   0,   0  ],
        chunk_size  => 1

    }, \&task_a, \&task_b, \&task_c, 1..10;

Finally, sort the array and display its contents.

    @arr = sort { $a <=> $b } @arr;

    print "\n@arr\n\n";

    -- Output

    a step: 1, 1 * 1
    a step: 3, 3 * 3
    a step: 5, 5 * 5
    a step: 7, 7 * 7
    a step: 9, 9 * 9
    b args: 1, 1
    b step: 1 * 1
    b args: 3, 9
    b args: 7, 49
    b step: 7 * 7
    b args: 5, 25
    b step: 5 * 5
    b args: 9, 81
    c: 1
    c: 49
    c: 25

    1 2 3 4 6 8 9 10 25 49

SYNOPSIS when CHUNK_SIZE EQUALS 1
Although MCE::Loop may be preferred when running a single code block,
the text below also applies to this module, particularly for the first
block.

All models in MCE default to 'auto' for chunk_size. The arguments for
the block are the same as when writing a user_func block using the Core
API.

Beginning with MCE 1.5, the next input item is placed into the input
scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
$chunk_ref containing many items. Basically, line 2 below may be
omitted from your code when using $_. One can call MCE->chunk_id to
obtain the current chunk id.

    line 1:  user_func => sub {
    line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
    line 3:
    line 4:     $_ points to $chunk_ref->[0]
    line 5:        in MCE 1.5 when chunk_size == 1
    line 6:
    line 7:     $_ points to $chunk_ref
    line 8:        in MCE 1.5 when chunk_size  > 1
    line 9:  }

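The rule for $_ described above can be modeled in a single process. The
following is only a sketch: run_block is a hypothetical helper, not part
of MCE, and it mimics just the argument passing (no workers, no IPC).

```perl
use strict;
use warnings;

# Toy, single-process model of how the first block sees its input.
# run_block() is a hypothetical helper, not part of MCE.
sub run_block {
    my ($code, $chunk_size, @input) = @_;
    my $chunk_id = 0;

    while (my @chunk = splice(@input, 0, $chunk_size)) {
        $chunk_id++;
        # chunk_size == 1: $_ is the item; otherwise $_ is the array ref
        local $_ = $chunk_size == 1 ? $chunk[0] : \@chunk;
        $code->(undef, \@chunk, $chunk_id);
    }
    return $chunk_id;
}

my (@items, @sizes);

run_block(sub { push @items, $_ },             1, 1 .. 5);
run_block(sub { push @sizes, scalar @{ $_ } }, 2, 1 .. 5);

print "items: @items\n";   # items: 1 2 3 4 5
print "sizes: @sizes\n";   # sizes: 2 2 1
```

With a chunk size of 1 the block sees each item directly in $_, whereas
with a chunk size of 2 it must dereference $_ to reach the items.
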
Follow this synopsis when chunk_size equals one. Looping is not
required inside the first block because the block is called once for
each item.

    ## Exports mce_step, mce_step_f, and mce_step_s
    use MCE::Step;

    MCE::Step::init {
        chunk_size => 1
    };

    ## Array or array_ref
    mce_step sub { do_work($_) }, 1..10000;
    mce_step sub { do_work($_) }, \@list;

    ## Important: pass an array_ref for deep (nested) input data
    mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
    mce_step sub { do_work($_) }, \@deeply_list;

    ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
    ## Workers read directly and do not involve the manager process
    mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient

    ## Involves the manager process, therefore slower
    mce_step_f sub { chomp; do_work($_) }, $file_handle;
    mce_step_f sub { chomp; do_work($_) }, $io;
    mce_step_f sub { chomp; do_work($_) }, \$scalar;

    ## Sequence of numbers (begin, end [, step, format])
    mce_step_s sub { do_work($_) }, 1, 10000, 5;
    mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];

    mce_step_s sub { do_work($_) }, {
        begin => 1, end => 10000, step => 5, format => undef
    };

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
Follow this synopsis when chunk_size equals 'auto' or is greater than 1.
This means having to loop through the chunk from inside the first
block.

    use MCE::Step;

    MCE::Step::init {           ## Chunk_size defaults to 'auto' when
        chunk_size => 'auto'    ## not specified. Therefore, the init
    };                          ## function may be omitted.

    ## Syntax is shown for mce_step for demonstration purposes.
    ## Looping inside the block is the same for mce_step_f and
    ## mce_step_s.

    ## Array or array_ref
    mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
    mce_step sub { do_work($_) for (@{ $_ }) }, \@list;

    ## Important: pass an array_ref for deep (nested) input data
    mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
    mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;

    ## Resembles code using the core MCE API
    mce_step sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;

        for (@{ $chunk_ref }) {
            do_work($_);
        }

    }, 1..10000;

Chunking reduces the number of IPC calls behind the scenes. Think in
terms of chunks whenever processing a large amount of data. For
relatively small data, choosing 1 for chunk_size is fine.

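To get a feel for why chunking matters, the number of chunks handed out
can be counted for a few chunk sizes. This is a rough sketch: chunk_count
is a hypothetical helper, and it only counts chunks. It does not measure
what MCE actually spends on IPC per chunk.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Number of chunks needed to cover $items input items.
# Hypothetical helper for illustration; not part of MCE.
sub chunk_count {
    my ($items, $chunk_size) = @_;
    return ceil($items / $chunk_size);
}

for my $size (1, 500, 8000) {
    printf "chunk_size %5d -> %6d chunks\n",
        $size, chunk_count(100_000, $size);
}
```

For 100,000 items this gives 100,000 chunks at chunk_size 1, 200 at
chunk_size 500, and 13 at chunk_size 8000.
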
OVERRIDING DEFAULTS
The following lists options that may be overridden when loading the
module.

    use Sereal qw( encode_sereal decode_sereal );
    use CBOR::XS qw( encode_cbor decode_cbor );
    use JSON::XS qw( encode_json decode_json );

    use MCE::Step
        max_workers => 8,                  # Default 'auto'
        chunk_size => 500,                 # Default 'auto'
        tmp_dir => "/path/to/app/tmp",     # $MCE::Signal::tmp_dir
        freeze => \&encode_sereal,         # \&Storable::freeze
        thaw => \&decode_sereal,           # \&Storable::thaw
        fast => 1                          # Default 0 (fast dequeue)
    ;

From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
available. Specify "Sereal => 0" to use Storable instead.

    use MCE::Step Sereal => 0;

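The freeze and thaw options name a pair of serialization functions. As a
minimal sketch of what such a pair does, here is a round trip with
Storable, the default named in the comments above. The data structure is
made up for illustration, and nothing here depends on MCE.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Made-up nested data standing in for a chunk sent between processes.
my $chunk  = [ 1, 2, [ 3, 4 ], { five => 5 } ];

my $frozen = freeze($chunk);   # reference   -> byte string
my $copy   = thaw($frozen);    # byte string -> new reference

print "same data\n"     if $copy->[2][1] == 4 && $copy->[3]{five} == 5;
print "distinct copy\n" if $copy != $chunk;
```

Any replacement pair should round-trip the data in the same way.
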
CUSTOMIZING MCE
MCE::Step->init ( options )
MCE::Step::init { options }

The init function accepts a hash of MCE options. Unlike with
MCE::Stream, both gather and bounds_only options may be specified when
calling init (not shown below).

    use MCE::Step;

    MCE::Step::init {
        chunk_size => 1, max_workers => 4,

        user_begin => sub {
            print "## ", MCE->wid, " started\n";
        },

        user_end => sub {
            print "## ", MCE->wid, " completed\n";
        }
    };

    my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;

    print "\n", "@a{1..100}", "\n";

    -- Output

    ## 3 started
    ## 1 started
    ## 4 started
    ## 2 started
    ## 3 completed
    ## 4 completed
    ## 1 completed
    ## 2 completed

    1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
    400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
    1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
    2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
    3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
    5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
    7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
    10000

Like with MCE::Step::init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how task_name,
max_workers, and use_threads can take an anonymous array to set a
distinct value for each code block.

Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
with the first code block, thus processing from left-to-right.

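The difference in direction only shows when the blocks do not commute.
The sketch below applies two made-up blocks in both orders within a
single process. Both helper names are hypothetical, and no MCE module is
involved.

```perl
use strict;
use warnings;

# Apply code blocks in the order given (left to right), the order the
# text above describes for MCE::Step.
sub apply_left_to_right {
    my ($item, @steps) = @_;
    $item = $_->($item) for @steps;
    return $item;
}

# Apply the same blocks starting from the last one (right to left),
# the order the text above describes for MCE::Stream.
sub apply_right_to_left {
    my ($item, @steps) = @_;
    return apply_left_to_right($item, reverse @steps);
}

my @steps = ( sub { $_[0] + 1 }, sub { $_[0] * 2 } );

print apply_left_to_right(5, @steps), "\n";   # (5 + 1) * 2 = 12
print apply_right_to_left(5, @steps), "\n";   # (5 * 2) + 1 = 11
```
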
The following takes 9 seconds to complete. The 9 seconds come from
having only 2 workers assigned to the last sub-task and from waiting 1
or 2 seconds before calling MCE->step.

Removing both calls to MCE->step causes the script to complete in just
1 second, because the 2nd and subsequent sub-tasks wait for data from
an internal queue. Workers terminate upon receiving an undef.

    use threads;
    use MCE::Step;

    my @a = mce_step {
        task_name   => [ 'a', 'b', 'c' ],
        max_workers => [  3,   4,   2, ],
        use_threads => [  1,   0,   0, ],

        user_end => sub {
            my ($mce, $task_id, $task_name) = @_;
            MCE->print("$task_id - $task_name completed\n");
        },

        task_end => sub {
            my ($mce, $task_id, $task_name) = @_;
            MCE->print("$task_id - $task_name ended\n");
        }
    },
    sub { sleep 1; MCE->step(""); },   ## 3 workers, named a
    sub { sleep 2; MCE->step(""); },   ## 4 workers, named b
    sub { sleep 3;                };   ## 2 workers, named c

    -- Output

    0 - a completed
    0 - a completed
    0 - a completed
    0 - a ended
    1 - b completed
    1 - b completed
    1 - b completed
    1 - b completed
    1 - b ended
    2 - c completed
    2 - c completed
    2 - c ended

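The 9-second and 1-second figures quoted above can be checked with a
back-of-the-envelope model. It assumes each worker in the first task
runs its block once, each call to MCE->step forwards one item, and a
task with fewer workers than items handles them in rounds. The helper
estimate_seconds is hypothetical and ignores startup and IPC overhead.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Worker counts and sleep times taken from the example above.
my @tasks = (
    { name => 'a', workers => 3, sleep => 1 },
    { name => 'b', workers => 4, sleep => 2 },
    { name => 'c', workers => 2, sleep => 3 },
);

# Rough estimate of wall-clock seconds (hypothetical helper).
sub estimate_seconds {
    my ($steps_forward, @tasks) = @_;
    my $items = $tasks[0]{workers};     # one run per first-task worker
    my $total = 0;

    for my $task (@tasks) {
        my $rounds = ceil($items / $task->{workers});
        $total += $rounds * $task->{sleep};
        $items = 0 unless $steps_forward;   # nothing reaches later tasks
    }
    return $total;
}

print estimate_seconds(1, @tasks), "\n";   # 1 + 2 + 2 * 3 = 9
print estimate_seconds(0, @tasks), "\n";   # only task 'a' sleeps: 1
```

The last sub-task receives 3 items but has 2 workers, so it needs two
3-second rounds, which accounts for 6 of the 9 seconds.
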
API DOCUMENTATION
Although input data is optional for MCE::Step, the following assumes
chunk_size equals 1 in order to demonstrate all the possibilities for
providing input data.

MCE::Step->run ( sub { code }, list )
mce_step sub { code }, list

Input data may be defined using a list, an array ref, or a hash ref.

Unlike MCE::Loop, Map, and Grep, which take a block as "{ ... }", Step
takes a "sub { ... }" or a code reference. The other difference is that
a comma is needed after the block.

    # $_ contains the item when chunk_size => 1

    mce_step sub { do_work($_) }, 1..1000;
    mce_step sub { do_work($_) }, \@list;

    # Important: pass an array_ref for deep (nested) input data

    mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
    mce_step sub { do_work($_) }, \@deeply_list;

    # Chunking; any chunk_size => 1 or greater

    my %res = mce_step sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my %ret;
        for my $item (@{ $chunk_ref }) {
            $ret{$item} = $item * 2;
        }
        MCE->gather(%ret);
    },
    \@list;

    # Input hash; current API available since 1.828

    my %res = mce_step sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my %ret;
        for my $key (keys %{ $chunk_ref }) {
            $ret{$key} = $chunk_ref->{$key} * 2;
        }
        MCE->gather(%ret);
    },
    \%hash;

    # Unlike MCE::Loop, MCE::Step doesn't need input to run

    mce_step { max_workers => 4 }, sub {
        MCE->say( MCE->wid );
    };

    # ... and can run multiple tasks

    mce_step {
        max_workers => [ 1, 3 ],
        task_name => [ 'p', 'c' ]
    },
    sub {
        # 1 producer
        MCE->say( "producer: ", MCE->wid );
    },
    sub {
        # 3 consumers
        MCE->say( "consumer: ", MCE->wid );
    };

    # Here, options are specified via init

    MCE::Step::init {
        max_workers => [ 1, 3 ],
        task_name => [ 'p', 'c' ]
    };

    mce_step \&producer, \&consumers;

MCE::Step->run_file ( sub { code }, file )
mce_step_f sub { code }, file

The fastest of these is the /path/to/file form. Workers communicate the
next offset position among themselves with zero interaction by the
manager process.

"IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.

    # $_ contains the line when chunk_size => 1

    mce_step_f sub { $_ }, "/path/to/file";  # faster
    mce_step_f sub { $_ }, $file_handle;
    mce_step_f sub { $_ }, $io;              # IO::All
    mce_step_f sub { $_ }, \$scalar;

    # chunking, any chunk_size => 1 or greater

    my %res = mce_step_f sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $buf = '';
        for my $line (@{ $chunk_ref }) {
            $buf .= $line;
        }
        MCE->gather($chunk_id, $buf);
    },
    "/path/to/file";

MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
mce_step_s sub { code }, $beg, $end [, $step, $fmt ]

Sequence may be defined as a list, an array reference, or a hash
reference. The functions require both begin and end values to run.
Step and format are optional. The format is passed to sprintf (% may be
omitted below).

    my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

    # $_ contains the sequence number when chunk_size => 1

    mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
    mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];

    mce_step_s sub { $_ }, {
        begin => $beg, end => $end,
        step => $step, format => $fmt
    };

    # chunking, any chunk_size => 1 or greater

    my %res = mce_step_s sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $buf = '';
        for my $seq (@{ $chunk_ref }) {
            $buf .= "$seq\n";
        }
        MCE->gather($chunk_id, $buf);
    },
    [ $beg, $end ];

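To see what the format argument does to the sequence values, the sketch
below builds the same 10 to 20 sequence in-process. make_sequence is a
hypothetical helper written for this illustration. MCE's own sequence
generation may differ in detail, and the sketch assumes a positive step.

```perl
use strict;
use warnings;

# Hypothetical helper: builds the sequence in-process to show the
# effect of the format string (assumes $step > 0).
sub make_sequence {
    my ($beg, $end, $step, $fmt) = @_;
    $step = 1 unless defined $step;
    $fmt  = "%$fmt" if defined $fmt && $fmt !~ /^%/;   # '%' may be omitted

    my @seq;
    my $count = int( ($end - $beg) / $step + 1e-9 );   # whole steps that fit
    for my $i (0 .. $count) {
        my $n = $beg + $i * $step;    # multiply to avoid cumulative drift
        push @seq, defined $fmt ? sprintf($fmt, $n) : $n;
    }
    return @seq;
}

my @seq = make_sequence(10, 20, 0.1, "4.1f");
print scalar(@seq), " items: $seq[0] .. $seq[-1]\n";   # 101 items: 10.0 .. 20.0
```
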
With the bounds_only option, the sequence engine computes only the
'begin' and 'end' items for the chunk, and not the items in between
(hence boundaries only). This option applies to sequence only and has
no effect when chunk_size equals 1.

The time to run is 0.006s below. This becomes 0.827s without the
bounds_only option due to computing all items in between, thus creating
a very large array. Basically, specify bounds_only => 1 when the
boundaries are all you need for looping inside the block; e.g. Monte
Carlo simulations.

Time was measured using 1 worker to emphasize the difference.

    use MCE::Step;

    MCE::Step::init {
        max_workers => 1, chunk_size => 1_250_000,
        bounds_only => 1
    };

    # Typically, the input scalar $_ contains the sequence number
    # when chunk_size => 1, unless the bounds_only option is set
    # which is the case here. Thus, $_ points to $chunk_ref.

    mce_step_s sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;

        # $chunk_ref contains 2 items, not 1_250_000
        # my ( $begin, $end ) = ( $_->[0], $_->[1] );

        my $begin = $chunk_ref->[0];
        my $end   = $chunk_ref->[1];

        # for my $seq ( $begin .. $end ) {
        #    ...
        # }

        MCE->printf("%7d .. %8d\n", $begin, $end);
    },
    [ 1, 10_000_000 ];

    -- Output

          1 ..  1250000
    1250001 ..  2500000
    2500001 ..  3750000
    3750001 ..  5000000
    5000001 ..  6250000
    6250001 ..  7500000
    7500001 ..  8750000
    8750001 .. 10000000

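The boundaries in the output above follow directly from the begin, end,
and chunk_size values. As a small sketch, the same pairs can be computed
without materializing the items in between. chunk_bounds is a
hypothetical helper for illustration, not MCE's sequence engine.

```perl
use strict;
use warnings;

# Hypothetical helper that yields only [begin, end] for each chunk.
sub chunk_bounds {
    my ($beg, $end, $chunk_size) = @_;
    my @bounds;

    for (my $lo = $beg; $lo <= $end; $lo += $chunk_size) {
        my $hi = $lo + $chunk_size - 1;
        $hi = $end if $hi > $end;       # last chunk may be short
        push @bounds, [ $lo, $hi ];
    }
    return @bounds;
}

printf "%7d .. %8d\n", @{ $_ } for chunk_bounds(1, 10_000_000, 1_250_000);
```

Each chunk costs two integers regardless of chunk_size, which is why
skipping the in-between items is so much cheaper.
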
MCE::Step->run ( { input_data => iterator }, sub { code } )
mce_step { input_data => iterator }, sub { code }

An iterator reference may be specified for input_data, either in the
options hash as shown above or via MCE::Step::init. Passing it as
input_data prevents MCE::Step from configuring the iterator reference
as another user task, which would not work.

Iterators are described under section "SYNTAX for INPUT_DATA" at
MCE::Core.

    MCE::Step::init {
        input_data => iterator
    };

    mce_step sub { $_ };

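For readers unfamiliar with iterator references, here is a minimal
closure-based sketch. It assumes an iterator is a code reference that
returns the next item on each call and nothing once exhausted. The exact
contract is described in MCE::Core. make_iterator is a hypothetical name,
and the loop below drives the closure by hand rather than through MCE.

```perl
use strict;
use warnings;

# Closure-based iterator: returns the next value on each call and an
# empty list (undef in scalar context) once the range is exhausted.
sub make_iterator {
    my ($n, $max, $step) = @_;

    return sub {
        return if $n > $max;
        my $current = $n;
        $n += $step;
        return $current;
    };
}

my $iter = make_iterator(10, 30, 2);
my @seen;

while (defined(my $item = $iter->())) {
    push @seen, $item;
}

print "@seen\n";   # 10 12 14 16 18 20 22 24 26 28 30

# With MCE installed, such a closure would be passed as input_data:
#   mce_step { input_data => make_iterator(10, 30, 2) }, sub { code };
```
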
QUEUE-LIKE FEATURES
MCE->step ( item )
MCE->step ( arg1, arg2, argN )

The ->step method is the simplest form for passing elements into the
next sub-task.

    use MCE::Step;

    sub provider {
        MCE->step( $_, rand ) for 10 .. 19;
    }

    sub consumer {
        my ( $mce, @args ) = @_;
        MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
    }

    MCE::Step::init {
        task_name   => [ 'p', 'c' ],
        max_workers => [  1 ,  4  ]
    };

    mce_step \&provider, \&consumer;

    -- Output

    2: 10, 0.583551
    4: 11, 0.175319
    3: 12, 0.843662
    4: 15, 0.748302
    2: 14, 0.591752
    3: 16, 0.357858
    5: 13, 0.953528
    4: 17, 0.698907
    2: 18, 0.985448
    3: 19, 0.146548

MCE->enq ( task_name, item )
MCE->enq ( task_name, [ arg1, arg2, argN ] )
MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
MCE->enqp ( task_name, priority, item )
MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )

The MCE 1.7 release enables finer control. Unlike ->step, which takes
multiple arguments, the ->enq and ->enqp methods push items at the end
of the array internally. Passing multiple arguments is possible by
enclosing the arguments inside an anonymous array.

The direction of flow is forward only. Thus, stepping to itself or
backwards will cause an error.

    use MCE::Step;

    sub provider {
        if ( MCE->wid % 2 == 0 ) {
            MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
        } else {
            MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
        }
    }

    sub consumer_c {
        my ( $mce, $args ) = @_;
        MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
    }

    sub consumer_d {
        my ( $mce, $args ) = @_;
        MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
    }

    MCE::Step::init {
        task_name   => [ 'p', 'c', 'd' ],
        max_workers => [  2 ,  3 ,  3  ]
    };

    mce_step \&provider, \&consumer_c, \&consumer_d;

    -- Output

    C4: 10, 0.527531
    D6: 20, 0.420108
    C5: 11, 0.839770
    D8: 21, 0.386414
    C3: 12, 0.834645
    C4: 13, 0.191014
    D6: 23, 0.924027
    C5: 14, 0.899357
    D8: 24, 0.706186
    C4: 15, 0.083823
    D7: 22, 0.479708
    D6: 25, 0.073882
    C3: 16, 0.207446
    D8: 26, 0.560755
    C5: 17, 0.198157
    D7: 27, 0.324909
    C4: 18, 0.147505
    C5: 19, 0.318371
    D6: 28, 0.220465
    D8: 29, 0.630111

MCE->await ( task_name, pending_threshold )

Providers may sometimes run faster than consumers, thus increasing
memory consumption. MCE 1.7 adds the ->await method for pausing
momentarily until the number of items pending in the receiving
sub-task's queue drops to the given threshold or below.

    use MCE::Step;
    use Time::HiRes 'sleep';

    sub provider {
        for ( 10 .. 29 ) {
            # wait until 10 or fewer items pending
            MCE->await( 'c', 10 );
            # forward item to a later sub-task ( 'c' comes after 'p' )
            MCE->enq( 'c', [ $_, rand ] );
        }
    }

    sub consumer {
        my ($mce, $args) = @_;
        MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
        sleep 0.05;
    }

    MCE::Step::init {
        task_name   => [ 'p', 'c' ],
        max_workers => [  1 ,  4  ]
    };

    mce_step \&provider, \&consumer;

    -- Output

    3: 10, 0.527307
    2: 11, 0.036193
    5: 12, 0.987168
    4: 13, 0.998140
    5: 14, 0.219526
    4: 15, 0.061609
    2: 16, 0.557664
    3: 17, 0.658684
    4: 18, 0.240932
    3: 19, 0.241042
    5: 20, 0.884830
    2: 21, 0.902223
    4: 22, 0.699223
    3: 23, 0.208270
    5: 24, 0.438919
    2: 25, 0.268854
    4: 26, 0.596425
    5: 27, 0.979818
    2: 28, 0.918173
    3: 29, 0.358266

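The effect of the threshold on memory can be illustrated with a toy,
single-process simulation. It assumes the producer can offer up to 3
items per tick while the consumer handles 1, and it models ->await as
"do not enqueue while more than threshold items are pending". The rates
and variable names are made up, and no MCE module is involved.

```perl
use strict;
use warnings;

# Single-process toy model of back-pressure; no MCE involved.
my $threshold   = 10;
my @produce     = (10 .. 29);    # items the producer wants to send
my @queue;                       # items pending for the consumer
my $max_pending = 0;

while (@produce || @queue) {
    # Producer: up to 3 attempts per tick, each gated like ->await
    for (1 .. 3) {
        last unless @produce;
        last if @queue > $threshold;      # would pause in ->await
        push @queue, shift @produce;      # stands in for ->enq
    }
    $max_pending = @queue if @queue > $max_pending;

    # Consumer: handles one item per tick
    shift @queue;
}

print "max pending: $max_pending\n";   # max pending: 11
```

The queue never grows beyond threshold + 1 items, because one item may
be enqueued right after the pending count is found to be at the
threshold.
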
GATHERING DATA
Unlike MCE::Map where gather and output order are done for you
automatically, the gather method is used to have results sent back to
the manager process.

    use MCE::Step chunk_size => 1;

    ## Output order is not guaranteed.
    my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
    print "@a\n\n";

    ## Outputs to a hash instead (key, value).
    my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
    print "@h1{1..100}\n\n";

    ## This does the same thing due to chunk_id starting at one.
    my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
    print "@h2{1..100}\n\n";

The gather method may be called multiple times within the block, unlike
return, which would leave the block. Therefore, think of gather as
yielding results immediately to the manager process without actually
leaving the block.

    use MCE::Step chunk_size => 1, max_workers => 3;

    my @hosts = qw(
        hosta hostb hostc hostd hoste
    );

    my %h3 = mce_step sub {
        my ($output, $error, $status); my $host = $_;

        ## Do something with $host;
        $output = "Worker ". MCE->wid .": Hello from $host";

        if (MCE->chunk_id % 3 == 0) {
            ## Simulating an error condition
            local $? = 1; $status = $?;
            $error = "Error from $host"
        }
        else {
            $status = 0;
        }

        ## Ensure unique keys (key, value) when gathering to
        ## a hash.
        MCE->gather("$host.out", $output);
        MCE->gather("$host.err", $error) if (defined $error);
        MCE->gather("$host.sta", $status);

    }, @hosts;

    foreach my $host (@hosts) {
        print $h3{"$host.out"}, "\n";
        print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
        print "Exit status: ", $h3{"$host.sta"}, "\n\n";
    }

    -- Output

    Worker 3: Hello from hosta
    Exit status: 0

    Worker 2: Hello from hostb
    Exit status: 0

    Worker 1: Hello from hostc
    Error from hostc
    Exit status: 1

    Worker 3: Hello from hostd
    Exit status: 0

    Worker 2: Hello from hoste
    Exit status: 0

The following uses an anonymous array containing 3 elements when
gathering data. Serialization is automatic behind the scenes.

    my %h3 = mce_step sub {
        ...

        MCE->gather($host, [$output, $error, $status]);

    }, @hosts;

    foreach my $host (@hosts) {
        print $h3{$host}->[0], "\n";
        print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
        print "Exit status: ", $h3{$host}->[2], "\n\n";
    }

Although MCE::Map comes to mind, one may want additional control when
gathering data such as retaining output order.

    use MCE::Step;

    sub preserve_order {
        my %tmp; my $order_id = 1; my $gather_ref = $_[0];

        return sub {
            $tmp{ (shift) } = \@_;

            while (1) {
                last unless exists $tmp{$order_id};
                push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
            }

            return;
        };
    }

    ## Workers persist for the most part after running, though this is
    ## not always the case and depends on Perl. Pass a reference to a
    ## subroutine if workers must persist; e.g.
    ## mce_step { ... }, \&foo, 1..100000.

    MCE::Step::init {
        chunk_size => 'auto', max_workers => 'auto'
    };

    for (1..2) {
        my @m2;

        mce_step {
            gather => preserve_order(\@m2)
        },
        sub {
            my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

            ## Compute the entire chunk data at once.
            push @a, map { $_ * 2 } @{ $chunk_ref };

            ## Afterwards, invoke the gather feature, which
            ## will direct the data to the callback function.
            MCE->gather(MCE->chunk_id, @a);

        }, 1..100000;

        print scalar @m2, "\n";
    }

    MCE::Step::finish;

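The preserve_order callback shown above can be exercised on its own,
without any workers, to see how it buffers chunks that arrive early. The
sketch below repeats the callback (with the loop condition folded into
the while test) and feeds it made-up chunks by hand in the order 2, 3,
1.

```perl
use strict;
use warnings;

# Same callback logic as in the text above.
sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];

    return sub {
        $tmp{ (shift) } = \@_;          # key: chunk_id, value: the data

        while (exists $tmp{$order_id}) {
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }

        return;
    };
}

my @out;
my $gather = preserve_order(\@out);

# Chunks arrive as (chunk_id, values...) in arbitrary order.
$gather->(2, 30, 40);
$gather->(3, 50);
print "after chunks 2 and 3: ", scalar(@out), " items\n";   # 0 items

$gather->(1, 10, 20);
print "after chunk 1: @out\n";          # after chunk 1: 10 20 30 40 50
```

Nothing is released until chunk 1 arrives. At that point the buffered
chunks 2 and 3 are flushed in order.
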
All 6 models support 'auto' for chunk_size unlike the Core API. Think
of the models as the basis for providing JIT for MCE. They create the
instance, tune max_workers, and tune chunk_size automatically
regardless of the hardware.

The following does the same thing using the Core API. Workers persist
after running.

    use MCE;

    sub preserve_order {
        ...
    }

    my $mce = MCE->new(
        max_workers => 'auto', chunk_size => 8000,

        user_func => sub {
            my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

            ## Compute the entire chunk data at once.
            push @a, map { $_ * 2 } @{ $chunk_ref };

            ## Afterwards, invoke the gather feature, which
            ## will direct the data to the callback function.
            MCE->gather(MCE->chunk_id, @a);
        }
    );

    for (1..2) {
        my @m2;

        $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);

        print scalar @m2, "\n";
    }

    $mce->shutdown;

MANUAL SHUTDOWN
MCE::Step->finish
MCE::Step::finish

Workers remain persistent as much as possible after running. Shutdown
occurs automatically when the script terminates. Call finish when
workers are no longer needed.

    use MCE::Step;

    MCE::Step::init {
        chunk_size => 20, max_workers => 'auto'
    };

    mce_step sub { ... }, 1..100;

    MCE::Step::finish;

INDEX
MCE, MCE::Core

AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.30.1                      2020-02-09                      MCE::Step(3)