MCE::Flow(3)          User Contributed Perl Documentation          MCE::Flow(3)

NAME
MCE::Flow - Parallel flow model for building creative applications

VERSION
This document describes MCE::Flow version 1.878

DESCRIPTION
MCE::Flow is great for writing custom apps that make full use of all
available cores. This module was created to help one harness user_tasks
within MCE.

It is trivial to parallelize with mce_stream shown below.

    ## Native map function
    my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

    ## Same as with MCE::Stream (processing from right to left)
    @a = mce_stream
         sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

    ## Pass an array reference to have writes occur simultaneously
    mce_stream \@a,
         sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

However, let's have MCE::Flow compute the same in parallel. MCE::Queue
will be used for data flow among the sub-tasks.

    use MCE::Flow;
    use MCE::Queue;

This calls for preserving output order.

    sub preserve_order {
       my %tmp; my $order_id = 1; my $gather_ref = $_[0];
       @{ $gather_ref } = ();  ## clear the array (optional)

       return sub {
          my ($data_ref, $chunk_id) = @_;
          $tmp{$chunk_id} = $data_ref;

          while (1) {
             last unless exists $tmp{$order_id};
             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
          }

          return;
       };
    }
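To see how the callback reorders results, the closure can be exercised without any workers. The following is a minimal sketch that calls it by hand with chunks arriving out of order; the chunk contents and arrival order are made up for illustration.

```perl
use strict;
use warnings;

# Same idea as preserve_order above: buffer chunks by id and
# release them only when the next expected id is present.
sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];
    @{ $gather_ref } = ();

    return sub {
        my ($data_ref, $chunk_id) = @_;
        $tmp{$chunk_id} = $data_ref;

        # Flush every chunk that is next in line.
        while (exists $tmp{$order_id}) {
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }
        return;
    };
}

my @out;
my $gather = preserve_order(\@out);

# Simulated arrival order: chunk 2, chunk 3, then chunk 1.
$gather->([ 3, 4 ], 2);   # held back, chunk 1 is still missing
$gather->([ 5, 6 ], 3);   # held back as well
$gather->([ 1, 2 ], 1);   # releases chunks 1, 2, and 3

print "@out\n";           # prints "1 2 3 4 5 6"
```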

Two queues are needed for data flow between the 3 sub-tasks. Notice
task_end and how the value from $task_name is used for determining
which task has ended.

    my $b = MCE::Queue->new;
    my $c = MCE::Queue->new;

    sub task_end {
       my ($mce, $task_id, $task_name) = @_;

       if (defined $mce->{user_tasks}->[$task_id + 1]) {
          my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};

          if ($task_name eq 'a') {
             $b->enqueue((undef) x $n_workers);
          }
          elsif ($task_name eq 'b') {
             $c->enqueue((undef) x $n_workers);
          }
       }

       return;
    }

Next are the 3 sub-tasks. The first one reads input and begins the
flow. The 2nd task dequeues, performs the calculation, and enqueues
into the next. Finally, the last task calls the gather method.

Although serialization is done for you automatically, it is done here
explicitly to avoid double serialization. This is the fastest approach
for passing data between sub-tasks and has the least overhead.

    sub task_a {
       my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;

       push @ans, map { $_ * 2 } @{ $chunk_ref };
       $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));

       return;
    }

    sub task_b {
       my ($mce) = @_;

       while (1) {
          my @ans; my $chunk = $b->dequeue;
          last unless defined $chunk;

          $chunk = MCE->thaw($chunk);
          push @ans, map { $_ * 3 } @{ $chunk->[0] };
          $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
       }

       return;
    }

    sub task_c {
       my ($mce) = @_;

       while (1) {
          my @ans; my $chunk = $c->dequeue;
          last unless defined $chunk;

          $chunk = MCE->thaw($chunk);
          push @ans, map { $_ * 4 } @{ $chunk->[0] };
          MCE->gather(\@ans, $chunk->[1]);
       }

       return;
    }
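The freeze and thaw round trip used by the tasks above can be tried in isolation. This sketch uses Storable directly, on the assumption that MCE->freeze and MCE->thaw fall back to Storable when no other serializer is configured; the chunk id 7 is a made-up value.

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

my @ans      = map { $_ * 2 } 1 .. 5;
my $chunk_id = 7;                       # hypothetical chunk id

# One serialized scalar travels through the queue instead of
# a data reference plus a separate id.
my $frozen = freeze([ \@ans, $chunk_id ]);

# The receiving task restores the pair.
my $chunk = thaw($frozen);
my ($data_ref, $id) = @{ $chunk };

print "chunk $id: @{ $data_ref }\n";    # prints "chunk 7: 2 4 6 8 10"
```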

In summary, MCE::Flow builds an MCE instance behind the scenes and
starts running. The task_name (shown), max_workers, and use_threads
options can take an anonymous array for specifying the values
individually for each sub-task.

    my @a;

    mce_flow {
       task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
       gather => preserve_order(\@a)

    }, \&task_a, \&task_b, \&task_c, 1..10000;

    print "@a\n";

If speed is not a concern and you want to get rid of all the
MCE->freeze and MCE->thaw statements, simply enqueue and dequeue 2
items at a time. Or better yet, see MCE::Step introduced in MCE 1.506.

First, task_end must be updated. The number of undef(s) must match the
number of workers times the dequeue count. Otherwise, the script will
stall.

    sub task_end {
       ...
          if ($task_name eq 'a') {
           # $b->enqueue((undef) x $n_workers);
             $b->enqueue((undef) x ($n_workers * 2));
          }
          elsif ($task_name eq 'b') {
           # $c->enqueue((undef) x $n_workers);
             $c->enqueue((undef) x ($n_workers * 2));
          }
       ...
    }
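The rule about the number of undef values can be checked with a small simulation. This is only a sketch: a plain array stands in for the queue, the workers run one after another rather than in parallel, and the worker count of 3 is an assumption.

```perl
use strict;
use warnings;

my $n_workers     = 3;   # hypothetical worker count of the next task
my $dequeue_count = 2;   # each dequeue call removes 2 items

# Plain array standing in for the queue: two (data, id) pairs
# followed by the terminating undef values.
my @queue = ( [ 1, 2 ], 1, [ 3, 4 ], 2 );
push @queue, (undef) x ($n_workers * $dequeue_count);

my $stopped = 0;

for my $wid (1 .. $n_workers) {
    while (@queue) {
        my ($chunk_ref, $chunk_id) = splice(@queue, 0, $dequeue_count);

        # A worker leaves its loop once it receives undef.
        if (!defined $chunk_ref) { $stopped++; last; }
    }
}

print "$stopped of $n_workers workers stopped\n";
```

With only $n_workers undef values in the queue, the second worker would find nothing left to dequeue, which corresponds to the stall described above.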

Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.

    sub task_a {
       my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;

       push @ans, map { $_ * 2 } @{ $chunk_ref };
       $b->enqueue(\@ans, $chunk_id);

       return;
    }

    sub task_b {
       my ($mce) = @_;

       while (1) {
          my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
          last unless defined $chunk_ref;

          push @ans, map { $_ * 3 } @{ $chunk_ref };
          $c->enqueue(\@ans, $chunk_id);
       }

       return;
    }

    sub task_c {
       my ($mce) = @_;

       while (1) {
          my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
          last unless defined $chunk_ref;

          push @ans, map { $_ * 4 } @{ $chunk_ref };
          MCE->gather(\@ans, $chunk_id);
       }

       return;
    }

Finally, run as usual.

    my @a;

    mce_flow {
       task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
       gather => preserve_order(\@a)

    }, \&task_a, \&task_b, \&task_c, 1..10000;

    print "@a\n";

SYNOPSIS when CHUNK_SIZE EQUALS 1
Although MCE::Loop may be preferred for running using a single code
block, the text below also applies to this module, particularly for the
first block.

All models in MCE default to 'auto' for chunk_size. The arguments for
the block are the same as writing a user_func block using the Core API.

Beginning with MCE 1.5, the next input item is placed into the input
scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
$chunk_ref containing many items. Basically, line 2 below may be
omitted from your code when using $_. One can call MCE->chunk_id to
obtain the current chunk id.

    line 1:  user_func => sub {
    line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
    line 3:
    line 4:     $_ points to $chunk_ref->[0]
    line 5:        in MCE 1.5 when chunk_size == 1
    line 6:
    line 7:     $_ points to $chunk_ref
    line 8:        in MCE 1.5 when chunk_size > 1
    line 9:  }
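The difference in what $_ holds can be observed directly. The sketch below runs the same block twice, once per chunk_size setting, and gathers the reference type of $_; the worker count and input range are arbitrary choices.

```perl
use strict;
use warnings;
use MCE::Flow;

# chunk_size => 1: $_ is expected to hold the item itself.
my @single = mce_flow { max_workers => 2, chunk_size => 1 },
    sub { MCE->gather(ref($_) || 'plain item') }, 1 .. 4;

# chunk_size => 2: $_ is expected to hold a reference to the chunk.
my @chunked = mce_flow { max_workers => 2, chunk_size => 2 },
    sub { MCE->gather(ref($_) || 'plain item') }, 1 .. 4;

MCE::Flow->finish;

print scalar(@single),  " calls, each saw: $single[0]\n";
print scalar(@chunked), " calls, each saw: $chunked[0]\n";
```

The first run should report one call per item, and the second one call per chunk with an ARRAY reference in $_.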

Follow this synopsis when chunk_size equals one. Looping is not
required from inside the first block. Hence, the block is called once
per item.

    ## Exports mce_flow, mce_flow_f, and mce_flow_s
    use MCE::Flow;

    MCE::Flow->init(
       chunk_size => 1
    );

    ## Array or array_ref
    mce_flow sub { do_work($_) }, 1..10000;
    mce_flow sub { do_work($_) }, \@list;

    ## Important; pass an array_ref for deeply nested input data
    mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
    mce_flow sub { do_work($_) }, \@deeply_list;

    ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
    ## Workers read directly without involving the manager process
    mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient

    ## Involves the manager process, therefore slower
    mce_flow_f sub { chomp; do_work($_) }, $file_handle;
    mce_flow_f sub { chomp; do_work($_) }, $io;
    mce_flow_f sub { chomp; do_work($_) }, \$scalar;

    ## Sequence of numbers (begin, end [, step, format])
    mce_flow_s sub { do_work($_) }, 1, 10000, 5;
    mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];

    mce_flow_s sub { do_work($_) }, {
       begin => 1, end => 10000, step => 5, format => undef
    };

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
Follow this synopsis when chunk_size equals 'auto' or is greater than
1. This means having to loop through the chunk from inside the first
block.

    use MCE::Flow;

    MCE::Flow->init(          ## Chunk_size defaults to 'auto' when
       chunk_size => 'auto'   ## not specified. Therefore, the init
    );                        ## function may be omitted.

    ## Syntax is shown for mce_flow for demonstration purposes.
    ## Looping inside the block is the same for mce_flow_f and
    ## mce_flow_s.

    ## Array or array_ref
    mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
    mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;

    ## Important; pass an array_ref for deeply nested input data
    mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
    mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;

    ## Resembles code using the core MCE API
    mce_flow sub {
       my ($mce, $chunk_ref, $chunk_id) = @_;

       for (@{ $chunk_ref }) {
          do_work($_);
       }

    }, 1..10000;

Chunking reduces the number of IPC calls behind the scenes. Think in
terms of chunks whenever processing a large amount of data. For
relatively small data, choosing 1 for chunk_size is fine.

OVERRIDING DEFAULTS
The following lists options that may be overridden when loading the
module.

    use Sereal qw( encode_sereal decode_sereal );
    use CBOR::XS qw( encode_cbor decode_cbor );
    use JSON::XS qw( encode_json decode_json );

    use MCE::Flow
       max_workers => 8,                # Default 'auto'
       chunk_size => 500,               # Default 'auto'
       tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
       freeze => \&encode_sereal,       # \&Storable::freeze
       thaw => \&decode_sereal          # \&Storable::thaw
    ;

From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
available. Specify "Sereal => 0" to use Storable instead.

    use MCE::Flow Sereal => 0;

CUSTOMIZING MCE
MCE::Flow->init ( options )
MCE::Flow::init { options }

The init function accepts a hash of MCE options. Unlike with
MCE::Stream, both gather and bounds_only options may be specified when
calling init (not shown below).

    use MCE::Flow;

    MCE::Flow->init(
       chunk_size => 1, max_workers => 4,

       user_begin => sub {
          print "## ", MCE->wid, " started\n";
       },

       user_end => sub {
          print "## ", MCE->wid, " completed\n";
       }
    );

    my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;

    print "\n", "@a{1..100}", "\n";

    -- Output

    ## 3 started
    ## 2 started
    ## 4 started
    ## 1 started
    ## 2 completed
    ## 4 completed
    ## 3 completed
    ## 1 completed

    1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
    400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
    1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
    2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
    3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
    5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
    7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
    10000

Like with MCE::Flow->init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how task_name,
max_workers, and use_threads can take an anonymous array for setting
values individually for each code block.

Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
with the first code block, thus processing from left-to-right.

    use threads;
    use MCE::Flow;

    my @a = mce_flow {
       task_name   => [ 'a', 'b', 'c' ],
       max_workers => [  3,   4,   2, ],
       use_threads => [  1,   0,   0, ],

       user_end => sub {
          my ($mce, $task_id, $task_name) = @_;
          MCE->print("$task_id - $task_name completed\n");
       },

       task_end => sub {
          my ($mce, $task_id, $task_name) = @_;
          MCE->print("$task_id - $task_name ended\n");
       }
    },
    sub { sleep 1; },   ## 3 workers, named a
    sub { sleep 2; },   ## 4 workers, named b
    sub { sleep 3; };   ## 2 workers, named c

    -- Output

    0 - a completed
    0 - a completed
    0 - a completed
    0 - a ended
    1 - b completed
    1 - b completed
    1 - b completed
    1 - b completed
    1 - b ended
    2 - c completed
    2 - c completed
    2 - c ended

API DOCUMENTATION
Although input data is optional for MCE::Flow, the following assumes
chunk_size equals 1 in order to demonstrate all the possibilities for
providing input data.

MCE::Flow->run ( sub { code }, list )
mce_flow sub { code }, list

Input data may be defined using a list, an array ref, or a hash ref.

Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Flow
takes a "sub { ... }" or a code reference. The other difference is that
the comma is needed after the block.

    # $_ contains the item when chunk_size => 1

    mce_flow sub { do_work($_) }, 1..1000;
    mce_flow sub { do_work($_) }, \@list;

    # Important; pass an array_ref for deeply nested input data

    mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
    mce_flow sub { do_work($_) }, \@deeply_list;

    # Chunking; any chunk_size => 1 or greater

    my %res = mce_flow sub {
       my ($mce, $chunk_ref, $chunk_id) = @_;
       my %ret;
       for my $item (@{ $chunk_ref }) {
          $ret{$item} = $item * 2;
       }
       MCE->gather(%ret);
    },
    \@list;

    # Input hash; current API available since 1.828

    my %res = mce_flow sub {
       my ($mce, $chunk_ref, $chunk_id) = @_;
       my %ret;
       for my $key (keys %{ $chunk_ref }) {
          $ret{$key} = $chunk_ref->{$key} * 2;
       }
       MCE->gather(%ret);
    },
    \%hash;

    # Unlike MCE::Loop, MCE::Flow doesn't need input to run

    mce_flow { max_workers => 4 }, sub {
       MCE->say( MCE->wid );
    };

    # ... and can run multiple tasks

    mce_flow {
       max_workers => [ 1, 3 ],
       task_name   => [ 'p', 'c' ]
    },
    sub {
       # 1 producer
       MCE->say( "producer: ", MCE->wid );
    },
    sub {
       # 3 consumers
       MCE->say( "consumer: ", MCE->wid );
    };

    # Here, options are specified via init

    MCE::Flow->init(
       max_workers => [ 1, 3 ],
       task_name   => [ 'p', 'c' ]
    );

    mce_flow \&producer, \&consumers;

MCE::Flow->run_file ( sub { code }, file )
mce_flow_f sub { code }, file

The fastest of these is passing /path/to/file. Workers communicate the
next offset position among themselves with zero interaction by the
manager process.

"IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.

    # $_ contains the line when chunk_size => 1

    mce_flow_f sub { $_ }, "/path/to/file";  # faster
    mce_flow_f sub { $_ }, $file_handle;
    mce_flow_f sub { $_ }, $io;              # IO::All
    mce_flow_f sub { $_ }, \$scalar;

    # chunking, any chunk_size => 1 or greater

    my %res = mce_flow_f sub {
       my ($mce, $chunk_ref, $chunk_id) = @_;
       my $buf = '';
       for my $line (@{ $chunk_ref }) {
          $buf .= $line;
       }
       MCE->gather($chunk_id, $buf);
    },
    "/path/to/file";

MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]

Sequence may be defined as a list, an array reference, or a hash
reference. The functions require both begin and end values to run.
Step and format are optional. The format is passed to sprintf (% may be
omitted below).

    my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

    # $_ contains the sequence number when chunk_size => 1

    mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
    mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];

    mce_flow_s sub { $_ }, {
       begin => $beg, end => $end,
       step => $step, format => $fmt
    };

    # chunking, any chunk_size => 1 or greater

    my %res = mce_flow_s sub {
       my ($mce, $chunk_ref, $chunk_id) = @_;
       my $buf = '';
       for my $seq (@{ $chunk_ref }) {
          $buf .= "$seq\n";
       }
       MCE->gather($chunk_id, $buf);
    },
    [ $beg, $end ];
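To get a feel for the values a formatted sequence produces, the begin, end, step, and format arguments above can be expanded in plain Perl. This is a sketch of the idea only, not how the MCE sequence engine is implemented.

```perl
use strict;
use warnings;

my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

# Number of steps; the small epsilon guards against floating-point
# error in the division.
my $steps = int(($end - $beg) / $step + 1e-9);

# Compute each value from the start to avoid accumulating error,
# then format it with sprintf.
my @seq = map { sprintf($fmt, $beg + $_ * $step) } 0 .. $steps;

print scalar(@seq), " values: $seq[0] $seq[1] ... $seq[-1]\n";
```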

The sequence engine can compute 'begin' and 'end' items only, for the
chunk, and not the items in between (hence boundaries only). This
option applies to sequence only and has no effect when chunk_size
equals 1.

The time to run is 0.006s below. This becomes 0.827s without the
bounds_only option due to computing all items in between, thus creating
a very large array. Basically, specify bounds_only => 1 when boundaries
are all you need for looping inside the block; e.g. Monte Carlo
simulations.

Time was measured using 1 worker to emphasize the difference.

    use MCE::Flow;

    MCE::Flow->init(
       max_workers => 1, chunk_size => 1_250_000,
       bounds_only => 1
    );

    # Typically, the input scalar $_ contains the sequence number
    # when chunk_size => 1, unless the bounds_only option is set
    # which is the case here. Thus, $_ points to $chunk_ref.

    mce_flow_s sub {
       my ($mce, $chunk_ref, $chunk_id) = @_;

       # $chunk_ref contains 2 items, not 1_250_000
       # my ( $begin, $end ) = ( $_->[0], $_->[1] );

       my $begin = $chunk_ref->[0];
       my $end   = $chunk_ref->[1];

       # for my $seq ( $begin .. $end ) {
       #    ...
       # }

       MCE->printf("%7d .. %8d\n", $begin, $end);
    },
    [ 1, 10_000_000 ];

    -- Output

          1 ..  1250000
    1250001 ..  2500000
    2500001 ..  3750000
    3750001 ..  5000000
    5000001 ..  6250000
    6250001 ..  7500000
    7500001 ..  8750000
    8750001 .. 10000000
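The boundaries shown in the output follow from the sequence range and the chunk size alone. As a rough sketch in plain Perl, not the engine's actual code, they can be derived like this:

```perl
use strict;
use warnings;

my ($first, $last, $chunk_size) = (1, 10_000_000, 1_250_000);

# Collect one [ begin, end ] pair per chunk without ever
# materializing the numbers in between.
my @bounds;
for (my $begin = $first; $begin <= $last; $begin += $chunk_size) {
    my $end = $begin + $chunk_size - 1;
    $end = $last if $end > $last;
    push @bounds, [ $begin, $end ];
}

printf "%7d .. %8d\n", @{ $_ } for @bounds;
```

Each worker then loops from its own begin to end, so memory use stays constant regardless of chunk size.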

MCE::Flow->run ( { input_data => iterator }, sub { code } )
mce_flow { input_data => iterator }, sub { code }

An iterator reference may be specified for input_data. The only other
way is to specify input_data via MCE::Flow->init. This prevents
MCE::Flow from configuring the iterator reference as another user task,
which will not work.

Iterators are described under section "SYNTAX for INPUT_DATA" at
MCE::Core.

    MCE::Flow->init(
       input_data => iterator
    );

    mce_flow sub { $_ };
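As an illustration, an iterator can be a closure that returns the next item on each call and nothing once the input is exhausted. The sketch below assumes that contract; make_iterator and its arguments are hypothetical names chosen for this example.

```perl
use strict;
use warnings;
use MCE::Flow;

# Hypothetical helper: returns a closure that yields the next value
# on each call and returns nothing once the range is exhausted.
sub make_iterator {
    my ($n, $max, $step) = @_;

    return sub {
        return if $n > $max;
        my $current = $n;
        $n += $step;
        return $current;
    };
}

my @res = mce_flow {
    max_workers => 2, chunk_size => 1,
    input_data  => make_iterator(10, 30, 2)
},
sub {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    MCE->gather(map { $_ * 2 } @{ $chunk_ref });
};

MCE::Flow->finish;

# Gather order is not guaranteed, so sort before printing.
print join(' ', sort { $a <=> $b } @res), "\n";
```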

GATHERING DATA
Unlike MCE::Map where gather and output order are done for you
automatically, the gather method is used to have results sent back to
the manager process.

    use MCE::Flow chunk_size => 1;

    ## Output order is not guaranteed.
    my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
    print "@a1\n\n";

    ## Outputs to a hash instead (key, value).
    my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
    print "@h1{1..100}\n\n";

    ## This does the same thing due to chunk_id starting at one.
    my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
    print "@h2{1..100}\n\n";

The gather method may be called multiple times within the block, unlike
return which would leave the block. Therefore, think of gather as
yielding results immediately to the manager process without actually
leaving the block.

    use MCE::Flow chunk_size => 1, max_workers => 3;

    my @hosts = qw(
       hosta hostb hostc hostd hoste
    );

    my %h3 = mce_flow sub {
       my ($output, $error, $status); my $host = $_;

       ## Do something with $host;
       $output = "Worker ". MCE->wid .": Hello from $host";

       if (MCE->chunk_id % 3 == 0) {
          ## Simulating an error condition
          local $? = 1; $status = $?;
          $error = "Error from $host"
       }
       else {
          $status = 0;
       }

       ## Ensure unique keys (key, value) when gathering to
       ## a hash.
       MCE->gather("$host.out", $output);
       MCE->gather("$host.err", $error) if (defined $error);
       MCE->gather("$host.sta", $status);

    }, @hosts;

    foreach my $host (@hosts) {
       print $h3{"$host.out"}, "\n";
       print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
       print "Exit status: ", $h3{"$host.sta"}, "\n\n";
    }

    -- Output

    Worker 3: Hello from hosta
    Exit status: 0

    Worker 2: Hello from hostb
    Exit status: 0

    Worker 1: Hello from hostc
    Error from hostc
    Exit status: 1

    Worker 3: Hello from hostd
    Exit status: 0

    Worker 2: Hello from hoste
    Exit status: 0

The following uses an anonymous array containing 3 elements when
gathering data. Serialization is automatic behind the scenes.

    my %h3 = mce_flow sub {
       ...

       MCE->gather($host, [$output, $error, $status]);

    }, @hosts;

    foreach my $host (@hosts) {
       print $h3{$host}->[0], "\n";
       print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
       print "Exit status: ", $h3{$host}->[2], "\n\n";
    }

Although MCE::Map comes to mind, one may want additional control when
gathering data such as retaining output order.

    use MCE::Flow;

    sub preserve_order {
       my %tmp; my $order_id = 1; my $gather_ref = $_[0];

       return sub {
          $tmp{ (shift) } = \@_;

          while (1) {
             last unless exists $tmp{$order_id};
             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
          }

          return;
       };
    }

    ## Workers usually persist after running, although this is not
    ## guaranteed and depends on Perl. Pass a reference to a subroutine
    ## if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.

    MCE::Flow->init(
       chunk_size => 'auto', max_workers => 'auto'
    );

    for (1..2) {
       my @m2;

       mce_flow {
          gather => preserve_order(\@m2)
       },
       sub {
          my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

          ## Compute the entire chunk data at once.
          push @a, map { $_ * 2 } @{ $chunk_ref };

          ## Afterwards, invoke the gather feature, which
          ## will direct the data to the callback function.
          MCE->gather(MCE->chunk_id, @a);

       }, 1..100000;

       print scalar @m2, "\n";
    }

    MCE::Flow->finish;

All 6 models support 'auto' for chunk_size unlike the Core API. Think
of the models as the basis for providing JIT for MCE. They create the
instance, tune max_workers, and tune chunk_size automatically
regardless of the hardware.

The following does the same thing using the Core API. Workers persist
after running.

    use MCE;

    sub preserve_order {
       ...
    }

    my $mce = MCE->new(
       max_workers => 'auto', chunk_size => 8000,

       user_func => sub {
          my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

          ## Compute the entire chunk data at once.
          push @a, map { $_ * 2 } @{ $chunk_ref };

          ## Afterwards, invoke the gather feature, which
          ## will direct the data to the callback function.
          MCE->gather(MCE->chunk_id, @a);
       }
    );

    for (1..2) {
       my @m2;

       $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);

       print scalar @m2, "\n";
    }

    $mce->shutdown;

MANUAL SHUTDOWN
MCE::Flow->finish
MCE::Flow::finish

Workers remain persistent as much as possible after running. Shutdown
occurs automatically when the script terminates. Call finish when
workers are no longer needed.

    use MCE::Flow;

    MCE::Flow->init(
       chunk_size => 20, max_workers => 'auto'
    );

    mce_flow sub { ... }, 1..100;

    MCE::Flow->finish;

INDEX
MCE, MCE::Core

AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.34.0                      2022-02-20                      MCE::Flow(3)