MCE::Step(3) User Contributed Perl Documentation MCE::Step(3)

NAME
MCE::Step - Parallel step model for building creative steps
7
VERSION
This document describes MCE::Step version 1.884
10
DESCRIPTION
MCE::Step is similar to MCE::Flow for writing custom apps. The main
difference comes from the transparent use of queues between sub-tasks.
MCE 1.7 adds the mce_enq, mce_enqp, and mce_await methods described under
QUEUE-LIKE FEATURES below.
16
17 It is trivial to parallelize with mce_stream shown below.
18
19 ## Native map function
20 my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
21
22 ## Same as with MCE::Stream (processing from right to left)
23 @a = mce_stream
24 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
25
26 ## Pass an array reference to have writes occur simultaneously
27 mce_stream \@a,
28 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
29
However, let's have MCE::Step compute the same in parallel. Unlike the
example in MCE::Flow, the use of MCE::Queue is totally transparent.
Preserving output order is handled by MCE::Candy.
33
34 use MCE::Step;
35 use MCE::Candy;
36
Next are the 3 sub-tasks. Compare them with the equivalent sub-tasks
described in MCE::Flow. The call to MCE->step simplifies passing data
to the subsequent sub-task.
40
41 sub task_a {
42 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
43 push @ans, map { $_ * 2 } @{ $chunk_ref };
44 MCE->step(\@ans, $chunk_id);
45 }
46
47 sub task_b {
48 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
49 push @ans, map { $_ * 3 } @{ $chunk_ref };
50 MCE->step(\@ans, $chunk_id);
51 }
52
53 sub task_c {
54 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
55 push @ans, map { $_ * 4 } @{ $chunk_ref };
56 MCE->gather($chunk_id, \@ans);
57 }
58
In summary, MCE::Step builds an MCE instance behind the scenes and
starts running. The task_name (shown), max_workers, and use_threads
options can take an anonymous array for specifying values individually
for each sub-task.
63
64 The task_name option is required to use ->enq, ->enqp, and ->await.
65
66 my @a;
67
68 mce_step {
69 task_name => [ 'a', 'b', 'c' ],
70 gather => MCE::Candy::out_iter_array(\@a)
71
72 }, \&task_a, \&task_b, \&task_c, 1..10000;
73
74 print "@a\n";
75
In the demonstration below, one may call ->gather or ->step any number
of times, although ->step is not allowed in the last sub-block. Data is
gathered to @arr and will likely be out of order. Gathering data is
optional. All sub-blocks receive $mce as the first argument.

First, define the 3 sub-tasks.
83
84 use MCE::Step;
85
86 sub task_a {
87 my ($mce, $chunk_ref, $chunk_id) = @_;
88
89 if ($_ % 2 == 0) {
90 MCE->gather($_);
91 # MCE->gather($_ * 4); ## Ok to gather multiple times
92 }
93 else {
94 MCE->print("a step: $_, $_ * $_\n");
95 MCE->step($_, $_ * $_);
96 # MCE->step($_, $_ * 4 ); ## Ok to step multiple times
97 }
98 }
99
100 sub task_b {
101 my ($mce, $arg1, $arg2) = @_;
102
103 MCE->print("b args: $arg1, $arg2\n");
104
105 if ($_ % 3 == 0) { ## $_ is the same as $arg1
106 MCE->gather($_);
107 }
108 else {
109 MCE->print("b step: $_ * $_\n");
110 MCE->step($_ * $_);
111 }
112 }
113
114 sub task_c {
115 my ($mce, $arg1) = @_;
116
117 MCE->print("c: $_\n");
118 MCE->gather($_);
119 }
120
121 Next, pass MCE options, using chunk_size 1, and run all 3 tasks in
122 parallel. Notice how max_workers and use_threads can take an anonymous
123 array, similarly to task_name.
124
125 my @arr = mce_step {
126 task_name => [ 'a', 'b', 'c' ],
127 max_workers => [ 2, 2, 2 ],
128 use_threads => [ 0, 0, 0 ],
129 chunk_size => 1
130
131 }, \&task_a, \&task_b, \&task_c, 1..10;
132
133 Finally, sort the array and display its contents.
134
135 @arr = sort { $a <=> $b } @arr;
136
137 print "\n@arr\n\n";
138
139 -- Output
140
141 a step: 1, 1 * 1
142 a step: 3, 3 * 3
143 a step: 5, 5 * 5
144 a step: 7, 7 * 7
145 a step: 9, 9 * 9
146 b args: 1, 1
147 b step: 1 * 1
148 b args: 3, 9
149 b args: 7, 49
150 b step: 7 * 7
151 b args: 5, 25
152 b step: 5 * 5
153 b args: 9, 81
154 c: 1
155 c: 49
156 c: 25
157
158 1 2 3 4 6 8 9 10 25 49
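
The routing in the demonstration above can be checked by hand. The following is a minimal sequential sketch in plain Perl, not MCE code: the arrays @queue_b and @queue_c are hypothetical stand-ins for the internal queues between sub-tasks, and @gathered stands in for the data sent back via MCE->gather. It only models which values reach which sub-task, not the parallel execution.

```perl
use strict;
use warnings;

my (@gathered, @queue_b, @queue_c);

# Task a: gather even items; pass odd items and their squares onward.
for my $item (1 .. 10) {
    if ($item % 2 == 0) { push @gathered, $item }
    else                { push @queue_b, [ $item, $item * $item ] }
}

# Task b: $_ in the real sub-task corresponds to the first stepped argument.
for my $args (@queue_b) {
    my ($arg1, $arg2) = @{ $args };
    if ($arg1 % 3 == 0) { push @gathered, $arg1 }
    else                { push @queue_c, $arg1 * $arg1 }
}

# Task c: gather whatever arrives.
push @gathered, @queue_c;

my @sorted = sort { $a <=> $b } @gathered;
print "@sorted\n";   # prints: 1 2 3 4 6 8 9 10 25 49
```

The sorted result matches the last line of the output shown above.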
159
Although MCE::Loop may be preferred for running a single code block,
the text below also applies to this module, particularly for the
first block.
164
165 All models in MCE default to 'auto' for chunk_size. The arguments for
166 the block are the same as writing a user_func block using the Core API.
167
168 Beginning with MCE 1.5, the next input item is placed into the input
169 scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
170 $chunk_ref containing many items. Basically, line 2 below may be
171 omitted from your code when using $_. One can call MCE->chunk_id to
172 obtain the current chunk id.
173
174 line 1: user_func => sub {
175 line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
176 line 3:
177 line 4: $_ points to $chunk_ref->[0]
178 line 5: in MCE 1.5 when chunk_size == 1
179 line 6:
180 line 7: $_ points to $chunk_ref
181 line 8: in MCE 1.5 when chunk_size > 1
182 line 9: }
183
Follow this synopsis when chunk_size equals one. Looping is not
required inside the first block because the block is called once
per item.
187
188 ## Exports mce_step, mce_step_f, and mce_step_s
189 use MCE::Step;
190
191 MCE::Step->init(
192 chunk_size => 1
193 );
194
195 ## Array or array_ref
196 mce_step sub { do_work($_) }, 1..10000;
197 mce_step sub { do_work($_) }, \@list;
198
## Important; pass an array_ref for deeply nested input data
200 mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
201 mce_step sub { do_work($_) }, \@deeply_list;
202
203 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
## Workers read directly without involving the manager process
205 mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
206
207 ## Involves the manager process, therefore slower
208 mce_step_f sub { chomp; do_work($_) }, $file_handle;
209 mce_step_f sub { chomp; do_work($_) }, $io;
210 mce_step_f sub { chomp; do_work($_) }, \$scalar;
211
212 ## Sequence of numbers (begin, end [, step, format])
213 mce_step_s sub { do_work($_) }, 1, 10000, 5;
214 mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];
215
216 mce_step_s sub { do_work($_) }, {
217 begin => 1, end => 10000, step => 5, format => undef
218 };
219
221 Follow this synopsis when chunk_size equals 'auto' or greater than 1.
222 This means having to loop through the chunk from inside the first
223 block.
224
225 use MCE::Step;
226
227 MCE::Step->init( ## Chunk_size defaults to 'auto' when
228 chunk_size => 'auto' ## not specified. Therefore, the init
229 ); ## function may be omitted.
230
231 ## Syntax is shown for mce_step for demonstration purposes.
232 ## Looping inside the block is the same for mce_step_f and
233 ## mce_step_s.
234
235 ## Array or array_ref
236 mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
237 mce_step sub { do_work($_) for (@{ $_ }) }, \@list;
238
## Important; pass an array_ref for deeply nested input data
240 mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
241 mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
242
243 ## Resembles code using the core MCE API
244 mce_step sub {
245 my ($mce, $chunk_ref, $chunk_id) = @_;
246
247 for (@{ $chunk_ref }) {
248 do_work($_);
249 }
250
251 }, 1..10000;
252
Chunking reduces the number of IPC calls behind the scenes. Think in
terms of chunks whenever processing a large amount of data. For
relatively small data, choosing 1 for chunk_size is fine.
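
To make the effect of chunking concrete, here is a small sketch in plain Perl that splits a list into chunks and counts them. The helper make_chunks is hypothetical and is not part of MCE; it assumes, for illustration only, that each chunk corresponds to one request between a worker and the manager process, and that chunk ids start at one.

```perl
use strict;
use warnings;

# Hypothetical helper: split a list into chunks, numbering them from 1.
sub make_chunks {
    my ($chunk_size, @items) = @_;
    my @chunks;
    my $chunk_id = 0;
    while (@items) {
        my @chunk = splice(@items, 0, $chunk_size);
        push @chunks, { id => ++$chunk_id, items => \@chunk };
    }
    return @chunks;
}

my @one_by_one = make_chunks(1,   1 .. 10_000);
my @by_500     = make_chunks(500, 1 .. 10_000);

printf "chunk_size 1:   %d chunks\n", scalar @one_by_one;   # 10000 chunks
printf "chunk_size 500: %d chunks\n", scalar @by_500;       # 20 chunks
printf "last chunk id %d holds items %d .. %d\n",
    $by_500[-1]{id}, $by_500[-1]{items}[0], $by_500[-1]{items}[-1];
```

Under that assumption, a chunk_size of 500 needs 20 requests for 10,000 items where a chunk_size of 1 needs 10,000.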
256
The following lists the options that may be overridden when loading
the module. The fast option is obsolete from 1.867 onwards and is
ignored if specified.
261
262 use Sereal qw( encode_sereal decode_sereal );
263 use CBOR::XS qw( encode_cbor decode_cbor );
264 use JSON::XS qw( encode_json decode_json );
265
266 use MCE::Step
267 max_workers => 8, # Default 'auto'
268 chunk_size => 500, # Default 'auto'
269 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
270 freeze => \&encode_sereal, # \&Storable::freeze
271 thaw => \&decode_sereal, # \&Storable::thaw
272 init_relay => 0, # Default undef; MCE 1.882+
273 use_threads => 0, # Default undef; MCE 1.882+
274 ;
275
276 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
277 available. Specify "Sereal => 0" to use Storable instead.
278
279 use MCE::Step Sereal => 0;
280
282 MCE::Step->init ( options )
283 MCE::Step::init { options }
284
285 The init function accepts a hash of MCE options. Unlike with
286 MCE::Stream, both gather and bounds_only options may be specified when
287 calling init (not shown below).
288
289 use MCE::Step;
290
291 MCE::Step->init(
292 chunk_size => 1, max_workers => 4,
293
294 user_begin => sub {
295 print "## ", MCE->wid, " started\n";
296 },
297
298 user_end => sub {
299 print "## ", MCE->wid, " completed\n";
300 }
301 );
302
303 my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;
304
305 print "\n", "@a{1..100}", "\n";
306
307 -- Output
308
309 ## 3 started
310 ## 1 started
311 ## 4 started
312 ## 2 started
313 ## 3 completed
314 ## 4 completed
315 ## 1 completed
316 ## 2 completed
317
318 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
319 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
320 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
321 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
322 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
323 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
324 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
325 10000
326
As with MCE::Step->init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how task_name,
max_workers, and use_threads can take an anonymous array for setting
values individually for each code block.
331
332 Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
333 with the first code block, thus processing from left-to-right.
334
The following takes 9 seconds to complete. The 9 seconds come from
having only 2 workers assigned to the last sub-task and from waiting
1 or 2 seconds before each call to MCE->step.

Removing both calls to MCE->step causes the script to complete in
just 1 second. This is because the 2nd and subsequent sub-tasks await
data from an internal queue and then have nothing to process. Workers
terminate upon receiving an undef.
343
344 use threads;
345 use MCE::Step;
346
347 my @a = mce_step {
348 task_name => [ 'a', 'b', 'c' ],
349 max_workers => [ 3, 4, 2, ],
350 use_threads => [ 1, 0, 0, ],
351
352 user_end => sub {
353 my ($mce, $task_id, $task_name) = @_;
354 MCE->print("$task_id - $task_name completed\n");
355 },
356
357 task_end => sub {
358 my ($mce, $task_id, $task_name) = @_;
359 MCE->print("$task_id - $task_name ended\n");
360 }
361 },
362 sub { sleep 1; MCE->step(""); }, ## 3 workers, named a
363 sub { sleep 2; MCE->step(""); }, ## 4 workers, named b
364 sub { sleep 3; }; ## 2 workers, named c
365
366 -- Output
367
368 0 - a completed
369 0 - a completed
370 0 - a completed
371 0 - a ended
372 1 - b completed
373 1 - b completed
374 1 - b completed
375 1 - b completed
376 1 - b ended
377 2 - c completed
378 2 - c completed
379 2 - c ended
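
The 9-second figure can be reproduced with a little arithmetic. The following is a rough timing model in plain Perl, not MCE code. The helper stage_finish_times is hypothetical, and the model assumes that startup overhead is negligible, that each worker of the first sub-task runs its block once, and that later sub-tasks run their block once per item received, handing each item to the earliest-free worker.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical helper: given a worker count, the seconds spent per item,
# and the arrival times of the items, return the time each item finishes.
sub stage_finish_times {
    my ($workers, $cost, @arrivals) = @_;
    my @free_at = (0) x $workers;          # when each worker becomes idle
    my @done;
    for my $arrival (sort { $a <=> $b } @arrivals) {
        @free_at = sort { $a <=> $b } @free_at;
        my $start = max($arrival, $free_at[0]);
        $free_at[0] = $start + $cost;
        push @done, $free_at[0];
    }
    return @done;
}

# Task a: 3 workers, each sleeps 1 second and then steps one item.
my @after_a = (1) x 3;
my @after_b = stage_finish_times(4, 2, @after_a);   # 3, 3, 3
my @after_c = stage_finish_times(2, 3, @after_b);   # 6, 6, 9

my $total        = max(@after_c);
my $without_step = max(@after_a);   # nothing ever reaches b or c

print "with step: $total, without step: $without_step\n";
# prints: with step: 9, without step: 1
```

The third item reaching sub-task c has to wait for one of its 2 workers to finish, which accounts for the final 3 seconds.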
380
382 Although input data is optional for MCE::Step, the following assumes
383 chunk_size equals 1 in order to demonstrate all the possibilities for
384 providing input data.
385
386 MCE::Step->run ( sub { code }, list )
387 mce_step sub { code }, list
388
389 Input data may be defined using a list, an array ref, or a hash ref.
390
391 Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Step
392 takes a "sub { ... }" or a code reference. The other difference is that
393 the comma is needed after the block.
394
395 # $_ contains the item when chunk_size => 1
396
397 mce_step sub { do_work($_) }, 1..1000;
398 mce_step sub { do_work($_) }, \@list;
399
# Important; pass an array_ref for deeply nested input data
401
402 mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
403 mce_step sub { do_work($_) }, \@deeply_list;
404
405 # Chunking; any chunk_size => 1 or greater
406
407 my %res = mce_step sub {
408 my ($mce, $chunk_ref, $chunk_id) = @_;
409 my %ret;
410 for my $item (@{ $chunk_ref }) {
411 $ret{$item} = $item * 2;
412 }
413 MCE->gather(%ret);
414 },
415 \@list;
416
417 # Input hash; current API available since 1.828
418
419 my %res = mce_step sub {
420 my ($mce, $chunk_ref, $chunk_id) = @_;
421 my %ret;
422 for my $key (keys %{ $chunk_ref }) {
423 $ret{$key} = $chunk_ref->{$key} * 2;
424 }
425 MCE->gather(%ret);
426 },
427 \%hash;
428
429 # Unlike MCE::Loop, MCE::Step doesn't need input to run
430
431 mce_step { max_workers => 4 }, sub {
432 MCE->say( MCE->wid );
433 };
434
435 # ... and can run multiple tasks
436
437 mce_step {
438 max_workers => [ 1, 3 ],
439 task_name => [ 'p', 'c' ]
440 },
441 sub {
442 # 1 producer
443 MCE->say( "producer: ", MCE->wid );
444 },
445 sub {
446 # 3 consumers
447 MCE->say( "consumer: ", MCE->wid );
448 };
449
450 # Here, options are specified via init
451
452 MCE::Step->init(
453 max_workers => [ 1, 3 ],
454 task_name => [ 'p', 'c' ]
455 );
456
457 mce_step \&producer, \&consumers;
458
459 MCE::Step->run_file ( sub { code }, file )
460 mce_step_f sub { code }, file
461
Of the input types shown below, passing /path/to/file is the fastest.
Workers communicate the next offset position among themselves with
zero interaction by the manager process.
465
466 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
467
468 # $_ contains the line when chunk_size => 1
469
470 mce_step_f sub { $_ }, "/path/to/file"; # faster
471 mce_step_f sub { $_ }, $file_handle;
472 mce_step_f sub { $_ }, $io; # IO::All
473 mce_step_f sub { $_ }, \$scalar;
474
475 # chunking, any chunk_size => 1 or greater
476
477 my %res = mce_step_f sub {
478 my ($mce, $chunk_ref, $chunk_id) = @_;
479 my $buf = '';
480 for my $line (@{ $chunk_ref }) {
481 $buf .= $line;
482 }
483 MCE->gather($chunk_id, $buf);
484 },
485 "/path/to/file";
486
487 MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
488 mce_step_s sub { code }, $beg, $end [, $step, $fmt ]
489
490 Sequence may be defined as a list, an array reference, or a hash
491 reference. The functions require both begin and end values to run.
492 Step and format are optional. The format is passed to sprintf (% may be
493 omitted below).
494
495 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
496
497 # $_ contains the sequence number when chunk_size => 1
498
499 mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
500 mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];
501
502 mce_step_s sub { $_ }, {
503 begin => $beg, end => $end,
504 step => $step, format => $fmt
505 };
506
507 # chunking, any chunk_size => 1 or greater
508
509 my %res = mce_step_s sub {
510 my ($mce, $chunk_ref, $chunk_id) = @_;
511 my $buf = '';
512 for my $seq (@{ $chunk_ref }) {
513 $buf .= "$seq\n";
514 }
515 MCE->gather($chunk_id, $buf);
516 },
517 [ $beg, $end ];
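
To illustrate what a formatted sequence looks like, the following plain-Perl sketch generates the same values without MCE. It is an approximation for illustration, not the sequence engine itself: the way the leading % is restored and the index-based computation are assumptions made here.

```perl
use strict;
use warnings;

my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "4.1f");

# Accept the format with or without the leading '%'.
$fmt = "%$fmt" unless $fmt =~ /^%/;

# Derive each value from its index rather than by repeated addition,
# which keeps floating-point error from accumulating.
my $count = int( ($end - $beg) / $step + 0.5 ) + 1;
my @seq   = map { sprintf($fmt, $beg + $_ * $step) } 0 .. $count - 1;

print scalar(@seq), " items: $seq[0] .. $seq[-1]\n";
# prints: 101 items: 10.0 .. 20.0
```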
518
With the bounds_only option, the sequence engine computes only the
'begin' and 'end' items for each chunk, and not the items in between
(hence boundaries only). This option applies to sequences only and has
no effect when chunk_size equals 1.
523
The time to run is 0.006s below. This becomes 0.827s without the
bounds_only option due to computing all items in between, thus creating
a very large array. Basically, specify bounds_only => 1 when boundaries
are all you need for looping inside the block; e.g. Monte Carlo
simulations.
529
530 Time was measured using 1 worker to emphasize the difference.
531
532 use MCE::Step;
533
534 MCE::Step->init(
535 max_workers => 1, chunk_size => 1_250_000,
536 bounds_only => 1
537 );
538
539 # Typically, the input scalar $_ contains the sequence number
540 # when chunk_size => 1, unless the bounds_only option is set
541 # which is the case here. Thus, $_ points to $chunk_ref.
542
543 mce_step_s sub {
544 my ($mce, $chunk_ref, $chunk_id) = @_;
545
546 # $chunk_ref contains 2 items, not 1_250_000
547 # my ( $begin, $end ) = ( $_->[0], $_->[1] );
548
549 my $begin = $chunk_ref->[0];
550 my $end = $chunk_ref->[1];
551
552 # for my $seq ( $begin .. $end ) {
553 # ...
554 # }
555
556 MCE->printf("%7d .. %8d\n", $begin, $end);
557 },
558 [ 1, 10_000_000 ];
559
560 -- Output
561
562 1 .. 1250000
563 1250001 .. 2500000
564 2500001 .. 3750000
565 3750001 .. 5000000
566 5000001 .. 6250000
567 6250001 .. 7500000
568 7500001 .. 8750000
569 8750001 .. 10000000
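
The boundaries in the output above follow directly from the sequence and the chunk size. Here is a minimal plain-Perl sketch of that arithmetic. The helper chunk_bounds is hypothetical and is not part of MCE; it only shows why each chunk needs 2 numbers rather than 1,250,000.

```perl
use strict;
use warnings;

# Hypothetical helper: compute [ begin, end ] pairs for each chunk of an
# integer sequence with a step of 1.
sub chunk_bounds {
    my ($begin, $end, $chunk_size) = @_;
    my @bounds;
    for (my $lo = $begin; $lo <= $end; $lo += $chunk_size) {
        my $hi = $lo + $chunk_size - 1;
        $hi = $end if $hi > $end;          # the last chunk may be shorter
        push @bounds, [ $lo, $hi ];
    }
    return @bounds;
}

my @bounds = chunk_bounds(1, 10_000_000, 1_250_000);

# Same layout as the MCE->printf call in the example above.
printf "%7d .. %8d\n", @{ $_ } for @bounds;
```

This prints the same 8 lines as the example, and a block can then loop from the first value to the second on its own.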
570
571 MCE::Step->run ( { input_data => iterator }, sub { code } )
572 mce_step { input_data => iterator }, sub { code }
573
An iterator reference may be specified for input_data. The only other
way is to specify input_data via MCE::Step->init. Either form prevents
MCE::Step from configuring the iterator reference as another user
task, which would not work.
578
579 Iterators are described under section "SYNTAX for INPUT_DATA" at
580 MCE::Core.
581
582 MCE::Step->init(
583 input_data => iterator
584 );
585
586 mce_step sub { $_ };
587
QUEUE-LIKE FEATURES
MCE->step ( item )
590 MCE->step ( arg1, arg2, argN )
591
592 The ->step method is the simplest form for passing elements into the
593 next sub-task.
594
595 use MCE::Step;
596
597 sub provider {
598 MCE->step( $_, rand ) for 10 .. 19;
599 }
600
601 sub consumer {
602 my ( $mce, @args ) = @_;
603 MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
604 }
605
606 MCE::Step->init(
607 task_name => [ 'p', 'c' ],
608 max_workers => [ 1 , 4 ]
609 );
610
611 mce_step \&provider, \&consumer;
612
613 -- Output
614
615 2: 10, 0.583551
616 4: 11, 0.175319
617 3: 12, 0.843662
618 4: 15, 0.748302
619 2: 14, 0.591752
620 3: 16, 0.357858
621 5: 13, 0.953528
622 4: 17, 0.698907
623 2: 18, 0.985448
624 3: 19, 0.146548
625
626 MCE->enq ( task_name, item )
627 MCE->enq ( task_name, [ arg1, arg2, argN ] )
628 MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
629 MCE->enqp ( task_name, priority, item )
630 MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
631 MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )
632
The MCE 1.7 release enables finer control. Unlike ->step, which takes
multiple arguments, the ->enq and ->enqp methods push each item onto
the end of an internal array. Passing multiple arguments is possible by
enclosing the arguments inside an anonymous array.
637
638 The direction of flow is forward only. Thus, stepping to itself or
639 backwards will cause an error.
640
641 use MCE::Step;
642
643 sub provider {
644 if ( MCE->wid % 2 == 0 ) {
645 MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
646 } else {
647 MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
648 }
649 }
650
651 sub consumer_c {
652 my ( $mce, $args ) = @_;
653 MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
654 }
655
656 sub consumer_d {
657 my ( $mce, $args ) = @_;
658 MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
659 }
660
661 MCE::Step->init(
662 task_name => [ 'p', 'c', 'd' ],
663 max_workers => [ 2 , 3 , 3 ]
664 );
665
666 mce_step \&provider, \&consumer_c, \&consumer_d;
667
668 -- Output
669
670 C4: 10, 0.527531
671 D6: 20, 0.420108
672 C5: 11, 0.839770
673 D8: 21, 0.386414
674 C3: 12, 0.834645
675 C4: 13, 0.191014
676 D6: 23, 0.924027
677 C5: 14, 0.899357
678 D8: 24, 0.706186
679 C4: 15, 0.083823
680 D7: 22, 0.479708
681 D6: 25, 0.073882
682 C3: 16, 0.207446
683 D8: 26, 0.560755
684 C5: 17, 0.198157
685 D7: 27, 0.324909
686 C4: 18, 0.147505
687 C5: 19, 0.318371
688 D6: 28, 0.220465
689 D8: 29, 0.630111
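
The difference between passing several items and passing one anonymous array, along with the forward-only rule, can be modeled in a few lines of plain Perl. This is a sketch of the semantics described above, not MCE's implementation: model_enq, %queue, and %position are hypothetical names, and the explicit $from argument exists only because this model has no notion of a current sub-task.

```perl
use strict;
use warnings;

my @task_order = ('p', 'c', 'd');            # same order as task_name
my %position   = map { $task_order[$_] => $_ } 0 .. $#task_order;
my %queue      = map { $_ => [] } @task_order;

# Hypothetical model of enq: every argument after the task name becomes
# one queue item, so an array reference travels as a single item.
sub model_enq {
    my ($from, $to, @items) = @_;
    die "unknown task '$to'\n" unless exists $position{$to};
    die "cannot enqueue from '$from' to '$to': forward only\n"
        unless $position{$to} > $position{$from};
    push @{ $queue{$to} }, @items;
    return scalar @{ $queue{$to} };
}

model_enq('p', 'c', 10, 11);                     # two separate items
model_enq('p', 'c', [ 12, 0.5 ]);                # one item holding two values
model_enq('p', 'd', [ 20, 0.1 ], [ 21, 0.2 ]);   # two items

# Enqueueing backwards (or to the same task) is rejected.
my $backward_error = eval { model_enq('c', 'p', 1); 1 } ? '' : $@;

print "c holds ", scalar @{ $queue{c} }, " items\n";   # c holds 3 items
print "d holds ", scalar @{ $queue{d} }, " items\n";   # d holds 2 items
print "error: $backward_error";
```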
690
691 MCE->await ( task_name, pending_threshold )
692
Providers may sometimes run faster than consumers, thus increasing
memory consumption. MCE 1.7 adds the ->await method for pausing
momentarily until the number of items pending in the receiving
sub-task's queue drops to the given threshold or below.
697
698 use MCE::Step;
699 use Time::HiRes 'sleep';
700
701 sub provider {
702 for ( 10 .. 29 ) {
# wait until 10 or fewer items are pending
704 MCE->await( 'c', 10 );
705 # forward item to a later sub-task ( 'c' comes after 'p' )
706 MCE->enq( 'c', [ $_, rand ] );
707 }
708 }
709
710 sub consumer {
711 my ($mce, $args) = @_;
712 MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
713 sleep 0.05;
714 }
715
716 MCE::Step->init(
717 task_name => [ 'p', 'c' ],
718 max_workers => [ 1 , 4 ]
719 );
720
721 mce_step \&provider, \&consumer;
722
723 -- Output
724
725 3: 10, 0.527307
726 2: 11, 0.036193
727 5: 12, 0.987168
728 4: 13, 0.998140
729 5: 14, 0.219526
730 4: 15, 0.061609
731 2: 16, 0.557664
732 3: 17, 0.658684
733 4: 18, 0.240932
734 3: 19, 0.241042
735 5: 20, 0.884830
736 2: 21, 0.902223
737 4: 22, 0.699223
738 3: 23, 0.208270
739 5: 24, 0.438919
740 2: 25, 0.268854
741 4: 26, 0.596425
742 5: 27, 0.979818
743 2: 28, 0.918173
744 3: 29, 0.358266
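
The effect of the threshold on memory use can be seen in a single-process model. The sketch below is plain Perl rather than MCE code: model_await and consume_one are hypothetical stand-ins, and because there is no real concurrency here, the consumer only makes progress while the provider is waiting. It is meant to show that the number of pending items stays bounded, not to reproduce actual timing.

```perl
use strict;
use warnings;

my @queue;                 # items pending for the consumer
my @consumed;
my $threshold   = 10;
my $max_pending = 0;

# Stand-in for a consumer worker taking one item off the queue.
sub consume_one { push @consumed, shift @queue }

# Hypothetical model of await: wait until pending <= limit.
sub model_await {
    my ($limit) = @_;
    consume_one() while @queue > $limit;
}

for my $item (10 .. 59) {
    model_await($threshold);
    push @queue, $item;
    $max_pending = @queue if @queue > $max_pending;
}
consume_one() while @queue;    # drain what is left

print "max pending: $max_pending, consumed: ", scalar @consumed, "\n";
# prints: max pending: 11, consumed: 50
```

Without the call to model_await, all 50 items would sit in the queue at once.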
745
747 Unlike MCE::Map where gather and output order are done for you
748 automatically, the gather method is used to have results sent back to
749 the manager process.
750
751 use MCE::Step chunk_size => 1;
752
753 ## Output order is not guaranteed.
754 my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
755 print "@a\n\n";
756
757 ## Outputs to a hash instead (key, value).
758 my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
759 print "@h1{1..100}\n\n";
760
761 ## This does the same thing due to chunk_id starting at one.
762 my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
763 print "@h2{1..100}\n\n";
764
The gather method may be called multiple times within the block,
unlike return, which would leave the block. Therefore, think of gather
as yielding results immediately to the manager process without
actually leaving the block.
769
770 use MCE::Step chunk_size => 1, max_workers => 3;
771
772 my @hosts = qw(
773 hosta hostb hostc hostd hoste
774 );
775
776 my %h3 = mce_step sub {
777 my ($output, $error, $status); my $host = $_;
778
779 ## Do something with $host;
780 $output = "Worker ". MCE->wid .": Hello from $host";
781
782 if (MCE->chunk_id % 3 == 0) {
783 ## Simulating an error condition
784 local $? = 1; $status = $?;
785 $error = "Error from $host"
786 }
787 else {
788 $status = 0;
789 }
790
791 ## Ensure unique keys (key, value) when gathering to
792 ## a hash.
793 MCE->gather("$host.out", $output);
794 MCE->gather("$host.err", $error) if (defined $error);
795 MCE->gather("$host.sta", $status);
796
797 }, @hosts;
798
799 foreach my $host (@hosts) {
800 print $h3{"$host.out"}, "\n";
801 print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
802 print "Exit status: ", $h3{"$host.sta"}, "\n\n";
803 }
804
805 -- Output
806
807 Worker 3: Hello from hosta
808 Exit status: 0
809
810 Worker 2: Hello from hostb
811 Exit status: 0
812
813 Worker 1: Hello from hostc
814 Error from hostc
815 Exit status: 1
816
817 Worker 3: Hello from hostd
818 Exit status: 0
819
820 Worker 2: Hello from hoste
821 Exit status: 0
822
The following uses an anonymous array containing 3 elements when
gathering data. Serialization is automatic behind the scenes.
825
826 my %h3 = mce_step sub {
827 ...
828
829 MCE->gather($host, [$output, $error, $status]);
830
831 }, @hosts;
832
833 foreach my $host (@hosts) {
834 print $h3{$host}->[0], "\n";
835 print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
836 print "Exit status: ", $h3{$host}->[2], "\n\n";
837 }
838
839 Although MCE::Map comes to mind, one may want additional control when
840 gathering data such as retaining output order.
841
842 use MCE::Step;
843
844 sub preserve_order {
845 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
846
847 return sub {
848 $tmp{ (shift) } = \@_;
849
850 while (1) {
851 last unless exists $tmp{$order_id};
852 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
853 }
854
855 return;
856 };
857 }
858
## Workers persist for the most part after running, though this is not
## always the case and depends on Perl. Pass a reference to a subroutine
## if workers must persist; e.g. mce_step { ... }, \&foo, 1..100000.
862
863 MCE::Step->init(
864 chunk_size => 'auto', max_workers => 'auto'
865 );
866
867 for (1..2) {
868 my @m2;
869
870 mce_step {
871 gather => preserve_order(\@m2)
872 },
873 sub {
874 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
875
876 ## Compute the entire chunk data at once.
877 push @a, map { $_ * 2 } @{ $chunk_ref };
878
879 ## Afterwards, invoke the gather feature, which
880 ## will direct the data to the callback function.
881 MCE->gather(MCE->chunk_id, @a);
882
883 }, 1..100000;
884
885 print scalar @m2, "\n";
886 }
887
888 MCE::Step->finish;
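
To see how the preserve_order callback reorders results, it can be driven by hand without any workers. The sketch below copies preserve_order from the example above and calls the returned closure directly with chunk ids arriving out of order. The calls and their values are made up for illustration.

```perl
use strict;
use warnings;

sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];

    return sub {
        $tmp{ (shift) } = \@_;    # hold the data under its chunk id

        while (1) {
            last unless exists $tmp{$order_id};
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }

        return;
    };
}

my @out;
my $gather = preserve_order(\@out);

$gather->(3, 50, 60);           # chunk 3 arrives first and is held back
print scalar(@out), "\n";       # prints: 0

$gather->(1, 10, 20);           # chunk 1 can be released right away
print "@out\n";                 # prints: 10 20

$gather->(2, 30, 40);           # chunk 2 releases itself and chunk 3
print "@out\n";                 # prints: 10 20 30 40 50 60
```

Data for a chunk is held in %tmp only until every earlier chunk has been appended.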
889
890 All 6 models support 'auto' for chunk_size unlike the Core API. Think
891 of the models as the basis for providing JIT for MCE. They create the
892 instance, tune max_workers, and tune chunk_size automatically
893 regardless of the hardware.
894
895 The following does the same thing using the Core API. Workers persist
896 after running.
897
898 use MCE;
899
900 sub preserve_order {
901 ...
902 }
903
904 my $mce = MCE->new(
905 max_workers => 'auto', chunk_size => 8000,
906
907 user_func => sub {
908 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
909
910 ## Compute the entire chunk data at once.
911 push @a, map { $_ * 2 } @{ $chunk_ref };
912
913 ## Afterwards, invoke the gather feature, which
914 ## will direct the data to the callback function.
915 MCE->gather(MCE->chunk_id, @a);
916 }
917 );
918
919 for (1..2) {
920 my @m2;
921
922 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
923
924 print scalar @m2, "\n";
925 }
926
927 $mce->shutdown;
928
930 MCE::Step->finish
931 MCE::Step::finish
932
933 Workers remain persistent as much as possible after running. Shutdown
934 occurs automatically when the script terminates. Call finish when
935 workers are no longer needed.
936
937 use MCE::Step;
938
939 MCE::Step->init(
940 chunk_size => 20, max_workers => 'auto'
941 );
942
943 mce_step sub { ... }, 1..100;
944
945 MCE::Step->finish;
946
948 MCE, MCE::Core
949
951 Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.36.0 2023-01-20 MCE::Step(3)