1MCE::Core(3)          User Contributed Perl Documentation         MCE::Core(3)
2
3
4

NAME

6       MCE::Core - Documentation describing the core MCE API
7

VERSION

9       This document describes MCE::Core version 1.837
10

SYNOPSIS

12       This is a simplistic use case of MCE running with 5 workers.
13
14        ## Construction using the Core API
15
16        use MCE;
17
18        my $mce = MCE->new(
19           max_workers => 5,
20           user_func => sub {
21              my ($mce) = @_;
22              $mce->say("Hello from " . $mce->wid);
23           }
24        );
25
26        $mce->run;
27
28        ## Construction using a MCE model
29
30        use MCE::Flow max_workers => 5;
31
32        mce_flow sub {
33           my ($mce) = @_;
34           MCE->say("Hello from " . MCE->wid);
35        };
36
37        -- Output
38
39        Hello from 2
40        Hello from 4
41        Hello from 5
42        Hello from 1
43        Hello from 3
44
45   MCE->new ( [ options ] )
46       Below, a new instance is configured with all available options.
47
48        use MCE;
49
50        my $mce = MCE->new(
51
52           max_workers => 8,                   ## Default 1
53
54              # Number of workers to spawn. This can be set automatically
55              # with MCE 1.412 and later releases.
56
57              # MCE 1.521 sets an upper-limit of 8 for 'auto'.
58              # See MCE::Util::get_ncpu for more info.
59
60              # max_workers => 'auto',         ## # of lcores, 8 maximum
61              # max_workers => 'auto-1',       ## 7 on HW with 16-lcores
62              # max_workers => 'auto-1',       ## 3 on HW with  4-lcores
63
64              # max_workers => MCE::Util::get_ncpu, # run on all lcores
65
66           chunk_size => 2000,                 ## Default 1
67
68              # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
69              # The default is 1 when using the Core API and 'auto' for
70              # MCE Models. For arrays or queues, chunk_size means the
71              # number of records per chunk. For iterators, MCE will not
72              # use chunk_size, though the iterator may use it to determine
73              # how much to return per iteration. For files, smaller than or
74              # equal to 8192 is the number of records.  Greater than 8192
75              # is the number of bytes. MCE reads until the end of record
76              # before calling user_func.
77
78              # chunk_size => 1,               ## Consists of 1 record
79              # chunk_size => 1000,            ## Consists of 1000 records
80              # chunk_size => '16k',           ## Approximate 16 kibiBytes (KiB)
81              # chunk_size => '20m',           ## Approximate 20 mebiBytes (MiB)
82
83           tmp_dir => $tmp_dir,                ## Default $MCE::Signal::tmp_dir
84
85              # Default is $MCE::Signal::tmp_dir which points to
86              # $ENV{TEMP} if defined. Otherwise, tmp_dir points
87              # to a location under /tmp.
88
89           freeze => \&encode_sereal,          ## Default \&Storable::freeze
90           thaw   => \&decode_sereal,          ## Default \&Storable::thaw
91
92              # Release 1.412 allows freeze and thaw to be overridden.
93              # Simply include a serialization module prior to loading
94              # MCE. Configure freeze/thaw options.
95
96              # use Sereal qw( encode_sereal decode_sereal );
97              # use CBOR::XS qw( encode_cbor decode_cbor );
98              # use JSON::XS qw( encode_json decode_json );
99              #
100              # use MCE;
101
102           gather => \@a,                      ## Default undef
103
104              # Release 1.5 allows for gathering of data to an array or
105              # hash reference, a MCE::Queue/Thread::Queue object, or code
106              # reference. One invokes gathering by calling the gather
107              # method as often as needed.
108
109              # gather => \@array,
110              # gather => \%hash,
111              # gather => $queue,
112              # gather => \&order,
113
114           init_relay => 0,                    ## Default undef
115
116              # For specifying the initial relay value. Allowed values
117              # are array_ref, hash_ref, or scalar. The MCE::Relay module
118              # is loaded automatically when specified.
119
120              # init_relay => \@array,
121              # init_relay => \%hash,
122              # init_relay => scalar,
123
124           input_data => $input_file,          ## Default undef
125           RS         => "\n>",                ## Default undef
126
127              # input_data => '/path/to/file'  ## Process file
128              # input_data => \@array          ## Process array
129              # input_data => \*FILE_HNDL      ## Process file handle
130              # input_data => \$scalar         ## Treated like a file
131              # input_data => \&iterator       ## User specified iterator
132
133              # The RS option (for input record separator) applies to files
134              # and file handles.
135
136              # MCE applies additional logic when RS begins with a newline
137              # character; e.g. RS => "\n>". It trims away characters after
138              # the newline and prepends them to the next record.
139              #
140              # Typically, the left side is what happens for $/ = "\n>".
141              # The right side is what user_func receives.
142              #
143              # All records begin with > and end with \n
144              #    Record 1:  >seq1 ... \n>   (to)   >seq1 ... \n
145              #    Record 2:  seq2  ... \n>          >seq2 ... \n
146              #    Record 3:  seq3  ... \n>          >seq3 ... \n
147              #    Last Rec:  seqN  ... \n           >seqN ... \n
148
149           loop_timeout => 5,                  ## Default 0
150
151              # Added in 1.7, enables the manager process to time out of a read
152              # on channel 0. The manager process decrements the total workers
153              # running for any workers which have died in an uncontrollable
154              # manner. Specify this option if on occasion a worker runs out
155              # of memory or dies due to an error from an XS module.
156              #
157              # A number smaller than 5 is silently increased to 5.
158
159           max_retries => 2,                   ## Default 0
160
161              # This option, added in 1.7, causes MCE to retry a failed
162              # chunk from a worker dying while processing input data or
163              # sequence of numbers.
164
165           parallel_io => 1,                   ## Default 0
166           posix_exit  => 1,                   ## Default 0
167           use_slurpio => 1,                   ## Default 0
168
169              # The parallel_io option enables parallel reads during large
170              # slurpio, useful when reading from fast storage. Do not enable
171              # parallel_io when running MCE on many nodes with input coming
172              # from shared storage.
173
174              # Set posix_exit to avoid all END and destructor processing.
175              # The value 1 is implied when MCE is constructed inside a thread
176              # or when any of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS,
177              # LWP::UserAgent, Mojo::IOLoop, Prima, STFL, Tk, Wx, Win32::GUI is present.
178
179              # Enable slurpio to pass the raw chunk (scalar ref) to the user
180              # function when reading input files.
181
182           use_threads => 1,                   ## Auto 0 or 1
183
184              # MCE spawns child processes by default, not threads.
185              #
186              # However, MCE supports threads via 2 threading
187              # libraries if threads are desired.
188
189              # The use of threads in MCE requires that you include
190              # threads support prior to loading MCE. The use_threads
191              # option defaults to 1 when a thread library is loaded.
192              # Threads is loaded automatically for $^O eq 'MSWin32'.
193              #
194              #   use threads;                use forks;
195              #   use threads::shared;  (or)  use forks::shared;
196              #   use MCE;                    use MCE;
197
198           spawn_delay  => 0.035,              ## Default undef
199           submit_delay => 0.002,              ## Default undef
200           job_delay    => 0.150,              ## Default undef
201
202              # Time to wait, in fractional seconds, after spawning
203              # a worker, parameters submission to a worker, and
204              # job commencement (running, staggered delay).
205
206              # Specify job_delay when wanting to stagger
207              # workers connecting to a database.
208
209           on_post_exit => \&on_post_exit,     ## Default undef
210           on_post_run  => \&on_post_run,      ## Default undef
211
212              # Execute code block after a worker exits or dies.
213              # MCE->exit, exit, or die
214
215              # Execute code block after running.
216              # MCE->process or MCE->run
217
218           progress => sub { ... },            ## Default undef
219
220              # A code block for receiving info on progress made.
221              # See section labeled "PROGRESS DEMONSTRATIONS"
222              # at the end of this document.
223
224           user_args => { env => 'test' },     ## Default undef
225
226              # MCE release 1.4 added a new parameter to allow one to
227              # specify arbitrary arguments such as a string, an ARRAY
228              # or a HASH reference. Workers can access this directly:
229              # my $args = $mce->{user_args} or MCE->user_args;
230
231           user_begin => \&user_begin,         ## Default undef
232           user_func => \&user_func,           ## Default undef
233           user_end => \&user_end,             ## Default undef
234
235              # Think of user_begin, user_func, and user_end as in
236              # the awk scripting language:
237              # awk 'BEGIN { begin } { func } { func } ... END { end }'
238
239              # MCE workers call user_begin once at the start of a job,
240              # then user_func repeatedly until no chunks remain.
241              # Afterwards, user_end is called.
242
243           user_error => \&user_error,         ## Default undef
244           user_output => \&user_output,       ## Default undef
245
246              # MCE will forward data to user_error/user_output,
247              # when defined, for the following methods.
248
249              # MCE->sendto(\*STDERR, "sent to user_error\n");
250              # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
251              # MCE->print(\*STDERR, "sent to user_error\n");
252              # MCE->say(\*STDERR, "sent to user_error");
253
254              # MCE->sendto(\*STDOUT, "sent to user_output\n");
255              # MCE->printf("%s\n", "sent to user_output");
256              # MCE->print("sent to user_output\n");
257              # MCE->say("sent to user_output");
258
259           stderr_file => 'err_file',          ## Default STDERR
260           stdout_file => 'out_file',          ## Default STDOUT
261
262              # Or to file; user_error and user_output take precedence.
263
264           flush_file   => 1,                  ## Default 0
265           flush_stderr => 1,                  ## Default 0
266           flush_stdout => 1,                  ## Default 0
267
268              # Flush sendto file, standard error, or standard output.
269
270           interval => {
271               delay => 0.007 [, max_nodes => 4, node_id => 1 ]
272           },
273
274              # For use with the yield method introduced in MCE 1.5.
275              # Both max_nodes & node_id are optional and default to 1.
276              # Delay is the amount of time between intervals.
277
278              # interval => 0.007              ## Shorter; MCE 1.506+
279
280           sequence => {                       ## Default undef
281               begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
282           },
283
284           bounds_only => 1,                   ## Default undef
285
286              # For looping through a sequence of numbers in parallel.
287              # STEP, if omitted, defaults to 1 if BEGIN is smaller than
288              # END or -1 if BEGIN is greater than END. The FORMAT string
289              # is passed to sprintf behind the scene (% may be omitted).
290              # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
291
292              # Do not specify both options; input_data and sequence.
293              # Release 1.4 allows one to specify an array reference.
294              # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
295
296              # The bounds_only => 1 option will compute the 'begin' and
297              # 'end' items only for the chunk and not the items in between
298              # (hence boundaries only). This option has no effect when
299              # sequence is not specified or chunk_size equals 1.
300
301              # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
302
303           task_end => \&task_end,             ## Default undef
304
305              # This is called by the manager process after the task
306              # has completed processing. MCE 1.5 allows this option
307              # to be specified at the top level.
308
309           task_name => 'string',              ## Default 'MCE'
310
311              # Added in MCE 1.5 and mainly beneficial for user_tasks.
312              # One may specify a unique name per each sub-task.
313              # The string is passed as the 3rd arg to task_end.
314
315           user_tasks => [                     ## Default undef
316              { ... },                         ## Options for task 0
317              { ... },                         ## Options for task 1
318              { ... },                         ## Options for task 2
319           ],
320
321              # Takes a list of hash references, each allowing up to 17
322              # options. All other MCE options are ignored. The init_relay,
323              # input_data, RS, and use_slurpio options are applicable to
324              # the first task only.
325
326              # max_workers, chunk_size, input_data, interval, sequence,
327              # bounds_only, user_args, user_begin, user_end, user_func,
328              # gather, task_end, task_name, use_slurpio, use_threads,
329              # init_relay, RS
330
331              # Options not specified here will default to the same option
332              # specified at the top level.
333        );
334
335   EXPORT_CONST, CONST
336       There are 3 constants which are exportable. Using the constants in lieu
337       of 0,1,2 makes it more legible when accessing the user_func arguments
338       directly.
339
340       SELF CHUNK CID
341
342       Exports SELF => 0, CHUNK => 1, and CID => 2.
343
344        use MCE export_const => 1;
345        use MCE const => 1;                    ## Shorter; MCE 1.415+
346
347        user_func => sub {
348         # my ($mce, $chunk_ref, $chunk_id) = @_;
349           print "Hello from ", $_[SELF]->wid, "\n";
350        }
351
352       MCE 1.5 allows all public methods to be called directly.
353
354        use MCE;
355
356        user_func => sub {
357         # my ($mce, $chunk_ref, $chunk_id) = @_;
358           print "Hello from ", MCE->wid, "\n";
359        }
360
361   OVERRIDING DEFAULTS
362       The following lists options which may be overridden when loading the
363       module.
364
365        use Sereal qw( encode_sereal decode_sereal );
366        use CBOR::XS qw( encode_cbor decode_cbor );
367        use JSON::XS qw( encode_json decode_json );
368
369        use MCE
370            max_workers => 4,                  ## Default 1
371            chunk_size => 100,                 ## Default 1
372            tmp_dir => "/path/to/app/tmp",     ## $MCE::Signal::tmp_dir
373            freeze => \&encode_sereal,         ## \&Storable::freeze
374            thaw => \&decode_sereal            ## \&Storable::thaw
375        ;
376
377        my $mce = MCE->new( ... );
378
379       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
380       available.  Specify "Sereal => 0" to use Storable instead.
381
382        use MCE Sereal => 0;
383
384   RUNNING
385       Run calls spawn, submits the job; workers call user_begin, user_func,
386       and user_end. Run shuts down workers afterwards. Call spawn whenever
387       the need arises for large data structures prior to running.
388
389        $mce->spawn;                           ## Call early if desired
390
391        $mce->run;                             ## Call run or process below
392
393        ## Acquire data arrays and/or input_files. Workers persist after
394        ## processing.
395
396        $mce->process(\@input_data_1);         ## Process array
397        $mce->process(\@input_data_2);
398        $mce->process(\@input_data_n);
399
400        $mce->process(\%input_hash_1);         ## Process hash, current API
401        $mce->process(\%input_hash_2);         ## available since 1.828
402        $mce->process(\%input_hash_n);
403
404        $mce->process('input_file_1');         ## Process file
405        $mce->process('input_file_2');
406        $mce->process('input_file_n');
407
408        $mce->shutdown;                        ## Shutdown workers
409
410   SYNTAX for ON_POST_EXIT
411       Oftentimes, one may want to capture the exit status. The on_post_exit
412       option, if defined, is executed immediately by the manager process
413       after a worker exits via exit (children only), MCE->exit (children and
414       threads), or die.
415
416       The format of $e->{pid} is PID_123 for children and TID_123 for
417       threads.
418
419        my $restart_flag = 1;
420
421        sub on_post_exit {
422           my ($mce, $e) = @_;
423
424           ## Display all possible hash elements.
425           print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
426
427           ## Restart this worker if desired.
428           if ($restart_flag && $e->{wid} == 2) {
429              $mce->restart_worker;
430              $restart_flag = 0;
431           }
432        }
433
434        sub user_func {
435           my ($mce) = @_;
436           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  ## Args, not necessary
437        }
438
439        my $mce = MCE->new(
440           on_post_exit => \&on_post_exit,
441           user_func => \&user_func,
442           max_workers => 3
443        );
444
445        $mce->run;
446
447        -- Output (child processes)
448
449        2: PID_33223: 0: msg_foo: 1002
450        1: PID_33222: 0: msg_foo: 1001
451        3: PID_33224: 0: msg_foo: 1003
452        2: PID_33225: 0: msg_foo: 1002
453
454        -- Output (running with threads)
455
456        3: TID_3: 0: msg_foo: 1003
457        2: TID_2: 0: msg_foo: 1002
458        1: TID_1: 0: msg_foo: 1001
459        2: TID_4: 0: msg_foo: 1002
460
461   SYNTAX for ON_POST_RUN
462       The on_post_run option, if defined, is executed immediately by the
463       manager process after running MCE->process or MCE->run. This option
464       receives an array reference of hashes.
465
466       The difference between on_post_exit and on_post_run is that the former
467       is called immediately whereas the latter is called after all workers
468       have completed running.
469
470        sub on_post_run {
471           my ($mce, $status_ref) = @_;
472           foreach my $e ( @{ $status_ref } ) {
473              ## Display all possible hash elements.
474              print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
475           }
476        }
477
478        sub user_func {
479           my ($mce) = @_;
480           MCE->exit(0, 'msg_foo', 1000 + MCE->wid);  ## Args, not necessary
481        }
482
483        my $mce = MCE->new(
484           on_post_run => \&on_post_run,
485           user_func => \&user_func,
486           max_workers => 3
487        );
488
489        $mce->run;
490
491        -- Output (child processes)
492
493        3: PID_33174: 0: msg_foo: 1003
494        1: PID_33172: 0: msg_foo: 1001
495        2: PID_33173: 0: msg_foo: 1002
496
497        -- Output (running with threads)
498
499        2: TID_2: 0: msg_foo: 1002
500        3: TID_3: 0: msg_foo: 1003
501        1: TID_1: 0: msg_foo: 1001
502
503   SYNTAX for INPUT_DATA
504       MCE supports many ways to specify input_data. Support for iterators was
505       added in MCE 1.505. The RS option allows one to specify the record
506       separator when processing files.
507
508       MCE is a chunking engine. Therefore, chunk_size is applicable to
509       input_data.  Specifying 1 for use_slurpio causes user_func to receive a
510       scalar reference containing the raw data (applicable to files only)
511       instead of an array reference.
512
513        input_data  => '/path/to/file',  ## process file
514        input_data  => \@array,          ## process array
515        input_data  => \%hash,           ## process hash, API since 1.828
516        input_data  => \*FILE_HNDL,      ## process file handle
517        input_data  => $fh,              ## open $fh, "<", "file"
518        input_data  => $fh,              ## IO::File "file", "r"
519        input_data  => $fh,              ## IO::Uncompress::Gunzip "file.gz"
520        input_data  => \$scalar,         ## treated like a file
521        input_data  => \&iterator,       ## user specified iterator
522
523        chunk_size  => 1,                ## >1 means looping inside user_func
524        use_slurpio => 1,                ## $chunk_ref is a scalar ref
525        RS          => "\n>",            ## input record separator
526
527       The chunk_size value determines the chunking mode to use when
528       processing files.  Otherwise, chunk_size is the number of elements for
529       arrays. For files, a chunk size value of <= 8192 is how many records to
530       read. Greater than 8192 is how many bytes to read. MCE appends (the
531       rest) up to the next record separator.
532
533        chunk_size  => 8192,             ## Consists of 8192 records
534        chunk_size  => 8193,             ## Approximate 8193 bytes for files
535
536        chunk_size  => 1,                ## Consists of 1 record or element
537        chunk_size  => 1000,             ## Consists of 1000 records
538        chunk_size  => '16k',            ## Approximate 16 kibiBytes (KiB)
539        chunk_size  => '20m',            ## Approximate 20 mebiBytes (MiB)
540
541       The construction for user_func when chunk_size > 1 and assuming
542       use_slurpio equals 0.
543
544        user_func => sub {
545           my ($mce, $chunk_ref, $chunk_id) = @_;
546
547           ## $_ is $chunk_ref->[0] when chunk_size equals 1
548           ## $_ is $chunk_ref otherwise; $_ can be used below
549
550           for my $record ( @{ $chunk_ref } ) {
551              print "$chunk_id: $record\n";
552           }
553        }
554
555        # input_data => \%hash
556        # current API available since 1.828
557
558        user_func => sub {
559           my ($mce, $chunk_ref, $chunk_id) = @_;
560
561           ## $_ points to $chunk_ref regardless of chunk_size
562
563           for my $key ( keys %{ $chunk_ref } ) {
564              print "$key: ", $chunk_ref->{$key}, "\n";
565           }
566        }
567
568       Specifying a value for input_data is straightforward for arrays and
569       files.  The next several examples specify an iterator reference for
570       input_data.
571
572        use MCE;
573
574        ## A factory function which creates a closure (the iterator itself)
575        ## for generating a sequence of numbers. The external variables
576        ## ($n, $max, $step) are used for keeping state across successive
577        ## calls to the closure. The iterator simply returns when $n > $max.
578
579        sub input_iterator {
580           my ($n, $max, $step) = @_;
581
582           return sub {
583              return if $n > $max;
584
585              my $current = $n;
586              $n += $step;
587
588              return $current;
589           };
590        }
591
592        ## Run user_func in parallel. Input data can be specified during
593        ## the construction or as an argument to the process method.
594
595        my $mce = MCE->new(
596
597         # input_data => input_iterator(10, 30, 2),
598           chunk_size => 1, max_workers => 4,
599
600           user_func => sub {
601              my ($mce, $chunk_ref, $chunk_id) = @_;
602              MCE->print("$_: ", $_ * 2, "\n");
603           }
604
605        )->spawn;
606
607        $mce->process( input_iterator(10, 30, 2) );
608
609        -- Output   Note that output order is not guaranteed
610                    Take a look at iterator.pl for ordered output
611
612        10: 20
613        12: 24
614        16: 32
615        20: 40
616        14: 28
617        22: 44
618        18: 36
619        24: 48
620        26: 52
621        28: 56
622        30: 60
623
624       The following example queries the DB for the next 1000 rows. Notice the
625       use of fetchall_arrayref. The iterator function itself receives one
626       argument which is chunk_size (added in MCE 1.510) to determine how much
627       to return per iteration.  The default is 1 for the Core API and MCE
628       Models.
629
630        use DBI;
631        use MCE;
632
633        sub db_iter {
634
635           my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
636
637           my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
638                     die "Could not connect to database: $DBI::errstr";
639
640           my $sth = $dbh->prepare('select color, desc from table');
641
642           $sth->execute;
643
644           return sub {
645              my ($chunk_size) = @_;
646
647              if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
648                 return @{ $aref };
649              }
650
651              return;
652           };
653        }
654
655        ## Let's enumerate column indexes for easy column retrieval.
656        my ($i_color, $i_desc) = (0 .. 1);
657
658        my $mce = MCE->new(
659           max_workers => 3, chunk_size => 1000,
660           input_data => db_iter(),
661
662           user_func => sub {
663              my ($mce, $chunk_ref, $chunk_id) = @_;
664              my $ret = '';
665
666              foreach my $row (@{ $chunk_ref }) {
667                 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
668              }
669
670              MCE->print($ret);
671           }
672        );
673
674        $mce->run;
675
676       There are many modules on CPAN which return an iterator reference.
677       Showing one such example below. The demonstration ensures MCE workers
678       are spawned before obtaining the iterator. Note the worker_id value
679       (left column) in the output.
680
681        use Path::Iterator::Rule;
682        use MCE;
683
684        my $start_dir = shift
685           or die "Please specify a starting directory";
686
687        -d $start_dir
688           or die "Cannot open ($start_dir): No such file or directory";
689
690        my $mce = MCE->new(
691           max_workers => 'auto',
692           user_func => sub { MCE->say( MCE->wid . ": $_" ) }
693        )->spawn;
694
695        my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
696
697        my $iterator = $rule->iter(
698           $start_dir, { follow_symlinks => 0, depthfirst => 1 }
699        );
700
701        $mce->process( $iterator );
702
703        -- Output
704
705        8: lib/MCE/Core/Input/Generator.pm
706        5: lib/MCE/Core/Input/Handle.pm
707        6: lib/MCE/Core/Input/Iterator.pm
708        2: lib/MCE/Core/Input/Request.pm
709        3: lib/MCE/Core/Manager.pm
710        4: lib/MCE/Core/Input/Sequence.pm
711        7: lib/MCE/Core/Validation.pm
712        1: lib/MCE/Core/Worker.pm
713        8: lib/MCE/Flow.pm
714        5: lib/MCE/Grep.pm
715        6: lib/MCE/Loop.pm
716        2: lib/MCE/Map.pm
717        3: lib/MCE/Queue.pm
718        4: lib/MCE/Signal.pm
719        7: lib/MCE/Stream.pm
720        1: lib/MCE/Subs.pm
721        8: lib/MCE/Util.pm
722        5: lib/MCE.pm
723
724       Although MCE supports arrays, extra measures are needed to use a "lazy"
725       array as input data. The reason for this is that MCE needs the size of
726       the array before processing which may be unknown for lazy arrays.
727       Therefore, closures provide an excellent mechanism for this.
728
729       The code block belonging to the lazy array must return undef after
730       exhausting its input data. Otherwise, the process will never end.
731
732        use Tie::Array::Lazy;
733        use MCE;
734
735        tie my @a, 'Tie::Array::Lazy', [], sub {
736           my $i = $_[0]->index;
737
738           return ($i < 10) ? $i : undef;
739        };
740
741        sub make_iterator {
742           my $i = 0; my $a_ref = shift;
743
744           return sub {
745              return $a_ref->[$i++];
746           };
747        }
748
749        my $mce = MCE->new(
750           max_workers => 4, input_data => make_iterator(\@a),
751
752           user_func => sub {
753              my ($mce, $chunk_ref, $chunk_id) = @_;
754              MCE->say($_);
755           }
756
757        )->run;
758
759        -- Output
760
761        0
762        1
763        2
764        3
765        4
766        6
767        7
768        8
769        5
770        9
771
772       The following demonstrates how to retrieve a chunk from the lazy array
773       per each successive call. Here, undef is sent by the iterator block
774       when $i is greater than $max. Iterators may optionally use chunk_size
775       to determine how much to return per iteration.
776
777        use Tie::Array::Lazy;
778        use MCE;
779
780        tie my @a, 'Tie::Array::Lazy', [], sub {
781           $_[0]->index;
782        };
783
784        sub make_iterator {
785           my $j = 0; my ($a_ref, $max) = @_;
786
787           return sub {
788              my ($chunk_size) = @_;
789              my $i = $j;  $j += $chunk_size;
790
791              return if $i > $max;
792              return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
793           };
794        }
795
796        my $mce = MCE->new(
797           chunk_size => 15, max_workers => 4,
798           input_data => make_iterator(\@a, 100),
799
800           user_func => sub {
801              my ($mce, $chunk_ref, $chunk_id) = @_;
802              MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
803           }
804
805        )->run;
806
807        -- Output
808
809        1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
810        2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
811        3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
812        4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
813        5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
814        6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
815        7: 90 91 92 93 94 95 96 97 98 99 100
816
817   SYNTAX for SEQUENCE
818       The 1.3 release and above allows workers to loop through a sequence of
819       numbers computed mathematically without the overhead of an array. The
820       sequence can be specified separately per each user_task entry unlike
821       input_data which is applicable to the first task only.
822
823       See the seq_demo.pl example, included with this distribution, on
824       applying sequences with the user_tasks option.
825
826       Sequence can be defined using an array or a hash reference.
827
828        use MCE;
829
830        my $mce = MCE->new(
831           max_workers => 3,
832
833         # sequence => [ 10, 19, 0.7, "%4.1f" ],  ## up to 4 options
834
835           sequence => {
836              begin => 10, end => 19, step => 0.7, format => "%4.1f"
837           },
838
839           user_func => sub {
840              my ($mce, $n, $chunk_id) = @_;
841              print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
842           }
843        );
844
845        $mce->run;
846
847        -- Output (sorted afterwards, notice wid and chunk_id in output)
848
849        10.0 from 1 id 1
850        10.7 from 2 id 2
851        11.4 from 3 id 3
852        12.1 from 1 id 4
853        12.8 from 2 id 5
854        13.5 from 3 id 6
855        14.2 from 1 id 7
856        14.9 from 2 id 8
857        15.6 from 3 id 9
858        16.3 from 1 id 10
859        17.0 from 2 id 11
860        17.7 from 3 id 12
861        18.4 from 1 id 13
862
863       The 1.5 release includes a new option (bounds_only). This option tells
864       the sequence engine to compute 'begin' and 'end' items only, for the
865       chunk, and not the items in between (hence boundaries only). This
866       option applies to sequence only and has no effect when chunk_size
867       equals 1.
868
869       The time to run is 0.006s below. This becomes 0.827s without the
870       bounds_only option due to computing all items in between, thus creating
871       a very large array. Basically, specify bounds_only => 1 when boundaries
872       are all you need for looping inside the block; e.g. Monte Carlo
873       simulations.
874
875       Time was measured using 1 worker to emphasize the difference.
876
877        use MCE;
878
879        my $mce = MCE->new(
880           max_workers => 1, chunk_size => 1_250_000,
881
882           sequence => { begin => 1, end => 10_000_000 },
883           bounds_only => 1,
884
885           ## For sequence, the input scalar $_ points to $chunk_ref
886           ## when chunk_size > 1, otherwise $chunk_ref->[0].
887           ##
888           ## user_func => sub {
889           ##    my $begin = $_->[0]; my $end = $_->[-1];
890           ##
891           ##    for ($begin .. $end) {
892           ##       ...
893           ##    }
894           ## },
895
896           user_func => sub {
897              my ($mce, $chunk_ref, $chunk_id) = @_;
898              ## $chunk_ref contains 2 items, not 1_250_000
899
900              my $begin = $chunk_ref->[ 0];
901              my $end   = $chunk_ref->[-1];   ## or $chunk_ref->[1]
902
903              MCE->printf("%7d .. %8d\n", $begin, $end);
904           }
905        );
906
907        $mce->run;
908
909        -- Output
910
911              1 ..  1250000
912        1250001 ..  2500000
913        2500001 ..  3750000
914        3750001 ..  5000000
915        5000001 ..  6250000
916        6250001 ..  7500000
917        7500001 ..  8750000
918        8750001 .. 10000000
919
920   SYNTAX for MAX_RETRIES
921       The max_retries option, added in 1.7, allows MCE to retry a failed
922       chunk from a worker dying while processing input data or a sequence of
923       numbers.
924
925       When max_retries is set, MCE configures the on_post_exit option before
926       running, using the following code. Specify on_post_exit explicitly for
927       further tailoring if need be. The restart_worker line is necessary,
928       obviously.
929
930        on_post_exit => sub {
931           my ( $mce, $e, $retry_cnt ) = @_;
932           my ( $cnt, $msg ) = ( $retry_cnt + 1, "Error: chunk $e->{id} failed" );
933
934           ( $retry_cnt < $mce->max_retries )
935              ? print {*STDERR} "$msg, retrying chunk attempt # ${cnt}\n"
936              : print {*STDERR} "$msg\n";
937
938           $mce->restart_worker;
939        }
940
941       Below, we let MCE handle on_post_exit automatically, which is
942       essentially the same code shown above. For max_retries to work, the
943       worker must die, abnormally included, or call MCE->exit. Notice that we
944       pass the chunk_id value for the 3rd argument to MCE->exit.
945
946        ## Running MCE with max_retries.
947
948        use strict;
949        use warnings;
950
951        use MCE;
952
953        sub user_func {
954           my ( $mce, $chunk_ref, $chunk_id ) = @_;
955
956           # die "Died : chunk_id = 3\n"  if $chunk_id == 3;
957           MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
958
959           print MCE->wid(), ": $chunk_id\n";
960        }
961
962        my $mce = MCE->new(
963           max_workers => 1,
964           max_retries => 2,
965           user_func   => \&user_func,
966        )->spawn;
967
968        my @input_data = qw( 0 1 2 3 4 5 6 7 );
969
970        $mce->process( { chunk_size => 1 }, \@input_data );
971
972        $mce->shutdown;
973
974        -- Output
975
976        1: 1
977        1: 2
978        Error: chunk 3 failed, retrying chunk attempt # 1
979        Error: chunk 3 failed, retrying chunk attempt # 2
980        Error: chunk 3 failed
981        1: 4
982        1: 5
983        1: 6
984        1: 7
985        1: 8
986
987   SYNTAX for USER_BEGIN and USER_END
988       The user_begin and user_end options, if specified, behave similarly to
989       awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
990       called once per worker during each run.
991
992       MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
993
994        sub user_begin {                   ## Called once at the beginning
995           my ($mce, $task_id, $task_name) = @_;
996           $mce->{wk_total_rows} = 0;
997        }
998
999        sub user_func {                    ## Called while processing
1000           my $mce = shift;
1001           $mce->{wk_total_rows} += 1;
1002        }
1003
1004        sub user_end {                     ## Called once at the end
1005           my ($mce, $task_id, $task_name) = @_;
1006           printf "## %d: Processed %d rows\n",
1007              MCE->wid, $mce->{wk_total_rows};
1008        }
1009
1010        my $mce = MCE->new(
1011           user_begin => \&user_begin,
1012           user_func  => \&user_func,
1013           user_end   => \&user_end
1014        );
1015
1016        $mce->run;
1017
1018   SYNTAX for USER_FUNC with USE_SLURPIO => 0
1019       When processing input data, MCE can pass an array of rows or a slurped
1020       chunk.  Below, a reference to an array containing the chunk data is
1021       processed.
1022
1023       e.g. $chunk_ref = [ record1, record2, record3, ... ]
1024
1025        sub user_func {
1026
1027           my ($mce, $chunk_ref, $chunk_id) = @_;
1028
1029           foreach my $row ( @{ $chunk_ref } ) {
1030              $mce->{wk_total_rows} += 1;
1031              print $row;
1032           }
1033        }
1034
1035        my $mce = MCE->new(
1036           chunk_size  => 100,
1037           input_data  => "/path/to/file",
1038           user_func   => \&user_func,
1039           use_slurpio => 0
1040        );
1041
1042        $mce->run;
1043
1044   SYNTAX for USER_FUNC with USE_SLURPIO => 1
1045       Here, a reference to a scalar containing the raw chunk data is
1046       processed.
1047
1048        sub user_func {
1049
1050           my ($mce, $chunk_ref, $chunk_id) = @_;
1051
1052           my $count = () = $$chunk_ref =~ /abc/g;
1053        }
1054
1055        my $mce = MCE->new(
1056           chunk_size  => 16000,
1057           input_data  => "/path/to/file",
1058           user_func   => \&user_func,
1059           use_slurpio => 1
1060        );
1061
1062        $mce->run;
1063
1064   SYNTAX for USER_ERROR and USER_OUTPUT
1065       Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1066       and MCE->say can be intercepted by specifying the user_error and
1067       user_output options. MCE on receiving output will forward to user_error
1068       or user_output in a serialized fashion.
1069
1070       Handy when wanting to filter, modify, and/or direct the output
1071       elsewhere.
1072
1073        sub user_error {                   ## Redirect STDERR to STDOUT
1074           my $error = shift;
1075           print {*STDOUT} $error;
1076        }
1077
1078        sub user_output {                  ## Redirect STDOUT to STDERR
1079           my $output = shift;
1080           print {*STDERR} $output;
1081        }
1082
1083        sub user_func {
1084           my ($mce, $chunk_ref, $chunk_id) = @_;
1085           my $count = 0;
1086
1087           foreach my $row ( @{ $chunk_ref } ) {
1088              MCE->print($row);
1089              $count += 1;
1090           }
1091
1092           MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1093        }
1094
1095        my $mce = MCE->new(
1096           chunk_size  => 1000,
1097           input_data  => "/path/to/file",
1098           user_error  => \&user_error,
1099           user_output => \&user_output,
1100           user_func   => \&user_func
1101        );
1102
1103        $mce->run;
1104
1105   SYNTAX for USER_TASKS and TASK_END
1106       This option takes an array of tasks. Each task allows up to 17 options.
1107       The init_relay, input_data, RS, and use_slurpio options may be defined
1108       inside the first task or at the top level, otherwise ignored under
1109       other sub-tasks.
1110
1111        max_workers, chunk_size, input_data, interval, sequence,
1112        bounds_only, user_args, user_begin, user_end, user_func,
1113        gather, task_end, task_name, use_slurpio, use_threads,
1114        init_relay, RS
1115
1116       Sequence and chunk_size were added in 1.3. User_args was introduced in
1117       1.4.  Name and input_data are new options allowed in 1.5. In addition,
1118       one can specify task_end at the top level. Task_end also receives 2
1119       additional arguments $task_id and $task_name (shown below).
1120
1121       Options not specified here will default to the same option specified at
1122       the top level. The task_end option is called by the manager process
1123       when all workers for that sub-task have completed processing.
1124
1125       Forking and threading can be intermixed among tasks unless running
1126       Cygwin.  The run method will continue running until all workers have
1127       completed processing.
1128
1129        use threads;
1130        use threads::shared;
1131
1132        use MCE;
1133
1134        sub parallel_task1 { sleep 2; }
1135        sub parallel_task2 { sleep 1; }
1136
1137        my $mce = MCE->new(
1138
1139           task_end => sub {
1140              my ($mce, $task_id, $task_name) = @_;
1141              print "Task [$task_id -- $task_name] completed processing\n";
1142           },
1143
1144           user_tasks => [{
1145              task_name   => 'foo',
1146              max_workers => 2,
1147              user_func   => \&parallel_task1,
1148              use_threads => 0             ## Not using threads
1149
1150           },{
1151              task_name   => 'bar',
1152              max_workers => 4,
1153              user_func   => \&parallel_task2,
1154              use_threads => 1             ## Yes, threads
1155
1156           }]
1157        );
1158
1159        $mce->run;
1160
1161        -- Output
1162
1163        Task [1 -- bar] completed processing
1164        Task [0 -- foo] completed processing
1165

DEFAULT INPUT SCALAR

1167       Beginning with MCE 1.5, the input scalar $_ is localized prior to
1168       calling user_func for input_data and sequence of numbers. The following
1169       applies.
1170
1171       use_slurpio => 1
1172           $_ is a reference to the buffer e.g. $_ = \$_buffer;
1173           $_ is a reference regardless of whether chunk_size is 1 or greater
1174
1175           user_func => sub {
1176            # my ($mce, $chunk_ref, $chunk_id) = @_;
1177              print ${ $_ };    ## $_ is same as $chunk_ref
1178           }
1179
1180       chunk_size is greater than 1, use_slurpio => 0
1181           $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1182           $_ is same as $chunk_ref or $_[CHUNK]
1183
1184           user_func => sub {
1185            # my ($mce, $chunk_ref, $chunk_id) = @_;
1186              for my $row ( @{ $_ } ) {
1187                 print $row, "\n";
1188              }
1189           }
1190
1191           use MCE const => 1;
1192
1193           user_func => sub {
1194            # my ($mce, $chunk_ref, $chunk_id) = @_;
1195              for my $row ( @{ $_[CHUNK] } ) {
1196                 print $row, "\n";
1197              }
1198           }
1199
1200       chunk_size equals 1, use_slurpio => 0
1201           $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1202
1203           ## Note that $_ and $chunk_ref are not the same below.
1204           ## $chunk_ref is a reference to an array.
1205
1206           user_func => sub {
1207            # my ($mce, $chunk_ref, $chunk_id) = @_;
1208              print $_, "\n";   ## Same as $chunk_ref->[0];
1209           }
1210
1211           $mce->foreach("/path/to/file", sub {
1212            # my ($mce, $chunk_ref, $chunk_id) = @_;
1213              print $_;         ## Same as $chunk_ref->[0];
1214           });
1215
1216           ## However, that is not the case for the forseq method.
1217           ## Both $_ and $n_seq are the same when chunk_size => 1.
1218
1219           $mce->forseq([ 1, 9 ], sub {
1220            # my ($mce, $n_seq, $chunk_id) = @_;
1221              print $_, "\n";   ## Same as $n_seq
1222           });
1223
1224          Sequence can also be specified using an array reference. The below
1225          is the same as the example afterwards.
1226
1227           $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1228
1229          The code block receives an array containing the next 5 sequences.
1230          Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1231          to an array, same as $_, due to chunk_size being greater than 1.
1232
1233           $mce->forseq( [ 10, 40, 2 ], { chunk_size => 5 }, sub {
1234            # my ($mce, $n_seq, $chunk_id) = @_;
1235              my @result;
1236              for my $n ( @{ $_ } ) {
1237                 ... do work, append to result for 5
1238              }
1239              ... do something with result afterwards
1240           });
1241

METHODS for the MANAGER PROCESS and WORKERS

1243       The methods listed below are callable by the main process and workers.
1244
1245   $mce->abort ( void )
1246       The 'abort' method is applicable when processing input_data only. This
1247       causes all workers to abort after processing the current chunk.
1248
1249       Workers write the next offset position to the queue socket for the next
1250       available worker. In essence, the 'abort' method writes the last offset
1251       position. Workers, on requesting the next offset position, will think
1252       the end of input_data has been reached and leave the chunking loop.
1253
1254        $mce->abort;
1255        MCE->abort;
1256
1257   $mce->chunk_id ( void )
1258       Returns the chunk_id for the current chunk. The value starts at 1.
1259       Chunking applies to input_data or sequence. The value is 0 for the
1260       manager process.
1261
1262        my $chunk_id = $mce->chunk_id;
1263        my $chunk_id = MCE->chunk_id;
1264
1265   $mce->chunk_size ( void )
1266       Getter method for chunk_size used by MCE.
1267
1268        my $chunk_size = $mce->chunk_size;
1269        my $chunk_size = MCE->chunk_size;
1270
1271   $mce->freeze ( $object_ref )
1272       Calls the internal freeze method to serialize an object. The default
1273       serialization routines are handled by Sereal if available or Storable.
1274
1275        my $frozen = $mce->freeze([ 0, 2, 4 ]);
1276        my $frozen = MCE->freeze([ 0, 2, 4 ]);
1277
1278   $mce->max_retries ( void )
1279       Getter method for max_retries used by MCE.
1280
1281        my $max_retries = $mce->max_retries;
1282        my $max_retries = MCE->max_retries;
1283
1284   $mce->max_workers ( void )
1285       Getter method for max_workers used by MCE.
1286
1287        my $max_workers = $mce->max_workers;
1288        my $max_workers = MCE->max_workers;
1289
1290   $mce->pid ( void )
1291       Returns the Process ID. Threads have thread ID attached to the value.
1292
1293        my $pid = $mce->pid;   ## 16180 (pid) ; 16180.2 (pid.tid)
1294        my $pid = MCE->pid;
1295
1296   $mce->sess_dir ( void )
1297       Returns the session directory used by the MCE instance. This is defined
1298       during spawning and removed during shutdown.
1299
1300        my $sess_dir = $mce->sess_dir;
1301        my $sess_dir = MCE->sess_dir;
1302
1303   $mce->task_id ( void )
1304       Returns the task ID. This applies to the user_tasks option (starts at
1305       0).
1306
1307        my $task_id = $mce->task_id;
1308        my $task_id = MCE->task_id;
1309
1310   $mce->task_name ( void )
1311       Returns the task_name value specified via the task_name option when
1312       configuring MCE.
1313
1314        my $task_name = $mce->task_name;
1315        my $task_name = MCE->task_name;
1316
1317   $mce->task_wid ( void )
1318       Returns the task worker ID (applies to user_tasks). The value starts at
1319       1 per each task configured within user_tasks. The value is 0 for the
1320       manager process.
1321
1322        my $task_wid = $mce->task_wid;
1323        my $task_wid = MCE->task_wid;
1324
1325   $mce->thaw ( $frozen )
1326       Calls the internal thaw method to un-serialize the frozen object.
1327
1328        my $object_ref = $mce->thaw($frozen);
1329        my $object_ref = MCE->thaw($frozen);
1330
1331   $mce->tmp_dir ( void )
1332       Returns the temporary directory used by MCE.
1333
1334        my $tmp_dir = $mce->tmp_dir;
1335        my $tmp_dir = MCE->tmp_dir;
1336
1337   $mce->user_args ( void )
1338       Returns the arguments specified via the user_args option.
1339
1340        my ($arg1, $arg2, $arg3) = $mce->user_args;
1341        my ($arg1, $arg2, $arg3) = MCE->user_args;
1342
1343   $mce->wid ( void )
1344       Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1345       is 0 for the manager process.
1346
1347        my $wid = $mce->wid;
1348        my $wid = MCE->wid;
1349

METHODS for the MANAGER PROCESS only

1351       Methods listed below are callable by the main process only.
1352
1353   $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1354   $mce->foreach ( $input_data [, { options } ], sub { ... } )
1355   $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1356       Forchunk, foreach, and forseq are sugar methods and described in
1357       MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1358
1359   $mce->process ( $input_data [, { options } ] )
1360       The process method will spawn workers automatically if not already
1361       spawned.  It will set input_data => $input_data. It calls run(0) to not
1362       auto-shutdown workers. Specifying options is optional.
1363
1364       Allowable options { key => value, ... } are:
1365
1366        chunk_size input_data job_delay spawn_delay submit_delay
1367        flush_file flush_stderr flush_stdout stderr_file stdout_file
1368        on_post_exit on_post_run sequence user_args user_begin user_end
1369        user_func user_error user_output use_slurpio RS
1370
1371       Options remain persistent going forward unless changed. Setting
1372       user_begin, user_end, or user_func will cause already spawned workers
1373       to shut down and re-spawn automatically. Therefore, define these during
1374       instantiation.
1375
1376       The below will cause workers to re-spawn after running.
1377
1378        my $mce = MCE->new( max_workers => 'auto' );
1379
1380        $mce->process( {
1381           user_begin => sub { ## connect to DB },
1382           user_func  => sub { ## process each row },
1383           user_end   => sub { ## close handle to DB },
1384        }, \@input_data );
1385
1386        $mce->process( {
1387           user_begin => sub { ## connect to DB },
1388           user_func  => sub { ## process each file },
1389           user_end   => sub { ## close handle to DB },
1390        }, "/list/of/files" );
1391
1392       Do the following if wanting workers to persist between jobs.
1393
1394        use MCE max_workers => 'auto';
1395
1396        my $mce = MCE->new(
1397           user_begin => sub { ## connect to DB },
1398           user_func  => sub { ## process each chunk or row or host },
1399           user_end   => sub { ## close handle to DB },
1400        );
1401
1402        $mce->spawn;           ## Spawn early if desired
1403
1404        $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1405        $mce->process(\@array_of_files_to_grep);
1406        $mce->process("/path/to/host/list");
1407
1408        $mce->process($array_ref);
1409        $mce->process($array_ref, { stdout_file => $output_file });
1410
1411        ## This was not allowed before. Fixed in 1.415.
1412        $mce->process({ sequence => { begin => 10, end => 90, step => 2 } });
1413        $mce->process({ sequence => [ 10, 90, 2 ] });
1414
1415        $mce->shutdown;
1416
1417   $mce->relay_final ( void )
1418       Described in MCE::Relay.
1419
1420   $mce->restart_worker ( void )
1421       One can restart a worker who has died or exited. The job never ends
1422       below due to restarting each time. Recommended is to call MCE->exit or
1423       $mce->exit instead of the native exit function for better handling,
1424       especially under the Windows environment.
1425
1426       The $e->{wid} argument is no longer necessary starting with the 1.5
1427       release.
1428
1429       Press [ctrl-c] to terminate the script.
1430
1431        my $mce = MCE->new(
1432
1433           on_post_exit => sub {
1434              my ($mce, $e) = @_;
1435              print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1436            # $mce->restart_worker($e->{wid});    ## MCE-1.415 and below
1437              $mce->restart_worker;               ## MCE-1.500 and above
1438           },
1439
1440           user_begin => sub {
1441              my ($mce, $task_id, $task_name) = @_;
1442              ## Not interested in die messages going to STDERR,
1443              ## because the die handler calls MCE->exit(255, $_[0]).
1444              close STDERR;
1445           },
1446
1447           user_tasks => [{
1448              max_workers => 5,
1449              user_func => sub {
1450                 my ($mce) = @_; sleep MCE->wid;
1451                 MCE->exit(3, "exited from " . MCE->wid . "\n");
1452              }
1453           },{
1454              max_workers => 4,
1455              user_func => sub {
1456                 my ($mce) = @_; sleep MCE->wid;
1457                 die("died from " . MCE->wid . "\n");
1458              }
1459           }]
1460        );
1461
1462        $mce->run;
1463
1464        -- Output
1465
1466        1: PID_85388: status 3: exited from 1
1467        2: PID_85389: status 3: exited from 2
1468        1: PID_85397: status 3: exited from 1
1469        3: PID_85390: status 3: exited from 3
1470        1: PID_85399: status 3: exited from 1
1471        4: PID_85391: status 3: exited from 4
1472        2: PID_85398: status 3: exited from 2
1473        1: PID_85401: status 3: exited from 1
1474        5: PID_85392: status 3: exited from 5
1475        1: PID_85404: status 3: exited from 1
1476        6: PID_85393: status 255: died from 6
1477        3: PID_85400: status 3: exited from 3
1478        2: PID_85403: status 3: exited from 2
1479        1: PID_85406: status 3: exited from 1
1480        7: PID_85394: status 255: died from 7
1481        1: PID_85410: status 3: exited from 1
1482        8: PID_85395: status 255: died from 8
1483        4: PID_85402: status 3: exited from 4
1484        2: PID_85409: status 3: exited from 2
1485        1: PID_85412: status 3: exited from 1
1486        9: PID_85396: status 255: died from 9
1487        3: PID_85408: status 3: exited from 3
1488        1: PID_85416: status 3: exited from 1
1489
1490        ...
1491
1492   $mce->run ( [ $auto_shutdown [, { options } ] ] )
1493       The run method, by default, spawns workers, processes once, and shuts
1494       down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1495       persist after running (default 1).
1496
1497       Specifying options is optional. Valid options are the same as for the
1498       process method.
1499
1500        my $mce = MCE->new( ... );
1501
1502        ## Disables auto-shutdown
1503        $mce->run(0);
1504
1505   $mce->send ( $data_ref )
1506       The 'send' method is useful when wanting to spawn workers early to
1507       minimize memory consumption and afterwards send data individually to
1508       each worker. One cannot send more than the total workers spawned.
1509       Workers store the received data as $mce->{user_data}.
1510
1511       The data which can be sent is restricted to an ARRAY, HASH, or PDL
1512       reference.  Workers begin processing immediately after receiving data.
1513       Workers set $mce->{user_data} to undef after processing. One cannot
1514       specify input_data, sequence, or user_tasks when using the "send"
1515       method.
1516
1517       Passing any options e.g. run(0, { options }) is ignored due to workers
1518       running immediately after receiving user data. There is no guarantee to
1519       which worker will receive data first. It depends on which worker is
1520       available awaiting data.
1521
1522        use MCE;
1523
1524        my $mce = MCE->new(
1525           max_workers => 5,
1526
1527           user_func => sub {
1528              my ($mce) = @_;
1529              my $data = $mce->{user_data};
1530              my $first_name = $data->{first_name};
1531              print MCE->wid, ": Hello from $first_name\n";
1532           }
1533        );
1534
1535        $mce->spawn;     ## Optional, send will spawn if necessary.
1536
1537        $mce->send( { first_name => "Theresa" } );
1538        $mce->send( { first_name => "Francis" } );
1539        $mce->send( { first_name => "Padre"   } );
1540        $mce->send( { first_name => "Anthony" } );
1541
1542        $mce->run;       ## Wait for workers to complete processing.
1543
1544        -- Output
1545
1546        2: Hello from Theresa
1547        5: Hello from Anthony
1548        3: Hello from Francis
1549        4: Hello from Padre
1550
1551   $mce->shutdown ( void )
1552       The run method will automatically spawn workers, run once, and shut down
1553       workers automatically. Workers persist after running below. Shutdown
1554       may be called as needed or prior to exiting.
1555
1556        my $mce = MCE->new( ... );
1557
1558        $mce->spawn;
1559
1560        $mce->process(\@input_data_1);         ## Processing multiple arrays
1561        $mce->process(\@input_data_2);
1562        $mce->process(\@input_data_n);
1563
1564        $mce->shutdown;
1565
1566        $mce->process('input_file_1');         ## Processing multiple files
1567        $mce->process('input_file_2');
1568        $mce->process('input_file_n');
1569
1570        $mce->shutdown;
1571
1572   $mce->spawn ( void )
1573       Workers are normally spawned automatically. The spawn method allows one
1574       to spawn workers early if so desired.
1575
1576        my $mce = MCE->new( ... );
1577
1578        $mce->spawn;
1579
1580   $mce->status ( void )
1581       The greatest exit status is saved among workers while running. Look at
1582       the on_post_exit or on_post_run options for callback support.
1583
1584        my $mce = MCE->new( ... );
1585
1586        $mce->run;
1587
1588        my $exit_status = $mce->status;
1589

METHODS for WORKERS only

1591       Methods listed below are callable by workers only.
1592
1593   $mce->do ( 'callback_func' [, $arg1, ... ] )
1594       MCE serializes data transfers from a worker process via helper
1595       functions do & sendto to the manager process. The callback function can
1596       optionally return a reply.
1597
1598        [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1599
1600       Passing args to a callback function using references & scalar.
1601
1602        sub callback {
1603           my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1604           ...
1605        }
1606
1607        MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1608        MCE->do('callback', \@a, \%h, \$s, 'foo');
1609
1610       MCE knows if wanting a void, list, hash, or a scalar return value.
1611
1612        MCE->do('callback' [, $arg1, ... ]);
1613
1614        my @array  = MCE->do('callback' [, $arg1, ... ]);
1615        my %hash   = MCE->do('callback' [, $arg1, ... ]);
1616        my $scalar = MCE->do('callback' [, $arg1, ... ]);
1617
1618   $mce->exit ( [ $status [, $message [, $id ] ] ] )
1619       A worker exits from MCE entirely. $id (optional) can be used for
1620       passing the primary key or a string along with the message. Look at the
1621       on_post_exit or on_post_run options for callback support.
1622
1623        MCE->exit;           ## default 0
1624        MCE->exit(1);
1625        MCE->exit(2, 'chunk failed', $chunk_id);
1626        MCE->exit(0, 'msg_foo', 'id_1000');
1627
1628   $mce->gather ( $arg1 [, $arg2, ... ] )
1629       A worker can submit data to the location specified via the gather
1630       option by calling this method. See MCE::Flow and MCE::Loop for
1631       additional use-case.
1632
1633        use MCE;
1634
1635        my @hosts = qw(
1636           hosta hostb hostc hostd hoste
1637        );
1638
1639        my $mce = MCE->new(
1640           chunk_size => 1, max_workers => 3,
1641
1642           user_func => sub {
1643            # my ($mce, $chunk_ref, $chunk_id) = @_;
1644              my ($output, $error, $status); my $host = $_;
1645
1646              ## Do something with $host;
1647              $output = "Worker ". MCE->wid .": Hello from $host";
1648
1649              if (MCE->chunk_id % 3 == 0) {
1650                 ## Simulating an error condition
1651                 local $? = 1; $status = $?;
1652                 $error = "Error from $host"
1653              }
1654              else {
1655                 $status = 0;
1656              }
1657
1658              ## Ensure unique keys (key, value) when gathering to a
1659              ## hash.
1660              MCE->gather("$host.out", $output, "$host.sta", $status);
1661              MCE->gather("$host.err", $error) if (defined $error);
1662           }
1663        );
1664
1665        my %h; $mce->process(\@hosts, { gather => \%h });
1666
1667        foreach my $host (@hosts) {
1668           print $h{"$host.out"}, "\n";
1669           print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1670           print "Exit status: ", $h{"$host.sta"}, "\n\n";
1671        }
1672
1673        -- Output
1674
1675        Worker 2: Hello from hosta
1676        Exit status: 0
1677
1678        Worker 1: Hello from hostb
1679        Exit status: 0
1680
1681        Worker 3: Hello from hostc
1682        Error from hostc
1683        Exit status: 1
1684
1685        Worker 2: Hello from hostd
1686        Exit status: 0
1687
1688        Worker 1: Hello from hoste
1689        Exit status: 0
1690
1691   $mce->last ( void )
1692       Worker leaves the chunking loop or user_func block immediately.
1693       Callable from inside foreach, forchunk, forseq, and user_func.
1694
1695        use MCE;
1696
1697        my $mce = MCE->new(
1698           max_workers => 5
1699        );
1700
1701        my @list = (1 .. 80);
1702
1703        $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1704
1705           my ($mce, $chunk_ref, $chunk_id) = @_;
1706           MCE->last if ($chunk_id > 4);
1707
1708           my @output = ();
1709
1710           foreach my $rec ( @{ $chunk_ref } ) {
1711              push @output, $rec, "\n";
1712           }
1713
1714           MCE->print(@output);
1715        });
1716
1717        -- Output (each chunk above consists of 2 elements)
1718
1719        3
1720        4
1721        1
1722        2
1723        7
1724        8
1725        5
1726        6
1727
1728   $mce->next ( void )
1729       Worker starts the next iteration of the chunking loop. Callable from
1730       inside foreach, forchunk, forseq, and user_func.
1731
1732        use MCE;
1733
1734        my $mce = MCE->new(
1735           max_workers => 5
1736        );
1737
1738        my @list = (1 .. 80);
1739
1740        $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1741
1742           my ($mce, $chunk_ref, $chunk_id) = @_;
1743           MCE->next if ($chunk_id < 20);
1744
1745           my @output = ();
1746
1747           foreach my $rec ( @{ $chunk_ref } ) {
1748              push @output, $rec, "\n";
1749           }
1750
1751           MCE->print(@output);
1752        });
1753
1754        -- Output (each chunk above consists of 4 elements)
1755
1756        77
1757        78
1758        79
1759        80
1760
1761   $mce->printf ( $format, $list [, ... ] )
1762   $mce->print ( $list [, ... ] )
1763   $mce->say ( $list [, ... ] )
1764       Use the printf, print, and say methods when wanting to serialize output
1765       among workers. These are syntactic sugar for the sendto method. They
1766       behave similarly to the native subroutines in Perl with the exception
1767       that barewords must be passed as a reference and require the comma
1768       after it including file handles.
1769
1770       Say is like print, but implicitly appends a newline.
1771
1772        MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1773        MCE->printf($fh, "%s: %d\n", $name, $age);
1774        MCE->printf("%s: %d\n", $name, $age);
1775
1776        MCE->print(\*STDERR, "$error_msg\n");
1777        MCE->print($fh, $log_msg."\n");
1778        MCE->print("$output_msg\n");
1779
1780        MCE->say(\*STDERR, $error_msg);
1781        MCE->say($fh, $log_msg);
1782        MCE->say($output_msg);
1783
1784       Caveat: Use the following syntax when the first item to output is a
1785       reference other than a glob or file handle. Otherwise, MCE will error,
1786       indicating that the first argument is not a glob reference.
1787
1788        MCE->print(\*STDOUT, \@array, "\n");
1789        MCE->print("", \@array, "\n");         ## ok
1790
1791   $mce->relay ( sub { code } )
1792   $mce->relay_recv ( void )
1793       Described in MCE::Relay.
1794
1795   $mce->sendto ( $to, $arg1, ... )
1796       The sendto method is called by workers for serializing data to standard
1797       output, standard error, or the end of a file. The action is done by the
1798       manager process.
1799
1800       Release 1.00x supported 1 data argument, not more.
1801
1802        MCE->sendto('file', \@array, '/path/to/file');
1803        MCE->sendto('file', \$scalar, '/path/to/file');
1804        MCE->sendto('file', $scalar, '/path/to/file');
1805
1806        MCE->sendto('STDERR', \@array);
1807        MCE->sendto('STDERR', \$scalar);
1808        MCE->sendto('STDERR', $scalar);
1809
1810        MCE->sendto('STDOUT', \@array);
1811        MCE->sendto('STDOUT', \$scalar);
1812        MCE->sendto('STDOUT', $scalar);
1813
1814       Release 1.100 added the ability to pass multiple arguments. Notice the
1815       syntax change for sending to a file. Passing a reference to an array is
1816       no longer necessary.
1817
1818        MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1819        MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1820        MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1821
1822        MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1823
1824       To retain 1.00x compatibility, sendto outputs the content when a single
1825       data reference is specified. Otherwise, the reference for \@array or
1826       \$scalar is shown in 1.500, not the content.
1827
1828        MCE->sendto('STDERR', \@array);        ## 1.00x behavior, content
1829        MCE->sendto('STDOUT', \$scalar);
1830        MCE->sendto('file:/path/to/file', \@array);
1831
1832        ## Output matches the print statement
1833
1834        MCE->sendto(\*STDERR, \@array);        ## 1.500 behavior, reference
1835        MCE->sendto(\*STDOUT, \$scalar);
1836        MCE->sendto($fh, \@array);
1837
1838        MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1839        print {*STDOUT} \@array, "\n", \$scalar, "\n";
1840
1841       MCE 1.500 added support for sending to a glob reference, file
1842       descriptor, and file handle.
1843
1844        MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1845        MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1846        MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1847
1848   $mce->sync ( void )
1849       A barrier sync operation means any worker must stop at this point until
1850       all workers reach this barrier. Barrier syncing is useful for many
1851       computer algorithms.
1852
1853       Barrier synchronization is supported only for task 0, or when
1854       user_tasks is omitted. All workers assigned task_id 0 must call sync
1855       whenever barrier syncing.
1856
1857        use MCE;
1858
1859        sub user_func {
1860
1861           my ($mce) = @_;
1862           my $wid = MCE->wid;
1863
1864           MCE->sendto("STDOUT", "a: $wid\n");   ## MCE 1.0+
1865           MCE->sync;
1866
1867           MCE->sendto(\*STDOUT, "b: $wid\n");   ## MCE 1.5+
1868           MCE->sync;
1869
1870           MCE->print("c: $wid\n");              ## MCE 1.5+
1871           MCE->sync;
1872
1873           return;
1874        }
1875
1876        my $mce = MCE->new(
1877           max_workers => 4, user_func => \&user_func
1878        )->run;
1879
1880        -- Output (without barrier synchronization)
1881
1882        a: 1
1883        a: 2
1884        b: 1
1885        b: 2
1886        c: 1
1887        c: 2
1888        a: 3
1889        b: 3
1890        c: 3
1891        a: 4
1892        b: 4
1893        c: 4
1894
1895        -- Output (with barrier synchronization)
1896
1897        a: 1
1898        a: 2
1899        a: 4
1900        a: 3
1901        b: 2
1902        b: 1
1903        b: 3
1904        b: 4
1905        c: 1
1906        c: 4
1907        c: 2
1908        c: 3
1909
1910       Consider the following example. The MCE->sync operation is done inside
1911       a loop along with MCE->do. A stall may occur for workers calling sync
1912       the 2nd or 3rd time while other workers are sending results via MCE->do
1913       or MCE->sendto.
1914
1915       Solving this would require another semaphore lock in MCE, which was
1916       not added in order to keep resources low. Therefore, please keep this
1917       in mind when mixing MCE->sync with MCE->do or output serialization
1918       methods inside a loop.
1919
1920        sub user_func {
1921
1922           my ($mce) = @_;
1923           my @result;
1924
1925           for (1 .. 3) {
1926              ... compute algorithm ...
1927
1928              MCE->sync;
1929
1930              ... compute algorithm ...
1931
1932              MCE->sync;
1933
1934              MCE->do('aggregate_result', \@result);  ## or MCE->sendto
1935
1936              MCE->sync;      ## The sync operation is also needed here to
1937                              ## prevent MCE from stalling.
1938           }
1939        }
1940
1941   $mce->yield ( void )
1942       There may be occasions when the MCE-driven app is too fast. The
1943       interval option combined with the yield method, both introduced with
1944       MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
1945       the design.
1946
1947       A use case is an app configured with 100 workers running on a box with
1948       24 logical cores. Data is polled from a database containing over 2.5
1949       million rows. Workers chunk away at 300 rows per chunk performing SNMP
1950       gets (300 sockets per worker) polling 25 metrics from each device. With
1951       this scenario, the load on the box may rise beyond 90+. In addition,
1952       IP_Tables may reach its contention point causing the entire application
1953       to fail.
1954
1955       The scenario above is solved by simply having workers yield among
1956       themselves in a synchronized fashion. A delay of 0.007 seconds between
1957       intervals is all that's needed. The load on the box will hover between
1958       23 ~ 27 for the duration of the run. Polling completes in under 17
1959       minutes time. This is quite fast considering the app polls 62.5 million
1960       metrics combined. The math equates to 3,676,470 per minute or rather
1961       61,275 per second from a single box.
1962
1963        ## Both max_nodes and node_id are optional (default 1).
1964
1965        interval => {
1966           delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
1967        }
1968
1969       A 4 node setup can poll 10 million devices without the additional
1970       overhead of a distribution agent. The differences between the 4 nodes
1971       are simply node_id and the where clause used to query the database. The
1972       mac addresses are random such that the data divides equally to any
1973       power of 2. The distribution key lies in the mac address itself. In
1974       fact, the 2nd character from the right is sufficient for maximizing on
1975       the power of randomness for equal distribution.
1976
1977        Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
1978        Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
1979        Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
1980        Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
1981
1982       Below, the user_tasks is configured to simulate 4 nodes. This
1983       demonstration uses 2 workers to minimize the output size. Input is from
1984       the sequence option.
1985
1986        use Time::HiRes qw(time);
1987        use MCE;
1988
1989        my $d = shift || 0.1;
1990
1991        local $| = 1;
1992
1993        sub create_task {
1994
1995           my ($node_id) = @_;
1996
1997           my $seq_size  = 6;
1998           my $seq_start = ($node_id - 1) * $seq_size + 1;
1999           my $seq_end   = $seq_start + $seq_size - 1;
2000
2001           return {
2002              max_workers => 2, sequence => [ $seq_start, $seq_end ],
2003              interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2004           };
2005        }
2006
2007        sub user_begin {
2008
2009           my ($mce, $task_id, $task_name) = @_;
2010
2011           ## The yield method causes this worker to wait for its next time
2012           ## interval slot before running. Yield has no effect without the
2013           ## 'interval' option.
2014
2015           ## Yielding is beneficial inside a user_begin block. A use case
2016           ## is staggering database connections among workers in order
2017           ## to not impact the DB server.
2018
2019           MCE->yield;
2020
2021           MCE->printf(
2022              "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2023              MCE->task_id + 1, time, MCE->task_wid, ''
2024           );
2025
2026           return;
2027        }
2028
2029        {
2030           my $prev_time = time;
2031
2032           sub user_func {
2033
2034              my ($mce, $seq_n, $chunk_id) = @_;
2035
2036              ## Yield simply waits for the next time interval.
2037              MCE->yield;
2038
2039              ## Calculate how long this worker has waited.
2040              my $curr_time = time;
2041              my $time_waited = $curr_time - $prev_time;
2042
2043              $prev_time = $curr_time;
2044
2045              MCE->printf(
2046                 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2047                 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2048              );
2049
2050              return;
2051           }
2052        }
2053
2054        ## Simulate a 4 node environment passing node_id to create_task.
2055
2056        print "Node_ID  Current_Time        Worker_ID  Time_Waited     Comment\n";
2057
2058        MCE->new(
2059           user_begin => \&user_begin,
2060           user_func  => \&user_func,
2061
2062           user_tasks => [
2063              create_task(1),
2064              create_task(2),
2065              create_task(3),
2066              create_task(4)
2067           ]
2068
2069        )->run;
2070
2071        -- Output (notice Current_Time below, stays 0.10 apart)
2072
2073        Node_ID  Current_Time        Worker_ID  Time_Waited     Comment
2074        Node  1: 1374807976.74634 -- Worker  1:              -- Started
2075        Node  2: 1374807976.84634 -- Worker  1:              -- Started
2076        Node  3: 1374807976.94638 -- Worker  1:              -- Started
2077        Node  4: 1374807977.04639 -- Worker  1:              -- Started
2078        Node  1: 1374807977.14634 -- Worker  2:              -- Started
2079        Node  2: 1374807977.24640 -- Worker  2:              -- Started
2080        Node  3: 1374807977.34649 -- Worker  2:              -- Started
2081        Node  4: 1374807977.44657 -- Worker  2:              -- Started
2082        Node  1: 1374807977.54636 -- Worker  1:      0.90037 -- Seq_N   1
2083        Node  2: 1374807977.64638 -- Worker  1:      1.00040 -- Seq_N   7
2084        Node  3: 1374807977.74642 -- Worker  1:      1.10043 -- Seq_N  13
2085        Node  4: 1374807977.84643 -- Worker  1:      1.20045 -- Seq_N  19
2086        Node  1: 1374807977.94636 -- Worker  2:      1.30037 -- Seq_N   2
2087        Node  2: 1374807978.04638 -- Worker  2:      1.40040 -- Seq_N   8
2088        Node  3: 1374807978.14641 -- Worker  2:      1.50042 -- Seq_N  14
2089        Node  4: 1374807978.24644 -- Worker  2:      1.60045 -- Seq_N  20
2090        Node  1: 1374807978.34628 -- Worker  1:      0.79996 -- Seq_N   3
2091        Node  2: 1374807978.44631 -- Worker  1:      0.79996 -- Seq_N   9
2092        Node  3: 1374807978.54634 -- Worker  1:      0.79996 -- Seq_N  15
2093        Node  4: 1374807978.64636 -- Worker  1:      0.79997 -- Seq_N  21
2094        Node  1: 1374807978.74628 -- Worker  2:      0.79996 -- Seq_N   4
2095        Node  2: 1374807978.84632 -- Worker  2:      0.79997 -- Seq_N  10
2096        Node  3: 1374807978.94634 -- Worker  2:      0.79996 -- Seq_N  16
2097        Node  4: 1374807979.04636 -- Worker  2:      0.79996 -- Seq_N  22
2098        Node  1: 1374807979.14628 -- Worker  1:      0.80001 -- Seq_N   5
2099        Node  2: 1374807979.24631 -- Worker  1:      0.80000 -- Seq_N  11
2100        Node  3: 1374807979.34634 -- Worker  1:      0.80001 -- Seq_N  17
2101        Node  4: 1374807979.44636 -- Worker  1:      0.80000 -- Seq_N  23
2102        Node  1: 1374807979.54628 -- Worker  2:      0.80000 -- Seq_N   6
2103        Node  2: 1374807979.64631 -- Worker  2:      0.80000 -- Seq_N  12
2104        Node  3: 1374807979.74633 -- Worker  2:      0.80000 -- Seq_N  18
2105        Node  4: 1374807979.84636 -- Worker  2:      0.80000 -- Seq_N  24
2106
2107       The interval.pl example above is included with MCE.
2108

PROGRESS DEMONSTRATIONS

2110       The "progress" option takes a code block for receiving info on the
2111       progress made while processing input data; e.g. "input_data" or
2112       "sequence". To make this work, one provides the "progress" option a
2113       closure block like so, passing along the size of the input_data; e.g.
2114       "scalar @array" or "-s /path/to/file".
2115
2116       Current API available since 1.813.
2117
2118       A worker, upon completing processing its chunk, notifies the manager-
2119       process with the size of the chunk. That could be the number of rows or
2120       literally the size of the chunk when processing an input file. The
2121       manager-process accumulates the size before calling the code block
2122       associated with the "progress" option.
2123
2124       When running many tasks simultaneously, via "user_tasks", the call is
2125       initiated by workers at level 0 only or rather the first task, not
2126       shown here.
2127
2128        use Time::HiRes 'sleep';
2129        use MCE;
2130
2131        sub make_progress {
2132           my ($total_size) = @_;
2133           return sub {
2134              my ($completed_size) = @_;
2135              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2136           };
2137        }
2138
2139        my @input = (1..150);
2140
2141        MCE->new(
2142           chunk_size  => 10,
2143           max_workers => 4,
2144           input_data  => \@input,
2145           progress    => make_progress( scalar @input ),
2146           user_func   => sub { sleep 1.5 }
2147        )->run();
2148
2149        -- Output
2150
2151        6.7%
2152        13.3%
2153        20.0%
2154        26.7%
2155        33.3%
2156        40.0%
2157        46.7%
2158        53.3%
2159        60.0%
2160        66.7%
2161        73.3%
2162        80.0%
2163        86.7%
2164        93.3%
2165        100.0%
2166
2167       Next is the code using MCE::Flow and ProgressBar::Stack to do the same
2168       thing, practically.
2169
2170        use Time::HiRes 'sleep';
2171        use ProgressBar::Stack;
2172        use MCE::Flow;
2173
2174        sub make_progress {
2175           my ($total_size) = @_;
2176           init_progress();
2177           return sub {
2178              my ($completed_size) = @_;
2179              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2180           };
2181        }
2182
2183        my @input = (1..150);
2184
2185        MCE::Flow->init(
2186           chunk_size  => 10,
2187           max_workers => 4,
2188           progress    => make_progress( scalar @input )
2189        );
2190
2191        MCE::Flow->run( sub { sleep 1.5 }, \@input );
2192        MCE::Flow->finish();
2193
2194        print "\n";
2195
2196        -- Output
2197
2198        [################    ]  80.0% ETA: 0:01
2199
2200       For a sequence of numbers, using the "sequence" option, one must
2201       account for "step_size", typically set to 1 automatically.
2202
2203        use Time::HiRes 'sleep';
2204        use MCE;
2205
2206        sub make_progress {
2207           my ($total_size) = @_;
2208           return sub {
2209              my ($completed_size) = @_;
2210              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2211           };
2212        }
2213
2214        MCE->new(
2215           chunk_size  => 10,
2216           max_workers => 4,
2217           sequence    => [ 1, 100, 2 ],
2218           progress    => make_progress( int( 100 / 2 + 0.5 ) ),
2219           user_func   => sub { sleep 1.5 }
2220        )->run();
2221
2222        -- Output
2223
2224        20.0%
2225        40.0%
2226        60.0%
2227        80.0%
2228        100.0%
2229
2230       Changing "chunk_size" to 1 means workers notify the manager process
2231       more often, thus increasing granularity. Take a look at the output.
2232
2233        2.0%
2234        4.0%
2235        6.0%
2236        8.0%
2237        10.0%
2238        ...
2239        92.0%
2240        94.0%
2241        96.0%
2242        98.0%
2243        100.0%
2244
2245       Here is the same thing using MCE::Flow together with
2246       ProgressBar::Stack.
2247
2248        use Time::HiRes 'sleep';
2249        use ProgressBar::Stack;
2250        use MCE::Flow;
2251
2252        sub make_progress {
2253           my ($total_size) = @_;
2254           init_progress();
2255           return sub {
2256              my ($completed_size) = @_;
2257              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2258           };
2259        }
2260
2261        MCE::Flow->init(
2262           chunk_size  => 1,
2263           max_workers => 4,
2264           progress    => make_progress( int( 100 / 2 + 0.5 ) )
2265        );
2266
2267        MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2268        MCE::Flow->finish();
2269
2270        print "\n";
2271
2272        -- Output
2273
2274        [#########           ]  48.0% ETA: 0:03
2275
2276       For files and file handles, workers send the actual size of the data
2277       read versus counting rows.
2278
2279        use Time::HiRes 'sleep';
2280        use MCE;
2281
2282        sub make_progress {
2283           my ($total_size) = @_;
2284           return sub {
2285              my ($completed_size) = @_;
2286              printf "%0.1f%%\n", $completed_size / $total_size * 100;
2287           };
2288        }
2289
2290        my $input_file = "/path/to/input_file.txt";
2291
2292        MCE->new(
2293           chunk_size  => 5,
2294           max_workers => 4,
2295           input_data  => $input_file,
2296           progress    => make_progress( -s $input_file ),
2297           user_func   => sub { sleep 0.03 }
2298        )->run();
2299
2300       For consistency, here is the example using MCE::Flow, again with
2301       ProgressBar::Stack.
2302
2303        use Time::HiRes 'sleep';
2304        use ProgressBar::Stack;
2305        use MCE::Flow;
2306
2307        sub make_progress {
2308           my ($total_size) = @_;
2309           init_progress();
2310           return sub {
2311              my ($completed_size) = @_;
2312              update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2313           };
2314        }
2315
2316        my $input_file = "/path/to/input_file.txt";
2317
2318        MCE::Flow->init(
2319           chunk_size  => 5,
2320           max_workers => 4,
2321           progress    => make_progress( -s $input_file )
2322        );
2323
2324        MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2325        MCE::Flow->finish();
2326
2327       The next demonstration processes three arrays consecutively. For this
2328       one, MCE workers persist after running. This needs MCE 1.814 or later
2329       to run. Otherwise, the progress output is not shown in MCE 1.813.
2330
2331        use Time::HiRes 'sleep';
2332        use ProgressBar::Stack;
2333        use MCE;
2334
2335        sub make_progress {
2336           my ($total_size, $message) = @_;
2337           init_progress();
2338           return sub {
2339              my ($completed_size) = @_;
2340              update_progress(
2341                 sprintf("%0.1f", $completed_size / $total_size * 100),
2342                 $message
2343              );
2344           };
2345        }
2346
2347        my $mce = MCE->new(
2348           chunk_size  => 10,
2349           max_workers => 4,
2350           user_func   => sub { sleep 0.5 }
2351        )->spawn();
2352
2353        my @a1 = ( 1 .. 200 );
2354        my @a2 = ( 1 .. 500 );
2355        my @a3 = ( 1 .. 300 );
2356
2357        $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2358
2359        print "\n";
2360
2361        $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2362
2363        print "\n";
2364
2365        $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2366
2367        print "\n";
2368
2369        $mce->shutdown;
2370
2371        -- Output
2372
2373        [####################] 100.0% ETA: 0:00 array 1
2374        [####################] 100.0% ETA: 0:00 array 2
2375        [####################] 100.0% ETA: 0:00 array 3
2376
2377       When the size is not known, such as when reading from "STDIN", the only
2378       thing one can do is report the size completed thus far.
2379
2380        # 1 kibibyte equals 1024 bytes
2381
2382        progress => sub {
2383           my ($completed_size) = @_;
2384           printf "%0.1f kibibytes\n", $completed_size / 1024;
2385        }
2386

INDEX

2388       MCE
2389

AUTHOR

2391       Mario E. Roy, <marioeroy AT gmail DOT com>
2392
2393
2394
2395perl v5.28.0                      2018-08-25                      MCE::Core(3)
Impressum