1MCE::Flow(3)          User Contributed Perl Documentation         MCE::Flow(3)
2
3
4

NAME

6       MCE::Flow - Parallel flow model for building creative applications
7

VERSION

9       This document describes MCE::Flow version 1.838
10

DESCRIPTION

12       MCE::Flow is great for writing custom apps to maximize on all available
13       cores.  This module was created to help one harness user_tasks within
14       MCE.
15
16       It is trivial to parallelize with mce_stream shown below.
17
18        ## Native map function
19        my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
20
21        ## Same as with MCE::Stream (processing from right to left)
22        @a = mce_stream
23             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
24
25        ## Pass an array reference to have writes occur simultaneously
26        mce_stream \@a,
27             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
28
29       However, let's have MCE::Flow compute the same in parallel. MCE::Queue
30       will be used for data flow among the sub-tasks.
31
32        use MCE::Flow;
33        use MCE::Queue;
34
35       This calls for preserving output order.
36
37        sub preserve_order {
38           my %tmp; my $order_id = 1; my $gather_ref = $_[0];
39           @{ $gather_ref } = ();  ## clear the array (optional)
40
41           return sub {
42              my ($data_ref, $chunk_id) = @_;
43              $tmp{$chunk_id} = $data_ref;
44
45              while (1) {
46                 last unless exists $tmp{$order_id};
47                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
48              }
49
50              return;
51           };
52        }
53
54       Two queues are needed for data flow between the 3 sub-tasks. Notice
55       task_end and how the value from $task_name is used for determining
56       which task has ended.
57
58        my $b = MCE::Queue->new;
59        my $c = MCE::Queue->new;
60
61        sub task_end {
62           my ($mce, $task_id, $task_name) = @_;
63
64           if (defined $mce->{user_tasks}->[$task_id + 1]) {
65              my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
66
67              if ($task_name eq 'a') {
68                 $b->enqueue((undef) x $n_workers);
69              }
70              elsif ($task_name eq 'b') {
71                 $c->enqueue((undef) x $n_workers);
72              }
73           }
74
75           return;
76        }
77
78       Next are the 3 sub-tasks. The first one reads input and begins the
79       flow.  The 2nd task dequeues, performs the calculation, and enqueues
80       into the next.  Finally, the last task calls the gather method.
81
82       Although serialization is done for you automatically, it is done here
83       to save from double serialization. This is the fastest approach for
84       passing data between sub-tasks. Thus, the least overhead.
85
86        sub task_a {
87           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
88
89           push @ans, map { $_ * 2 } @{ $chunk_ref };
90           $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
91
92           return;
93        }
94
95        sub task_b {
96           my ($mce) = @_;
97
98           while (1) {
99              my @ans; my $chunk = $b->dequeue;
100              last unless defined $chunk;
101
102              $chunk = MCE->thaw($chunk);
103              push @ans, map { $_ * 3 } @{ $chunk->[0] };
104              $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
105           }
106
107           return;
108        }
109
110        sub task_c {
111           my ($mce) = @_;
112
113           while (1) {
114              my @ans; my $chunk = $c->dequeue;
115              last unless defined $chunk;
116
117              $chunk = MCE->thaw($chunk);
118              push @ans, map { $_ * 4 } @{ $chunk->[0] };
119              MCE->gather(\@ans, $chunk->[1]);
120           }
121
122           return;
123        }
124
125       In summary, MCE::Flow builds out a MCE instance behind the scene and
126       starts running. The task_name (shown), max_workers, and use_threads
127       options can take an anonymous array for specifying the values uniquely
128       per each sub-task.
129
130        my @a;
131
132        mce_flow {
133           task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
134           gather => preserve_order(\@a)
135
136        }, \&task_a, \&task_b, \&task_c, 1..10000;
137
138        print "@a\n";
139
140       If speed is not a concern and wanting to rid of all the MCE->freeze and
141       MCE->thaw statements, simply enqueue and dequeue 2 items at a time.  Or
142       better yet, see MCE::Step introduced in MCE 1.506.
143
144       First, task_end must be updated. The number of undef(s) must match the
145       number of workers times the dequeue count. Otherwise, the script will
146       stall.
147
148        sub task_end {
149           ...
150              if ($task_name eq 'a') {
151               # $b->enqueue((undef) x $n_workers);
152                 $b->enqueue((undef) x ($n_workers * 2));
153              }
154              elsif ($task_name eq 'b') {
155               # $c->enqueue((undef) x $n_workers);
156                 $c->enqueue((undef) x ($n_workers * 2));
157              }
158           ...
159        }
160
161       Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
162
163        sub task_a {
164           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
165
166           push @ans, map { $_ * 2 } @{ $chunk_ref };
167           $b->enqueue(\@ans, $chunk_id);
168
169           return;
170        }
171
172        sub task_b {
173           my ($mce) = @_;
174
175           while (1) {
176              my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
177              last unless defined $chunk_ref;
178
179              push @ans, map { $_ * 3 } @{ $chunk_ref };
180              $c->enqueue(\@ans, $chunk_id);
181           }
182
183           return;
184        }
185
186        sub task_c {
187           my ($mce) = @_;
188
189           while (1) {
190              my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
191              last unless defined $chunk_ref;
192
193              push @ans, map { $_ * 4 } @{ $chunk_ref };
194              MCE->gather(\@ans, $chunk_id);
195           }
196
197           return;
198        }
199
200       Finally, run as usual.
201
202        my @a;
203
204        mce_flow {
205           task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
206           gather => preserve_order(\@a)
207
208        }, \&task_a, \&task_b, \&task_c, 1..10000;
209
210        print "@a\n";
211

SYNOPSIS when CHUNK_SIZE EQUALS 1

213       Although MCE::Loop may be preferred for running using a single code
214       block, the text below also applies to this module, particularly for the
215       first block.
216
217       All models in MCE default to 'auto' for chunk_size. The arguments for
218       the block are the same as writing a user_func block using the Core API.
219
220       Beginning with MCE 1.5, the next input item is placed into the input
221       scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
222       $chunk_ref containing many items. Basically, line 2 below may be
223       omitted from your code when using $_. One can call MCE->chunk_id to
224       obtain the current chunk id.
225
226        line 1:  user_func => sub {
227        line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
228        line 3:
229        line 4:     $_ points to $chunk_ref->[0]
230        line 5:        in MCE 1.5 when chunk_size == 1
231        line 6:
232        line 7:     $_ points to $chunk_ref
233        line 8:        in MCE 1.5 when chunk_size  > 1
234        line 9:  }
235
236       Follow this synopsis when chunk_size equals one. Looping is not
237       required from inside the first block. Hence, the block is called once
238       per each item.
239
240        ## Exports mce_flow, mce_flow_f, and mce_flow_s
241        use MCE::Flow;
242
243        MCE::Flow::init {
244           chunk_size => 1
245        };
246
247        ## Array or array_ref
248        mce_flow sub { do_work($_) }, 1..10000;
249        mce_flow sub { do_work($_) }, [ 1..10000 ];
250
251        ## File_path, glob_ref, or scalar_ref
252        mce_flow_f sub { chomp; do_work($_) }, "/path/to/file";
253        mce_flow_f sub { chomp; do_work($_) }, $file_handle;
254        mce_flow_f sub { chomp; do_work($_) }, \$scalar;
255
256        ## Sequence of numbers (begin, end [, step, format])
257        mce_flow_s sub { do_work($_) }, 1, 10000, 5;
258        mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
259
260        mce_flow_s sub { do_work($_) }, {
261           begin => 1, end => 10000, step => 5, format => undef
262        };
263

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

265       Follow this synopsis when chunk_size equals 'auto' or greater than 1.
266       This means having to loop through the chunk from inside the first
267       block.
268
269        use MCE::Flow;
270
271        MCE::Flow::init {          ## Chunk_size defaults to 'auto' when
272           chunk_size => 'auto'    ## not specified. Therefore, the init
273        };                         ## function may be omitted.
274
275        ## Syntax is shown for mce_flow for demonstration purposes.
276        ## Looping inside the block is the same for mce_flow_f and
277        ## mce_flow_s.
278
279        mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
280
281        ## Same as above, resembles code using the Core API.
282
283        mce_flow sub {
284           my ($mce, $chunk_ref, $chunk_id) = @_;
285
286           for (@{ $chunk_ref }) {
287              do_work($_);
288           }
289
290        }, 1..10000;
291
292       Chunking reduces the number of IPC calls behind the scene. Think in
293       terms of chunks whenever processing a large amount of data. For
294       relatively small data, choosing 1 for chunk_size is fine.
295

OVERRIDING DEFAULTS

297       The following list options which may be overridden when loading the
298       module.
299
300        use Sereal qw( encode_sereal decode_sereal );
301        use CBOR::XS qw( encode_cbor decode_cbor );
302        use JSON::XS qw( encode_json decode_json );
303
304        use MCE::Flow
305            max_workers => 8,                # Default 'auto'
306            chunk_size => 500,               # Default 'auto'
307            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
308            freeze => \&encode_sereal,       # \&Storable::freeze
309            thaw => \&decode_sereal          # \&Storable::thaw
310        ;
311
312       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
313       available.  Specify "Sereal =" 0> to use Storable instead.
314
315        use MCE::Flow Sereal => 0;
316

CUSTOMIZING MCE

318       MCE::Flow->init ( options )
319       MCE::Flow::init { options }
320          The init function accepts a hash of MCE options. Unlike with
321          MCE::Stream, both gather and bounds_only options may be specified
322          when calling init (not shown below).
323
324           use MCE::Flow;
325
326           MCE::Flow::init {
327              chunk_size => 1, max_workers => 4,
328
329              user_begin => sub {
330                 print "## ", MCE->wid, " started\n";
331              },
332
333              user_end => sub {
334                 print "## ", MCE->wid, " completed\n";
335              }
336           };
337
338           my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
339
340           print "\n", "@a{1..100}", "\n";
341
342           -- Output
343
344           ## 3 started
345           ## 2 started
346           ## 4 started
347           ## 1 started
348           ## 2 completed
349           ## 4 completed
350           ## 3 completed
351           ## 1 completed
352
353           1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
354           400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
355           1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
356           2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
357           3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
358           5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
359           7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
360           10000
361
362       Like with MCE::Flow::init above, MCE options may be specified using an
363       anonymous hash for the first argument. Notice how task_name,
364       max_workers, and use_threads can take an anonymous array for setting
365       uniquely per each code block.
366
367       Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
368       with the first code block, thus processing from left-to-right.
369
370        use threads;
371        use MCE::Flow;
372
373        my @a = mce_flow {
374           task_name   => [ 'a', 'b', 'c' ],
375           max_workers => [  3,   4,   2, ],
376           use_threads => [  1,   0,   0, ],
377
378           user_end => sub {
379              my ($mce, $task_id, $task_name) = @_;
380              MCE->print("$task_id - $task_name completed\n");
381           },
382
383           task_end => sub {
384              my ($mce, $task_id, $task_name) = @_;
385              MCE->print("$task_id - $task_name ended\n");
386           }
387        },
388        sub { sleep 1; },   ## 3 workers, named a
389        sub { sleep 2; },   ## 4 workers, named b
390        sub { sleep 3; };   ## 2 workers, named c
391
392        -- Output
393
394        0 - a completed
395        0 - a completed
396        0 - a completed
397        0 - a ended
398        1 - b completed
399        1 - b completed
400        1 - b completed
401        1 - b completed
402        1 - b ended
403        2 - c completed
404        2 - c completed
405        2 - c ended
406

API DOCUMENTATION

408       Although input data is optional for MCE::Flow, the following assumes
409       chunk_size equals 1 in order to demonstrate all the possibilities for
410       providing input data.
411
412       MCE::Flow->run ( sub { code }, list )
413       mce_flow sub { code }, list
414          Input data may be defined using a list, an array ref, or a hash ref.
415
416          Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }",
417          Flow takes a "sub { ... }" or a code reference. The other difference
418          is that the comma is needed after the block.
419
420           # $_ contains the item when chunk_size => 1
421
422           mce_flow sub { $_ }, 1..1000;
423           mce_flow sub { $_ }, \@list;
424
425           # chunking, any chunk_size => 1 or higher
426
427           my %res = mce_flow sub {
428              my ($mce, $chunk_ref, $chunk_id) = @_;
429              my %ret;
430              for my $item (@{ $chunk_ref }) {
431                 $ret{$item} = $item * 2;
432              }
433              MCE->gather(%ret);
434           },
435           \@list;
436
437           # input hash, current API available since 1.828
438
439           my %res = mce_flow sub {
440              my ($mce, $chunk_ref, $chunk_id) = @_;
441              my %ret;
442              for my $key (keys %{ $chunk_ref }) {
443                 $ret{$key} = $chunk_ref->{$key} * 2;
444              }
445              MCE->gather(%ret);
446           },
447           \%hash;
448
449           # unlike MCE::Loop, MCE::Flow doesn't need input to run
450
451           mce_flow { max_workers => 4 }, sub {
452              MCE->say( MCE->wid );
453           };
454
455           # ... and can run multiple tasks
456
457           mce_flow {
458              max_workers => [  1,   3  ],
459              task_name   => [ 'p', 'c' ]
460           },
461           sub {
462              # 1 producer
463              MCE->say( "producer: ", MCE->wid );
464           },
465           sub {
466              # 3 consumers
467              MCE->say( "consumer: ", MCE->wid );
468           };
469
470           # here, options are specified via init
471
472           MCE::Flow::init {
473              max_workers => [  1,   3  ],
474              task_name   => [ 'p', 'c' ]
475           };
476
477           mce_flow \&producer, \&consumers;
478
479       MCE::Flow->run_file ( sub { code }, file )
480       mce_flow_f sub { code }, file
481          The fastest of these is the /path/to/file. Workers communicate the
482          next offset position among themselves with zero interaction by the
483          manager process.
484
485           # $_ contains the line when chunk_size => 1
486
487           mce_flow_f sub { $_ }, "/path/to/file";  # faster
488           mce_flow_f sub { $_ }, $file_handle;
489           mce_flow_f sub { $_ }, \$scalar;
490
491           # chunking, any chunk_size => 1 or higher
492
493           my %res = mce_flow_f sub {
494              my ($mce, $chunk_ref, $chunk_id) = @_;
495              my $buf = '';
496              for my $line (@{ $chunk_ref }) {
497                 $buf .= $line;
498              }
499              MCE->gather($chunk_id, $buf);
500           },
501           "/path/to/file";
502
503       MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
504       mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
505          Sequence may be defined as a list, an array reference, or a hash
506          reference.  The functions require both begin and end values to run.
507          Step and format are optional. The format is passed to sprintf (% may
508          be omitted below).
509
510           my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
511
512           # $_ contains the sequence number when chunk_size => 1
513
514           mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
515           mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
516
517           mce_flow_s sub { $_ }, {
518              begin => $beg, end => $end,
519              step => $step, format => $fmt
520           };
521
522           # chunking, any chunk_size => 1 or higher
523
524           my %res = mce_flow_s sub {
525              my ($mce, $chunk_ref, $chunk_id) = @_;
526              my $buf = '';
527              for my $seq (@{ $chunk_ref }) {
528                 $buf .= "$seq\n";
529              }
530              MCE->gather($chunk_id, $buf);
531           },
532           [ $beg, $end ];
533
534          The sequence engine can compute 'begin' and 'end' items only, for
535          the chunk, and not the items in between (hence boundaries only).
536          This option applies to sequence only and has no effect when
537          chunk_size equals 1.
538
539          The time to run is 0.006s below. This becomes 0.827s without the
540          bounds_only option due to computing all items in between, thus
541          creating a very large array. Basically, specify bounds_only => 1
542          when boundaries is all you need for looping inside the block; e.g.
543          Monte Carlo simulations.
544
545          Time was measured using 1 worker to emphasize the difference.
546
547           use MCE::Flow;
548
549           MCE::Flow::init {
550              max_workers => 1, chunk_size => 1_250_000,
551              bounds_only => 1
552           };
553
554           # Typically, the input scalar $_ contains the sequence number
555           # when chunk_size => 1, unless the bounds_only option is set
556           # which is the case here. Thus, $_ points to $chunk_ref.
557
558           mce_flow_s sub {
559              my ($mce, $chunk_ref, $chunk_id) = @_;
560
561              # $chunk_ref contains 2 items, not 1_250_000
562              # my ( $begin, $end ) = ( $_->[0], $_->[1] );
563
564              my $begin = $chunk_ref->[0];
565              my $end   = $chunk_ref->[1];
566
567              # for my $seq ( $begin .. $end ) {
568              #    ...
569              # }
570
571              MCE->printf("%7d .. %8d\n", $begin, $end);
572           },
573           [ 1, 10_000_000 ];
574
575           -- Output
576
577                 1 ..  1250000
578           1250001 ..  2500000
579           2500001 ..  3750000
580           3750001 ..  5000000
581           5000001 ..  6250000
582           6250001 ..  7500000
583           7500001 ..  8750000
584           8750001 .. 10000000
585
586       MCE::Flow->run ( { input_data => iterator }, sub { code } )
587       mce_flow { input_data => iterator }, sub { code }
588          An iterator reference may be specified for input_data. The only
589          other way is to specify input_data via MCE::Flow::init. This
590          prevents MCE::Flow from configuring the iterator reference as
591          another user task which will not work.
592
593          Iterators are described under section "SYNTAX for INPUT_DATA" at
594          MCE::Core.
595
596           MCE::Flow::init {
597              input_data => iterator
598           };
599
600           mce_flow sub { $_ };
601

GATHERING DATA

603       Unlike MCE::Map where gather and output order are done for you
604       automatically, the gather method is used to have results sent back to
605       the manager process.
606
607        use MCE::Flow chunk_size => 1;
608
609        ## Output order is not guaranteed.
610        my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
611        print "@a1\n\n";
612
613        ## Outputs to a hash instead (key, value).
614        my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
615        print "@h1{1..100}\n\n";
616
617        ## This does the same thing due to chunk_id starting at one.
618        my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
619        print "@h2{1..100}\n\n";
620
621       The gather method may be called multiple times within the block unlike
622       return which would leave the block. Therefore, think of gather as
623       yielding results immediately to the manager process without actually
624       leaving the block.
625
626        use MCE::Flow chunk_size => 1, max_workers => 3;
627
628        my @hosts = qw(
629           hosta hostb hostc hostd hoste
630        );
631
632        my %h3 = mce_flow sub {
633           my ($output, $error, $status); my $host = $_;
634
635           ## Do something with $host;
636           $output = "Worker ". MCE->wid .": Hello from $host";
637
638           if (MCE->chunk_id % 3 == 0) {
639              ## Simulating an error condition
640              local $? = 1; $status = $?;
641              $error = "Error from $host"
642           }
643           else {
644              $status = 0;
645           }
646
647           ## Ensure unique keys (key, value) when gathering to
648           ## a hash.
649           MCE->gather("$host.out", $output);
650           MCE->gather("$host.err", $error) if (defined $error);
651           MCE->gather("$host.sta", $status);
652
653        }, @hosts;
654
655        foreach my $host (@hosts) {
656           print $h3{"$host.out"}, "\n";
657           print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
658           print "Exit status: ", $h3{"$host.sta"}, "\n\n";
659        }
660
661        -- Output
662
663        Worker 3: Hello from hosta
664        Exit status: 0
665
666        Worker 2: Hello from hostb
667        Exit status: 0
668
669        Worker 1: Hello from hostc
670        Error from hostc
671        Exit status: 1
672
673        Worker 3: Hello from hostd
674        Exit status: 0
675
676        Worker 2: Hello from hoste
677        Exit status: 0
678
679       The following uses an anonymous array containing 3 elements when
680       gathering data. Serialization is automatic behind the scene.
681
682        my %h3 = mce_flow sub {
683           ...
684
685           MCE->gather($host, [$output, $error, $status]);
686
687        }, @hosts;
688
689        foreach my $host (@hosts) {
690           print $h3{$host}->[0], "\n";
691           print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
692           print "Exit status: ", $h3{$host}->[2], "\n\n";
693        }
694
695       Although MCE::Map comes to mind, one may want additional control when
696       gathering data such as retaining output order.
697
698        use MCE::Flow;
699
700        sub preserve_order {
701           my %tmp; my $order_id = 1; my $gather_ref = $_[0];
702
703           return sub {
704              $tmp{ (shift) } = \@_;
705
706              while (1) {
707                 last unless exists $tmp{$order_id};
708                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
709              }
710
711              return;
712           };
713        }
714
715        ## Workers persist for the most part after running. Though, not always
716        ## the case and depends on Perl. Pass a reference to a subroutine if
717        ## workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
718
719        MCE::Flow::init {
720           chunk_size => 'auto', max_workers => 'auto'
721        };
722
723        for (1..2) {
724           my @m2;
725
726           mce_flow {
727              gather => preserve_order(\@m2)
728           },
729           sub {
730              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
731
732              ## Compute the entire chunk data at once.
733              push @a, map { $_ * 2 } @{ $chunk_ref };
734
735              ## Afterwards, invoke the gather feature, which
736              ## will direct the data to the callback function.
737              MCE->gather(MCE->chunk_id, @a);
738
739           }, 1..100000;
740
741           print scalar @m2, "\n";
742        }
743
744        MCE::Flow::finish;
745
746       All 6 models support 'auto' for chunk_size unlike the Core API. Think
747       of the models as the basis for providing JIT for MCE. They create the
748       instance, tune max_workers, and tune chunk_size automatically
749       regardless of the hardware.
750
751       The following does the same thing using the Core API. Workers persist
752       after running.
753
754        use MCE;
755
756        sub preserve_order {
757           ...
758        }
759
760        my $mce = MCE->new(
761           max_workers => 'auto', chunk_size => 8000,
762
763           user_func => sub {
764              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
765
766              ## Compute the entire chunk data at once.
767              push @a, map { $_ * 2 } @{ $chunk_ref };
768
769              ## Afterwards, invoke the gather feature, which
770              ## will direct the data to the callback function.
771              MCE->gather(MCE->chunk_id, @a);
772           }
773        );
774
775        for (1..2) {
776           my @m2;
777
778           $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
779
780           print scalar @m2, "\n";
781        }
782
783        $mce->shutdown;
784

MANUAL SHUTDOWN

786       MCE::Flow->finish
787       MCE::Flow::finish
788          Workers remain persistent as much as possible after running.
789          Shutdown occurs automatically when the script terminates. Call
790          finish when workers are no longer needed.
791
792           use MCE::Flow;
793
794           MCE::Flow::init {
795              chunk_size => 20, max_workers => 'auto'
796           };
797
798           mce_flow sub { ... }, 1..100;
799
800           MCE::Flow::finish;
801

INDEX

803       MCE, MCE::Core
804

AUTHOR

806       Mario E. Roy, <marioeroy AT gmail DOT com>
807
808
809
810perl v5.28.1                      2019-01-23                      MCE::Flow(3)
Impressum