1MCE::Core(3) User Contributed Perl Documentation MCE::Core(3)
2
3
4
6 MCE::Core - Documentation describing the core MCE API
7
9 This document describes MCE::Core version 1.838
10
12 This is a simplistic use case of MCE running with 5 workers.
13
14 ## Construction using the Core API
15
16 use MCE;
17
18 my $mce = MCE->new(
19 max_workers => 5,
20 user_func => sub {
21 my ($mce) = @_;
22 $mce->say("Hello from " . $mce->wid);
23 }
24 );
25
26 $mce->run;
27
28 ## Construction using a MCE model
29
30 use MCE::Flow max_workers => 5;
31
32 mce_flow sub {
33 my ($mce) = @_;
34 MCE->say("Hello from " . MCE->wid);
35 };
36
37 -- Output
38
39 Hello from 2
40 Hello from 4
41 Hello from 5
42 Hello from 1
43 Hello from 3
44
45 MCE->new ( [ options ] )
46 Below, a new instance is configured with all available options.
47
48 use MCE;
49
50 my $mce = MCE->new(
51
52 max_workers => 8, ## Default 1
53
54 # Number of workers to spawn. This can be set automatically
55 # with MCE 1.412 and later releases.
56
57 # MCE 1.521 sets an upper-limit of 8 for 'auto'.
58 # See MCE::Util::get_ncpu for more info.
59
60 # max_workers => 'auto', ## # of lcores, 8 maximum
61 # max_workers => 'auto-1', ## 7 on HW with 16-lcores
62 # max_workers => 'auto-1', ## 3 on HW with 4-lcores
63
64 # max_workers => MCE::Util::get_ncpu, # run on all lcores
65
66 chunk_size => 2000, ## Default 1
67
68 # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
69 # The default is 1 when using the Core API and 'auto' for
70 # MCE Models. For arrays or queues, chunk_size means the
71 # number of records per chunk. For iterators, MCE will not
72 # use chunk_size, though the iterator may use it to determine
73 # how much to return per iteration. For files, smaller than or
74 # equal to 8192 is the number of records. Greater than 8192
75 # is the number of bytes. MCE reads until the end of record
76 # before calling user_func.
77
78 # chunk_size => 1, ## Consists of 1 record
79 # chunk_size => 1000, ## Consists of 1000 records
80 # chunk_size => '16k', ## Approximate 16 kibiBytes (KiB)
81 # chunk_size => '20m', ## Approximate 20 mebiBytes (MiB)
82
83 tmp_dir => $tmp_dir, ## Default $MCE::Signal::tmp_dir
84
85 # Default is $MCE::Signal::tmp_dir which points to
86 # $ENV{TEMP} if defined. Otherwise, tmp_dir points
87 # to a location under /tmp.
88
89 freeze => \&encode_sereal, ## Default \&Storable::freeze
90 thaw => \&decode_sereal, ## Default \&Storable::thaw
91
92 # Release 1.412 allows freeze and thaw to be overridden.
93 # Simply include a serialization module prior to loading
94 # MCE. Configure freeze/thaw options.
95
96 # use Sereal qw( encode_sereal decode_sereal );
97 # use CBOR::XS qw( encode_cbor decode_cbor );
98 # use JSON::XS qw( encode_json decode_json );
99 #
100 # use MCE;
101
102 gather => \@a, ## Default undef
103
104 # Release 1.5 allows for gathering of data to an array or
105 # hash reference, a MCE::Queue/Thread::Queue object, or code
106 # reference. One invokes gathering by calling the gather
107 # method as often as needed.
108
109 # gather => \@array,
110 # gather => \%hash,
111 # gather => $queue,
112 # gather => \&order,
113
114 init_relay => 0, ## Default undef
115
116 # For specifying the initial relay value. Allowed values
117 # are array_ref, hash_ref, or scalar. The MCE::Relay module
118 # is loaded automatically when specified.
119
120 # init_relay => \@array,
121 # init_relay => \%hash,
122 # init_relay => scalar,
123
124 input_data => $input_file, ## Default undef
125 RS => "\n>", ## Default undef
126
127 # input_data => '/path/to/file' ## Process file
128 # input_data => \@array ## Process array
129 # input_data => \*FILE_HNDL ## Process file handle
130 # input_data => \$scalar ## Treated like a file
131 # input_data => \&iterator ## User specified iterator
132
133 # The RS option (for input record separator) applies to files
134 # and file handles.
135
136 # MCE applies additional logic when RS begins with a newline
137 # character; e.g. RS => "\n>". It trims away characters after
138 # the newline and prepends them to the next record.
139 #
140 # Typically, the left side is what happens for $/ = "\n>".
141 # The right side is what user_func receives.
142 #
143 # All records begin with > and end with \n
144 # Record 1: >seq1 ... \n> (to) >seq1 ... \n
145 # Record 2: seq2 ... \n> >seq2 ... \n
146 # Record 3: seq3 ... \n> >seq3 ... \n
147 # Last Rec: seqN ... \n >seqN ... \n
148
149 loop_timeout => 5, ## Default 0
150
151 # Added in 1.7, enables the manager process to time out on a read
152 # on channel 0. The manager process decrements the total workers
153 # running for any worker that has died in an uncontrollable
154 # manner. Specify this option if on occasion a worker runs out
155 # of memory or dies due to an error from an XS module.
156 #
157 # A number smaller than 5 is silently increased to 5.
158
159 max_retries => 2, ## Default 0
160
161 # This option, added in 1.7, causes MCE to retry a failed
162 # chunk from a worker dying while processing input data or
163 # sequence of numbers.
164
165 parallel_io => 1, ## Default 0
166 posix_exit => 1, ## Default 0
167 use_slurpio => 1, ## Default 0
168
169 # The parallel_io option enables parallel reads during large
170 # slurpio, useful when reading from fast storage. Do not enable
171 # parallel_io when running MCE on many nodes with input coming
172 # from shared storage.
173
174 # Set posix_exit to avoid all END and destructor processing.
175 # Constructing MCE inside a thread implies 1 or if present CGI,
176 # FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
177 # Mojo::IOLoop, Prima, STFL, Tk, Wx, or Win32::GUI.
178
179 # Enable slurpio to pass the raw chunk (scalar ref) to the user
180 # function when reading input files.
181
182 use_threads => 1, ## Auto 0 or 1
183
184 # MCE spawns child processes by default, not threads.
185 #
186 # However, MCE supports threads via 2 threading
187 # libraries if threads are desired.
188
189 # The use of threads in MCE requires that you include
190 # threads support prior to loading MCE. The use_threads
191 # option defaults to 1 when a thread library is loaded.
192 # Threads is loaded automatically for $^O eq 'MSWin32'.
193 #
194 # use threads; use forks;
195 # use threads::shared; (or) use forks::shared;
196 # use MCE use MCE;
197
198 spawn_delay => 0.035, ## Default undef
199 submit_delay => 0.002, ## Default undef
200 job_delay => 0.150, ## Default undef
201
202 # Time to wait, in fractional seconds, after spawning
203 # a worker, parameter submission to a worker, and
204 # job commencement (running, staggered delay).
205
206 # Specify job_delay when wanting to stagger
207 # workers connecting to a database.
208
209 on_post_exit => \&on_post_exit, ## Default undef
210 on_post_run => \&on_post_run, ## Default undef
211
212 # Execute code block after a worker exits or dies.
213 # MCE->exit, exit, or die
214
215 # Execute code block after running.
216 # MCE->process or MCE->run
217
218 progress => sub { ... }, ## Default undef
219
220 # A code block for receiving info on progress made.
221 # See section labeled "PROGRESS DEMONSTRATIONS"
222 # at the end of this document.
223
224 user_args => { env => 'test' }, ## Default undef
225
226 # MCE release 1.4 added a new parameter to allow one to
227 # specify arbitrary arguments such as a string, an ARRAY
228 # or a HASH reference. Workers can access this directly:
229 # my $args = $mce->{user_args} or MCE->user_args;
230
231 user_begin => \&user_begin, ## Default undef
232 user_func => \&user_func, ## Default undef
233 user_end => \&user_end, ## Default undef
234
235 # Think of user_begin, user_func, and user_end as in
236 # the awk scripting language:
237 # awk 'BEGIN { begin } { func } { func } ... END { end }'
238
239 # MCE workers call user_begin once at the start of a job,
240 # then user_func repeatedly until no chunks remain.
241 # Afterwards, user_end is called.
242
243 user_error => \&user_error, ## Default undef
244 user_output => \&user_output, ## Default undef
245
246 # MCE will forward data to user_error/user_output,
247 # when defined, for the following methods.
248
249 # MCE->sendto(\*STDERR, "sent to user_error\n");
250 # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
251 # MCE->print(\*STDERR, "sent to user_error\n");
252 # MCE->say(\*STDERR, "sent to user_error");
253
254 # MCE->sendto(\*STDOUT, "sent to user_output\n");
255 # MCE->printf("%s\n", "sent to user_output");
256 # MCE->print("sent to user_output\n");
257 # MCE->say("sent to user_output");
258
259 stderr_file => 'err_file', ## Default STDERR
260 stdout_file => 'out_file', ## Default STDOUT
261
262 # Or to file; user_error and user_output take precedence.
263
264 flush_file => 1, ## Default 0
265 flush_stderr => 1, ## Default 0
266 flush_stdout => 1, ## Default 0
267
268 # Flush sendto file, standard error, or standard output.
269
270 interval => {
271 delay => 0.007 [, max_nodes => 4, node_id => 1 ]
272 },
273
274 # For use with the yield method introduced in MCE 1.5.
275 # Both max_nodes & node_id are optional and default to 1.
276 # Delay is the amount of time between intervals.
277
278 # interval => 0.007 ## Shorter; MCE 1.506+
279
280 sequence => { ## Default undef
281 begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
282 },
283
284 bounds_only => 1, ## Default undef
285
286 # For looping through a sequence of numbers in parallel.
287 # STEP, if omitted, defaults to 1 if BEGIN is smaller than
288 # END or -1 if BEGIN is greater than END. The FORMAT string
289 # is passed to sprintf behind the scenes (% may be omitted).
290 # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
291
292 # Do not specify both options; input_data and sequence.
293 # Release 1.4 allows one to specify an array reference.
294 # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
295
296 # The bounds_only => 1 option will compute the 'begin' and
297 # 'end' items only for the chunk and not the items in between
298 # (hence boundaries only). This option has no effect when
299 # sequence is not specified or chunk_size equals 1.
300
301 # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
302
303 task_end => \&task_end, ## Default undef
304
305 # This is called by the manager process after the task
306 # has completed processing. MCE 1.5 allows this option
307 # to be specified at the top level.
308
309 task_name => 'string', ## Default 'MCE'
310
311 # Added in MCE 1.5 and mainly beneficial for user_tasks.
312 # One may specify a unique name per each sub-task.
313 # The string is passed as the 3rd arg to task_end.
314
315 user_tasks => [ ## Default undef
316 { ... }, ## Options for task 0
317 { ... }, ## Options for task 1
318 { ... }, ## Options for task 2
319 ],
320
321 # Takes a list of hash references, each allowing up to 17
322 # options. All other MCE options are ignored. The init_relay,
323 # input_data, RS, and use_slurpio options are applicable to
324 # the first task only.
325
326 # max_workers, chunk_size, input_data, interval, sequence,
327 # bounds_only, user_args, user_begin, user_end, user_func,
328 # gather, task_end, task_name, use_slurpio, use_threads,
329 # init_relay, RS
330
331 # Options not specified here will default to same option
332 # specified at the top level.
333 );
334
335 EXPORT_CONST, CONST
336 There are 3 constants which are exportable. Using the constants in lieu
337 of 0,1,2 makes it more legible when accessing the user_func arguments
338 directly.
339
340 SELF CHUNK CID
341
342 Exports SELF => 0, CHUNK => 1, and CID => 2.
343
344 use MCE export_const => 1;
345 use MCE const => 1; ## Shorter; MCE 1.415+
346
347 user_func => sub {
348 # my ($mce, $chunk_ref, $chunk_id) = @_;
349 print "Hello from ", $_[SELF]->wid, "\n";
350 }
351
352 MCE 1.5 allows all public methods to be called directly.
353
354 use MCE;
355
356 user_func => sub {
357 # my ($mce, $chunk_ref, $chunk_id) = @_;
358 print "Hello from ", MCE->wid, "\n";
359 }
360
361 OVERRIDING DEFAULTS
362 The following lists options that may be overridden when loading the
363 module.
364
365 use Sereal qw( encode_sereal decode_sereal );
366 use CBOR::XS qw( encode_cbor decode_cbor );
367 use JSON::XS qw( encode_json decode_json );
368
369 use MCE
370 max_workers => 4, ## Default 1
371 chunk_size => 100, ## Default 1
372 tmp_dir => "/path/to/app/tmp", ## $MCE::Signal::tmp_dir
373 freeze => \&encode_sereal, ## \&Storable::freeze
374 thaw => \&decode_sereal ## \&Storable::thaw
375 ;
376
377 my $mce = MCE->new( ... );
378
379 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
380 available. Specify "Sereal => 0" to use Storable instead.
381
382 use MCE Sereal => 0;
383
384 RUNNING
385 Run calls spawn, submits the job; workers call user_begin, user_func,
386 and user_end. Run shuts down workers afterwards. Call spawn whenever
387 the need arises for large data structures prior to running.
388
389 $mce->spawn; ## Call early if desired
390
391 $mce->run; ## Call run or process below
392
393 ## Acquire data arrays and/or input_files. Workers persist after
394 ## processing.
395
396 $mce->process(\@input_data_1); ## Process array
397 $mce->process(\@input_data_2);
398 $mce->process(\@input_data_n);
399
400 $mce->process(\%input_hash_1); ## Process hash, current API
401 $mce->process(\%input_hash_2); ## available since 1.828
402 $mce->process(\%input_hash_n);
403
404 $mce->process('input_file_1'); ## Process file
405 $mce->process('input_file_2');
406 $mce->process('input_file_n');
407
408 $mce->shutdown; ## Shutdown workers
409
410 SYNTAX for ON_POST_EXIT
411 Often times, one may want to capture the exit status. The on_post_exit
412 option, if defined, is executed immediately by the manager process
413 after a worker exits via exit (children only), MCE->exit (children and
414 threads), or die.
415
416 The format of $e->{pid} is PID_123 for children and TID_123 for
417 threads.
418
419 my $restart_flag = 1;
420
421 sub on_post_exit {
422 my ($mce, $e) = @_;
423
424 ## Display all possible hash elements.
425 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
426
427 ## Restart this worker if desired.
428 if ($restart_flag && $e->{wid} == 2) {
429 $mce->restart_worker;
430 $restart_flag = 0;
431 }
432 }
433
434 sub user_func {
435 my ($mce) = @_;
436 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); ## Args, not necessary
437 }
438
439 my $mce = MCE->new(
440 on_post_exit => \&on_post_exit,
441 user_func => \&user_func,
442 max_workers => 3
443 );
444
445 $mce->run;
446
447 -- Output (child processes)
448
449 2: PID_33223: 0: msg_foo: 1002
450 1: PID_33222: 0: msg_foo: 1001
451 3: PID_33224: 0: msg_foo: 1003
452 2: PID_33225: 0: msg_foo: 1002
453
454 -- Output (running with threads)
455
456 3: TID_3: 0: msg_foo: 1003
457 2: TID_2: 0: msg_foo: 1002
458 1: TID_1: 0: msg_foo: 1001
459 2: TID_4: 0: msg_foo: 1002
460
461 SYNTAX for ON_POST_RUN
462 The on_post_run option, if defined, is executed immediately by the
463 manager process after running MCE->process or MCE->run. This option
464 receives an array reference of hashes.
465
466 The difference between on_post_exit and on_post_run is that the former
467 is called immediately whereas the latter is called after all workers
468 have completed running.
469
470 sub on_post_run {
471 my ($mce, $status_ref) = @_;
472 foreach my $e ( @{ $status_ref } ) {
473 ## Display all possible hash elements.
474 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
475 }
476 }
477
478 sub user_func {
479 my ($mce) = @_;
480 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); ## Args, not necessary
481 }
482
483 my $mce = MCE->new(
484 on_post_run => \&on_post_run,
485 user_func => \&user_func,
486 max_workers => 3
487 );
488
489 $mce->run;
490
491 -- Output (child processes)
492
493 3: PID_33174: 0: msg_foo: 1003
494 1: PID_33172: 0: msg_foo: 1001
495 2: PID_33173: 0: msg_foo: 1002
496
497 -- Output (running with threads)
498
499 2: TID_2: 0: msg_foo: 1002
500 3: TID_3: 0: msg_foo: 1003
501 1: TID_1: 0: msg_foo: 1001
502
503 SYNTAX for INPUT_DATA
504 MCE supports many ways to specify input_data. Support for iterators was
505 added in MCE 1.505. The RS option allows one to specify the record
506 separator when processing files.
507
508 MCE is a chunking engine. Therefore, chunk_size is applicable to
509 input_data. Specifying 1 for use_slurpio causes user_func to receive a
510 scalar reference containing the raw data (applicable to files only)
511 instead of an array reference.
512
513 input_data => '/path/to/file', ## process file
514 input_data => \@array, ## process array
515 input_data => \%hash, ## process hash, API since 1.828
516 input_data => \*FILE_HNDL, ## process file handle
517 input_data => $fh, ## open $fh, "<", "file"
518 input_data => $fh, ## IO::File "file", "r"
519 input_data => $fh, ## IO::Uncompress::Gunzip "file.gz"
520 input_data => \$scalar, ## treated like a file
521 input_data => \&iterator, ## user specified iterator
522
523 chunk_size => 1, ## >1 means looping inside user_func
524 use_slurpio => 1, ## $chunk_ref is a scalar ref
525 RS => "\n>", ## input record separator
526
527 The chunk_size value determines the chunking mode to use when
528 processing files. Otherwise, chunk_size is the number of elements for
529 arrays. For files, a chunk size value of <= 8192 is how many records to
530 read. Greater than 8192 is how many bytes to read. MCE appends (the
531 rest) up to the next record separator.
532
533 chunk_size => 8192, ## Consists of 8192 records
534 chunk_size => 8193, ## Approximate 8193 bytes for files
535
536 chunk_size => 1, ## Consists of 1 record or element
537 chunk_size => 1000, ## Consists of 1000 records
538 chunk_size => '16k', ## Approximate 16 kibiBytes (KiB)
539 chunk_size => '20m', ## Approximate 20 mebiBytes (MiB)
540
541 The following shows how to construct user_func when chunk_size > 1,
542 assuming use_slurpio equals 0.
543
544 user_func => sub {
545 my ($mce, $chunk_ref, $chunk_id) = @_;
546
547 ## $_ is $chunk_ref->[0] when chunk_size equals 1
548 ## $_ is $chunk_ref otherwise; $_ can be used below
549
550 for my $record ( @{ $chunk_ref } ) {
551 print "$chunk_id: $record\n";
552 }
553 }
554
555 # input_data => \%hash
556 # current API available since 1.828
557
558 user_func => sub {
559 my ($mce, $chunk_ref, $chunk_id) = @_;
560
561 ## $_ points to $chunk_ref regardless of chunk_size
562
563 for my $key ( keys %{ $chunk_ref } ) {
564 print "$key: ", $chunk_ref->{$key}, "\n";
565 }
566 }
567
568 Specifying a value for input_data is straightforward for arrays and
569 files. The next several examples specify an iterator reference for
570 input_data.
571
572 use MCE;
573
574 ## A factory function which creates a closure (the iterator itself)
575 ## for generating a sequence of numbers. The external variables
576 ## ($n, $max, $step) are used for keeping state across successive
577 ## calls to the closure. The iterator simply returns when $n > $max.
578
579 sub input_iterator {
580 my ($n, $max, $step) = @_;
581
582 return sub {
583 return if $n > $max;
584
585 my $current = $n;
586 $n += $step;
587
588 return $current;
589 };
590 }
591
592 ## Run user_func in parallel. Input data can be specified during
593 ## the construction or as an argument to the process method.
594
595 my $mce = MCE->new(
596
597 # input_data => input_iterator(10, 30, 2),
598 chunk_size => 1, max_workers => 4,
599
600 user_func => sub {
601 my ($mce, $chunk_ref, $chunk_id) = @_;
602 MCE->print("$_: ", $_ * 2, "\n");
603 }
604
605 )->spawn;
606
607 $mce->process( input_iterator(10, 30, 2) );
608
609 -- Output Note that output order is not guaranteed
610 Take a look at iterator.pl for ordered output
611
612 10: 20
613 12: 24
614 16: 32
615 20: 40
616 14: 28
617 22: 44
618 18: 36
619 24: 48
620 26: 52
621 28: 56
622 30: 60
623
624 The following example queries the DB for the next 1000 rows. Notice the
625 use of fetchall_arrayref. The iterator function itself receives one
626 argument which is chunk_size (added in MCE 1.510) to determine how much
627 to return per iteration. The default is 1 for the Core API and MCE
628 Models.
629
630 use DBI;
631 use MCE;
632
633 sub db_iter {
634
635 my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
636
637 my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
638 die "Could not connect to database: $DBI::errstr";
639
640 my $sth = $dbh->prepare('select color, desc from table');
641
642 $sth->execute;
643
644 return sub {
645 my ($chunk_size) = @_;
646
647 if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
648 return @{ $aref };
649 }
650
651 return;
652 };
653 }
654
655 ## Let's enumerate column indexes for easy column retrieval.
656 my ($i_color, $i_desc) = (0 .. 1);
657
658 my $mce = MCE->new(
659 max_workers => 3, chunk_size => 1000,
660 input_data => db_iter(),
661
662 user_func => sub {
663 my ($mce, $chunk_ref, $chunk_id) = @_;
664 my $ret = '';
665
666 foreach my $row (@{ $chunk_ref }) {
667 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
668 }
669
670 MCE->print($ret);
671 }
672 );
673
674 $mce->run;
675
676 There are many modules on CPAN which return an iterator reference.
677 One such example is shown below. The demonstration ensures MCE workers
678 are spawned before obtaining the iterator. Note the worker_id value
679 (left column) in the output.
680
681 use Path::Iterator::Rule;
682 use MCE;
683
684 my $start_dir = shift
685 or die "Please specify a starting directory";
686
687 -d $start_dir
688 or die "Cannot open ($start_dir): No such file or directory";
689
690 my $mce = MCE->new(
691 max_workers => 'auto',
692 user_func => sub { MCE->say( MCE->wid . ": $_" ) }
693 )->spawn;
694
695 my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
696
697 my $iterator = $rule->iter(
698 $start_dir, { follow_symlinks => 0, depthfirst => 1 }
699 );
700
701 $mce->process( $iterator );
702
703 -- Output
704
705 8: lib/MCE/Core/Input/Generator.pm
706 5: lib/MCE/Core/Input/Handle.pm
707 6: lib/MCE/Core/Input/Iterator.pm
708 2: lib/MCE/Core/Input/Request.pm
709 3: lib/MCE/Core/Manager.pm
710 4: lib/MCE/Core/Input/Sequence.pm
711 7: lib/MCE/Core/Validation.pm
712 1: lib/MCE/Core/Worker.pm
713 8: lib/MCE/Flow.pm
714 5: lib/MCE/Grep.pm
715 6: lib/MCE/Loop.pm
716 2: lib/MCE/Map.pm
717 3: lib/MCE/Queue.pm
718 4: lib/MCE/Signal.pm
719 7: lib/MCE/Stream.pm
720 1: lib/MCE/Subs.pm
721 8: lib/MCE/Util.pm
722 5: lib/MCE.pm
723
724 Although MCE supports arrays, extra measures are needed to use a "lazy"
725 array as input data. The reason for this is that MCE needs the size of
726 the array before processing which may be unknown for lazy arrays.
727 Therefore, closures provide an excellent mechanism for this.
728
729 The code block belonging to the lazy array must return undef after
730 exhausting its input data. Otherwise, the process will never end.
731
732 use Tie::Array::Lazy;
733 use MCE;
734
735 tie my @a, 'Tie::Array::Lazy', [], sub {
736 my $i = $_[0]->index;
737
738 return ($i < 10) ? $i : undef;
739 };
740
741 sub make_iterator {
742 my $i = 0; my $a_ref = shift;
743
744 return sub {
745 return $a_ref->[$i++];
746 };
747 }
748
749 my $mce = MCE->new(
750 max_workers => 4, input_data => make_iterator(\@a),
751
752 user_func => sub {
753 my ($mce, $chunk_ref, $chunk_id) = @_;
754 MCE->say($_);
755 }
756
757 )->run;
758
759 -- Output
760
761 0
762 1
763 2
764 3
765 4
766 6
767 7
768 8
769 5
770 9
771
772 The following demonstrates how to retrieve a chunk from the lazy array
773 per each successive call. Here, undef is sent by the iterator block
774 when $i is greater than $max. Iterators may optionally use chunk_size
775 to determine how much to return per iteration.
776
777 use Tie::Array::Lazy;
778 use MCE;
779
780 tie my @a, 'Tie::Array::Lazy', [], sub {
781 $_[0]->index;
782 };
783
784 sub make_iterator {
785 my $j = 0; my ($a_ref, $max) = @_;
786
787 return sub {
788 my ($chunk_size) = @_;
789 my $i = $j; $j += $chunk_size;
790
791 return if $i > $max;
792 return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
793 };
794 }
795
796 my $mce = MCE->new(
797 chunk_size => 15, max_workers => 4,
798 input_data => make_iterator(\@a, 100),
799
800 user_func => sub {
801 my ($mce, $chunk_ref, $chunk_id) = @_;
802 MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
803 }
804
805 )->run;
806
807 -- Output
808
809 1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
810 2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
811 3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
812 4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
813 5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
814 6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
815 7: 90 91 92 93 94 95 96 97 98 99 100
816
817 SYNTAX for SEQUENCE
818 The 1.3 release and above allows workers to loop through a sequence of
819 numbers computed mathematically without the overhead of an array. The
820 sequence can be specified separately per each user_task entry unlike
821 input_data which is applicable to the first task only.
822
823 See the seq_demo.pl example, included with this distribution, on
824 applying sequences with the user_tasks option.
825
826 Sequence can be defined using an array or a hash reference.
827
828 use MCE;
829
830 my $mce = MCE->new(
831 max_workers => 3,
832
833 # sequence => [ 10, 19, 0.7, "%4.1f" ], ## up to 4 options
834
835 sequence => {
836 begin => 10, end => 19, step => 0.7, format => "%4.1f"
837 },
838
839 user_func => sub {
840 my ($mce, $n, $chunk_id) = @_;
841 print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
842 }
843 );
844
845 $mce->run;
846
847 -- Output (sorted afterwards, notice wid and chunk_id in output)
848
849 10.0 from 1 id 1
850 10.7 from 2 id 2
851 11.4 from 3 id 3
852 12.1 from 1 id 4
853 12.8 from 2 id 5
854 13.5 from 3 id 6
855 14.2 from 1 id 7
856 14.9 from 2 id 8
857 15.6 from 3 id 9
858 16.3 from 1 id 10
859 17.0 from 2 id 11
860 17.7 from 3 id 12
861 18.4 from 1 id 13
862
863 The 1.5 release includes a new option (bounds_only). This option tells
864 the sequence engine to compute 'begin' and 'end' items only, for the
865 chunk, and not the items in between (hence boundaries only). This
866 option applies to sequence only and has no effect when chunk_size
867 equals 1.
868
869 The time to run is 0.006s below. This becomes 0.827s without the
870 bounds_only option due to computing all items in between, thus creating
871 a very large array. Basically, specify bounds_only => 1 when boundaries
872 are all you need for looping inside the block; e.g. Monte Carlo
873 simulations.
874
875 Time was measured using 1 worker to emphasize the difference.
876
877 use MCE;
878
879 my $mce = MCE->new(
880 max_workers => 1, chunk_size => 1_250_000,
881
882 sequence => { begin => 1, end => 10_000_000 },
883 bounds_only => 1,
884
885 ## For sequence, the input scalar $_ points to $chunk_ref
886 ## when chunk_size > 1, otherwise $chunk_ref->[0].
887 ##
888 ## user_func => sub {
889 ## my $begin = $_->[0]; my $end = $_->[-1];
890 ##
891 ## for ($begin .. $end) {
892 ## ...
893 ## }
894 ## },
895
896 user_func => sub {
897 my ($mce, $chunk_ref, $chunk_id) = @_;
898 ## $chunk_ref contains 2 items, not 1_250_000
899
900 my $begin = $chunk_ref->[ 0];
901 my $end = $chunk_ref->[-1]; ## or $chunk_ref->[1]
902
903 MCE->printf("%7d .. %8d\n", $begin, $end);
904 }
905 );
906
907 $mce->run;
908
909 -- Output
910
911 1 .. 1250000
912 1250001 .. 2500000
913 2500001 .. 3750000
914 3750001 .. 5000000
915 5000001 .. 6250000
916 6250001 .. 7500000
917 7500001 .. 8750000
918 8750001 .. 10000000
919
920 SYNTAX for MAX_RETRIES
921 The max_retries option, added in 1.7, allows MCE to retry a failed
922 chunk from a worker dying while processing input data or a sequence of
923 numbers.
924
925 When max_retries is set, MCE configures the on_post_exit option before
926 running, using the following code. Specify on_post_exit explicitly for
927 further tailoring if need be. The restart_worker line is necessary,
928 obviously.
929
930 on_post_exit => sub {
931 my ( $mce, $e, $retry_cnt ) = @_;
932 my ( $cnt, $msg ) = ( $retry_cnt + 1, "Error: chunk $e->{id} failed" );
933
934 ( $retry_cnt < $mce->max_retries )
935 ? print {*STDERR} "$msg, retrying chunk attempt # ${cnt}\n"
936 : print {*STDERR} "$msg\n";
937
938 $mce->restart_worker;
939 }
940
941 Below, we let MCE handle on_post_exit automatically, which is
942 essentially the same code shown above. For max_retries to work, the
943 worker must die (including abnormally) or call MCE->exit. Notice that we
944 pass the chunk_id value for the 3rd argument to MCE->exit.
945
946 ## Running MCE with max_retries.
947
948 use strict;
949 use warnings;
950
951 use MCE;
952
953 sub user_func {
954 my ( $mce, $chunk_ref, $chunk_id ) = @_;
955
956 # die "Died : chunk_id = 3\n" if $chunk_id == 3;
957 MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
958
959 print MCE->wid(), ": $chunk_id\n";
960 }
961
962 my $mce = MCE->new(
963 max_workers => 1,
964 max_retries => 2,
965 user_func => \&user_func,
966 )->spawn;
967
968 my @input_data = qw( 0 1 2 3 4 5 6 7 );
969
970 $mce->process( { chunk_size => 1 }, \@input_data );
971
972 $mce->shutdown;
973
974 -- Output
975
976 1: 1
977 1: 2
978 Error: chunk 3 failed, retrying chunk attempt # 1
979 Error: chunk 3 failed, retrying chunk attempt # 2
980 Error: chunk 3 failed
981 1: 4
982 1: 5
983 1: 6
984 1: 7
985 1: 8
986
987 SYNTAX for USER_BEGIN and USER_END
988 The user_begin and user_end options, if specified, behave similarly to
989 awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
990 called once per worker during each run.
991
992 MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
993
994 sub user_begin { ## Called once at the beginning
995 my ($mce, $task_id, $task_name) = @_;
996 $mce->{wk_total_rows} = 0;
997 }
998
999 sub user_func { ## Called while processing
1000 my $mce = shift;
1001 $mce->{wk_total_rows} += 1;
1002 }
1003
1004 sub user_end { ## Called once at the end
1005 my ($mce, $task_id, $task_name) = @_;
1006 printf "## %d: Processed %d rows\n",
1007 MCE->wid, $mce->{wk_total_rows};
1008 }
1009
1010 my $mce = MCE->new(
1011 user_begin => \&user_begin,
1012 user_func => \&user_func,
1013 user_end => \&user_end
1014 );
1015
1016 $mce->run;
1017
1018 SYNTAX for USER_FUNC with USE_SLURPIO => 0
1019 When processing input data, MCE can pass an array of rows or a slurped
1020 chunk. Below, a reference to an array containing the chunk data is
1021 processed.
1022
1023 e.g. $chunk_ref = [ record1, record2, record3, ... ]
1024
1025 sub user_func {
1026
1027 my ($mce, $chunk_ref, $chunk_id) = @_;
1028
1029 foreach my $row ( @{ $chunk_ref } ) {
1030 $mce->{wk_total_rows} += 1;
1031 print $row;
1032 }
1033 }
1034
1035 my $mce = MCE->new(
1036 chunk_size => 100,
1037 input_data => "/path/to/file",
1038 user_func => \&user_func,
1039 use_slurpio => 0
1040 );
1041
1042 $mce->run;
1043
1044 SYNTAX for USER_FUNC with USE_SLURPIO => 1
1045 Here, a reference to a scalar containing the raw chunk data is
1046 processed.
1047
1048 sub user_func {
1049
1050 my ($mce, $chunk_ref, $chunk_id) = @_;
1051
1052 my $count = () = $$chunk_ref =~ /abc/;
1053 }
1054
1055 my $mce = MCE->new(
1056 chunk_size => 16000,
1057 input_data => "/path/to/file",
1058 user_func => \&user_func,
1059 use_slurpio => 1
1060 );
1061
1062 $mce->run;
1063
1064 SYNTAX for USER_ERROR and USER_OUTPUT
1065 Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1066 and MCE->say can be intercepted by specifying the user_error and
1067 user_output options. MCE on receiving output will forward to user_error
1068 or user_output in a serialized fashion.
1069
1070 Handy when wanting to filter, modify, and/or direct the output
1071 elsewhere.
1072
1073 sub user_error { ## Redirect STDERR to STDOUT
1074 my $error = shift;
1075 print {*STDOUT} $error;
1076 }
1077
1078 sub user_output { ## Redirect STDOUT to STDERR
1079 my $output = shift;
1080 print {*STDERR} $output;
1081 }
1082
1083 sub user_func {
1084 my ($mce, $chunk_ref, $chunk_id) = @_;
1085 my $count = 0;
1086
1087 foreach my $row ( @{ $chunk_ref } ) {
1088 MCE->print($row);
1089 $count += 1;
1090 }
1091
1092 MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1093 }
1094
1095 my $mce = MCE->new(
1096 chunk_size => 1000,
1097 input_data => "/path/to/file",
1098 user_error => \&user_error,
1099 user_output => \&user_output,
1100 user_func => \&user_func
1101 );
1102
1103 $mce->run;
1104
1105 SYNTAX for USER_TASKS and TASK_END
1106 This option takes an array of tasks. Each task allows up to 17 options.
1107 The init_relay, input_data, RS, and use_slurpio options may be defined
1108 inside the first task or at the top level, otherwise ignored under
1109 other sub-tasks.
1110
1111 max_workers, chunk_size, input_data, interval, sequence,
1112 bounds_only, user_args, user_begin, user_end, user_func,
1113 gather, task_end, task_name, use_slurpio, use_threads,
1114 init_relay, RS
1115
1116 Sequence and chunk_size were added in 1.3. User_args was introduced in
1117 1.4. Task_name and input_data are new options allowed in 1.5. In addition,
1118 one can specify task_end at the top level. Task_end also receives 2
1119 additional arguments $task_id and $task_name (shown below).
1120
1121 Options not specified here will default to the same option specified at
1122 the top level. The task_end option is called by the manager process
1123 when all workers for that sub-task have completed processing.
1124
1125 Forking and threading can be intermixed among tasks unless running
1126 Cygwin. The run method will continue running until all workers have
1127 completed processing.
1128
1129 use threads;
1130 use threads::shared;
1131
1132 use MCE;
1133
1134 sub parallel_task1 { sleep 2; }
1135 sub parallel_task2 { sleep 1; }
1136
1137 my $mce = MCE->new(
1138
1139 task_end => sub {
1140 my ($mce, $task_id, $task_name) = @_;
1141 print "Task [$task_id -- $task_name] completed processing\n";
1142 },
1143
1144 user_tasks => [{
1145 task_name => 'foo',
1146 max_workers => 2,
1147 user_func => \¶llel_task1,
1148 use_threads => 0 ## Not using threads
1149
1150 },{
1151 task_name => 'bar',
1152 max_workers => 4,
1153 user_func => \¶llel_task2,
1154 use_threads => 1 ## Yes, threads
1155
1156 }]
1157 );
1158
1159 $mce->run;
1160
1161 -- Output
1162
1163 Task [1 -- bar] completed processing
1164 Task [0 -- foo] completed processing
1165
1167 Beginning with MCE 1.5, the input scalar $_ is localized prior to
1168 calling user_func for input_data and sequence of numbers. The following
1169 applies.
1170
1171 use_slurpio => 1
1172 $_ is a reference to the buffer e.g. $_ = \$_buffer;
1173 $_ is a reference regardless of whether chunk_size is 1 or greater
1174
1175 user_func => sub {
1176 # my ($mce, $chunk_ref, $chunk_id) = @_;
1177 print ${ $_ }; ## $_ is same as $chunk_ref
1178 }
1179
1180 chunk_size is greater than 1, use_slurpio => 0
1181 $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1182 $_ is same as $chunk_ref or $_[CHUNK]
1183
1184 user_func => sub {
1185 # my ($mce, $chunk_ref, $chunk_id) = @_;
1186 for my $row ( @{ $_ } ) {
1187 print $row, "\n";
1188 }
1189 }
1190
1191 use MCE const => 1;
1192
1193 user_func => sub {
1194 # my ($mce, $chunk_ref, $chunk_id) = @_;
1195 for my $row ( @{ $_[CHUNK] } ) {
1196 print $row, "\n";
1197 }
1198 }
1199
1200 chunk_size equals 1, use_slurpio => 0
1201 $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1202
1203 ## Note that $_ and $chunk_ref are not the same below.
1204 ## $chunk_ref is a reference to an array.
1205
1206 user_func => sub {
1207 # my ($mce, $chunk_ref, $chunk_id) = @_;
1208 print $_, "\n"; ## Same as $chunk_ref->[0];
1209 }
1210
1211 $mce->foreach("/path/to/file", sub {
1212 # my ($mce, $chunk_ref, $chunk_id) = @_;
1213 print $_; ## Same as $chunk_ref->[0];
1214 });
1215
1216 ## However, that is not the case for the forseq method.
1217 ## Both $_ and $n_seq are the same when chunk_size => 1.
1218
1219 $mce->forseq([ 1, 9 ], sub {
1220 # my ($mce, $n_seq, $chunk_id) = @_;
1221 print $_, "\n"; ## Same as $n_seq
1222 });
1223
1224 Sequence can be specified using a hash reference or an array reference.
1225 The hash form below is the same as the array form in the next example.
1226
1227 $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1228
1229 The code block receives an array ref with the next 5 sequence numbers.
1230 Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1231 to an array, same as $_, due to chunk_size being greater than 1.
1232
1233 $mce->forseq( [ 10, 40, 2 ], { chunk_size => 5 }, sub {
1234 # my ($mce, $n_seq, $chunk_id) = @_;
1235 my @result;
1236 for my $n ( @{ $_ } ) {
1237 ... do work, append to result for 5
1238 }
1239 ... do something with result afterwards
1240 });
1241
1243 The methods listed below are callable by the main process and workers.
1244
1245 $mce->abort ( void )
1246 The 'abort' method is applicable when processing input_data only. This
1247 causes all workers to abort after processing the current chunk.
1248
1249 Workers write the next offset position to the queue socket for the next
1250 available worker. In essence, the 'abort' method writes the last offset
1251 position. Workers, on requesting the next offset position, will think
1252 the end of input_data has been reached and leave the chunking loop.
1253
1254 $mce->abort;
1255 MCE->abort;
1256
1257 $mce->chunk_id ( void )
1258 Returns the chunk_id for the current chunk. The value starts at 1.
1259 Chunking applies to input_data or sequence. The value is 0 for the
1260 manager process.
1261
1262 my $chunk_id = $mce->chunk_id;
1263 my $chunk_id = MCE->chunk_id;
1264
1265 $mce->chunk_size ( void )
1266 Getter method for chunk_size used by MCE.
1267
1268 my $chunk_size = $mce->chunk_size;
1269 my $chunk_size = MCE->chunk_size;
1270
1271 $mce->freeze ( $object_ref )
1272 Calls the internal freeze method to serialize an object. The default
1273 serialization routines are handled by Sereal if available, otherwise Storable.
1274
1275 my $frozen = $mce->freeze([ 0, 2, 4 ]);
1276 my $frozen = MCE->freeze([ 0, 2, 4 ]);
1277
1278 $mce->max_retries ( void )
1279 Getter method for max_retries used by MCE.
1280
1281 my $max_retries = $mce->max_retries;
1282 my $max_retries = MCE->max_retries;
1283
1284 $mce->max_workers ( void )
1285 Getter method for max_workers used by MCE.
1286
1287 my $max_workers = $mce->max_workers;
1288 my $max_workers = MCE->max_workers;
1289
1290 $mce->pid ( void )
1291 Returns the Process ID. Threads have thread ID attached to the value.
1292
1293 my $pid = $mce->pid; ## 16180 (pid) ; 16180.2 (pid.tid)
1294 my $pid = MCE->pid;
1295
1296 $mce->sess_dir ( void )
1297 Returns the session directory used by the MCE instance. This is defined
1298 during spawning and removed during shutdown.
1299
1300 my $sess_dir = $mce->sess_dir;
1301 my $sess_dir = MCE->sess_dir;
1302
1303 $mce->task_id ( void )
1304 Returns the task ID. This applies to the user_tasks option (starts at
1305 0).
1306
1307 my $task_id = $mce->task_id;
1308 my $task_id = MCE->task_id;
1309
1310 $mce->task_name ( void )
1311 Returns the task_name value specified via the task_name option when
1312 configuring MCE.
1313
1314 my $task_name = $mce->task_name;
1315 my $task_name = MCE->task_name;
1316
1317 $mce->task_wid ( void )
1318 Returns the task worker ID (applies to user_tasks). The value starts at
1319 1 for each task configured within user_tasks. The value is 0 for the
1320 manager process.
1321
1322 my $task_wid = $mce->task_wid;
1323 my $task_wid = MCE->task_wid;
1324
1325 $mce->thaw ( $frozen )
1326 Calls the internal thaw method to un-serialize the frozen object.
1327
1328 my $object_ref = $mce->thaw($frozen);
1329 my $object_ref = MCE->thaw($frozen);
1330
1331 $mce->tmp_dir ( void )
1332 Returns the temporary directory used by MCE.
1333
1334 my $tmp_dir = $mce->tmp_dir;
1335 my $tmp_dir = MCE->tmp_dir;
1336
1337 $mce->user_args ( void )
1338 Returns the arguments specified via the user_args option.
1339
1340 my ($arg1, $arg2, $arg3) = $mce->user_args;
1341 my ($arg1, $arg2, $arg3) = MCE->user_args;
1342
1343 $mce->wid ( void )
1344 Returns the MCE worker ID. Starts at 1 for each MCE instance. The value
1345 is 0 for the manager process.
1346
1347 my $wid = $mce->wid;
1348 my $wid = MCE->wid;
1349
1351 Methods listed below are callable by the main process only.
1352
1353 $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1354 $mce->foreach ( $input_data [, { options } ], sub { ... } )
1355 $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1356 Forchunk, foreach, and forseq are sugar methods and described in
1357 MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1358
1359 $mce->process ( $input_data [, { options } ] )
1360 The process method will spawn workers automatically if not already
1361 spawned. It will set input_data => $input_data. It calls run(0) to not
1362 auto-shutdown workers. Specifying options is optional.
1363
1364 Allowable options { key => value, ... } are:
1365
1366 chunk_size input_data job_delay spawn_delay submit_delay
1367 flush_file flush_stderr flush_stdout stderr_file stdout_file
1368 on_post_exit on_post_run sequence user_args user_begin user_end
1369 user_func user_error user_output use_slurpio RS
1370
1371 Options remain persistent going forward unless changed. Setting
1372 user_begin, user_end, or user_func will cause already spawned workers
1373 to shut down and re-spawn automatically. Therefore, define these during
1374 instantiation.
1375
1376 The below will cause workers to re-spawn after running.
1377
1378 my $mce = MCE->new( max_workers => 'auto' );
1379
1380 $mce->process( {
1381 user_begin => sub { ... }, ## connect to DB
1382 user_func => sub { ... }, ## process each row
1383 user_end => sub { ... }, ## close handle to DB
1384 }, \@input_data );
1385
1386 $mce->process( {
1387 user_begin => sub { ... }, ## connect to DB
1388 user_func => sub { ... }, ## process each file
1389 user_end => sub { ... }, ## close handle to DB
1390 }, "/list/of/files" );
1391
1392 Do the following if wanting workers to persist between jobs.
1393
1394 use MCE max_workers => 'auto';
1395
1396 my $mce = MCE->new(
1397 user_begin => sub { ... }, ## connect to DB
1398 user_func => sub { ... }, ## process each chunk or row or host
1399 user_end => sub { ... }, ## close handle to DB
1400 );
1401
1402 $mce->spawn; ## Spawn early if desired
1403
1404 $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1405 $mce->process(\@array_of_files_to_grep);
1406 $mce->process("/path/to/host/list");
1407
1408 $mce->process($array_ref);
1409 $mce->process($array_ref, { stdout_file => $output_file });
1410
1411 ## This was not allowed before. Fixed in 1.415.
1412 $mce->process({ sequence => { begin => 10, end => 90, step => 2 } });
1413 $mce->process({ sequence => [ 10, 90, 2 ] });
1414
1415 $mce->shutdown;
1416
1417 $mce->relay_final ( void )
1418 Described in MCE::Relay.
1419
1420 $mce->restart_worker ( void )
1421 One can restart a worker who has died or exited. The job never ends
1422 below due to restarting each time. Recommended is to call MCE->exit or
1423 $mce->exit instead of the native exit function for better handling,
1424 especially under the Windows environment.
1425
1426 The $e->{wid} argument is no longer necessary starting with the 1.5
1427 release.
1428
1429 Press [ctrl-c] to terminate the script.
1430
1431 my $mce = MCE->new(
1432
1433 on_post_exit => sub {
1434 my ($mce, $e) = @_;
1435 print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1436 # $mce->restart_worker($e->{wid}); ## MCE-1.415 and below
1437 $mce->restart_worker; ## MCE-1.500 and above
1438 },
1439
1440 user_begin => sub {
1441 my ($mce, $task_id, $task_name) = @_;
1442 ## Not interested in die messages going to STDERR,
1443 ## because the die handler calls MCE->exit(255, $_[0]).
1444 close STDERR;
1445 },
1446
1447 user_tasks => [{
1448 max_workers => 5,
1449 user_func => sub {
1450 my ($mce) = @_; sleep MCE->wid;
1451 MCE->exit(3, "exited from " . MCE->wid . "\n");
1452 }
1453 },{
1454 max_workers => 4,
1455 user_func => sub {
1456 my ($mce) = @_; sleep MCE->wid;
1457 die("died from " . MCE->wid . "\n");
1458 }
1459 }]
1460 );
1461
1462 $mce->run;
1463
1464 -- Output
1465
1466 1: PID_85388: status 3: exited from 1
1467 2: PID_85389: status 3: exited from 2
1468 1: PID_85397: status 3: exited from 1
1469 3: PID_85390: status 3: exited from 3
1470 1: PID_85399: status 3: exited from 1
1471 4: PID_85391: status 3: exited from 4
1472 2: PID_85398: status 3: exited from 2
1473 1: PID_85401: status 3: exited from 1
1474 5: PID_85392: status 3: exited from 5
1475 1: PID_85404: status 3: exited from 1
1476 6: PID_85393: status 255: died from 6
1477 3: PID_85400: status 3: exited from 3
1478 2: PID_85403: status 3: exited from 2
1479 1: PID_85406: status 3: exited from 1
1480 7: PID_85394: status 255: died from 7
1481 1: PID_85410: status 3: exited from 1
1482 8: PID_85395: status 255: died from 8
1483 4: PID_85402: status 3: exited from 4
1484 2: PID_85409: status 3: exited from 2
1485 1: PID_85412: status 3: exited from 1
1486 9: PID_85396: status 255: died from 9
1487 3: PID_85408: status 3: exited from 3
1488 1: PID_85416: status 3: exited from 1
1489
1490 ...
1491
1492 $mce->run ( [ $auto_shutdown [, { options } ] ] )
1493 The run method, by default, spawns workers, processes once, and shuts
1494 down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1495 persist after running (default 1).
1496
1497 Specifying options is optional. Valid options are the same as for the
1498 process method.
1499
1500 my $mce = MCE->new( ... );
1501
1502 ## Disables auto-shutdown
1503 $mce->run(0);
1504
1505 $mce->send ( $data_ref )
1506 The 'send' method is useful when wanting to spawn workers early to
1507 minimize memory consumption and afterwards send data individually to
1508 each worker. One cannot send more times than the number of workers spawned.
1509 Workers store the received data as $mce->{user_data}.
1510
1511 The data which can be sent is restricted to an ARRAY, HASH, or PDL
1512 reference. Workers begin processing immediately after receiving data.
1513 Workers set $mce->{user_data} to undef after processing. One cannot
1514 specify input_data, sequence, or user_tasks when using the "send"
1515 method.
1516
1517 Any options passed, e.g. run(0, { options }), are ignored because workers
1518 run immediately after receiving user data. There is no guarantee as to
1519 which worker will receive data first. It depends on which worker is
1520 available awaiting data.
1521
1522 use MCE;
1523
1524 my $mce = MCE->new(
1525 max_workers => 5,
1526
1527 user_func => sub {
1528 my ($mce) = @_;
1529 my $data = $mce->{user_data};
1530 my $first_name = $data->{first_name};
1531 print MCE->wid, ": Hello from $first_name\n";
1532 }
1533 );
1534
1535 $mce->spawn; ## Optional, send will spawn if necessary.
1536
1537 $mce->send( { first_name => "Theresa" } );
1538 $mce->send( { first_name => "Francis" } );
1539 $mce->send( { first_name => "Padre" } );
1540 $mce->send( { first_name => "Anthony" } );
1541
1542 $mce->run; ## Wait for workers to complete processing.
1543
1544 -- Output
1545
1546 2: Hello from Theresa
1547 5: Hello from Anthony
1548 3: Hello from Francis
1549 4: Hello from Padre
1550
1551 $mce->shutdown ( void )
1552 The run method will automatically spawn workers, run once, and shut down
1553 workers automatically. Workers persist after running below. Shutdown
1554 may be called as needed or prior to exiting.
1555
1556 my $mce = MCE->new( ... );
1557
1558 $mce->spawn;
1559
1560 $mce->process(\@input_data_1); ## Processing multiple arrays
1561 $mce->process(\@input_data_2);
1562 $mce->process(\@input_data_n);
1563
1564 $mce->shutdown;
1565
1566 $mce->process('input_file_1'); ## Processing multiple files
1567 $mce->process('input_file_2');
1568 $mce->process('input_file_n');
1569
1570 $mce->shutdown;
1571
1572 $mce->spawn ( void )
1573 Workers are normally spawned automatically. The spawn method allows one
1574 to spawn workers early if so desired.
1575
1576 my $mce = MCE->new( ... );
1577
1578 $mce->spawn;
1579
1580 $mce->status ( void )
1581 The greatest exit status is saved among workers while running. Look at
1582 the on_post_exit or on_post_run options for callback support.
1583
1584 my $mce = MCE->new( ... );
1585
1586 $mce->run;
1587
1588 my $exit_status = $mce->status;
1589
1591 Methods listed below are callable by workers only.
1592
1593 $mce->do ( 'callback_func' [, $arg1, ... ] )
1594 MCE serializes data transfers from a worker process via helper
1595 functions do & sendto to the manager process. The callback function can
1596 optionally return a reply.
1597
1598 [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1599
1600 Passing args to a callback function using references & scalar.
1601
1602 sub callback {
1603 my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1604 ...
1605 }
1606
1607 MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1608 MCE->do('callback', \@a, \%h, \$s, 'foo');
1609
1610 MCE detects whether the caller wants a void, list, hash, or scalar return value.
1611
1612 MCE->do('callback' [, $arg1, ... ]);
1613
1614 my @array = MCE->do('callback' [, $arg1, ... ]);
1615 my %hash = MCE->do('callback' [, $arg1, ... ]);
1616 my $scalar = MCE->do('callback' [, $arg1, ... ]);
1617
1618 $mce->exit ( [ $status [, $message [, $id ] ] ] )
1619 A worker exits from MCE entirely. $id (optional) can be used for
1620 passing the primary key or a string along with the message. Look at the
1621 on_post_exit or on_post_run options for callback support.
1622
1623 MCE->exit; ## default 0
1624 MCE->exit(1);
1625 MCE->exit(2, 'chunk failed', $chunk_id);
1626 MCE->exit(0, 'msg_foo', 'id_1000');
1627
1628 $mce->gather ( $arg1 [, $arg2, ... ] )
1629 A worker can submit data to the location specified via the gather
1630 option by calling this method. See MCE::Flow and MCE::Loop for
1631 additional use cases.
1632
1633 use MCE;
1634
1635 my @hosts = qw(
1636 hosta hostb hostc hostd hoste
1637 );
1638
1639 my $mce = MCE->new(
1640 chunk_size => 1, max_workers => 3,
1641
1642 user_func => sub {
1643 # my ($mce, $chunk_ref, $chunk_id) = @_;
1644 my ($output, $error, $status); my $host = $_;
1645
1646 ## Do something with $host;
1647 $output = "Worker ". MCE->wid .": Hello from $host";
1648
1649 if (MCE->chunk_id % 3 == 0) {
1650 ## Simulating an error condition
1651 local $? = 1; $status = $?;
1652 $error = "Error from $host"
1653 }
1654 else {
1655 $status = 0;
1656 }
1657
1658 ## Ensure unique keys (key, value) when gathering to a
1659 ## hash.
1660 MCE->gather("$host.out", $output, "$host.sta", $status);
1661 MCE->gather("$host.err", $error) if (defined $error);
1662 }
1663 );
1664
1665 my %h; $mce->process(\@hosts, { gather => \%h });
1666
1667 foreach my $host (@hosts) {
1668 print $h{"$host.out"}, "\n";
1669 print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1670 print "Exit status: ", $h{"$host.sta"}, "\n\n";
1671 }
1672
1673 -- Output
1674
1675 Worker 2: Hello from hosta
1676 Exit status: 0
1677
1678 Worker 1: Hello from hostb
1679 Exit status: 0
1680
1681 Worker 3: Hello from hostc
1682 Error from hostc
1683 Exit status: 1
1684
1685 Worker 2: Hello from hostd
1686 Exit status: 0
1687
1688 Worker 1: Hello from hoste
1689 Exit status: 0
1690
1691 $mce->last ( void )
1692 Worker leaves the chunking loop or user_func block immediately.
1693 Callable from inside foreach, forchunk, forseq, and user_func.
1694
1695 use MCE;
1696
1697 my $mce = MCE->new(
1698 max_workers => 5
1699 );
1700
1701 my @list = (1 .. 80);
1702
1703 $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1704
1705 my ($mce, $chunk_ref, $chunk_id) = @_;
1706 MCE->last if ($chunk_id > 4);
1707
1708 my @output = ();
1709
1710 foreach my $rec ( @{ $chunk_ref } ) {
1711 push @output, $rec, "\n";
1712 }
1713
1714 MCE->print(@output);
1715 });
1716
1717 -- Output (each chunk above consists of 2 elements)
1718
1719 3
1720 4
1721 1
1722 2
1723 7
1724 8
1725 5
1726 6
1727
1728 $mce->next ( void )
1729 Worker starts the next iteration of the chunking loop. Callable from
1730 inside foreach, forchunk, forseq, and user_func.
1731
1732 use MCE;
1733
1734 my $mce = MCE->new(
1735 max_workers => 5
1736 );
1737
1738 my @list = (1 .. 80);
1739
1740 $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1741
1742 my ($mce, $chunk_ref, $chunk_id) = @_;
1743 MCE->next if ($chunk_id < 20);
1744
1745 my @output = ();
1746
1747 foreach my $rec ( @{ $chunk_ref } ) {
1748 push @output, $rec, "\n";
1749 }
1750
1751 MCE->print(@output);
1752 });
1753
1754 -- Output (each chunk above consists of 4 elements)
1755
1756 77
1757 78
1758 79
1759 80
1760
1761 $mce->printf ( $format, $list [, ... ] )
1762 $mce->print ( $list [, ... ] )
1763 $mce->say ( $list [, ... ] )
1764 Use the printf, print, and say methods when wanting to serialize output
1765 among workers. These are sugar syntax for the sendto method. These
1766 behave similarly to the native functions in Perl, except that bareword
1767 file handles must be passed as a glob reference (e.g. \*STDERR), and a
1768 comma is required after any file handle argument, including lexicals.
1769
1770 Say is like print, but implicitly appends a newline.
1771
1772 MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1773 MCE->printf($fh, "%s: %d\n", $name, $age);
1774 MCE->printf("%s: %d\n", $name, $age);
1775
1776 MCE->print(\*STDERR, "$error_msg\n");
1777 MCE->print($fh, $log_msg."\n");
1778 MCE->print("$output_msg\n");
1779
1780 MCE->say(\*STDERR, $error_msg);
1781 MCE->say($fh, $log_msg);
1782 MCE->say($output_msg);
1783
1784 Caveat: Use the following syntax when passing a reference that is not a glob or
1785 file handle. Otherwise, MCE will error indicating the first argument is
1786 not a glob reference.
1787
1788 MCE->print(\*STDOUT, \@array, "\n");
1789 MCE->print("", \@array, "\n"); ## ok
1790
1791 $mce->relay ( sub { code } )
1792 $mce->relay_recv ( void )
1793 Described in MCE::Relay.
1794
1795 $mce->sendto ( $to, $arg1, ... )
1796 The sendto method is called by workers for serializing data to standard
1797 output, standard error, or a file. The action is done by the
1798 manager process.
1799
1800 Release 1.00x supported 1 data argument, not more.
1801
1802 MCE->sendto('file', \@array, '/path/to/file');
1803 MCE->sendto('file', \$scalar, '/path/to/file');
1804 MCE->sendto('file', $scalar, '/path/to/file');
1805
1806 MCE->sendto('STDERR', \@array);
1807 MCE->sendto('STDERR', \$scalar);
1808 MCE->sendto('STDERR', $scalar);
1809
1810 MCE->sendto('STDOUT', \@array);
1811 MCE->sendto('STDOUT', \$scalar);
1812 MCE->sendto('STDOUT', $scalar);
1813
1814 Release 1.100 added the ability to pass multiple arguments. Notice the
1815 syntax change for sending to a file. Passing a reference to an array is
1816 no longer necessary.
1817
1818 MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1819 MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1820 MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1821
1822 MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1823
1824 To retain 1.00x compatibility, sendto outputs the content when a single
1825 data reference is specified. Otherwise, the reference for \@array or
1826 \$scalar is shown in 1.500, not the content.
1827
1828 MCE->sendto('STDERR', \@array); ## 1.00x behavior, content
1829 MCE->sendto('STDOUT', \$scalar);
1830 MCE->sendto('file:/path/to/file', \@array);
1831
1832 ## Output matches the print statement
1833
1834 MCE->sendto(\*STDERR, \@array); ## 1.500 behavior, reference
1835 MCE->sendto(\*STDOUT, \$scalar);
1836 MCE->sendto($fh, \@array);
1837
1838 MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1839 print {*STDOUT} \@array, "\n", \$scalar, "\n";
1840
1841 MCE 1.500 added support for sending to a glob reference, file
1842 descriptor, and file handle.
1843
1844 MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1845 MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1846 MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1847
1848 $mce->sync ( void )
1849 A barrier sync operation means any worker must stop at this point until
1850 all workers reach this barrier. Barrier syncing is useful for many
1851 computer algorithms.
1852
1853 Barrier synchronization is supported for task 0 only or omitting
1854 user_tasks. All workers assigned task_id 0 must call sync whenever
1855 barrier syncing.
1856
1857 use MCE;
1858
1859 sub user_func {
1860
1861 my ($mce) = @_;
1862 my $wid = MCE->wid;
1863
1864 MCE->sendto("STDOUT", "a: $wid\n"); ## MCE 1.0+
1865 MCE->sync;
1866
1867 MCE->sendto(\*STDOUT, "b: $wid\n"); ## MCE 1.5+
1868 MCE->sync;
1869
1870 MCE->print("c: $wid\n"); ## MCE 1.5+
1871 MCE->sync;
1872
1873 return;
1874 }
1875
1876 my $mce = MCE->new(
1877 max_workers => 4, user_func => \&user_func
1878 )->run;
1879
1880 -- Output (without barrier synchronization)
1881
1882 a: 1
1883 a: 2
1884 b: 1
1885 b: 2
1886 c: 1
1887 c: 2
1888 a: 3
1889 b: 3
1890 c: 3
1891 a: 4
1892 b: 4
1893 c: 4
1894
1895 -- Output (with barrier synchronization)
1896
1897 a: 1
1898 a: 2
1899 a: 4
1900 a: 3
1901 b: 2
1902 b: 1
1903 b: 3
1904 b: 4
1905 c: 1
1906 c: 4
1907 c: 2
1908 c: 3
1909
1910 Consider the following example. The MCE->sync operation is done inside
1911 a loop along with MCE->do. A stall may occur for workers calling sync
1912 the 2nd or 3rd time while other workers are sending results via MCE->do
1913 or MCE->sendto.
1914
1915 Solving this would require another semaphore lock in MCE, which was not
1916 added in order to keep resource usage low. Therefore, please keep this in
1917 mind when mixing MCE->sync with MCE->do or output serialization methods
1918 inside a loop.
1919
1920 sub user_func {
1921
1922 my ($mce) = @_;
1923 my @result;
1924
1925 for (1 .. 3) {
1926 ... compute algorithm ...
1927
1928 MCE->sync;
1929
1930 ... compute algorithm ...
1931
1932 MCE->sync;
1933
1934 MCE->do('aggregate_result', \@result); ## or MCE->sendto
1935
1936 MCE->sync; ## The sync operation is also needed here to
1937 ## prevent MCE from stalling.
1938 }
1939 }
1940
1941 $mce->yield ( void )
1942 On occasion, an MCE-driven app may run too fast. The
1943 interval option combined with the yield method, both introduced with
1944 MCE 1.5, allow one to throttle the app. It adds a "grace" factor to
1945 the design.
1946
1947 A use case is an app configured with 100 workers running on a box with 24
1948 logical cores. Data is polled from a database containing over 2.5
1949 million rows. Workers chunk away at 300 rows per chunk performing SNMP
1950 gets (300 sockets per worker) polling 25 metrics from each device. With
1951 this scenario, the load on the box may rise beyond 90+. In addition,
1952 IP_Tables may reach its contention point causing the entire application
1953 to fail.
1954
1955 The scenario above is solved by simply having workers yield among
1956 themselves in a synchronized fashion. A delay of 0.007 seconds between
1957 intervals is all that's needed. The load on the box will hover between
1958 23 ~ 27 for the duration of the run. Polling completes in under 17
1959 minutes. This is quite fast considering the app polls 62.5 million
1960 metrics combined. The math equates to 3,676,470 per minute or rather
1961 61,275 per second from a single box.
1962
1963 ## Both max_nodes and node_id are optional (default 1).
1964
1965 interval => {
1966 delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
1967 }
1968
1969 A 4 node setup can poll 10 million devices without the additional
1970 overhead of a distribution agent. The only differences between the 4 nodes
1971 are the node_id and the where clause used to query the database. The
1972 mac addresses are random such that the data divides equally to any
1973 power of 2. The distribution key lies in the mac address itself. In
1974 fact, the 2nd character from the right is sufficient for maximizing on
1975 the power of randomness for equal distribution.
1976
1977 Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
1978 Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
1979 Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
1980 Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
1981
1982 Below, the user_tasks is configured to simulate 4 nodes. This
1983 demonstration uses 2 workers to minimize the output size. Input is from
1984 the sequence option.
1985
1986 use Time::HiRes qw(time);
1987 use MCE;
1988
1989 my $d = shift || 0.1;
1990
1991 local $| = 1;
1992
1993 sub create_task {
1994
1995 my ($node_id) = @_;
1996
1997 my $seq_size = 6;
1998 my $seq_start = ($node_id - 1) * $seq_size + 1;
1999 my $seq_end = $seq_start + $seq_size - 1;
2000
2001 return {
2002 max_workers => 2, sequence => [ $seq_start, $seq_end ],
2003 interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2004 };
2005 }
2006
2007 sub user_begin {
2008
2009 my ($mce, $task_id, $task_name) = @_;
2010
2011 ## The yield method causes this worker to wait for its next time
2012 ## interval slot before running. Yield has no effect without the
2013 ## 'interval' option.
2014
2015 ## Yielding is beneficial inside a user_begin block. A use case
2016 ## is staggering database connections among workers in order
2017 ## to not impact the DB server.
2018
2019 MCE->yield;
2020
2021 MCE->printf(
2022 "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2023 MCE->task_id + 1, time, MCE->task_wid, ''
2024 );
2025
2026 return;
2027 }
2028
2029 {
2030 my $prev_time = time;
2031
2032 sub user_func {
2033
2034 my ($mce, $seq_n, $chunk_id) = @_;
2035
2036 ## Yield simply waits for the next time interval.
2037 MCE->yield;
2038
2039 ## Calculate how long this worker has waited.
2040 my $curr_time = time;
2041 my $time_waited = $curr_time - $prev_time;
2042
2043 $prev_time = $curr_time;
2044
2045 MCE->printf(
2046 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2047 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2048 );
2049
2050 return;
2051 }
2052 }
2053
2054 ## Simulate a 4 node environment passing node_id to create_task.
2055
2056 print "Node_ID Current_Time Worker_ID Time_Waited Comment\n";
2057
2058 MCE->new(
2059 user_begin => \&user_begin,
2060 user_func => \&user_func,
2061
2062 user_tasks => [
2063 create_task(1),
2064 create_task(2),
2065 create_task(3),
2066 create_task(4)
2067 ]
2068
2069 )->run;
2070
2071 -- Output (notice Current_Time below, stays 0.10 apart)
2072
2073 Node_ID Current_Time Worker_ID Time_Waited Comment
2074 Node 1: 1374807976.74634 -- Worker 1: -- Started
2075 Node 2: 1374807976.84634 -- Worker 1: -- Started
2076 Node 3: 1374807976.94638 -- Worker 1: -- Started
2077 Node 4: 1374807977.04639 -- Worker 1: -- Started
2078 Node 1: 1374807977.14634 -- Worker 2: -- Started
2079 Node 2: 1374807977.24640 -- Worker 2: -- Started
2080 Node 3: 1374807977.34649 -- Worker 2: -- Started
2081 Node 4: 1374807977.44657 -- Worker 2: -- Started
2082 Node 1: 1374807977.54636 -- Worker 1: 0.90037 -- Seq_N 1
2083 Node 2: 1374807977.64638 -- Worker 1: 1.00040 -- Seq_N 7
2084 Node 3: 1374807977.74642 -- Worker 1: 1.10043 -- Seq_N 13
2085 Node 4: 1374807977.84643 -- Worker 1: 1.20045 -- Seq_N 19
2086 Node 1: 1374807977.94636 -- Worker 2: 1.30037 -- Seq_N 2
2087 Node 2: 1374807978.04638 -- Worker 2: 1.40040 -- Seq_N 8
2088 Node 3: 1374807978.14641 -- Worker 2: 1.50042 -- Seq_N 14
2089 Node 4: 1374807978.24644 -- Worker 2: 1.60045 -- Seq_N 20
2090 Node 1: 1374807978.34628 -- Worker 1: 0.79996 -- Seq_N 3
2091 Node 2: 1374807978.44631 -- Worker 1: 0.79996 -- Seq_N 9
2092 Node 3: 1374807978.54634 -- Worker 1: 0.79996 -- Seq_N 15
2093 Node 4: 1374807978.64636 -- Worker 1: 0.79997 -- Seq_N 21
2094 Node 1: 1374807978.74628 -- Worker 2: 0.79996 -- Seq_N 4
2095 Node 2: 1374807978.84632 -- Worker 2: 0.79997 -- Seq_N 10
2096 Node 3: 1374807978.94634 -- Worker 2: 0.79996 -- Seq_N 16
2097 Node 4: 1374807979.04636 -- Worker 2: 0.79996 -- Seq_N 22
2098 Node 1: 1374807979.14628 -- Worker 1: 0.80001 -- Seq_N 5
2099 Node 2: 1374807979.24631 -- Worker 1: 0.80000 -- Seq_N 11
2100 Node 3: 1374807979.34634 -- Worker 1: 0.80001 -- Seq_N 17
2101 Node 4: 1374807979.44636 -- Worker 1: 0.80000 -- Seq_N 23
2102 Node 1: 1374807979.54628 -- Worker 2: 0.80000 -- Seq_N 6
2103 Node 2: 1374807979.64631 -- Worker 2: 0.80000 -- Seq_N 12
2104 Node 3: 1374807979.74633 -- Worker 2: 0.80000 -- Seq_N 18
2105 Node 4: 1374807979.84636 -- Worker 2: 0.80000 -- Seq_N 24
2106
2107 The interval.pl example above is included with MCE.
2108
2110 The "progress" option takes a code block for receiving info on the
2111 progress made while processing input data; e.g. "input_data" or
2112 "sequence". To make this work, one provides the "progress" option a
2113 closure block like so, passing along the size of the input_data; e.g.
2114 "scalar @array" or "-s /path/to/file".
2115
2116 Current API available since 1.813.
2117
2118 A worker, upon finishing processing its chunk, notifies the manager
2119 process of the chunk's size. That is the number of rows, or the actual
2120 size of the chunk in bytes when processing an input file. The
2121 manager process accumulates these sizes before calling the code block
2122 associated with the "progress" option.
2123
2124 When running multiple tasks simultaneously via "user_tasks", the call
2125 is initiated only by workers at level 0, i.e. those of the first task
2126 (not shown here).
2127
2128 use Time::HiRes 'sleep';
2129 use MCE;
2130
2131 sub make_progress {
2132 my ($total_size) = @_;
2133 return sub {
2134 my ($completed_size) = @_;
2135 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2136 };
2137 }
2138
2139 my @input = (1..150);
2140
2141 MCE->new(
2142 chunk_size => 10,
2143 max_workers => 4,
2144 input_data => \@input,
2145 progress => make_progress( scalar @input ),
2146 user_func => sub { sleep 1.5 }
2147 )->run();
2148
2149 -- Output
2150
2151 6.7%
2152 13.3%
2153 20.0%
2154 26.7%
2155 33.3%
2156 40.0%
2157 46.7%
2158 53.3%
2159 60.0%
2160 66.7%
2161 73.3%
2162 80.0%
2163 86.7%
2164 93.3%
2165 100.0%
2166
2167 Next is code that does practically the same thing using MCE::Flow and
2168 ProgressBar::Stack.
2169
2170 use Time::HiRes 'sleep';
2171 use ProgressBar::Stack;
2172 use MCE::Flow;
2173
2174 sub make_progress {
2175 my ($total_size) = @_;
2176 init_progress();
2177 return sub {
2178 my ($completed_size) = @_;
2179 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2180 };
2181 }
2182
2183 my @input = (1..150);
2184
2185 MCE::Flow->init(
2186 chunk_size => 10,
2187 max_workers => 4,
2188 progress => make_progress( scalar @input )
2189 );
2190
2191 MCE::Flow->run( sub { sleep 1.5 }, \@input );
2192 MCE::Flow->finish();
2193
2194 print "\n";
2195
2196 -- Output
2197
2198 [################ ] 80.0% ETA: 0:01
2199
2200 For a sequence of numbers, using the "sequence" option, one must account
2201 for "step_size" (typically 1 by default) when computing the total size.
2202
2203 use Time::HiRes 'sleep';
2204 use MCE;
2205
2206 sub make_progress {
2207 my ($total_size) = @_;
2208 return sub {
2209 my ($completed_size) = @_;
2210 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2211 };
2212 }
2213
2214 MCE->new(
2215 chunk_size => 10,
2216 max_workers => 4,
2217 sequence => [ 1, 100, 2 ],
2218 progress => make_progress( int( 100 / 2 + 0.5 ) ),
2219 user_func => sub { sleep 1.5 }
2220 )->run();
2221
2222 -- Output
2223
2224 20.0%
2225 40.0%
2226 60.0%
2227 80.0%
2228 100.0%
2229
2230 Changing "chunk_size" to 1 means workers notify the manager process
2231 more often, thus increasing granularity. Take a look at the output.
2232
2233 2.0%
2234 4.0%
2235 6.0%
2236 8.0%
2237 10.0%
2238 ...
2239 92.0%
2240 94.0%
2241 96.0%
2242 98.0%
2243 100.0%
2244
2245 Here is the same thing using MCE::Flow together with
2246 ProgressBar::Stack.
2247
2248 use Time::HiRes 'sleep';
2249 use ProgressBar::Stack;
2250 use MCE::Flow;
2251
2252 sub make_progress {
2253 my ($total_size) = @_;
2254 init_progress();
2255 return sub {
2256 my ($completed_size) = @_;
2257 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2258 };
2259 }
2260
2261 MCE::Flow->init(
2262 chunk_size => 1,
2263 max_workers => 4,
2264 progress => make_progress( int( 100 / 2 + 0.5 ) )
2265 );
2266
2267 MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2268 MCE::Flow->finish();
2269
2270 print "\n";
2271
2272 -- Output
2273
2274 [######### ] 48.0% ETA: 0:03
2275
2276 For files and file handles, workers send the actual size of the data
2277 read rather than a count of rows.
2278
2279 use Time::HiRes 'sleep';
2280 use MCE;
2281
2282 sub make_progress {
2283 my ($total_size) = @_;
2284 return sub {
2285 my ($completed_size) = @_;
2286 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2287 };
2288 }
2289
2290 my $input_file = "/path/to/input_file.txt";
2291
2292 MCE->new(
2293 chunk_size => 5,
2294 max_workers => 4,
2295 input_data => $input_file,
2296 progress => make_progress( -s $input_file ),
2297 user_func => sub { sleep 0.03 }
2298 )->run();
2299
2300 For consistency, here is the example using MCE::Flow, again with
2301 ProgressBar::Stack.
2302
2303 use Time::HiRes 'sleep';
2304 use ProgressBar::Stack;
2305 use MCE::Flow;
2306
2307 sub make_progress {
2308 my ($total_size) = @_;
2309 init_progress();
2310 return sub {
2311 my ($completed_size) = @_;
2312 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2313 };
2314 }
2315
2316 my $input_file = "/path/to/input_file.txt";
2317
2318 MCE::Flow->init(
2319 chunk_size => 5,
2320 max_workers => 4,
2321 progress => make_progress( -s $input_file )
2322 );
2323
2324 MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2325 MCE::Flow->finish();
2326
2327 The next demonstration processes three arrays consecutively. For this
2328 one, MCE workers persist between runs. This requires MCE 1.814 or later;
2329 with MCE 1.813, the progress output is not shown.
2330
2331 use Time::HiRes 'sleep';
2332 use ProgressBar::Stack;
2333 use MCE;
2334
2335 sub make_progress {
2336 my ($total_size, $message) = @_;
2337 init_progress();
2338 return sub {
2339 my ($completed_size) = @_;
2340 update_progress(
2341 sprintf("%0.1f", $completed_size / $total_size * 100),
2342 $message
2343 );
2344 };
2345 }
2346
2347 my $mce = MCE->new(
2348 chunk_size => 10,
2349 max_workers => 4,
2350 user_func => sub { sleep 0.5 }
2351 )->spawn();
2352
2353 my @a1 = ( 1 .. 200 );
2354 my @a2 = ( 1 .. 500 );
2355 my @a3 = ( 1 .. 300 );
2356
2357 $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2358
2359 print "\n";
2360
2361 $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2362
2363 print "\n";
2364
2365 $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2366
2367 print "\n";
2368
2369 $mce->shutdown;
2370
2371 -- Output
2372
2373 [####################] 100.0% ETA: 0:00 array 1
2374 [####################] 100.0% ETA: 0:00 array 2
2375 [####################] 100.0% ETA: 0:00 array 3
2376
2377 When the size is not known, such as when reading from "STDIN", the only
2378 thing one can do is report the size completed thus far.
2379
2380 # 1 kibibyte equals 1024 bytes
2381
2382 progress => sub {
2383 my ($completed_size) = @_;
2384 printf "%0.1f kibibytes\n", $completed_size / 1024;
2385 }
2386
2388 MCE
2389
2391 Mario E. Roy, <marioeroy AT gmail DOT com>
2392
2393
2394
2395perl v5.28.1 2019-01-23 MCE::Core(3)