MCE::Flow(3) User Contributed Perl Documentation MCE::Flow(3)
2
3
4
6 MCE::Flow - Parallel flow model for building creative applications
7
9 This document describes MCE::Flow version 1.889
10
MCE::Flow is well suited to writing custom applications that make use
of all available cores. This module was created to help harness
user_tasks within MCE.
15
Parallelizing a chain of map calls is trivial with mce_stream.
17
18 ## Native map function
19 my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
20
21 ## Same as with MCE::Stream (processing from right to left)
22 @a = mce_stream
23 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
24
25 ## Pass an array reference to have writes occur simultaneously
26 mce_stream \@a,
27 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
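The right-to-left order noted above is easiest to see when the steps do
not commute. The following sketch uses only core Perl (no MCE) and two
made-up steps, adding 1 and doubling, to show that the rightmost block
runs first.

```perl
use strict;
use warnings;

# The rightmost map runs first: add 1, then double.
my @add_then_double = map { $_ * 2 } map { $_ + 1 } 1 .. 3;

# Swapping the blocks changes the result: double, then add 1.
my @double_then_add = map { $_ + 1 } map { $_ * 2 } 1 .. 3;

print "@add_then_double\n";   # 4 6 8
print "@double_then_add\n";   # 3 5 7
```

The examples in this document multiply at every step, so their result
does not depend on the order. The order matters only for steps such as
these.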
28
29 However, let's have MCE::Flow compute the same in parallel. MCE::Queue
30 will be used for data flow among the sub-tasks.
31
32 use MCE::Flow;
33 use MCE::Queue;
34
35 This calls for preserving output order.
36
37 sub preserve_order {
38 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
39 @{ $gather_ref } = (); ## clear the array (optional)
40
41 return sub {
42 my ($data_ref, $chunk_id) = @_;
43 $tmp{$chunk_id} = $data_ref;
44
45 while (1) {
46 last unless exists $tmp{$order_id};
47 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
48 }
49
50 return;
51 };
52 }
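To see how the callback restores order, it can be exercised without MCE
by calling it directly. This is a minimal sketch: the chunk contents and
the arrival order (3, 1, 2) are made up for illustration, and the loop
is written as "while (exists ...)", which behaves the same as the
version above.

```perl
use strict;
use warnings;

sub preserve_order {
    my %tmp; my $order_id = 1; my $gather_ref = $_[0];
    @{ $gather_ref } = ();

    return sub {
        my ($data_ref, $chunk_id) = @_;
        $tmp{$chunk_id} = $data_ref;

        # Flush every held chunk that is next in line.
        while (exists $tmp{$order_id}) {
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
        }

        return;
    };
}

my @out;
my $gather = preserve_order(\@out);

# Simulate chunks arriving out of order.
$gather->([ 5, 6 ], 3);   # held; chunk 1 has not arrived
$gather->([ 1, 2 ], 1);   # flushes chunk 1
$gather->([ 3, 4 ], 2);   # flushes chunks 2 and 3

print "@out\n";           # 1 2 3 4 5 6
```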
53
54 Two queues are needed for data flow between the 3 sub-tasks. Notice
55 task_end and how the value from $task_name is used for determining
56 which task has ended.
57
58 my $b = MCE::Queue->new;
59 my $c = MCE::Queue->new;
60
61 sub task_end {
62 my ($mce, $task_id, $task_name) = @_;
63
64 if (defined $mce->{user_tasks}->[$task_id + 1]) {
65 my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
66
67 if ($task_name eq 'a') {
68 $b->enqueue((undef) x $n_workers);
69 }
70 elsif ($task_name eq 'b') {
71 $c->enqueue((undef) x $n_workers);
72 }
73 }
74
75 return;
76 }
77
78 Next are the 3 sub-tasks. The first one reads input and begins the
79 flow. The 2nd task dequeues, performs the calculation, and enqueues
80 into the next. Finally, the last task calls the gather method.
81
Although serialization is done for you automatically, it is done
explicitly here to avoid double serialization. This approach has the
least overhead and is therefore the fastest way to pass data between
sub-tasks.
85
86 sub task_a {
87 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
88
89 push @ans, map { $_ * 2 } @{ $chunk_ref };
90 $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
91
92 return;
93 }
94
95 sub task_b {
96 my ($mce) = @_;
97
98 while (1) {
99 my @ans; my $chunk = $b->dequeue;
100 last unless defined $chunk;
101
102 $chunk = MCE->thaw($chunk);
103 push @ans, map { $_ * 3 } @{ $chunk->[0] };
104 $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
105 }
106
107 return;
108 }
109
110 sub task_c {
111 my ($mce) = @_;
112
113 while (1) {
114 my @ans; my $chunk = $c->dequeue;
115 last unless defined $chunk;
116
117 $chunk = MCE->thaw($chunk);
118 push @ans, map { $_ * 4 } @{ $chunk->[0] };
119 MCE->gather(\@ans, $chunk->[1]);
120 }
121
122 return;
123 }
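The freeze and thaw round trip used by the tasks above can be tried in
isolation. This sketch assumes the Storable serializer, which this
document lists as the default for the freeze and thaw options. MCE may
use Sereal instead when it is available. The chunk id and values are
made up.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

my @ans      = map { $_ * 2 } 1 .. 5;
my $chunk_id = 7;   # hypothetical chunk id

# Pack the results and the chunk id into a single string, as task_a does.
my $frozen = freeze([ \@ans, $chunk_id ]);

# Restore the structure on the receiving side, as task_b does.
my $chunk = thaw($frozen);

print "chunk $chunk->[1]: @{ $chunk->[0] }\n";   # chunk 7: 2 4 6 8 10
```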
124
In summary, MCE::Flow builds an MCE instance behind the scenes and
starts running. The task_name (shown), max_workers, and use_threads
options can each take an anonymous array to specify a distinct value
per sub-task.
129
130 my @a;
131
132 mce_flow {
133 task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
134 gather => preserve_order(\@a)
135
136 }, \&task_a, \&task_b, \&task_c, 1..10000;
137
138 print "@a\n";
139
If speed is not a concern and you want to get rid of all the
MCE->freeze and MCE->thaw statements, simply enqueue and dequeue 2
items at a time. Or better yet, see MCE::Step introduced in MCE 1.506.
143
144 First, task_end must be updated. The number of undef(s) must match the
145 number of workers times the dequeue count. Otherwise, the script will
146 stall.
147
148 sub task_end {
149 ...
150 if ($task_name eq 'a') {
151 # $b->enqueue((undef) x $n_workers);
152 $b->enqueue((undef) x ($n_workers * 2));
153 }
154 elsif ($task_name eq 'b') {
155 # $c->enqueue((undef) x $n_workers);
156 $c->enqueue((undef) x ($n_workers * 2));
157 }
158 ...
159 }
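The arithmetic behind the undef count can be checked with a small model.
In this sketch a plain Perl array stands in for the queue and splice
stands in for dequeue(2). Blocking is not modeled, and the worker count
is a made-up value.

```perl
use strict;
use warnings;

my $n_workers     = 3;   # hypothetical worker count for the next task
my $dequeue_count = 2;   # each dequeue(2) call removes 2 items

# A plain array stands in for the queue.
my @queue = ((undef) x ($n_workers * $dequeue_count));

my $stopped = 0;
for my $wid (1 .. $n_workers) {
    # With fewer than 2 items left, a real dequeue(2) would wait.
    last if @queue < $dequeue_count;

    my ($chunk_ref, $chunk_id) = splice(@queue, 0, $dequeue_count);
    $stopped++ unless defined $chunk_ref;
}

print "$stopped of $n_workers workers received the stop signal\n";
```

With only $n_workers undefs in this model, one worker stops and the
other two are left waiting, which corresponds to the stall described
above.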
160
Next, the 3 sub-tasks enqueue and dequeue 2 elements at a time.
162
163 sub task_a {
164 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
165
166 push @ans, map { $_ * 2 } @{ $chunk_ref };
167 $b->enqueue(\@ans, $chunk_id);
168
169 return;
170 }
171
172 sub task_b {
173 my ($mce) = @_;
174
175 while (1) {
176 my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
177 last unless defined $chunk_ref;
178
179 push @ans, map { $_ * 3 } @{ $chunk_ref };
180 $c->enqueue(\@ans, $chunk_id);
181 }
182
183 return;
184 }
185
186 sub task_c {
187 my ($mce) = @_;
188
189 while (1) {
190 my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
191 last unless defined $chunk_ref;
192
193 push @ans, map { $_ * 4 } @{ $chunk_ref };
194 MCE->gather(\@ans, $chunk_id);
195 }
196
197 return;
198 }
199
200 Finally, run as usual.
201
202 my @a;
203
204 mce_flow {
205 task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
206 gather => preserve_order(\@a)
207
208 }, \&task_a, \&task_b, \&task_c, 1..10000;
209
210 print "@a\n";
211
Although MCE::Loop may be preferred when running a single code block,
the text below also applies to this module, particularly to the first
block.
216
217 All models in MCE default to 'auto' for chunk_size. The arguments for
218 the block are the same as writing a user_func block using the Core API.
219
220 Beginning with MCE 1.5, the next input item is placed into the input
221 scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
222 $chunk_ref containing many items. Basically, line 2 below may be
223 omitted from your code when using $_. One can call MCE->chunk_id to
224 obtain the current chunk id.
225
226 line 1: user_func => sub {
227 line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
228 line 3:
229 line 4: $_ points to $chunk_ref->[0]
230 line 5: in MCE 1.5 when chunk_size == 1
231 line 6:
232 line 7: $_ points to $chunk_ref
233 line 8: in MCE 1.5 when chunk_size > 1
234 line 9: }
235
Follow this synopsis when chunk_size equals one. Looping is not
required inside the first block because the block is called once per
item.
239
240 ## Exports mce_flow, mce_flow_f, and mce_flow_s
241 use MCE::Flow;
242
243 MCE::Flow->init(
244 chunk_size => 1
245 );
246
247 ## Array or array_ref
248 mce_flow sub { do_work($_) }, 1..10000;
249 mce_flow sub { do_work($_) }, \@list;
250
## Important; pass an array_ref for deeply nested input data
252 mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
253 mce_flow sub { do_work($_) }, \@deeply_list;
254
## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
## Workers read the file directly without involving the manager process
257 mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
258
259 ## Involves the manager process, therefore slower
260 mce_flow_f sub { chomp; do_work($_) }, $file_handle;
261 mce_flow_f sub { chomp; do_work($_) }, $io;
262 mce_flow_f sub { chomp; do_work($_) }, \$scalar;
263
264 ## Sequence of numbers (begin, end [, step, format])
265 mce_flow_s sub { do_work($_) }, 1, 10000, 5;
266 mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
267
268 mce_flow_s sub { do_work($_) }, {
269 begin => 1, end => 10000, step => 5, format => undef
270 };
271
Follow this synopsis when chunk_size equals 'auto' or is greater than
1. This means looping through the chunk inside the first block.
276
277 use MCE::Flow;
278
279 MCE::Flow->init( ## Chunk_size defaults to 'auto' when
280 chunk_size => 'auto' ## not specified. Therefore, the init
281 ); ## function may be omitted.
282
283 ## Syntax is shown for mce_flow for demonstration purposes.
284 ## Looping inside the block is the same for mce_flow_f and
285 ## mce_flow_s.
286
287 ## Array or array_ref
288 mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
289 mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
290
## Important; pass an array_ref for deeply nested input data
292 mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
293 mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
294
295 ## Resembles code using the core MCE API
296 mce_flow sub {
297 my ($mce, $chunk_ref, $chunk_id) = @_;
298
299 for (@{ $chunk_ref }) {
300 do_work($_);
301 }
302
303 }, 1..10000;
304
Chunking reduces the number of IPC calls behind the scenes. Think in
terms of chunks whenever processing a large amount of data. For
relatively small data, choosing 1 for chunk_size is fine.
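As a rough worked example of why chunking helps, the sketch below counts
how many chunks are needed to cover 10,000 items at a few chunk sizes.
It assumes one request per chunk, which is a simplification and not a
statement about how MCE implements IPC. The sizes are arbitrary.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

my $items = 10_000;
my %chunks;

for my $chunk_size (1, 500, 2_000) {
    # Number of chunks needed to cover all items.
    $chunks{$chunk_size} = ceil($items / $chunk_size);

    printf "chunk_size %5d -> %5d chunks\n",
        $chunk_size, $chunks{$chunk_size};
}
```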
308
The following lists options that may be overridden when loading the
module.
312
313 use Sereal qw( encode_sereal decode_sereal );
314 use CBOR::XS qw( encode_cbor decode_cbor );
315 use JSON::XS qw( encode_json decode_json );
316
317 use MCE::Flow
318 max_workers => 8, # Default 'auto'
319 chunk_size => 500, # Default 'auto'
320 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
321 freeze => \&encode_sereal, # \&Storable::freeze
322 thaw => \&decode_sereal, # \&Storable::thaw
323 init_relay => 0, # Default undef; MCE 1.882+
324 use_threads => 0, # Default undef; MCE 1.882+
325 ;
326
327 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
328 available. Specify "Sereal => 0" to use Storable instead.
329
330 use MCE::Flow Sereal => 0;
331
333 MCE::Flow->init ( options )
334 MCE::Flow::init { options }
335
336 The init function accepts a hash of MCE options. Unlike with
337 MCE::Stream, both gather and bounds_only options may be specified when
338 calling init (not shown below).
339
340 use MCE::Flow;
341
342 MCE::Flow->init(
343 chunk_size => 1, max_workers => 4,
344
345 user_begin => sub {
346 print "## ", MCE->wid, " started\n";
347 },
348
349 user_end => sub {
350 print "## ", MCE->wid, " completed\n";
351 }
352 );
353
354 my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
355
356 print "\n", "@a{1..100}", "\n";
357
358 -- Output
359
360 ## 3 started
361 ## 2 started
362 ## 4 started
363 ## 1 started
364 ## 2 completed
365 ## 4 completed
366 ## 3 completed
367 ## 1 completed
368
369 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
370 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
371 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
372 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
373 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
374 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
375 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
376 10000
377
As with MCE::Flow->init above, MCE options may be specified using an
anonymous hash as the first argument. Notice how task_name,
max_workers, and use_threads can each take an anonymous array to set a
distinct value per code block.
382
383 Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
384 with the first code block, thus processing from left-to-right.
385
386 use threads;
387 use MCE::Flow;
388
389 my @a = mce_flow {
390 task_name => [ 'a', 'b', 'c' ],
391 max_workers => [ 3, 4, 2, ],
392 use_threads => [ 1, 0, 0, ],
393
394 user_end => sub {
395 my ($mce, $task_id, $task_name) = @_;
396 MCE->print("$task_id - $task_name completed\n");
397 },
398
399 task_end => sub {
400 my ($mce, $task_id, $task_name) = @_;
401 MCE->print("$task_id - $task_name ended\n");
402 }
403 },
404 sub { sleep 1; }, ## 3 workers, named a
405 sub { sleep 2; }, ## 4 workers, named b
406 sub { sleep 3; }; ## 2 workers, named c
407
408 -- Output
409
410 0 - a completed
411 0 - a completed
412 0 - a completed
413 0 - a ended
414 1 - b completed
415 1 - b completed
416 1 - b completed
417 1 - b completed
418 1 - b ended
419 2 - c completed
420 2 - c completed
421 2 - c ended
422
424 Although input data is optional for MCE::Flow, the following assumes
425 chunk_size equals 1 in order to demonstrate all the possibilities for
426 providing input data.
427
428 MCE::Flow->run ( sub { code }, list )
429 mce_flow sub { code }, list
430
431 Input data may be defined using a list, an array ref, or a hash ref.
432
433 Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Flow
434 takes a "sub { ... }" or a code reference. The other difference is that
435 the comma is needed after the block.
436
437 # $_ contains the item when chunk_size => 1
438
439 mce_flow sub { do_work($_) }, 1..1000;
440 mce_flow sub { do_work($_) }, \@list;
441
# Important; pass an array_ref for deeply nested input data
443
444 mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
445 mce_flow sub { do_work($_) }, \@deeply_list;
446
447 # Chunking; any chunk_size => 1 or greater
448
449 my %res = mce_flow sub {
450 my ($mce, $chunk_ref, $chunk_id) = @_;
451 my %ret;
452 for my $item (@{ $chunk_ref }) {
453 $ret{$item} = $item * 2;
454 }
455 MCE->gather(%ret);
456 },
457 \@list;
458
459 # Input hash; current API available since 1.828
460
461 my %res = mce_flow sub {
462 my ($mce, $chunk_ref, $chunk_id) = @_;
463 my %ret;
464 for my $key (keys %{ $chunk_ref }) {
465 $ret{$key} = $chunk_ref->{$key} * 2;
466 }
467 MCE->gather(%ret);
468 },
469 \%hash;
470
471 # Unlike MCE::Loop, MCE::Flow doesn't need input to run
472
473 mce_flow { max_workers => 4 }, sub {
474 MCE->say( MCE->wid );
475 };
476
477 # ... and can run multiple tasks
478
479 mce_flow {
480 max_workers => [ 1, 3 ],
481 task_name => [ 'p', 'c' ]
482 },
483 sub {
484 # 1 producer
485 MCE->say( "producer: ", MCE->wid );
486 },
487 sub {
488 # 3 consumers
489 MCE->say( "consumer: ", MCE->wid );
490 };
491
492 # Here, options are specified via init
493
494 MCE::Flow->init(
495 max_workers => [ 1, 3 ],
496 task_name => [ 'p', 'c' ]
497 );
498
499 mce_flow \&producer, \&consumers;
500
501 MCE::Flow->run_file ( sub { code }, file )
502 mce_flow_f sub { code }, file
503
Of the input types shown below, a file path ("/path/to/file") is the
fastest. Workers communicate the next offset position among themselves
with zero interaction by the manager process.
507
508 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
509
510 # $_ contains the line when chunk_size => 1
511
512 mce_flow_f sub { $_ }, "/path/to/file"; # faster
513 mce_flow_f sub { $_ }, $file_handle;
514 mce_flow_f sub { $_ }, $io; # IO::All
515 mce_flow_f sub { $_ }, \$scalar;
516
517 # chunking, any chunk_size => 1 or greater
518
519 my %res = mce_flow_f sub {
520 my ($mce, $chunk_ref, $chunk_id) = @_;
521 my $buf = '';
522 for my $line (@{ $chunk_ref }) {
523 $buf .= $line;
524 }
525 MCE->gather($chunk_id, $buf);
526 },
527 "/path/to/file";
528
529 MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
530 mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
531
532 Sequence may be defined as a list, an array reference, or a hash
533 reference. The functions require both begin and end values to run.
534 Step and format are optional. The format is passed to sprintf (% may be
535 omitted below).
536
537 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
538
539 # $_ contains the sequence number when chunk_size => 1
540
541 mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
542 mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
543
544 mce_flow_s sub { $_ }, {
545 begin => $beg, end => $end,
546 step => $step, format => $fmt
547 };
548
549 # chunking, any chunk_size => 1 or greater
550
551 my %res = mce_flow_s sub {
552 my ($mce, $chunk_ref, $chunk_id) = @_;
553 my $buf = '';
554 for my $seq (@{ $chunk_ref }) {
555 $buf .= "$seq\n";
556 }
557 MCE->gather($chunk_id, $buf);
558 },
559 [ $beg, $end ];
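The effect of the format argument can be previewed with plain sprintf.
The sketch below uses the same begin, end, step, and format values as
above. It only mimics the formatting and is not MCE's sequence engine.
Deriving each value from its index is a choice made here to avoid
accumulated floating-point error.

```perl
use strict;
use warnings;

my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

# Derive each value from its index rather than adding $step repeatedly.
my $count = int(($end - $beg) / $step + 0.5) + 1;
my @seq   = map { sprintf($fmt, $beg + $_ * $step) } 0 .. $count - 1;

print scalar(@seq), " items: @seq[0 .. 2] ... $seq[-1]\n";
```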
560
With the bounds_only option, the sequence engine computes only the
'begin' and 'end' items for each chunk, and not the items in between
(hence boundaries only). This option applies to sequences only and has
no effect when chunk_size equals 1.

The time to run is 0.006s below. This becomes 0.827s without the
bounds_only option due to computing all items in between, thus creating
a very large array. Basically, specify bounds_only => 1 when boundaries
are all you need for looping inside the block; e.g. Monte Carlo
simulations.
571
572 Time was measured using 1 worker to emphasize the difference.
573
574 use MCE::Flow;
575
576 MCE::Flow->init(
577 max_workers => 1, chunk_size => 1_250_000,
578 bounds_only => 1
579 );
580
581 # Typically, the input scalar $_ contains the sequence number
582 # when chunk_size => 1, unless the bounds_only option is set
583 # which is the case here. Thus, $_ points to $chunk_ref.
584
585 mce_flow_s sub {
586 my ($mce, $chunk_ref, $chunk_id) = @_;
587
588 # $chunk_ref contains 2 items, not 1_250_000
589 # my ( $begin, $end ) = ( $_->[0], $_->[1] );
590
591 my $begin = $chunk_ref->[0];
592 my $end = $chunk_ref->[1];
593
594 # for my $seq ( $begin .. $end ) {
595 # ...
596 # }
597
598 MCE->printf("%7d .. %8d\n", $begin, $end);
599 },
600 [ 1, 10_000_000 ];
601
602 -- Output
603
604 1 .. 1250000
605 1250001 .. 2500000
606 2500001 .. 3750000
607 3750001 .. 5000000
608 5000001 .. 6250000
609 6250001 .. 7500000
610 7500001 .. 8750000
611 8750001 .. 10000000
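The boundaries in the output above follow from simple arithmetic on the
sequence range and chunk_size. The sketch below reproduces them in plain
Perl. It illustrates the arithmetic only and is not how MCE computes
them internally.

```perl
use strict;
use warnings;

my ($begin, $end, $chunk_size) = (1, 10_000_000, 1_250_000);
my @bounds;

for (my $lo = $begin; $lo <= $end; $lo += $chunk_size) {
    # The upper bound is clamped to the end of the sequence.
    my $hi = $lo + $chunk_size - 1;
    $hi = $end if $hi > $end;

    push @bounds, [ $lo, $hi ];
}

printf "%7d .. %8d\n", @{ $_ } for @bounds;
```

Each worker then receives only the two boundary values for its chunk and
can loop from the first to the second itself.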
612
613 MCE::Flow->run ( { input_data => iterator }, sub { code } )
614 mce_flow { input_data => iterator }, sub { code }
615
An iterator reference may be specified for input_data, either in the
options hash as shown above or via MCE::Flow->init. Passing it this
way, rather than as a trailing argument, prevents MCE::Flow from
treating the iterator reference as another user task, which would not
work.

Iterators are described under section "SYNTAX for INPUT_DATA" in
MCE::Core.
623
624 MCE::Flow->init(
625 input_data => iterator
626 );
627
628 mce_flow sub { $_ };
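As an illustration, an iterator can be written as a closure. The sketch
below assumes that an iterator returns the next item on each call and
nothing once the input is exhausted. See MCE::Core for the exact
contract. The name make_iterator and its arguments are hypothetical.
The closure is driven by hand here so that the example runs without
MCE.

```perl
use strict;
use warnings;

# Hypothetical helper: yields $n, $n + $step, ... up to $max.
sub make_iterator {
    my ($n, $max, $step) = @_;

    return sub {
        return if $n > $max;   # exhausted: return nothing

        my $current = $n;
        $n += $step;

        return $current;
    };
}

my $iter = make_iterator(10, 30, 2);
my @items;

while (defined(my $item = $iter->())) {
    push @items, $item;
}

print "@items\n";

# Following the syntax above, a fresh iterator would be passed as:
#   mce_flow { input_data => make_iterator(10, 30, 2) }, sub { ... };
```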
629
Unlike MCE::Map, where gathering and output order are handled for you
automatically, MCE::Flow requires calling the gather method to send
results back to the manager process.
634
635 use MCE::Flow chunk_size => 1;
636
637 ## Output order is not guaranteed.
638 my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
639 print "@a1\n\n";
640
641 ## Outputs to a hash instead (key, value).
642 my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
643 print "@h1{1..100}\n\n";
644
645 ## This does the same thing due to chunk_id starting at one.
646 my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
647 print "@h2{1..100}\n\n";
648
The gather method may be called multiple times within the block,
unlike return, which would leave the block. Therefore, think of gather
as yielding results immediately to the manager process without actually
leaving the block.
653
654 use MCE::Flow chunk_size => 1, max_workers => 3;
655
656 my @hosts = qw(
657 hosta hostb hostc hostd hoste
658 );
659
660 my %h3 = mce_flow sub {
661 my ($output, $error, $status); my $host = $_;
662
663 ## Do something with $host;
664 $output = "Worker ". MCE->wid .": Hello from $host";
665
666 if (MCE->chunk_id % 3 == 0) {
667 ## Simulating an error condition
668 local $? = 1; $status = $?;
669 $error = "Error from $host"
670 }
671 else {
672 $status = 0;
673 }
674
675 ## Ensure unique keys (key, value) when gathering to
676 ## a hash.
677 MCE->gather("$host.out", $output);
678 MCE->gather("$host.err", $error) if (defined $error);
679 MCE->gather("$host.sta", $status);
680
681 }, @hosts;
682
683 foreach my $host (@hosts) {
684 print $h3{"$host.out"}, "\n";
685 print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
686 print "Exit status: ", $h3{"$host.sta"}, "\n\n";
687 }
688
689 -- Output
690
691 Worker 3: Hello from hosta
692 Exit status: 0
693
694 Worker 2: Hello from hostb
695 Exit status: 0
696
697 Worker 1: Hello from hostc
698 Error from hostc
699 Exit status: 1
700
701 Worker 3: Hello from hostd
702 Exit status: 0
703
704 Worker 2: Hello from hoste
705 Exit status: 0
706
The following uses an anonymous array containing 3 elements when
gathering data. Serialization is automatic behind the scenes.
709
710 my %h3 = mce_flow sub {
711 ...
712
713 MCE->gather($host, [$output, $error, $status]);
714
715 }, @hosts;
716
717 foreach my $host (@hosts) {
718 print $h3{$host}->[0], "\n";
719 print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
720 print "Exit status: ", $h3{$host}->[2], "\n\n";
721 }
722
723 Although MCE::Map comes to mind, one may want additional control when
724 gathering data such as retaining output order.
725
726 use MCE::Flow;
727
728 sub preserve_order {
729 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
730
731 return sub {
732 $tmp{ (shift) } = \@_;
733
734 while (1) {
735 last unless exists $tmp{$order_id};
736 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
737 }
738
739 return;
740 };
741 }
742
## Workers persist for the most part after running, though this is not
## always the case and depends on Perl. Pass a reference to a subroutine
## if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
746
747 MCE::Flow->init(
748 chunk_size => 'auto', max_workers => 'auto'
749 );
750
751 for (1..2) {
752 my @m2;
753
754 mce_flow {
755 gather => preserve_order(\@m2)
756 },
757 sub {
758 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
759
760 ## Compute the entire chunk data at once.
761 push @a, map { $_ * 2 } @{ $chunk_ref };
762
763 ## Afterwards, invoke the gather feature, which
764 ## will direct the data to the callback function.
765 MCE->gather(MCE->chunk_id, @a);
766
767 }, 1..100000;
768
769 print scalar @m2, "\n";
770 }
771
772 MCE::Flow->finish;
773
774 All 6 models support 'auto' for chunk_size unlike the Core API. Think
775 of the models as the basis for providing JIT for MCE. They create the
776 instance, tune max_workers, and tune chunk_size automatically
777 regardless of the hardware.
778
779 The following does the same thing using the Core API. Workers persist
780 after running.
781
782 use MCE;
783
784 sub preserve_order {
785 ...
786 }
787
788 my $mce = MCE->new(
789 max_workers => 'auto', chunk_size => 8000,
790
791 user_func => sub {
792 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
793
794 ## Compute the entire chunk data at once.
795 push @a, map { $_ * 2 } @{ $chunk_ref };
796
797 ## Afterwards, invoke the gather feature, which
798 ## will direct the data to the callback function.
799 MCE->gather(MCE->chunk_id, @a);
800 }
801 );
802
803 for (1..2) {
804 my @m2;
805
806 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
807
808 print scalar @m2, "\n";
809 }
810
811 $mce->shutdown;
812
814 MCE::Flow->finish
815 MCE::Flow::finish
816
817 Workers remain persistent as much as possible after running. Shutdown
818 occurs automatically when the script terminates. Call finish when
819 workers are no longer needed.
820
821 use MCE::Flow;
822
823 MCE::Flow->init(
824 chunk_size => 20, max_workers => 'auto'
825 );
826
827 mce_flow sub { ... }, 1..100;
828
829 MCE::Flow->finish;
830
832 MCE, MCE::Core
833
835 Mario E. Roy, <marioeroy AT gmail DOT com>
836
837
838
perl v5.38.0 2023-09-14 MCE::Flow(3)