1MCE::Flow(3)          User Contributed Perl Documentation         MCE::Flow(3)
2
3
4

NAME

6       MCE::Flow - Parallel flow model for building creative applications
7

VERSION

9       This document describes MCE::Flow version 1.884
10

DESCRIPTION

12       MCE::Flow is great for writing custom apps to maximize on all available
13       cores.  This module was created to help one harness user_tasks within
14       MCE.
15
16       It is trivial to parallelize with mce_stream shown below.
17
18        ## Native map function
19        my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
20
21        ## Same as with MCE::Stream (processing from right to left)
22        @a = mce_stream
23             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
24
25        ## Pass an array reference to have writes occur simultaneously
26        mce_stream \@a,
27             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
28
29       However, let's have MCE::Flow compute the same in parallel. MCE::Queue
30       will be used for data flow among the sub-tasks.
31
32        use MCE::Flow;
33        use MCE::Queue;
34
35       This calls for preserving output order.
36
37        sub preserve_order {
38           my %tmp; my $order_id = 1; my $gather_ref = $_[0];
39           @{ $gather_ref } = ();  ## clear the array (optional)
40
41           return sub {
42              my ($data_ref, $chunk_id) = @_;
43              $tmp{$chunk_id} = $data_ref;
44
45              while (1) {
46                 last unless exists $tmp{$order_id};
47                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
48              }
49
50              return;
51           };
52        }
53
54       Two queues are needed for data flow between the 3 sub-tasks. Notice
55       task_end and how the value from $task_name is used for determining
56       which task has ended.
57
58        my $b = MCE::Queue->new;
59        my $c = MCE::Queue->new;
60
61        sub task_end {
62           my ($mce, $task_id, $task_name) = @_;
63
64           if (defined $mce->{user_tasks}->[$task_id + 1]) {
65              my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
66
67              if ($task_name eq 'a') {
68                 $b->enqueue((undef) x $n_workers);
69              }
70              elsif ($task_name eq 'b') {
71                 $c->enqueue((undef) x $n_workers);
72              }
73           }
74
75           return;
76        }
77
78       Next are the 3 sub-tasks. The first one reads input and begins the
79       flow.  The 2nd task dequeues, performs the calculation, and enqueues
80       into the next.  Finally, the last task calls the gather method.
81
82       Although serialization is done for you automatically, it is done
83       explicitly here to avoid double serialization. This is the fastest
84       approach for passing data between sub-tasks, with the least overhead.
85
86        sub task_a {
87           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
88
89           push @ans, map { $_ * 2 } @{ $chunk_ref };
90           $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
91
92           return;
93        }
94
95        sub task_b {
96           my ($mce) = @_;
97
98           while (1) {
99              my @ans; my $chunk = $b->dequeue;
100              last unless defined $chunk;
101
102              $chunk = MCE->thaw($chunk);
103              push @ans, map { $_ * 3 } @{ $chunk->[0] };
104              $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
105           }
106
107           return;
108        }
109
110        sub task_c {
111           my ($mce) = @_;
112
113           while (1) {
114              my @ans; my $chunk = $c->dequeue;
115              last unless defined $chunk;
116
117              $chunk = MCE->thaw($chunk);
118              push @ans, map { $_ * 4 } @{ $chunk->[0] };
119              MCE->gather(\@ans, $chunk->[1]);
120           }
121
122           return;
123        }
124
125       In summary, MCE::Flow builds out a MCE instance behind the scenes and
126       starts running. The task_name (shown), max_workers, and use_threads
127       options can take an anonymous array for specifying the values uniquely
128       per each sub-task.
129
130        my @a;
131
132        mce_flow {
133           task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
134           gather => preserve_order(\@a)
135
136        }, \&task_a, \&task_b, \&task_c, 1..10000;
137
138        print "@a\n";
139
140       If speed is not a concern and you want to get rid of all the MCE->freeze
141       and MCE->thaw statements, simply enqueue and dequeue 2 items at a time.
142       Or better yet, see MCE::Step introduced in MCE 1.506.
143
144       First, task_end must be updated. The number of undef(s) must match the
145       number of workers times the dequeue count. Otherwise, the script will
146       stall.
147
148        sub task_end {
149           ...
150              if ($task_name eq 'a') {
151               # $b->enqueue((undef) x $n_workers);
152                 $b->enqueue((undef) x ($n_workers * 2));
153              }
154              elsif ($task_name eq 'b') {
155               # $c->enqueue((undef) x $n_workers);
156                 $c->enqueue((undef) x ($n_workers * 2));
157              }
158           ...
159        }
160
161       Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
162
163        sub task_a {
164           my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
165
166           push @ans, map { $_ * 2 } @{ $chunk_ref };
167           $b->enqueue(\@ans, $chunk_id);
168
169           return;
170        }
171
172        sub task_b {
173           my ($mce) = @_;
174
175           while (1) {
176              my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
177              last unless defined $chunk_ref;
178
179              push @ans, map { $_ * 3 } @{ $chunk_ref };
180              $c->enqueue(\@ans, $chunk_id);
181           }
182
183           return;
184        }
185
186        sub task_c {
187           my ($mce) = @_;
188
189           while (1) {
190              my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
191              last unless defined $chunk_ref;
192
193              push @ans, map { $_ * 4 } @{ $chunk_ref };
194              MCE->gather(\@ans, $chunk_id);
195           }
196
197           return;
198        }
199
200       Finally, run as usual.
201
202        my @a;
203
204        mce_flow {
205           task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
206           gather => preserve_order(\@a)
207
208        }, \&task_a, \&task_b, \&task_c, 1..10000;
209
210        print "@a\n";
211

SYNOPSIS when CHUNK_SIZE EQUALS 1

213       Although MCE::Loop may be preferred for running using a single code
214       block, the text below also applies to this module, particularly for the
215       first block.
216
217       All models in MCE default to 'auto' for chunk_size. The arguments for
218       the block are the same as writing a user_func block using the Core API.
219
220       Beginning with MCE 1.5, the next input item is placed into the input
221       scalar variable $_ when chunk_size equals 1. Otherwise, $_ points to
222       $chunk_ref containing many items. Basically, line 2 below may be
223       omitted from your code when using $_. One can call MCE->chunk_id to
224       obtain the current chunk id.
225
226        line 1:  user_func => sub {
227        line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
228        line 3:
229        line 4:     $_ points to $chunk_ref->[0]
230        line 5:        in MCE 1.5 when chunk_size == 1
231        line 6:
232        line 7:     $_ points to $chunk_ref
233        line 8:        in MCE 1.5 when chunk_size  > 1
234        line 9:  }
235
236       Follow this synopsis when chunk_size equals one. Looping is not
237       required from inside the first block. Hence, the block is called once
238       per each item.
239
240        ## Exports mce_flow, mce_flow_f, and mce_flow_s
241        use MCE::Flow;
242
243        MCE::Flow->init(
244           chunk_size => 1
245        );
246
247        ## Array or array_ref
248        mce_flow sub { do_work($_) }, 1..10000;
249        mce_flow sub { do_work($_) }, \@list;
250
251        ## Important; pass an array_ref for deeply nested input data
252        mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
253        mce_flow sub { do_work($_) }, \@deeply_list;
254
255        ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
256        ## Workers read directly and do not involve the manager process
257        mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
258
259        ## Involves the manager process, therefore slower
260        mce_flow_f sub { chomp; do_work($_) }, $file_handle;
261        mce_flow_f sub { chomp; do_work($_) }, $io;
262        mce_flow_f sub { chomp; do_work($_) }, \$scalar;
263
264        ## Sequence of numbers (begin, end [, step, format])
265        mce_flow_s sub { do_work($_) }, 1, 10000, 5;
266        mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
267
268        mce_flow_s sub { do_work($_) }, {
269           begin => 1, end => 10000, step => 5, format => undef
270        };
271

SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

273       Follow this synopsis when chunk_size equals 'auto' or greater than 1.
274       This means having to loop through the chunk from inside the first
275       block.
276
277        use MCE::Flow;
278
279        MCE::Flow->init(           ## Chunk_size defaults to 'auto' when
280           chunk_size => 'auto'    ## not specified. Therefore, the init
281        );                         ## function may be omitted.
282
283        ## Syntax is shown for mce_flow for demonstration purposes.
284        ## Looping inside the block is the same for mce_flow_f and
285        ## mce_flow_s.
286
287        ## Array or array_ref
288        mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
289        mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
290
291        ## Important; pass an array_ref for deeply nested input data
292        mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
293        mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
294
295        ## Resembles code using the core MCE API
296        mce_flow sub {
297           my ($mce, $chunk_ref, $chunk_id) = @_;
298
299           for (@{ $chunk_ref }) {
300              do_work($_);
301           }
302
303        }, 1..10000;
304
305       Chunking reduces the number of IPC calls behind the scenes. Think in
306       terms of chunks whenever processing a large amount of data. For
307       relatively small data, choosing 1 for chunk_size is fine.
308

OVERRIDING DEFAULTS

310       The following lists the options which may be overridden when loading
311       the module.
312
313        use Sereal qw( encode_sereal decode_sereal );
314        use CBOR::XS qw( encode_cbor decode_cbor );
315        use JSON::XS qw( encode_json decode_json );
316
317        use MCE::Flow
318            max_workers => 8,                # Default 'auto'
319            chunk_size => 500,               # Default 'auto'
320            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
321            freeze => \&encode_sereal,       # \&Storable::freeze
322            thaw => \&decode_sereal,         # \&Storable::thaw
323            init_relay => 0,                 # Default undef; MCE 1.882+
324            use_threads => 0,                # Default undef; MCE 1.882+
325        ;
326
327       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
328       available.  Specify "Sereal => 0" to use Storable instead.
329
330        use MCE::Flow Sereal => 0;
331

CUSTOMIZING MCE

333       MCE::Flow->init ( options )
334       MCE::Flow::init { options }
335
336       The init function accepts a hash of MCE options. Unlike with
337       MCE::Stream, both gather and bounds_only options may be specified when
338       calling init (not shown below).
339
340        use MCE::Flow;
341
342        MCE::Flow->init(
343           chunk_size => 1, max_workers => 4,
344
345           user_begin => sub {
346              print "## ", MCE->wid, " started\n";
347           },
348
349           user_end => sub {
350              print "## ", MCE->wid, " completed\n";
351           }
352        );
353
354        my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
355
356        print "\n", "@a{1..100}", "\n";
357
358        -- Output
359
360        ## 3 started
361        ## 2 started
362        ## 4 started
363        ## 1 started
364        ## 2 completed
365        ## 4 completed
366        ## 3 completed
367        ## 1 completed
368
369        1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
370        400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
371        1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
372        2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
373        3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
374        5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
375        7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
376        10000
377
378       Like with MCE::Flow->init above, MCE options may be specified using an
379       anonymous hash for the first argument. Notice how task_name,
380       max_workers, and use_threads can take an anonymous array for setting
381       uniquely per each code block.
382
383       Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
384       with the first code block, thus processing from left-to-right.
385
386        use threads;
387        use MCE::Flow;
388
389        my @a = mce_flow {
390           task_name   => [ 'a', 'b', 'c' ],
391           max_workers => [  3,   4,   2, ],
392           use_threads => [  1,   0,   0, ],
393
394           user_end => sub {
395              my ($mce, $task_id, $task_name) = @_;
396              MCE->print("$task_id - $task_name completed\n");
397           },
398
399           task_end => sub {
400              my ($mce, $task_id, $task_name) = @_;
401              MCE->print("$task_id - $task_name ended\n");
402           }
403        },
404        sub { sleep 1; },   ## 3 workers, named a
405        sub { sleep 2; },   ## 4 workers, named b
406        sub { sleep 3; };   ## 2 workers, named c
407
408        -- Output
409
410        0 - a completed
411        0 - a completed
412        0 - a completed
413        0 - a ended
414        1 - b completed
415        1 - b completed
416        1 - b completed
417        1 - b completed
418        1 - b ended
419        2 - c completed
420        2 - c completed
421        2 - c ended
422

API DOCUMENTATION

424       Although input data is optional for MCE::Flow, the following assumes
425       chunk_size equals 1 in order to demonstrate all the possibilities for
426       providing input data.
427
428       MCE::Flow->run ( sub { code }, list )
429       mce_flow sub { code }, list
430
431       Input data may be defined using a list, an array ref, or a hash ref.
432
433       Unlike MCE::Loop, Map, and Grep which take a block as "{ ... }", Flow
434       takes a "sub { ... }" or a code reference. The other difference is that
435       the comma is needed after the block.
436
437        # $_ contains the item when chunk_size => 1
438
439        mce_flow sub { do_work($_) }, 1..1000;
440        mce_flow sub { do_work($_) }, \@list;
441
442        # Important; pass an array_ref for deeply nested input data
443
444        mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
445        mce_flow sub { do_work($_) }, \@deeply_list;
446
447        # Chunking; any chunk_size => 1 or greater
448
449        my %res = mce_flow sub {
450           my ($mce, $chunk_ref, $chunk_id) = @_;
451           my %ret;
452           for my $item (@{ $chunk_ref }) {
453              $ret{$item} = $item * 2;
454           }
455           MCE->gather(%ret);
456        },
457        \@list;
458
459        # Input hash; current API available since 1.828
460
461        my %res = mce_flow sub {
462           my ($mce, $chunk_ref, $chunk_id) = @_;
463           my %ret;
464           for my $key (keys %{ $chunk_ref }) {
465              $ret{$key} = $chunk_ref->{$key} * 2;
466           }
467           MCE->gather(%ret);
468        },
469        \%hash;
470
471        # Unlike MCE::Loop, MCE::Flow doesn't need input to run
472
473        mce_flow { max_workers => 4 }, sub {
474           MCE->say( MCE->wid );
475        };
476
477        # ... and can run multiple tasks
478
479        mce_flow {
480           max_workers => [  1,   3  ],
481           task_name   => [ 'p', 'c' ]
482        },
483        sub {
484           # 1 producer
485           MCE->say( "producer: ", MCE->wid );
486        },
487        sub {
488           # 3 consumers
489           MCE->say( "consumer: ", MCE->wid );
490        };
491
492        # Here, options are specified via init
493
494        MCE::Flow->init(
495           max_workers => [  1,   3  ],
496           task_name   => [ 'p', 'c' ]
497        );
498
499        mce_flow \&producer, \&consumers;
500
501       MCE::Flow->run_file ( sub { code }, file )
502       mce_flow_f sub { code }, file
503
504       The fastest of these is the /path/to/file. Workers communicate the next
505       offset position among themselves with zero interaction by the manager
506       process.
507
508       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
509
510        # $_ contains the line when chunk_size => 1
511
512        mce_flow_f sub { $_ }, "/path/to/file";  # faster
513        mce_flow_f sub { $_ }, $file_handle;
514        mce_flow_f sub { $_ }, $io;              # IO::All
515        mce_flow_f sub { $_ }, \$scalar;
516
517        # chunking, any chunk_size => 1 or greater
518
519        my %res = mce_flow_f sub {
520           my ($mce, $chunk_ref, $chunk_id) = @_;
521           my $buf = '';
522           for my $line (@{ $chunk_ref }) {
523              $buf .= $line;
524           }
525           MCE->gather($chunk_id, $buf);
526        },
527        "/path/to/file";
528
529       MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
530       mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
531
532       Sequence may be defined as a list, an array reference, or a hash
533       reference.  The functions require both begin and end values to run.
534       Step and format are optional. The format is passed to sprintf (% may be
535       omitted below).
536
537        my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
538
539        # $_ contains the sequence number when chunk_size => 1
540
541        mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
542        mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
543
544        mce_flow_s sub { $_ }, {
545           begin => $beg, end => $end,
546           step => $step, format => $fmt
547        };
548
549        # chunking, any chunk_size => 1 or greater
550
551        my %res = mce_flow_s sub {
552           my ($mce, $chunk_ref, $chunk_id) = @_;
553           my $buf = '';
554           for my $seq (@{ $chunk_ref }) {
555              $buf .= "$seq\n";
556           }
557           MCE->gather($chunk_id, $buf);
558        },
559        [ $beg, $end ];
560
561       The sequence engine can compute 'begin' and 'end' items only, for the
562       chunk, and not the items in between (hence boundaries only). This
563       option applies to sequence only and has no effect when chunk_size
564       equals 1.
565
566       The time to run is 0.006s below. This becomes 0.827s without the
567       bounds_only option due to computing all items in between, thus creating
568       a very large array. Basically, specify bounds_only => 1 when boundaries
569       are all you need for looping inside the block; e.g. Monte Carlo
570       simulations.
571
572       Time was measured using 1 worker to emphasize the difference.
573
574        use MCE::Flow;
575
576        MCE::Flow->init(
577           max_workers => 1, chunk_size => 1_250_000,
578           bounds_only => 1
579        );
580
581        # Typically, the input scalar $_ contains the sequence number
582        # when chunk_size => 1, unless the bounds_only option is set
583        # which is the case here. Thus, $_ points to $chunk_ref.
584
585        mce_flow_s sub {
586           my ($mce, $chunk_ref, $chunk_id) = @_;
587
588           # $chunk_ref contains 2 items, not 1_250_000
589           # my ( $begin, $end ) = ( $_->[0], $_->[1] );
590
591           my $begin = $chunk_ref->[0];
592           my $end   = $chunk_ref->[1];
593
594           # for my $seq ( $begin .. $end ) {
595           #    ...
596           # }
597
598           MCE->printf("%7d .. %8d\n", $begin, $end);
599        },
600        [ 1, 10_000_000 ];
601
602        -- Output
603
604              1 ..  1250000
605        1250001 ..  2500000
606        2500001 ..  3750000
607        3750001 ..  5000000
608        5000001 ..  6250000
609        6250001 ..  7500000
610        7500001 ..  8750000
611        8750001 .. 10000000
612
613       MCE::Flow->run ( { input_data => iterator }, sub { code } )
614       mce_flow { input_data => iterator }, sub { code }
615
616       An iterator reference may be specified for input_data. The only other
617       way is to specify input_data via MCE::Flow->init. This prevents
618       MCE::Flow from configuring the iterator reference as another user task
619       which will not work.
620
621       Iterators are described under section "SYNTAX for INPUT_DATA" at
622       MCE::Core.
623
624        MCE::Flow->init(
625           input_data => iterator
626        );
627
628        mce_flow sub { $_ };
629

GATHERING DATA

631       Unlike MCE::Map where gather and output order are done for you
632       automatically, the gather method is used to have results sent back to
633       the manager process.
634
635        use MCE::Flow chunk_size => 1;
636
637        ## Output order is not guaranteed.
638        my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
639        print "@a1\n\n";
640
641        ## Outputs to a hash instead (key, value).
642        my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
643        print "@h1{1..100}\n\n";
644
645        ## This does the same thing due to chunk_id starting at one.
646        my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
647        print "@h2{1..100}\n\n";
648
649       The gather method may be called multiple times within the block unlike
650       return which would leave the block. Therefore, think of gather as
651       yielding results immediately to the manager process without actually
652       leaving the block.
653
654        use MCE::Flow chunk_size => 1, max_workers => 3;
655
656        my @hosts = qw(
657           hosta hostb hostc hostd hoste
658        );
659
660        my %h3 = mce_flow sub {
661           my ($output, $error, $status); my $host = $_;
662
663           ## Do something with $host;
664           $output = "Worker ". MCE->wid .": Hello from $host";
665
666           if (MCE->chunk_id % 3 == 0) {
667              ## Simulating an error condition
668              local $? = 1; $status = $?;
669              $error = "Error from $host"
670           }
671           else {
672              $status = 0;
673           }
674
675           ## Ensure unique keys (key, value) when gathering to
676           ## a hash.
677           MCE->gather("$host.out", $output);
678           MCE->gather("$host.err", $error) if (defined $error);
679           MCE->gather("$host.sta", $status);
680
681        }, @hosts;
682
683        foreach my $host (@hosts) {
684           print $h3{"$host.out"}, "\n";
685           print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
686           print "Exit status: ", $h3{"$host.sta"}, "\n\n";
687        }
688
689        -- Output
690
691        Worker 3: Hello from hosta
692        Exit status: 0
693
694        Worker 2: Hello from hostb
695        Exit status: 0
696
697        Worker 1: Hello from hostc
698        Error from hostc
699        Exit status: 1
700
701        Worker 3: Hello from hostd
702        Exit status: 0
703
704        Worker 2: Hello from hoste
705        Exit status: 0
706
707       The following uses an anonymous array containing 3 elements when
708       gathering data. Serialization is automatic behind the scenes.
709
710        my %h3 = mce_flow sub {
711           ...
712
713           MCE->gather($host, [$output, $error, $status]);
714
715        }, @hosts;
716
717        foreach my $host (@hosts) {
718           print $h3{$host}->[0], "\n";
719           print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
720           print "Exit status: ", $h3{$host}->[2], "\n\n";
721        }
722
723       Although MCE::Map comes to mind, one may want additional control when
724       gathering data such as retaining output order.
725
726        use MCE::Flow;
727
728        sub preserve_order {
729           my %tmp; my $order_id = 1; my $gather_ref = $_[0];
730
731           return sub {
732              $tmp{ (shift) } = \@_;
733
734              while (1) {
735                 last unless exists $tmp{$order_id};
736                 push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
737              }
738
739              return;
740           };
741        }
742
743        ## Workers persist for the most part after running, though this is not
744        ## always the case and depends on Perl. Pass a reference to a subroutine
745        ## if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
746
747        MCE::Flow->init(
748           chunk_size => 'auto', max_workers => 'auto'
749        );
750
751        for (1..2) {
752           my @m2;
753
754           mce_flow {
755              gather => preserve_order(\@m2)
756           },
757           sub {
758              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
759
760              ## Compute the entire chunk data at once.
761              push @a, map { $_ * 2 } @{ $chunk_ref };
762
763              ## Afterwards, invoke the gather feature, which
764              ## will direct the data to the callback function.
765              MCE->gather(MCE->chunk_id, @a);
766
767           }, 1..100000;
768
769           print scalar @m2, "\n";
770        }
771
772        MCE::Flow->finish;
773
774       All 6 models support 'auto' for chunk_size unlike the Core API. Think
775       of the models as the basis for providing JIT for MCE. They create the
776       instance, tune max_workers, and tune chunk_size automatically
777       regardless of the hardware.
778
779       The following does the same thing using the Core API. Workers persist
780       after running.
781
782        use MCE;
783
784        sub preserve_order {
785           ...
786        }
787
788        my $mce = MCE->new(
789           max_workers => 'auto', chunk_size => 8000,
790
791           user_func => sub {
792              my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
793
794              ## Compute the entire chunk data at once.
795              push @a, map { $_ * 2 } @{ $chunk_ref };
796
797              ## Afterwards, invoke the gather feature, which
798              ## will direct the data to the callback function.
799              MCE->gather(MCE->chunk_id, @a);
800           }
801        );
802
803        for (1..2) {
804           my @m2;
805
806           $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
807
808           print scalar @m2, "\n";
809        }
810
811        $mce->shutdown;
812

MANUAL SHUTDOWN

814       MCE::Flow->finish
815       MCE::Flow::finish
816
817       Workers remain persistent as much as possible after running. Shutdown
818       occurs automatically when the script terminates. Call finish when
819       workers are no longer needed.
820
821        use MCE::Flow;
822
823        MCE::Flow->init(
824           chunk_size => 20, max_workers => 'auto'
825        );
826
827        mce_flow sub { ... }, 1..100;
828
829        MCE::Flow->finish;
830

INDEX

832       MCE, MCE::Core
833

AUTHOR

835       Mario E. Roy, <marioeroy AT gmail DOT com>
836
837
838
839perl v5.36.0                      2023-01-20                      MCE::Flow(3)
Impressum