1MCE::Core(3)          User Contributed Perl Documentation         MCE::Core(3)
2
3
4

NAME

6       MCE::Core - Documentation describing the core MCE API
7

VERSION

9       This document describes MCE::Core version 1.866
10

SYNOPSIS

12       This is a simplistic use case of MCE running with 5 workers.
13
14        ## Construction using the Core API
15
16        use MCE;
17
18        my $mce = MCE->new(
19           max_workers => 5,
20           user_func => sub {
21              my ($mce) = @_;
22              $mce->say("Hello from " . $mce->wid);
23           }
24        );
25
26        $mce->run;
27
28        ## Construction using a MCE model
29
30        use MCE::Flow max_workers => 5;
31
32        mce_flow sub {
33           my ($mce) = @_;
34           MCE->say("Hello from " . MCE->wid);
35        };
36
37        -- Output
38
39        Hello from 2
40        Hello from 4
41        Hello from 5
42        Hello from 1
43        Hello from 3
44
45   MCE->new ( [ options ] )
46       Below, a new instance is configured with all available options.
47
48        use MCE;
49
50        my $mce = MCE->new(
51
52           max_workers => 8,                   ## Default 1
53
54              # Number of workers to spawn. This can be set automatically
55              # with MCE 1.412 and later releases.
56
57              # MCE 1.521 sets an upper-limit of 8 for 'auto'.
58              # See MCE::Util::get_ncpu for more info.
59
60              # max_workers => 'auto',         ## # of lcores, 8 maximum
61              # max_workers => 'auto-1',       ## 7 on HW with 16-lcores
62              # max_workers => 'auto-1',       ## 3 on HW with  4-lcores
63
64              # max_workers => MCE::Util::get_ncpu, # run on all lcores
65
66           chunk_size => 2000,                 ## Default 1
67
68              # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
69              # The default is 1 when using the Core API and 'auto' for
70              # MCE Models. For arrays or queues, chunk_size means the
71              # number of records per chunk. For iterators, MCE will not
72              # use chunk_size, though the iterator may use it to determine
73              # how much to return per iteration. For files, a value smaller
74              # than or equal to 8192 is the number of records; a value greater
75              # than 8192 is the number of bytes. MCE reads until the end of
76              # the record before calling user_func.
77
78              # chunk_size => 1,               ## Consists of 1 record
79              # chunk_size => 1000,            ## Consists of 1000 records
80              # chunk_size => '16k',           ## Approximate 16 kibiBytes (KiB)
81              # chunk_size => '20m',           ## Approximate 20 mebiBytes (MiB)
82
83           tmp_dir => $tmp_dir,                ## Default $MCE::Signal::tmp_dir
84
85              # Default is $MCE::Signal::tmp_dir which points to
86              # $ENV{TEMP} if defined. Otherwise, tmp_dir points
87              # to a location under /tmp.
88
89           freeze => \&encode_sereal,          ## Default \&Storable::freeze
90           thaw   => \&decode_sereal,          ## Default \&Storable::thaw
91
92              # Release 1.412 allows freeze and thaw to be overridden.
93              # Simply include a serialization module prior to loading
94              # MCE. Configure freeze/thaw options.
95
96              # use Sereal qw( encode_sereal decode_sereal );
97              # use CBOR::XS qw( encode_cbor decode_cbor );
98              # use JSON::XS qw( encode_json decode_json );
99              #
100              # use MCE;
101
102           gather => \@a,                      ## Default undef
103
104              # Release 1.5 allows for gathering of data to an array or
105              # hash reference, a MCE::Queue/Thread::Queue object, or code
106              # reference. One invokes gathering by calling the gather
107              # method as often as needed.
108
109              # gather => \@array,
110              # gather => \%hash,
111              # gather => $queue,
112              # gather => \&order,
113
114           init_relay => 0,                    ## Default undef
115
116              # For specifying the initial relay value. Allowed values
117              # are array_ref, hash_ref, or scalar. The MCE::Relay module
118              # is loaded automatically when specified.
119
120              # init_relay => \@array,
121              # init_relay => \%hash,
122              # init_relay => scalar,
123
124           input_data => $input_file,          ## Default undef
125           RS         => "\n>",                ## Default undef
126
127              # input_data => '/path/to/file'  ## Process file
128              # input_data => \@array          ## Process array
129              # input_data => \*FILE_HNDL      ## Process file handle
130              # input_data => $io              ## Process IO::All { File, Pipe, STDIO }
131              # input_data => \$scalar         ## Treated like a file
132              # input_data => \&iterator       ## User specified iterator
133
134              # The RS option (for input record separator) applies to files
135              # and file handles.
136
137              # MCE applies additional logic when RS begins with a newline
138              # character; e.g. RS => "\n>". It trims away characters after
139              # the newline and prepends them to the next record.
140              #
141              # Typically, the left side is what happens for $/ = "\n>".
142              # The right side is what user_func receives.
143              #
144              # All records begin with > and end with \n
145              #    Record 1:  >seq1 ... \n>   (to)   >seq1 ... \n
146              #    Record 2:  seq2  ... \n>          >seq2 ... \n
147              #    Record 3:  seq3  ... \n>          >seq3 ... \n
148              #    Last Rec:  seqN  ... \n           >seqN ... \n
149
150           loop_timeout => 20,                 ## Default 0
151
152              # Added in 1.7, enables the manager process to time out of a read
153              # operation on channel 0 (UNIX platforms only). The manager process
154              # decrements the total workers running for any worker which has
155              # died in an uncontrollable manner. Specify this option if on
156              # occasion a worker dies unexpectedly (e.g. from an XS module).
157
158              # Option works with init_relay on UNIX platforms since MCE 1.844.
159              # A number smaller than 5 is silently increased to 5.
160
161           max_retries => 2,                   ## Default 0
162
163              # This option, added in 1.7, causes MCE to retry a failed
164              # chunk from a worker dying while processing input data or
165              # sequence of numbers.
166
167           parallel_io => 1,                   ## Default 0
168           posix_exit  => 1,                   ## Default 0
169           use_slurpio => 1,                   ## Default 0
170
171              # The parallel_io option enables parallel reads during large
172              # slurpio, useful when reading from fast storage. Do not enable
173              # parallel_io when running MCE on many nodes with input coming
174              # from shared storage.
175
176              # Set posix_exit to avoid all END and destructor processing.
177              # Constructing MCE in a thread implies 1, as does the presence of CGI,
178              # FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
179              # Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
180
181              # Enable slurpio to pass the raw chunk (scalar ref) to the user
182              # function when reading input files.
183
184           use_threads => 1,                   ## Auto 0 or 1
185
186              # MCE spawns child processes by default, not threads.
187              #
188              # However, MCE supports threads via 2 threading
189              # libraries if threads are desired.
190
191              # The use of threads in MCE requires that you include
192              # threads support prior to loading MCE. The use_threads
193              # option defaults to 1 when a thread library is loaded.
194              # Threads is loaded automatically for $^O eq 'MSWin32'.
195              #
196              #   use threads;                use forks;
197              #   use threads::shared;  (or)  use forks::shared;
198              #   use MCE                     use MCE;
199
200           spawn_delay  => 0.045,              ## Default undef
201           submit_delay => 0.015,              ## Default undef
202           job_delay    => 0.060,              ## Default undef
203
204              # Time to wait in fractional seconds after spawning a worker,
205              # after submitting parameters to worker (MCE->run, MCE->process),
206              # and worker running (one time staggered delay).
207
208              # Specify job_delay to stagger workers connecting to a database.
209
210           on_post_exit => \&on_post_exit,     ## Default undef
211           on_post_run  => \&on_post_run,      ## Default undef
212
213              # Execute the code block after a worker exits or dies.
214              # (i.e. MCE->exit, exit, die)
215
216              # Execute the code block after running.
217              # (i.e. MCE->process, MCE->run)
218
219           progress => sub { ... },            ## Default undef
220
221              # A code block for receiving info on the progress made.
222              # See section labeled "MCE PROGRESS DEMONSTRATIONS" at the
223              # end of this document.
224
225           user_args => { env => 'test' },     ## Default undef
226
227              # MCE release 1.4 added a new parameter to allow one to
228              # specify arbitrary arguments such as a string, an ARRAY
229              # or HASH reference. Workers can access this directly.
230              # (i.e. my $args = $mce->{user_args} or MCE->user_args)
231
232           user_begin => \&user_begin,         ## Default undef
233           user_func => \&user_func,           ## Default undef
234           user_end => \&user_end,             ## Default undef
235
236              # Think of user_begin, user_func, and user_end as in
237              # the awk scripting language:
238              # awk 'BEGIN { begin } { func } { func } ... END { end }'
239
240              # MCE workers call user_begin once at the start of a job,
241              # then user_func repeatedly until no chunks remain.
242              # Afterwards, user_end is called.
243
244           user_error => \&user_error,         ## Default undef
245           user_output => \&user_output,       ## Default undef
246
247              # MCE will forward data to user_error/user_output,
248              # when defined, for the following methods.
249
250              # MCE->sendto(\*STDERR, "sent to user_error\n");
251              # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
252              # MCE->print(\*STDERR, "sent to user_error\n");
253              # MCE->say(\*STDERR, "sent to user_error");
254
255              # MCE->sendto(\*STDOUT, "sent to user_output\n");
256              # MCE->printf("%s\n", "sent to user_output");
257              # MCE->print("sent to user_output\n");
258              # MCE->say("sent to user_output");
259
260           stderr_file => 'err_file',          ## Default STDERR
261           stdout_file => 'out_file',          ## Default STDOUT
262
263              # Or to file; user_error and user_output take precedence.
264
265           flush_file   => 1,                  ## Default 0
266           flush_stderr => 1,                  ## Default 0
267           flush_stdout => 1,                  ## Default 0
268
269              # Flush sendto file, standard error, or standard output.
270
271           interval => {
272               delay => 0.007 [, max_nodes => 4, node_id => 1 ]
273           },
274
275              # For use with the yield method introduced in MCE 1.5.
276              # Both max_nodes & node_id are optional and default to 1.
277              # Delay is the amount of time between intervals.
278
279              # interval => 0.007              ## Shorter; MCE 1.506+
280
281           sequence => {                       ## Default undef
282               begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
283           },
284
285           bounds_only => 1,                   ## Default undef
286
287              # For looping through a sequence of numbers in parallel.
288              # STEP, if omitted, defaults to 1 if BEGIN is smaller than
289              # END or -1 if BEGIN is greater than END. The FORMAT string
290              # is passed to sprintf behind the scene (% may be omitted).
291              # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
292
293              # Do not specify both options; input_data and sequence.
294              # Release 1.4 allows one to specify an array reference.
295              # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
296
297              # The bounds_only => 1 option will compute the 'begin' and
298              # 'end' items only for the chunk and not the items in between
299              # (hence boundaries only). This option has no effect when
300              # sequence is not specified or chunk_size equals 1.
301
302              # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
303
304           task_end => \&task_end,             ## Default undef
305
306              # This is called by the manager process after the task
307              # has completed processing. MCE 1.5 allows this option
308              # to be specified at the top level.
309
310           task_name => 'string',              ## Default 'MCE'
311
312              # Added in MCE 1.5 and mainly beneficial for user_tasks.
313              # One may specify a unique name per each sub-task.
314              # The string is passed as the 3rd arg to task_end.
315
316           user_tasks => [                     ## Default undef
317              { ... },                         ## Options for task 0
318              { ... },                         ## Options for task 1
319              { ... },                         ## Options for task 2
320           ],
321
322              # Takes a list of hash references, each allowing up to 17
323              # options. All other MCE options are ignored. The init_relay,
324              # input_data, RS, and use_slurpio options are applicable to
325              # the first task only.
326
327              # max_workers, chunk_size, input_data, interval, sequence,
328              # bounds_only, user_args, user_begin, user_end, user_func,
329              # gather, task_end, task_name, use_slurpio, use_threads,
330              # init_relay, RS
331
332              # Options not specified here will default to same option
333              # specified at the top level.
334        );
335
336   EXPORT_CONST, CONST
337       There are 3 constants which are exportable. Using the constants in lieu
338       of 0,1,2 makes it more legible when accessing the user_func arguments
339       directly.
340
341       SELF CHUNK CID - MCE CONSTANTS
342
343       Exports SELF => 0, CHUNK => 1, and CID => 2.
344
345        use MCE export_const => 1;
346        use MCE const => 1;                    ## Shorter; MCE 1.415+
347
348        user_func => sub {
349         # my ($mce, $chunk_ref, $chunk_id) = @_;
350           print "Hello from ", $_[SELF]->wid, "\n";
351        }
352
353       MCE 1.5 allows all public methods to be called directly.
354
355        use MCE;
356
357        user_func => sub {
358         # my ($mce, $chunk_ref, $chunk_id) = @_;
359           print "Hello from ", MCE->wid, "\n";
360        }
361
362   OVERRIDING DEFAULTS
363       The following lists the options which may be overridden when loading
364       the module.
365
366        use Sereal qw( encode_sereal decode_sereal );
367        use CBOR::XS qw( encode_cbor decode_cbor );
368        use JSON::XS qw( encode_json decode_json );
369
370        use MCE
371            max_workers => 4,                  ## Default 1
372            chunk_size => 100,                 ## Default 1
373            tmp_dir => "/path/to/app/tmp",     ## $MCE::Signal::tmp_dir
374            freeze => \&encode_sereal,         ## \&Storable::freeze
375            thaw => \&decode_sereal            ## \&Storable::thaw
376        ;
377
378        my $mce = MCE->new( ... );
379
380       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
381       available.  Specify "Sereal => 0" to use Storable instead.
382
383        use MCE Sereal => 0;
384
385   RUNNING
386       Run calls spawn, submits the job; workers call user_begin, user_func,
387       and user_end. Run shuts down workers afterwards. Call spawn early
388       whenever large data structures are needed prior to running.
389
390        $mce->spawn;                           ## Call early if desired
391
392        $mce->run;                             ## Call run or process below
393
394        ## Acquire data arrays and/or input_files. Workers persist after
395        ## processing.
396
397        $mce->process(\@input_data_1);         ## Process array
398        $mce->process(\@input_data_2);
399        $mce->process(\@input_data_n);
400
401        $mce->process(\%input_hash_1);         ## Process hash, current API
402        $mce->process(\%input_hash_2);         ## available since 1.828
403        $mce->process(\%input_hash_n);
404
405        $mce->process('input_file_1');         ## Process file
406        $mce->process('input_file_2');
407        $mce->process('input_file_n');
408
409        $mce->shutdown;                        ## Shutdown workers
410
411   SYNTAX for ON_POST_EXIT
412       Oftentimes, one may want to capture the exit status. The on_post_exit
413       option, if defined, is executed immediately by the manager process
414       after a worker exits via exit (children only), MCE->exit (children and
415       threads), or die.
416
417       The format of $e->{pid} is PID_123 for children and TID_123 for
418       threads.
419
420        my $restart_flag = 1;
421
422        sub on_post_exit {
423           my ($mce, $e) = @_;
424
425           ## Display all possible hash elements.
426           print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
427
428           ## Restart this worker if desired.
429           if ($restart_flag && $e->{wid} == 2) {
430              $mce->restart_worker;
431              $restart_flag = 0;
432           }
433        }
434
435        sub user_func {
436           my ($mce) = @_;
437           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  ## Args, not necessary
438        }
439
440        my $mce = MCE->new(
441           on_post_exit => \&on_post_exit,
442           user_func => \&user_func,
443           max_workers => 3
444        );
445
446        $mce->run;
447
448        -- Output (child processes)
449
450        2: PID_33223: 0: msg_foo: 1002
451        1: PID_33222: 0: msg_foo: 1001
452        3: PID_33224: 0: msg_foo: 1003
453        2: PID_33225: 0: msg_foo: 1002
454
455        -- Output (running with threads)
456
457        3: TID_3: 0: msg_foo: 1003
458        2: TID_2: 0: msg_foo: 1002
459        1: TID_1: 0: msg_foo: 1001
460        2: TID_4: 0: msg_foo: 1002
461
462   SYNTAX for ON_POST_RUN
463       The on_post_run option, if defined, is executed immediately by the
464       manager process after running MCE->process or MCE->run. This option
465       receives an array reference of hashes.
466
467       The difference between on_post_exit and on_post_run is that the former
468       is called immediately whereas the latter is called after all workers
469       have completed running.
470
471        sub on_post_run {
472           my ($mce, $status_ref) = @_;
473           foreach my $e ( @{ $status_ref } ) {
474              ## Display all possible hash elements.
475              print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
476           }
477        }
478
479        sub user_func {
480           my ($mce) = @_;
481           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  ## Args, not necessary
482        }
483
484        my $mce = MCE->new(
485           on_post_run => \&on_post_run,
486           user_func => \&user_func,
487           max_workers => 3
488        );
489
490        $mce->run;
491
492        -- Output (child processes)
493
494        3: PID_33174: 0: msg_foo: 1003
495        1: PID_33172: 0: msg_foo: 1001
496        2: PID_33173: 0: msg_foo: 1002
497
498        -- Output (running with threads)
499
500        2: TID_2: 0: msg_foo: 1002
501        3: TID_3: 0: msg_foo: 1003
502        1: TID_1: 0: msg_foo: 1001
503
504   SYNTAX for INPUT_DATA
505       MCE supports many ways to specify input_data. Support for iterators was
506       added in MCE 1.505. The RS option allows one to specify the record
507       separator when processing files.
508
509       MCE is a chunking engine. Therefore, chunk_size is applicable to
510       input_data.  Specifying 1 for use_slurpio causes user_func to receive a
511       scalar reference containing the raw data (applicable to files only)
512       instead of an array reference.
513
514       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
515
516        input_data  => '/path/to/file',  ## process file
517        input_data  => \@array,          ## process array
518        input_data  => \%hash,           ## process hash, API since 1.828
519        input_data  => \*FILE_HNDL,      ## process file handle
520        input_data  => $fh,              ## open $fh, "<", "file"
521        input_data  => $fh,              ## IO::File "file", "r"
522        input_data  => $fh,              ## IO::Uncompress::Gunzip "file.gz"
523        input_data  => $io,              ## IO::All { File, Pipe, STDIO }
524        input_data  => \$scalar,         ## treated like a file
525        input_data  => \&iterator,       ## user specified iterator
526
527        chunk_size  => 1,                ## >1 means looping inside user_func
528        use_slurpio => 1,                ## $chunk_ref is a scalar ref
529        RS          => "\n>",            ## input record separator
530
531       The chunk_size value determines the chunking mode to use when
532       processing files.  Otherwise, chunk_size is the number of elements for
533       arrays. For files, a chunk size value of <= 8192 is how many records to
534       read. Greater than 8192 is how many bytes to read. MCE appends (the
535       rest) up to the next record separator.
536
537        chunk_size  => 8192,             ## Consists of 8192 records
538        chunk_size  => 8193,             ## Approximate 8193 bytes for files
539
540        chunk_size  => 1,                ## Consists of 1 record or element
541        chunk_size  => 1000,             ## Consists of 1000 records
542        chunk_size  => '16k',            ## Approximate 16 kibiBytes (KiB)
543        chunk_size  => '20m',            ## Approximate 20 mebiBytes (MiB)
544
545       The construction for user_func when chunk_size > 1 and assuming
546       use_slurpio equals 0.
547
548        user_func => sub {
549           my ($mce, $chunk_ref, $chunk_id) = @_;
550
551           ## $_ is $chunk_ref->[0] when chunk_size equals 1
552           ## $_ is $chunk_ref otherwise; $_ can be used below
553
554           for my $record ( @{ $chunk_ref } ) {
555              print "$chunk_id: $record\n";
556           }
557        }
558
559        # input_data => \%hash
560        # current API available since 1.828
561
562        user_func => sub {
563           my ($mce, $chunk_ref, $chunk_id) = @_;
564
565           ## $_ points to $chunk_ref regardless of chunk_size
566
567           for my $key ( keys %{ $chunk_ref } ) {
568              print "$key: ", $chunk_ref->{$key}, "\n";
569           }
570        }
571
572       Specifying a value for input_data is straightforward for arrays and
573       files.  The next several examples specify an iterator reference for
574       input_data.
575
576        use MCE;
577
578        ## A factory function which creates a closure (the iterator itself)
579        ## for generating a sequence of numbers. The external variables
580        ## ($n, $max, $step) are used for keeping state across successive
581        ## calls to the closure. The iterator simply returns when $n > $max.
582
583        sub input_iterator {
584           my ($n, $max, $step) = @_;
585
586           return sub {
587              return if $n > $max;
588
589              my $current = $n;
590              $n += $step;
591
592              return $current;
593           };
594        }
595
596        ## Run user_func in parallel. Input data can be specified during
597        ## the construction or as an argument to the process method.
598
599        my $mce = MCE->new(
600
601         # input_data => input_iterator(10, 30, 2),
602           chunk_size => 1, max_workers => 4,
603
604           user_func => sub {
605              my ($mce, $chunk_ref, $chunk_id) = @_;
606              MCE->print("$_: ", $_ * 2, "\n");
607           }
608
609        )->spawn;
610
611        $mce->process( input_iterator(10, 30, 2) );
612
613        -- Output   Note that output order is not guaranteed
614                    Take a look at iterator.pl for ordered output
615
616        10: 20
617        12: 24
618        16: 32
619        20: 40
620        14: 28
621        22: 44
622        18: 36
623        24: 48
624        26: 52
625        28: 56
626        30: 60
627
628       The following example queries the DB for the next 1000 rows. Notice the
629       use of fetchall_arrayref. The iterator function itself receives one
630       argument which is chunk_size (added in MCE 1.510) to determine how much
631       to return per iteration.  The default is 1 for the Core API and MCE
632       Models.
633
634        use DBI;
635        use MCE;
636
637        sub db_iter {
638
639           my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
640
641           my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
642                     die "Could not connect to database: $DBI::errstr";
643
644           my $sth = $dbh->prepare('select color, desc from table');
645
646           $sth->execute;
647
648           return sub {
649              my ($chunk_size) = @_;
650
651              if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
652                 return @{ $aref };
653              }
654
655              return;
656           };
657        }
658
659        ## Let's enumerate column indexes for easy column retrieval.
660        my ($i_color, $i_desc) = (0 .. 1);
661
662        my $mce = MCE->new(
663           max_workers => 3, chunk_size => 1000,
664           input_data => db_iter(),
665
666           user_func => sub {
667              my ($mce, $chunk_ref, $chunk_id) = @_;
668              my $ret = '';
669
670              foreach my $row (@{ $chunk_ref }) {
671                 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
672              }
673
674              MCE->print($ret);
675           }
676        );
677
678        $mce->run;
679
680       There are many modules on CPAN which return an iterator reference.
681       One such example is shown below. The demonstration ensures MCE workers
682       are spawned before obtaining the iterator. Note the worker_id value
683       (left column) in the output.
684
685        use Path::Iterator::Rule;
686        use MCE;
687
688        my $start_dir = shift
689           or die "Please specify a starting directory";
690
691        -d $start_dir
692           or die "Cannot open ($start_dir): No such file or directory";
693
694        my $mce = MCE->new(
695           max_workers => 'auto',
696           user_func => sub { MCE->say( MCE->wid . ": $_" ) }
697        )->spawn;
698
699        my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
700
701        my $iterator = $rule->iter(
702           $start_dir, { follow_symlinks => 0, depthfirst => 1 }
703        );
704
705        $mce->process( $iterator );
706
707        -- Output
708
709        8: lib/MCE/Core/Input/Generator.pm
710        5: lib/MCE/Core/Input/Handle.pm
711        6: lib/MCE/Core/Input/Iterator.pm
712        2: lib/MCE/Core/Input/Request.pm
713        3: lib/MCE/Core/Manager.pm
714        4: lib/MCE/Core/Input/Sequence.pm
715        7: lib/MCE/Core/Validation.pm
716        1: lib/MCE/Core/Worker.pm
717        8: lib/MCE/Flow.pm
718        5: lib/MCE/Grep.pm
719        6: lib/MCE/Loop.pm
720        2: lib/MCE/Map.pm
721        3: lib/MCE/Queue.pm
722        4: lib/MCE/Signal.pm
723        7: lib/MCE/Stream.pm
724        1: lib/MCE/Subs.pm
725        8: lib/MCE/Util.pm
726        5: lib/MCE.pm
727
728       Although MCE supports arrays, extra measures are needed to use a "lazy"
729       array as input data. The reason for this is that MCE needs the size of
730       the array before processing which may be unknown for lazy arrays.
731       Therefore, closures provide an excellent mechanism for this.
732
733       The code block belonging to the lazy array must return undef after
734       exhausting its input data. Otherwise, the process will never end.
735
736        use Tie::Array::Lazy;
737        use MCE;
738
739        tie my @a, 'Tie::Array::Lazy', [], sub {
740           my $i = $_[0]->index;
741
742           return ($i < 10) ? $i : undef;
743        };
744
745        sub make_iterator {
746           my $i = 0; my $a_ref = shift;
747
748           return sub {
749              return $a_ref->[$i++];
750           };
751        }
752
753        my $mce = MCE->new(
754           max_workers => 4, input_data => make_iterator(\@a),
755
756           user_func => sub {
757              my ($mce, $chunk_ref, $chunk_id) = @_;
758              MCE->say($_);
759           }
760
761        )->run;
762
763        -- Output
764
765        0
766        1
767        2
768        3
769        4
770        6
771        7
772        8
773        5
774        9
775
776       The following demonstrates how to retrieve a chunk from the lazy array
777       per each successive call. Here, undef is sent by the iterator block
778       when $i is greater than $max. Iterators may optionally use chunk_size
779       to determine how much to return per iteration.
780
781        use Tie::Array::Lazy;
782        use MCE;
783
784        tie my @a, 'Tie::Array::Lazy', [], sub {
785           $_[0]->index;
786        };
787
788        sub make_iterator {
789           my $j = 0; my ($a_ref, $max) = @_;
790
791           return sub {
792              my ($chunk_size) = @_;
793              my $i = $j;  $j += $chunk_size;
794
795              return if $i > $max;
796              return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
797           };
798        }
799
800        my $mce = MCE->new(
801           chunk_size => 15, max_workers => 4,
802           input_data => make_iterator(\@a, 100),
803
804           user_func => sub {
805              my ($mce, $chunk_ref, $chunk_id) = @_;
806              MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
807           }
808
809        )->run;
810
811        -- Output
812
813        1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
814        2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
815        3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
816        4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
817        5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
818        6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
819        7: 90 91 92 93 94 95 96 97 98 99 100
820
821   SYNTAX for SEQUENCE
822       The 1.3 release and above allows workers to loop through a sequence of
823       numbers computed mathematically without the overhead of an array. The
824       sequence can be specified separately per each user_task entry unlike
825       input_data which is applicable to the first task only.
826
827       See the seq_demo.pl example, included with this distribution, on
828       applying sequences with the user_tasks option.
829
830       Sequence can be defined using an array or a hash reference.
831
832        use MCE;
833
834        my $mce = MCE->new(
835           max_workers => 3,
836
837         # sequence => [ 10, 19, 0.7, "%4.1f" ],  ## up to 4 options
838
839           sequence => {
840              begin => 10, end => 19, step => 0.7, format => "%4.1f"
841           },
842
843           user_func => sub {
844              my ($mce, $n, $chunk_id) = @_;
845              print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
846           }
847        );
848
849        $mce->run;
850
851        -- Output (sorted afterwards, notice wid and chunk_id in output)
852
853        10.0 from 1 id 1
854        10.7 from 2 id 2
855        11.4 from 3 id 3
856        12.1 from 1 id 4
857        12.8 from 2 id 5
858        13.5 from 3 id 6
859        14.2 from 1 id 7
860        14.9 from 2 id 8
861        15.6 from 3 id 9
862        16.3 from 1 id 10
863        17.0 from 2 id 11
864        17.7 from 3 id 12
865        18.4 from 1 id 13
866
867       The 1.5 release includes a new option (bounds_only). This option tells
868       the sequence engine to compute 'begin' and 'end' items only, for the
869       chunk, and not the items in between (hence boundaries only). This
870       option applies to sequence only and has no effect when chunk_size
871       equals 1.
872
873       The time to run is 0.006s below. This becomes 0.827s without the
874       bounds_only option due to computing all items in between, thus creating
875       a very large array. Basically, specify bounds_only => 1 when boundaries
876       are all you need for looping inside the block; e.g. Monte Carlo
877       simulations.
878
879       Time was measured using 1 worker to emphasize the difference.
880
881        use MCE;
882
883        my $mce = MCE->new(
884           max_workers => 1, chunk_size => 1_250_000,
885
886           sequence => { begin => 1, end => 10_000_000 },
887           bounds_only => 1,
888
889           ## For sequence, the input scalar $_ points to $chunk_ref
890           ## when chunk_size > 1, otherwise $chunk_ref->[0].
891           ##
892           ## user_func => sub {
893           ##    my $begin = $_->[0]; my $end = $_->[-1];
894           ##
895           ##    for ($begin .. $end) {
896           ##       ...
897           ##    }
898           ## },
899
900           user_func => sub {
901              my ($mce, $chunk_ref, $chunk_id) = @_;
902              ## $chunk_ref contains 2 items, not 1_250_000
903
904              my $begin = $chunk_ref->[ 0];
905              my $end   = $chunk_ref->[-1];   ## or $chunk_ref->[1]
906
907              MCE->printf("%7d .. %8d\n", $begin, $end);
908           }
909        );
910
911        $mce->run;
912
913        -- Output
914
915              1 ..  1250000
916        1250001 ..  2500000
917        2500001 ..  3750000
918        3750001 ..  5000000
919        5000001 ..  6250000
920        6250001 ..  7500000
921        7500001 ..  8750000
922        8750001 .. 10000000
923
924   SYNTAX for MAX_RETRIES
925       The max_retries option, added in 1.7, allows MCE to retry a failed
926       chunk from a worker dying while processing input data or a sequence of
927       numbers.
928
929       When max_retries is set, MCE configures the on_post_exit option
930       automatically using the following code before running. Specify
931       on_post_exit explicitly for any further tailoring. The restart_worker
932       line is necessary, obviously.
933
934        on_post_exit => sub {
935           my ( $mce, $e, $retry_cnt ) = @_;
936
937           if ( $e->{id} ) {
938              my $cnt = $retry_cnt + 1;
939              my $msg = "Error: chunk $e->{id} failed";
940
941              if ( defined $mce->{init_relay} ) {
942                 print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
943                    if ( $retry_cnt < $mce->{max_retries} );
944              }
945              else {
946                 ( $retry_cnt < $mce->{max_retries} )
947                    ? print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
948                    : print {*STDERR} "$msg\n";
949              }
950
951              $mce->restart_worker;
952           }
953        }
954
955       We let MCE handle on_post_exit automatically below, which is
956       essentially the same code shown above. For max_retries to work, the
957       worker must die, abnormally included, or call MCE->exit. Notice that we
958       pass the chunk_id value for the 3rd argument to MCE->exit (defaults to
959       chunk_id if omitted since MCE 1.844).
960
961        ## max_retries demonstration
962
963        use strict;
964        use warnings;
965
966        use MCE;
967
968        sub user_func {
969           my ( $mce, $chunk_ref, $chunk_id ) = @_;
970
971           # die "Died : chunk_id = 3\n"  if $chunk_id == 3;
972           MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
973
974           print "$chunk_id\n";
975        }
976
977        my $mce = MCE->new(
978           max_workers => 1,
979           max_retries => 2,
980           user_func   => \&user_func,
981        )->spawn;
982
983        my $input_data = [ 0..7 ];
984
985        $mce->process( { chunk_size => 1 }, $input_data );
986        $mce->shutdown;
987
988        -- Output
989
990        1
991        2
992        Error: chunk 3 failed, retrying chunk attempt # 1
993        Error: chunk 3 failed, retrying chunk attempt # 2
994        Error: chunk 3 failed
995        4
996        5
997        6
998        7
999        8
1000
1001       Orderly output with max_retries is possible since MCE 1.844. Below,
1002       chunk 3 succeeds whereas chunk 5 fails due to exceeding the number of
1003       retries. Be sure to call MCE::relay inside "user_func" and near the end
1004       of the block.
1005
1006        ## max_retries demonstration with init_relay
1007
1008        use strict;
1009        use warnings;
1010
1011        use MCE;
1012        use MCE::Shared;
1013
1014        tie my $retries1, 'MCE::Shared', 0;
1015        tie my $retries2, 'MCE::Shared', 0;
1016
1017        MCE->new(
1018           max_workers  => 4,
1019           input_data   => [ 1..7 ],
1020           chunk_size   => 1,
1021
1022           max_retries  => 2,
1023           init_relay   => 0,
1024
1025           user_func    => sub {
1026              if ( MCE->chunk_id == 3 ) {
1027                 MCE->exit if ++$retries1 <= 2;
1028              }
1029              if ( MCE->chunk_id == 5 ) {
1030                 MCE->exit if ++$retries2 <= 3;
1031              }
1032              MCE::relay {
1033                 $_ += 1;
1034                 print MCE->chunk_id, "\n";
1035              };
1036           }
1037        )->run;
1038
1039        print "final: ", MCE::relay_final(), "\n";
1040
1041        -- Output
1042
1043        1
1044        2
1045        Error: chunk 3 failed, retrying chunk attempt # 1
1046        Error: chunk 5 failed, retrying chunk attempt # 1
1047        Error: chunk 3 failed, retrying chunk attempt # 2
1048        Error: chunk 5 failed, retrying chunk attempt # 2
1049        3
1050        4
1051        Error: chunk 5 failed
1052        6
1053        7
1054        final: 6
1055
1056   SYNTAX for USER_BEGIN and USER_END
1057       The user_begin and user_end options, if specified, behave similarly to
1058       awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
1059       called once per worker during each run.
1060
1061       MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
1062
1063        sub user_begin {                   ## Called once at the beginning
1064           my ($mce, $task_id, $task_name) = @_;
1065           $mce->{wk_total_rows} = 0;
1066        }
1067
1068        sub user_func {                    ## Called while processing
1069           my $mce = shift;
1070           $mce->{wk_total_rows} += 1;
1071        }
1072
1073        sub user_end {                     ## Called once at the end
1074           my ($mce, $task_id, $task_name) = @_;
1075           printf "## %d: Processed %d rows\n",
1076              MCE->wid, $mce->{wk_total_rows};
1077        }
1078
1079        my $mce = MCE->new(
1080           user_begin => \&user_begin,
1081           user_func  => \&user_func,
1082           user_end   => \&user_end
1083        );
1084
1085        $mce->run;
1086
1087   SYNTAX for USER_FUNC with USE_SLURPIO => 0
1088       When processing input data, MCE can pass an array of rows or a slurped
1089       chunk.  Below, a reference to an array containing the chunk data is
1090       processed.
1091
1092       e.g. $chunk_ref = [ record1, record2, record3, ... ]
1093
1094        sub user_func {
1095
1096           my ($mce, $chunk_ref, $chunk_id) = @_;
1097
1098           foreach my $row ( @{ $chunk_ref } ) {
1099              $mce->{wk_total_rows} += 1;
1100              print $row;
1101           }
1102        }
1103
1104        my $mce = MCE->new(
1105           chunk_size  => 100,
1106           input_data  => "/path/to/file",
1107           user_func   => \&user_func,
1108           use_slurpio => 0
1109        );
1110
1111        $mce->run;
1112
1113   SYNTAX for USER_FUNC with USE_SLURPIO => 1
1114       Here, a reference to a scalar containing the raw chunk data is
1115       processed.
1116
1117        sub user_func {
1118
1119           my ($mce, $chunk_ref, $chunk_id) = @_;
1120
1121           my $count = () = $$chunk_ref =~ /abc/;
1122        }
1123
1124        my $mce = MCE->new(
1125           chunk_size  => 16000,
1126           input_data  => "/path/to/file",
1127           user_func   => \&user_func,
1128           use_slurpio => 1
1129        );
1130
1131        $mce->run;
1132
1133   SYNTAX for USER_ERROR and USER_OUTPUT
1134       Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1135       and MCE->say can be intercepted by specifying the user_error and
1136       user_output options. MCE on receiving output will forward to user_error
1137       or user_output in a serialized fashion.
1138
1139       Handy when wanting to filter, modify, and/or direct the output
1140       elsewhere.
1141
1142        sub user_error {                   ## Redirect STDERR to STDOUT
1143           my $error = shift;
1144           print {*STDOUT} $error;
1145        }
1146
1147        sub user_output {                  ## Redirect STDOUT to STDERR
1148           my $output = shift;
1149           print {*STDERR} $output;
1150        }
1151
1152        sub user_func {
1153           my ($mce, $chunk_ref, $chunk_id) = @_;
1154           my $count = 0;
1155
1156           foreach my $row ( @{ $chunk_ref } ) {
1157              MCE->print($row);
1158              $count += 1;
1159           }
1160
1161           MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1162        }
1163
1164        my $mce = MCE->new(
1165           chunk_size  => 1000,
1166           input_data  => "/path/to/file",
1167           user_error  => \&user_error,
1168           user_output => \&user_output,
1169           user_func   => \&user_func
1170        );
1171
1172        $mce->run;
1173
1174   SYNTAX for USER_TASKS and TASK_END
1175       This option takes an array of tasks. Each task allows up to 17 options.
1176       The init_relay, input_data, RS, and use_slurpio options may be defined
1177       inside the first task or at the top level, otherwise ignored under
1178       other sub-tasks.
1179
1180        max_workers, chunk_size, input_data, interval, sequence,
1181        bounds_only, user_args, user_begin, user_end, user_func,
1182        gather, task_end, task_name, use_slurpio, use_threads,
1183        init_relay, RS
1184
1185       Sequence and chunk_size were added in 1.3. User_args was introduced in
1186       1.4. Task_name and input_data are new options allowed in 1.5. In
1187       addition, one can specify task_end at the top level. Task_end also
1188       receives 2 additional arguments $task_id and $task_name (shown below).
1189
1190       Options not specified here will default to the same option specified at
1191       the top level. The task_end option is called by the manager process
1192       when all workers for that sub-task have completed processing.
1193
1194       Forking and threading can be intermixed among tasks unless running
1195       Cygwin.  The run method will continue running until all workers have
1196       completed processing.
1197
1198        use threads;
1199        use threads::shared;
1200
1201        use MCE;
1202
1203        sub parallel_task1 { sleep 2; }
1204        sub parallel_task2 { sleep 1; }
1205
1206        my $mce = MCE->new(
1207
1208           task_end => sub {
1209              my ($mce, $task_id, $task_name) = @_;
1210              print "Task [$task_id -- $task_name] completed processing\n";
1211           },
1212
1213           user_tasks => [{
1214              task_name   => 'foo',
1215              max_workers => 2,
1216              user_func   => \&parallel_task1,
1217              use_threads => 0             ## Not using threads
1218
1219           },{
1220              task_name   => 'bar',
1221              max_workers => 4,
1222              user_func   => \&parallel_task2,
1223              use_threads => 1             ## Yes, threads
1224
1225           }]
1226        );
1227
1228        $mce->run;
1229
1230        -- Output
1231
1232        Task [1 -- bar] completed processing
1233        Task [0 -- foo] completed processing
1234

DEFAULT INPUT SCALAR

1236       Beginning with MCE 1.5, the input scalar $_ is localized prior to
1237       calling user_func for input_data and sequence of numbers. The following
1238       applies.
1239
1240       use_slurpio => 1
1241           $_ is a reference to the buffer e.g. $_ = \$_buffer;
1242           $_ is a reference regardless of whether chunk_size is 1 or greater
1243
1244           user_func => sub {
1245            # my ($mce, $chunk_ref, $chunk_id) = @_;
1246              print ${ $_ };    ## $_ is same as $chunk_ref
1247           }
1248
1249       chunk_size is greater than 1, use_slurpio => 0
1250           $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1251           $_ is same as $chunk_ref or $_[CHUNK]
1252
1253           user_func => sub {
1254            # my ($mce, $chunk_ref, $chunk_id) = @_;
1255              for my $row ( @{ $_ } ) {
1256                 print $row, "\n";
1257              }
1258           }
1259
1260           use MCE const => 1;
1261
1262           user_func => sub {
1263            # my ($mce, $chunk_ref, $chunk_id) = @_;
1264              for my $row ( @{ $_[CHUNK] } ) {
1265                 print $row, "\n";
1266              }
1267           }
1268
1269       chunk_size equals 1, use_slurpio => 0
1270           $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1271
1272           ## Note that $_ and $chunk_ref are not the same below.
1273           ## $chunk_ref is a reference to an array.
1274
1275           user_func => sub {
1276            # my ($mce, $chunk_ref, $chunk_id) = @_;
1277              print $_, "\n";   ## Same as $chunk_ref->[0];
1278           }
1279
1280           $mce->foreach("/path/to/file", sub {
1281            # my ($mce, $chunk_ref, $chunk_id) = @_;
1282              print $_;         ## Same as $chunk_ref->[0];
1283           });
1284
1285           ## However, that is not the case for the forseq method.
1286           ## Both $_ and $n_seq are the same when chunk_size => 1.
1287
1288           $mce->forseq([ 1, 9 ], sub {
1289            # my ($mce, $n_seq, $chunk_id) = @_;
1290              print $_, "\n";   ## Same as $n_seq
1291           });
1292
1293          Sequence can also be specified using an array reference. The below
1294          is the same as the example afterwards.
1295
1296           $mce->forseq( { begin => 10, end => 40000, step => 2 }, ... );
1297
1298          The code block receives an array containing the next 5 sequences.
1299          Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1300          to an array, same as $_, due to chunk_size being greater than 1.
1301
1302           $mce->forseq( [ 10, 40000, 2 ], { chunk_size => 5 }, sub {
1303            # my ($mce, $n_seq, $chunk_id) = @_;
1304              my @result;
1305              for my $n ( @{ $_ } ) {
1306                 ... do work, append to result for 5
1307              }
1308              ... do something with result afterwards
1309           });
1310

METHODS for the MANAGER PROCESS and WORKERS

1312       The methods listed below are callable by the main process and workers.
1313
1314   MCE->abort ( void )
1315   $mce->abort ( void )
1316       The 'abort' method is applicable when processing input_data only. This
1317       causes all workers to abort after processing the current chunk.
1318
1319       Workers write the next offset position to the queue socket for the next
1320       available worker. In essence, the 'abort' method writes the last offset
1321       position. Workers, on requesting the next offset position, will think
1322       the end of input_data has been reached and leave the chunking loop.
1323
1324        MCE->abort;
1325        $mce->abort;
1326
1327   MCE->chunk_id ( void )
1328   $mce->chunk_id ( void )
1329       Returns the chunk_id for the current chunk. The value starts at 1.
1330       Chunking applies to input_data or sequence. The value is 0 for the
1331       manager process.
1332
1333        my $chunk_id = MCE->chunk_id;
1334        my $chunk_id = $mce->chunk_id;
1335
1336   MCE->chunk_size ( void )
1337   $mce->chunk_size ( void )
1338       Getter method for chunk_size used by MCE.
1339
1340        my $chunk_size = MCE->chunk_size;
1341        my $chunk_size = $mce->chunk_size;
1342
1343   MCE->do ( 'callback_func' [, $arg1, ... ] )
1344   $mce->do ( 'callback_func' [, $arg1, ... ] )
1345       MCE serializes data transfers from a worker process via helper
1346       functions do & sendto to the manager process. The callback function can
1347       optionally return a reply. Support for calling by the manager process
1348       was enabled in MCE 1.839.
1349
1350        [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1351
1352       Passing args to a callback function using references & scalar.
1353
1354        sub callback {
1355           my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1356           ...
1357        }
1358
1359        MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1360        MCE->do('callback', \@a, \%h, \$s, 'foo');
1361
1362       MCE knows whether a void, list, hash, or scalar return value is wanted.
1363
1364        MCE->do('callback' [, $arg1, ... ]);
1365
1366        my @array  = MCE->do('callback' [, $arg1, ... ]);
1367        my %hash   = MCE->do('callback' [, $arg1, ... ]);
1368        my $scalar = MCE->do('callback' [, $arg1, ... ]);
1369
1370   MCE->freeze ( $object_ref )
1371   $mce->freeze ( $object_ref )
1372       Calls the internal freeze method to serialize an object. The default
1373       serialization routines are handled by Sereal if available or Storable.
1374
1375        my $frozen = MCE->freeze([ 0, 2, 4 ]);
1376        my $frozen = $mce->freeze([ 0, 2, 4 ]);
1377
1378   MCE->max_retries ( void )
1379   $mce->max_retries ( void )
1380       Getter method for max_retries used by MCE.
1381
1382        my $max_retries = MCE->max_retries;
1383        my $max_retries = $mce->max_retries;
1384
1385   MCE->max_workers ( void )
1386   $mce->max_workers ( void )
1387       Getter method for max_workers used by MCE.
1388
1389        my $max_workers = MCE->max_workers;
1390        my $max_workers = $mce->max_workers;
1391
1392   MCE->pid ( void )
1393   $mce->pid ( void )
1394       Returns the Process ID. Threads have thread ID attached to the value.
1395
1396        my $pid = MCE->pid;    ## 16180 (pid) ; 16180.2 (pid.tid)
1397        my $pid = $mce->pid;
1398
1399   MCE->printf ( $format, $list [, ... ] )
1400   MCE->print ( $list [, ... ] )
1401   MCE->say ( $list [, ... ] )
1402   $mce->printf ( $format, $list [, ... ] )
1403   $mce->print ( $list [, ... ] )
1404   $mce->say ( $list [, ... ] )
1405       Use the printf, print, and say methods when wanting to serialize output
1406       among workers and the manager process. These are syntactic sugar for
1407       the sendto method. They behave like the native subroutines in Perl,
1408       except that a bareword handle must be passed as a reference and any
1409       handle argument, file handles included, must be followed by a comma.
1410
1411       Say is like print, but implicitly appends a newline.
1412
1413        MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1414        MCE->printf($fh, "%s: %d\n", $name, $age);
1415        MCE->printf("%s: %d\n", $name, $age);
1416
1417        MCE->print(\*STDERR, "$error_msg\n");
1418        MCE->print($fh, $log_msg."\n");
1419        MCE->print("$output_msg\n");
1420
1421        MCE->say(\*STDERR, $error_msg);
1422        MCE->say($fh, $log_msg);
1423        MCE->say($output_msg);
1424
1425       Caveat: Use the following syntax when the first item to print is a
1426       reference that is not a glob or file handle. Otherwise, MCE will raise
1427       an error indicating the first argument is not a glob reference.
1428
1429        MCE->print(\*STDOUT, \@array, "\n");
1430        MCE->print("", \@array, "\n");         ## ok
1431
1432       Sending to "IO::All" { File, Pipe, STDIO } is supported since MCE
1433       1.845.
1434
1435        use IO::All;
1436
1437        my $out = io->stdout;
1438        my $err = io->stderr;
1439
1440        MCE->printf($out, "%s\n", "sent to stdout");
1441        MCE->printf($err, "%s\n", "sent to stderr");
1442
1443        MCE->print($out, "sent to stdout\n");
1444        MCE->print($err, "sent to stderr\n");
1445
1446        MCE->say($out, "sent to stdout");
1447        MCE->say($err, "sent to stderr");
1448
1449   MCE->sess_dir ( void )
1450   $mce->sess_dir ( void )
1451       Returns the session directory used by the MCE instance. This is defined
1452       during spawning and removed during shutdown.
1453
1454        my $sess_dir = MCE->sess_dir;
1455        my $sess_dir = $mce->sess_dir;
1456
1457   MCE->task_id ( void )
1458   $mce->task_id ( void )
1459       Returns the task ID. This applies to the user_tasks option (starts at
1460       0).
1461
1462        my $task_id = MCE->task_id;
1463        my $task_id = $mce->task_id;
1464
1465   MCE->task_name ( void )
1466   $mce->task_name ( void )
1467       Returns the task_name value specified via the task_name option when
1468       configuring MCE.
1469
1470        my $task_name = MCE->task_name;
1471        my $task_name = $mce->task_name;
1472
1473   MCE->task_wid ( void )
1474   $mce->task_wid ( void )
1475       Returns the task worker ID (applies to user_tasks). The value starts at
1476       1 per each task configured within user_tasks. The value is 0 for the
1477       manager process.
1478
1479        my $task_wid = MCE->task_wid;
1480        my $task_wid = $mce->task_wid;
1481
1482   MCE->thaw ( $frozen )
1483   $mce->thaw ( $frozen )
1484       Calls the internal thaw method to un-serialize the frozen object.
1485
1486        my $object_ref = MCE->thaw($frozen);
1487        my $object_ref = $mce->thaw($frozen);
1488
1489   MCE->tmp_dir ( void )
1490   $mce->tmp_dir ( void )
1491       Returns the temporary directory used by MCE.
1492
1493        my $tmp_dir = MCE->tmp_dir;
1494        my $tmp_dir = $mce->tmp_dir;
1495
1496   MCE->user_args ( void )
1497   $mce->user_args ( void )
1498       Returns the arguments specified via the user_args option.
1499
1500        my ($arg1, $arg2, $arg3) = MCE->user_args;
1501        my ($arg1, $arg2, $arg3) = $mce->user_args;
1502
1503   MCE->wid ( void )
1504   $mce->wid ( void )
1505       Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1506       is 0 for the manager process.
1507
1508        my $wid = MCE->wid;
1509        my $wid = $mce->wid;
1510

METHODS for the MANAGER PROCESS only

1512       Methods listed below are callable by the main process only.
1513
1514   MCE->forchunk ( $input_data [, { options } ], sub { ... } )
1515   MCE->foreach ( $input_data [, { options } ], sub { ... } )
1516   MCE->forseq ( $sequence_spec [, { options } ], sub { ... } )
1517   $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1518   $mce->foreach ( $input_data [, { options } ], sub { ... } )
1519   $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1520       Forchunk, foreach, and forseq are sugar methods and described in
1521       MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1522
1523   MCE->process ( $input_data [, { options } ] )
1524   $mce->process ( $input_data [, { options } ] )
1525       The process method will spawn workers automatically if not already
1526       spawned.  It will set input_data => $input_data. It calls run(0) to not
1527       auto-shutdown workers. Specifying options is optional.
1528
1529       Allowable options { key => value, ... } are:
1530
1531        chunk_size input_data job_delay spawn_delay submit_delay
1532        flush_file flush_stderr flush_stdout stderr_file stdout_file
1533        on_post_exit on_post_run sequence user_args user_begin user_end
1534        user_func user_error user_output use_slurpio RS
1535
1536       Options remain persistent going forward unless changed. Setting
1537       user_begin, user_end, or user_func will cause already spawned workers
1538       to shut down and re-spawn automatically. Therefore, define these during
1539       instantiation.
1540
1541       The below will cause workers to re-spawn after running.
1542
1543        my $mce = MCE->new( max_workers => 'auto' );
1544
1545        $mce->process( {
1546           user_begin => sub { ## connect to DB },
1547           user_func  => sub { ## process each row },
1548           user_end   => sub { ## close handle to DB },
1549        }, \@input_data );
1550
1551        $mce->process( {
1552           user_begin => sub { ## connect to DB },
1553           user_func  => sub { ## process each file },
1554           user_end   => sub { ## close handle to DB },
1555        }, "/list/of/files" );
1556
1557       Do the following if wanting workers to persist between jobs.
1558
1559        use MCE max_workers => 'auto';
1560
1561        my $mce = MCE->new(
1562           user_begin => sub { ## connect to DB },
1563           user_func  => sub { ## process each chunk or row or host },
1564           user_end   => sub { ## close handle to DB },
1565        );
1566
1567        $mce->spawn;           ## Spawn early if desired
1568
1569        $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1570        $mce->process(\@array_of_files_to_grep);
1571        $mce->process("/path/to/host/list");
1572
1573        $mce->process($array_ref);
1574        $mce->process($array_ref, { stdout_file => $output_file });
1575
1576        ## This was not allowed before. Fixed in 1.415.
1577        $mce->process({ sequence => { begin => 10, end => 90, step => 2 } });
1578        $mce->process({ sequence => [ 10, 90, 2 ] });
1579
1580        $mce->shutdown;
1581
1582   MCE->relay_final ( void )
1583   $mce->relay_final ( void )
1584       The relay methods are described in MCE::Relay. Relay capabilities are
1585       enabled by specifying the "init_relay" MCE option.
1586
1587   MCE->restart_worker ( void )
1588   $mce->restart_worker ( void )
1589       One can restart a worker who has died or exited. The job never ends
1590       below due to restarting each time. Recommended is to call MCE->exit or
1591       $mce->exit instead of the native exit function for better handling,
1592       especially under the Windows environment.
1593
1594       The $e->{wid} argument is no longer necessary starting with the 1.5
1595       release.
1596
1597       Press [ctrl-c] to terminate the script.
1598
1599        my $mce = MCE->new(
1600
1601           on_post_exit => sub {
1602              my ($mce, $e) = @_;
1603              print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1604            # $mce->restart_worker($e->{wid});    ## MCE-1.415 and below
1605              $mce->restart_worker;               ## MCE-1.500 and above
1606           },
1607
1608           user_begin => sub {
1609              my ($mce, $task_id, $task_name) = @_;
1610              ## Not interested in die messages going to STDERR,
1611              ## because the die handler calls MCE->exit(255, $_[0]).
1612              close STDERR;
1613           },
1614
1615           user_tasks => [{
1616              max_workers => 5,
1617              user_func => sub {
1618                 my ($mce) = @_; sleep MCE->wid;
1619                 MCE->exit(3, "exited from " . MCE->wid . "\n");
1620              }
1621           },{
1622              max_workers => 4,
1623              user_func => sub {
1624                 my ($mce) = @_; sleep MCE->wid;
1625                 die("died from " . MCE->wid . "\n");
1626              }
1627           }]
1628        );
1629
1630        $mce->run;
1631
1632        -- Output
1633
1634        1: PID_85388: status 3: exited from 1
1635        2: PID_85389: status 3: exited from 2
1636        1: PID_85397: status 3: exited from 1
1637        3: PID_85390: status 3: exited from 3
1638        1: PID_85399: status 3: exited from 1
1639        4: PID_85391: status 3: exited from 4
1640        2: PID_85398: status 3: exited from 2
1641        1: PID_85401: status 3: exited from 1
1642        5: PID_85392: status 3: exited from 5
1643        1: PID_85404: status 3: exited from 1
1644        6: PID_85393: status 255: died from 6
1645        3: PID_85400: status 3: exited from 3
1646        2: PID_85403: status 3: exited from 2
1647        1: PID_85406: status 3: exited from 1
1648        7: PID_85394: status 255: died from 7
1649        1: PID_85410: status 3: exited from 1
1650        8: PID_85395: status 255: died from 8
1651        4: PID_85402: status 3: exited from 4
1652        2: PID_85409: status 3: exited from 2
1653        1: PID_85412: status 3: exited from 1
1654        9: PID_85396: status 255: died from 9
1655        3: PID_85408: status 3: exited from 3
1656        1: PID_85416: status 3: exited from 1
1657
1658        ...
1659
1660   MCE->run ( [ $auto_shutdown [, { options } ] ] )
1661   $mce->run ( [ $auto_shutdown [, { options } ] ] )
1662       The run method, by default, spawns workers, processes once, and shuts
1663       down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1664       persist after running (default 1).
1665
1666       Specifying options is optional. Valid options are the same as for the
1667       process method.
1668
1669        my $mce = MCE->new( ... );
1670
1671        ## Disables auto-shutdown
1672        $mce->run(0);
1673
1674   MCE->send ( $data_ref )
1675   $mce->send ( $data_ref )
1676       The 'send' method is useful when wanting to spawn workers early to
1677       minimize memory consumption and afterwards send data individually to
1678       each worker. One cannot send more than the total workers spawned.
1679       Workers store the received data as $mce->{user_data}.
1680
1681       The data which can be sent is restricted to an ARRAY, HASH, or PDL
1682       reference.  Workers begin processing immediately after receiving data.
1683       Workers set $mce->{user_data} to undef after processing. One cannot
1684       specify input_data, sequence, or user_tasks when using the "send"
1685       method.
1686
1687       Passing any options e.g. run(0, { options }) is ignored due to workers
1688       running immediately after receiving user data. There is no guarantee to
1689       which worker will receive data first. It depends on which worker is
1690       available awaiting data.
1691
1692        use MCE;
1693
1694        my $mce = MCE->new(
1695           max_workers => 5,
1696
1697           user_func => sub {
1698              my ($mce) = @_;
1699              my $data = $mce->{user_data};
1700              my $first_name = $data->{first_name};
1701              print MCE->wid, ": Hello from $first_name\n";
1702           }
1703        );
1704
1705        $mce->spawn;     ## Optional, send will spawn if necessary.
1706
1707        $mce->send( { first_name => "Theresa" } );
1708        $mce->send( { first_name => "Francis" } );
1709        $mce->send( { first_name => "Padre"   } );
1710        $mce->send( { first_name => "Anthony" } );
1711
1712        $mce->run;       ## Wait for workers to complete processing.
1713
1714        -- Output
1715
1716        2: Hello from Theresa
1717        5: Hello from Anthony
1718        3: Hello from Francis
1719        4: Hello from Padre
1720
1721   MCE->shutdown ( void )
1722   $mce->shutdown ( void )
1723       The run method will automatically spawn workers, run once, and shutdown
1724       workers automatically. Workers persist after running below. Shutdown
1725       may be called as needed or prior to exiting.
1726
1727        my $mce = MCE->new( ... );
1728
1729        $mce->spawn;
1730
1731        $mce->process(\@input_data_1);         ## Processing multiple arrays
1732        $mce->process(\@input_data_2);
1733        $mce->process(\@input_data_n);
1734
1735        $mce->shutdown;
1736
1737        $mce->process('input_file_1');         ## Processing multiple files
1738        $mce->process('input_file_2');
1739        $mce->process('input_file_n');
1740
1741        $mce->shutdown;
1742
1743   MCE->spawn ( void )
1744   $mce->spawn ( void )
1745       Workers are normally spawned automatically. The spawn method allows one
1746       to spawn workers early if so desired.
1747
1748        my $mce = MCE->new( ... );
1749
1750        $mce->spawn;
1751
1752   MCE->status ( void )
1753   $mce->status ( void )
1754       The greatest exit status is saved among workers while running. Look at
1755       the on_post_exit or on_post_run options for callback support.
1756
1757        my $mce = MCE->new( ... );
1758
1759        $mce->run;
1760
1761        my $exit_status = $mce->status;
1762

METHODS for WORKERS only

1764       Methods listed below are callable by workers only.
1765
1766   MCE->exit ( [ $status [, $message [, $id ] ] ] )
1767   $mce->exit ( [ $status [, $message [, $id ] ] ] )
1768       A worker exits from MCE entirely. $id (optional) can be used for
1769       passing the primary key or a string along with the message. Look at the
1770       on_post_exit or on_post_run options for callback support.
1771
1772        MCE->exit;           ## default 0
1773        MCE->exit(1);
1774        MCE->exit(2, 'chunk failed', $chunk_id);
1775        MCE->exit(0, 'msg_foo', 'id_1000');
1776
1777   MCE->gather ( $arg1 [, $arg2, ... ] )
1778   $mce->gather ( $arg1 [, $arg2, ... ] )
1779       A worker can submit data to the location specified via the gather
1780       option by calling this method. See MCE::Flow and MCE::Loop for
1781       additional use-cases.
1782
1783        use MCE;
1784
1785        my @hosts = qw(
1786           hosta hostb hostc hostd hoste
1787        );
1788
1789        my $mce = MCE->new(
1790           chunk_size => 1, max_workers => 3,
1791
1792           user_func => sub {
1793            # my ($mce, $chunk_ref, $chunk_id) = @_;
1794              my ($output, $error, $status); my $host = $_;
1795
1796              ## Do something with $host;
1797              $output = "Worker ". MCE->wid .": Hello from $host";
1798
1799              if (MCE->chunk_id % 3 == 0) {
1800                 ## Simulating an error condition
1801                 local $? = 1; $status = $?;
1802                 $error = "Error from $host"
1803              }
1804              else {
1805                 $status = 0;
1806              }
1807
1808              ## Ensure unique keys (key, value) when gathering to a
1809              ## hash.
1810              MCE->gather("$host.out", $output, "$host.sta", $status);
1811              MCE->gather("$host.err", $error) if (defined $error);
1812           }
1813        );
1814
1815        my %h; $mce->process(\@hosts, { gather => \%h });
1816
1817        foreach my $host (@hosts) {
1818           print $h{"$host.out"}, "\n";
1819           print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1820           print "Exit status: ", $h{"$host.sta"}, "\n\n";
1821        }
1822
1823        -- Output
1824
1825        Worker 2: Hello from hosta
1826        Exit status: 0
1827
1828        Worker 1: Hello from hostb
1829        Exit status: 0
1830
1831        Worker 3: Hello from hostc
1832        Error from hostc
1833        Exit status: 1
1834
1835        Worker 2: Hello from hostd
1836        Exit status: 0
1837
1838        Worker 1: Hello from hoste
1839        Exit status: 0
1840
1841   MCE->last ( void )
1842   $mce->last ( void )
1843       Worker leaves the chunking loop or user_func block immediately.
1844       Callable from inside foreach, forchunk, forseq, and user_func.
1845
1846        use MCE;
1847
1848        my $mce = MCE->new(
1849           max_workers => 5
1850        );
1851
1852        my @list = (1 .. 80);
1853
1854        $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1855
1856           my ($mce, $chunk_ref, $chunk_id) = @_;
1857           MCE->last if ($chunk_id > 4);
1858
1859           my @output = ();
1860
1861           foreach my $rec ( @{ $chunk_ref } ) {
1862              push @output, $rec, "\n";
1863           }
1864
1865           MCE->print(@output);
1866        });
1867
1868        -- Output (each chunk above consists of 2 elements)
1869
1870        3
1871        4
1872        1
1873        2
1874        7
1875        8
1876        5
1877        6
1878
1879   MCE->next ( void )
1880   $mce->next ( void )
1881       Worker starts the next iteration of the chunking loop. Callable from
1882       inside foreach, forchunk, forseq, and user_func.
1883
1884        use MCE;
1885
1886        my $mce = MCE->new(
1887           max_workers => 5
1888        );
1889
1890        my @list = (1 .. 80);
1891
1892        $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1893
1894           my ($mce, $chunk_ref, $chunk_id) = @_;
1895           MCE->next if ($chunk_id < 20);
1896
1897           my @output = ();
1898
1899           foreach my $rec ( @{ $chunk_ref } ) {
1900              push @output, $rec, "\n";
1901           }
1902
1903           MCE->print(@output);
1904        });
1905
1906        -- Output (each chunk above consists of 4 elements)
1907
1908        77
1909        78
1910        79
1911        80
1912
1913   MCE::relay { code }
1914   MCE->relay ( sub { code } )
1915   MCE->relay_recv ( void )
1916   $mce->relay ( sub { code } )
1917   $mce->relay_recv ( void )
1918       The relay methods are described in MCE::Relay. Relay capabilities are
1919       enabled by specifying the "init_relay" MCE option.
1920
1921   MCE->sendto ( $to, $arg1, ... )
1922   $mce->sendto ( $to, $arg1, ... )
1923       The sendto method is called by workers for serializing data to standard
1924       output, standard error, or end of file. The action is done by the
1925       manager process.
1926
1927       Release 1.00x supported 1 data argument, not more.
1928
1929        MCE->sendto('file', \@array, '/path/to/file');
1930        MCE->sendto('file', \$scalar, '/path/to/file');
1931        MCE->sendto('file', $scalar, '/path/to/file');
1932
1933        MCE->sendto('STDERR', \@array);
1934        MCE->sendto('STDERR', \$scalar);
1935        MCE->sendto('STDERR', $scalar);
1936
1937        MCE->sendto('STDOUT', \@array);
1938        MCE->sendto('STDOUT', \$scalar);
1939        MCE->sendto('STDOUT', $scalar);
1940
1941       Release 1.100 added the ability to pass multiple arguments. Notice the
1942       syntax change for sending to a file. Passing a reference to an array is
1943       no longer necessary.
1944
1945        MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1946        MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1947        MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1948
1949        MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1950
1951       To retain 1.00x compatibility, sendto outputs the content when a single
1952       data reference is specified. Otherwise, the reference for \@array or
1953       \$scalar is shown in 1.500, not the content.
1954
1955        MCE->sendto('STDERR', \@array);        ## 1.00x behavior, content
1956        MCE->sendto('STDOUT', \$scalar);
1957        MCE->sendto('file:/path/to/file', \@array);
1958
1959        ## Output matches the print statement
1960
1961        MCE->sendto(\*STDERR, \@array);        ## 1.500 behavior, reference
1962        MCE->sendto(\*STDOUT, \$scalar);
1963        MCE->sendto($fh, \@array);
1964
1965        MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1966        print {*STDOUT} \@array, "\n", \$scalar, "\n";
1967
1968       MCE 1.500 added support for sending to a glob reference, file
1969       descriptor, and file handle.
1970
1971        MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1972        MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1973        MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1974
1975   MCE->sync ( void )
1976   $mce->sync ( void )
1977       A barrier sync operation means any worker must stop at this point until
1978       all workers reach this barrier. Barrier syncing is useful for many
1979       computer algorithms.
1980
1981       Barrier synchronization is supported for task 0 only or when omitting
1982       user_tasks.  All workers assigned task_id 0 must call sync whenever
1983       barrier syncing.
1984
1985        use MCE;
1986
1987        sub user_func {
1988
1989           my ($mce) = @_;
1990           my $wid = MCE->wid;
1991
1992           MCE->sendto("STDOUT", "a: $wid\n");   ## MCE 1.0+
1993           MCE->sync;
1994
1995           MCE->sendto(\*STDOUT, "b: $wid\n");   ## MCE 1.5+
1996           MCE->sync;
1997
1998           MCE->print("c: $wid\n");              ## MCE 1.5+
1999           MCE->sync;
2000
2001           return;
2002        }
2003
2004        my $mce = MCE->new(
2005           max_workers => 4, user_func => \&user_func
2006        )->run;
2007
2008        -- Output (without barrier synchronization)
2009
2010        a: 1
2011        a: 2
2012        b: 1
2013        b: 2
2014        c: 1
2015        c: 2
2016        a: 3
2017        b: 3
2018        c: 3
2019        a: 4
2020        b: 4
2021        c: 4
2022
2023        -- Output (with barrier synchronization)
2024
2025        a: 1
2026        a: 2
2027        a: 4
2028        a: 3
2029        b: 2
2030        b: 1
2031        b: 3
2032        b: 4
2033        c: 1
2034        c: 4
2035        c: 2
2036        c: 3
2037
2038       Consider the following example. The MCE->sync operation is done inside
2039       a loop along with MCE->do. A stall may occur for workers calling sync
2040       the 2nd or 3rd time while other workers are sending results via MCE->do
2041       or MCE->sendto.
2042
2043       It requires another semaphore lock in MCE to solve this which was not
2044       done in order to keep resources low. Therefore, please keep this in
2045       mind when mixing MCE->sync with MCE->do or output serialization methods
2046       inside a loop.
2047
2048        sub user_func {
2049
2050           my ($mce) = @_;
2051           my @result;
2052
2053           for (1 .. 3) {
2054              ... compute algorithm ...
2055
2056              MCE->sync;
2057
2058              ... compute algorithm ...
2059
2060              MCE->sync;
2061
2062              MCE->do('aggregate_result', \@result);  ## or MCE->sendto
2063
2064              MCE->sync;      ## The sync operation is also needed here to
2065                              ## prevent MCE from stalling.
2066           }
2067        }
2068
2069   MCE->yield ( void )
2070   $mce->yield ( void )
2071       There may be occasions when the MCE driven app is too fast. The
2072       interval option combined with the yield method, both introduced with
2073       MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
2074       the design.
2075
2076       A use case is an app configured with 100 workers running on a 24
2077       logical way box. Data is polled from a database containing over 2.5
2078       million rows. Workers chunk away at 300 rows per chunk performing SNMP
2079       gets (300 sockets per worker) polling 25 metrics from each device. With
2080       this scenario, the load on the box may rise beyond 90+. In addition,
2081       IP_Tables may reach its contention point causing the entire application
2082       to fail.
2083
2084       The scenario above is solved by simply having workers yield among
2085       themselves in a synchronized fashion. A delay of 0.007 seconds between
2086       intervals is all that's needed. The load on the box will hover between
2087       23 ~ 27 for the duration of the run. Polling completes in under 17
2088       minutes time. This is quite fast considering the app polls 62.5 million
2089       metrics combined. The math equates to 3,676,470 per minute or rather
2090       61,275 per second from a single box.
2091
2092        ## Both max_nodes and node_id are optional (default 1).
2093
2094        interval => {
2095           delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
2096        }
2097
2098       A 4 node setup can poll 10 million devices without the additional
2099       overhead of a distribution agent. The differences between the 4 nodes
2100       are simply node_id and the where clause used to query the database. The
2101       mac addresses are random such that the data divides equally to any
2102       power of 2. The distribution key lies in the mac address itself. In
2103       fact, the 2nd character from the right is sufficient for maximizing on
2104       the power of randomness for equal distribution.
2105
2106        Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
2107        Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
2108        Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
2109        Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
2110
2111       Below, the user_tasks is configured to simulate 4 nodes. This
2112       demonstration uses 2 workers to minimize the output size. Input is from
2113       the sequence option.
2114
2115        use Time::HiRes qw(time);
2116        use MCE;
2117
2118        my $d = shift || 0.1;
2119
2120        local $| = 1;
2121
2122        sub create_task {
2123
2124           my ($node_id) = @_;
2125
2126           my $seq_size  = 6;
2127           my $seq_start = ($node_id - 1) * $seq_size + 1;
2128           my $seq_end   = $seq_start + $seq_size - 1;
2129
2130           return {
2131              max_workers => 2, sequence => [ $seq_start, $seq_end ],
2132              interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2133           };
2134        }
2135
2136        sub user_begin {
2137
2138           my ($mce, $task_id, $task_name) = @_;
2139
2140           ## The yield method causes this worker to wait for its next time
2141           ## interval slot before running. Yield has no effect without the
2142           ## 'interval' option.
2143
2144           ## Yielding is beneficial inside a user_begin block. A use case
2145           ## is staggering database connections among workers in order
2146           ## to not impact the DB server.
2147
2148           MCE->yield;
2149
2150           MCE->printf(
2151              "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2152              MCE->task_id + 1, time, MCE->task_wid, ''
2153           );
2154
2155           return;
2156        }
2157
2158        {
2159           my $prev_time = time;
2160
2161           sub user_func {
2162
2163              my ($mce, $seq_n, $chunk_id) = @_;
2164
2165              ## Yield simply waits for the next time interval.
2166              MCE->yield;
2167
2168              ## Calculate how long this worker has waited.
2169              my $curr_time = time;
2170              my $time_waited = $curr_time - $prev_time;
2171
2172              $prev_time = $curr_time;
2173
2174              MCE->printf(
2175                 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2176                 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2177              );
2178
2179              return;
2180           }
2181        }
2182
2183        ## Simulate a 4 node environment passing node_id to create_task.
2184
2185        print "Node_ID  Current_Time        Worker_ID  Time_Waited     Comment\n";
2186
2187        MCE->new(
2188           user_begin => \&user_begin,
2189           user_func  => \&user_func,
2190
2191           user_tasks => [
2192              create_task(1),
2193              create_task(2),
2194              create_task(3),
2195              create_task(4)
2196           ]
2197
2198        )->run;
2199
2200        -- Output (notice Current_Time below, stays 0.10 apart)
2201
2202        Node_ID  Current_Time        Worker_ID  Time_Waited     Comment
2203        Node  1: 1374807976.74634 -- Worker  1:              -- Started
2204        Node  2: 1374807976.84634 -- Worker  1:              -- Started
2205        Node  3: 1374807976.94638 -- Worker  1:              -- Started
2206        Node  4: 1374807977.04639 -- Worker  1:              -- Started
2207        Node  1: 1374807977.14634 -- Worker  2:              -- Started
2208        Node  2: 1374807977.24640 -- Worker  2:              -- Started
2209        Node  3: 1374807977.34649 -- Worker  2:              -- Started
2210        Node  4: 1374807977.44657 -- Worker  2:              -- Started
2211        Node  1: 1374807977.54636 -- Worker  1:      0.90037 -- Seq_N   1
2212        Node  2: 1374807977.64638 -- Worker  1:      1.00040 -- Seq_N   7
2213        Node  3: 1374807977.74642 -- Worker  1:      1.10043 -- Seq_N  13
2214        Node  4: 1374807977.84643 -- Worker  1:      1.20045 -- Seq_N  19
2215        Node  1: 1374807977.94636 -- Worker  2:      1.30037 -- Seq_N   2
2216        Node  2: 1374807978.04638 -- Worker  2:      1.40040 -- Seq_N   8
2217        Node  3: 1374807978.14641 -- Worker  2:      1.50042 -- Seq_N  14
2218        Node  4: 1374807978.24644 -- Worker  2:      1.60045 -- Seq_N  20
2219        Node  1: 1374807978.34628 -- Worker  1:      0.79996 -- Seq_N   3
2220        Node  2: 1374807978.44631 -- Worker  1:      0.79996 -- Seq_N   9
2221        Node  3: 1374807978.54634 -- Worker  1:      0.79996 -- Seq_N  15
2222        Node  4: 1374807978.64636 -- Worker  1:      0.79997 -- Seq_N  21
2223        Node  1: 1374807978.74628 -- Worker  2:      0.79996 -- Seq_N   4
2224        Node  2: 1374807978.84632 -- Worker  2:      0.79997 -- Seq_N  10
2225        Node  3: 1374807978.94634 -- Worker  2:      0.79996 -- Seq_N  16
2226        Node  4: 1374807979.04636 -- Worker  2:      0.79996 -- Seq_N  22
2227        Node  1: 1374807979.14628 -- Worker  1:      0.80001 -- Seq_N   5
2228        Node  2: 1374807979.24631 -- Worker  1:      0.80000 -- Seq_N  11
2229        Node  3: 1374807979.34634 -- Worker  1:      0.80001 -- Seq_N  17
2230        Node  4: 1374807979.44636 -- Worker  1:      0.80000 -- Seq_N  23
2231        Node  1: 1374807979.54628 -- Worker  2:      0.80000 -- Seq_N   6
2232        Node  2: 1374807979.64631 -- Worker  2:      0.80000 -- Seq_N  12
2233        Node  3: 1374807979.74633 -- Worker  2:      0.80000 -- Seq_N  18
2234        Node  4: 1374807979.84636 -- Worker  2:      0.80000 -- Seq_N  24
2235
2236       The interval.pl example above is included with MCE.
2237

MCE PROGRESS DEMONSTRATIONS

2239       The "progress" option takes a code block for receiving info on the
2240       progress made while processing input data; e.g. "input_data" or
2241       "sequence". To make this work, one provides the "progress" option a
2242       closure block like so, passing along the size of the input_data; e.g.
2243       "scalar @array" or "-s /path/to/file".
2244
2245       Current API available since 1.813.
2246
2247       A worker, upon completing processing its chunk, notifies the manager-
2248       process with the size of the chunk. That could be the number of rows or
2249       literally the size of the chunk when processing an input file. The
2250       manager-process accumulates the size before calling the code block
2251       associated with the "progress" option.
2252
2253       When running many tasks simultaneously, via "user_tasks", the call is
2254       initiated by workers at level 0 only or rather the first task, not
2255       shown here.
2256
2257        use Time::HiRes 'sleep';
2258        use MCE;
2259
2260        sub make_progress {
2261           my ($total_size) = @_;
2262           return sub {
2263              my ($completed_size) = @_;
2264              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2265           };
2266        }
2267
2268        my @input = (1..150);
2269
2270        MCE->new(
2271           chunk_size  => 10,
2272           max_workers => 4,
2273           input_data  => \@input,
2274           progress    => make_progress( scalar @input ),
2275           user_func   => sub { sleep 1.5 }
2276        )->run();
2277
2278        -- Output
2279
2280        6.7%
2281        13.3%
2282        20.0%
2283        26.7%
2284        33.3%
2285        40.0%
2286        46.7%
2287        53.3%
2288        60.0%
2289        66.7%
2290        73.3%
2291        80.0%
2292        86.7%
2293        93.3%
2294        100.0%
2295
2296       Next is the code using MCE::Flow and ProgressBar::Stack to do the same
2297       thing, practically.
2298
2299        use Time::HiRes 'sleep';
2300        use ProgressBar::Stack;
2301        use MCE::Flow;
2302
2303        sub make_progress {
2304           my ($total_size) = @_;
2305           init_progress();
2306           return sub {
2307              my ($completed_size) = @_;
2308              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2309           };
2310        }
2311
2312        my @input = (1..150);
2313
2314        MCE::Flow->init(
2315           chunk_size  => 10,
2316           max_workers => 4,
2317           progress    => make_progress( scalar @input )
2318        );
2319
2320        MCE::Flow->run( sub { sleep 1.5 }, \@input );
2321        MCE::Flow->finish();
2322
2323        print "\n";
2324
2325        -- Output
2326
2327        [################    ]  80.0% ETA: 0:01
2328
2329       For sequence of numbers, using the "sequence" option, one must account
2330       for "step_size", typically set to 1 automatically.
2331
2332        use Time::HiRes 'sleep';
2333        use MCE;
2334
2335        sub make_progress {
2336           my ($total_size) = @_;
2337           return sub {
2338              my ($completed_size) = @_;
2339              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2340           };
2341        }
2342
2343        MCE->new(
2344           chunk_size  => 10,
2345           max_workers => 4,
2346           sequence    => [ 1, 100, 2 ],
2347           progress    => make_progress( int( 100 / 2 + 0.5 ) ),
2348           user_func   => sub { sleep 1.5 }
2349        )->run();
2350
2351        -- Output
2352
2353        20.0%
2354        40.0%
2355        60.0%
2356        80.0%
2357        100.0%
2358
2359       Changing "chunk_size" to 1 means workers notify the manager process
2360       more often, thus increasing granularity. Take a look at the output.
2361
2362        2.0%
2363        4.0%
2364        6.0%
2365        8.0%
2366        10.0%
2367        ...
2368        92.0%
2369        94.0%
2370        96.0%
2371        98.0%
2372        100.0%
2373
2374       Here is the same thing using MCE::Flow together with
2375       ProgressBar::Stack.
2376
2377        use Time::HiRes 'sleep';
2378        use ProgressBar::Stack;
2379        use MCE::Flow;
2380
2381        sub make_progress {
2382           my ($total_size) = @_;
2383           init_progress();
2384           return sub {
2385              my ($completed_size) = @_;
2386              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2387           };
2388        }
2389
2390        MCE::Flow->init(
2391           chunk_size  => 1,
2392           max_workers => 4,
2393           progress    => make_progress( int( 100 / 2 + 0.5 ) )
2394        );
2395
2396        MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2397        MCE::Flow->finish();
2398
2399        print "\n";
2400
2401        -- Output
2402
2403        [#########           ]  48.0% ETA: 0:03
2404
2405       For files and file handles, workers send the actual size of the data
2406       read versus counting rows.
2407
2408        use Time::HiRes 'sleep';
2409        use MCE;
2410
2411        sub make_progress {
2412           my ($total_size) = @_;
2413           return sub {
2414              my ($completed_size) = @_;
2415              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2416           };
2417        }
2418
2419        my $input_file = "/path/to/input_file.txt";
2420
2421        MCE->new(
2422           chunk_size  => 5,
2423           max_workers => 4,
2424           input_data  => $input_file,
2425           progress    => make_progress( -s $input_file ),
2426           user_func   => sub { sleep 0.03 }
2427        )->run();
2428
2429       For consistency, here is the example using MCE::Flow, again with
2430       ProgressBar::Stack.
2431
2432        use Time::HiRes 'sleep';
2433        use ProgressBar::Stack;
2434        use MCE::Flow;
2435
2436        sub make_progress {
2437           my ($total_size) = @_;
2438           init_progress();
2439           return sub {
2440              my ($completed_size) = @_;
2441              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2442           };
2443        }
2444
2445        my $input_file = "/path/to/input_file.txt";
2446
2447        MCE::Flow->init(
2448           chunk_size  => 5,
2449           max_workers => 4,
2450           progress    => make_progress( -s $input_file )
2451        );
2452
2453        MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2454        MCE::Flow->finish();
2455
2456       The next demonstration processes three arrays consecutively. For this
2457       one, MCE workers persist after running. This needs MCE 1.814 or later
2458       to run. Otherwise, the progress output is not shown in MCE 1.813.
2459
2460        use Time::HiRes 'sleep';
2461        use ProgressBar::Stack;
2462        use MCE;
2463
2464        sub make_progress {
2465           my ($total_size, $message) = @_;
2466           init_progress();
2467           return sub {
2468              my ($completed_size) = @_;
2469              update_progress(
2470                 sprintf("%0.1f", $completed_size / $total_size * 100),
2471                 $message
2472              );
2473           };
2474        }
2475
2476        my $mce = MCE->new(
2477           chunk_size  => 10,
2478           max_workers => 4,
2479           user_func   => sub { sleep 0.5 }
2480        )->spawn();
2481
2482        my @a1 = ( 1 .. 200 );
2483        my @a2 = ( 1 .. 500 );
2484        my @a3 = ( 1 .. 300 );
2485
2486        $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2487
2488        print "\n";
2489
2490        $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2491
2492        print "\n";
2493
2494        $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2495
2496        print "\n";
2497
2498        $mce->shutdown;
2499
2500        -- Output
2501
2502        [####################] 100.0% ETA: 0:00 array 1
2503        [####################] 100.0% ETA: 0:00 array 2
2504        [####################] 100.0% ETA: 0:00 array 3
2505
2506       When size is not known, such as reading from "STDIN", the only thing one
2507       can do is report the size completed thus far.
2508
2509        # 1 kibibyte equals 1024 bytes
2510
2511        progress => sub {
2512           my ($completed_size) = @_;
2513           printf "%0.1f kibibytes\n", $completed_size / 1024;
2514        }
2515

SEE ALSO

2517       ·  MCE::Examples
2518

INDEX

2520       MCE
2521

AUTHOR

2523       Mario E. Roy, <marioeroy AT gmail DOT com>
2524
2525
2526
2527perl v5.30.1                      2020-02-09                      MCE::Core(3)
Impressum