1MCE::Flow(3)          User Contributed Perl Documentation         MCE::Flow(3)
2
3
4

NAME

6       MCE::Flow - Parallel flow model for building creative applications
7

VERSION

9       This document describes MCE::Flow version 1.874
10

DESCRIPTION

12       MCE::Flow is great for writing custom apps that make the most of all
13       available cores.  This module was created to help one harness
14       user_tasks within MCE.
15
16       It is trivial to parallelize with mce_stream shown below.
17
18        ## Native map function
19        my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
20
21        ## Same as with MCE::Stream (processing from right to left)
22        @a = mce_stream
23             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
24
25        ## Pass an array reference to have writes occur simultaneously
26        mce_stream \@a,
27             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
28
29       However, let's have MCE::Flow compute the same in parallel. MCE::Queue
30       will be used for data flow among the sub-tasks.
31
32        use MCE::Flow;
33        use MCE::Queue;
34
35       This calls for preserving output order.
36
37        sub preserve_order {
38           my %tmp; my $order_id = 1; my $gather_ref = $_[0];
39           @{ $gather_ref } = ();  ## clear the array (optional)
40
41           return sub {
42              my ($data_ref, $chunk_id) = @_;
43              $tmp{$chunk_id} = $data_ref;
44
45              while (1) {
46                 last unless exists $tmp{$order_id};
47                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
48              }
49
50              return;
51           };
52        }
53
54       Two queues are needed for data flow between the 3 sub-tasks. Notice
55       task_end and how the value from $task_name is used for determining
56       which task has ended.
57
58        my $b = MCE::Queue->new;
59        my $c = MCE::Queue->new;
60
61        sub task_end {
62           my ($mce, $task_id, $task_name) = @_;
63
64           if (defined $mce->{user_tasks}->[$task_id + 1]) {
65              my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
66
67              if ($task_name eq 'a') {
68                 $b->enqueue((undef) x $n_workers);
69              }
70              elsif ($task_name eq 'b') {
71                 $c->enqueue((undef) x $n_workers);
72              }
73           }
74
75           return;
76        }
77
78       Next are the 3 sub-tasks. The first one reads input and begins the
79       flow.  The 2nd task dequeues, performs the calculation, and enqueues
80       into the next.  Finally, the last task calls the gather method.
81
82       Although serialization is done for you automatically, it is done
83       explicitly here to avoid double serialization. This is the fastest
84       approach for passing data between sub-tasks, with the least overhead.
85
86        sub task_a {
87           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
88
89           push @ans, map { $_ * 2 } @{ $chunk_ref };
90           $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
91
92           return;
93        }
94
95        sub task_b {
96           my ($mce) = @_;
97
98           while (1) {
99              my @ans; my $chunk = $b->dequeue;
100              last unless defined $chunk;
101
102              $chunk = MCE->thaw($chunk);
103              push @ans, map { $_ * 3 } @{ $chunk->[0] };
104              $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
105           }
106
107           return;
108        }
109
110        sub task_c {
111           my ($mce) = @_;
112
113           while (1) {
114              my @ans; my $chunk = $c->dequeue;
115              last unless defined $chunk;
116
117              $chunk = MCE->thaw($chunk);
118              push @ans, map { $_ * 4 } @{ $chunk->[0] };
119              MCE->gather(\@ans, $chunk->[1]);
120           }
121
122           return;
123        }
124
125       In summary, MCE::Flow builds an MCE instance behind the scenes and
126       starts running it. The task_name (shown), max_workers, and use_threads
127       options can each take an anonymous array for specifying a distinct
128       value for each sub-task.
129
130        my @a;
131
132        mce_flow {
133           task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
134           gather => preserve_order(\@a)
135
136        }, \&task_a, \&task_b, \&task_c, 1..10000;
137
138        print "@a\n";
139
140       If speed is not a concern and you want to get rid of all the MCE->freeze
141       and MCE->thaw statements, simply enqueue and dequeue 2 items at a time.
142       Or better yet, see MCE::Step introduced in MCE 1.506.
143
144       First, task_end must be updated. The number of undef(s) must match the
145       number of workers times the dequeue count. Otherwise, the script will
146       stall.
147
148        sub task_end {
149           ...
150              if ($task_name eq 'a') {
151               # $b->enqueue((undef) x $n_workers);
152                 $b->enqueue((undef) x ($n_workers * 2));
153              }
154              elsif ($task_name eq 'b') {
155               # $c->enqueue((undef) x $n_workers);
156                 $c->enqueue((undef) x ($n_workers * 2));
157              }
158           ...
159        }
160
161       Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
162
163        sub task_a {
164           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
165
166           push @ans, map { $_ * 2 } @{ $chunk_ref };
167           $b->enqueue(\@ans, $chunk_id);
168
169           return;
170        }
171
172        sub task_b {
173           my ($mce) = @_;
174
175           while (1) {
176              my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
177              last unless defined $chunk_ref;
178
179              push @ans, map { $_ * 3 } @{ $chunk_ref };
180              $c->enqueue(\@ans, $chunk_id);
181           }
182
183           return;
184        }
185
186        sub task_c {
187           my ($mce) = @_;
188
189           while (1) {
190              my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
191              last unless defined $chunk_ref;
192
193              push @ans, map { $_ * 4 } @{ $chunk_ref };
194              MCE->gather(\@ans, $chunk_id);
195           }
196
197           return;
198        }
199
200       Finally, run as usual.
201
202        my @a;
203
204        mce_flow {
205           task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
206           gather => preserve_order(\@a)
207
208        }, \&task_a, \&task_b, \&task_c, 1..10000;
209
210        print "@a\n";
211

SYNOPSIS when CHUNK_SIZE EQUALS 1

213       Although MCE::Loop may be preferred for running using a single code
214       block, the text below also applies to this module, particularly for the
215       first block.
216
217       All models in MCE default to 'auto' for chunk_size. The arguments for
218       the block are the same as writing a user_func block using the Core API.
219
220       Beginning with MCE 1.5, the next input item is placed into the input
221       scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
222       $chunk_ref containing many items. Basically, line 2 below may be
223       omitted from your code when using $_. One can call MCE->chunk_id to
224       obtain the current chunk id.
225
226        line 1:  user_func => sub {
227        line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
228        line 3:
229        line 4:     $_ points to $chunk_ref->[0]
230        line 5:        in MCE 1.5 when chunk_size == 1
231        line 6:
232        line 7:     $_ points to $chunk_ref
233        line 8:        in MCE 1.5 when chunk_size  > 1
234        line 9:  }
235
236       Follow this synopsis when chunk_size equals one. Looping is not
237       required inside the first block because the block is called once
238       per item.
239
240        ## Exports mce_flow, mce_flow_f, and mce_flow_s
241        use MCE::Flow;
242
243        MCE::Flow->init(
244           chunk_size => 1
245        );
246
247        ## Array or array_ref
248        mce_flow sub { do_work($_) }, 1..10000;
249        mce_flow sub { do_work($_) }, \@list;
250
251        ## Important; pass an array_ref for deeply nested input data
252        mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
253        mce_flow sub { do_work($_) }, \@deeply_list;
254
255        ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
256        ## Workers read directly and do not involve the manager process
257        mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
258
259        ## Involves the manager process, therefore slower
260        mce_flow_f sub { chomp; do_work($_) }, $file_handle;
261        mce_flow_f sub { chomp; do_work($_) }, $io;
262        mce_flow_f sub { chomp; do_work($_) }, \$scalar;
263
264        ## Sequence of numbers (begin, end [, step, format])
265        mce_flow_s sub { do_work($_) }, 1, 10000, 5;
266        mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
267
268        mce_flow_s sub { do_work($_) }, {
269           begin => 1, end => 10000, step => 5, format => undef
270        };
271

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

273       Follow this synopsis when chunk_size equals 'auto' or greater than 1.
274       This means having to loop through the chunk from inside the first
275       block.
276
277        use MCE::Flow;
278
279        MCE::Flow->init(           ## Chunk_size defaults to 'auto' when
280           chunk_size => 'auto'    ## not specified. Therefore, the init
281        );                         ## function may be omitted.
282
283        ## Syntax is shown for mce_flow for demonstration purposes.
284        ## Looping inside the block is the same for mce_flow_f and
285        ## mce_flow_s.
286
287        ## Array or array_ref
288        mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
289        mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
290
291        ## Important; pass an array_ref for deeply nested input data
292        mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
293        mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
294
295        ## Resembles code using the core MCE API
296        mce_flow sub {
297           my ($mce, $chunk_ref, $chunk_id) = @_;
298
299           for (@{ $chunk_ref }) {
300              do_work($_);
301           }
302
303        }, 1..10000;
304
305       Chunking reduces the number of IPC calls behind the scene. Think in
306       terms of chunks whenever processing a large amount of data. For
307       relatively small data, choosing 1 for chunk_size is fine.
308

OVERRIDING DEFAULTS

310       The following lists the options that may be overridden when loading
311       the module.
312
313        use Sereal qw( encode_sereal decode_sereal );
314        use CBOR::XS qw( encode_cbor decode_cbor );
315        use JSON::XS qw( encode_json decode_json );
316
317        use MCE::Flow
318            max_workers => 8,                # Default 'auto'
319            chunk_size => 500,               # Default 'auto'
320            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
321            freeze => \&encode_sereal,       # \&Storable::freeze
322            thaw => \&decode_sereal          # \&Storable::thaw
323        ;
324
325       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
326       available.  Specify "Sereal => 0" to use Storable instead.
327
328        use MCE::Flow Sereal => 0;
329

CUSTOMIZING MCE

331       MCE::Flow->init ( options )
332       MCE::Flow::init { options }
333
334       The init function accepts a hash of MCE options. Unlike with
335       MCE::Stream, both gather and bounds_only options may be specified when
336       calling init (not shown below).
337
338        use MCE::Flow;
339
340        MCE::Flow->init(
341           chunk_size => 1, max_workers => 4,
342
343           user_begin => sub {
344              print "## ", MCE->wid, " started\n";
345           },
346
347           user_end => sub {
348              print "## ", MCE->wid, " completed\n";
349           }
350        );
351
352        my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
353
354        print "\n", "@a{1..100}", "\n";
355
356        -- Output
357
358        ## 3 started
359        ## 2 started
360        ## 4 started
361        ## 1 started
362        ## 2 completed
363        ## 4 completed
364        ## 3 completed
365        ## 1 completed
366
367        1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
368        400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
369        1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
370        2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
371        3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
372        5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
373        7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
374        10000
375
376       Like with MCE::Flow->init above, MCE options may be specified using an
377       anonymous hash for the first argument. Notice how task_name,
378       max_workers, and use_threads can take an anonymous array for setting
379       uniquely per each code block.
380
381       Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
382       with the first code block, thus processing from left-to-right.
383
384        use threads;
385        use MCE::Flow;
386
387        my @a = mce_flow {
388           task_name   => [ 'a', 'b', 'c' ],
389           max_workers => [  3,   4,   2, ],
390           use_threads => [  1,   0,   0, ],
391
392           user_end => sub {
393              my ($mce, $task_id, $task_name) = @_;
394              MCE->print("$task_id - $task_name completed\n");
395           },
396
397           task_end => sub {
398              my ($mce, $task_id, $task_name) = @_;
399              MCE->print("$task_id - $task_name ended\n");
400           }
401        },
402        sub { sleep 1; },   ## 3 workers, named a
403        sub { sleep 2; },   ## 4 workers, named b
404        sub { sleep 3; };   ## 2 workers, named c
405
406        -- Output
407
408        0 - a completed
409        0 - a completed
410        0 - a completed
411        0 - a ended
412        1 - b completed
413        1 - b completed
414        1 - b completed
415        1 - b completed
416        1 - b ended
417        2 - c completed
418        2 - c completed
419        2 - c ended
420

API DOCUMENTATION

422       Although input data is optional for MCE::Flow, the following assumes
423       chunk_size equals 1 in order to demonstrate all the possibilities for
424       providing input data.
425
426       MCE::Flow->run ( sub { code }, list )
427       mce_flow sub { code }, list
428
429       Input data may be defined using a list, an array ref, or a hash ref.
430
431       Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Flow
432       takes a "sub { ... }" or a code reference. The other difference is that
433       the comma is needed after the block.
434
435        # $_ contains the item when chunk_size => 1
436
437        mce_flow sub { do_work($_) }, 1..1000;
438        mce_flow sub { do_work($_) }, \@list;
439
440        # Important; pass an array_ref for deeply nested input data
441
442        mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
443        mce_flow sub { do_work($_) }, \@deeply_list;
444
445        # Chunking; any chunk_size => 1 or greater
446
447        my %res = mce_flow sub {
448           my ($mce, $chunk_ref, $chunk_id) = @_;
449           my %ret;
450           for my $item (@{ $chunk_ref }) {
451              $ret{$item} = $item * 2;
452           }
453           MCE->gather(%ret);
454        },
455        \@list;
456
457        # Input hash; current API available since 1.828
458
459        my %res = mce_flow sub {
460           my ($mce, $chunk_ref, $chunk_id) = @_;
461           my %ret;
462           for my $key (keys %{ $chunk_ref }) {
463              $ret{$key} = $chunk_ref->{$key} * 2;
464           }
465           MCE->gather(%ret);
466        },
467        \%hash;
468
469        # Unlike MCE::Loop, MCE::Flow doesn't need input to run
470
471        mce_flow { max_workers => 4 }, sub {
472           MCE->say( MCE->wid );
473        };
474
475        # ... and can run multiple tasks
476
477        mce_flow {
478           max_workers => [  1,   3  ],
479           task_name   => [ 'p', 'c' ]
480        },
481        sub {
482           # 1 producer
483           MCE->say( "producer: ", MCE->wid );
484        },
485        sub {
486           # 3 consumers
487           MCE->say( "consumer: ", MCE->wid );
488        };
489
490        # Here, options are specified via init
491
492        MCE::Flow->init(
493           max_workers => [  1,   3  ],
494           task_name   => [ 'p', 'c' ]
495        );
496
497        mce_flow \&producer, \&consumers;
498
499       MCE::Flow->run_file ( sub { code }, file )
500       mce_flow_f sub { code }, file
501
502       The fastest of these is the /path/to/file. Workers communicate the next
503       offset position among themselves with zero interaction by the manager
504       process.
505
506       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
507
508        # $_ contains the line when chunk_size => 1
509
510        mce_flow_f sub { $_ }, "/path/to/file";  # faster
511        mce_flow_f sub { $_ }, $file_handle;
512        mce_flow_f sub { $_ }, $io;              # IO::All
513        mce_flow_f sub { $_ }, \$scalar;
514
515        # chunking, any chunk_size => 1 or greater
516
517        my %res = mce_flow_f sub {
518           my ($mce, $chunk_ref, $chunk_id) = @_;
519           my $buf = '';
520           for my $line (@{ $chunk_ref }) {
521              $buf .= $line;
522           }
523           MCE->gather($chunk_id, $buf);
524        },
525        "/path/to/file";
526
527       MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
528       mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
529
530       Sequence may be defined as a list, an array reference, or a hash
531       reference.  The functions require both begin and end values to run.
532       Step and format are optional. The format is passed to sprintf (% may be
533       omitted below).
534
535        my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
536
537        # $_ contains the sequence number when chunk_size => 1
538
539        mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
540        mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
541
542        mce_flow_s sub { $_ }, {
543           begin => $beg, end => $end,
544           step => $step, format => $fmt
545        };
546
547        # chunking, any chunk_size => 1 or greater
548
549        my %res = mce_flow_s sub {
550           my ($mce, $chunk_ref, $chunk_id) = @_;
551           my $buf = '';
552           for my $seq (@{ $chunk_ref }) {
553              $buf .= "$seq\n";
554           }
555           MCE->gather($chunk_id, $buf);
556        },
557        [ $beg, $end ];
558
559       With the bounds_only option, the sequence engine computes only the
560       'begin' and 'end' items for the chunk, and not the items in between
561       (hence boundaries only). This option applies to sequences only and has
562       no effect when chunk_size equals 1.
563
564       The time to run is 0.006s below. This becomes 0.827s without the
565       bounds_only option due to computing all items in between, thus creating
566       a very large array. Basically, specify bounds_only => 1 when boundaries
567       are all you need for looping inside the block; e.g. Monte Carlo
568       simulations.
569
570       Time was measured using 1 worker to emphasize the difference.
571
572        use MCE::Flow;
573
574        MCE::Flow->init(
575           max_workers => 1, chunk_size => 1_250_000,
576           bounds_only => 1
577        );
578
579        # Typically, the input scalar $_ contains the sequence number
580        # when chunk_size => 1, unless the bounds_only option is set
581        # which is the case here. Thus, $_ points to $chunk_ref.
582
583        mce_flow_s sub {
584           my ($mce, $chunk_ref, $chunk_id) = @_;
585
586           # $chunk_ref contains 2 items, not 1_250_000
587           # my ( $begin, $end ) = ( $_->[0], $_->[1] );
588
589           my $begin = $chunk_ref->[0];
590           my $end   = $chunk_ref->[1];
591
592           # for my $seq ( $begin .. $end ) {
593           #    ...
594           # }
595
596           MCE->printf("%7d .. %8d\n", $begin, $end);
597        },
598        [ 1, 10_000_000 ];
599
600        -- Output
601
602              1 ..  1250000
603        1250001 ..  2500000
604        2500001 ..  3750000
605        3750001 ..  5000000
606        5000001 ..  6250000
607        6250001 ..  7500000
608        7500001 ..  8750000
609        8750001 .. 10000000
610
611       MCE::Flow->run ( { input_data => iterator }, sub { code } )
612       mce_flow { input_data => iterator }, sub { code }
613
614       An iterator reference may be specified for input_data. The only other
615       way is to specify input_data via MCE::Flow->init. This prevents
616       MCE::Flow from configuring the iterator reference as another user task
617       which will not work.
618
619       Iterators are described under section "SYNTAX for INPUT_DATA" at
620       MCE::Core.
621
622        MCE::Flow->init(
623           input_data => iterator
624        );
625
626        mce_flow sub { $_ };
627

GATHERING DATA

629       Unlike MCE::Map where gather and output order are done for you
630       automatically, the gather method is used to have results sent back to
631       the manager process.
632
633        use MCE::Flow chunk_size => 1;
634
635        ## Output order is not guaranteed.
636        my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
637        print "@a1\n\n";
638
639        ## Outputs to a hash instead (key, value).
640        my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
641        print "@h1{1..100}\n\n";
642
643        ## This does the same thing due to chunk_id starting at one.
644        my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
645        print "@h2{1..100}\n\n";
646
647       The gather method may be called multiple times within the block unlike
648       return which would leave the block. Therefore, think of gather as
649       yielding results immediately to the manager process without actually
650       leaving the block.
651
652        use MCE::Flow chunk_size => 1, max_workers => 3;
653
654        my @hosts = qw(
655           hosta hostb hostc hostd hoste
656        );
657
658        my %h3 = mce_flow sub {
659           my ($output, $error, $status); my $host = $_;
660
661           ## Do something with $host;
662           $output = "Worker ". MCE->wid .": Hello from $host";
663
664           if (MCE->chunk_id % 3 == 0) {
665              ## Simulating an error condition
666              local $? = 1; $status = $?;
667              $error = "Error from $host"
668           }
669           else {
670              $status = 0;
671           }
672
673           ## Ensure unique keys (key, value) when gathering to
674           ## a hash.
675           MCE->gather("$host.out", $output);
676           MCE->gather("$host.err", $error) if (defined $error);
677           MCE->gather("$host.sta", $status);
678
679        }, @hosts;
680
681        foreach my $host (@hosts) {
682           print $h3{"$host.out"}, "\n";
683           print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
684           print "Exit status: ", $h3{"$host.sta"}, "\n\n";
685        }
686
687        -- Output
688
689        Worker 3: Hello from hosta
690        Exit status: 0
691
692        Worker 2: Hello from hostb
693        Exit status: 0
694
695        Worker 1: Hello from hostc
696        Error from hostc
697        Exit status: 1
698
699        Worker 3: Hello from hostd
700        Exit status: 0
701
702        Worker 2: Hello from hoste
703        Exit status: 0
704
705       The following uses an anonymous array containing 3 elements when
706       gathering data. Serialization is automatic behind the scene.
707
708        my %h3 = mce_flow sub {
709           ...
710
711           MCE->gather($host, [$output, $error, $status]);
712
713        }, @hosts;
714
715        foreach my $host (@hosts) {
716           print $h3{$host}->[0], "\n";
717           print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
718           print "Exit status: ", $h3{$host}->[2], "\n\n";
719        }
720
721       Although MCE::Map comes to mind, one may want additional control when
722       gathering data such as retaining output order.
723
724        use MCE::Flow;
725
726        sub preserve_order {
727           my %tmp; my $order_id = 1; my $gather_ref = $_[0];
728
729           return sub {
730              $tmp{ (shift) } = \@_;
731
732              while (1) {
733                 last unless exists $tmp{$order_id};
734                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
735              }
736
737              return;
738           };
739        }
740
741        ## Workers persist for the most part after running, though this is
742        ## not always the case and depends on Perl. Pass a reference to a
743        ## subroutine if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
744
745        MCE::Flow->init(
746           chunk_size => 'auto', max_workers => 'auto'
747        );
748
749        for (1..2) {
750           my @m2;
751
752           mce_flow {
753              gather => preserve_order(\@m2)
754           },
755           sub {
756              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
757
758              ## Compute the entire chunk data at once.
759              push @a, map { $_ * 2 } @{ $chunk_ref };
760
761              ## Afterwards, invoke the gather feature, which
762              ## will direct the data to the callback function.
763              MCE->gather(MCE->chunk_id, @a);
764
765           }, 1..100000;
766
767           print scalar @m2, "\n";
768        }
769
770        MCE::Flow->finish;
771
772       All 6 models support 'auto' for chunk_size unlike the Core API. Think
773       of the models as the basis for providing JIT for MCE. They create the
774       instance, tune max_workers, and tune chunk_size automatically
775       regardless of the hardware.
776
777       The following does the same thing using the Core API. Workers persist
778       after running.
779
780        use MCE;
781
782        sub preserve_order {
783           ...
784        }
785
786        my $mce = MCE->new(
787           max_workers => 'auto', chunk_size => 8000,
788
789           user_func => sub {
790              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
791
792              ## Compute the entire chunk data at once.
793              push @a, map { $_ * 2 } @{ $chunk_ref };
794
795              ## Afterwards, invoke the gather feature, which
796              ## will direct the data to the callback function.
797              MCE->gather(MCE->chunk_id, @a);
798           }
799        );
800
801        for (1..2) {
802           my @m2;
803
804           $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
805
806           print scalar @m2, "\n";
807        }
808
809        $mce->shutdown;
810

MANUAL SHUTDOWN

812       MCE::Flow->finish
813       MCE::Flow::finish
814
815       Workers remain persistent as much as possible after running. Shutdown
816       occurs automatically when the script terminates. Call finish when
817       workers are no longer needed.
818
819        use MCE::Flow;
820
821        MCE::Flow->init(
822           chunk_size => 20, max_workers => 'auto'
823        );
824
825        mce_flow sub { ... }, 1..100;
826
827        MCE::Flow->finish;
828

INDEX

830       MCE, MCE::Core
831

AUTHOR

833       Mario E. Roy, <marioeroy AT gmail DOT com>
834
835
836
837perl v5.32.1                      2021-01-27                      MCE::Flow(3)
Impressum