1MCE::Core(3)          User Contributed Perl Documentation         MCE::Core(3)
2
3
4

NAME

6       MCE::Core - Documentation describing the core MCE API
7

VERSION

9       This document describes MCE::Core version 1.889
10

SYNOPSIS

12       This is a simplistic use case of MCE running with 5 workers.
13
14        # Construction using the Core API
15
16        use MCE;
17
18        my $mce = MCE->new(
19           max_workers => 5,
20           user_func => sub {
21              my ($mce) = @_;
22              $mce->say("Hello from " . $mce->wid);
23           }
24        );
25
26        $mce->run;
27
28        # Construction using a MCE model
29
30        use MCE::Flow max_workers => 5;
31
32        mce_flow sub {
33           my ($mce) = @_;
34           MCE->say("Hello from " . MCE->wid);
35        };
36
37        -- Output
38
39        Hello from 2
40        Hello from 4
41        Hello from 5
42        Hello from 1
43        Hello from 3
44
45   MCE->new ( [ options ] )
46       Below, a new instance is configured with all available options.
47
48        use MCE;
49
50        my $mce = MCE->new(
51
52           max_workers => 8,                   # Default 1
53
54              # Number of workers to spawn.
55
56              # MCE sets an upper-limit of 8 for 'auto'. MCE 1.521+.
57              # max_workers => 'auto',         # # of lcores, 8 maximum
58              # max_workers => 'auto-1',       # 7 on HW with 16 lcores
59              # max_workers => 'auto-1',       # 3 on HW with  4 lcores
60
61              # Specify a percentage. MCE 1.875+.
62              # max_workers => '25%',          # 4 on HW with 16 lcores
63              # max_workers => '50%',          # 8 on HW with 16 lcores
64
65              # Run on all logical cores.
66              # max_workers => MCE::Util::get_ncpu(),
67
68           chunk_size => 2000,                 # Default 1
69
70              # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
71              # The default is 1 when using the Core API and 'auto' for
72              # MCE Models. For arrays or queues, chunk_size means the
73              # number of records per chunk. For iterators, MCE will not
74              # use chunk_size, though the iterator may use it to determine
75              # how much to return per iteration. For files, smaller than or
76              # equal to 8192 is the number of records.  Greater than 8192
77              # is the number of bytes. MCE reads until the end of record
78              # before calling user_func.
79
80              # chunk_size => 1,               # Consists of 1 record
81              # chunk_size => 1000,            # Consists of 1000 records
82              # chunk_size => '16k',           # Approximate 16 kibiBytes (KiB)
83              # chunk_size => '20m',           # Approximate 20 mebiBytes (MiB)
84
85           tmp_dir => $tmp_dir,                # Default $MCE::Signal::tmp_dir
86
87              # Default is $MCE::Signal::tmp_dir which points to
88              # $ENV{TEMP} if defined. Otherwise, tmp_dir points
89              # to a location under /tmp.
90
91           freeze => \&encode_sereal,          # Default \&Storable::freeze
92           thaw   => \&decode_sereal,          # Default \&Storable::thaw
93
94              # Release 1.412 allows freeze and thaw to be overridden.
95              # Simply include a serialization module prior to loading
96              # MCE. Configure freeze/thaw options.
97
98              # use Sereal qw( encode_sereal decode_sereal );
99              # use CBOR::XS qw( encode_cbor decode_cbor );
100              # use JSON::XS qw( encode_json decode_json );
101              #
102              # use MCE;
103
104           gather => \@a,                      # Default undef
105
106              # Release 1.5 allows for gathering of data to an array or
107              # hash reference, a MCE::Queue/Thread::Queue object, or code
108              # reference. One invokes gathering by calling the gather
109              # method as often as needed.
110
111              # gather => \@array,
112              # gather => \%hash,
113              # gather => $queue,
114              # gather => \&order,
115
116           init_relay => 0,                    # Default undef
117
118              # For specifying the initial relay value. Allowed values
119              # are array_ref, hash_ref, or scalar. The MCE::Relay module
120              # is loaded automatically when specified.
121
122              # init_relay => \@array,
123              # init_relay => \%hash,
124              # init_relay => scalar,
125
126           input_data => $input_file,          # Default undef
127           RS         => "\n>",                # Default undef
128
129              # input_data => '/path/to/file'  # Process file
130              # input_data => \@array          # Process array
131              # input_data => \*FILE_HNDL      # Process file handle
132              # input_data => $io              # Process IO::All { File, Pipe, STDIO }
133              # input_data => \$scalar         # Treated like a file
134              # input_data => \&iterator       # User specified iterator
135
136              # The RS option (for input record separator) applies to files
137              # and file handles.
138
139              # MCE applies additional logic when RS begins with a newline
140              # character; e.g. RS => "\n>". It trims away characters after
141              # the newline and prepends them to the next record.
142              #
143              # Typically, the left side is what happens for $/ = "\n>".
144              # The right side is what user_func receives.
145              #
146              # All records begin with > and end with \n
147              #    Record 1:  >seq1 ... \n>   (to)   >seq1 ... \n
148              #    Record 2:  seq2  ... \n>          >seq2 ... \n
149              #    Record 3:  seq3  ... \n>          >seq3 ... \n
150              #    Last Rec:  seqN  ... \n           >seqN ... \n
151
152           loop_timeout => 20,                 # Default 0
153
154              # Added in 1.7, enables the manager process to time out a read
155              # operation on channel 0 (UNIX platforms only). The manager process
156              # decrements the count of running workers for any workers which have
157              # died in an uncontrollable manner. Specify this option if, on
158              # occasion, a worker dies unexpectedly (e.g. from an XS module).
159
160              # Option works with init_relay on UNIX platforms since MCE 1.844.
161              # A number smaller than 5 is silently increased to 5.
162
163           max_retries => 2,                   # Default 0
164
165              # This option, added in 1.7, causes MCE to retry a failed
166              # chunk from a worker dying while processing input data or a
167              # sequence of numbers.
168
169           parallel_io => 1,                   # Default 0
170           posix_exit  => 1,                   # Default 0
171           use_slurpio => 1,                   # Default 0
172
173              # The parallel_io option enables parallel reads during large
174              # slurpio, useful when reading from fast storage. Do not enable
175              # parallel_io when running MCE on many nodes with input coming
176              # from shared storage.
177
178              # Set posix_exit to avoid all END and destructor processing.
179              # Constructing MCE inside a thread implies 1 or if present CGI,
180              # FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
181              # Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
182
183              # Enable slurpio to pass the raw chunk (scalar ref) to the user
184              # function when reading input files.
185
186           use_threads => 1,                   # Auto 0 or 1
187
188              # By default MCE spawns child processes on UNIX platforms and
189              # threads on Windows (i.e. $^O eq 'MSWin32').
190
191              # MCE supports threads via two threading libraries if threads
192              # is preferred over child processes. The use of threads requires
193              # a thread library prior to loading MCE, causing the use_threads
194              # option to default to 1. Specify 0 for child processes.
195              #
196              #   use threads;              use forks;
197              #   use threads::shared;      use forks::shared;
198              #   use MCE              (or) use MCE;           (or) use MCE;
199
200           spawn_delay  => 0.045,              # Default undef
201           submit_delay => 0.015,              # Default undef
202           job_delay    => 0.060,              # Default undef
203
204              # Time to wait in fractional seconds after spawning a worker,
205              # after submitting parameters to worker (MCE->run, MCE->process),
206              # and worker running (one time staggered delay).
207
208              # Specify job_delay to stagger workers connecting to a database.
209
210           on_post_exit => \&on_post_exit,     # Default undef
211           on_post_run  => \&on_post_run,      # Default undef
212
213              # Execute the code block after a worker exits or dies.
214              # (i.e. MCE->exit, exit, die)
215
216              # Execute the code block after running.
217              # (i.e. MCE->process, MCE->run)
218
219           progress => sub { ... },            # Default undef
220
221              # A code block for receiving info on the progress made.
222              # See section labeled "MCE PROGRESS DEMONSTRATIONS" at the
223              # end of this document.
224
225           user_args => { env => 'test' },     # Default undef
226
227              # MCE release 1.4 added a new parameter to allow one to
228              # specify arbitrary arguments such as a string, an ARRAY
229              # or HASH reference. Workers can access this directly.
230              # (i.e. my $args = $mce->{user_args} or MCE->user_args)
231
232           user_begin => \&user_begin,         # Default undef
233           user_func => \&user_func,           # Default undef
234           user_end => \&user_end,             # Default undef
235
236              # Think of user_begin, user_func, and user_end as in
237              # the awk scripting language:
238              # awk 'BEGIN { begin } { func } { func } ... END { end }'
239
240              # MCE workers call user_begin once at the start of a job,
241              # then user_func repeatedly until no chunks remain.
242              # Afterwards, user_end is called.
243
244           user_error => \&user_error,         # Default undef
245           user_output => \&user_output,       # Default undef
246
247              # MCE will forward data to user_error/user_output,
248              # when defined, for the following methods.
249
250              # MCE->sendto(\*STDERR, "sent to user_error\n");
251              # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
252              # MCE->print(\*STDERR, "sent to user_error\n");
253              # MCE->say(\*STDERR, "sent to user_error");
254
255              # MCE->sendto(\*STDOUT, "sent to user_output\n");
256              # MCE->printf("%s\n", "sent to user_output");
257              # MCE->print("sent to user_output\n");
258              # MCE->say("sent to user_output");
259
260           stderr_file => 'err_file',          # Default STDERR
261           stdout_file => 'out_file',          # Default STDOUT
262
263              # Or to file; user_error and user_output take precedence.
264
265           flush_file   => 0,                  # Default 1
266           flush_stderr => 0,                  # Default 1
267           flush_stdout => 0,                  # Default 1
268
269              # Flush sendto file, standard error, or standard output.
270
271           interval => {
272               delay => 0.007 [, max_nodes => 4, node_id => 1 ]
273           },
274
275              # For use with the yield method introduced in MCE 1.5.
276              # Both max_nodes & node_id are optional and default to 1.
277              # Delay is the amount of time between intervals.
278
279              # interval => 0.007              # Shorter; MCE 1.506+
280
281           sequence => {                       # Default undef
282               begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
283           },
284
285           bounds_only => 1,                   # Default undef
286
287              # For looping through a sequence of numbers in parallel.
288              # STEP, if omitted, defaults to 1 if BEGIN is smaller than
289              # END or -1 if BEGIN is greater than END. The FORMAT string
290              # is passed to sprintf behind the scenes (% may be omitted).
291              # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
292
293              # Do not specify both options; input_data and sequence.
294              # Release 1.4 allows one to specify an array reference.
295              # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
296
297              # The bounds_only => 1 option will compute the 'begin' and
298              # 'end' items only for the chunk and not the items in between
299              # (hence boundaries only). This option has no effect when
300              # sequence is not specified or chunk_size equals 1.
301
302              # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
303
304           task_end => \&task_end,             # Default undef
305
306              # This is called by the manager process after the task
307              # has completed processing. MCE 1.5 allows this option
308              # to be specified at the top level.
309
310           task_name => 'string',              # Default 'MCE'
311
312              # Added in MCE 1.5 and mainly beneficial for user_tasks.
313              # One may specify a unique name for each sub-task.
314              # The string is passed as the 3rd arg to task_end.
315
316           user_tasks => [                     # Default undef
317              { ... },                         # Options for task 0
318              { ... },                         # Options for task 1
319              { ... },                         # Options for task 2
320           ],
321
322              # Takes a list of hash references, each allowing up to 17
323              # options. All other MCE options are ignored. The init_relay,
324              # input_data, RS, and use_slurpio options are applicable to
325              # the first task only.
326
327              # max_workers, chunk_size, input_data, interval, sequence,
328              # bounds_only, user_args, user_begin, user_end, user_func,
329              # gather, task_end, task_name, use_slurpio, use_threads,
330              # init_relay, RS
331
332              # Options not specified here will default to same option
333              # specified at the top level.
334        );
335
336   EXPORT_CONST, CONST
337       There are 3 constants which are exportable. Using the constants in lieu
338       of 0,1,2 makes it more legible when accessing the user_func arguments
339       directly.
340
341       SELF CHUNK CID - MCE CONSTANTS
342
343       Exports SELF => 0, CHUNK => 1, and CID => 2.
344
345        use MCE export_const => 1;
346        use MCE const => 1;                    # Shorter; MCE 1.415+
347
348        user_func => sub {
349         # my ($mce, $chunk_ref, $chunk_id) = @_;
350           print "Hello from ", $_[SELF]->wid, "\n";
351        }
352
353       MCE 1.5 allows all public methods to be called directly.
354
355        use MCE;
356
357        user_func => sub {
358         # my ($mce, $chunk_ref, $chunk_id) = @_;
359           print "Hello from ", MCE->wid, "\n";
360        }
361
362   OVERRIDING DEFAULTS
363       The following lists options which may be overridden when loading the
364       module.
365
366        use Sereal qw( encode_sereal decode_sereal );
367        use CBOR::XS qw( encode_cbor decode_cbor );
368        use JSON::XS qw( encode_json decode_json );
369
370        use MCE
371            max_workers => 4,                  # Default 1
372            chunk_size => 100,                 # Default 1
373            tmp_dir => "/path/to/app/tmp",     # $MCE::Signal::tmp_dir
374            freeze => \&encode_sereal,         # \&Storable::freeze
375            thaw => \&decode_sereal,           # \&Storable::thaw
376            init_relay => 0,                   # Default undef; MCE 1.882+
377            use_threads => 0,                  # Default undef; MCE 1.882+
378        ;
379
380        my $mce = MCE->new( ... );
381
382       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
383       available.  Specify "Sereal => 0" to use Storable instead.
384
385        use MCE Sereal => 0;
386
387   RUNNING
388       Run calls spawn, submits the job; workers call user_begin, user_func,
389       and user_end. Run shuts down workers afterwards. Call spawn whenever
390       the need arises for large data structures prior to running.
391
392        $mce->spawn;                           # Call early if desired
393
394        $mce->run;                             # Call run or process below
395
396        # Acquire data arrays and/or input_files. Workers persist after
397        # processing.
398
399        $mce->process(\@input_data_1);         # Process array
400        $mce->process(\@input_data_2);
401        $mce->process(\@input_data_n);
402
403        $mce->process(\%input_hash_1);         # Process hash, current API
404        $mce->process(\%input_hash_2);         # available since 1.828
405        $mce->process(\%input_hash_n);
406
407        $mce->process('input_file_1');         # Process file
408        $mce->process('input_file_2');
409        $mce->process('input_file_n');
410
411        $mce->shutdown;                        # Shutdown workers
412
413   SYNTAX for ON_POST_EXIT
414       Often times, one may want to capture the exit status. The on_post_exit
415       option, if defined, is executed immediately by the manager process
416       after a worker exits via exit (children only), MCE->exit (children and
417       threads), or die.
418
419       The format of $e->{pid} is PID_123 for children and TID_123 for
420       threads.
421
422        my $restart_flag = 1;
423
424        sub on_post_exit {
425           my ($mce, $e) = @_;
426
427           # Display all possible hash elements.
428           print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
429
430           # Restart this worker if desired.
431           if ($restart_flag && $e->{wid} == 2) {
432              $mce->restart_worker;
433              $restart_flag = 0;
434           }
435        }
436
437        sub user_func {
438           my ($mce) = @_;
439           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  # Args, not necessary
440        }
441
442        my $mce = MCE->new(
443           on_post_exit => \&on_post_exit,
444           user_func => \&user_func,
445           max_workers => 3
446        );
447
448        $mce->run;
449
450        -- Output (child processes)
451
452        2: PID_33223: 0: msg_foo: 1002
453        1: PID_33222: 0: msg_foo: 1001
454        3: PID_33224: 0: msg_foo: 1003
455        2: PID_33225: 0: msg_foo: 1002
456
457        -- Output (running with threads)
458
459        3: TID_3: 0: msg_foo: 1003
460        2: TID_2: 0: msg_foo: 1002
461        1: TID_1: 0: msg_foo: 1001
462        2: TID_4: 0: msg_foo: 1002
463
464   SYNTAX for ON_POST_RUN
465       The on_post_run option, if defined, is executed immediately by the
466       manager process after running MCE->process or MCE->run. This option
467       receives an array reference of hashes.
468
469       The difference between on_post_exit and on_post_run is that the former
470       is called immediately whereas the latter is called after all workers
471       have completed running.
472
473        sub on_post_run {
474           my ($mce, $status_ref) = @_;
475           foreach my $e ( @{ $status_ref } ) {
476              # Display all possible hash elements.
477              print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
478           }
479        }
480
481        sub user_func {
482           my ($mce) = @_;
483           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  # Args, not necessary
484        }
485
486        my $mce = MCE->new(
487           on_post_run => \&on_post_run,
488           user_func => \&user_func,
489           max_workers => 3
490        );
491
492        $mce->run;
493
494        -- Output (child processes)
495
496        3: PID_33174: 0: msg_foo: 1003
497        1: PID_33172: 0: msg_foo: 1001
498        2: PID_33173: 0: msg_foo: 1002
499
500        -- Output (running with threads)
501
502        2: TID_2: 0: msg_foo: 1002
503        3: TID_3: 0: msg_foo: 1003
504        1: TID_1: 0: msg_foo: 1001
505
506   SYNTAX for INPUT_DATA
507       MCE supports many ways to specify input_data. Support for iterators was
508       added in MCE 1.505. The RS option allows one to specify the record
509       separator when processing files.
510
511       MCE is a chunking engine. Therefore, chunk_size is applicable to
512       input_data.  Specifying 1 for use_slurpio causes user_func to receive a
513       scalar reference containing the raw data (applicable to files only)
514       instead of an array reference.
515
516       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
517
518        input_data  => '/path/to/file',  # process file
519        input_data  => \@array,          # process array
520        input_data  => \%hash,           # process hash, API since 1.828
521        input_data  => \*FILE_HNDL,      # process file handle
522        input_data  => $fh,              # open $fh, "<", "file"
523        input_data  => $fh,              # IO::File "file", "r"
524        input_data  => $fh,              # IO::Uncompress::Gunzip "file.gz"
525        input_data  => $io,              # IO::All { File, Pipe, STDIO }
526        input_data  => \$scalar,         # treated like a file
527        input_data  => \&iterator,       # user specified iterator
528
529        chunk_size  => 1,                # >1 means looping inside user_func
530        use_slurpio => 1,                # $chunk_ref is a scalar ref
531        RS          => "\n>",            # input record separator
532
533       The chunk_size value determines the chunking mode to use when
534       processing files.  Otherwise, chunk_size is the number of elements for
535       arrays. For files, a chunk size value of <= 8192 is how many records to
536       read. Greater than 8192 is how many bytes to read. MCE appends (the
537       rest) up to the next record separator.
538
539        chunk_size  => 8192,             # Consists of 8192 records
540        chunk_size  => 8193,             # Approximate 8193 bytes for files
541
542        chunk_size  => 1,                # Consists of 1 record or element
543        chunk_size  => 1000,             # Consists of 1000 records
544        chunk_size  => '16k',            # Approximate 16 kibiBytes (KiB)
545        chunk_size  => '20m',            # Approximate 20 mebiBytes (MiB)
546
547       The construction for user_func when chunk_size > 1 and assuming
548       use_slurpio equals 0.
549
550        user_func => sub {
551           my ($mce, $chunk_ref, $chunk_id) = @_;
552
553           # $_ is $chunk_ref->[0] when chunk_size equals 1
554           # $_ is $chunk_ref otherwise; $_ can be used below
555
556           for my $record ( @{ $chunk_ref } ) {
557              print "$chunk_id: $record\n";
558           }
559        }
560
561        # input_data => \%hash
562        # current API available since 1.828
563
564        user_func => sub {
565           my ($mce, $chunk_ref, $chunk_id) = @_;
566
567           # $_ points to $chunk_ref regardless of chunk_size
568
569           for my $key ( keys %{ $chunk_ref } ) {
570              print "$key: ", $chunk_ref->{$key}, "\n";
571           }
572        }
573
574       Specifying a value for input_data is straightforward for arrays and
575       files.  The next several examples specify an iterator reference for
576       input_data.
577
578        use MCE;
579
580        # A factory function which creates a closure (the iterator itself)
581        # for generating a sequence of numbers. The external variables
582        # ($n, $max, $step) are used for keeping state across successive
583        # calls to the closure. The iterator simply returns when $n > $max.
584
585        sub input_iterator {
586           my ($n, $max, $step) = @_;
587
588           return sub {
589              return if $n > $max;
590
591              my $current = $n;
592              $n += $step;
593
594              return $current;
595           };
596        }
597
598        # Run user_func in parallel. Input data can be specified during
599        # the construction or as an argument to the process method.
600
601        my $mce = MCE->new(
602
603         # input_data => input_iterator(10, 30, 2),
604           chunk_size => 1, max_workers => 4,
605
606           user_func => sub {
607              my ($mce, $chunk_ref, $chunk_id) = @_;
608              MCE->print("$_: ", $_ * 2, "\n");
609           }
610
611        )->spawn;
612
613        $mce->process( input_iterator(10, 30, 2) );
614
615        -- Output   Note that output order is not guaranteed
616                    Take a look at iterator.pl for ordered output
617
618        10: 20
619        12: 24
620        16: 32
621        20: 40
622        14: 28
623        22: 44
624        18: 36
625        24: 48
626        26: 52
627        28: 56
628        30: 60
629
630       The following example queries the DB for the next 1000 rows. Notice the
631       use of fetchall_arrayref. The iterator function itself receives one
632       argument which is chunk_size (added in MCE 1.510) to determine how much
633       to return per iteration.  The default is 1 for the Core API and MCE
634       Models.
635
636        use DBI;
637        use MCE;
638
639        sub db_iter {
640
641           my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
642
643           my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
644                     die "Could not connect to database: $DBI::errstr";
645
646           my $sth = $dbh->prepare('select color, desc from table');
647
648           $sth->execute;
649
650           return sub {
651              my ($chunk_size) = @_;
652
653              if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
654                 return @{ $aref };
655              }
656
657              return;
658           };
659        }
660
661        # Let's enumerate column indexes for easy column retrieval.
662        my ($i_color, $i_desc) = (0 .. 1);
663
664        my $mce = MCE->new(
665           max_workers => 3, chunk_size => 1000,
666           input_data => db_iter(),
667
668           user_func => sub {
669              my ($mce, $chunk_ref, $chunk_id) = @_;
670              my $ret = '';
671
672              foreach my $row (@{ $chunk_ref }) {
673                 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
674              }
675
676              MCE->print($ret);
677           }
678        );
679
680        $mce->run;
681
682       There are many modules on CPAN which return an iterator reference.
683       One such example is shown below. The demonstration ensures MCE workers
684       are spawned before obtaining the iterator. Note the worker_id value
685       (left column) in the output.
686
687        use Path::Iterator::Rule;
688        use MCE;
689
690        my $start_dir = shift
691           or die "Please specify a starting directory";
692
693        -d $start_dir
694           or die "Cannot open ($start_dir): No such file or directory";
695
696        my $mce = MCE->new(
697           max_workers => 'auto',
698           user_func => sub { MCE->say( MCE->wid . ": $_" ) }
699        )->spawn;
700
701        my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
702
703        my $iterator = $rule->iter(
704           $start_dir, { follow_symlinks => 0, depthfirst => 1 }
705        );
706
707        $mce->process( $iterator );
708
709        -- Output
710
711        8: lib/MCE/Core/Input/Generator.pm
712        5: lib/MCE/Core/Input/Handle.pm
713        6: lib/MCE/Core/Input/Iterator.pm
714        2: lib/MCE/Core/Input/Request.pm
715        3: lib/MCE/Core/Manager.pm
716        4: lib/MCE/Core/Input/Sequence.pm
717        7: lib/MCE/Core/Validation.pm
718        1: lib/MCE/Core/Worker.pm
719        8: lib/MCE/Flow.pm
720        5: lib/MCE/Grep.pm
721        6: lib/MCE/Loop.pm
722        2: lib/MCE/Map.pm
723        3: lib/MCE/Queue.pm
724        4: lib/MCE/Signal.pm
725        7: lib/MCE/Stream.pm
726        1: lib/MCE/Subs.pm
727        8: lib/MCE/Util.pm
728        5: lib/MCE.pm
729
730       Although MCE supports arrays, extra measures are needed to use a "lazy"
731       array as input data. The reason for this is that MCE needs the size of
732       the array before processing which may be unknown for lazy arrays.
733       Therefore, closures provide an excellent mechanism for this.
734
735       The code block belonging to the lazy array must return undef after
736       exhausting its input data. Otherwise, the process will never end.
737
738        use Tie::Array::Lazy;
739        use MCE;
740
741        tie my @a, 'Tie::Array::Lazy', [], sub {
742           my $i = $_[0]->index;
743
744           return ($i < 10) ? $i : undef;
745        };
746
747        sub make_iterator {
748           my $i = 0; my $a_ref = shift;
749
750           return sub {
751              return $a_ref->[$i++];
752           };
753        }
754
755        my $mce = MCE->new(
756           max_workers => 4, input_data => make_iterator(\@a),
757
758           user_func => sub {
759              my ($mce, $chunk_ref, $chunk_id) = @_;
760              MCE->say($_);
761           }
762
763        )->run;
764
765        -- Output
766
767        0
768        1
769        2
770        3
771        4
772        6
773        7
774        8
775        5
776        9
777
778       The following demonstrates how to retrieve a chunk from the lazy array
779       on each successive call. Here, the iterator returns an empty list
780       once $i is greater than $max. Iterators may optionally use chunk_size
781       to determine how much to return per iteration.
782
783        use Tie::Array::Lazy;
784        use MCE;
785
786        tie my @a, 'Tie::Array::Lazy', [], sub {
787           $_[0]->index;
788        };
789
790        sub make_iterator {
791           my $j = 0; my ($a_ref, $max) = @_;
792
793           return sub {
794              my ($chunk_size) = @_;
795              my $i = $j;  $j += $chunk_size;
796
797              return if $i > $max;
798              return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
799           };
800        }
801
802        my $mce = MCE->new(
803           chunk_size => 15, max_workers => 4,
804           input_data => make_iterator(\@a, 100),
805
806           user_func => sub {
807              my ($mce, $chunk_ref, $chunk_id) = @_;
808              MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
809           }
810
811        )->run;
812
813        -- Output
814
815        1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
816        2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
817        3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
818        4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
819        5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
820        6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
821        7: 90 91 92 93 94 95 96 97 98 99 100
822
823   SYNTAX for SEQUENCE
824       The 1.3 release and above allows workers to loop through a sequence of
825       numbers computed mathematically without the overhead of an array. The
826       sequence can be specified separately for each user_tasks entry, unlike
827       input_data, which is applicable to the first task only.
828
829       See the seq_demo.pl example, included with this distribution, on
830       applying sequences with the user_tasks option.
831
832       Sequence can be defined using an array or a hash reference.
833
834        use MCE;
835
836        my $mce = MCE->new(
837           max_workers => 3,
838
839         # sequence => [ 10, 19, 0.7, "%4.1f" ],  # up to 4 options
840
841           sequence => {
842              begin => 10, end => 19, step => 0.7, format => "%4.1f"
843           },
844
845           user_func => sub {
846              my ($mce, $n, $chunk_id) = @_;
847              print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
848           }
849        );
850
851        $mce->run;
852
853        -- Output (sorted afterwards, notice wid and chunk_id in output)
854
855        10.0 from 1 id 1
856        10.7 from 2 id 2
857        11.4 from 3 id 3
858        12.1 from 1 id 4
859        12.8 from 2 id 5
860        13.5 from 3 id 6
861        14.2 from 1 id 7
862        14.9 from 2 id 8
863        15.6 from 3 id 9
864        16.3 from 1 id 10
865        17.0 from 2 id 11
866        17.7 from 3 id 12
867        18.4 from 1 id 13
868
869       The 1.5 release includes a new option (bounds_only). This option tells
870       the sequence engine to compute 'begin' and 'end' items only, for the
871       chunk, and not the items in between (hence boundaries only). This
872       option applies to sequence only and has no effect when chunk_size
873       equals 1.
874
875       The time to run is 0.006s below. This becomes 0.827s without the
876       bounds_only option due to computing all items in between, thus creating
877       a very large array. Basically, specify bounds_only => 1 when the
878       boundaries are all you need for looping inside the block, e.g. Monte
879       Carlo simulations.
880
881       Time was measured using 1 worker to emphasize the difference.
882
883        use MCE;
884
885        my $mce = MCE->new(
886           max_workers => 1, chunk_size => 1_250_000,
887
888           sequence => { begin => 1, end => 10_000_000 },
889           bounds_only => 1,
890
891           # For sequence, the input scalar $_ points to $chunk_ref
892           # when chunk_size > 1, otherwise $chunk_ref->[0].
893           #
894           # user_func => sub {
895           #    my $begin = $_->[0]; my $end = $_->[-1];
896           #
897           #    for ($begin .. $end) {
898           #       ...
899           #    }
900           # },
901
902           user_func => sub {
903              my ($mce, $chunk_ref, $chunk_id) = @_;
904              # $chunk_ref contains 2 items, not 1_250_000
905
906              my $begin = $chunk_ref->[ 0];
907              my $end   = $chunk_ref->[-1];   # or $chunk_ref->[1]
908
909              MCE->printf("%7d .. %8d\n", $begin, $end);
910           }
911        );
912
913        $mce->run;
914
915        -- Output
916
917              1 ..  1250000
918        1250001 ..  2500000
919        2500001 ..  3750000
920        3750001 ..  5000000
921        5000001 ..  6250000
922        6250001 ..  7500000
923        7500001 ..  8750000
924        8750001 .. 10000000
925
926   SYNTAX for MAX_RETRIES
927       The max_retries option, added in 1.7, allows MCE to retry a failed
928       chunk from a worker dying while processing input data or a sequence of
929       numbers.
930
931       When max_retries is set, MCE configures the on_post_exit option
932       automatically using the following code before running. Specify
933       on_post_exit explicitly for any further tailoring. The restart_worker
934       line is necessary, obviously.
935
936        on_post_exit => sub {
937           my ( $mce, $e, $retry_cnt ) = @_;
938
939           if ( $e->{id} ) {
940              my $cnt = $retry_cnt + 1;
941              my $msg = "Error: chunk $e->{id} failed";
942
943              if ( defined $mce->{init_relay} ) {
944                 print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
945                    if ( $retry_cnt < $mce->{max_retries} );
946              }
947              else {
948                 ( $retry_cnt < $mce->{max_retries} )
949                    ? print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
950                    : print {*STDERR} "$msg\n";
951              }
952
953              $mce->restart_worker;
954           }
955        }
956
957       We let MCE handle on_post_exit automatically below, which is
958       essentially the same code shown above. For max_retries to work, the
959       worker must die (including abnormal termination) or call MCE->exit.
960       Notice that we pass the chunk_id value for the 3rd argument to MCE->exit
961       (it defaults to chunk_id if omitted since MCE 1.844).
962
963        # max_retries demonstration
964
965        use strict;
966        use warnings;
967
968        use MCE;
969
970        sub user_func {
971           my ( $mce, $chunk_ref, $chunk_id ) = @_;
972
973           # die "Died : chunk_id = 3\n"  if $chunk_id == 3;
974           MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
975
976           print "$chunk_id\n";
977        }
978
979        my $mce = MCE->new(
980           max_workers => 1,
981           max_retries => 2,
982           user_func   => \&user_func,
983        )->spawn;
984
985        my $input_data = [ 0..7 ];
986
987        $mce->process( { chunk_size => 1 }, $input_data );
988        $mce->shutdown;
989
990        -- Output
991
992        1
993        2
994        Error: chunk 3 failed, retrying chunk attempt # 1
995        Error: chunk 3 failed, retrying chunk attempt # 2
996        Error: chunk 3 failed
997        4
998        5
999        6
1000        7
1001        8
1002
1003       Orderly output with max_retries is possible since MCE 1.844. Below,
1004       chunk 3 succeeds whereas chunk 5 fails due to exceeding the number of
1005       retries. Be sure to call MCE::relay inside "user_func" and near the end
1006       of the block.
1007
1008        # max_retries demonstration with init_relay
1009
1010        use strict;
1011        use warnings;
1012
1013        use MCE;
1014        use MCE::Shared;
1015
1016        tie my $retries1, 'MCE::Shared', 0;
1017        tie my $retries2, 'MCE::Shared', 0;
1018
1019        MCE->new(
1020           max_workers  => 4,
1021           input_data   => [ 1..7 ],
1022           chunk_size   => 1,
1023
1024           max_retries  => 2,
1025           init_relay   => 0,
1026
1027           user_func    => sub {
1028              if ( MCE->chunk_id == 3 ) {
1029                 MCE->exit if ++$retries1 <= 2;
1030              }
1031              if ( MCE->chunk_id == 5 ) {
1032                 MCE->exit if ++$retries2 <= 3;
1033              }
1034              MCE::relay {
1035                 $_ += 1;
1036                 print MCE->chunk_id, "\n";
1037              };
1038           }
1039        )->run;
1040
1041        print "final: ", MCE::relay_final(), "\n";
1042
1043        -- Output
1044
1045        1
1046        2
1047        Error: chunk 3 failed, retrying chunk attempt # 1
1048        Error: chunk 5 failed, retrying chunk attempt # 1
1049        Error: chunk 3 failed, retrying chunk attempt # 2
1050        Error: chunk 5 failed, retrying chunk attempt # 2
1051        3
1052        4
1053        Error: chunk 5 failed
1054        6
1055        7
1056        final: 6
1057
1058   SYNTAX for USER_BEGIN and USER_END
1059       The user_begin and user_end options, if specified, behave similarly to
1060       awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
1061       called once per worker during each run.
1062
1063       MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
1064
1065        sub user_begin {                   # Called once at the beginning
1066           my ($mce, $task_id, $task_name) = @_;
1067           $mce->{wk_total_rows} = 0;
1068        }
1069
1070        sub user_func {                    # Called while processing
1071           my $mce = shift;
1072           $mce->{wk_total_rows} += 1;
1073        }
1074
1075        sub user_end {                     # Called once at the end
1076           my ($mce, $task_id, $task_name) = @_;
1077           printf "## %d: Processed %d rows\n",
1078              MCE->wid, $mce->{wk_total_rows};
1079        }
1080
1081        my $mce = MCE->new(
1082           user_begin => \&user_begin,
1083           user_func  => \&user_func,
1084           user_end   => \&user_end
1085        );
1086
1087        $mce->run;
1088
1089   SYNTAX for USER_FUNC with USE_SLURPIO => 0
1090       When processing input data, MCE can pass an array of rows or a slurped
1091       chunk.  Below, a reference to an array containing the chunk data is
1092       processed.
1093
1094       e.g. $chunk_ref = [ record1, record2, record3, ... ]
1095
1096        sub user_func {
1097
1098           my ($mce, $chunk_ref, $chunk_id) = @_;
1099
1100           foreach my $row ( @{ $chunk_ref } ) {
1101              $mce->{wk_total_rows} += 1;
1102              print $row;
1103           }
1104        }
1105
1106        my $mce = MCE->new(
1107           chunk_size  => 100,
1108           input_data  => "/path/to/file",
1109           user_func   => \&user_func,
1110           use_slurpio => 0
1111        );
1112
1113        $mce->run;
1114
1115   SYNTAX for USER_FUNC with USE_SLURPIO => 1
1116       Here, a reference to a scalar containing the raw chunk data is
1117       processed.
1118
1119        sub user_func {
1120
1121           my ($mce, $chunk_ref, $chunk_id) = @_;
1122
1123           my $count = () = $$chunk_ref =~ /abc/g;
1124        }
1125
1126        my $mce = MCE->new(
1127           chunk_size  => 16000,
1128           input_data  => "/path/to/file",
1129           user_func   => \&user_func,
1130           use_slurpio => 1
1131        );
1132
1133        $mce->run;
1134
1135   SYNTAX for USER_ERROR and USER_OUTPUT
1136       Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1137       and MCE->say can be intercepted by specifying the user_error and
1138       user_output options. MCE on receiving output will forward to user_error
1139       or user_output in a serialized fashion.
1140
1141       Handy when wanting to filter, modify, and/or direct the output
1142       elsewhere.
1143
1144        sub user_error {                   # Redirect STDERR to STDOUT
1145           my $error = shift;
1146           print {*STDOUT} $error;
1147        }
1148
1149        sub user_output {                  # Redirect STDOUT to STDERR
1150           my $output = shift;
1151           print {*STDERR} $output;
1152        }
1153
1154        sub user_func {
1155           my ($mce, $chunk_ref, $chunk_id) = @_;
1156           my $count = 0;
1157
1158           foreach my $row ( @{ $chunk_ref } ) {
1159              MCE->print($row);
1160              $count += 1;
1161           }
1162
1163           MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1164        }
1165
1166        my $mce = MCE->new(
1167           chunk_size  => 1000,
1168           input_data  => "/path/to/file",
1169           user_error  => \&user_error,
1170           user_output => \&user_output,
1171           user_func   => \&user_func
1172        );
1173
1174        $mce->run;
1175
1176   SYNTAX for USER_TASKS and TASK_END
1177       This option takes an array of tasks. Each task allows up to 17 options.
1178       The init_relay, input_data, RS, and use_slurpio options may be defined
1179       inside the first task or at the top level, otherwise ignored under
1180       other sub-tasks.
1181
1182        max_workers, chunk_size, input_data, interval, sequence,
1183        bounds_only, user_args, user_begin, user_end, user_func,
1184        gather, task_end, task_name, use_slurpio, use_threads,
1185        init_relay, RS
1186
1187       Sequence and chunk_size were added in 1.3. User_args was introduced in
1188       1.4. Task_name and input_data are new options allowed in 1.5. In addition,
1189       one can specify task_end at the top level. Task_end also receives 2
1190       additional arguments $task_id and $task_name (shown below).
1191
1192       Options not specified here will default to the same option specified at
1193       the top level. The task_end option is called by the manager process
1194       when all workers for that sub-task have completed processing.
1195
1196       Forking and threading can be intermixed among tasks unless running
1197       Cygwin.  The run method will continue running until all workers have
1198       completed processing.
1199
1200        use threads;
1201        use threads::shared;
1202
1203        use MCE;
1204
1205        sub parallel_task1 { sleep 2; }
1206        sub parallel_task2 { sleep 1; }
1207
1208        my $mce = MCE->new(
1209
1210           task_end => sub {
1211              my ($mce, $task_id, $task_name) = @_;
1212              print "Task [$task_id -- $task_name] completed processing\n";
1213           },
1214
1215           user_tasks => [{
1216              task_name   => 'foo',
1217              max_workers => 2,
1218              user_func   => \&parallel_task1,
1219              use_threads => 0             # Not using threads
1220
1221           },{
1222              task_name   => 'bar',
1223              max_workers => 4,
1224              user_func   => \&parallel_task2,
1225              use_threads => 1             # Yes, threads
1226
1227           }]
1228        );
1229
1230        $mce->run;
1231
1232        -- Output
1233
1234        Task [1 -- bar] completed processing
1235        Task [0 -- foo] completed processing
1236

DEFAULT INPUT SCALAR

1238       Beginning with MCE 1.5, the input scalar $_ is localized prior to
1239       calling user_func for input_data and sequence of numbers. The following
1240       applies.
1241
1242       use_slurpio => 1
1243           $_ is a reference to the buffer e.g. $_ = \$_buffer;
1244           $_ is a reference regardless of whether chunk_size is 1 or greater
1245
1246           user_func => sub {
1247            # my ($mce, $chunk_ref, $chunk_id) = @_;
1248              print ${ $_ };    # $_ is same as $chunk_ref
1249           }
1250
1251       chunk_size is greater than 1, use_slurpio => 0
1252           $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1253           $_ is same as $chunk_ref or $_[CHUNK]
1254
1255           user_func => sub {
1256            # my ($mce, $chunk_ref, $chunk_id) = @_;
1257              for my $row ( @{ $_ } ) {
1258                 print $row, "\n";
1259              }
1260           }
1261
1262           use MCE const => 1;
1263
1264           user_func => sub {
1265            # my ($mce, $chunk_ref, $chunk_id) = @_;
1266              for my $row ( @{ $_[CHUNK] } ) {
1267                 print $row, "\n";
1268              }
1269           }
1270
1271       chunk_size equals 1, use_slurpio => 0
1272           $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1273
1274           # Note that $_ and $chunk_ref are not the same below.
1275           # $chunk_ref is a reference to an array.
1276
1277           user_func => sub {
1278            # my ($mce, $chunk_ref, $chunk_id) = @_;
1279              print $_, "\n";   # Same as $chunk_ref->[0];
1280           }
1281
1282           $mce->foreach("/path/to/file", sub {
1283            # my ($mce, $chunk_ref, $chunk_id) = @_;
1284              print $_;         # Same as $chunk_ref->[0];
1285           });
1286
1287           # However, that is not the case for the forseq method.
1288           # Both $_ and $n_seq are the same when chunk_size => 1.
1289
1290           $mce->forseq([ 1, 9 ], sub {
1291            # my ($mce, $n_seq, $chunk_id) = @_;
1292              print $_, "\n";   # Same as $n_seq
1293           });
1294
1295          Sequence can also be specified using a hash reference. The below
1296          is the same as the array reference form in the example afterwards.
1297
1298           $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1299
1300          The code block receives the next 5 numbers of the sequence per call.
1301          Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1302          to an array, same as $_, due to chunk_size being greater than 1.
1303
1304           $mce->forseq( [ 10, 40, 2 ], { chunk_size => 5 }, sub {
1305            # my ($mce, $n_seq, $chunk_id) = @_;
1306              my @result;
1307              for my $n ( @{ $_ } ) {
1308                 ... do work, append to result for 5
1309              }
1310              ... do something with result afterwards
1311           });
1312

METHODS for the MANAGER PROCESS and WORKERS

1314       The methods listed below are callable by the main process and workers.
1315
1316   MCE->abort ( void )
1317   $mce->abort ( void )
1318       The 'abort' method is applicable when processing input_data only. This
1319       causes all workers to abort after processing the current chunk.
1320
1321       Workers write the next offset position to the queue socket for the next
1322       available worker. In essence, the 'abort' method writes the last offset
1323       position. Workers, on requesting the next offset position, will think
1324       the end of input_data has been reached and leave the chunking loop.
1325
1326        MCE->abort;
1327        $mce->abort;
1328
1329   MCE->chunk_id ( void )
1330   $mce->chunk_id ( void )
1331       Returns the chunk_id for the current chunk. The value starts at 1.
1332       Chunking applies to input_data or sequence. The value is 0 for the
1333       manager process.
1334
1335        my $chunk_id = MCE->chunk_id;
1336        my $chunk_id = $mce->chunk_id;
1337
1338   MCE->chunk_size ( void )
1339   $mce->chunk_size ( void )
1340       Getter method for chunk_size used by MCE.
1341
1342        my $chunk_size = MCE->chunk_size;
1343        my $chunk_size = $mce->chunk_size;
1344
1345   MCE->do ( 'callback_func' [, $arg1, ... ] )
1346   $mce->do ( 'callback_func' [, $arg1, ... ] )
1347       MCE serializes data transfers from a worker process via helper
1348       functions do & sendto to the manager process. The callback function can
1349       optionally return a reply. Support for calling by the manager process
1350       was enabled in MCE 1.839.
1351
1352        [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1353
1354       Passing args to a callback function using references & scalar.
1355
1356        sub callback {
1357           my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1358           ...
1359        }
1360
1361        MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1362        MCE->do('callback', \@a, \%h, \$s, 'foo');
1363
1364       MCE knows whether the caller wants a void, list, hash, or scalar value.
1365
1366        MCE->do('callback' [, $arg1, ... ]);
1367
1368        my @array  = MCE->do('callback' [, $arg1, ... ]);
1369        my %hash   = MCE->do('callback' [, $arg1, ... ]);
1370        my $scalar = MCE->do('callback' [, $arg1, ... ]);
1371
1372   MCE->freeze ( $object_ref )
1373   $mce->freeze ( $object_ref )
1374       Calls the internal freeze method to serialize an object. The default
1375       serialization routines are handled by Sereal if available or Storable.
1376
1377        my $frozen = MCE->freeze([ 0, 2, 4 ]);
1378        my $frozen = $mce->freeze([ 0, 2, 4 ]);
1379
1380   MCE->max_retries ( void )
1381   $mce->max_retries ( void )
1382       Getter method for max_retries used by MCE.
1383
1384        my $max_retries = MCE->max_retries;
1385        my $max_retries = $mce->max_retries;
1386
1387   MCE->max_workers ( void )
1388   $mce->max_workers ( void )
1389       Getter method for max_workers used by MCE.
1390
1391        my $max_workers = MCE->max_workers;
1392        my $max_workers = $mce->max_workers;
1393
1394   MCE->pid ( void )
1395   $mce->pid ( void )
1396       Returns the Process ID. Threads have thread ID attached to the value.
1397
1398        my $pid = MCE->pid;    # 16180 (pid) ; 16180.2 (pid.tid)
1399        my $pid = $mce->pid;
1400
1401   MCE->printf ( $format, $list [, ... ] )
1402   MCE->print ( $list [, ... ] )
1403   MCE->say ( $list [, ... ] )
1404   $mce->printf ( $format, $list [, ... ] )
1405   $mce->print ( $list [, ... ] )
1406   $mce->say ( $list [, ... ] )
1407       Use the printf, print, and say methods when wanting to serialize output
1408       among workers and the manager process. These are syntactic sugar for
1409       the sendto method. They behave similarly to the native functions in
1410       Perl, except that barewords must be passed as a reference and must be
1411       followed by a comma, including file handles.
1412
1413       Say is like print, but implicitly appends a newline.
1414
1415        MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1416        MCE->printf($fh, "%s: %d\n", $name, $age);
1417        MCE->printf("%s: %d\n", $name, $age);
1418
1419        MCE->print(\*STDERR, "$error_msg\n");
1420        MCE->print($fh, $log_msg."\n");
1421        MCE->print("$output_msg\n");
1422
1423        MCE->say(\*STDERR, $error_msg);
1424        MCE->say($fh, $log_msg);
1425        MCE->say($output_msg);
1426
1427       Caveat: Use the following syntax when the first item to print is a
1428       reference that is not a glob or file handle. Otherwise, MCE will error
1429       indicating the first argument is not a glob reference.
1430
1431        MCE->print(\*STDOUT, \@array, "\n");
1432        MCE->print("", \@array, "\n");         # ok
1433
1434       Sending to "IO::All" { File, Pipe, STDIO } is supported since MCE
1435       1.845.
1436
1437        use IO::All;
1438
1439        my $out = io->stdout;
1440        my $err = io->stderr;
1441
1442        MCE->printf($out, "%s\n", "sent to stdout");
1443        MCE->printf($err, "%s\n", "sent to stderr");
1444
1445        MCE->print($out, "sent to stdout\n");
1446        MCE->print($err, "sent to stderr\n");
1447
1448        MCE->say($out, "sent to stdout");
1449        MCE->say($err, "sent to stderr");
1450
1451   MCE->sess_dir ( void )
1452   $mce->sess_dir ( void )
1453       Returns the session directory used by the MCE instance. This is defined
1454       during spawning and removed during shutdown.
1455
1456        my $sess_dir = MCE->sess_dir;
1457        my $sess_dir = $mce->sess_dir;
1458
1459   MCE->task_id ( void )
1460   $mce->task_id ( void )
1461       Returns the task ID. This applies to the user_tasks option (starts at
1462       0).
1463
1464        my $task_id = MCE->task_id;
1465        my $task_id = $mce->task_id;
1466
1467   MCE->task_name ( void )
1468   $mce->task_name ( void )
1469       Returns the task_name value specified via the task_name option when
1470       configuring MCE.
1471
1472        my $task_name = MCE->task_name;
1473        my $task_name = $mce->task_name;
1474
1475   MCE->task_wid ( void )
1476   $mce->task_wid ( void )
1477       Returns the task worker ID (applies to user_tasks). The value starts at
1478       1 per each task configured within user_tasks. The value is 0 for the
1479       manager process.
1480
1481        my $task_wid = MCE->task_wid;
1482        my $task_wid = $mce->task_wid;
1483
1484   MCE->thaw ( $frozen )
1485   $mce->thaw ( $frozen )
1486       Calls the internal thaw method to un-serialize the frozen object.
1487
1488        my $object_ref = MCE->thaw($frozen);
1489        my $object_ref = $mce->thaw($frozen);
1490
1491   MCE->tmp_dir ( void )
1492   $mce->tmp_dir ( void )
1493       Returns the temporary directory used by MCE.
1494
1495        my $tmp_dir = MCE->tmp_dir;
1496        my $tmp_dir = $mce->tmp_dir;
1497
1498   MCE->user_args ( void )
1499   $mce->user_args ( void )
1500       Returns the arguments specified via the user_args option.
1501
1502        my ($arg1, $arg2, $arg3) = MCE->user_args;
1503        my ($arg1, $arg2, $arg3) = $mce->user_args;
1504
1505   MCE->wid ( void )
1506   $mce->wid ( void )
1507       Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1508       is 0 for the manager process.
1509
1510        my $wid = MCE->wid;
1511        my $wid = $mce->wid;
1512

METHODS for the MANAGER PROCESS only

1514       Methods listed below are callable by the main process only.
1515
1516   MCE->forchunk ( $input_data [, { options } ], sub { ... } )
1517   MCE->foreach ( $input_data [, { options } ], sub { ... } )
1518   MCE->forseq ( $sequence_spec [, { options } ], sub { ... } )
1519   $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1520   $mce->foreach ( $input_data [, { options } ], sub { ... } )
1521   $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1522       Forchunk, foreach, and forseq are sugar methods and described in
1523       MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1524
1525   MCE->process ( $input_data [, { options } ] )
1526   $mce->process ( $input_data [, { options } ] )
1527       The process method will spawn workers automatically if not already
1528       spawned.  It will set input_data => $input_data. It calls run(0) to not
1529       auto-shutdown workers. Specifying options is optional.
1530
1531       Allowable options { key => value, ... } are:
1532
1533        chunk_size input_data job_delay spawn_delay submit_delay
1534        flush_file flush_stderr flush_stdout stderr_file stdout_file
1535        on_post_exit on_post_run sequence user_args user_begin user_end
1536        user_func user_error user_output use_slurpio RS
1537
1538       Options remain persistent going forward unless changed. Setting
1539       user_begin, user_end, or user_func will cause already spawned workers
1540       to shut down and re-spawn automatically. Therefore, define these during
1541       instantiation.
1542
1543       The below will cause workers to re-spawn after running.
1544
1545        my $mce = MCE->new( max_workers => 'auto' );
1546
1547        $mce->process( {
1548           user_begin => sub { # connect to DB },
1549           user_func  => sub { # process each row },
1550           user_end   => sub { # close handle to DB },
1551        }, \@input_data );
1552
1553        $mce->process( {
1554           user_begin => sub { # connect to DB },
1555           user_func  => sub { # process each file },
1556           user_end   => sub { # close handle to DB },
1557        }, "/list/of/files" );
1558
1559       Do the following if wanting workers to persist between jobs.
1560
1561        use MCE max_workers => 'auto';
1562
1563        my $mce = MCE->new(
1564           user_begin => sub { # connect to DB },
1565           user_func  => sub { # process each chunk or row or host },
1566           user_end   => sub { # close handle to DB },
1567        );
1568
1569        $mce->spawn;           # Spawn early if desired
1570
1571        $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1572        $mce->process(\@array_of_files_to_grep);
1573        $mce->process("/path/to/host/list");
1574
1575        $mce->process($array_ref);
1576        $mce->process($array_ref, { stdout_file => $output_file });
1577
1578        # This was not allowed before. Fixed in 1.415.
1579        $mce->process({ sequence => { begin => 10, end => 90, step => 2 } });
1580        $mce->process({ sequence => [ 10, 90, 2 ] });
1581
1582        $mce->shutdown;
1583
1584   MCE->relay_final ( void )
1585   $mce->relay_final ( void )
1586       The relay methods are described in MCE::Relay. Relay capabilities are
1587       enabled by specifying the "init_relay" MCE option.
1588
1589   MCE->restart_worker ( void )
1590   $mce->restart_worker ( void )
1591       One can restart a worker that has died or exited. The job below never
1592       ends because workers are restarted each time. It is recommended to call
1593       MCE->exit or $mce->exit instead of the native exit function for better
1594       handling, especially under the Windows environment.
1595
1596       The $e->{wid} argument is no longer necessary starting with the 1.5
1597       release.
1598
1599       Press [ctrl-c] to terminate the script.
1600
1601        my $mce = MCE->new(
1602
1603           on_post_exit => sub {
1604              my ($mce, $e) = @_;
1605              print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1606            # $mce->restart_worker($e->{wid});    # MCE-1.415 and below
1607              $mce->restart_worker;               # MCE-1.500 and above
1608           },
1609
1610           user_begin => sub {
1611              my ($mce, $task_id, $task_name) = @_;
1612              # Not interested in die messages going to STDERR,
1613              # because the die handler calls MCE->exit(255, $_[0]).
1614              close STDERR;
1615           },
1616
1617           user_tasks => [{
1618              max_workers => 5,
1619              user_func => sub {
1620                 my ($mce) = @_; sleep MCE->wid;
1621                 MCE->exit(3, "exited from " . MCE->wid . "\n");
1622              }
1623           },{
1624              max_workers => 4,
1625              user_func => sub {
1626                 my ($mce) = @_; sleep MCE->wid;
1627                 die("died from " . MCE->wid . "\n");
1628              }
1629           }]
1630        );
1631
1632        $mce->run;
1633
1634        -- Output
1635
1636        1: PID_85388: status 3: exited from 1
1637        2: PID_85389: status 3: exited from 2
1638        1: PID_85397: status 3: exited from 1
1639        3: PID_85390: status 3: exited from 3
1640        1: PID_85399: status 3: exited from 1
1641        4: PID_85391: status 3: exited from 4
1642        2: PID_85398: status 3: exited from 2
1643        1: PID_85401: status 3: exited from 1
1644        5: PID_85392: status 3: exited from 5
1645        1: PID_85404: status 3: exited from 1
1646        6: PID_85393: status 255: died from 6
1647        3: PID_85400: status 3: exited from 3
1648        2: PID_85403: status 3: exited from 2
1649        1: PID_85406: status 3: exited from 1
1650        7: PID_85394: status 255: died from 7
1651        1: PID_85410: status 3: exited from 1
1652        8: PID_85395: status 255: died from 8
1653        4: PID_85402: status 3: exited from 4
1654        2: PID_85409: status 3: exited from 2
1655        1: PID_85412: status 3: exited from 1
1656        9: PID_85396: status 255: died from 9
1657        3: PID_85408: status 3: exited from 3
1658        1: PID_85416: status 3: exited from 1
1659
1660        ...
1661
1662   MCE->run ( [ $auto_shutdown [, { options } ] ] )
1663   $mce->run ( [ $auto_shutdown [, { options } ] ] )
1664       The run method, by default, spawns workers, processes once, and shuts
1665       down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1666       persist after running (default 1).
1667
1668       Specifying options is optional. Valid options are the same as for the
1669       process method.
1670
1671        my $mce = MCE->new( ... );
1672
1673        # Disables auto-shutdown
1674        $mce->run(0);
1675
1676   MCE->send ( $data_ref )
1677   $mce->send ( $data_ref )
1678       The 'send' method is useful when wanting to spawn workers early to
1679       minimize memory consumption and afterwards send data individually to
1680       each worker. One cannot send more than the total workers spawned.
1681       Workers store the received data as $mce->{user_data}.
1682
1683       The data which can be sent is restricted to an ARRAY, HASH, or PDL
1684       reference.  Workers begin processing immediately after receiving data.
1685       Workers set $mce->{user_data} to undef after processing. One cannot
1686       specify input_data, sequence, or user_tasks when using the "send"
1687       method.
1688
1689       Passing any options, e.g. run(0, { options }), is ignored because
1690       workers begin running immediately after receiving user data. There is
1691       no guarantee as to which worker will receive data first. It depends on
1692       which worker is available and awaiting data.
1693
1694        use MCE;
1695
1696        my $mce = MCE->new(
1697           max_workers => 5,
1698
1699           user_func => sub {
1700              my ($mce) = @_;
1701              my $data = $mce->{user_data};
1702              my $first_name = $data->{first_name};
1703              print MCE->wid, ": Hello from $first_name\n";
1704           }
1705        );
1706
1707        $mce->spawn;     # Optional, send will spawn if necessary.
1708
1709        $mce->send( { first_name => "Theresa" } );
1710        $mce->send( { first_name => "Francis" } );
1711        $mce->send( { first_name => "Padre"   } );
1712        $mce->send( { first_name => "Anthony" } );
1713
1714        $mce->run;       # Wait for workers to complete processing.
1715
1716        -- Output
1717
1718        2: Hello from Theresa
1719        5: Hello from Anthony
1720        3: Hello from Francis
1721        4: Hello from Padre
1722
1723   MCE->shutdown ( void )
1724   $mce->shutdown ( void )
1725       The run method automatically spawns workers, runs once, and shuts down
1726       the workers afterwards. In the example below, workers persist after
1727       running. Call shutdown as needed or prior to exiting.
1728
1729        my $mce = MCE->new( ... );
1730
1731        $mce->spawn;
1732
1733        $mce->process(\@input_data_1);         # Processing multiple arrays
1734        $mce->process(\@input_data_2);
1735        $mce->process(\@input_data_n);
1736
1737        $mce->shutdown;
1738
1739        $mce->process('input_file_1');         # Processing multiple files
1740        $mce->process('input_file_2');
1741        $mce->process('input_file_n');
1742
1743        $mce->shutdown;
1744
1745   MCE->spawn ( void )
1746   $mce->spawn ( void )
1747       Workers are normally spawned automatically. The spawn method allows one
1748       to spawn workers early if so desired.
1749
1750        my $mce = MCE->new( ... );
1751
1752        $mce->spawn;
1753
1754   MCE->status ( void )
1755   $mce->status ( void )
1756       The greatest exit status among workers is saved while running. Look at
1757       the on_post_exit or on_post_run options for callback support.
1758
1759        my $mce = MCE->new( ... );
1760
1761        $mce->run;
1762
1763        my $exit_status = $mce->status;
1764

METHODS for WORKERS only

1766       Methods listed below are callable by workers only.
1767
1768   MCE->exit ( [ $status [, $message [, $id ] ] ] )
1769   $mce->exit ( [ $status [, $message [, $id ] ] ] )
1770       A worker exits from MCE entirely. $id (optional) can be used for
1771       passing the primary key or a string along with the message. Look at the
1772       on_post_exit or on_post_run options for callback support.
1773
1774        MCE->exit;           # default 0
1775        MCE->exit(1);
1776        MCE->exit(2, 'chunk failed', $chunk_id);
1777        MCE->exit(0, 'msg_foo', 'id_1000');
1778
1779   MCE->gather ( $arg1, [, $arg2, ... ] )
1780   $mce->gather ( $arg1, [, $arg2, ... ] )
1781       A worker can submit data to the location specified via the gather
1782       option by calling this method. See MCE::Flow and MCE::Loop for
1783       additional use cases.
1784
1785        use MCE;
1786
1787        my @hosts = qw(
1788           hosta hostb hostc hostd hoste
1789        );
1790
1791        my $mce = MCE->new(
1792           chunk_size => 1, max_workers => 3,
1793
1794           user_func => sub {
1795            # my ($mce, $chunk_ref, $chunk_id) = @_;
1796              my ($output, $error, $status); my $host = $_;
1797
1798              # Do something with $host;
1799              $output = "Worker ". MCE->wid .": Hello from $host";
1800
1801              if (MCE->chunk_id % 3 == 0) {
1802                 # Simulating an error condition
1803                 local $? = 1; $status = $?;
1804                 $error = "Error from $host"
1805              }
1806              else {
1807                 $status = 0;
1808              }
1809
1810              # Ensure unique keys (key, value) when gathering to a
1811              # hash.
1812              MCE->gather("$host.out", $output, "$host.sta", $status);
1813              MCE->gather("$host.err", $error) if (defined $error);
1814           }
1815        );
1816
1817        my %h; $mce->process(\@hosts, { gather => \%h });
1818
1819        foreach my $host (@hosts) {
1820           print $h{"$host.out"}, "\n";
1821           print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1822           print "Exit status: ", $h{"$host.sta"}, "\n\n";
1823        }
1824
1825        -- Output
1826
1827        Worker 2: Hello from hosta
1828        Exit status: 0
1829
1830        Worker 1: Hello from hostb
1831        Exit status: 0
1832
1833        Worker 3: Hello from hostc
1834        Error from hostc
1835        Exit status: 1
1836
1837        Worker 2: Hello from hostd
1838        Exit status: 0
1839
1840        Worker 1: Hello from hoste
1841        Exit status: 0
1842
1843   MCE->last ( void )
1844   $mce->last ( void )
1845       Worker leaves the chunking loop or user_func block immediately.
1846       Callable from inside foreach, forchunk, forseq, and user_func.
1847
1848        use MCE;
1849
1850        my $mce = MCE->new(
1851           max_workers => 5
1852        );
1853
1854        my @list = (1 .. 80);
1855
1856        $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1857
1858           my ($mce, $chunk_ref, $chunk_id) = @_;
1859           MCE->last if ($chunk_id > 4);
1860
1861           my @output = ();
1862
1863           foreach my $rec ( @{ $chunk_ref } ) {
1864              push @output, $rec, "\n";
1865           }
1866
1867           MCE->print(@output);
1868        });
1869
1870        -- Output (each chunk above consists of 2 elements)
1871
1872        3
1873        4
1874        1
1875        2
1876        7
1877        8
1878        5
1879        6
1880
1881   MCE->next ( void )
1882   $mce->next ( void )
1883       Worker starts the next iteration of the chunking loop. Callable from
1884       inside foreach, forchunk, forseq, and user_func.
1885
1886        use MCE;
1887
1888        my $mce = MCE->new(
1889           max_workers => 5
1890        );
1891
1892        my @list = (1 .. 80);
1893
1894        $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1895
1896           my ($mce, $chunk_ref, $chunk_id) = @_;
1897           MCE->next if ($chunk_id < 20);
1898
1899           my @output = ();
1900
1901           foreach my $rec ( @{ $chunk_ref } ) {
1902              push @output, $rec, "\n";
1903           }
1904
1905           MCE->print(@output);
1906        });
1907
1908        -- Output (each chunk above consists of 4 elements)
1909
1910        77
1911        78
1912        79
1913        80
1914
1915   MCE::relay { code }
1916   MCE->relay ( sub { code } )
1917   MCE->relay_recv ( void )
1918   $mce->relay ( sub { code } )
1919   $mce->relay_recv ( void )
1920       The relay methods are described in MCE::Relay. Relay capabilities are
1921       enabled by specifying the "init_relay" MCE option.
1922
1923   MCE->sendto ( $to, $arg1, ... )
1924   $mce->sendto ( $to, $arg1, ... )
1925       The sendto method is called by workers for serializing data to standard
1926       output, standard error, or end of file. The action is done by the
1927       manager process.
1928
1929       Release 1.00x supported only one data argument.
1930
1931        MCE->sendto('file', \@array, '/path/to/file');
1932        MCE->sendto('file', \$scalar, '/path/to/file');
1933        MCE->sendto('file', $scalar, '/path/to/file');
1934
1935        MCE->sendto('STDERR', \@array);
1936        MCE->sendto('STDERR', \$scalar);
1937        MCE->sendto('STDERR', $scalar);
1938
1939        MCE->sendto('STDOUT', \@array);
1940        MCE->sendto('STDOUT', \$scalar);
1941        MCE->sendto('STDOUT', $scalar);
1942
1943       Release 1.100 added the ability to pass multiple arguments. Notice the
1944       syntax change for sending to a file. Passing a reference to an array is
1945       no longer necessary.
1946
1947        MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1948        MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1949        MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1950
1951        MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1952
1953       To retain 1.00x compatibility, sendto outputs the content when a single
1954       data reference is specified. Otherwise, since 1.500, the reference
1955       itself for \@array or \$scalar is shown, not its content.
1956
1957        MCE->sendto('STDERR', \@array);        # 1.00x behavior, content
1958        MCE->sendto('STDOUT', \$scalar);
1959        MCE->sendto('file:/path/to/file', \@array);
1960
1961        # Output matches the print statement
1962
1963        MCE->sendto(\*STDERR, \@array);        # 1.500 behavior, reference
1964        MCE->sendto(\*STDOUT, \$scalar);
1965        MCE->sendto($fh, \@array);
1966
1967        MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1968        print {*STDOUT} \@array, "\n", \$scalar, "\n";
1969
1970       MCE 1.500 added support for sending to a glob reference, file
1971       descriptor, and file handle.
1972
1973        MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1974        MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1975        MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1976
1977   MCE->sync ( void )
1978   $mce->sync ( void )
1979       A barrier sync operation means any worker must stop at this point until
1980       all workers reach this barrier. Barrier syncing is useful for many
1981       computer algorithms.
1982
1983       Barrier synchronization is supported for task 0 only or omitting
1984       user_tasks.  All workers assigned task_id 0 must call sync whenever
1985       barrier syncing.
1986
1987        use MCE;
1988
1989        sub user_func {
1990
1991           my ($mce) = @_;
1992           my $wid = MCE->wid;
1993
1994           MCE->sendto("STDOUT", "a: $wid\n");   # MCE 1.0+
1995           MCE->sync;
1996
1997           MCE->sendto(\*STDOUT, "b: $wid\n");   # MCE 1.5+
1998           MCE->sync;
1999
2000           MCE->print("c: $wid\n");              # MCE 1.5+
2001           MCE->sync;
2002
2003           return;
2004        }
2005
2006        my $mce = MCE->new(
2007           max_workers => 4, user_func => \&user_func
2008        )->run;
2009
2010        -- Output (without barrier synchronization)
2011
2012        a: 1
2013        a: 2
2014        b: 1
2015        b: 2
2016        c: 1
2017        c: 2
2018        a: 3
2019        b: 3
2020        c: 3
2021        a: 4
2022        b: 4
2023        c: 4
2024
2025        -- Output (with barrier synchronization)
2026
2027        a: 1
2028        a: 2
2029        a: 4
2030        a: 3
2031        b: 2
2032        b: 1
2033        b: 3
2034        b: 4
2035        c: 1
2036        c: 4
2037        c: 2
2038        c: 3
2039
2040       Consider the following example. The MCE->sync operation is done inside
2041       a loop along with MCE->do. A stall may occur for workers calling sync
2042       the 2nd or 3rd time while other workers are sending results via MCE->do
2043       or MCE->sendto.
2044
2045       It requires another semaphore lock in MCE to solve this which was not
2046       done in order to keep resources low. Therefore, please keep this in
2047       mind when mixing MCE->sync with MCE->do or output serialization methods
2048       inside a loop.
2049
2050        sub user_func {
2051
2052           my ($mce) = @_;
2053           my @result;
2054
2055           for (1 .. 3) {
2056              ... compute algorithm ...
2057
2058              MCE->sync;
2059
2060              ... compute algorithm ...
2061
2062              MCE->sync;
2063
2064              MCE->do('aggregate_result', \@result);  # or MCE->sendto
2065
2066              MCE->sync;      # The sync operation is also needed here to
2067                              # prevent MCE from stalling.
2068           }
2069        }
2070
2071   MCE->yield ( void )
2072   $mce->yield ( void )
2073       There may be on occasion when the MCE driven app is too fast. The
2074       interval option combined with the yield method, both introduced with
2075       MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
2076       the design.
2077
2078       A use case is an app configured with 100 workers running on a 24-way
2079       box (24 logical cores). Data is polled from a database containing over
2080       2.5 million rows. Workers chunk away at 300 rows per chunk performing
2081       SNMP gets (300 sockets per worker) polling 25 metrics from each device.
2082       With this scenario, the load on the box may rise above 90. In addition,
2083       IP_Tables may reach its contention point causing the entire application
2084       to fail.
2085
2086       The scenario above is solved by simply having workers yield among
2087       themselves in a synchronized fashion. A delay of 0.007 seconds between
2088       intervals is all that's needed. The load on the box will hover between
2089       23 and 27 for the duration of the run. Polling completes in under 17
2090       minutes. This is quite fast considering the app polls 62.5 million
2091       metrics combined. The math equates to 3,676,470 per minute, or roughly
2092       61,275 per second, from a single box.
2093
2094        # Both max_nodes and node_id are optional (default 1).
2095
2096        interval => {
2097           delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
2098        }
2099
2100       A 4 node setup can poll 10 million devices without the additional
2101       overhead of a distribution agent. The difference between the 4 nodes
2102       are simply node_id and the where clause used to query the database. The
2103       mac addresses are random such that the data divides equally to any
2104       power of 2. The distribution key lies in the mac address itself. In
2105       fact, the 2nd character from the right is sufficient for maximizing on
2106       the power of randomness for equal distribution.
2107
2108        Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
2109        Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
2110        Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
2111        Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
2112
2113       Below, user_tasks is configured to simulate 4 nodes. This
2114       demonstration uses 2 workers to minimize the output size. Input is from
2115       the sequence option.
2116
2117        use Time::HiRes qw(time);
2118        use MCE;
2119
2120        my $d = shift || 0.1;
2121
2122        local $| = 1;
2123
2124        sub create_task {
2125
2126           my ($node_id) = @_;
2127
2128           my $seq_size  = 6;
2129           my $seq_start = ($node_id - 1) * $seq_size + 1;
2130           my $seq_end   = $seq_start + $seq_size - 1;
2131
2132           return {
2133              max_workers => 2, sequence => [ $seq_start, $seq_end ],
2134              interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2135           };
2136        }
2137
2138        sub user_begin {
2139
2140           my ($mce, $task_id, $task_name) = @_;
2141
2142           # The yield method causes this worker to wait for its next time
2143           # interval slot before running. Yield has no effect without the
2144           # 'interval' option.
2145
2146           # Yielding is beneficial inside a user_begin block. A use case
2147           # is staggering database connections among workers in order
2148           # to not impact the DB server.
2149
2150           MCE->yield;
2151
2152           MCE->printf(
2153              "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2154              MCE->task_id + 1, time, MCE->task_wid, ''
2155           );
2156
2157           return;
2158        }
2159
2160        {
2161           my $prev_time = time;
2162
2163           sub user_func {
2164
2165              my ($mce, $seq_n, $chunk_id) = @_;
2166
2167              # Yield simply waits for the next time interval.
2168              MCE->yield;
2169
2170              # Calculate how long this worker has waited.
2171              my $curr_time = time;
2172              my $time_waited = $curr_time - $prev_time;
2173
2174              $prev_time = $curr_time;
2175
2176              MCE->printf(
2177                 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2178                 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2179              );
2180
2181              return;
2182           }
2183        }
2184
2185        # Simulate a 4 node environment passing node_id to create_task.
2186
2187        print "Node_ID  Current_Time        Worker_ID  Time_Waited     Comment\n";
2188
2189        MCE->new(
2190           user_begin => \&user_begin,
2191           user_func  => \&user_func,
2192
2193           user_tasks => [
2194              create_task(1),
2195              create_task(2),
2196              create_task(3),
2197              create_task(4)
2198           ]
2199
2200        )->run;
2201
2202        -- Output (notice Current_Time below, stays 0.10 apart)
2203
2204        Node_ID  Current_Time        Worker_ID  Time_Waited     Comment
2205        Node  1: 1374807976.74634 -- Worker  1:              -- Started
2206        Node  2: 1374807976.84634 -- Worker  1:              -- Started
2207        Node  3: 1374807976.94638 -- Worker  1:              -- Started
2208        Node  4: 1374807977.04639 -- Worker  1:              -- Started
2209        Node  1: 1374807977.14634 -- Worker  2:              -- Started
2210        Node  2: 1374807977.24640 -- Worker  2:              -- Started
2211        Node  3: 1374807977.34649 -- Worker  2:              -- Started
2212        Node  4: 1374807977.44657 -- Worker  2:              -- Started
2213        Node  1: 1374807977.54636 -- Worker  1:      0.90037 -- Seq_N   1
2214        Node  2: 1374807977.64638 -- Worker  1:      1.00040 -- Seq_N   7
2215        Node  3: 1374807977.74642 -- Worker  1:      1.10043 -- Seq_N  13
2216        Node  4: 1374807977.84643 -- Worker  1:      1.20045 -- Seq_N  19
2217        Node  1: 1374807977.94636 -- Worker  2:      1.30037 -- Seq_N   2
2218        Node  2: 1374807978.04638 -- Worker  2:      1.40040 -- Seq_N   8
2219        Node  3: 1374807978.14641 -- Worker  2:      1.50042 -- Seq_N  14
2220        Node  4: 1374807978.24644 -- Worker  2:      1.60045 -- Seq_N  20
2221        Node  1: 1374807978.34628 -- Worker  1:      0.79996 -- Seq_N   3
2222        Node  2: 1374807978.44631 -- Worker  1:      0.79996 -- Seq_N   9
2223        Node  3: 1374807978.54634 -- Worker  1:      0.79996 -- Seq_N  15
2224        Node  4: 1374807978.64636 -- Worker  1:      0.79997 -- Seq_N  21
2225        Node  1: 1374807978.74628 -- Worker  2:      0.79996 -- Seq_N   4
2226        Node  2: 1374807978.84632 -- Worker  2:      0.79997 -- Seq_N  10
2227        Node  3: 1374807978.94634 -- Worker  2:      0.79996 -- Seq_N  16
2228        Node  4: 1374807979.04636 -- Worker  2:      0.79996 -- Seq_N  22
2229        Node  1: 1374807979.14628 -- Worker  1:      0.80001 -- Seq_N   5
2230        Node  2: 1374807979.24631 -- Worker  1:      0.80000 -- Seq_N  11
2231        Node  3: 1374807979.34634 -- Worker  1:      0.80001 -- Seq_N  17
2232        Node  4: 1374807979.44636 -- Worker  1:      0.80000 -- Seq_N  23
2233        Node  1: 1374807979.54628 -- Worker  2:      0.80000 -- Seq_N   6
2234        Node  2: 1374807979.64631 -- Worker  2:      0.80000 -- Seq_N  12
2235        Node  3: 1374807979.74633 -- Worker  2:      0.80000 -- Seq_N  18
2236        Node  4: 1374807979.84636 -- Worker  2:      0.80000 -- Seq_N  24
2237
2238       The interval.pl example above is included with MCE.
2239

MCE PROGRESS DEMONSTRATIONS

2241       The "progress" option takes a code block for receiving info on the
2242       progress made while processing input data; e.g. "input_data" or
2243       "sequence". To make this work, one provides the "progress" option a
2244       closure block like so, passing along the size of the input data; e.g.
2245       "scalar @array" or "-s /path/to/file".
2246
2247       Current API available since 1.813.
2248
2249       A worker, upon completing its chunk, notifies the manager process with
2250       the size of the chunk. That could be the number of rows or literally
2251       the size of the chunk when processing an input file. The manager
2252       process accumulates the size before calling the code block associated
2253       with the "progress" option.
2254
2255       When running many tasks simultaneously via "user_tasks", the call is
2256       initiated only by workers at level 0, i.e. the first task. This case is
2257       not shown here.
2258
2259        use Time::HiRes 'sleep';
2260        use MCE;
2261
2262        sub make_progress {
2263           my ($total_size) = @_;
2264           return sub {
2265              my ($completed_size) = @_;
2266              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2267           };
2268        }
2269
2270        my @input = (1..150);
2271
2272        MCE->new(
2273           chunk_size  => 10,
2274           max_workers => 4,
2275           input_data  => \@input,
2276           progress    => make_progress( scalar @input ),
2277           user_func   => sub { sleep 1.5 }
2278        )->run();
2279
2280        -- Output
2281
2282        6.7%
2283        13.3%
2284        20.0%
2285        26.7%
2286        33.3%
2287        40.0%
2288        46.7%
2289        53.3%
2290        60.0%
2291        66.7%
2292        73.3%
2293        80.0%
2294        86.7%
2295        93.3%
2296        100.0%
2297
2298       Next is the code using MCE::Flow and ProgressBar::Stack to do the same
2299       thing, practically.
2300
2301        use Time::HiRes 'sleep';
2302        use ProgressBar::Stack;
2303        use MCE::Flow;
2304
2305        sub make_progress {
2306           my ($total_size) = @_;
2307           init_progress();
2308           return sub {
2309              my ($completed_size) = @_;
2310              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2311           };
2312        }
2313
2314        my @input = (1..150);
2315
2316        MCE::Flow->init(
2317           chunk_size  => 10,
2318           max_workers => 4,
2319           progress    => make_progress( scalar @input )
2320        );
2321
2322        MCE::Flow->run( sub { sleep 1.5 }, \@input );
2323        MCE::Flow->finish();
2324
2325        print "\n";
2326
2327        -- Output
2328
2329        [################    ]  80.0% ETA: 0:01
2330
2331       For a sequence of numbers, using the "sequence" option, one must
2332       account for "step_size", which defaults to 1.
2333
2334        use Time::HiRes 'sleep';
2335        use MCE;
2336
2337        sub make_progress {
2338           my ($total_size) = @_;
2339           return sub {
2340              my ($completed_size) = @_;
2341              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2342           };
2343        }
2344
2345        MCE->new(
2346           chunk_size  => 10,
2347           max_workers => 4,
2348           sequence    => [ 1, 100, 2 ],
2349           progress    => make_progress( int( 100 / 2 + 0.5 ) ),
2350           user_func   => sub { sleep 1.5 }
2351        )->run();
2352
2353        -- Output
2354
2355        20.0%
2356        40.0%
2357        60.0%
2358        80.0%
2359        100.0%
2360
2361       Changing "chunk_size" to 1 means workers notify the manager process
2362       more often, thus increasing granularity. Take a look at the output.
2363
2364        2.0%
2365        4.0%
2366        6.0%
2367        8.0%
2368        10.0%
2369        ...
2370        92.0%
2371        94.0%
2372        96.0%
2373        98.0%
2374        100.0%
2375
2376       Here is the same thing using MCE::Flow together with
2377       ProgressBar::Stack.
2378
2379        use Time::HiRes 'sleep';
2380        use ProgressBar::Stack;
2381        use MCE::Flow;
2382
2383        sub make_progress {
2384           my ($total_size) = @_;
2385           init_progress();
2386           return sub {
2387              my ($completed_size) = @_;
2388              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2389           };
2390        }
2391
2392        MCE::Flow->init(
2393           chunk_size  => 1,
2394           max_workers => 4,
2395           progress    => make_progress( int( 100 / 2 + 0.5 ) )
2396        );
2397
2398        MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2399        MCE::Flow->finish();
2400
2401        print "\n";
2402
2403        -- Output
2404
2405        [#########           ]  48.0% ETA: 0:03
2406
2407       For files and file handles, workers send the actual size of the data
2408       read rather than counting rows.
2409
2410        use Time::HiRes 'sleep';
2411        use MCE;
2412
2413        sub make_progress {
2414           my ($total_size) = @_;
2415           return sub {
2416              my ($completed_size) = @_;
2417              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2418           };
2419        }
2420
2421        my $input_file = "/path/to/input_file.txt";
2422
2423        MCE->new(
2424           chunk_size  => 5,
2425           max_workers => 4,
2426           input_data  => $input_file,
2427           progress    => make_progress( -s $input_file ),
2428           user_func   => sub { sleep 0.03 }
2429        )->run();
2430
2431       For consistency, here is the example using MCE::Flow, again with
2432       ProgressBar::Stack.
2433
2434        use Time::HiRes 'sleep';
2435        use ProgressBar::Stack;
2436        use MCE::Flow;
2437
2438        sub make_progress {
2439           my ($total_size) = @_;
2440           init_progress();
2441           return sub {
2442              my ($completed_size) = @_;
2443              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2444           };
2445        }
2446
2447        my $input_file = "/path/to/input_file.txt";
2448
2449        MCE::Flow->init(
2450           chunk_size  => 5,
2451           max_workers => 4,
2452           progress    => make_progress( -s $input_file )
2453        );
2454
2455        MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2456        MCE::Flow->finish();
2457
2458       The next demonstration processes three arrays consecutively. For this
2459       one, MCE workers persist after running. This requires MCE 1.814 or
2460       later; with MCE 1.813, the progress output is not shown.
2461
2462        use Time::HiRes 'sleep';
2463        use ProgressBar::Stack;
2464        use MCE;
2465
2466        sub make_progress {
2467           my ($total_size, $message) = @_;
2468           init_progress();
2469           return sub {
2470              my ($completed_size) = @_;
2471              update_progress(
2472                 sprintf("%0.1f", $completed_size / $total_size * 100),
2473                 $message
2474              );
2475           };
2476        }
2477
2478        my $mce = MCE->new(
2479           chunk_size  => 10,
2480           max_workers => 4,
2481           user_func   => sub { sleep 0.5 }
2482        )->spawn();
2483
2484        my @a1 = ( 1 .. 200 );
2485        my @a2 = ( 1 .. 500 );
2486        my @a3 = ( 1 .. 300 );
2487
2488        $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2489
2490        print "\n";
2491
2492        $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2493
2494        print "\n";
2495
2496        $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2497
2498        print "\n";
2499
2500        $mce->shutdown;
2501
2502        -- Output
2503
2504        [####################] 100.0% ETA: 0:00 array 1
2505        [####################] 100.0% ETA: 0:00 array 2
2506        [####################] 100.0% ETA: 0:00 array 3
2507
2508       When the size is not known, such as when reading from "STDIN", the only
2509       thing one can do is report the size completed thus far.
2510
2511        # 1 kibibyte equals 1024 bytes
2512
2513        progress => sub {
2514           my ($completed_size) = @_;
2515           printf "%0.1f kibibytes\n", $completed_size / 1024;
2516        }
2517

SEE ALSO

2519       •  MCE::Examples
2520

INDEX

2522       MCE
2523

AUTHOR

2525       Mario E. Roy, <marioeroy AT gmail DOT com>
2526
2527
2528
2529perl v5.38.0                      2023-09-14                      MCE::Core(3)
Impressum