1MCE::Core(3)          User Contributed Perl Documentation         MCE::Core(3)
2
3
4

NAME

6       MCE::Core - Documentation describing the core MCE API
7

VERSION

9       This document describes MCE::Core version 1.874
10

SYNOPSIS

12       This is a simplistic use case of MCE running with 5 workers.
13
14        ## Construction using the Core API
15
16        use MCE;
17
18        my $mce = MCE->new(
19           max_workers => 5,
20           user_func => sub {
21              my ($mce) = @_;
22              $mce->say("Hello from " . $mce->wid);
23           }
24        );
25
26        $mce->run;
27
28        ## Construction using a MCE model
29
30        use MCE::Flow max_workers => 5;
31
32        mce_flow sub {
33           my ($mce) = @_;
34           MCE->say("Hello from " . MCE->wid);
35        };
36
37        -- Output
38
39        Hello from 2
40        Hello from 4
41        Hello from 5
42        Hello from 1
43        Hello from 3
44
45   MCE->new ( [ options ] )
46       Below, a new instance is configured with all available options.
47
48        use MCE;
49
50        my $mce = MCE->new(
51
52           max_workers => 8,                   ## Default 1
53
54              # Number of workers to spawn. This can be set automatically
55              # with MCE 1.412 and later releases.
56
57              # MCE 1.521 sets an upper-limit of 8 for 'auto'.
58              # See MCE::Util::get_ncpu for more info.
59
60              # max_workers => 'auto',         ## # of lcores, 8 maximum
61              # max_workers => 'auto-1',       ## 7 on HW with 16-lcores
62              # max_workers => 'auto-1',       ## 3 on HW with  4-lcores
63
64              # max_workers => MCE::Util::get_ncpu, # run on all lcores
65
66           chunk_size => 2000,                 ## Default 1
67
68              # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
69              # The default is 1 when using the Core API and 'auto' for
70              # MCE Models. For arrays or queues, chunk_size means the
71              # number of records per chunk. For iterators, MCE will not
72              # use chunk_size, though the iterator may use it to determine
73              # how much to return per iteration. For files, smaller than or
74              # equal to 8192 is the number of records.  Greater than 8192
75              # is the number of bytes. MCE reads until the end of record
76              # before calling user_func.
77
78              # chunk_size => 1,               ## Consists of 1 record
79              # chunk_size => 1000,            ## Consists of 1000 records
80              # chunk_size => '16k',           ## Approximate 16 kibiBytes (KiB)
81              # chunk_size => '20m',           ## Approximate 20 mebiBytes (MiB)
82
83           tmp_dir => $tmp_dir,                ## Default $MCE::Signal::tmp_dir
84
85              # Default is $MCE::Signal::tmp_dir which points to
86              # $ENV{TEMP} if defined. Otherwise, tmp_dir points
87              # to a location under /tmp.
88
89           freeze => \&encode_sereal,          ## Default \&Storable::freeze
90           thaw   => \&decode_sereal,          ## Default \&Storable::thaw
91
92              # Release 1.412 allows freeze and thaw to be overridden.
93              # Simply include a serialization module prior to loading
94              # MCE. Configure freeze/thaw options.
95
96              # use Sereal qw( encode_sereal decode_sereal );
97              # use CBOR::XS qw( encode_cbor decode_cbor );
98              # use JSON::XS qw( encode_json decode_json );
99              #
100              # use MCE;
101
102           gather => \@a,                      ## Default undef
103
104              # Release 1.5 allows for gathering of data to an array or
105              # hash reference, a MCE::Queue/Thread::Queue object, or code
106              # reference. One invokes gathering by calling the gather
107              # method as often as needed.
108
109              # gather => \@array,
110              # gather => \%hash,
111              # gather => $queue,
112              # gather => \&order,
113
114           init_relay => 0,                    ## Default undef
115
116              # For specifying the initial relay value. Allowed values
117              # are array_ref, hash_ref, or scalar. The MCE::Relay module
118              # is loaded automatically when specified.
119
120              # init_relay => \@array,
121              # init_relay => \%hash,
122              # init_relay => scalar,
123
124           input_data => $input_file,          ## Default undef
125           RS         => "\n>",                ## Default undef
126
127              # input_data => '/path/to/file'  ## Process file
128              # input_data => \@array          ## Process array
129              # input_data => \*FILE_HNDL      ## Process file handle
130              # input_data => $io              ## Process IO::All { File, Pipe, STDIO }
131              # input_data => \$scalar         ## Treated like a file
132              # input_data => \&iterator       ## User specified iterator
133
134              # The RS option (for input record separator) applies to files
135              # and file handles.
136
137              # MCE applies additional logic when RS begins with a newline
138              # character; e.g. RS => "\n>". It trims away characters after
139              # the newline and prepends them to the next record.
140              #
141              # Typically, the left side is what happens for $/ = "\n>".
142              # The right side is what user_func receives.
143              #
144              # All records begin with > and end with \n
145              #    Record 1:  >seq1 ... \n>   (to)   >seq1 ... \n
146              #    Record 2:  seq2  ... \n>          >seq2 ... \n
147              #    Record 3:  seq3  ... \n>          >seq3 ... \n
148              #    Last Rec:  seqN  ... \n           >seqN ... \n
149
150           loop_timeout => 20,                 ## Default 0
151
152              # Added in 1.7, enables the manager process to timeout of a read
153              # operation on channel 0 (UNIX platforms only). The manager process
154              # decrements the total workers running for any worker which have
155              # died in an uncontrollable manner. Specify this option if on
156              # occassion a worker dies unexpectedly (i.e. from an XS module).
157
158              # Option works with init_relay on UNIX platforms since MCE 1.844.
159              # A number smaller than 5 is silently increased to 5.
160
161           max_retries => 2,                   ## Default 0
162
163              # This option, added in 1.7, causes MCE to retry a failed
164              # chunk from a worker dying while processing input data or
165              # sequence of numbers.
166
167           parallel_io => 1,                   ## Default 0
168           posix_exit  => 1,                   ## Default 0
169           use_slurpio => 1,                   ## Default 0
170
171              # The parallel_io option enables parallel reads during large
172              # slurpio, useful when reading from fast storage. Do not enable
173              # parallel_io when running MCE on many nodes with input coming
174              # from shared storage.
175
176              # Set posix_exit to avoid all END and destructor processing.
177              # Constructing MCE inside a thread implies 1 or if present CGI,
178              # FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
179              # Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
180
181              # Enable slurpio to pass the raw chunk (scalar ref) to the user
182              # function when reading input files.
183
184           use_threads => 1,                   ## Auto 0 or 1
185
186              # By default MCE spawns child processes on UNIX platforms and
187              # threads on Windows (i.e. $^O eq 'MSWin32').
188
189              # MCE supports threads via two threading libraries if threads
190              # is preferred over child processes. The use of threads requires
191              # a thread library prior to loading MCE, causing the use_threads
192              # option to default to 1. Specify 0 for child processes.
193              #
194              #   use threads;              use forks;
195              #   use threads::shared;      use forks::shared;
196              #   use MCE              (or) use MCE;           (or) use MCE;
197
198           spawn_delay  => 0.045,              ## Default undef
199           submit_delay => 0.015,              ## Default undef
200           job_delay    => 0.060,              ## Default undef
201
202              # Time to wait in fractional seconds after spawning a worker,
203              # after submitting parameters to worker (MCE->run, MCE->process),
204              # and worker running (one time staggered delay).
205
206              # Specify job_delay to stagger workers connecting to a database.
207
208           on_post_exit => \&on_post_exit,     ## Default undef
209           on_post_run  => \&on_post_run,      ## Default undef
210
211              # Execute the code block after a worker exits or dies.
212              # (i.e. MCE->exit, exit, die)
213
214              # Execute the code block after running.
215              # (i.e. MCE->process, MCE->run)
216
217           progress => sub { ... },            ## Default undef
218
219              # A code block for receiving info on the progress made.
220              # See section labeled "MCE PROGRESS DEMONSTRATIONS" at the
221              # end of this document.
222
223           user_args => { env => 'test' },     ## Default undef
224
225              # MCE release 1.4 added a new parameter to allow one to
226              # specify arbitrary arguments such as a string, an ARRAY
227              # or HASH reference. Workers can access this directly.
228              # (i.e. my $args = $mce->{user_args} or MCE->user_args)
229
230           user_begin => \&user_begin,         ## Default undef
231           user_func => \&user_func,           ## Default undef
232           user_end => \&user_end,             ## Default undef
233
234              # Think of user_begin, user_func, and user_end as in
235              # the awk scripting language:
236              # awk 'BEGIN { begin } { func } { func } ... END { end }'
237
238              # MCE workers call user_begin once at the start of a job,
239              # then user_func repeatedly until no chunks remain.
240              # Afterwards, user_end is called.
241
242           user_error => \&user_error,         ## Default undef
243           user_output => \&user_output,       ## Default undef
244
245              # MCE will forward data to user_error/user_output,
246              # when defined, for the following methods.
247
248              # MCE->sendto(\*STDERR, "sent to user_error\n");
249              # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
250              # MCE->print(\*STDERR, "sent to user_error\n");
251              # MCE->say(\*STDERR, "sent to user_error");
252
253              # MCE->sendto(\*STDOUT, "sent to user_output\n");
254              # MCE->printf("%s\n", "sent to user_output");
255              # MCE->print("sent to user_output\n");
256              # MCE->say("sent to user_output");
257
258           stderr_file => 'err_file',          ## Default STDERR
259           stdout_file => 'out_file',          ## Default STDOUT
260
261              # Or to file; user_error and user_output take precedence.
262
263           flush_file   => 0,                  ## Default 1
264           flush_stderr => 0,                  ## Default 1
265           flush_stdout => 0,                  ## Default 1
266
267              # Flush sendto file, standard error, or standard output.
268
269           interval => {
270               delay => 0.007 [, max_nodes => 4, node_id => 1 ]
271           },
272
273              # For use with the yield method introduced in MCE 1.5.
274              # Both max_nodes & node_id are optional and default to 1.
275              # Delay is the amount of time between intervals.
276
277              # interval => 0.007              ## Shorter; MCE 1.506+
278
279           sequence => {                       ## Default undef
280               begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
281           },
282
283           bounds_only => 1,                   ## Default undef
284
285              # For looping through a sequence of numbers in parallel.
286              # STEP, if omitted, defaults to 1 if BEGIN is smaller than
287              # END or -1 if BEGIN is greater than END. The FORMAT string
288              # is passed to sprintf behind the scene (% may be omitted).
289              # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
290
291              # Do not specify both options; input_data and sequence.
292              # Release 1.4 allows one to specify an array reference.
293              # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
294
295              # The bounds_only => 1 option will compute the 'begin' and
296              # 'end' items only for the chunk and not the items in between
297              # (hence boundaries only). This option has no effect when
298              # sequence is not specified or chunk_size equals 1.
299
300              # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
301
302           task_end => \&task_end,             ## Default undef
303
304              # This is called by the manager process after the task
305              # has completed processing. MCE 1.5 allows this option
306              # to be specified at the top level.
307
308           task_name => 'string',              ## Default 'MCE'
309
310              # Added in MCE 1.5 and mainly beneficial for user_tasks.
311              # One may specify a unique name per each sub-task.
312              # The string is passed as the 3rd arg to task_end.
313
314           user_tasks => [                     ## Default undef
315              { ... },                         ## Options for task 0
316              { ... },                         ## Options for task 1
317              { ... },                         ## Options for task 2
318           ],
319
320              # Takes a list of hash references, each allowing up to 17
321              # options. All other MCE options are ignored. The init_relay,
322              # input_data, RS, and use_slurpio options are applicable to
323              # the first task only.
324
325              # max_workers, chunk_size, input_data, interval, sequence,
326              # bounds_only, user_args, user_begin, user_end, user_func,
327              # gather, task_end, task_name, use_slurpio, use_threads,
328              # init_relay, RS
329
330              # Options not specified here will default to same option
331              # specified at the top level.
332        );
333
334   EXPORT_CONST, CONST
335       There are 3 constants which are exportable. Using the constants in lieu
336       of 0,1,2 makes it more legible when accessing the user_func arguments
337       directly.
338
339       SELF CHUNK CID - MCE CONSTANTS
340
341       Exports SELF => 0, CHUNK => 1, and CID => 2.
342
343        use MCE export_const => 1;
344        use MCE const => 1;                    ## Shorter; MCE 1.415+
345
346        user_func => sub {
347         # my ($mce, $chunk_ref, $chunk_id) = @_;
348           print "Hello from ", $_[SELF]->wid, "\n";
349        }
350
351       MCE 1.5 allows all public method to be called directly.
352
353        use MCE;
354
355        user_func => sub {
356         # my ($mce, $chunk_ref, $chunk_id) = @_;
357           print "Hello from ", MCE->wid, "\n";
358        }
359
360   OVERRIDING DEFAULTS
361       The following list options which may be overridden when loading the
362       module.
363
364        use Sereal qw( encode_sereal decode_sereal );
365        use CBOR::XS qw( encode_cbor decode_cbor );
366        use JSON::XS qw( encode_json decode_json );
367
368        use MCE
369            max_workers => 4,                  ## Default 1
370            chunk_size => 100,                 ## Default 1
371            tmp_dir => "/path/to/app/tmp",     ## $MCE::Signal::tmp_dir
372            freeze => \&encode_sereal,         ## \&Storable::freeze
373            thaw => \&decode_sereal            ## \&Storable::thaw
374        ;
375
376        my $mce = MCE->new( ... );
377
378       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
379       available.  Specify "Sereal => 0" to use Storable instead.
380
381        use MCE Sereal => 0;
382
383   RUNNING
384       Run calls spawn, submits the job; workers call user_begin, user_func,
385       and user_end. Run shuts down workers afterwards. Call spawn whenever
386       the need arises for large data structures prior to running.
387
388        $mce->spawn;                           ## Call early if desired
389
390        $mce->run;                             ## Call run or process below
391
392        ## Acquire data arrays and/or input_files. Workers persist after
393        ## processing.
394
395        $mce->process(\@input_data_1);         ## Process array
396        $mce->process(\@input_data_2);
397        $mce->process(\@input_data_n);
398
399        $mce->process(\%input_hash_1);         ## Process hash, current API
400        $mce->process(\%input_hash_2);         ## available since 1.828
401        $mce->process(\%input_hash_n);
402
403        $mce->process('input_file_1');         ## Process file
404        $mce->process('input_file_2');
405        $mce->process('input_file_n');
406
407        $mce->shutdown;                        ## Shutdown workers
408
409   SYNTAX for ON_POST_EXIT
410       Often times, one may want to capture the exit status. The on_post_exit
411       option, if defined, is executed immediately by the manager process
412       after a worker exits via exit (children only), MCE->exit (children and
413       threads), or die.
414
415       The format of $e->{pid} is PID_123 for children and THR_123 for
416       threads.
417
418        my $restart_flag = 1;
419
420        sub on_post_exit {
421           my ($mce, $e) = @_;
422
423           ## Display all possible hash elements.
424           print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
425
426           ## Restart this worker if desired.
427           if ($restart_flag && $e->{wid} == 2) {
428              $mce->restart_worker;
429              $restart_flag = 0;
430           }
431        }
432
433        sub user_func {
434           my ($mce) = @_;
435           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  ## Args, not necessary
436        }
437
438        my $mce = MCE->new(
439           on_post_exit => \&on_post_exit,
440           user_func => \&user_func,
441           max_workers => 3
442        );
443
444        $mce->run;
445
446        -- Output (child processes)
447
448        2: PID_33223: 0: msg_foo: 1002
449        1: PID_33222: 0: msg_foo: 1001
450        3: PID_33224: 0: msg_foo: 1003
451        2: PID_33225: 0: msg_foo: 1002
452
453        -- Output (running with threads)
454
455        3: TID_3: 0: msg_foo: 1003
456        2: TID_2: 0: msg_foo: 1002
457        1: TID_1: 0: msg_foo: 1001
458        2: TID_4: 0: msg_foo: 1002
459
460   SYNTAX for ON_POST_RUN
461       The on_post_run option, if defined, is executed immediately by the
462       manager process after running MCE->process or MCE->run. This option
463       receives an array reference of hashes.
464
465       The difference between on_post_exit and on_post_run is that the former
466       is called immediately whereas the latter is called after all workers
467       have completed running.
468
469        sub on_post_run {
470           my ($mce, $status_ref) = @_;
471           foreach my $e ( @{ $status_ref } ) {
472              ## Display all possible hash elements.
473              print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
474           }
475        }
476
477        sub user_func {
478           my ($mce) = @_;
479           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  ## Args, not necessary
480        }
481
482        my $mce = MCE->new(
483           on_post_run => \&on_post_run,
484           user_func => \&user_func,
485           max_workers => 3
486        );
487
488        $mce->run;
489
490        -- Output (child processes)
491
492        3: PID_33174: 0: msg_foo: 1003
493        1: PID_33172: 0: msg_foo: 1001
494        2: PID_33173: 0: msg_foo: 1002
495
496        -- Output (running with threads)
497
498        2: TID_2: 0: msg_foo: 1002
499        3: TID_3: 0: msg_foo: 1003
500        1: TID_1: 0: msg_foo: 1001
501
502   SYNTAX for INPUT_DATA
503       MCE supports many ways to specify input_data. Support for iterators was
504       added in MCE 1.505. The RS option allows one to specify the record
505       separator when processing files.
506
507       MCE is a chunking engine. Therefore, chunk_size is applicable to
508       input_data.  Specifying 1 for use_slurpio causes user_func to receive a
509       scalar reference containing the raw data (applicable to files only)
510       instead of an array reference.
511
512       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
513
514        input_data  => '/path/to/file',  ## process file
515        input_data  => \@array,          ## process array
516        input_data  => \%hash,           ## process hash, API since 1.828
517        input_data  => \*FILE_HNDL,      ## process file handle
518        input_data  => $fh,              ## open $fh, "<", "file"
519        input_data  => $fh,              ## IO::File "file", "r"
520        input_data  => $fh,              ## IO::Uncompress::Gunzip "file.gz"
521        input_data  => $io,              ## IO::All { File, Pipe, STDIO }
522        input_data  => \$scalar,         ## treated like a file
523        input_data  => \&iterator,       ## user specified iterator
524
525        chunk_size  => 1,                ## >1 means looping inside user_func
526        use_slurpio => 1,                ## $chunk_ref is a scalar ref
527        RS          => "\n>",            ## input record separator
528
529       The chunk_size value determines the chunking mode to use when
530       processing files.  Otherwise, chunk_size is the number of elements for
531       arrays. For files, a chunk size value of <= 8192 is how many records to
532       read. Greater than 8192 is how many bytes to read. MCE appends (the
533       rest) up to the next record separator.
534
535        chunk_size  => 8192,             ## Consists of 8192 records
536        chunk_size  => 8193,             ## Approximate 8193 bytes for files
537
538        chunk_size  => 1,                ## Consists of 1 record or element
539        chunk_size  => 1000,             ## Consists of 1000 records
540        chunk_size  => '16k',            ## Approximate 16 kibiBytes (KiB)
541        chunk_size  => '20m',            ## Approximate 20 mebiBytes (MiB)
542
543       The construction for user_func when chunk_size > 1 and assuming
544       use_slurpio equals 0.
545
546        user_func => sub {
547           my ($mce, $chunk_ref, $chunk_id) = @_;
548
549           ## $_ is $chunk_ref->[0] when chunk_size equals 1
550           ## $_ is $chunk_ref otherwise; $_ can be used below
551
552           for my $record ( @{ $chunk_ref } ) {
553              print "$chunk_id: $record\n";
554           }
555        }
556
557        # input_data => \%hash
558        # current API available since 1.828
559
560        user_func => sub {
561           my ($mce, $chunk_ref, $chunk_id) = @_;
562
563           ## $_ points to $chunk_ref regardless of chunk_size
564
565           for my $key ( keys %{ $chunk_ref } ) {
566              print "$key: ", $chunk_ref->{$key}, "\n";
567           }
568        }
569
570       Specifying a value for input_data is straight forward for arrays and
571       files.  The next several examples specify an iterator reference for
572       input_data.
573
574        use MCE;
575
576        ## A factory function which creates a closure (the iterator itself)
577        ## for generating a sequence of numbers. The external variables
578        ## ($n, $max, $step) are used for keeping state across successive
579        ## calls to the closure. The iterator simply returns when $n > max.
580
581        sub input_iterator {
582           my ($n, $max, $step) = @_;
583
584           return sub {
585              return if $n > $max;
586
587              my $current = $n;
588              $n += $step;
589
590              return $current;
591           };
592        }
593
594        ## Run user_func in parallel. Input data can be specified during
595        ## the construction or as an argument to the process method.
596
597        my $mce = MCE->new(
598
599         # input_data => input_iterator(10, 30, 2),
600           chunk_size => 1, max_workers => 4,
601
602           user_func => sub {
603              my ($mce, $chunk_ref, $chunk_id) = @_;
604              MCE->print("$_: ", $_ * 2, "\n");
605           }
606
607        )->spawn;
608
609        $mce->process( input_iterator(10, 30, 2) );
610
611        -- Output   Note that output order is not guaranteed
612                    Take a look at iterator.pl for ordered output
613
614        10: 20
615        12: 24
616        16: 32
617        20: 40
618        14: 28
619        22: 44
620        18: 36
621        24: 48
622        26: 52
623        28: 56
624        30: 60
625
626       The following example queries the DB for the next 1000 rows. Notice the
627       use of fetchall_arrayref. The iterator function itself receives one
628       argument which is chunk_size (added in MCE 1.510) to determine how much
629       to return per iteration.  The default is 1 for the Core API and MCE
630       Models.
631
632        use DBI;
633        use MCE;
634
635        sub db_iter {
636
637           my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
638
639           my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
640                     die "Could not connect to database: $DBI::errstr";
641
642           my $sth = $dbh->prepare('select color, desc from table');
643
644           $sth->execute;
645
646           return sub {
647              my ($chunk_size) = @_;
648
649              if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
650                 return @{ $aref };
651              }
652
653              return;
654           };
655        }
656
657        ## Let's enumerate column indexes for easy column retrieval.
658        my ($i_color, $i_desc) = (0 .. 1);
659
660        my $mce = MCE->new(
661           max_workers => 3, chunk_size => 1000,
662           input_data => db_iter(),
663
664           user_func => sub {
665              my ($mce, $chunk_ref, $chunk_id) = @_;
666              my $ret = '';
667
668              foreach my $row (@{ $chunk_ref }) {
669                 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
670              }
671
672              MCE->print($ret);
673           }
674        );
675
676        $mce->run;
677
678       There are many modules on CPAN which return an iterator reference.
679       Showing one such example below. The demonstration ensures MCE workers
680       are spawned before obtaining the iterator. Note the worker_id value
681       (left column) in the output.
682
683        use Path::Iterator::Rule;
684        use MCE;
685
686        my $start_dir = shift
687           or die "Please specify a starting directory";
688
689        -d $start_dir
690           or die "Cannot open ($start_dir): No such file or directory";
691
692        my $mce = MCE->new(
693           max_workers => 'auto',
694           user_func => sub { MCE->say( MCE->wid . ": $_" ) }
695        )->spawn;
696
697        my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
698
699        my $iterator = $rule->iter(
700           $start_dir, { follow_symlinks => 0, depthfirst => 1 }
701        );
702
703        $mce->process( $iterator );
704
705        -- Output
706
707        8: lib/MCE/Core/Input/Generator.pm
708        5: lib/MCE/Core/Input/Handle.pm
709        6: lib/MCE/Core/Input/Iterator.pm
710        2: lib/MCE/Core/Input/Request.pm
711        3: lib/MCE/Core/Manager.pm
712        4: lib/MCE/Core/Input/Sequence.pm
713        7: lib/MCE/Core/Validation.pm
714        1: lib/MCE/Core/Worker.pm
715        8: lib/MCE/Flow.pm
716        5: lib/MCE/Grep.pm
717        6: lib/MCE/Loop.pm
718        2: lib/MCE/Map.pm
719        3: lib/MCE/Queue.pm
720        4: lib/MCE/Signal.pm
721        7: lib/MCE/Stream.pm
722        1: lib/MCE/Subs.pm
723        8: lib/MCE/Util.pm
724        5: lib/MCE.pm
725
726       Although MCE supports arrays, extra measures are needed to use a "lazy"
727       array as input data. The reason for this is that MCE needs the size of
728       the array before processing which may be unknown for lazy arrays.
729       Therefore, closures provides an excellent mechanism for this.
730
731       The code block belonging to the lazy array must return undef after
732       exhausting its input data. Otherwise, the process will never end.
733
734        use Tie::Array::Lazy;
735        use MCE;
736
737        tie my @a, 'Tie::Array::Lazy', [], sub {
738           my $i = $_[0]->index;
739
740           return ($i < 10) ? $i : undef;
741        };
742
743        sub make_iterator {
744           my $i = 0; my $a_ref = shift;
745
746           return sub {
747              return $a_ref->[$i++];
748           };
749        }
750
751        my $mce = MCE->new(
752           max_workers => 4, input_data => make_iterator(\@a),
753
754           user_func => sub {
755              my ($mce, $chunk_ref, $chunk_id) = @_;
756              MCE->say($_);
757           }
758
759        )->run;
760
761        -- Output
762
763        0
764        1
765        2
766        3
767        4
768        6
769        7
770        8
771        5
772        9
773
774       The following demonstrates how to retrieve a chunk from the lazy array
775       per each successive call. Here, undef is sent by the iterator block
776       when $i is greater than $max. Iterators may optionally use chunk_size
777       to determine how much to return per iteration.
778
779        use Tie::Array::Lazy;
780        use MCE;
781
782        tie my @a, 'Tie::Array::Lazy', [], sub {
783           $_[0]->index;
784        };
785
786        sub make_iterator {
787           my $j = 0; my ($a_ref, $max) = @_;
788
789           return sub {
790              my ($chunk_size) = @_;
791              my $i = $j;  $j += $chunk_size;
792
793              return if $i > $max;
794              return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
795           };
796        }
797
798        my $mce = MCE->new(
799           chunk_size => 15, max_workers => 4,
800           input_data => make_iterator(\@a, 100),
801
802           user_func => sub {
803              my ($mce, $chunk_ref, $chunk_id) = @_;
804              MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
805           }
806
807        )->run;
808
809        -- Output
810
811        1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
812        2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
813        3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
814        4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
815        5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
816        6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
817        7: 90 91 92 93 94 95 96 97 98 99 100
818
819   SYNTAX for SEQUENCE
820       The 1.3 release and above allows workers to loop through a sequence of
821       numbers computed mathematically without the overhead of an array. The
822       sequence can be specified separately per each user_task entry unlike
823       input_data which is applicable to the first task only.
824
825       See the seq_demo.pl example, included with this distribution, on
826       applying sequences with the user_tasks option.
827
828       Sequence can be defined using an array or a hash reference.
829
830        use MCE;
831
832        my $mce = MCE->new(
833           max_workers => 3,
834
835         # sequence => [ 10, 19, 0.7, "%4.1f" ],  ## up to 4 options
836
837           sequence => {
838              begin => 10, end => 19, step => 0.7, format => "%4.1f"
839           },
840
841           user_func => sub {
842              my ($mce, $n, $chunk_id) = @_;
843              print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
844           }
845        );
846
847        $mce->run;
848
849        -- Output (sorted afterwards, notice wid and chunk_id in output)
850
851        10.0 from 1 id 1
852        10.7 from 2 id 2
853        11.4 from 3 id 3
854        12.1 from 1 id 4
855        12.8 from 2 id 5
856        13.5 from 3 id 6
857        14.2 from 1 id 7
858        14.9 from 2 id 8
859        15.6 from 3 id 9
860        16.3 from 1 id 10
861        17.0 from 2 id 11
862        17.7 from 3 id 12
863        18.4 from 1 id 13
864
865       The 1.5 release includes a new option (bounds_only). This option tells
866       the sequence engine to compute 'begin' and 'end' items only, for the
867       chunk, and not the items in between (hence boundaries only). This
868       option applies to sequence only and has no effect when chunk_size
869       equals 1.
870
871       The time to run is 0.006s below. This becomes 0.827s without the
872       bounds_only option due to computing all items in between, thus creating
873       a very large array. Basically, specify bounds_only => 1 when boundaries
874       is all you need for looping inside the block; e.g. Monte Carlo
875       simulations.
876
877       Time was measured using 1 worker to emphasize the difference.
878
879        use MCE;
880
881        my $mce = MCE->new(
882           max_workers => 1, chunk_size => 1_250_000,
883
884           sequence => { begin => 1, end => 10_000_000 },
885           bounds_only => 1,
886
887           ## For sequence, the input scalar $_ points to $chunk_ref
888           ## when chunk_size > 1, otherwise $chunk_ref->[0].
889           ##
890           ## user_func => sub {
891           ##    my $begin = $_->[0]; my $end = $_->[-1];
892           ##
893           ##    for ($begin .. $end) {
894           ##       ...
895           ##    }
896           ## },
897
898           user_func => sub {
899              my ($mce, $chunk_ref, $chunk_id) = @_;
900              ## $chunk_ref contains 2 items, not 1_250_000
901
902              my $begin = $chunk_ref->[ 0];
903              my $end   = $chunk_ref->[-1];   ## or $chunk_ref->[1]
904
905              MCE->printf("%7d .. %8d\n", $begin, $end);
906           }
907        );
908
909        $mce->run;
910
911        -- Output
912
913              1 ..  1250000
914        1250001 ..  2500000
915        2500001 ..  3750000
916        3750001 ..  5000000
917        5000001 ..  6250000
918        6250001 ..  7500000
919        7500001 ..  8750000
920        8750001 .. 10000000
921
922   SYNTAX for MAX_RETRIES
923       The max_retries option, added in 1.7, allows MCE to retry a failed
924       chunk from a worker dying while processing input data or a sequence of
925       numbers.
926
927       When max_retries is set, MCE configures the on_post_exit option
928       automatically using the following code before running. Specify
929       on_post_exit explicitly for any further tailoring. The restart_worker
930       line is necessary, obviously.
931
932        on_post_exit => sub {
933           my ( $mce, $e, $retry_cnt ) = @_;
934
935           if ( $e->{id} ) {
936              my $cnt = $retry_cnt + 1;
937              my $msg = "Error: chunk $e->{id} failed";
938
939              if ( defined $mce->{init_relay} ) {
940                 print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
941                    if ( $retry_cnt < $mce->{max_retries} );
942              }
943              else {
944                 ( $retry_cnt < $mce->{max_retries} )
945                    ? print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
946                    : print {*STDERR} "$msg\n";
947              }
948
949              $mce->restart_worker;
950           }
951        }
952
953       We let MCE handle on_post_exit automatically below, which is
954       essentially the same code shown above. For max_retries to work, the
955       worker must die, abnormally included, or call MCE->exit. Notice that we
956       pass the chunk_id value for the 3rd argument to MCE->exit (defaults to
957       chunk_id if omitted since MCE 1.844).
958
959        ## max_retries demonstration
960
961        use strict;
962        use warnings;
963
964        use MCE;
965
966        sub user_func {
967           my ( $mce, $chunk_ref, $chunk_id ) = @_;
968
969           # die "Died : chunk_id = 3\n"  if $chunk_id == 3;
970           MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
971
972           print "$chunk_id\n";
973        }
974
975        my $mce = MCE->new(
976           max_workers => 1,
977           max_retries => 2,
978           user_func   => \&user_func,
979        )->spawn;
980
981        my $input_data = [ 0..7 ];
982
983        $mce->process( { chunk_size => 1 }, $input_data );
984        $mce->shutdown;
985
986        -- Output
987
988        1
989        2
990        Error: chunk 3 failed, retrying chunk attempt # 1
991        Error: chunk 3 failed, retrying chunk attempt # 2
992        Error: chunk 3 failed
993        4
994        5
995        6
996        7
997        8
998
999       Orderly output with max_retries is possible since MCE 1.844. Below,
1000       chunk 3 succeeds whereas chunk 5 fails due to exceeding the number of
1001       retries. Be sure to call MCE::relay inside "user_func" and near the end
1002       of the block.
1003
1004        ## max_retries demonstration with init_relay
1005
1006        use strict;
1007        use warnings;
1008
1009        use MCE;
1010        use MCE::Shared;
1011
1012        tie my $retries1, 'MCE::Shared', 0;
1013        tie my $retries2, 'MCE::Shared', 0;
1014
1015        MCE->new(
1016           max_workers  => 4,
1017           input_data   => [ 1..7 ],
1018           chunk_size   => 1,
1019
1020           max_retries  => 2,
1021           init_relay   => 0,
1022
1023           user_func    => sub {
1024              if ( MCE->chunk_id == 3 ) {
1025                 MCE->exit if ++$retries1 <= 2;
1026              }
1027              if ( MCE->chunk_id == 5 ) {
1028                 MCE->exit if ++$retries2 <= 3;
1029              }
1030              MCE::relay {
1031                 $_ += 1;
1032                 print MCE->chunk_id, "\n";
1033              };
1034           }
1035        )->run;
1036
1037        print "final: ", MCE::relay_final(), "\n";
1038
1039        -- Output
1040
1041        1
1042        2
1043        Error: chunk 3 failed, retrying chunk attempt # 1
1044        Error: chunk 5 failed, retrying chunk attempt # 1
1045        Error: chunk 3 failed, retrying chunk attempt # 2
1046        Error: chunk 5 failed, retrying chunk attempt # 2
1047        3
1048        4
1049        Error: chunk 5 failed
1050        6
1051        7
1052        final: 6
1053
1054   SYNTAX for USER_BEGIN and USER_END
1055       The user_begin and user_end options, if specified, behave similarly to
1056       awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
1057       called once per worker during each run.
1058
1059       MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
1060
1061        sub user_begin {                   ## Called once at the beginning
1062           my ($mce, $task_id, $task_name) = @_;
1063           $mce->{wk_total_rows} = 0;
1064        }
1065
1066        sub user_func {                    ## Called while processing
1067           my $mce = shift;
1068           $mce->{wk_total_rows} += 1;
1069        }
1070
1071        sub user_end {                     ## Called once at the end
1072           my ($mce, $task_id, $task_name) = @_;
1073           printf "## %d: Processed %d rows\n",
1074              MCE->wid, $mce->{wk_total_rows};
1075        }
1076
1077        my $mce = MCE->new(
1078           user_begin => \&user_begin,
1079           user_func  => \&user_func,
1080           user_end   => \&user_end
1081        );
1082
1083        $mce->run;
1084
1085   SYNTAX for USER_FUNC with USE_SLURPIO => 0
1086       When processing input data, MCE can pass an array of rows or a slurped
1087       chunk.  Below, a reference to an array containing the chunk data is
1088       processed.
1089
1090       e.g. $chunk_ref = [ record1, record2, record3, ... ]
1091
1092        sub user_func {
1093
1094           my ($mce, $chunk_ref, $chunk_id) = @_;
1095
1096           foreach my $row ( @{ $chunk_ref } ) {
1097              $mce->{wk_total_rows} += 1;
1098              print $row;
1099           }
1100        }
1101
1102        my $mce = MCE->new(
1103           chunk_size  => 100,
1104           input_data  => "/path/to/file",
1105           user_func   => \&user_func,
1106           use_slurpio => 0
1107        );
1108
1109        $mce->run;
1110
1111   SYNTAX for USER_FUNC with USE_SLURPIO => 1
1112       Here, a reference to a scalar containing the raw chunk data is
1113       processed.
1114
1115        sub user_func {
1116
1117           my ($mce, $chunk_ref, $chunk_id) = @_;
1118
1119           my $count = () = $$chunk_ref =~ /abc/;
1120        }
1121
1122        my $mce = MCE->new(
1123           chunk_size  => 16000,
1124           input_data  => "/path/to/file",
1125           user_func   => \&user_func,
1126           use_slurpio => 1
1127        );
1128
1129        $mce->run;
1130
1131   SYNTAX for USER_ERROR and USER_OUTPUT
1132       Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1133       and MCE->say can be intercepted by specifying the user_error and
1134       user_output options. MCE on receiving output will forward to user_error
1135       or user_output in a serialized fashion.
1136
1137       Handy when wanting to filter, modify, and/or direct the output
1138       elsewhere.
1139
1140        sub user_error {                   ## Redirect STDERR to STDOUT
1141           my $error = shift;
1142           print {*STDOUT} $error;
1143        }
1144
1145        sub user_output {                  ## Redirect STDOUT to STDERR
1146           my $output = shift;
1147           print {*STDERR} $output;
1148        }
1149
1150        sub user_func {
1151           my ($mce, $chunk_ref, $chunk_id) = @_;
1152           my $count = 0;
1153
1154           foreach my $row ( @{ $chunk_ref } ) {
1155              MCE->print($row);
1156              $count += 1;
1157           }
1158
1159           MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1160        }
1161
1162        my $mce = MCE->new(
1163           chunk_size  => 1000,
1164           input_data  => "/path/to/file",
1165           user_error  => \&user_error,
1166           user_output => \&user_output,
1167           user_func   => \&user_func
1168        );
1169
1170        $mce->run;
1171
1172   SYNTAX for USER_TASKS and TASK_END
1173       This option takes an array of tasks. Each task allows up to 17 options.
1174       The init_relay, input_data, RS, and use_slurpio options may be defined
1175       inside the first task or at the top level, otherwise ignored under
1176       other sub-tasks.
1177
1178        max_workers, chunk_size, input_data, interval, sequence,
1179        bounds_only, user_args, user_begin, user_end, user_func,
1180        gather, task_end, task_name, use_slurpio, use_threads,
1181        init_relay, RS
1182
1183       Sequence and chunk_size were added in 1.3. User_args was introduced in
1184       1.4.  Name and input_data are new options allowed in 1.5. In addition,
1185       one can specify task_end at the top level. Task_end also receives 2
1186       additional arguments $task_id and $task_name (shown below).
1187
1188       Options not specified here will default to the same option specified at
1189       the top level. The task_end option is called by the manager process
1190       when all workers for that sub-task have completed processing.
1191
1192       Forking and threading can be intermixed among tasks unless running
1193       Cygwin.  The run method will continue running until all workers have
1194       completed processing.
1195
1196        use threads;
1197        use threads::shared;
1198
1199        use MCE;
1200
1201        sub parallel_task1 { sleep 2; }
1202        sub parallel_task2 { sleep 1; }
1203
1204        my $mce = MCE->new(
1205
1206           task_end => sub {
1207              my ($mce, $task_id, $task_name) = @_;
1208              print "Task [$task_id -- $task_name] completed processing\n";
1209           },
1210
1211           user_tasks => [{
1212              task_name   => 'foo',
1213              max_workers => 2,
1214              user_func   => \&parallel_task1,
1215              use_threads => 0             ## Not using threads
1216
1217           },{
1218              task_name   => 'bar',
1219              max_workers => 4,
1220              user_func   => \&parallel_task2,
1221              use_threads => 1             ## Yes, threads
1222
1223           }]
1224        );
1225
1226        $mce->run;
1227
1228        -- Output
1229
1230        Task [1 -- bar] completed processing
1231        Task [0 -- foo] completed processing
1232

DEFAULT INPUT SCALAR

1234       Beginning with MCE 1.5, the input scalar $_ is localized prior to
1235       calling user_func for input_data and sequence of numbers. The following
1236       applies.
1237
1238       use_slurpio => 1
1239           $_ is a reference to the buffer e.g. $_ = \$_buffer;
1240           $_ is a reference regardless of whether chunk_size is 1 or greater
1241
1242           user_func => sub {
1243            # my ($mce, $chunk_ref, $chunk_id) = @_;
1244              print ${ $_ };    ## $_ is same as $chunk_ref
1245           }
1246
1247       chunk_size is greater than 1, use_slurpio => 0
1248           $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1249           $_ is same as $chunk_ref or $_[CHUNK]
1250
1251           user_func => sub {
1252            # my ($mce, $chunk_ref, $chunk_id) = @_;
1253              for my $row ( @{ $_ } ) {
1254                 print $row, "\n";
1255              }
1256           }
1257
1258           use MCE const => 1;
1259
1260           user_func => sub {
1261            # my ($mce, $chunk_ref, $chunk_id) = @_;
1262              for my $row ( @{ $_[CHUNK] } ) {
1263                 print $row, "\n";
1264              }
1265           }
1266
1267       chunk_size equals 1, use_slurpio => 0
1268           $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1269
1270           ## Note that $_ and $chunk_ref are not the same below.
1271           ## $chunk_ref is a reference to an array.
1272
1273           user_func => sub {
1274            # my ($mce, $chunk_ref, $chunk_id) = @_;
1275              print $_, "\n;    ## Same as $chunk_ref->[0];
1276           }
1277
1278           $mce->foreach("/path/to/file", sub {
1279            # my ($mce, $chunk_ref, $chunk_id) = @_;
1280              print $_;         ## Same as $chunk_ref->[0];
1281           });
1282
1283           ## However, that is not the case for the forseq method.
1284           ## Both $_ and $n_seq are the same when chunk_size => 1.
1285
1286           $mce->forseq([ 1, 9 ], sub {
1287            # my ($mce, $n_seq, $chunk_id) = @_;
1288              print $_, "\n";   ## Same as $n_seq
1289           });
1290
1291          Sequence can also be specified using an array reference. The below
1292          is the same as the example afterwards.
1293
1294           $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1295
1296          The code block receives an array containing the next 5 sequences.
1297          Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1298          to an array, same as $_, due to chunk_size being greater than 1.
1299
1300           $mce->forseq( [ 10, 40000, 2 ], { chunk_size => 5 }, sub {
1301            # my ($mce, $n_seq, $chunk_id) = @_;
1302              my @result;
1303              for my $n ( @{ $_ } ) {
1304                 ... do work, append to result for 5
1305              }
1306              ... do something with result afterwards
1307           });
1308

METHODS for the MANAGER PROCESS and WORKERS

1310       The methods listed below are callable by the main process and workers.
1311
1312   MCE->abort ( void )
1313   $mce->abort ( void )
1314       The 'abort' method is applicable when processing input_data only. This
1315       causes all workers to abort after processing the current chunk.
1316
1317       Workers write the next offset position to the queue socket for the next
1318       available worker. In essence, the 'abort' method writes the last offset
1319       position. Workers, on requesting the next offset position, will think
1320       the end of input_data has been reached and leave the chunking loop.
1321
1322        MCE->abort;
1323        $mce->abort;
1324
1325   MCE->chunk_id ( void )
1326   $mce->chunk_id ( void )
1327       Returns the chunk_id for the current chunk. The value starts at 1.
1328       Chunking applies to input_data or sequence. The value is 0 for the
1329       manager process.
1330
1331        my $chunk_id = MCE->chunk_id;
1332        my $chunk_id = $mce->chunk_id;
1333
1334   MCE->chunk_size ( void )
1335   $mce->chunk_size ( void )
1336       Getter method for chunk_size used by MCE.
1337
1338        my $chunk_size = MCE->chunk_size;
1339        my $chunk_size = $mce->chunk_size;
1340
1341   MCE->do ( 'callback_func' [, $arg1, ... ] )
1342   $mce->do ( 'callback_func' [, $arg1, ... ] )
1343       MCE serializes data transfers from a worker process via helper
1344       functions do & sendto to the manager process. The callback function can
1345       optionally return a reply. Support for calling by the manager process
1346       was enabled in MCE 1.839.
1347
1348        [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1349
1350       Passing args to a callback function using references & scalar.
1351
1352        sub callback {
1353           my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1354           ...
1355        }
1356
1357        MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1358        MCE->do('callback', \@a, \%h, \$s, 'foo');
1359
1360       MCE knows if wanting a void, list, hash, or a scalar return value.
1361
1362        MCE->do('callback' [, $arg1, ... ]);
1363
1364        my @array  = MCE->do('callback' [, $arg1, ... ]);
1365        my %hash   = MCE->do('callback' [, $arg1, ... ]);
1366        my $scalar = MCE->do('callback' [, $arg1, ... ]);
1367
1368   MCE->freeze ( $object_ref )
1369   $mce->freeze ( $object_ref )
1370       Calls the internal freeze method to serialize an object. The default
1371       serialization routines are handled by Sereal if available or Storable.
1372
1373        my $frozen = MCE->freeze([ 0, 2, 4 ]);
1374        my $frozen = $mce->freeze([ 0, 2, 4 ]);
1375
1376   MCE->max_retries ( void )
1377   $mce->max_retries ( void )
1378       Getter method for max_retries used by MCE.
1379
1380        my $max_retries = MCE->max_retries;
1381        my $max_retries = $mce->max_retries;
1382
1383   MCE->max_workers ( void )
1384   $mce->max_workers ( void )
1385       Getter method for max_workers used by MCE.
1386
1387        my $max_workers = MCE->max_workers;
1388        my $max_workers = $mce->max_workers;
1389
1390   MCE->pid ( void )
1391   $mce->pid ( void )
1392       Returns the Process ID. Threads have thread ID attached to the value.
1393
1394        my $pid = MCE->pid;    ## 16180 (pid) ; 16180.2 (pid.tid)
1395        my $pid = $mce->pid;
1396
1397   MCE->printf ( $format, $list [, ... ] )
1398   MCE->print ( $list [, ... ] )
1399   MCE->say ( $list [, ... ] )
1400   $mce->printf ( $format, $list [, ... ] )
1401   $mce->print ( $list [, ... ] )
1402   $mce->say ( $list [, ... ] )
1403       Use the printf, print, and say methods when wanting to serialize output
1404       among workers and the manager process. These are sugar syntax for the
1405       sendto method.  These behave similar to the native subroutines in Perl
1406       with the exception that barewords must be passed as a reference and
1407       require the comma after it including file handles.
1408
1409       Say is like print, but implicitly appends a newline.
1410
1411        MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1412        MCE->printf($fh, "%s: %d\n", $name, $age);
1413        MCE->printf("%s: %d\n", $name, $age);
1414
1415        MCE->print(\*STDERR, "$error_msg\n");
1416        MCE->print($fh, $log_msg."\n");
1417        MCE->print("$output_msg\n");
1418
1419        MCE->say(\*STDERR, $error_msg);
1420        MCE->say($fh, $log_msg);
1421        MCE->say($output_msg);
1422
1423       Caveat: Use the following syntax when passing a reference not a glob or
1424       file handle. Otherwise, MCE will error indicating the first argument is
1425       not a glob reference.
1426
1427        MCE->print(\*STDOUT, \@array, "\n");
1428        MCE->print("", \@array, "\n");         ## ok
1429
1430       Sending to "IO::All" { File, Pipe, STDIO } is supported since MCE
1431       1.845.
1432
1433        use IO::All;
1434
1435        my $out = io->stdout;
1436        my $err = io->stderr;
1437
1438        MCE->printf($out, "%s\n", "sent to stdout");
1439        MCE->printf($err, "%s\n", "sent to stderr");
1440
1441        MCE->print($out, "sent to stdout\n");
1442        MCE->print($err, "sent to stderr\n");
1443
1444        MCE->say($out, "sent to stdout");
1445        MCE->say($err, "sent to stderr");
1446
1447   MCE->sess_dir ( void )
1448   $mce->sess_dir ( void )
1449       Returns the session directory used by the MCE instance. This is defined
1450       during spawning and removed during shutdown.
1451
1452        my $sess_dir = MCE->sess_dir;
1453        my $sess_dir = $mce->sess_dir;
1454
1455   MCE->task_id ( void )
1456   $mce->task_id ( void )
1457       Returns the task ID. This applies to the user_tasks option (starts at
1458       0).
1459
1460        my $task_id = MCE->task_id;
1461        my $task_id = $mce->task_id;
1462
1463   MCE->task_name ( void )
1464   $mce->task_name ( void )
1465       Returns the task_name value specified via the task_name option when
1466       configuring MCE.
1467
1468        my $task_name = MCE->task_name;
1469        my $task_name = $mce->task_name;
1470
1471   MCE->task_wid ( void )
1472   $mce->task_wid ( void )
1473       Returns the task worker ID (applies to user_tasks). The value starts at
1474       1 per each task configured within user_tasks. The value is 0 for the
1475       manager process.
1476
1477        my $task_wid = MCE->task_wid;
1478        my $task_wid = $mce->task_wid;
1479
1480   MCE->thaw ( $frozen )
1481   $mce->thaw ( $frozen )
1482       Calls the internal thaw method to un-serialize the frozen object.
1483
1484        my $object_ref = MCE->thaw($frozen);
1485        my $object_ref = $mce->thaw($frozen);
1486
1487   MCE->tmp_dir ( void )
1488   $mce->tmp_dir ( void )
1489       Returns the temporary directory used by MCE.
1490
1491        my $tmp_dir = MCE->tmp_dir;
1492        my $tmp_dir = $mce->tmp_dir;
1493
1494   MCE->user_args ( void )
1495   $mce->user_args ( void )
1496       Returns the arguments specified via the user_args option.
1497
1498        my ($arg1, $arg2, $arg3) = MCE->user_args;
1499        my ($arg1, $arg2, $arg3) = $mce->user_args;
1500
1501   MCE->wid ( void )
1502   $mce->wid ( void )
1503       Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1504       is 0 for the manager process.
1505
1506        my $wid = MCE->wid;
1507        my $wid = $mce->wid;
1508

METHODS for the MANAGER PROCESS only

1510       Methods listed below are callable by the main process only.
1511
1512   MCE->forchunk ( $input_data [, { options } ], sub { ... } )
1513   MCE->foreach ( $input_data [, { options } ], sub { ... } )
1514   MCE->forseq ( $sequence_spec [, { options } ], sub { ... } )
1515   $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1516   $mce->foreach ( $input_data [, { options } ], sub { ... } )
1517   $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1518       Forchunk, foreach, and forseq are sugar methods and described in
1519       MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1520
1521   MCE->process ( $input_data [, { options } ] )
1522   $mce->process ( $input_data [, { options } ] )
1523       The process method will spawn workers automatically if not already
1524       spawned.  It will set input_data => $input_data. It calls run(0) to not
1525       auto-shutdown workers. Specifying options is optional.
1526
1527       Allowable options { key => value, ... } are:
1528
1529        chunk_size input_data job_delay spawn_delay submit_delay
1530        flush_file flush_stderr flush_stdout stderr_file stdout_file
1531        on_post_exit on_post_run sequence user_args user_begin user_end
1532        user_func user_error user_output use_slurpio RS
1533
1534       Options remain persistent going forward unless changed. Setting
1535       user_begin, user_end, or user_func will cause already spawned workers
1536       to shut down and re-spawn automatically. Therefore, define these during
1537       instantiation.
1538
1539       The below will cause workers to re-spawn after running.
1540
1541        my $mce = MCE->new( max_workers => 'auto' );
1542
1543        $mce->process( {
1544           user_begin => sub { ## connect to DB },
1545           user_func  => sub { ## process each row },
1546           user_end   => sub { ## close handle to DB },
1547        }, \@input_data );
1548
1549        $mce->process( {
1550           user_begin => sub { ## connect to DB },
1551           user_func  => sub { ## process each file },
1552           user_end   => sub { ## close handle to DB },
1553        }, "/list/of/files" );
1554
1555       Do the following if wanting workers to persist between jobs.
1556
1557        use MCE max_workers => 'auto';
1558
1559        my $mce = MCE->new(
1560           user_begin => sub { ## connect to DB },
1561           user_func  => sub { ## process each chunk or row or host },
1562           user_end   => sub { ## close handle to DB },
1563        );
1564
1565        $mce->spawn;           ## Spawn early if desired
1566
1567        $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1568        $mce->process(\@array_of_files_to_grep);
1569        $mce->process("/path/to/host/list");
1570
1571        $mce->process($array_ref);
1572        $mce->process($array_ref, { stdout_file => $output_file });
1573
1574        ## This was not allowed before. Fixed in 1.415.
1575        $mce->process({ sequence => { begin => 10, end => 90, step 2 } });
1576        $mce->process({ sequence => [ 10, 90, 2 ] });
1577
1578        $mce->shutdown;
1579
1580   MCE->relay_final ( void )
1581   $mce->relay_final ( void )
1582       The relay methods are described in MCE::Relay. Relay capabilities are
1583       enabled by specifying the "init_relay" MCE option.
1584
1585   MCE->restart_worker ( void )
1586   $mce->restart_worker ( void )
1587       One can restart a worker who has died or exited. The job never ends
1588       below due to restarting each time. Recommended is to call MCE->exit or
1589       $mce->exit instead of the native exit function for better handling,
1590       especially under the Windows environment.
1591
1592       The $e->{wid} argument is no longer necessary starting with the 1.5
1593       release.
1594
1595       Press [ctrl-c] to terminate the script.
1596
1597        my $mce = MCE->new(
1598
1599           on_post_exit => sub {
1600              my ($mce, $e) = @_;
1601              print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1602            # $mce->restart_worker($e->{wid});    ## MCE-1.415 and below
1603              $mce->restart_worker;               ## MCE-1.500 and above
1604           },
1605
1606           user_begin => sub {
1607              my ($mce, $task_id, $task_name) = @_;
1608              ## Not interested in die messages going to STDERR,
1609              ## because the die handler calls MCE->exit(255, $_[0]).
1610              close STDERR;
1611           },
1612
1613           user_tasks => [{
1614              max_workers => 5,
1615              user_func => sub {
1616                 my ($mce) = @_; sleep MCE->wid;
1617                 MCE->exit(3, "exited from " . MCE->wid . "\n");
1618              }
1619           },{
1620              max_workers => 4,
1621              user_func => sub {
1622                 my ($mce) = @_; sleep MCE->wid;
1623                 die("died from " . MCE->wid . "\n");
1624              }
1625           }]
1626        );
1627
1628        $mce->run;
1629
1630        -- Output
1631
1632        1: PID_85388: status 3: exited from 1
1633        2: PID_85389: status 3: exited from 2
1634        1: PID_85397: status 3: exited from 1
1635        3: PID_85390: status 3: exited from 3
1636        1: PID_85399: status 3: exited from 1
1637        4: PID_85391: status 3: exited from 4
1638        2: PID_85398: status 3: exited from 2
1639        1: PID_85401: status 3: exited from 1
1640        5: PID_85392: status 3: exited from 5
1641        1: PID_85404: status 3: exited from 1
1642        6: PID_85393: status 255: died from 6
1643        3: PID_85400: status 3: exited from 3
1644        2: PID_85403: status 3: exited from 2
1645        1: PID_85406: status 3: exited from 1
1646        7: PID_85394: status 255: died from 7
1647        1: PID_85410: status 3: exited from 1
1648        8: PID_85395: status 255: died from 8
1649        4: PID_85402: status 3: exited from 4
1650        2: PID_85409: status 3: exited from 2
1651        1: PID_85412: status 3: exited from 1
1652        9: PID_85396: status 255: died from 9
1653        3: PID_85408: status 3: exited from 3
1654        1: PID_85416: status 3: exited from 1
1655
1656        ...
1657
1658   MCE->run ( [ $auto_shutdown [, { options } ] ] )
1659   $mce->run ( [ $auto_shutdown [, { options } ] ] )
1660       The run method, by default, spawns workers, processes once, and shuts
1661       down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1662       persist after running (default 1).
1663
1664       Specifying options is optional. Valid options are the same as for the
1665       process method.
1666
1667        my $mce = MCE->new( ... );
1668
1669        ## Disables auto-shutdown
1670        $mce->run(0);
1671
1672   MCE->send ( $data_ref )
1673   $mce->send ( $data_ref )
1674       The 'send' method is useful when wanting to spawn workers early to
1675       minimize memory consumption and afterwards send data individually to
1676       each worker. One cannot send more than the total workers spawned.
1677       Workers store the received data as $mce->{user_data}.
1678
1679       The data which can be sent is restricted to an ARRAY, HASH, or PDL
1680       reference.  Workers begin processing immediately after receiving data.
1681       Workers set $mce->{user_data} to undef after processing. One cannot
1682       specify input_data, sequence, or user_tasks when using the "send"
1683       method.
1684
1685       Passing any options e.g. run(0, { options }) is ignored due to workers
1686       running immediately after receiving user data. There is no guarantee to
1687       which worker will receive data first. It depends on which worker is
1688       available awaiting data.
1689
1690        use MCE;
1691
1692        my $mce = MCE->new(
1693           max_workers => 5,
1694
1695           user_func => sub {
1696              my ($mce) = @_;
1697              my $data = $mce->{user_data};
1698              my $first_name = $data->{first_name};
1699              print MCE->wid, ": Hello from $first_name\n";
1700           }
1701        );
1702
1703        $mce->spawn;     ## Optional, send will spawn if necessary.
1704
1705        $mce->send( { first_name => "Theresa" } );
1706        $mce->send( { first_name => "Francis" } );
1707        $mce->send( { first_name => "Padre"   } );
1708        $mce->send( { first_name => "Anthony" } );
1709
1710        $mce->run;       ## Wait for workers to complete processing.
1711
1712        -- Output
1713
1714        2: Hello from Theresa
1715        5: Hello from Anthony
1716        3: Hello from Francis
1717        4: Hello from Padre
1718
1719   MCE->shutdown ( void )
1720   $mce->shutdown ( void )
1721       The run method will automatically spawn workers, run once, and shutdown
1722       workers automatically. Workers persist after running below. Shutdown
1723       may be called as needed or prior to exiting.
1724
1725        my $mce = MCE->new( ... );
1726
1727        $mce->spawn;
1728
1729        $mce->process(\@input_data_1);         ## Processing multiple arrays
1730        $mce->process(\@input_data_2);
1731        $mce->process(\@input_data_n);
1732
1733        $mce->shutdown;
1734
1735        $mce->process('input_file_1');         ## Processing multiple files
1736        $mce->process('input_file_2');
1737        $mce->process('input_file_n');
1738
1739        $mce->shutdown;
1740
1741   MCE->spawn ( void )
1742   $mce->spawn ( void )
1743       Workers are normally spawned automatically. The spawn method allows one
1744       to spawn workers early if so desired.
1745
1746        my $mce = MCE->new( ... );
1747
1748        $mce->spawn;
1749
1750   MCE->status ( void )
1751   $mce->status ( void )
1752       The greatest exit status is saved among workers while running. Look at
1753       the on_post_exit or on_post_run options for callback support.
1754
1755        my $mce = MCE->new( ... );
1756
1757        $mce->run;
1758
1759        my $exit_status = $mce->status;
1760

METHODS for WORKERS only

1762       Methods listed below are callable by workers only.
1763
1764   MCE->exit ( [ $status [, $message [, $id ] ] ] )
1765   $mce->exit ( [ $status [, $message [, $id ] ] ] )
1766       A worker exits from MCE entirely. $id (optional) can be used for
1767       passing the primary key or a string along with the message. Look at the
1768       on_post_exit or on_post_run options for callback support.
1769
1770        MCE->exit;           ## default 0
1771        MCE->exit(1);
1772        MCE->exit(2, 'chunk failed', $chunk_id);
1773        MCE->exit(0, 'msg_foo', 'id_1000');
1774
1775   MCE->gather ( $arg1, [, $arg2, ... ] )
1776   $mce->gather ( $arg1, [, $arg2, ... ] )
1777       A worker can submit data to the location specified via the gather
1778       option by calling this method. See MCE::Flow and MCE::Loop for
1779       additional use-case.
1780
1781        use MCE;
1782
1783        my @hosts = qw(
1784           hosta hostb hostc hostd hoste
1785        );
1786
1787        my $mce = MCE->new(
1788           chunk_size => 1, max_workers => 3,
1789
1790           user_func => sub {
1791            # my ($mce, $chunk_ref, $chunk_id) = @_;
1792              my ($output, $error, $status); my $host = $_;
1793
1794              ## Do something with $host;
1795              $output = "Worker ". MCE->wid .": Hello from $host";
1796
1797              if (MCE->chunk_id % 3 == 0) {
1798                 ## Simulating an error condition
1799                 local $? = 1; $status = $?;
1800                 $error = "Error from $host"
1801              }
1802              else {
1803                 $status = 0;
1804              }
1805
1806              ## Ensure unique keys (key, value) when gathering to a
1807              ## hash.
1808              MCE->gather("$host.out", $output, "$host.sta", $status);
1809              MCE->gather("$host.err", $error) if (defined $error);
1810           }
1811        );
1812
1813        my %h; $mce->process(\@hosts, { gather => \%h });
1814
1815        foreach my $host (@hosts) {
1816           print $h{"$host.out"}, "\n";
1817           print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1818           print "Exit status: ", $h{"$host.sta"}, "\n\n";
1819        }
1820
1821        -- Output
1822
1823        Worker 2: Hello from hosta
1824        Exit status: 0
1825
1826        Worker 1: Hello from hostb
1827        Exit status: 0
1828
1829        Worker 3: Hello from hostc
1830        Error from hostc
1831        Exit status: 1
1832
1833        Worker 2: Hello from hostd
1834        Exit status: 0
1835
1836        Worker 1: Hello from hoste
1837        Exit status: 0
1838
1839   MCE->last ( void )
1840   $mce->last ( void )
1841       Worker leaves the chunking loop or user_func block immediately.
1842       Callable from inside foreach, forchunk, forseq, and user_func.
1843
1844        use MCE;
1845
1846        my $mce = MCE->new(
1847           max_workers => 5
1848        );
1849
1850        my @list = (1 .. 80);
1851
1852        $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1853
1854           my ($mce, $chunk_ref, $chunk_id) = @_;
1855           MCE->last if ($chunk_id > 4);
1856
1857           my @output = ();
1858
1859           foreach my $rec ( @{ $chunk_ref } ) {
1860              push @output, $rec, "\n";
1861           }
1862
1863           MCE->print(@output);
1864        });
1865
1866        -- Output (each chunk above consists of 2 elements)
1867
1868        3
1869        4
1870        1
1871        2
1872        7
1873        8
1874        5
1875        6
1876
1877   MCE->next ( void )
1878   $mce->next ( void )
1879       Worker starts the next iteration of the chunking loop. Callable from
1880       inside foreach, forchunk, forseq, and user_func.
1881
1882        use MCE;
1883
1884        my $mce = MCE->new(
1885           max_workers => 5
1886        );
1887
1888        my @list = (1 .. 80);
1889
1890        $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1891
1892           my ($mce, $chunk_ref, $chunk_id) = @_;
1893           MCE->next if ($chunk_id < 20);
1894
1895           my @output = ();
1896
1897           foreach my $rec ( @{ $chunk_ref } ) {
1898              push @output, $rec, "\n";
1899           }
1900
1901           MCE->print(@output);
1902        });
1903
1904        -- Output (each chunk above consists of 4 elements)
1905
1906        77
1907        78
1908        79
1909        80
1910
1911   MCE::relay { code }
1912   MCE->relay ( sub { code } )
1913   MCE->relay_recv ( void )
1914   $mce->relay ( sub { code } )
1915   $mce->relay_recv ( void )
1916       The relay methods are described in MCE::Relay. Relay capabilities are
1917       enabled by specifying the "init_relay" MCE option.
1918
1919   MCE->sendto ( $to, $arg1, ... )
1920   $mce->sendto ( $to, $arg1, ... )
1921       The sendto method is called by workers for serializing data to standard
1922       output, standard error, or end of file. The action is done by the
1923       manager process.
1924
1925       Release 1.00x supported 1 data argument, not more.
1926
1927        MCE->sendto('file', \@array, '/path/to/file');
1928        MCE->sendto('file', \$scalar, '/path/to/file');
1929        MCE->sendto('file', $scalar, '/path/to/file');
1930
1931        MCE->sendto('STDERR', \@array);
1932        MCE->sendto('STDERR', \$scalar);
1933        MCE->sendto('STDERR', $scalar);
1934
1935        MCE->sendto('STDOUT', \@array);
1936        MCE->sendto('STDOUT', \$scalar);
1937        MCE->sendto('STDOUT', $scalar);
1938
1939       Release 1.100 added the ability to pass multiple arguments. Notice the
1940       syntax change for sending to a file. Passing a reference to an array is
1941       no longer necessary.
1942
1943        MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1944        MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1945        MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1946
1947        MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1948
1949       To retain 1.00x compatibility, sendto outputs the content when a single
1950       data reference is specified. Otherwise, the reference for \@array or
1951       \$scalar is shown in 1.500, not the content.
1952
1953        MCE->sendto('STDERR', \@array);        ## 1.00x behavior, content
1954        MCE->sendto('STDOUT', \$scalar);
1955        MCE->sendto('file:/path/to/file', \@array);
1956
1957        ## Output matches the print statement
1958
1959        MCE->sendto(\*STDERR, \@array);        ## 1.500 behavior, reference
1960        MCE->sendto(\*STDOUT, \$scalar);
1961        MCE->sendto($fh, \@array);
1962
1963        MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1964        print {*STDOUT} \@array, "\n", \$scalar, "\n";
1965
1966       MCE 1.500 added support for sending to a glob reference, file
1967       descriptor, and file handle.
1968
1969        MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1970        MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1971        MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1972
1973   MCE->sync ( void )
1974   $mce->sync ( void )
1975       A barrier sync operation means any worker must stop at this point until
1976       all workers reach this barrier. Barrier syncing is useful for many
1977       computer algorithms.
1978
1979       Barrier synchronization is supported for task 0 only or omitting
1980       user_tasks.  All workers assigned task_id 0 must call sync whenever
1981       barrier syncing.
1982
1983        use MCE;
1984
1985        sub user_func {
1986
1987           my ($mce) = @_;
1988           my $wid = MCE->wid;
1989
1990           MCE->sendto("STDOUT", "a: $wid\n");   ## MCE 1.0+
1991           MCE->sync;
1992
1993           MCE->sendto(\*STDOUT, "b: $wid\n");   ## MCE 1.5+
1994           MCE->sync;
1995
1996           MCE->print("c: $wid\n");              ## MCE 1.5+
1997           MCE->sync;
1998
1999           return;
2000        }
2001
2002        my $mce = MCE->new(
2003           max_workers => 4, user_func => \&user_func
2004        )->run;
2005
2006        -- Output (without barrier synchronization)
2007
2008        a: 1
2009        a: 2
2010        b: 1
2011        b: 2
2012        c: 1
2013        c: 2
2014        a: 3
2015        b: 3
2016        c: 3
2017        a: 4
2018        b: 4
2019        c: 4
2020
2021        -- Output (with barrier synchronization)
2022
2023        a: 1
2024        a: 2
2025        a: 4
2026        a: 3
2027        b: 2
2028        b: 1
2029        b: 3
2030        b: 4
2031        c: 1
2032        c: 4
2033        c: 2
2034        c: 3
2035
2036       Consider the following example. The MCE->sync operation is done inside
2037       a loop along with MCE->do. A stall may occur for workers calling sync
2038       the 2nd or 3rd time while other workers are sending results via MCE->do
2039       or MCE->sendto.
2040
2041       It requires another semaphore lock in MCE to solve this which was not
2042       done in order to keep resources low. Therefore, please keep this in
2043       mind when mixing MCE->sync with MCE->do or output serialization methods
2044       inside a loop.
2045
2046        sub user_func {
2047
2048           my ($mce) = @_;
2049           my @result;
2050
2051           for (1 .. 3) {
2052              ... compute algorithm ...
2053
2054              MCE->sync;
2055
2056              ... compute algorithm ...
2057
2058              MCE->sync;
2059
2060              MCE->do('aggregate_result', \@result);  ## or MCE->sendto
2061
2062              MCE->sync;      ## The sync operation is also needed here to
2063                              ## prevent MCE from stalling.
2064           }
2065        }
2066
2067   MCE->yield ( void )
2068   $mce->yield ( void )
2069       There may be on occasion when the MCE driven app is too fast. The
2070       interval option combined with the yield method, both introduced with
2071       MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
2072       the design.
2073
2074       A use case is an app configured with 100 workers running on a 24
2075       logical way box. Data is polled from a database containing over 2.5
2076       million rows. Workers chunk away at 300 rows per chunk performing SNMP
2077       gets (300 sockets per worker) polling 25 metrics from each device. With
2078       this scenario, the load on the box may rise beyond 90+. In addition,
2079       IP_Tables may reach its contention point causing the entire application
2080       to fail.
2081
2082       The scenario above is solved by simply having workers yield among
2083       themselves in a synchronized fashion. A delay of 0.007 seconds between
2084       intervals is all that's needed. The load on the box will hover between
2085       23 ~ 27 for the duration of the run. Polling completes in under 17
2086       minutes time. This is quite fast considering the app polls 62.5 million
2087       metrics combined. The math equates to 3,676,470 per minute or rather
2088       61,275 per second from a single box.
2089
2090        ## Both max_nodes and node_id are optional (default 1).
2091
2092        interval => {
2093           delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
2094        }
2095
2096       A 4 node setup can poll 10 million devices without the additional
2097       overhead of a distribution agent. The difference between the 4 nodes
2098       are simply node_id and the where clause used to query the database. The
2099       mac addresses are random such that the data divides equally to any
2100       power of 2. The distribution key lies in the mac address itself. In
2101       fact, the 2nd character from the right is sufficient for maximizing on
2102       the power of randomness for equal distribution.
2103
2104        Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
2105        Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
2106        Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
2107        Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
2108
2109       Below, the user_tasks is configured to simulate 4 nodes. This
2110       demonstration uses 2 workers to minimize the output size. Input is from
2111       the sequence option.
2112
2113        use Time::HiRes qw(time);
2114        use MCE;
2115
2116        my $d = shift || 0.1;
2117
2118        local $| = 1;
2119
2120        sub create_task {
2121
2122           my ($node_id) = @_;
2123
2124           my $seq_size  = 6;
2125           my $seq_start = ($node_id - 1) * $seq_size + 1;
2126           my $seq_end   = $seq_start + $seq_size - 1;
2127
2128           return {
2129              max_workers => 2, sequence => [ $seq_start, $seq_end ],
2130              interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2131           };
2132        }
2133
2134        sub user_begin {
2135
2136           my ($mce, $task_id, $task_name) = @_;
2137
2138           ## The yield method causes this worker to wait for its next time
2139           ## interval slot before running. Yield has no effect without the
2140           ## 'interval' option.
2141
2142           ## Yielding is beneficial inside a user_begin block. A use case
2143           ## is staggering database connections among workers in order
2144           ## to not impact the DB server.
2145
2146           MCE->yield;
2147
2148           MCE->printf(
2149              "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2150              MCE->task_id + 1, time, MCE->task_wid, ''
2151           );
2152
2153           return;
2154        }
2155
2156        {
2157           my $prev_time = time;
2158
2159           sub user_func {
2160
2161              my ($mce, $seq_n, $chunk_id) = @_;
2162
2163              ## Yield simply waits for the next time interval.
2164              MCE->yield;
2165
2166              ## Calculate how long this worker has waited.
2167              my $curr_time = time;
2168              my $time_waited = $curr_time - $prev_time;
2169
2170              $prev_time = $curr_time;
2171
2172              MCE->printf(
2173                 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2174                 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2175              );
2176
2177              return;
2178           }
2179        }
2180
2181        ## Simulate a 4 node environment passing node_id to create_task.
2182
2183        print "Node_ID  Current_Time        Worker_ID  Time_Waited     Comment\n";
2184
2185        MCE->new(
2186           user_begin => \&user_begin,
2187           user_func  => \&user_func,
2188
2189           user_tasks => [
2190              create_task(1),
2191              create_task(2),
2192              create_task(3),
2193              create_task(4)
2194           ]
2195
2196        )->run;
2197
2198        -- Output (notice Current_Time below, stays 0.10 apart)
2199
2200        Node_ID  Current_Time        Worker_ID  Time_Waited     Comment
2201        Node  1: 1374807976.74634 -- Worker  1:              -- Started
2202        Node  2: 1374807976.84634 -- Worker  1:              -- Started
2203        Node  3: 1374807976.94638 -- Worker  1:              -- Started
2204        Node  4: 1374807977.04639 -- Worker  1:              -- Started
2205        Node  1: 1374807977.14634 -- Worker  2:              -- Started
2206        Node  2: 1374807977.24640 -- Worker  2:              -- Started
2207        Node  3: 1374807977.34649 -- Worker  2:              -- Started
2208        Node  4: 1374807977.44657 -- Worker  2:              -- Started
2209        Node  1: 1374807977.54636 -- Worker  1:      0.90037 -- Seq_N   1
2210        Node  2: 1374807977.64638 -- Worker  1:      1.00040 -- Seq_N   7
2211        Node  3: 1374807977.74642 -- Worker  1:      1.10043 -- Seq_N  13
2212        Node  4: 1374807977.84643 -- Worker  1:      1.20045 -- Seq_N  19
2213        Node  1: 1374807977.94636 -- Worker  2:      1.30037 -- Seq_N   2
2214        Node  2: 1374807978.04638 -- Worker  2:      1.40040 -- Seq_N   8
2215        Node  3: 1374807978.14641 -- Worker  2:      1.50042 -- Seq_N  14
2216        Node  4: 1374807978.24644 -- Worker  2:      1.60045 -- Seq_N  20
2217        Node  1: 1374807978.34628 -- Worker  1:      0.79996 -- Seq_N   3
2218        Node  2: 1374807978.44631 -- Worker  1:      0.79996 -- Seq_N   9
2219        Node  3: 1374807978.54634 -- Worker  1:      0.79996 -- Seq_N  15
2220        Node  4: 1374807978.64636 -- Worker  1:      0.79997 -- Seq_N  21
2221        Node  1: 1374807978.74628 -- Worker  2:      0.79996 -- Seq_N   4
2222        Node  2: 1374807978.84632 -- Worker  2:      0.79997 -- Seq_N  10
2223        Node  3: 1374807978.94634 -- Worker  2:      0.79996 -- Seq_N  16
2224        Node  4: 1374807979.04636 -- Worker  2:      0.79996 -- Seq_N  22
2225        Node  1: 1374807979.14628 -- Worker  1:      0.80001 -- Seq_N   5
2226        Node  2: 1374807979.24631 -- Worker  1:      0.80000 -- Seq_N  11
2227        Node  3: 1374807979.34634 -- Worker  1:      0.80001 -- Seq_N  17
2228        Node  4: 1374807979.44636 -- Worker  1:      0.80000 -- Seq_N  23
2229        Node  1: 1374807979.54628 -- Worker  2:      0.80000 -- Seq_N   6
2230        Node  2: 1374807979.64631 -- Worker  2:      0.80000 -- Seq_N  12
2231        Node  3: 1374807979.74633 -- Worker  2:      0.80000 -- Seq_N  18
2232        Node  4: 1374807979.84636 -- Worker  2:      0.80000 -- Seq_N  24
2233
2234       The interval.pl example above is included with MCE.
2235

MCE PROGRESS DEMONSTRATIONS

2237       The "progress" option takes a code block for receiving info on the
2238       progress made while processing input data; e.g. "input_data" or
2239       "sequence". To make this work, one provides the "progress" option a
2240       closure block like so, passing along the size of the input_data; e.g
2241       "scalar @array" or "-s /path/to/file".
2242
2243       Current API available since 1.813.
2244
2245       A worker, upon completing processing its chunk, notifies the manager-
2246       process with the size of the chunk. That could be the number of rows or
2247       literally the size of the chunk when processing an input file. The
2248       manager-process accumulates the size before calling the code block
2249       associated with the "progress" option.
2250
2251       When running many tasks simultaneously, via "user_tasks", the call is
2252       initiated by workers at level 0 only or rather the first task, not
2253       shown here.
2254
2255        use Time::HiRes 'sleep';
2256        use MCE;
2257
2258        sub make_progress {
2259           my ($total_size) = @_;
2260           return sub {
2261              my ($completed_size) = @_;
2262              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2263           };
2264        }
2265
2266        my @input = (1..150);
2267
2268        MCE->new(
2269           chunk_size  => 10,
2270           max_workers => 4,
2271           input_data  => \@input,
2272           progress    => make_progress( scalar @input ),
2273           user_func   => sub { sleep 1.5 }
2274        )->run();
2275
2276        -- Output
2277
2278        6.7%
2279        13.3%
2280        20.0%
2281        26.7%
2282        33.3%
2283        40.0%
2284        46.7%
2285        53.3%
2286        60.0%
2287        66.7%
2288        73.3%
2289        80.0%
2290        86.7%
2291        93.3%
2292        100.0%
2293
2294       Next is the code using MCE::Flow and ProgressBar::Stack to do the same
2295       thing, practically.
2296
2297        use Time::HiRes 'sleep';
2298        use ProgressBar::Stack;
2299        use MCE::Flow;
2300
2301        sub make_progress {
2302           my ($total_size) = @_;
2303           init_progress();
2304           return sub {
2305              my ($completed_size) = @_;
2306              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2307           };
2308        }
2309
2310        my @input = (1..150);
2311
2312        MCE::Flow->init(
2313           chunk_size  => 10,
2314           max_workers => 4,
2315           progress    => make_progress( scalar @input )
2316        );
2317
2318        MCE::Flow->run( sub { sleep 1.5 }, \@input );
2319        MCE::Flow->finish();
2320
2321        print "\n";
2322
2323        -- Output
2324
2325        [################    ]  80.0% ETA: 0:01
2326
2327       For sequence of numbers, using the "sequence" option, one must account
2328       for "step_size", typically set to 1 automatically.
2329
2330        use Time::HiRes 'sleep';
2331        use MCE;
2332
2333        sub make_progress {
2334           my ($total_size) = @_;
2335           return sub {
2336              my ($completed_size) = @_;
2337              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2338           };
2339        }
2340
2341        MCE->new(
2342           chunk_size  => 10,
2343           max_workers => 4,
2344           sequence    => [ 1, 100, 2 ],
2345           progress    => make_progress( int( 100 / 2 + 0.5 ) ),
2346           user_func   => sub { sleep 1.5 }
2347        )->run();
2348
2349        -- Output
2350
2351        20.0%
2352        40.0%
2353        60.0%
2354        80.0%
2355        100.0%
2356
2357       Changing "chunk_size" to 1 means workers notify the manager process
2358       more often, thus increasing granularity. Take a look at the output.
2359
2360        2.0%
2361        4.0%
2362        6.0%
2363        8.0%
2364        10.0%
2365        ...
2366        92.0%
2367        94.0%
2368        96.0%
2369        98.0%
2370        100.0%
2371
2372       Here is the same thing using MCE::Flow together with
2373       ProgressBar::Stack.
2374
2375        use Time::HiRes 'sleep';
2376        use ProgressBar::Stack;
2377        use MCE::Flow;
2378
2379        sub make_progress {
2380           my ($total_size) = @_;
2381           init_progress();
2382           return sub {
2383              my ($completed_size) = @_;
2384              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2385           };
2386        }
2387
2388        MCE::Flow->init(
2389           chunk_size  => 1,
2390           max_workers => 4,
2391           progress    => make_progress( int( 100 / 2 + 0.5 ) )
2392        );
2393
2394        MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2395        MCE::Flow->finish();
2396
2397        print "\n";
2398
2399        -- Output
2400
2401        [#########           ]  48.0% ETA: 0:03
2402
2403       For files and file handles, workers send the actual size of the data
2404       read versus counting rows.
2405
2406        use Time::HiRes 'sleep';
2407        use MCE;
2408
2409        sub make_progress {
2410           my ($total_size) = @_;
2411           return sub {
2412              my ($completed_size) = @_;
2413              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2414           };
2415        }
2416
2417        my $input_file = "/path/to/input_file.txt";
2418
2419        MCE->new(
2420           chunk_size  => 5,
2421           max_workers => 4,
2422           input_data  => $input_file,
2423           progress    => make_progress( -s $input_file ),
2424           user_func   => sub { sleep 0.03 }
2425        )->run();
2426
2427       For consistency, here is the example using MCE::Flow, again with
2428       ProgressBar::Stack.
2429
2430        use Time::HiRes 'sleep';
2431        use ProgressBar::Stack;
2432        use MCE::Flow;
2433
2434        sub make_progress {
2435           my ($total_size) = @_;
2436           init_progress();
2437           return sub {
2438              my ($completed_size) = @_;
2439              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2440           };
2441        }
2442
2443        my $input_file = "/path/to/input_file.txt";
2444
2445        MCE::Flow->init(
2446           chunk_size  => 5,
2447           max_workers => 4,
2448           progress    => make_progress( -s $input_file )
2449        );
2450
2451        MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2452        MCE::Flow->finish();
2453
2454       The next demonstration processes three arrays consecutively. For this
2455       one, MCE workers persist after running. This needs MCE 1.814 or later
2456       to run. Otherwise, the progress output is not shown in MCE 1.813.
2457
2458        use Time::HiRes 'sleep';
2459        use ProgressBar::Stack;
2460        use MCE;
2461
2462        sub make_progress {
2463           my ($total_size, $message) = @_;
2464           init_progress();
2465           return sub {
2466              my ($completed_size) = @_;
2467              update_progress(
2468                 sprintf("%0.1f", $completed_size / $total_size * 100),
2469                 $message
2470              );
2471           };
2472        }
2473
2474        my $mce = MCE->new(
2475           chunk_size  => 10,
2476           max_workers => 4,
2477           user_func   => sub { sleep 0.5 }
2478        )->spawn();
2479
2480        my @a1 = ( 1 .. 200 );
2481        my @a2 = ( 1 .. 500 );
2482        my @a3 = ( 1 .. 300 );
2483
2484        $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2485
2486        print "\n";
2487
2488        $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2489
2490        print "\n";
2491
2492        $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2493
2494        print "\n";
2495
2496        $mce->shutdown;
2497
2498        -- Output
2499
2500        [####################] 100.0% ETA: 0:00 array 1
2501        [####################] 100.0% ETA: 0:00 array 2
2502        [####################] 100.0% ETA: 0:00 array 3
2503
2504       When size is not know, such as reading from "STDIN", the only thing one
2505       can do is report the size completed thus far.
2506
2507        # 1 kibibyte equals 1024 bytes
2508
2509        progress => sub {
2510           my ($completed_size) = @_;
2511           printf "%0.1f kibibytes\n", $completed_size / 1024;
2512        }
2513

SEE ALSO

2515       •  MCE::Examples
2516

INDEX

2518       MCE
2519

AUTHOR

2521       Mario E. Roy, <marioeroy AT gmail DOT com>
2522
2523
2524
2525perl v5.32.1                      2021-01-27                      MCE::Core(3)
Impressum