1MCE::Core(3) User Contributed Perl Documentation MCE::Core(3)
2
3
4
6 MCE::Core - Documentation describing the core MCE API
7
9 This document describes MCE::Core version 1.874
10
12 This is a simplistic use case of MCE running with 5 workers.
13
14 ## Construction using the Core API
15
16 use MCE;
17
18 my $mce = MCE->new(
19 max_workers => 5,
20 user_func => sub {
21 my ($mce) = @_;
22 $mce->say("Hello from " . $mce->wid);
23 }
24 );
25
26 $mce->run;
27
28 ## Construction using a MCE model
29
30 use MCE::Flow max_workers => 5;
31
32 mce_flow sub {
33 my ($mce) = @_;
34 MCE->say("Hello from " . MCE->wid);
35 };
36
37 -- Output
38
39 Hello from 2
40 Hello from 4
41 Hello from 5
42 Hello from 1
43 Hello from 3
44
45 MCE->new ( [ options ] )
46 Below, a new instance is configured with all available options.
47
48 use MCE;
49
50 my $mce = MCE->new(
51
52 max_workers => 8, ## Default 1
53
54 # Number of workers to spawn. This can be set automatically
55 # with MCE 1.412 and later releases.
56
57 # MCE 1.521 sets an upper-limit of 8 for 'auto'.
58 # See MCE::Util::get_ncpu for more info.
59
60 # max_workers => 'auto', ## # of lcores, 8 maximum
61 # max_workers => 'auto-1', ## 7 on HW with 16-lcores
62 # max_workers => 'auto-1', ## 3 on HW with 4-lcores
63
64 # max_workers => MCE::Util::get_ncpu, # run on all lcores
65
66 chunk_size => 2000, ## Default 1
67
68 # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
69 # The default is 1 when using the Core API and 'auto' for
70 # MCE Models. For arrays or queues, chunk_size means the
71 # number of records per chunk. For iterators, MCE will not
72 # use chunk_size, though the iterator may use it to determine
73 # how much to return per iteration. For files, smaller than or
74 # equal to 8192 is the number of records. Greater than 8192
75 # is the number of bytes. MCE reads until the end of record
76 # before calling user_func.
77
78 # chunk_size => 1, ## Consists of 1 record
79 # chunk_size => 1000, ## Consists of 1000 records
80 # chunk_size => '16k', ## Approximate 16 kibiBytes (KiB)
81 # chunk_size => '20m', ## Approximate 20 mebiBytes (MiB)
82
83 tmp_dir => $tmp_dir, ## Default $MCE::Signal::tmp_dir
84
85 # Default is $MCE::Signal::tmp_dir which points to
86 # $ENV{TEMP} if defined. Otherwise, tmp_dir points
87 # to a location under /tmp.
88
89 freeze => \&encode_sereal, ## Default \&Storable::freeze
90 thaw => \&decode_sereal, ## Default \&Storable::thaw
91
92 # Release 1.412 allows freeze and thaw to be overridden.
93 # Simply include a serialization module prior to loading
94 # MCE. Configure freeze/thaw options.
95
96 # use Sereal qw( encode_sereal decode_sereal );
97 # use CBOR::XS qw( encode_cbor decode_cbor );
98 # use JSON::XS qw( encode_json decode_json );
99 #
100 # use MCE;
101
102 gather => \@a, ## Default undef
103
104 # Release 1.5 allows for gathering of data to an array or
105 # hash reference, a MCE::Queue/Thread::Queue object, or code
106 # reference. One invokes gathering by calling the gather
107 # method as often as needed.
108
109 # gather => \@array,
110 # gather => \%hash,
111 # gather => $queue,
112 # gather => \&order,
113
114 init_relay => 0, ## Default undef
115
116 # For specifying the initial relay value. Allowed values
117 # are array_ref, hash_ref, or scalar. The MCE::Relay module
118 # is loaded automatically when specified.
119
120 # init_relay => \@array,
121 # init_relay => \%hash,
122 # init_relay => scalar,
123
124 input_data => $input_file, ## Default undef
125 RS => "\n>", ## Default undef
126
127 # input_data => '/path/to/file' ## Process file
128 # input_data => \@array ## Process array
129 # input_data => \*FILE_HNDL ## Process file handle
130 # input_data => $io ## Process IO::All { File, Pipe, STDIO }
131 # input_data => \$scalar ## Treated like a file
132 # input_data => \&iterator ## User specified iterator
133
134 # The RS option (for input record separator) applies to files
135 # and file handles.
136
137 # MCE applies additional logic when RS begins with a newline
138 # character; e.g. RS => "\n>". It trims away characters after
139 # the newline and prepends them to the next record.
140 #
141 # Typically, the left side is what happens for $/ = "\n>".
142 # The right side is what user_func receives.
143 #
144 # All records begin with > and end with \n
145 # Record 1: >seq1 ... \n> (to) >seq1 ... \n
146 # Record 2: seq2 ... \n> >seq2 ... \n
147 # Record 3: seq3 ... \n> >seq3 ... \n
148 # Last Rec: seqN ... \n >seqN ... \n
149
150 loop_timeout => 20, ## Default 0
151
152 # Added in 1.7, enables the manager process to time out of a read
153 # operation on channel 0 (UNIX platforms only). The manager process
154 # decrements the total workers running for any worker which has
155 # died in an uncontrollable manner. Specify this option if on
156 # occasion a worker dies unexpectedly (e.g. from an XS module).
157
158 # Option works with init_relay on UNIX platforms since MCE 1.844.
159 # A number smaller than 5 is silently increased to 5.
160
161 max_retries => 2, ## Default 0
162
163 # This option, added in 1.7, causes MCE to retry a failed
164 # chunk from a worker dying while processing input data or
165 # a sequence of numbers.
166
167 parallel_io => 1, ## Default 0
168 posix_exit => 1, ## Default 0
169 use_slurpio => 1, ## Default 0
170
171 # The parallel_io option enables parallel reads during large
172 # slurpio, useful when reading from fast storage. Do not enable
173 # parallel_io when running MCE on many nodes with input coming
174 # from shared storage.
175
176 # Set posix_exit to avoid all END and destructor processing.
178 # Implied (1) when constructing MCE inside a thread or when any of
179 # these is present: CGI, FCGI, Coro, Curses, Gearman::Util,
180 # Gearman::XS, LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx, Win32::GUI.
180
181 # Enable slurpio to pass the raw chunk (scalar ref) to the user
182 # function when reading input files.
183
184 use_threads => 1, ## Auto 0 or 1
185
186 # By default MCE spawns child processes on UNIX platforms and
187 # threads on Windows (i.e. $^O eq 'MSWin32').
188
189 # MCE supports threads via two threading libraries if threads
190 # are preferred over child processes. The use of threads requires
191 # a thread library prior to loading MCE, causing the use_threads
192 # option to default to 1. Specify 0 for child processes.
193 #
194 # use threads; use threads::shared; use MCE;
195 # (or) use forks; use forks::shared; use MCE;
196 # (or) use MCE;
197
198 spawn_delay => 0.045, ## Default undef
199 submit_delay => 0.015, ## Default undef
200 job_delay => 0.060, ## Default undef
201
202 # Time to wait in fractional seconds after spawning a worker,
203 # after submitting parameters to worker (MCE->run, MCE->process),
204 # and worker running (one time staggered delay).
205
206 # Specify job_delay to stagger workers connecting to a database.
207
208 on_post_exit => \&on_post_exit, ## Default undef
209 on_post_run => \&on_post_run, ## Default undef
210
211 # Execute the code block after a worker exits or dies.
212 # (i.e. MCE->exit, exit, die)
213
214 # Execute the code block after running.
215 # (i.e. MCE->process, MCE->run)
216
217 progress => sub { ... }, ## Default undef
218
219 # A code block for receiving info on the progress made.
220 # See section labeled "MCE PROGRESS DEMONSTRATIONS" at the
221 # end of this document.
222
223 user_args => { env => 'test' }, ## Default undef
224
225 # MCE release 1.4 added a new parameter to allow one to
226 # specify arbitrary arguments such as a string, an ARRAY
227 # or HASH reference. Workers can access this directly.
228 # (i.e. my $args = $mce->{user_args} or MCE->user_args)
229
230 user_begin => \&user_begin, ## Default undef
231 user_func => \&user_func, ## Default undef
232 user_end => \&user_end, ## Default undef
233
234 # Think of user_begin, user_func, and user_end as in
235 # the awk scripting language:
236 # awk 'BEGIN { begin } { func } { func } ... END { end }'
237
238 # MCE workers call user_begin once at the start of a job,
239 # then user_func repeatedly until no chunks remain.
240 # Afterwards, user_end is called.
241
242 user_error => \&user_error, ## Default undef
243 user_output => \&user_output, ## Default undef
244
245 # MCE will forward data to user_error/user_output,
246 # when defined, for the following methods.
247
248 # MCE->sendto(\*STDERR, "sent to user_error\n");
249 # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
250 # MCE->print(\*STDERR, "sent to user_error\n");
251 # MCE->say(\*STDERR, "sent to user_error");
252
253 # MCE->sendto(\*STDOUT, "sent to user_output\n");
254 # MCE->printf("%s\n", "sent to user_output");
255 # MCE->print("sent to user_output\n");
256 # MCE->say("sent to user_output");
257
258 stderr_file => 'err_file', ## Default STDERR
259 stdout_file => 'out_file', ## Default STDOUT
260
261 # Or to file; user_error and user_output take precedence.
262
263 flush_file => 0, ## Default 1
264 flush_stderr => 0, ## Default 1
265 flush_stdout => 0, ## Default 1
266
267 # Flush sendto file, standard error, or standard output.
268
269 interval => {
270 delay => 0.007 [, max_nodes => 4, node_id => 1 ]
271 },
272
273 # For use with the yield method introduced in MCE 1.5.
274 # Both max_nodes & node_id are optional and default to 1.
275 # Delay is the amount of time between intervals.
276
277 # interval => 0.007 ## Shorter; MCE 1.506+
278
279 sequence => { ## Default undef
280 begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
281 },
282
283 bounds_only => 1, ## Default undef
284
285 # For looping through a sequence of numbers in parallel.
286 # STEP, if omitted, defaults to 1 if BEGIN is smaller than
287 # END or -1 if BEGIN is greater than END. The FORMAT string
288 # is passed to sprintf behind the scene (% may be omitted).
289 # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
290
291 # Do not specify both options; input_data and sequence.
292 # Release 1.4 allows one to specify an array reference.
293 # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
294
295 # The bounds_only => 1 option will compute the 'begin' and
296 # 'end' items only for the chunk and not the items in between
297 # (hence boundaries only). This option has no effect when
298 # sequence is not specified or chunk_size equals 1.
299
300 # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
301
302 task_end => \&task_end, ## Default undef
303
304 # This is called by the manager process after the task
305 # has completed processing. MCE 1.5 allows this option
306 # to be specified at the top level.
307
308 task_name => 'string', ## Default 'MCE'
309
310 # Added in MCE 1.5 and mainly beneficial for user_tasks.
311 # One may specify a unique name per each sub-task.
312 # The string is passed as the 3rd arg to task_end.
313
314 user_tasks => [ ## Default undef
315 { ... }, ## Options for task 0
316 { ... }, ## Options for task 1
317 { ... }, ## Options for task 2
318 ],
319
320 # Takes a list of hash references, each allowing up to 17
321 # options. All other MCE options are ignored. The init_relay,
322 # input_data, RS, and use_slurpio options are applicable to
323 # the first task only.
324
325 # max_workers, chunk_size, input_data, interval, sequence,
326 # bounds_only, user_args, user_begin, user_end, user_func,
327 # gather, task_end, task_name, use_slurpio, use_threads,
328 # init_relay, RS
329
330 # Options not specified here will default to same option
331 # specified at the top level.
332 );
333
334 EXPORT_CONST, CONST
335 There are 3 constants which are exportable. Using the constants in lieu
336 of 0,1,2 makes it more legible when accessing the user_func arguments
337 directly.
338
339 SELF CHUNK CID - MCE CONSTANTS
340
341 Exports SELF => 0, CHUNK => 1, and CID => 2.
342
343 use MCE export_const => 1;
344 use MCE const => 1; ## Shorter; MCE 1.415+
345
346 user_func => sub {
347 # my ($mce, $chunk_ref, $chunk_id) = @_;
348 print "Hello from ", $_[SELF]->wid, "\n";
349 }
350
351 MCE 1.5 allows all public methods to be called directly.
352
353 use MCE;
354
355 user_func => sub {
356 # my ($mce, $chunk_ref, $chunk_id) = @_;
357 print "Hello from ", MCE->wid, "\n";
358 }
359
360 OVERRIDING DEFAULTS
361 The following lists options which may be overridden when loading the
362 module.
363
364 use Sereal qw( encode_sereal decode_sereal );
365 use CBOR::XS qw( encode_cbor decode_cbor );
366 use JSON::XS qw( encode_json decode_json );
367
368 use MCE
369 max_workers => 4, ## Default 1
370 chunk_size => 100, ## Default 1
371 tmp_dir => "/path/to/app/tmp", ## $MCE::Signal::tmp_dir
372 freeze => \&encode_sereal, ## \&Storable::freeze
373 thaw => \&decode_sereal ## \&Storable::thaw
374 ;
375
376 my $mce = MCE->new( ... );
377
378 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
379 available. Specify "Sereal => 0" to use Storable instead.
380
381 use MCE Sereal => 0;
382
383 RUNNING
384 Run calls spawn, submits the job; workers call user_begin, user_func,
385 and user_end. Run shuts down workers afterwards. Call spawn whenever
386 the need arises for large data structures prior to running.
387
388 $mce->spawn; ## Call early if desired
389
390 $mce->run; ## Call run or process below
391
392 ## Acquire data arrays and/or input_files. Workers persist after
393 ## processing.
394
395 $mce->process(\@input_data_1); ## Process array
396 $mce->process(\@input_data_2);
397 $mce->process(\@input_data_n);
398
399 $mce->process(\%input_hash_1); ## Process hash, current API
400 $mce->process(\%input_hash_2); ## available since 1.828
401 $mce->process(\%input_hash_n);
402
403 $mce->process('input_file_1'); ## Process file
404 $mce->process('input_file_2');
405 $mce->process('input_file_n');
406
407 $mce->shutdown; ## Shutdown workers
408
409 SYNTAX for ON_POST_EXIT
410 Often times, one may want to capture the exit status. The on_post_exit
411 option, if defined, is executed immediately by the manager process
412 after a worker exits via exit (children only), MCE->exit (children and
413 threads), or die.
414
415 The format of $e->{pid} is PID_123 for children and TID_123 for
416 threads.
417
418 my $restart_flag = 1;
419
420 sub on_post_exit {
421 my ($mce, $e) = @_;
422
423 ## Display all possible hash elements.
424 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
425
426 ## Restart this worker if desired.
427 if ($restart_flag && $e->{wid} == 2) {
428 $mce->restart_worker;
429 $restart_flag = 0;
430 }
431 }
432
433 sub user_func {
434 my ($mce) = @_;
435 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); ## Args, not necessary
436 }
437
438 my $mce = MCE->new(
439 on_post_exit => \&on_post_exit,
440 user_func => \&user_func,
441 max_workers => 3
442 );
443
444 $mce->run;
445
446 -- Output (child processes)
447
448 2: PID_33223: 0: msg_foo: 1002
449 1: PID_33222: 0: msg_foo: 1001
450 3: PID_33224: 0: msg_foo: 1003
451 2: PID_33225: 0: msg_foo: 1002
452
453 -- Output (running with threads)
454
455 3: TID_3: 0: msg_foo: 1003
456 2: TID_2: 0: msg_foo: 1002
457 1: TID_1: 0: msg_foo: 1001
458 2: TID_4: 0: msg_foo: 1002
459
460 SYNTAX for ON_POST_RUN
461 The on_post_run option, if defined, is executed immediately by the
462 manager process after running MCE->process or MCE->run. This option
463 receives an array reference of hashes.
464
465 The difference between on_post_exit and on_post_run is that the former
466 is called immediately whereas the latter is called after all workers
467 have completed running.
468
469 sub on_post_run {
470 my ($mce, $status_ref) = @_;
471 foreach my $e ( @{ $status_ref } ) {
472 ## Display all possible hash elements.
473 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
474 }
475 }
476
477 sub user_func {
478 my ($mce) = @_;
479 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); ## Args, not necessary
480 }
481
482 my $mce = MCE->new(
483 on_post_run => \&on_post_run,
484 user_func => \&user_func,
485 max_workers => 3
486 );
487
488 $mce->run;
489
490 -- Output (child processes)
491
492 3: PID_33174: 0: msg_foo: 1003
493 1: PID_33172: 0: msg_foo: 1001
494 2: PID_33173: 0: msg_foo: 1002
495
496 -- Output (running with threads)
497
498 2: TID_2: 0: msg_foo: 1002
499 3: TID_3: 0: msg_foo: 1003
500 1: TID_1: 0: msg_foo: 1001
501
502 SYNTAX for INPUT_DATA
503 MCE supports many ways to specify input_data. Support for iterators was
504 added in MCE 1.505. The RS option allows one to specify the record
505 separator when processing files.
506
507 MCE is a chunking engine. Therefore, chunk_size is applicable to
508 input_data. Specifying 1 for use_slurpio causes user_func to receive a
509 scalar reference containing the raw data (applicable to files only)
510 instead of an array reference.
511
512 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
513
514 input_data => '/path/to/file', ## process file
515 input_data => \@array, ## process array
516 input_data => \%hash, ## process hash, API since 1.828
517 input_data => \*FILE_HNDL, ## process file handle
518 input_data => $fh, ## open $fh, "<", "file"
519 input_data => $fh, ## IO::File "file", "r"
520 input_data => $fh, ## IO::Uncompress::Gunzip "file.gz"
521 input_data => $io, ## IO::All { File, Pipe, STDIO }
522 input_data => \$scalar, ## treated like a file
523 input_data => \&iterator, ## user specified iterator
524
525 chunk_size => 1, ## >1 means looping inside user_func
526 use_slurpio => 1, ## $chunk_ref is a scalar ref
527 RS => "\n>", ## input record separator
528
529 The chunk_size value determines the chunking mode to use when
530 processing files. Otherwise, chunk_size is the number of elements for
531 arrays. For files, a chunk size value of <= 8192 is how many records to
532 read. Greater than 8192 is how many bytes to read. MCE appends (the
533 rest) up to the next record separator.
534
535 chunk_size => 8192, ## Consists of 8192 records
536 chunk_size => 8193, ## Approximate 8193 bytes for files
537
538 chunk_size => 1, ## Consists of 1 record or element
539 chunk_size => 1000, ## Consists of 1000 records
540 chunk_size => '16k', ## Approximate 16 kibiBytes (KiB)
541 chunk_size => '20m', ## Approximate 20 mebiBytes (MiB)
542
543 The construction for user_func when chunk_size > 1 and assuming
544 use_slurpio equals 0.
545
546 user_func => sub {
547 my ($mce, $chunk_ref, $chunk_id) = @_;
548
549 ## $_ is $chunk_ref->[0] when chunk_size equals 1
550 ## $_ is $chunk_ref otherwise; $_ can be used below
551
552 for my $record ( @{ $chunk_ref } ) {
553 print "$chunk_id: $record\n";
554 }
555 }
556
557 # input_data => \%hash
558 # current API available since 1.828
559
560 user_func => sub {
561 my ($mce, $chunk_ref, $chunk_id) = @_;
562
563 ## $_ points to $chunk_ref regardless of chunk_size
564
565 for my $key ( keys %{ $chunk_ref } ) {
566 print "$key: ", $chunk_ref->{$key}, "\n";
567 }
568 }
569
570 Specifying a value for input_data is straightforward for arrays and
571 files. The next several examples specify an iterator reference for
572 input_data.
573
574 use MCE;
575
576 ## A factory function which creates a closure (the iterator itself)
577 ## for generating a sequence of numbers. The external variables
578 ## ($n, $max, $step) are used for keeping state across successive
579 ## calls to the closure. The iterator simply returns when $n > $max.
580
581 sub input_iterator {
582 my ($n, $max, $step) = @_;
583
584 return sub {
585 return if $n > $max;
586
587 my $current = $n;
588 $n += $step;
589
590 return $current;
591 };
592 }
593
594 ## Run user_func in parallel. Input data can be specified during
595 ## the construction or as an argument to the process method.
596
597 my $mce = MCE->new(
598
599 # input_data => input_iterator(10, 30, 2),
600 chunk_size => 1, max_workers => 4,
601
602 user_func => sub {
603 my ($mce, $chunk_ref, $chunk_id) = @_;
604 MCE->print("$_: ", $_ * 2, "\n");
605 }
606
607 )->spawn;
608
609 $mce->process( input_iterator(10, 30, 2) );
610
611 -- Output (note that output order is not guaranteed;
612 take a look at iterator.pl for ordered output)
613
614 10: 20
615 12: 24
616 16: 32
617 20: 40
618 14: 28
619 22: 44
620 18: 36
621 24: 48
622 26: 52
623 28: 56
624 30: 60
625
626 The following example queries the DB for the next 1000 rows. Notice the
627 use of fetchall_arrayref. The iterator function itself receives one
628 argument which is chunk_size (added in MCE 1.510) to determine how much
629 to return per iteration. The default is 1 for the Core API and MCE
630 Models.
631
632 use DBI;
633 use MCE;
634
635 sub db_iter {
636
637 my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
638
639 my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
640 die "Could not connect to database: $DBI::errstr";
641
642 my $sth = $dbh->prepare('select color, desc from table');
643
644 $sth->execute;
645
646 return sub {
647 my ($chunk_size) = @_;
648
649 if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
650 return @{ $aref };
651 }
652
653 return;
654 };
655 }
656
657 ## Let's enumerate column indexes for easy column retrieval.
658 my ($i_color, $i_desc) = (0 .. 1);
659
660 my $mce = MCE->new(
661 max_workers => 3, chunk_size => 1000,
662 input_data => db_iter(),
663
664 user_func => sub {
665 my ($mce, $chunk_ref, $chunk_id) = @_;
666 my $ret = '';
667
668 foreach my $row (@{ $chunk_ref }) {
669 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
670 }
671
672 MCE->print($ret);
673 }
674 );
675
676 $mce->run;
677
678 There are many modules on CPAN which return an iterator reference.
679 Showing one such example below. The demonstration ensures MCE workers
680 are spawned before obtaining the iterator. Note the worker_id value
681 (left column) in the output.
682
683 use Path::Iterator::Rule;
684 use MCE;
685
686 my $start_dir = shift
687 or die "Please specify a starting directory";
688
689 -d $start_dir
690 or die "Cannot open ($start_dir): No such file or directory";
691
692 my $mce = MCE->new(
693 max_workers => 'auto',
694 user_func => sub { MCE->say( MCE->wid . ": $_" ) }
695 )->spawn;
696
697 my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
698
699 my $iterator = $rule->iter(
700 $start_dir, { follow_symlinks => 0, depthfirst => 1 }
701 );
702
703 $mce->process( $iterator );
704
705 -- Output
706
707 8: lib/MCE/Core/Input/Generator.pm
708 5: lib/MCE/Core/Input/Handle.pm
709 6: lib/MCE/Core/Input/Iterator.pm
710 2: lib/MCE/Core/Input/Request.pm
711 3: lib/MCE/Core/Manager.pm
712 4: lib/MCE/Core/Input/Sequence.pm
713 7: lib/MCE/Core/Validation.pm
714 1: lib/MCE/Core/Worker.pm
715 8: lib/MCE/Flow.pm
716 5: lib/MCE/Grep.pm
717 6: lib/MCE/Loop.pm
718 2: lib/MCE/Map.pm
719 3: lib/MCE/Queue.pm
720 4: lib/MCE/Signal.pm
721 7: lib/MCE/Stream.pm
722 1: lib/MCE/Subs.pm
723 8: lib/MCE/Util.pm
724 5: lib/MCE.pm
725
726 Although MCE supports arrays, extra measures are needed to use a "lazy"
727 array as input data. The reason for this is that MCE needs the size of
728 the array before processing which may be unknown for lazy arrays.
729 Therefore, closures provide an excellent mechanism for this.
730
731 The code block belonging to the lazy array must return undef after
732 exhausting its input data. Otherwise, the process will never end.
733
734 use Tie::Array::Lazy;
735 use MCE;
736
737 tie my @a, 'Tie::Array::Lazy', [], sub {
738 my $i = $_[0]->index;
739
740 return ($i < 10) ? $i : undef;
741 };
742
743 sub make_iterator {
744 my $i = 0; my $a_ref = shift;
745
746 return sub {
747 return $a_ref->[$i++];
748 };
749 }
750
751 my $mce = MCE->new(
752 max_workers => 4, input_data => make_iterator(\@a),
753
754 user_func => sub {
755 my ($mce, $chunk_ref, $chunk_id) = @_;
756 MCE->say($_);
757 }
758
759 )->run;
760
761 -- Output
762
763 0
764 1
765 2
766 3
767 4
768 6
769 7
770 8
771 5
772 9
773
774 The following demonstrates how to retrieve a chunk from the lazy array
775 per each successive call. Here, undef is sent by the iterator block
776 when $i is greater than $max. Iterators may optionally use chunk_size
777 to determine how much to return per iteration.
778
779 use Tie::Array::Lazy;
780 use MCE;
781
782 tie my @a, 'Tie::Array::Lazy', [], sub {
783 $_[0]->index;
784 };
785
786 sub make_iterator {
787 my $j = 0; my ($a_ref, $max) = @_;
788
789 return sub {
790 my ($chunk_size) = @_;
791 my $i = $j; $j += $chunk_size;
792
793 return if $i > $max;
794 return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
795 };
796 }
797
798 my $mce = MCE->new(
799 chunk_size => 15, max_workers => 4,
800 input_data => make_iterator(\@a, 100),
801
802 user_func => sub {
803 my ($mce, $chunk_ref, $chunk_id) = @_;
804 MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
805 }
806
807 )->run;
808
809 -- Output
810
811 1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
812 2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
813 3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
814 4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
815 5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
816 6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
817 7: 90 91 92 93 94 95 96 97 98 99 100
818
819 SYNTAX for SEQUENCE
820 The 1.3 release and above allows workers to loop through a sequence of
821 numbers computed mathematically without the overhead of an array. The
822 sequence can be specified separately per each user_task entry unlike
823 input_data which is applicable to the first task only.
824
825 See the seq_demo.pl example, included with this distribution, on
826 applying sequences with the user_tasks option.
827
828 Sequence can be defined using an array or a hash reference.
829
830 use MCE;
831
832 my $mce = MCE->new(
833 max_workers => 3,
834
835 # sequence => [ 10, 19, 0.7, "%4.1f" ], ## up to 4 options
836
837 sequence => {
838 begin => 10, end => 19, step => 0.7, format => "%4.1f"
839 },
840
841 user_func => sub {
842 my ($mce, $n, $chunk_id) = @_;
843 print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
844 }
845 );
846
847 $mce->run;
848
849 -- Output (sorted afterwards, notice wid and chunk_id in output)
850
851 10.0 from 1 id 1
852 10.7 from 2 id 2
853 11.4 from 3 id 3
854 12.1 from 1 id 4
855 12.8 from 2 id 5
856 13.5 from 3 id 6
857 14.2 from 1 id 7
858 14.9 from 2 id 8
859 15.6 from 3 id 9
860 16.3 from 1 id 10
861 17.0 from 2 id 11
862 17.7 from 3 id 12
863 18.4 from 1 id 13
864
865 The 1.5 release includes a new option (bounds_only). This option tells
866 the sequence engine to compute 'begin' and 'end' items only, for the
867 chunk, and not the items in between (hence boundaries only). This
868 option applies to sequence only and has no effect when chunk_size
869 equals 1.
870
871 The time to run is 0.006s below. This becomes 0.827s without the
872 bounds_only option due to computing all items in between, thus creating
873 a very large array. Basically, specify bounds_only => 1 when boundaries
874 are all you need for looping inside the block; e.g. Monte Carlo
875 simulations.
876
877 Time was measured using 1 worker to emphasize the difference.
878
879 use MCE;
880
881 my $mce = MCE->new(
882 max_workers => 1, chunk_size => 1_250_000,
883
884 sequence => { begin => 1, end => 10_000_000 },
885 bounds_only => 1,
886
887 ## For sequence, the input scalar $_ points to $chunk_ref
888 ## when chunk_size > 1, otherwise $chunk_ref->[0].
889 ##
890 ## user_func => sub {
891 ## my $begin = $_->[0]; my $end = $_->[-1];
892 ##
893 ## for ($begin .. $end) {
894 ## ...
895 ## }
896 ## },
897
898 user_func => sub {
899 my ($mce, $chunk_ref, $chunk_id) = @_;
900 ## $chunk_ref contains 2 items, not 1_250_000
901
902 my $begin = $chunk_ref->[ 0];
903 my $end = $chunk_ref->[-1]; ## or $chunk_ref->[1]
904
905 MCE->printf("%7d .. %8d\n", $begin, $end);
906 }
907 );
908
909 $mce->run;
910
911 -- Output
912
913 1 .. 1250000
914 1250001 .. 2500000
915 2500001 .. 3750000
916 3750001 .. 5000000
917 5000001 .. 6250000
918 6250001 .. 7500000
919 7500001 .. 8750000
920 8750001 .. 10000000
921
922 SYNTAX for MAX_RETRIES
923 The max_retries option, added in 1.7, allows MCE to retry a failed
924 chunk from a worker dying while processing input data or a sequence of
925 numbers.
926
927 When max_retries is set, MCE configures the on_post_exit option
928 automatically using the following code before running. Specify
929 on_post_exit explicitly for any further tailoring. The restart_worker
930 line is necessary, obviously.
931
932 on_post_exit => sub {
933 my ( $mce, $e, $retry_cnt ) = @_;
934
935 if ( $e->{id} ) {
936 my $cnt = $retry_cnt + 1;
937 my $msg = "Error: chunk $e->{id} failed";
938
939 if ( defined $mce->{init_relay} ) {
940 print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
941 if ( $retry_cnt < $mce->{max_retries} );
942 }
943 else {
944 ( $retry_cnt < $mce->{max_retries} )
945 ? print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
946 : print {*STDERR} "$msg\n";
947 }
948
949 $mce->restart_worker;
950 }
951 }
952
953 We let MCE handle on_post_exit automatically below, which is
954 essentially the same code shown above. For max_retries to work, the
955 worker must die (abnormally included) or call MCE->exit. Notice that we
956 pass the chunk_id value for the 3rd argument to MCE->exit (defaults to
957 chunk_id if omitted since MCE 1.844).
958
959 ## max_retries demonstration
960
961 use strict;
962 use warnings;
963
964 use MCE;
965
966 sub user_func {
967 my ( $mce, $chunk_ref, $chunk_id ) = @_;
968
969 # die "Died : chunk_id = 3\n" if $chunk_id == 3;
970 MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
971
972 print "$chunk_id\n";
973 }
974
975 my $mce = MCE->new(
976 max_workers => 1,
977 max_retries => 2,
978 user_func => \&user_func,
979 )->spawn;
980
981 my $input_data = [ 0..7 ];
982
983 $mce->process( { chunk_size => 1 }, $input_data );
984 $mce->shutdown;
985
986 -- Output
987
988 1
989 2
990 Error: chunk 3 failed, retrying chunk attempt # 1
991 Error: chunk 3 failed, retrying chunk attempt # 2
992 Error: chunk 3 failed
993 4
994 5
995 6
996 7
997 8
998
999 Orderly output with max_retries is possible since MCE 1.844. Below,
1000 chunk 3 succeeds whereas chunk 5 fails due to exceeding the number of
1001 retries. Be sure to call MCE::relay inside "user_func" and near the end
1002 of the block.
1003
1004 ## max_retries demonstration with init_relay
1005
1006 use strict;
1007 use warnings;
1008
1009 use MCE;
1010 use MCE::Shared;
1011
1012 tie my $retries1, 'MCE::Shared', 0;
1013 tie my $retries2, 'MCE::Shared', 0;
1014
1015 MCE->new(
1016 max_workers => 4,
1017 input_data => [ 1..7 ],
1018 chunk_size => 1,
1019
1020 max_retries => 2,
1021 init_relay => 0,
1022
1023 user_func => sub {
1024 if ( MCE->chunk_id == 3 ) {
1025 MCE->exit if ++$retries1 <= 2;
1026 }
1027 if ( MCE->chunk_id == 5 ) {
1028 MCE->exit if ++$retries2 <= 3;
1029 }
1030 MCE::relay {
1031 $_ += 1;
1032 print MCE->chunk_id, "\n";
1033 };
1034 }
1035 )->run;
1036
1037 print "final: ", MCE::relay_final(), "\n";
1038
1039 -- Output
1040
1041 1
1042 2
1043 Error: chunk 3 failed, retrying chunk attempt # 1
1044 Error: chunk 5 failed, retrying chunk attempt # 1
1045 Error: chunk 3 failed, retrying chunk attempt # 2
1046 Error: chunk 5 failed, retrying chunk attempt # 2
1047 3
1048 4
1049 Error: chunk 5 failed
1050 6
1051 7
1052 final: 6
1053
1054 SYNTAX for USER_BEGIN and USER_END
1055 The user_begin and user_end options, if specified, behave similarly to
1056 awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
1057 called once per worker during each run.
1058
1059 MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
1060
1061 sub user_begin { ## Called once at the beginning
1062 my ($mce, $task_id, $task_name) = @_;
1063 $mce->{wk_total_rows} = 0;
1064 }
1065
1066 sub user_func { ## Called while processing
1067 my $mce = shift;
1068 $mce->{wk_total_rows} += 1;
1069 }
1070
1071 sub user_end { ## Called once at the end
1072 my ($mce, $task_id, $task_name) = @_;
1073 printf "## %d: Processed %d rows\n",
1074 MCE->wid, $mce->{wk_total_rows};
1075 }
1076
1077 my $mce = MCE->new(
1078 user_begin => \&user_begin,
1079 user_func => \&user_func,
1080 user_end => \&user_end
1081 );
1082
1083 $mce->run;
1084
1085 SYNTAX for USER_FUNC with USE_SLURPIO => 0
1086 When processing input data, MCE can pass an array of rows or a slurped
1087 chunk. Below, a reference to an array containing the chunk data is
1088 processed.
1089
1090 e.g. $chunk_ref = [ record1, record2, record3, ... ]
1091
1092 sub user_func {
1093
1094 my ($mce, $chunk_ref, $chunk_id) = @_;
1095
1096 foreach my $row ( @{ $chunk_ref } ) {
1097 $mce->{wk_total_rows} += 1;
1098 print $row;
1099 }
1100 }
1101
1102 my $mce = MCE->new(
1103 chunk_size => 100,
1104 input_data => "/path/to/file",
1105 user_func => \&user_func,
1106 use_slurpio => 0
1107 );
1108
1109 $mce->run;
1110
1111 SYNTAX for USER_FUNC with USE_SLURPIO => 1
1112 Here, a reference to a scalar containing the raw chunk data is
1113 processed.
1114
1115 sub user_func {
1116
1117 my ($mce, $chunk_ref, $chunk_id) = @_;
1118
1119 my $count = () = $$chunk_ref =~ /abc/;
1120 }
1121
1122 my $mce = MCE->new(
1123 chunk_size => 16000,
1124 input_data => "/path/to/file",
1125 user_func => \&user_func,
1126 use_slurpio => 1
1127 );
1128
1129 $mce->run;
1130
1131 SYNTAX for USER_ERROR and USER_OUTPUT
1132 Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1133 and MCE->say can be intercepted by specifying the user_error and
1134 user_output options. MCE on receiving output will forward to user_error
1135 or user_output in a serialized fashion.
1136
1137 Handy when wanting to filter, modify, and/or direct the output
1138 elsewhere.
1139
1140 sub user_error { ## Redirect STDERR to STDOUT
1141 my $error = shift;
1142 print {*STDOUT} $error;
1143 }
1144
1145 sub user_output { ## Redirect STDOUT to STDERR
1146 my $output = shift;
1147 print {*STDERR} $output;
1148 }
1149
1150 sub user_func {
1151 my ($mce, $chunk_ref, $chunk_id) = @_;
1152 my $count = 0;
1153
1154 foreach my $row ( @{ $chunk_ref } ) {
1155 MCE->print($row);
1156 $count += 1;
1157 }
1158
1159 MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1160 }
1161
1162 my $mce = MCE->new(
1163 chunk_size => 1000,
1164 input_data => "/path/to/file",
1165 user_error => \&user_error,
1166 user_output => \&user_output,
1167 user_func => \&user_func
1168 );
1169
1170 $mce->run;
1171
1172 SYNTAX for USER_TASKS and TASK_END
1173 This option takes an array of tasks. Each task allows up to 17 options.
1174 The init_relay, input_data, RS, and use_slurpio options may be defined
1175 inside the first task or at the top level, otherwise ignored under
1176 other sub-tasks.
1177
1178 max_workers, chunk_size, input_data, interval, sequence,
1179 bounds_only, user_args, user_begin, user_end, user_func,
1180 gather, task_end, task_name, use_slurpio, use_threads,
1181 init_relay, RS
1182
1183 Sequence and chunk_size were added in 1.3. User_args was introduced in
1184 1.4. Task_name and input_data are new options allowed in 1.5. In addition,
1185 one can specify task_end at the top level. Task_end also receives 2
1186 additional arguments $task_id and $task_name (shown below).
1187
1188 Options not specified here will default to the same option specified at
1189 the top level. The task_end option is called by the manager process
1190 when all workers for that sub-task have completed processing.
1191
1192 Forking and threading can be intermixed among tasks unless running
1193 Cygwin. The run method will continue running until all workers have
1194 completed processing.
1195
1196 use threads;
1197 use threads::shared;
1198
1199 use MCE;
1200
1201 sub parallel_task1 { sleep 2; }
1202 sub parallel_task2 { sleep 1; }
1203
1204 my $mce = MCE->new(
1205
1206 task_end => sub {
1207 my ($mce, $task_id, $task_name) = @_;
1208 print "Task [$task_id -- $task_name] completed processing\n";
1209 },
1210
1211 user_tasks => [{
1212 task_name => 'foo',
1213 max_workers => 2,
1214 user_func => \&parallel_task1,
1215 use_threads => 0 ## Not using threads
1216
1217 },{
1218 task_name => 'bar',
1219 max_workers => 4,
1220 user_func => \&parallel_task2,
1221 use_threads => 1 ## Yes, threads
1222
1223 }]
1224 );
1225
1226 $mce->run;
1227
1228 -- Output
1229
1230 Task [1 -- bar] completed processing
1231 Task [0 -- foo] completed processing
1232
1234 Beginning with MCE 1.5, the input scalar $_ is localized prior to
1235 calling user_func for input_data and sequence of numbers. The following
1236 applies.
1237
1238 use_slurpio => 1
1239 $_ is a reference to the buffer e.g. $_ = \$_buffer;
1240 $_ is a reference regardless of whether chunk_size is 1 or greater
1241
1242 user_func => sub {
1243 # my ($mce, $chunk_ref, $chunk_id) = @_;
1244 print ${ $_ }; ## $_ is same as $chunk_ref
1245 }
1246
1247 chunk_size is greater than 1, use_slurpio => 0
1248 $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1249 $_ is same as $chunk_ref or $_[CHUNK]
1250
1251 user_func => sub {
1252 # my ($mce, $chunk_ref, $chunk_id) = @_;
1253 for my $row ( @{ $_ } ) {
1254 print $row, "\n";
1255 }
1256 }
1257
1258 use MCE const => 1;
1259
1260 user_func => sub {
1261 # my ($mce, $chunk_ref, $chunk_id) = @_;
1262 for my $row ( @{ $_[CHUNK] } ) {
1263 print $row, "\n";
1264 }
1265 }
1266
1267 chunk_size equals 1, use_slurpio => 0
1268 $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1269
1270 ## Note that $_ and $chunk_ref are not the same below.
1271 ## $chunk_ref is a reference to an array.
1272
1273 user_func => sub {
1274 # my ($mce, $chunk_ref, $chunk_id) = @_;
1275 print $_, "\n"; ## Same as $chunk_ref->[0];
1276 }
1277
1278 $mce->foreach("/path/to/file", sub {
1279 # my ($mce, $chunk_ref, $chunk_id) = @_;
1280 print $_; ## Same as $chunk_ref->[0];
1281 });
1282
1283 ## However, that is not the case for the forseq method.
1284 ## Both $_ and $n_seq are the same when chunk_size => 1.
1285
1286 $mce->forseq([ 1, 9 ], sub {
1287 # my ($mce, $n_seq, $chunk_id) = @_;
1288 print $_, "\n"; ## Same as $n_seq
1289 });
1290
1291 Sequence can be specified using a hash reference, as shown below, or
1292 an array reference, as shown in the example afterwards.
1293
1294 $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1295
1296 The code block receives an array containing the next 5 sequences.
1297 Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1298 to an array, same as $_, due to chunk_size being greater than 1.
1299
1300 $mce->forseq( [ 10, 40000, 2 ], { chunk_size => 5 }, sub {
1301 # my ($mce, $n_seq, $chunk_id) = @_;
1302 my @result;
1303 for my $n ( @{ $_ } ) {
1304 ... do work, append to result for 5
1305 }
1306 ... do something with result afterwards
1307 });
1308
1310 The methods listed below are callable by the main process and workers.
1311
1312 MCE->abort ( void )
1313 $mce->abort ( void )
1314 The 'abort' method is applicable when processing input_data only. This
1315 causes all workers to abort after processing the current chunk.
1316
1317 Workers write the next offset position to the queue socket for the next
1318 available worker. In essence, the 'abort' method writes the last offset
1319 position. Workers, on requesting the next offset position, will think
1320 the end of input_data has been reached and leave the chunking loop.
1321
1322 MCE->abort;
1323 $mce->abort;
1324
1325 MCE->chunk_id ( void )
1326 $mce->chunk_id ( void )
1327 Returns the chunk_id for the current chunk. The value starts at 1.
1328 Chunking applies to input_data or sequence. The value is 0 for the
1329 manager process.
1330
1331 my $chunk_id = MCE->chunk_id;
1332 my $chunk_id = $mce->chunk_id;
1333
1334 MCE->chunk_size ( void )
1335 $mce->chunk_size ( void )
1336 Getter method for chunk_size used by MCE.
1337
1338 my $chunk_size = MCE->chunk_size;
1339 my $chunk_size = $mce->chunk_size;
1340
1341 MCE->do ( 'callback_func' [, $arg1, ... ] )
1342 $mce->do ( 'callback_func' [, $arg1, ... ] )
1343 MCE serializes data transfers from a worker process via helper
1344 functions do & sendto to the manager process. The callback function can
1345 optionally return a reply. Support for calling by the manager process
1346 was enabled in MCE 1.839.
1347
1348 [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1349
1350 Passing args to a callback function using references & scalar.
1351
1352 sub callback {
1353 my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1354 ...
1355 }
1356
1357 MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1358 MCE->do('callback', \@a, \%h, \$s, 'foo');
1359
1360 MCE knows if wanting a void, list, hash, or a scalar return value.
1361
1362 MCE->do('callback' [, $arg1, ... ]);
1363
1364 my @array = MCE->do('callback' [, $arg1, ... ]);
1365 my %hash = MCE->do('callback' [, $arg1, ... ]);
1366 my $scalar = MCE->do('callback' [, $arg1, ... ]);
1367
1368 MCE->freeze ( $object_ref )
1369 $mce->freeze ( $object_ref )
1370 Calls the internal freeze method to serialize an object. The default
1371 serialization routines are handled by Sereal if available or Storable.
1372
1373 my $frozen = MCE->freeze([ 0, 2, 4 ]);
1374 my $frozen = $mce->freeze([ 0, 2, 4 ]);
1375
1376 MCE->max_retries ( void )
1377 $mce->max_retries ( void )
1378 Getter method for max_retries used by MCE.
1379
1380 my $max_retries = MCE->max_retries;
1381 my $max_retries = $mce->max_retries;
1382
1383 MCE->max_workers ( void )
1384 $mce->max_workers ( void )
1385 Getter method for max_workers used by MCE.
1386
1387 my $max_workers = MCE->max_workers;
1388 my $max_workers = $mce->max_workers;
1389
1390 MCE->pid ( void )
1391 $mce->pid ( void )
1392 Returns the Process ID. Threads have thread ID attached to the value.
1393
1394 my $pid = MCE->pid; ## 16180 (pid) ; 16180.2 (pid.tid)
1395 my $pid = $mce->pid;
1396
1397 MCE->printf ( $format, $list [, ... ] )
1398 MCE->print ( $list [, ... ] )
1399 MCE->say ( $list [, ... ] )
1400 $mce->printf ( $format, $list [, ... ] )
1401 $mce->print ( $list [, ... ] )
1402 $mce->say ( $list [, ... ] )
1403 Use the printf, print, and say methods when wanting to serialize output
1404 among workers and the manager process. These are sugar syntax for the
1405 sendto method. These behave similar to the native subroutines in Perl
1406 with the exception that a bareword handle must be passed as a reference
1407 and a comma is required after the handle, including file handles.
1408
1409 Say is like print, but implicitly appends a newline.
1410
1411 MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1412 MCE->printf($fh, "%s: %d\n", $name, $age);
1413 MCE->printf("%s: %d\n", $name, $age);
1414
1415 MCE->print(\*STDERR, "$error_msg\n");
1416 MCE->print($fh, $log_msg."\n");
1417 MCE->print("$output_msg\n");
1418
1419 MCE->say(\*STDERR, $error_msg);
1420 MCE->say($fh, $log_msg);
1421 MCE->say($output_msg);
1422
1423 Caveat: Use the following syntax when the first argument is a reference
1424 that is not a glob or file handle. Otherwise, MCE will error indicating
1425 the first argument is not a glob reference.
1426
1427 MCE->print(\*STDOUT, \@array, "\n");
1428 MCE->print("", \@array, "\n"); ## ok
1429
1430 Sending to "IO::All" { File, Pipe, STDIO } is supported since MCE
1431 1.845.
1432
1433 use IO::All;
1434
1435 my $out = io->stdout;
1436 my $err = io->stderr;
1437
1438 MCE->printf($out, "%s\n", "sent to stdout");
1439 MCE->printf($err, "%s\n", "sent to stderr");
1440
1441 MCE->print($out, "sent to stdout\n");
1442 MCE->print($err, "sent to stderr\n");
1443
1444 MCE->say($out, "sent to stdout");
1445 MCE->say($err, "sent to stderr");
1446
1447 MCE->sess_dir ( void )
1448 $mce->sess_dir ( void )
1449 Returns the session directory used by the MCE instance. This is defined
1450 during spawning and removed during shutdown.
1451
1452 my $sess_dir = MCE->sess_dir;
1453 my $sess_dir = $mce->sess_dir;
1454
1455 MCE->task_id ( void )
1456 $mce->task_id ( void )
1457 Returns the task ID. This applies to the user_tasks option (starts at
1458 0).
1459
1460 my $task_id = MCE->task_id;
1461 my $task_id = $mce->task_id;
1462
1463 MCE->task_name ( void )
1464 $mce->task_name ( void )
1465 Returns the task_name value specified via the task_name option when
1466 configuring MCE.
1467
1468 my $task_name = MCE->task_name;
1469 my $task_name = $mce->task_name;
1470
1471 MCE->task_wid ( void )
1472 $mce->task_wid ( void )
1473 Returns the task worker ID (applies to user_tasks). The value starts at
1474 1 per each task configured within user_tasks. The value is 0 for the
1475 manager process.
1476
1477 my $task_wid = MCE->task_wid;
1478 my $task_wid = $mce->task_wid;
1479
1480 MCE->thaw ( $frozen )
1481 $mce->thaw ( $frozen )
1482 Calls the internal thaw method to un-serialize the frozen object.
1483
1484 my $object_ref = MCE->thaw($frozen);
1485 my $object_ref = $mce->thaw($frozen);
1486
1487 MCE->tmp_dir ( void )
1488 $mce->tmp_dir ( void )
1489 Returns the temporary directory used by MCE.
1490
1491 my $tmp_dir = MCE->tmp_dir;
1492 my $tmp_dir = $mce->tmp_dir;
1493
1494 MCE->user_args ( void )
1495 $mce->user_args ( void )
1496 Returns the arguments specified via the user_args option.
1497
1498 my ($arg1, $arg2, $arg3) = MCE->user_args;
1499 my ($arg1, $arg2, $arg3) = $mce->user_args;
1500
1501 MCE->wid ( void )
1502 $mce->wid ( void )
1503 Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1504 is 0 for the manager process.
1505
1506 my $wid = MCE->wid;
1507 my $wid = $mce->wid;
1508
1510 Methods listed below are callable by the main process only.
1511
1512 MCE->forchunk ( $input_data [, { options } ], sub { ... } )
1513 MCE->foreach ( $input_data [, { options } ], sub { ... } )
1514 MCE->forseq ( $sequence_spec [, { options } ], sub { ... } )
1515 $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1516 $mce->foreach ( $input_data [, { options } ], sub { ... } )
1517 $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1518 Forchunk, foreach, and forseq are sugar methods and described in
1519 MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1520
1521 MCE->process ( $input_data [, { options } ] )
1522 $mce->process ( $input_data [, { options } ] )
1523 The process method will spawn workers automatically if not already
1524 spawned. It will set input_data => $input_data. It calls run(0) to not
1525 auto-shutdown workers. Specifying options is optional.
1526
1527 Allowable options { key => value, ... } are:
1528
1529 chunk_size input_data job_delay spawn_delay submit_delay
1530 flush_file flush_stderr flush_stdout stderr_file stdout_file
1531 on_post_exit on_post_run sequence user_args user_begin user_end
1532 user_func user_error user_output use_slurpio RS
1533
1534 Options remain persistent going forward unless changed. Setting
1535 user_begin, user_end, or user_func will cause already spawned workers
1536 to shut down and re-spawn automatically. Therefore, define these during
1537 instantiation.
1538
1539 The below will cause workers to re-spawn after running.
1540
1541 my $mce = MCE->new( max_workers => 'auto' );
1542
1543 $mce->process( {
1544 user_begin => sub { ## connect to DB },
1545 user_func => sub { ## process each row },
1546 user_end => sub { ## close handle to DB },
1547 }, \@input_data );
1548
1549 $mce->process( {
1550 user_begin => sub { ## connect to DB },
1551 user_func => sub { ## process each file },
1552 user_end => sub { ## close handle to DB },
1553 }, "/list/of/files" );
1554
1555 Do the following if wanting workers to persist between jobs.
1556
1557 use MCE max_workers => 'auto';
1558
1559 my $mce = MCE->new(
1560 user_begin => sub { ## connect to DB },
1561 user_func => sub { ## process each chunk or row or host },
1562 user_end => sub { ## close handle to DB },
1563 );
1564
1565 $mce->spawn; ## Spawn early if desired
1566
1567 $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1568 $mce->process(\@array_of_files_to_grep);
1569 $mce->process("/path/to/host/list");
1570
1571 $mce->process($array_ref);
1572 $mce->process($array_ref, { stdout_file => $output_file });
1573
1574 ## This was not allowed before. Fixed in 1.415.
1575 $mce->process({ sequence => { begin => 10, end => 90, step => 2 } });
1576 $mce->process({ sequence => [ 10, 90, 2 ] });
1577
1578 $mce->shutdown;
1579
1580 MCE->relay_final ( void )
1581 $mce->relay_final ( void )
1582 The relay methods are described in MCE::Relay. Relay capabilities are
1583 enabled by specifying the "init_relay" MCE option.
1584
1585 MCE->restart_worker ( void )
1586 $mce->restart_worker ( void )
1587 One can restart a worker who has died or exited. The job never ends
1588 below due to restarting each time. Recommended is to call MCE->exit or
1589 $mce->exit instead of the native exit function for better handling,
1590 especially under the Windows environment.
1591
1592 The $e->{wid} argument is no longer necessary starting with the 1.5
1593 release.
1594
1595 Press [ctrl-c] to terminate the script.
1596
1597 my $mce = MCE->new(
1598
1599 on_post_exit => sub {
1600 my ($mce, $e) = @_;
1601 print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1602 # $mce->restart_worker($e->{wid}); ## MCE-1.415 and below
1603 $mce->restart_worker; ## MCE-1.500 and above
1604 },
1605
1606 user_begin => sub {
1607 my ($mce, $task_id, $task_name) = @_;
1608 ## Not interested in die messages going to STDERR,
1609 ## because the die handler calls MCE->exit(255, $_[0]).
1610 close STDERR;
1611 },
1612
1613 user_tasks => [{
1614 max_workers => 5,
1615 user_func => sub {
1616 my ($mce) = @_; sleep MCE->wid;
1617 MCE->exit(3, "exited from " . MCE->wid . "\n");
1618 }
1619 },{
1620 max_workers => 4,
1621 user_func => sub {
1622 my ($mce) = @_; sleep MCE->wid;
1623 die("died from " . MCE->wid . "\n");
1624 }
1625 }]
1626 );
1627
1628 $mce->run;
1629
1630 -- Output
1631
1632 1: PID_85388: status 3: exited from 1
1633 2: PID_85389: status 3: exited from 2
1634 1: PID_85397: status 3: exited from 1
1635 3: PID_85390: status 3: exited from 3
1636 1: PID_85399: status 3: exited from 1
1637 4: PID_85391: status 3: exited from 4
1638 2: PID_85398: status 3: exited from 2
1639 1: PID_85401: status 3: exited from 1
1640 5: PID_85392: status 3: exited from 5
1641 1: PID_85404: status 3: exited from 1
1642 6: PID_85393: status 255: died from 6
1643 3: PID_85400: status 3: exited from 3
1644 2: PID_85403: status 3: exited from 2
1645 1: PID_85406: status 3: exited from 1
1646 7: PID_85394: status 255: died from 7
1647 1: PID_85410: status 3: exited from 1
1648 8: PID_85395: status 255: died from 8
1649 4: PID_85402: status 3: exited from 4
1650 2: PID_85409: status 3: exited from 2
1651 1: PID_85412: status 3: exited from 1
1652 9: PID_85396: status 255: died from 9
1653 3: PID_85408: status 3: exited from 3
1654 1: PID_85416: status 3: exited from 1
1655
1656 ...
1657
1658 MCE->run ( [ $auto_shutdown [, { options } ] ] )
1659 $mce->run ( [ $auto_shutdown [, { options } ] ] )
1660 The run method, by default, spawns workers, processes once, and shuts
1661 down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1662 persist after running (default 1).
1663
1664 Specifying options is optional. Valid options are the same as for the
1665 process method.
1666
1667 my $mce = MCE->new( ... );
1668
1669 ## Disables auto-shutdown
1670 $mce->run(0);
1671
1672 MCE->send ( $data_ref )
1673 $mce->send ( $data_ref )
1674 The 'send' method is useful when wanting to spawn workers early to
1675 minimize memory consumption and afterwards send data individually to
1676 each worker. One cannot send more than the total workers spawned.
1677 Workers store the received data as $mce->{user_data}.
1678
1679 The data which can be sent is restricted to an ARRAY, HASH, or PDL
1680 reference. Workers begin processing immediately after receiving data.
1681 Workers set $mce->{user_data} to undef after processing. One cannot
1682 specify input_data, sequence, or user_tasks when using the "send"
1683 method.
1684
1685 Passing any options e.g. run(0, { options }) is ignored due to workers
1686 running immediately after receiving user data. There is no guarantee to
1687 which worker will receive data first. It depends on which worker is
1688 available awaiting data.
1689
1690 use MCE;
1691
1692 my $mce = MCE->new(
1693 max_workers => 5,
1694
1695 user_func => sub {
1696 my ($mce) = @_;
1697 my $data = $mce->{user_data};
1698 my $first_name = $data->{first_name};
1699 print MCE->wid, ": Hello from $first_name\n";
1700 }
1701 );
1702
1703 $mce->spawn; ## Optional, send will spawn if necessary.
1704
1705 $mce->send( { first_name => "Theresa" } );
1706 $mce->send( { first_name => "Francis" } );
1707 $mce->send( { first_name => "Padre" } );
1708 $mce->send( { first_name => "Anthony" } );
1709
1710 $mce->run; ## Wait for workers to complete processing.
1711
1712 -- Output
1713
1714 2: Hello from Theresa
1715 5: Hello from Anthony
1716 3: Hello from Francis
1717 4: Hello from Padre
1718
1719 MCE->shutdown ( void )
1720 $mce->shutdown ( void )
1721 The run method will automatically spawn workers, run once, and shutdown
1722 workers automatically. Workers persist after running below. Shutdown
1723 may be called as needed or prior to exiting.
1724
1725 my $mce = MCE->new( ... );
1726
1727 $mce->spawn;
1728
1729 $mce->process(\@input_data_1); ## Processing multiple arrays
1730 $mce->process(\@input_data_2);
1731 $mce->process(\@input_data_n);
1732
1733 $mce->shutdown;
1734
1735 $mce->process('input_file_1'); ## Processing multiple files
1736 $mce->process('input_file_2');
1737 $mce->process('input_file_n');
1738
1739 $mce->shutdown;
1740
1741 MCE->spawn ( void )
1742 $mce->spawn ( void )
1743 Workers are normally spawned automatically. The spawn method allows one
1744 to spawn workers early if so desired.
1745
1746 my $mce = MCE->new( ... );
1747
1748 $mce->spawn;
1749
1750 MCE->status ( void )
1751 $mce->status ( void )
1752 The greatest exit status is saved among workers while running. Look at
1753 the on_post_exit or on_post_run options for callback support.
1754
1755 my $mce = MCE->new( ... );
1756
1757 $mce->run;
1758
1759 my $exit_status = $mce->status;
1760
1762 Methods listed below are callable by workers only.
1763
1764 MCE->exit ( [ $status [, $message [, $id ] ] ] )
1765 $mce->exit ( [ $status [, $message [, $id ] ] ] )
1766 A worker exits from MCE entirely. $id (optional) can be used for
1767 passing the primary key or a string along with the message. Look at the
1768 on_post_exit or on_post_run options for callback support.
1769
1770 MCE->exit; ## default 0
1771 MCE->exit(1);
1772 MCE->exit(2, 'chunk failed', $chunk_id);
1773 MCE->exit(0, 'msg_foo', 'id_1000');
1774
1775 MCE->gather ( $arg1 [, $arg2, ... ] )
1776 $mce->gather ( $arg1, [, $arg2, ... ] )
1777 A worker can submit data to the location specified via the gather
1778 option by calling this method. See MCE::Flow and MCE::Loop for
1779 additional use-case.
1780
1781 use MCE;
1782
1783 my @hosts = qw(
1784 hosta hostb hostc hostd hoste
1785 );
1786
1787 my $mce = MCE->new(
1788 chunk_size => 1, max_workers => 3,
1789
1790 user_func => sub {
1791 # my ($mce, $chunk_ref, $chunk_id) = @_;
1792 my ($output, $error, $status); my $host = $_;
1793
1794 ## Do something with $host;
1795 $output = "Worker ". MCE->wid .": Hello from $host";
1796
1797 if (MCE->chunk_id % 3 == 0) {
1798 ## Simulating an error condition
1799 local $? = 1; $status = $?;
1800 $error = "Error from $host"
1801 }
1802 else {
1803 $status = 0;
1804 }
1805
1806 ## Ensure unique keys (key, value) when gathering to a
1807 ## hash.
1808 MCE->gather("$host.out", $output, "$host.sta", $status);
1809 MCE->gather("$host.err", $error) if (defined $error);
1810 }
1811 );
1812
1813 my %h; $mce->process(\@hosts, { gather => \%h });
1814
1815 foreach my $host (@hosts) {
1816 print $h{"$host.out"}, "\n";
1817 print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1818 print "Exit status: ", $h{"$host.sta"}, "\n\n";
1819 }
1820
1821 -- Output
1822
1823 Worker 2: Hello from hosta
1824 Exit status: 0
1825
1826 Worker 1: Hello from hostb
1827 Exit status: 0
1828
1829 Worker 3: Hello from hostc
1830 Error from hostc
1831 Exit status: 1
1832
1833 Worker 2: Hello from hostd
1834 Exit status: 0
1835
1836 Worker 1: Hello from hoste
1837 Exit status: 0
1838
1839 MCE->last ( void )
1840 $mce->last ( void )
1841 Worker leaves the chunking loop or user_func block immediately.
1842 Callable from inside foreach, forchunk, forseq, and user_func.
1843
1844 use MCE;
1845
1846 my $mce = MCE->new(
1847 max_workers => 5
1848 );
1849
1850 my @list = (1 .. 80);
1851
1852 $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1853
1854 my ($mce, $chunk_ref, $chunk_id) = @_;
1855 MCE->last if ($chunk_id > 4);
1856
1857 my @output = ();
1858
1859 foreach my $rec ( @{ $chunk_ref } ) {
1860 push @output, $rec, "\n";
1861 }
1862
1863 MCE->print(@output);
1864 });
1865
1866 -- Output (each chunk above consists of 2 elements)
1867
1868 3
1869 4
1870 1
1871 2
1872 7
1873 8
1874 5
1875 6
1876
1877 MCE->next ( void )
1878 $mce->next ( void )
1879 Worker starts the next iteration of the chunking loop. Callable from
1880 inside foreach, forchunk, forseq, and user_func.
1881
1882 use MCE;
1883
1884 my $mce = MCE->new(
1885 max_workers => 5
1886 );
1887
1888 my @list = (1 .. 80);
1889
1890 $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1891
1892 my ($mce, $chunk_ref, $chunk_id) = @_;
1893 MCE->next if ($chunk_id < 20);
1894
1895 my @output = ();
1896
1897 foreach my $rec ( @{ $chunk_ref } ) {
1898 push @output, $rec, "\n";
1899 }
1900
1901 MCE->print(@output);
1902 });
1903
1904 -- Output (each chunk above consists of 4 elements)
1905
1906 77
1907 78
1908 79
1909 80
1910
1911 MCE::relay { code }
1912 MCE->relay ( sub { code } )
1913 MCE->relay_recv ( void )
1914 $mce->relay ( sub { code } )
1915 $mce->relay_recv ( void )
1916 The relay methods are described in MCE::Relay. Relay capabilities are
1917 enabled by specifying the "init_relay" MCE option.
1918
1919 MCE->sendto ( $to, $arg1, ... )
1920 $mce->sendto ( $to, $arg1, ... )
1921 The sendto method is called by workers for serializing data to standard
1922 output, standard error, or end of file. The action is done by the
1923 manager process.
1924
1925 Release 1.00x supported 1 data argument, not more.
1926
1927 MCE->sendto('file', \@array, '/path/to/file');
1928 MCE->sendto('file', \$scalar, '/path/to/file');
1929 MCE->sendto('file', $scalar, '/path/to/file');
1930
1931 MCE->sendto('STDERR', \@array);
1932 MCE->sendto('STDERR', \$scalar);
1933 MCE->sendto('STDERR', $scalar);
1934
1935 MCE->sendto('STDOUT', \@array);
1936 MCE->sendto('STDOUT', \$scalar);
1937 MCE->sendto('STDOUT', $scalar);
1938
1939 Release 1.100 added the ability to pass multiple arguments. Notice the
1940 syntax change for sending to a file. Passing a reference to an array is
1941 no longer necessary.
1942
1943 MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1944 MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1945 MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1946
1947 MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1948
1949 To retain 1.00x compatibility, sendto outputs the content when a single
1950 data reference is specified. Otherwise, the reference for \@array or
1951 \$scalar is shown in 1.500, not the content.
1952
1953 MCE->sendto('STDERR', \@array); ## 1.00x behavior, content
1954 MCE->sendto('STDOUT', \$scalar);
1955 MCE->sendto('file:/path/to/file', \@array);
1956
1957 ## Output matches the print statement
1958
1959 MCE->sendto(\*STDERR, \@array); ## 1.500 behavior, reference
1960 MCE->sendto(\*STDOUT, \$scalar);
1961 MCE->sendto($fh, \@array);
1962
1963 MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1964 print {*STDOUT} \@array, "\n", \$scalar, "\n";
1965
1966 MCE 1.500 added support for sending to a glob reference, file
1967 descriptor, and file handle.
1968
1969 MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1970 MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1971 MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1972
1973 MCE->sync ( void )
1974 $mce->sync ( void )
1975 A barrier sync operation means any worker must stop at this point until
1976 all workers reach this barrier. Barrier syncing is useful for many
1977 computer algorithms.
1978
1979 Barrier synchronization is supported for task 0 only or omitting
1980 user_tasks. All workers assigned task_id 0 must call sync whenever
1981 barrier syncing.
1982
1983 use MCE;
1984
1985 sub user_func {
1986
1987 my ($mce) = @_;
1988 my $wid = MCE->wid;
1989
1990 MCE->sendto("STDOUT", "a: $wid\n"); ## MCE 1.0+
1991 MCE->sync;
1992
1993 MCE->sendto(\*STDOUT, "b: $wid\n"); ## MCE 1.5+
1994 MCE->sync;
1995
1996 MCE->print("c: $wid\n"); ## MCE 1.5+
1997 MCE->sync;
1998
1999 return;
2000 }
2001
2002 my $mce = MCE->new(
2003 max_workers => 4, user_func => \&user_func
2004 )->run;
2005
2006 -- Output (without barrier synchronization)
2007
2008 a: 1
2009 a: 2
2010 b: 1
2011 b: 2
2012 c: 1
2013 c: 2
2014 a: 3
2015 b: 3
2016 c: 3
2017 a: 4
2018 b: 4
2019 c: 4
2020
2021 -- Output (with barrier synchronization)
2022
2023 a: 1
2024 a: 2
2025 a: 4
2026 a: 3
2027 b: 2
2028 b: 1
2029 b: 3
2030 b: 4
2031 c: 1
2032 c: 4
2033 c: 2
2034 c: 3
2035
2036 Consider the following example. The MCE->sync operation is done inside
2037 a loop along with MCE->do. A stall may occur for workers calling sync
2038 the 2nd or 3rd time while other workers are sending results via MCE->do
2039 or MCE->sendto.
2040
2041 It requires another semaphore lock in MCE to solve this which was not
2042 done in order to keep resources low. Therefore, please keep this in
2043 mind when mixing MCE->sync with MCE->do or output serialization methods
2044 inside a loop.
2045
2046 sub user_func {
2047
2048 my ($mce) = @_;
2049 my @result;
2050
2051 for (1 .. 3) {
2052 ... compute algorithm ...
2053
2054 MCE->sync;
2055
2056 ... compute algorithm ...
2057
2058 MCE->sync;
2059
2060 MCE->do('aggregate_result', \@result); ## or MCE->sendto
2061
2062 MCE->sync; ## The sync operation is also needed here to
2063 ## prevent MCE from stalling.
2064 }
2065 }
2066
2067 MCE->yield ( void )
2068 $mce->yield ( void )
2069 There may be occasions when the MCE driven app is too fast. The
2070 interval option combined with the yield method, both introduced with
2071 MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
2072 the design.
2073
2074 A use case is an app configured with 100 workers running on a 24
2075 logical way box. Data is polled from a database containing over 2.5
2076 million rows. Workers chunk away at 300 rows per chunk performing SNMP
2077 gets (300 sockets per worker) polling 25 metrics from each device. With
2078 this scenario, the load on the box may rise beyond 90+. In addition,
2079 IP_Tables may reach its contention point causing the entire application
2080 to fail.
2081
2082 The scenario above is solved by simply having workers yield among
2083 themselves in a synchronized fashion. A delay of 0.007 seconds between
2084 intervals is all that's needed. The load on the box will hover between
2085 23 ~ 27 for the duration of the run. Polling completes in under 17
2086 minutes time. This is quite fast considering the app polls 62.5 million
2087 metrics combined. The math equates to 3,676,470 per minute or rather
2088 61,275 per second from a single box.
2089
2090 ## Both max_nodes and node_id are optional (default 1).
2091
2092 interval => {
2093 delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
2094 }
2095
2096 A 4 node setup can poll 10 million devices without the additional
2097 overhead of a distribution agent. The differences between the 4 nodes
2098 are simply node_id and the where clause used to query the database. The
2099 mac addresses are random such that the data divides equally to any
2100 power of 2. The distribution key lies in the mac address itself. In
2101 fact, the 2nd character from the right is sufficient for maximizing on
2102 the power of randomness for equal distribution.
2103
2104 Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
2105 Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
2106 Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
2107 Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
2108
2109 Below, the user_tasks is configured to simulate 4 nodes. This
2110 demonstration uses 2 workers to minimize the output size. Input is from
2111 the sequence option.
2112
2113 use Time::HiRes qw(time);
2114 use MCE;
2115
2116 my $d = shift || 0.1;
2117
2118 local $| = 1;
2119
2120 sub create_task {
2121
2122 my ($node_id) = @_;
2123
2124 my $seq_size = 6;
2125 my $seq_start = ($node_id - 1) * $seq_size + 1;
2126 my $seq_end = $seq_start + $seq_size - 1;
2127
2128 return {
2129 max_workers => 2, sequence => [ $seq_start, $seq_end ],
2130 interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2131 };
2132 }
2133
2134 sub user_begin {
2135
2136 my ($mce, $task_id, $task_name) = @_;
2137
2138 ## The yield method causes this worker to wait for its next time
2139 ## interval slot before running. Yield has no effect without the
2140 ## 'interval' option.
2141
2142 ## Yielding is beneficial inside a user_begin block. A use case
2143 ## is staggering database connections among workers in order
2144 ## to not impact the DB server.
2145
2146 MCE->yield;
2147
2148 MCE->printf(
2149 "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2150 MCE->task_id + 1, time, MCE->task_wid, ''
2151 );
2152
2153 return;
2154 }
2155
2156 {
2157 my $prev_time = time;
2158
2159 sub user_func {
2160
2161 my ($mce, $seq_n, $chunk_id) = @_;
2162
2163 ## Yield simply waits for the next time interval.
2164 MCE->yield;
2165
2166 ## Calculate how long this worker has waited.
2167 my $curr_time = time;
2168 my $time_waited = $curr_time - $prev_time;
2169
2170 $prev_time = $curr_time;
2171
2172 MCE->printf(
2173 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2174 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2175 );
2176
2177 return;
2178 }
2179 }
2180
2181 ## Simulate a 4 node environment passing node_id to create_task.
2182
2183 print "Node_ID Current_Time Worker_ID Time_Waited Comment\n";
2184
2185 MCE->new(
2186 user_begin => \&user_begin,
2187 user_func => \&user_func,
2188
2189 user_tasks => [
2190 create_task(1),
2191 create_task(2),
2192 create_task(3),
2193 create_task(4)
2194 ]
2195
2196 )->run;
2197
2198 -- Output (notice Current_Time below, stays 0.10 apart)
2199
2200 Node_ID Current_Time Worker_ID Time_Waited Comment
2201 Node 1: 1374807976.74634 -- Worker 1: -- Started
2202 Node 2: 1374807976.84634 -- Worker 1: -- Started
2203 Node 3: 1374807976.94638 -- Worker 1: -- Started
2204 Node 4: 1374807977.04639 -- Worker 1: -- Started
2205 Node 1: 1374807977.14634 -- Worker 2: -- Started
2206 Node 2: 1374807977.24640 -- Worker 2: -- Started
2207 Node 3: 1374807977.34649 -- Worker 2: -- Started
2208 Node 4: 1374807977.44657 -- Worker 2: -- Started
2209 Node 1: 1374807977.54636 -- Worker 1: 0.90037 -- Seq_N 1
2210 Node 2: 1374807977.64638 -- Worker 1: 1.00040 -- Seq_N 7
2211 Node 3: 1374807977.74642 -- Worker 1: 1.10043 -- Seq_N 13
2212 Node 4: 1374807977.84643 -- Worker 1: 1.20045 -- Seq_N 19
2213 Node 1: 1374807977.94636 -- Worker 2: 1.30037 -- Seq_N 2
2214 Node 2: 1374807978.04638 -- Worker 2: 1.40040 -- Seq_N 8
2215 Node 3: 1374807978.14641 -- Worker 2: 1.50042 -- Seq_N 14
2216 Node 4: 1374807978.24644 -- Worker 2: 1.60045 -- Seq_N 20
2217 Node 1: 1374807978.34628 -- Worker 1: 0.79996 -- Seq_N 3
2218 Node 2: 1374807978.44631 -- Worker 1: 0.79996 -- Seq_N 9
2219 Node 3: 1374807978.54634 -- Worker 1: 0.79996 -- Seq_N 15
2220 Node 4: 1374807978.64636 -- Worker 1: 0.79997 -- Seq_N 21
2221 Node 1: 1374807978.74628 -- Worker 2: 0.79996 -- Seq_N 4
2222 Node 2: 1374807978.84632 -- Worker 2: 0.79997 -- Seq_N 10
2223 Node 3: 1374807978.94634 -- Worker 2: 0.79996 -- Seq_N 16
2224 Node 4: 1374807979.04636 -- Worker 2: 0.79996 -- Seq_N 22
2225 Node 1: 1374807979.14628 -- Worker 1: 0.80001 -- Seq_N 5
2226 Node 2: 1374807979.24631 -- Worker 1: 0.80000 -- Seq_N 11
2227 Node 3: 1374807979.34634 -- Worker 1: 0.80001 -- Seq_N 17
2228 Node 4: 1374807979.44636 -- Worker 1: 0.80000 -- Seq_N 23
2229 Node 1: 1374807979.54628 -- Worker 2: 0.80000 -- Seq_N 6
2230 Node 2: 1374807979.64631 -- Worker 2: 0.80000 -- Seq_N 12
2231 Node 3: 1374807979.74633 -- Worker 2: 0.80000 -- Seq_N 18
2232 Node 4: 1374807979.84636 -- Worker 2: 0.80000 -- Seq_N 24
2233
2234 The interval.pl example above is included with MCE.
2235
2237 The "progress" option takes a code block for receiving info on the
2238 progress made while processing input data; e.g. "input_data" or
2239 "sequence". To make this work, one provides the "progress" option a
2240 closure block like so, passing along the size of the input_data; e.g.
2241 "scalar @array" or "-s /path/to/file".
2242
2243 Current API available since 1.813.
2244
2245 A worker, upon completing processing its chunk, notifies the manager-
2246 process with the size of the chunk. That could be the number of rows or
2247 literally the size of the chunk when processing an input file. The
2248 manager-process accumulates the size before calling the code block
2249 associated with the "progress" option.
2250
2251 When running many tasks simultaneously, via "user_tasks", the call is
2252 initiated by workers at level 0 only or rather the first task, not
2253 shown here.
2254
2255 use Time::HiRes 'sleep';
2256 use MCE;
2257
2258 sub make_progress {
2259 my ($total_size) = @_;
2260 return sub {
2261 my ($completed_size) = @_;
2262 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2263 };
2264 }
2265
2266 my @input = (1..150);
2267
2268 MCE->new(
2269 chunk_size => 10,
2270 max_workers => 4,
2271 input_data => \@input,
2272 progress => make_progress( scalar @input ),
2273 user_func => sub { sleep 1.5 }
2274 )->run();
2275
2276 -- Output
2277
2278 6.7%
2279 13.3%
2280 20.0%
2281 26.7%
2282 33.3%
2283 40.0%
2284 46.7%
2285 53.3%
2286 60.0%
2287 66.7%
2288 73.3%
2289 80.0%
2290 86.7%
2291 93.3%
2292 100.0%
2293
2294 Next is the code using MCE::Flow and ProgressBar::Stack to do the same
2295 thing, practically.
2296
2297 use Time::HiRes 'sleep';
2298 use ProgressBar::Stack;
2299 use MCE::Flow;
2300
2301 sub make_progress {
2302 my ($total_size) = @_;
2303 init_progress();
2304 return sub {
2305 my ($completed_size) = @_;
2306 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2307 };
2308 }
2309
2310 my @input = (1..150);
2311
2312 MCE::Flow->init(
2313 chunk_size => 10,
2314 max_workers => 4,
2315 progress => make_progress( scalar @input )
2316 );
2317
2318 MCE::Flow->run( sub { sleep 1.5 }, \@input );
2319 MCE::Flow->finish();
2320
2321 print "\n";
2322
2323 -- Output
2324
2325 [################ ] 80.0% ETA: 0:01
2326
2327 For a sequence of numbers, using the "sequence" option, one must account
2328 for "step_size", typically set to 1 automatically.
2329
2330 use Time::HiRes 'sleep';
2331 use MCE;
2332
2333 sub make_progress {
2334 my ($total_size) = @_;
2335 return sub {
2336 my ($completed_size) = @_;
2337 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2338 };
2339 }
2340
2341 MCE->new(
2342 chunk_size => 10,
2343 max_workers => 4,
2344 sequence => [ 1, 100, 2 ],
2345 progress => make_progress( int( 100 / 2 + 0.5 ) ),
2346 user_func => sub { sleep 1.5 }
2347 )->run();
2348
2349 -- Output
2350
2351 20.0%
2352 40.0%
2353 60.0%
2354 80.0%
2355 100.0%
2356
2357 Changing "chunk_size" to 1 means workers notify the manager process
2358 more often, thus increasing granularity. Take a look at the output.
2359
2360 2.0%
2361 4.0%
2362 6.0%
2363 8.0%
2364 10.0%
2365 ...
2366 92.0%
2367 94.0%
2368 96.0%
2369 98.0%
2370 100.0%
2371
2372 Here is the same thing using MCE::Flow together with
2373 ProgressBar::Stack.
2374
2375 use Time::HiRes 'sleep';
2376 use ProgressBar::Stack;
2377 use MCE::Flow;
2378
2379 sub make_progress {
2380 my ($total_size) = @_;
2381 init_progress();
2382 return sub {
2383 my ($completed_size) = @_;
2384 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2385 };
2386 }
2387
2388 MCE::Flow->init(
2389 chunk_size => 1,
2390 max_workers => 4,
2391 progress => make_progress( int( 100 / 2 + 0.5 ) )
2392 );
2393
2394 MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2395 MCE::Flow->finish();
2396
2397 print "\n";
2398
2399 -- Output
2400
2401 [######### ] 48.0% ETA: 0:03
2402
2403 For files and file handles, workers send the actual size of the data
2404 read versus counting rows.
2405
2406 use Time::HiRes 'sleep';
2407 use MCE;
2408
2409 sub make_progress {
2410 my ($total_size) = @_;
2411 return sub {
2412 my ($completed_size) = @_;
2413 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2414 };
2415 }
2416
2417 my $input_file = "/path/to/input_file.txt";
2418
2419 MCE->new(
2420 chunk_size => 5,
2421 max_workers => 4,
2422 input_data => $input_file,
2423 progress => make_progress( -s $input_file ),
2424 user_func => sub { sleep 0.03 }
2425 )->run();
2426
2427 For consistency, here is the example using MCE::Flow, again with
2428 ProgressBar::Stack.
2429
2430 use Time::HiRes 'sleep';
2431 use ProgressBar::Stack;
2432 use MCE::Flow;
2433
2434 sub make_progress {
2435 my ($total_size) = @_;
2436 init_progress();
2437 return sub {
2438 my ($completed_size) = @_;
2439 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2440 };
2441 }
2442
2443 my $input_file = "/path/to/input_file.txt";
2444
2445 MCE::Flow->init(
2446 chunk_size => 5,
2447 max_workers => 4,
2448 progress => make_progress( -s $input_file )
2449 );
2450
2451 MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2452 MCE::Flow->finish();
2453
2454 The next demonstration processes three arrays consecutively. For this
2455 one, MCE workers persist after running. This needs MCE 1.814 or later
2456 to run. Otherwise, the progress output is not shown in MCE 1.813.
2457
2458 use Time::HiRes 'sleep';
2459 use ProgressBar::Stack;
2460 use MCE;
2461
2462 sub make_progress {
2463 my ($total_size, $message) = @_;
2464 init_progress();
2465 return sub {
2466 my ($completed_size) = @_;
2467 update_progress(
2468 sprintf("%0.1f", $completed_size / $total_size * 100),
2469 $message
2470 );
2471 };
2472 }
2473
2474 my $mce = MCE->new(
2475 chunk_size => 10,
2476 max_workers => 4,
2477 user_func => sub { sleep 0.5 }
2478 )->spawn();
2479
2480 my @a1 = ( 1 .. 200 );
2481 my @a2 = ( 1 .. 500 );
2482 my @a3 = ( 1 .. 300 );
2483
2484 $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2485
2486 print "\n";
2487
2488 $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2489
2490 print "\n";
2491
2492 $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2493
2494 print "\n";
2495
2496 $mce->shutdown;
2497
2498 -- Output
2499
2500 [####################] 100.0% ETA: 0:00 array 1
2501 [####################] 100.0% ETA: 0:00 array 2
2502 [####################] 100.0% ETA: 0:00 array 3
2503
2504 When the size is not known, such as reading from "STDIN", the only thing one
2505 can do is report the size completed thus far.
2506
2507 # 1 kibibyte equals 1024 bytes
2508
2509 progress => sub {
2510 my ($completed_size) = @_;
2511 printf "%0.1f kibibytes\n", $completed_size / 1024;
2512 }
2513
2515 • MCE::Examples
2516
2518 MCE
2519
2521 Mario E. Roy, <marioeroy AT gmail DOT com>
2522
2523
2524
2525perl v5.34.0 2021-07-22 MCE::Core(3)