1MCE::Flow(3) User Contributed Perl Documentation MCE::Flow(3)
2
3
4
6 MCE::Flow - Parallel flow model for building creative applications
7
9 This document describes MCE::Flow version 1.838
10
12 MCE::Flow is great for writing custom apps to maximize on all available
13 cores. This module was created to help one harness user_tasks within
14 MCE.
15
16 It is trivial to parallelize with mce_stream shown below.
17
18 ## Native map function
19 my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
20
21 ## Same as with MCE::Stream (processing from right to left)
22 @a = mce_stream
23 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
24
25 ## Pass an array reference to have writes occur simultaneously
26 mce_stream \@a,
27 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
28
29 However, let's have MCE::Flow compute the same in parallel. MCE::Queue
30 will be used for data flow among the sub-tasks.
31
32 use MCE::Flow;
33 use MCE::Queue;
34
35 This calls for preserving output order.
36
37 sub preserve_order {
38 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
39 @{ $gather_ref } = (); ## clear the array (optional)
40
41 return sub {
42 my ($data_ref, $chunk_id) = @_;
43 $tmp{$chunk_id} = $data_ref;
44
45 while (1) {
46 last unless exists $tmp{$order_id};
47 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
48 }
49
50 return;
51 };
52 }
53
54 Two queues are needed for data flow between the 3 sub-tasks. Notice
55 task_end and how the value from $task_name is used for determining
56 which task has ended.
57
58 my $b = MCE::Queue->new;
59 my $c = MCE::Queue->new;
60
61 sub task_end {
62 my ($mce, $task_id, $task_name) = @_;
63
64 if (defined $mce->{user_tasks}->[$task_id + 1]) {
65 my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
66
67 if ($task_name eq 'a') {
68 $b->enqueue((undef) x $n_workers);
69 }
70 elsif ($task_name eq 'b') {
71 $c->enqueue((undef) x $n_workers);
72 }
73 }
74
75 return;
76 }
77
78 Next are the 3 sub-tasks. The first one reads input and begins the
79 flow. The 2nd task dequeues, performs the calculation, and enqueues
80 into the next. Finally, the last task calls the gather method.
81
82 Although serialization is done for you automatically, it is done here
83 to save from double serialization. This is the fastest approach for
84 passing data between sub-tasks. Thus, the least overhead.
85
86 sub task_a {
87 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
88
89 push @ans, map { $_ * 2 } @{ $chunk_ref };
90 $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
91
92 return;
93 }
94
95 sub task_b {
96 my ($mce) = @_;
97
98 while (1) {
99 my @ans; my $chunk = $b->dequeue;
100 last unless defined $chunk;
101
102 $chunk = MCE->thaw($chunk);
103 push @ans, map { $_ * 3 } @{ $chunk->[0] };
104 $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
105 }
106
107 return;
108 }
109
110 sub task_c {
111 my ($mce) = @_;
112
113 while (1) {
114 my @ans; my $chunk = $c->dequeue;
115 last unless defined $chunk;
116
117 $chunk = MCE->thaw($chunk);
118 push @ans, map { $_ * 4 } @{ $chunk->[0] };
119 MCE->gather(\@ans, $chunk->[1]);
120 }
121
122 return;
123 }
124
125 In summary, MCE::Flow builds out a MCE instance behind the scene and
126 starts running. The task_name (shown), max_workers, and use_threads
127 options can take an anonymous array for specifying the values uniquely
128 per each sub-task.
129
130 my @a;
131
132 mce_flow {
133 task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
134 gather => preserve_order(\@a)
135
136 }, \&task_a, \&task_b, \&task_c, 1..10000;
137
138 print "@a\n";
139
140 If speed is not a concern and wanting to rid of all the MCE->freeze and
141 MCE->thaw statements, simply enqueue and dequeue 2 items at a time. Or
142 better yet, see MCE::Step introduced in MCE 1.506.
143
144 First, task_end must be updated. The number of undef(s) must match the
145 number of workers times the dequeue count. Otherwise, the script will
146 stall.
147
148 sub task_end {
149 ...
150 if ($task_name eq 'a') {
151 # $b->enqueue((undef) x $n_workers);
152 $b->enqueue((undef) x ($n_workers * 2));
153 }
154 elsif ($task_name eq 'b') {
155 # $c->enqueue((undef) x $n_workers);
156 $c->enqueue((undef) x ($n_workers * 2));
157 }
158 ...
159 }
160
161 Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
162
163 sub task_a {
164 my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
165
166 push @ans, map { $_ * 2 } @{ $chunk_ref };
167 $b->enqueue(\@ans, $chunk_id);
168
169 return;
170 }
171
172 sub task_b {
173 my ($mce) = @_;
174
175 while (1) {
176 my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
177 last unless defined $chunk_ref;
178
179 push @ans, map { $_ * 3 } @{ $chunk_ref };
180 $c->enqueue(\@ans, $chunk_id);
181 }
182
183 return;
184 }
185
186 sub task_c {
187 my ($mce) = @_;
188
189 while (1) {
190 my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
191 last unless defined $chunk_ref;
192
193 push @ans, map { $_ * 4 } @{ $chunk_ref };
194 MCE->gather(\@ans, $chunk_id);
195 }
196
197 return;
198 }
199
200 Finally, run as usual.
201
202 my @a;
203
204 mce_flow {
205 task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
206 gather => preserve_order(\@a)
207
208 }, \&task_a, \&task_b, \&task_c, 1..10000;
209
210 print "@a\n";
211
213 Although MCE::Loop may be preferred for running using a single code
214 block, the text below also applies to this module, particularly for the
215 first block.
216
217 All models in MCE default to 'auto' for chunk_size. The arguments for
218 the block are the same as writing a user_func block using the Core API.
219
220 Beginning with MCE 1.5, the next input item is placed into the input
221 scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
222 $chunk_ref containing many items. Basically, line 2 below may be
223 omitted from your code when using $_. One can call MCE->chunk_id to
224 obtain the current chunk id.
225
226 line 1: user_func => sub {
227 line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
228 line 3:
229 line 4: $_ points to $chunk_ref->[0]
230 line 5: in MCE 1.5 when chunk_size == 1
231 line 6:
232 line 7: $_ points to $chunk_ref
233 line 8: in MCE 1.5 when chunk_size > 1
234 line 9: }
235
236 Follow this synopsis when chunk_size equals one. Looping is not
237 required from inside the first block. Hence, the block is called once
238 per each item.
239
240 ## Exports mce_flow, mce_flow_f, and mce_flow_s
241 use MCE::Flow;
242
243 MCE::Flow::init {
244 chunk_size => 1
245 };
246
247 ## Array or array_ref
248 mce_flow sub { do_work($_) }, 1..10000;
249 mce_flow sub { do_work($_) }, [ 1..10000 ];
250
251 ## File_path, glob_ref, or scalar_ref
252 mce_flow_f sub { chomp; do_work($_) }, "/path/to/file";
253 mce_flow_f sub { chomp; do_work($_) }, $file_handle;
254 mce_flow_f sub { chomp; do_work($_) }, \$scalar;
255
256 ## Sequence of numbers (begin, end [, step, format])
257 mce_flow_s sub { do_work($_) }, 1, 10000, 5;
258 mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
259
260 mce_flow_s sub { do_work($_) }, {
261 begin => 1, end => 10000, step => 5, format => undef
262 };
263
265 Follow this synopsis when chunk_size equals 'auto' or greater than 1.
266 This means having to loop through the chunk from inside the first
267 block.
268
269 use MCE::Flow;
270
271 MCE::Flow::init { ## Chunk_size defaults to 'auto' when
272 chunk_size => 'auto' ## not specified. Therefore, the init
273 }; ## function may be omitted.
274
275 ## Syntax is shown for mce_flow for demonstration purposes.
276 ## Looping inside the block is the same for mce_flow_f and
277 ## mce_flow_s.
278
279 mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
280
281 ## Same as above, resembles code using the Core API.
282
283 mce_flow sub {
284 my ($mce, $chunk_ref, $chunk_id) = @_;
285
286 for (@{ $chunk_ref }) {
287 do_work($_);
288 }
289
290 }, 1..10000;
291
292 Chunking reduces the number of IPC calls behind the scene. Think in
293 terms of chunks whenever processing a large amount of data. For
294 relatively small data, choosing 1 for chunk_size is fine.
295
297 The following list options which may be overridden when loading the
298 module.
299
300 use Sereal qw( encode_sereal decode_sereal );
301 use CBOR::XS qw( encode_cbor decode_cbor );
302 use JSON::XS qw( encode_json decode_json );
303
304 use MCE::Flow
305 max_workers => 8, # Default 'auto'
306 chunk_size => 500, # Default 'auto'
307 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
308 freeze => \&encode_sereal, # \&Storable::freeze
309 thaw => \&decode_sereal # \&Storable::thaw
310 ;
311
312 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
313 available. Specify "Sereal =" 0> to use Storable instead.
314
315 use MCE::Flow Sereal => 0;
316
318 MCE::Flow->init ( options )
319 MCE::Flow::init { options }
320 The init function accepts a hash of MCE options. Unlike with
321 MCE::Stream, both gather and bounds_only options may be specified
322 when calling init (not shown below).
323
324 use MCE::Flow;
325
326 MCE::Flow::init {
327 chunk_size => 1, max_workers => 4,
328
329 user_begin => sub {
330 print "## ", MCE->wid, " started\n";
331 },
332
333 user_end => sub {
334 print "## ", MCE->wid, " completed\n";
335 }
336 };
337
338 my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
339
340 print "\n", "@a{1..100}", "\n";
341
342 -- Output
343
344 ## 3 started
345 ## 2 started
346 ## 4 started
347 ## 1 started
348 ## 2 completed
349 ## 4 completed
350 ## 3 completed
351 ## 1 completed
352
353 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
354 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
355 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
356 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
357 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
358 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
359 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
360 10000
361
362 Like with MCE::Flow::init above, MCE options may be specified using an
363 anonymous hash for the first argument. Notice how task_name,
364 max_workers, and use_threads can take an anonymous array for setting
365 uniquely per each code block.
366
367 Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
368 with the first code block, thus processing from left-to-right.
369
370 use threads;
371 use MCE::Flow;
372
373 my @a = mce_flow {
374 task_name => [ 'a', 'b', 'c' ],
375 max_workers => [ 3, 4, 2, ],
376 use_threads => [ 1, 0, 0, ],
377
378 user_end => sub {
379 my ($mce, $task_id, $task_name) = @_;
380 MCE->print("$task_id - $task_name completed\n");
381 },
382
383 task_end => sub {
384 my ($mce, $task_id, $task_name) = @_;
385 MCE->print("$task_id - $task_name ended\n");
386 }
387 },
388 sub { sleep 1; }, ## 3 workers, named a
389 sub { sleep 2; }, ## 4 workers, named b
390 sub { sleep 3; }; ## 2 workers, named c
391
392 -- Output
393
394 0 - a completed
395 0 - a completed
396 0 - a completed
397 0 - a ended
398 1 - b completed
399 1 - b completed
400 1 - b completed
401 1 - b completed
402 1 - b ended
403 2 - c completed
404 2 - c completed
405 2 - c ended
406
408 Although input data is optional for MCE::Flow, the following assumes
409 chunk_size equals 1 in order to demonstrate all the possibilities for
410 providing input data.
411
412 MCE::Flow->run ( sub { code }, list )
413 mce_flow sub { code }, list
414 Input data may be defined using a list, an array ref, or a hash ref.
415
416 Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }",
417 Flow takes a "sub { ... }" or a code reference. The other difference
418 is that the comma is needed after the block.
419
420 # $_ contains the item when chunk_size => 1
421
422 mce_flow sub { $_ }, 1..1000;
423 mce_flow sub { $_ }, \@list;
424
425 # chunking, any chunk_size => 1 or higher
426
427 my %res = mce_flow sub {
428 my ($mce, $chunk_ref, $chunk_id) = @_;
429 my %ret;
430 for my $item (@{ $chunk_ref }) {
431 $ret{$item} = $item * 2;
432 }
433 MCE->gather(%ret);
434 },
435 \@list;
436
437 # input hash, current API available since 1.828
438
439 my %res = mce_flow sub {
440 my ($mce, $chunk_ref, $chunk_id) = @_;
441 my %ret;
442 for my $key (keys %{ $chunk_ref }) {
443 $ret{$key} = $chunk_ref->{$key} * 2;
444 }
445 MCE->gather(%ret);
446 },
447 \%hash;
448
449 # unlike MCE::Loop, MCE::Flow doesn't need input to run
450
451 mce_flow { max_workers => 4 }, sub {
452 MCE->say( MCE->wid );
453 };
454
455 # ... and can run multiple tasks
456
457 mce_flow {
458 max_workers => [ 1, 3 ],
459 task_name => [ 'p', 'c' ]
460 },
461 sub {
462 # 1 producer
463 MCE->say( "producer: ", MCE->wid );
464 },
465 sub {
466 # 3 consumers
467 MCE->say( "consumer: ", MCE->wid );
468 };
469
470 # here, options are specified via init
471
472 MCE::Flow::init {
473 max_workers => [ 1, 3 ],
474 task_name => [ 'p', 'c' ]
475 };
476
477 mce_flow \&producer, \&consumers;
478
479 MCE::Flow->run_file ( sub { code }, file )
480 mce_flow_f sub { code }, file
481 The fastest of these is the /path/to/file. Workers communicate the
482 next offset position among themselves with zero interaction by the
483 manager process.
484
485 # $_ contains the line when chunk_size => 1
486
487 mce_flow_f sub { $_ }, "/path/to/file"; # faster
488 mce_flow_f sub { $_ }, $file_handle;
489 mce_flow_f sub { $_ }, \$scalar;
490
491 # chunking, any chunk_size => 1 or higher
492
493 my %res = mce_flow_f sub {
494 my ($mce, $chunk_ref, $chunk_id) = @_;
495 my $buf = '';
496 for my $line (@{ $chunk_ref }) {
497 $buf .= $line;
498 }
499 MCE->gather($chunk_id, $buf);
500 },
501 "/path/to/file";
502
503 MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
504 mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
505 Sequence may be defined as a list, an array reference, or a hash
506 reference. The functions require both begin and end values to run.
507 Step and format are optional. The format is passed to sprintf (% may
508 be omitted below).
509
510 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
511
512 # $_ contains the sequence number when chunk_size => 1
513
514 mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
515 mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
516
517 mce_flow_s sub { $_ }, {
518 begin => $beg, end => $end,
519 step => $step, format => $fmt
520 };
521
522 # chunking, any chunk_size => 1 or higher
523
524 my %res = mce_flow_s sub {
525 my ($mce, $chunk_ref, $chunk_id) = @_;
526 my $buf = '';
527 for my $seq (@{ $chunk_ref }) {
528 $buf .= "$seq\n";
529 }
530 MCE->gather($chunk_id, $buf);
531 },
532 [ $beg, $end ];
533
534 The sequence engine can compute 'begin' and 'end' items only, for
535 the chunk, and not the items in between (hence boundaries only).
536 This option applies to sequence only and has no effect when
537 chunk_size equals 1.
538
539 The time to run is 0.006s below. This becomes 0.827s without the
540 bounds_only option due to computing all items in between, thus
541 creating a very large array. Basically, specify bounds_only => 1
542 when boundaries is all you need for looping inside the block; e.g.
543 Monte Carlo simulations.
544
545 Time was measured using 1 worker to emphasize the difference.
546
547 use MCE::Flow;
548
549 MCE::Flow::init {
550 max_workers => 1, chunk_size => 1_250_000,
551 bounds_only => 1
552 };
553
554 # Typically, the input scalar $_ contains the sequence number
555 # when chunk_size => 1, unless the bounds_only option is set
556 # which is the case here. Thus, $_ points to $chunk_ref.
557
558 mce_flow_s sub {
559 my ($mce, $chunk_ref, $chunk_id) = @_;
560
561 # $chunk_ref contains 2 items, not 1_250_000
562 # my ( $begin, $end ) = ( $_->[0], $_->[1] );
563
564 my $begin = $chunk_ref->[0];
565 my $end = $chunk_ref->[1];
566
567 # for my $seq ( $begin .. $end ) {
568 # ...
569 # }
570
571 MCE->printf("%7d .. %8d\n", $begin, $end);
572 },
573 [ 1, 10_000_000 ];
574
575 -- Output
576
577 1 .. 1250000
578 1250001 .. 2500000
579 2500001 .. 3750000
580 3750001 .. 5000000
581 5000001 .. 6250000
582 6250001 .. 7500000
583 7500001 .. 8750000
584 8750001 .. 10000000
585
586 MCE::Flow->run ( { input_data => iterator }, sub { code } )
587 mce_flow { input_data => iterator }, sub { code }
588 An iterator reference may be specified for input_data. The only
589 other way is to specify input_data via MCE::Flow::init. This
590 prevents MCE::Flow from configuring the iterator reference as
591 another user task which will not work.
592
593 Iterators are described under section "SYNTAX for INPUT_DATA" at
594 MCE::Core.
595
596 MCE::Flow::init {
597 input_data => iterator
598 };
599
600 mce_flow sub { $_ };
601
603 Unlike MCE::Map where gather and output order are done for you
604 automatically, the gather method is used to have results sent back to
605 the manager process.
606
607 use MCE::Flow chunk_size => 1;
608
609 ## Output order is not guaranteed.
610 my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
611 print "@a1\n\n";
612
613 ## Outputs to a hash instead (key, value).
614 my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
615 print "@h1{1..100}\n\n";
616
617 ## This does the same thing due to chunk_id starting at one.
618 my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
619 print "@h2{1..100}\n\n";
620
621 The gather method may be called multiple times within the block unlike
622 return which would leave the block. Therefore, think of gather as
623 yielding results immediately to the manager process without actually
624 leaving the block.
625
626 use MCE::Flow chunk_size => 1, max_workers => 3;
627
628 my @hosts = qw(
629 hosta hostb hostc hostd hoste
630 );
631
632 my %h3 = mce_flow sub {
633 my ($output, $error, $status); my $host = $_;
634
635 ## Do something with $host;
636 $output = "Worker ". MCE->wid .": Hello from $host";
637
638 if (MCE->chunk_id % 3 == 0) {
639 ## Simulating an error condition
640 local $? = 1; $status = $?;
641 $error = "Error from $host"
642 }
643 else {
644 $status = 0;
645 }
646
647 ## Ensure unique keys (key, value) when gathering to
648 ## a hash.
649 MCE->gather("$host.out", $output);
650 MCE->gather("$host.err", $error) if (defined $error);
651 MCE->gather("$host.sta", $status);
652
653 }, @hosts;
654
655 foreach my $host (@hosts) {
656 print $h3{"$host.out"}, "\n";
657 print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
658 print "Exit status: ", $h3{"$host.sta"}, "\n\n";
659 }
660
661 -- Output
662
663 Worker 3: Hello from hosta
664 Exit status: 0
665
666 Worker 2: Hello from hostb
667 Exit status: 0
668
669 Worker 1: Hello from hostc
670 Error from hostc
671 Exit status: 1
672
673 Worker 3: Hello from hostd
674 Exit status: 0
675
676 Worker 2: Hello from hoste
677 Exit status: 0
678
679 The following uses an anonymous array containing 3 elements when
680 gathering data. Serialization is automatic behind the scene.
681
682 my %h3 = mce_flow sub {
683 ...
684
685 MCE->gather($host, [$output, $error, $status]);
686
687 }, @hosts;
688
689 foreach my $host (@hosts) {
690 print $h3{$host}->[0], "\n";
691 print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
692 print "Exit status: ", $h3{$host}->[2], "\n\n";
693 }
694
695 Although MCE::Map comes to mind, one may want additional control when
696 gathering data such as retaining output order.
697
698 use MCE::Flow;
699
700 sub preserve_order {
701 my %tmp; my $order_id = 1; my $gather_ref = $_[0];
702
703 return sub {
704 $tmp{ (shift) } = \@_;
705
706 while (1) {
707 last unless exists $tmp{$order_id};
708 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
709 }
710
711 return;
712 };
713 }
714
715 ## Workers persist for the most part after running. Though, not always
716 ## the case and depends on Perl. Pass a reference to a subroutine if
717 ## workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
718
719 MCE::Flow::init {
720 chunk_size => 'auto', max_workers => 'auto'
721 };
722
723 for (1..2) {
724 my @m2;
725
726 mce_flow {
727 gather => preserve_order(\@m2)
728 },
729 sub {
730 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
731
732 ## Compute the entire chunk data at once.
733 push @a, map { $_ * 2 } @{ $chunk_ref };
734
735 ## Afterwards, invoke the gather feature, which
736 ## will direct the data to the callback function.
737 MCE->gather(MCE->chunk_id, @a);
738
739 }, 1..100000;
740
741 print scalar @m2, "\n";
742 }
743
744 MCE::Flow::finish;
745
746 All 6 models support 'auto' for chunk_size unlike the Core API. Think
747 of the models as the basis for providing JIT for MCE. They create the
748 instance, tune max_workers, and tune chunk_size automatically
749 regardless of the hardware.
750
751 The following does the same thing using the Core API. Workers persist
752 after running.
753
754 use MCE;
755
756 sub preserve_order {
757 ...
758 }
759
760 my $mce = MCE->new(
761 max_workers => 'auto', chunk_size => 8000,
762
763 user_func => sub {
764 my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
765
766 ## Compute the entire chunk data at once.
767 push @a, map { $_ * 2 } @{ $chunk_ref };
768
769 ## Afterwards, invoke the gather feature, which
770 ## will direct the data to the callback function.
771 MCE->gather(MCE->chunk_id, @a);
772 }
773 );
774
775 for (1..2) {
776 my @m2;
777
778 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
779
780 print scalar @m2, "\n";
781 }
782
783 $mce->shutdown;
784
786 MCE::Flow->finish
787 MCE::Flow::finish
788 Workers remain persistent as much as possible after running.
789 Shutdown occurs automatically when the script terminates. Call
790 finish when workers are no longer needed.
791
792 use MCE::Flow;
793
794 MCE::Flow::init {
795 chunk_size => 20, max_workers => 'auto'
796 };
797
798 mce_flow sub { ... }, 1..100;
799
800 MCE::Flow::finish;
801
803 MCE, MCE::Core
804
806 Mario E. Roy, <marioeroy AT gmail DOT com>
807
808
809
810perl v5.28.1 2019-01-23 MCE::Flow(3)