1MCE::Core(3) User Contributed Perl Documentation MCE::Core(3)
2
3
4
6 MCE::Core - Documentation describing the core MCE API
7
9 This document describes MCE::Core version 1.878
10
12 This is a simplistic use case of MCE running with 5 workers.
13
14 ## Construction using the Core API
15
16 use MCE;
17
18 my $mce = MCE->new(
19 max_workers => 5,
20 user_func => sub {
21 my ($mce) = @_;
22 $mce->say("Hello from " . $mce->wid);
23 }
24 );
25
26 $mce->run;
27
28 ## Construction using a MCE model
29
30 use MCE::Flow max_workers => 5;
31
32 mce_flow sub {
33 my ($mce) = @_;
34 MCE->say("Hello from " . MCE->wid);
35 };
36
37 -- Output
38
39 Hello from 2
40 Hello from 4
41 Hello from 5
42 Hello from 1
43 Hello from 3
44
45 MCE->new ( [ options ] )
46 Below, a new instance is configured with all available options.
47
48 use MCE;
49
50 my $mce = MCE->new(
51
52 max_workers => 8, ## Default 1
53
54 # Number of workers to spawn.
55
56 # MCE sets an upper-limit of 8 for 'auto'. MCE 1.521+.
57 # max_workers => 'auto', ## # of lcores, 8 maximum
58 # max_workers => 'auto-1', ## 7 on HW with 16 lcores
59 # max_workers => 'auto-1', ## 3 on HW with 4 lcores
60
61 # Specify a percentage. MCE 1.875+.
62 # max_workers => '25%', ## 4 on HW with 16 lcores
63 # max_workers => '50%', ## 8 on HW with 16 lcores
64
65 # Run on all logical cores.
66 # max_workers => MCE::Util::get_ncpu(),
67
68 chunk_size => 2000, ## Default 1
69
70 # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
71 # The default is 1 when using the Core API and 'auto' for
72 # MCE Models. For arrays or queues, chunk_size means the
73 # number of records per chunk. For iterators, MCE will not
74 # use chunk_size, though the iterator may use it to determine
75 # how much to return per iteration. For files, smaller than or
76 # equal to 8192 is the number of records. Greater than 8192
77 # is the number of bytes. MCE reads until the end of record
78 # before calling user_func.
79
80 # chunk_size => 1, ## Consists of 1 record
81 # chunk_size => 1000, ## Consists of 1000 records
82 # chunk_size => '16k', ## Approximate 16 kibiBytes (KiB)
83 # chunk_size => '20m', ## Approximate 20 mebiBytes (MiB)
84
85 tmp_dir => $tmp_dir, ## Default $MCE::Signal::tmp_dir
86
87 # Default is $MCE::Signal::tmp_dir which points to
88 # $ENV{TEMP} if defined. Otherwise, tmp_dir points
89 # to a location under /tmp.
90
91 freeze => \&encode_sereal, ## Default \&Storable::freeze
92 thaw => \&decode_sereal, ## Default \&Storable::thaw
93
94 # Release 1.412 allows freeze and thaw to be overridden.
95 # Simply include a serialization module prior to loading
96 # MCE. Configure freeze/thaw options.
97
98 # use Sereal qw( encode_sereal decode_sereal );
99 # use CBOR::XS qw( encode_cbor decode_cbor );
100 # use JSON::XS qw( encode_json decode_json );
101 #
102 # use MCE;
103
104 gather => \@a, ## Default undef
105
106 # Release 1.5 allows for gathering of data to an array or
107 # hash reference, a MCE::Queue/Thread::Queue object, or code
108 # reference. One invokes gathering by calling the gather
109 # method as often as needed.
110
111 # gather => \@array,
112 # gather => \%hash,
113 # gather => $queue,
114 # gather => \&order,
115
116 init_relay => 0, ## Default undef
117
118 # For specifying the initial relay value. Allowed values
119 # are array_ref, hash_ref, or scalar. The MCE::Relay module
120 # is loaded automatically when specified.
121
122 # init_relay => \@array,
123 # init_relay => \%hash,
124 # init_relay => scalar,
125
126 input_data => $input_file, ## Default undef
127 RS => "\n>", ## Default undef
128
129 # input_data => '/path/to/file' ## Process file
130 # input_data => \@array ## Process array
131 # input_data => \*FILE_HNDL ## Process file handle
132 # input_data => $io ## Process IO::All { File, Pipe, STDIO }
133 # input_data => \$scalar ## Treated like a file
134 # input_data => \&iterator ## User specified iterator
135
136 # The RS option (for input record separator) applies to files
137 # and file handles.
138
139 # MCE applies additional logic when RS begins with a newline
140 # character; e.g. RS => "\n>". It trims away characters after
141 # the newline and prepends them to the next record.
142 #
143 # Typically, the left side is what happens for $/ = "\n>".
144 # The right side is what user_func receives.
145 #
146 # All records begin with > and end with \n
147 # Record 1: >seq1 ... \n> (to) >seq1 ... \n
148 # Record 2: seq2 ... \n> >seq2 ... \n
149 # Record 3: seq3 ... \n> >seq3 ... \n
150 # Last Rec: seqN ... \n >seqN ... \n
151
152 loop_timeout => 20, ## Default 0
153
154 # Added in 1.7, enables the manager process to timeout of a read
155 # operation on channel 0 (UNIX platforms only). The manager process
156 # decrements the total workers running for any worker which have
157 # died in an uncontrollable manner. Specify this option if on
158 # occassion a worker dies unexpectedly (i.e. from an XS module).
159
160 # Option works with init_relay on UNIX platforms since MCE 1.844.
161 # A number smaller than 5 is silently increased to 5.
162
163 max_retries => 2, ## Default 0
164
165 # This option, added in 1.7, causes MCE to retry a failed
166 # chunk from a worker dying while processing input data or
167 # sequence of numbers.
168
169 parallel_io => 1, ## Default 0
170 posix_exit => 1, ## Default 0
171 use_slurpio => 1, ## Default 0
172
173 # The parallel_io option enables parallel reads during large
174 # slurpio, useful when reading from fast storage. Do not enable
175 # parallel_io when running MCE on many nodes with input coming
176 # from shared storage.
177
178 # Set posix_exit to avoid all END and destructor processing.
179 # Constructing MCE inside a thread implies 1 or if present CGI,
180 # FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
181 # Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
182
183 # Enable slurpio to pass the raw chunk (scalar ref) to the user
184 # function when reading input files.
185
186 use_threads => 1, ## Auto 0 or 1
187
188 # By default MCE spawns child processes on UNIX platforms and
189 # threads on Windows (i.e. $^O eq 'MSWin32').
190
191 # MCE supports threads via two threading libraries if threads
192 # is preferred over child processes. The use of threads requires
193 # a thread library prior to loading MCE, causing the use_threads
194 # option to default to 1. Specify 0 for child processes.
195 #
196 # use threads; use forks;
197 # use threads::shared; use forks::shared;
198 # use MCE (or) use MCE; (or) use MCE;
199
200 spawn_delay => 0.045, ## Default undef
201 submit_delay => 0.015, ## Default undef
202 job_delay => 0.060, ## Default undef
203
204 # Time to wait in fractional seconds after spawning a worker,
205 # after submitting parameters to worker (MCE->run, MCE->process),
206 # and worker running (one time staggered delay).
207
208 # Specify job_delay to stagger workers connecting to a database.
209
210 on_post_exit => \&on_post_exit, ## Default undef
211 on_post_run => \&on_post_run, ## Default undef
212
213 # Execute the code block after a worker exits or dies.
214 # (i.e. MCE->exit, exit, die)
215
216 # Execute the code block after running.
217 # (i.e. MCE->process, MCE->run)
218
219 progress => sub { ... }, ## Default undef
220
221 # A code block for receiving info on the progress made.
222 # See section labeled "MCE PROGRESS DEMONSTRATIONS" at the
223 # end of this document.
224
225 user_args => { env => 'test' }, ## Default undef
226
227 # MCE release 1.4 added a new parameter to allow one to
228 # specify arbitrary arguments such as a string, an ARRAY
229 # or HASH reference. Workers can access this directly.
230 # (i.e. my $args = $mce->{user_args} or MCE->user_args)
231
232 user_begin => \&user_begin, ## Default undef
233 user_func => \&user_func, ## Default undef
234 user_end => \&user_end, ## Default undef
235
236 # Think of user_begin, user_func, and user_end as in
237 # the awk scripting language:
238 # awk 'BEGIN { begin } { func } { func } ... END { end }'
239
240 # MCE workers call user_begin once at the start of a job,
241 # then user_func repeatedly until no chunks remain.
242 # Afterwards, user_end is called.
243
244 user_error => \&user_error, ## Default undef
245 user_output => \&user_output, ## Default undef
246
247 # MCE will forward data to user_error/user_output,
248 # when defined, for the following methods.
249
250 # MCE->sendto(\*STDERR, "sent to user_error\n");
251 # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
252 # MCE->print(\*STDERR, "sent to user_error\n");
253 # MCE->say(\*STDERR, "sent to user_error");
254
255 # MCE->sendto(\*STDOUT, "sent to user_output\n");
256 # MCE->printf("%s\n", "sent to user_output");
257 # MCE->print("sent to user_output\n");
258 # MCE->say("sent to user_output");
259
260 stderr_file => 'err_file', ## Default STDERR
261 stdout_file => 'out_file', ## Default STDOUT
262
263 # Or to file; user_error and user_output take precedence.
264
265 flush_file => 0, ## Default 1
266 flush_stderr => 0, ## Default 1
267 flush_stdout => 0, ## Default 1
268
269 # Flush sendto file, standard error, or standard output.
270
271 interval => {
272 delay => 0.007 [, max_nodes => 4, node_id => 1 ]
273 },
274
275 # For use with the yield method introduced in MCE 1.5.
276 # Both max_nodes & node_id are optional and default to 1.
277 # Delay is the amount of time between intervals.
278
279 # interval => 0.007 ## Shorter; MCE 1.506+
280
281 sequence => { ## Default undef
282 begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
283 },
284
285 bounds_only => 1, ## Default undef
286
287 # For looping through a sequence of numbers in parallel.
288 # STEP, if omitted, defaults to 1 if BEGIN is smaller than
289 # END or -1 if BEGIN is greater than END. The FORMAT string
290 # is passed to sprintf behind the scene (% may be omitted).
291 # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
292
293 # Do not specify both options; input_data and sequence.
294 # Release 1.4 allows one to specify an array reference.
295 # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
296
297 # The bounds_only => 1 option will compute the 'begin' and
298 # 'end' items only for the chunk and not the items in between
299 # (hence boundaries only). This option has no effect when
300 # sequence is not specified or chunk_size equals 1.
301
302 # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
303
304 task_end => \&task_end, ## Default undef
305
306 # This is called by the manager process after the task
307 # has completed processing. MCE 1.5 allows this option
308 # to be specified at the top level.
309
310 task_name => 'string', ## Default 'MCE'
311
312 # Added in MCE 1.5 and mainly beneficial for user_tasks.
313 # One may specify a unique name per each sub-task.
314 # The string is passed as the 3rd arg to task_end.
315
316 user_tasks => [ ## Default undef
317 { ... }, ## Options for task 0
318 { ... }, ## Options for task 1
319 { ... }, ## Options for task 2
320 ],
321
322 # Takes a list of hash references, each allowing up to 17
323 # options. All other MCE options are ignored. The init_relay,
324 # input_data, RS, and use_slurpio options are applicable to
325 # the first task only.
326
327 # max_workers, chunk_size, input_data, interval, sequence,
328 # bounds_only, user_args, user_begin, user_end, user_func,
329 # gather, task_end, task_name, use_slurpio, use_threads,
330 # init_relay, RS
331
332 # Options not specified here will default to same option
333 # specified at the top level.
334 );
335
336 EXPORT_CONST, CONST
337 There are 3 constants which are exportable. Using the constants in lieu
338 of 0,1,2 makes it more legible when accessing the user_func arguments
339 directly.
340
341 SELF CHUNK CID - MCE CONSTANTS
342
343 Exports SELF => 0, CHUNK => 1, and CID => 2.
344
345 use MCE export_const => 1;
346 use MCE const => 1; ## Shorter; MCE 1.415+
347
348 user_func => sub {
349 # my ($mce, $chunk_ref, $chunk_id) = @_;
350 print "Hello from ", $_[SELF]->wid, "\n";
351 }
352
353 MCE 1.5 allows all public method to be called directly.
354
355 use MCE;
356
357 user_func => sub {
358 # my ($mce, $chunk_ref, $chunk_id) = @_;
359 print "Hello from ", MCE->wid, "\n";
360 }
361
362 OVERRIDING DEFAULTS
363 The following list options which may be overridden when loading the
364 module.
365
366 use Sereal qw( encode_sereal decode_sereal );
367 use CBOR::XS qw( encode_cbor decode_cbor );
368 use JSON::XS qw( encode_json decode_json );
369
370 use MCE
371 max_workers => 4, ## Default 1
372 chunk_size => 100, ## Default 1
373 tmp_dir => "/path/to/app/tmp", ## $MCE::Signal::tmp_dir
374 freeze => \&encode_sereal, ## \&Storable::freeze
375 thaw => \&decode_sereal ## \&Storable::thaw
376 ;
377
378 my $mce = MCE->new( ... );
379
380 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
381 available. Specify "Sereal => 0" to use Storable instead.
382
383 use MCE Sereal => 0;
384
385 RUNNING
386 Run calls spawn, submits the job; workers call user_begin, user_func,
387 and user_end. Run shuts down workers afterwards. Call spawn whenever
388 the need arises for large data structures prior to running.
389
390 $mce->spawn; ## Call early if desired
391
392 $mce->run; ## Call run or process below
393
394 ## Acquire data arrays and/or input_files. Workers persist after
395 ## processing.
396
397 $mce->process(\@input_data_1); ## Process array
398 $mce->process(\@input_data_2);
399 $mce->process(\@input_data_n);
400
401 $mce->process(\%input_hash_1); ## Process hash, current API
402 $mce->process(\%input_hash_2); ## available since 1.828
403 $mce->process(\%input_hash_n);
404
405 $mce->process('input_file_1'); ## Process file
406 $mce->process('input_file_2');
407 $mce->process('input_file_n');
408
409 $mce->shutdown; ## Shutdown workers
410
411 SYNTAX for ON_POST_EXIT
412 Often times, one may want to capture the exit status. The on_post_exit
413 option, if defined, is executed immediately by the manager process
414 after a worker exits via exit (children only), MCE->exit (children and
415 threads), or die.
416
417 The format of $e->{pid} is PID_123 for children and THR_123 for
418 threads.
419
420 my $restart_flag = 1;
421
422 sub on_post_exit {
423 my ($mce, $e) = @_;
424
425 ## Display all possible hash elements.
426 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
427
428 ## Restart this worker if desired.
429 if ($restart_flag && $e->{wid} == 2) {
430 $mce->restart_worker;
431 $restart_flag = 0;
432 }
433 }
434
435 sub user_func {
436 my ($mce) = @_;
437 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); ## Args, not necessary
438 }
439
440 my $mce = MCE->new(
441 on_post_exit => \&on_post_exit,
442 user_func => \&user_func,
443 max_workers => 3
444 );
445
446 $mce->run;
447
448 -- Output (child processes)
449
450 2: PID_33223: 0: msg_foo: 1002
451 1: PID_33222: 0: msg_foo: 1001
452 3: PID_33224: 0: msg_foo: 1003
453 2: PID_33225: 0: msg_foo: 1002
454
455 -- Output (running with threads)
456
457 3: TID_3: 0: msg_foo: 1003
458 2: TID_2: 0: msg_foo: 1002
459 1: TID_1: 0: msg_foo: 1001
460 2: TID_4: 0: msg_foo: 1002
461
462 SYNTAX for ON_POST_RUN
463 The on_post_run option, if defined, is executed immediately by the
464 manager process after running MCE->process or MCE->run. This option
465 receives an array reference of hashes.
466
467 The difference between on_post_exit and on_post_run is that the former
468 is called immediately whereas the latter is called after all workers
469 have completed running.
470
471 sub on_post_run {
472 my ($mce, $status_ref) = @_;
473 foreach my $e ( @{ $status_ref } ) {
474 ## Display all possible hash elements.
475 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
476 }
477 }
478
479 sub user_func {
480 my ($mce) = @_;
481 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); ## Args, not necessary
482 }
483
484 my $mce = MCE->new(
485 on_post_run => \&on_post_run,
486 user_func => \&user_func,
487 max_workers => 3
488 );
489
490 $mce->run;
491
492 -- Output (child processes)
493
494 3: PID_33174: 0: msg_foo: 1003
495 1: PID_33172: 0: msg_foo: 1001
496 2: PID_33173: 0: msg_foo: 1002
497
498 -- Output (running with threads)
499
500 2: TID_2: 0: msg_foo: 1002
501 3: TID_3: 0: msg_foo: 1003
502 1: TID_1: 0: msg_foo: 1001
503
504 SYNTAX for INPUT_DATA
505 MCE supports many ways to specify input_data. Support for iterators was
506 added in MCE 1.505. The RS option allows one to specify the record
507 separator when processing files.
508
509 MCE is a chunking engine. Therefore, chunk_size is applicable to
510 input_data. Specifying 1 for use_slurpio causes user_func to receive a
511 scalar reference containing the raw data (applicable to files only)
512 instead of an array reference.
513
514 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
515
516 input_data => '/path/to/file', ## process file
517 input_data => \@array, ## process array
518 input_data => \%hash, ## process hash, API since 1.828
519 input_data => \*FILE_HNDL, ## process file handle
520 input_data => $fh, ## open $fh, "<", "file"
521 input_data => $fh, ## IO::File "file", "r"
522 input_data => $fh, ## IO::Uncompress::Gunzip "file.gz"
523 input_data => $io, ## IO::All { File, Pipe, STDIO }
524 input_data => \$scalar, ## treated like a file
525 input_data => \&iterator, ## user specified iterator
526
527 chunk_size => 1, ## >1 means looping inside user_func
528 use_slurpio => 1, ## $chunk_ref is a scalar ref
529 RS => "\n>", ## input record separator
530
531 The chunk_size value determines the chunking mode to use when
532 processing files. Otherwise, chunk_size is the number of elements for
533 arrays. For files, a chunk size value of <= 8192 is how many records to
534 read. Greater than 8192 is how many bytes to read. MCE appends (the
535 rest) up to the next record separator.
536
537 chunk_size => 8192, ## Consists of 8192 records
538 chunk_size => 8193, ## Approximate 8193 bytes for files
539
540 chunk_size => 1, ## Consists of 1 record or element
541 chunk_size => 1000, ## Consists of 1000 records
542 chunk_size => '16k', ## Approximate 16 kibiBytes (KiB)
543 chunk_size => '20m', ## Approximate 20 mebiBytes (MiB)
544
545 The construction for user_func when chunk_size > 1 and assuming
546 use_slurpio equals 0.
547
548 user_func => sub {
549 my ($mce, $chunk_ref, $chunk_id) = @_;
550
551 ## $_ is $chunk_ref->[0] when chunk_size equals 1
552 ## $_ is $chunk_ref otherwise; $_ can be used below
553
554 for my $record ( @{ $chunk_ref } ) {
555 print "$chunk_id: $record\n";
556 }
557 }
558
559 # input_data => \%hash
560 # current API available since 1.828
561
562 user_func => sub {
563 my ($mce, $chunk_ref, $chunk_id) = @_;
564
565 ## $_ points to $chunk_ref regardless of chunk_size
566
567 for my $key ( keys %{ $chunk_ref } ) {
568 print "$key: ", $chunk_ref->{$key}, "\n";
569 }
570 }
571
572 Specifying a value for input_data is straight forward for arrays and
573 files. The next several examples specify an iterator reference for
574 input_data.
575
576 use MCE;
577
578 ## A factory function which creates a closure (the iterator itself)
579 ## for generating a sequence of numbers. The external variables
580 ## ($n, $max, $step) are used for keeping state across successive
581 ## calls to the closure. The iterator simply returns when $n > max.
582
583 sub input_iterator {
584 my ($n, $max, $step) = @_;
585
586 return sub {
587 return if $n > $max;
588
589 my $current = $n;
590 $n += $step;
591
592 return $current;
593 };
594 }
595
596 ## Run user_func in parallel. Input data can be specified during
597 ## the construction or as an argument to the process method.
598
599 my $mce = MCE->new(
600
601 # input_data => input_iterator(10, 30, 2),
602 chunk_size => 1, max_workers => 4,
603
604 user_func => sub {
605 my ($mce, $chunk_ref, $chunk_id) = @_;
606 MCE->print("$_: ", $_ * 2, "\n");
607 }
608
609 )->spawn;
610
611 $mce->process( input_iterator(10, 30, 2) );
612
613 -- Output Note that output order is not guaranteed
614 Take a look at iterator.pl for ordered output
615
616 10: 20
617 12: 24
618 16: 32
619 20: 40
620 14: 28
621 22: 44
622 18: 36
623 24: 48
624 26: 52
625 28: 56
626 30: 60
627
628 The following example queries the DB for the next 1000 rows. Notice the
629 use of fetchall_arrayref. The iterator function itself receives one
630 argument which is chunk_size (added in MCE 1.510) to determine how much
631 to return per iteration. The default is 1 for the Core API and MCE
632 Models.
633
634 use DBI;
635 use MCE;
636
637 sub db_iter {
638
639 my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
640
641 my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
642 die "Could not connect to database: $DBI::errstr";
643
644 my $sth = $dbh->prepare('select color, desc from table');
645
646 $sth->execute;
647
648 return sub {
649 my ($chunk_size) = @_;
650
651 if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
652 return @{ $aref };
653 }
654
655 return;
656 };
657 }
658
659 ## Let's enumerate column indexes for easy column retrieval.
660 my ($i_color, $i_desc) = (0 .. 1);
661
662 my $mce = MCE->new(
663 max_workers => 3, chunk_size => 1000,
664 input_data => db_iter(),
665
666 user_func => sub {
667 my ($mce, $chunk_ref, $chunk_id) = @_;
668 my $ret = '';
669
670 foreach my $row (@{ $chunk_ref }) {
671 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
672 }
673
674 MCE->print($ret);
675 }
676 );
677
678 $mce->run;
679
680 There are many modules on CPAN which return an iterator reference.
681 Showing one such example below. The demonstration ensures MCE workers
682 are spawned before obtaining the iterator. Note the worker_id value
683 (left column) in the output.
684
685 use Path::Iterator::Rule;
686 use MCE;
687
688 my $start_dir = shift
689 or die "Please specify a starting directory";
690
691 -d $start_dir
692 or die "Cannot open ($start_dir): No such file or directory";
693
694 my $mce = MCE->new(
695 max_workers => 'auto',
696 user_func => sub { MCE->say( MCE->wid . ": $_" ) }
697 )->spawn;
698
699 my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
700
701 my $iterator = $rule->iter(
702 $start_dir, { follow_symlinks => 0, depthfirst => 1 }
703 );
704
705 $mce->process( $iterator );
706
707 -- Output
708
709 8: lib/MCE/Core/Input/Generator.pm
710 5: lib/MCE/Core/Input/Handle.pm
711 6: lib/MCE/Core/Input/Iterator.pm
712 2: lib/MCE/Core/Input/Request.pm
713 3: lib/MCE/Core/Manager.pm
714 4: lib/MCE/Core/Input/Sequence.pm
715 7: lib/MCE/Core/Validation.pm
716 1: lib/MCE/Core/Worker.pm
717 8: lib/MCE/Flow.pm
718 5: lib/MCE/Grep.pm
719 6: lib/MCE/Loop.pm
720 2: lib/MCE/Map.pm
721 3: lib/MCE/Queue.pm
722 4: lib/MCE/Signal.pm
723 7: lib/MCE/Stream.pm
724 1: lib/MCE/Subs.pm
725 8: lib/MCE/Util.pm
726 5: lib/MCE.pm
727
728 Although MCE supports arrays, extra measures are needed to use a "lazy"
729 array as input data. The reason for this is that MCE needs the size of
730 the array before processing which may be unknown for lazy arrays.
731 Therefore, closures provides an excellent mechanism for this.
732
733 The code block belonging to the lazy array must return undef after
734 exhausting its input data. Otherwise, the process will never end.
735
736 use Tie::Array::Lazy;
737 use MCE;
738
739 tie my @a, 'Tie::Array::Lazy', [], sub {
740 my $i = $_[0]->index;
741
742 return ($i < 10) ? $i : undef;
743 };
744
745 sub make_iterator {
746 my $i = 0; my $a_ref = shift;
747
748 return sub {
749 return $a_ref->[$i++];
750 };
751 }
752
753 my $mce = MCE->new(
754 max_workers => 4, input_data => make_iterator(\@a),
755
756 user_func => sub {
757 my ($mce, $chunk_ref, $chunk_id) = @_;
758 MCE->say($_);
759 }
760
761 )->run;
762
763 -- Output
764
765 0
766 1
767 2
768 3
769 4
770 6
771 7
772 8
773 5
774 9
775
776 The following demonstrates how to retrieve a chunk from the lazy array
777 per each successive call. Here, undef is sent by the iterator block
778 when $i is greater than $max. Iterators may optionally use chunk_size
779 to determine how much to return per iteration.
780
781 use Tie::Array::Lazy;
782 use MCE;
783
784 tie my @a, 'Tie::Array::Lazy', [], sub {
785 $_[0]->index;
786 };
787
788 sub make_iterator {
789 my $j = 0; my ($a_ref, $max) = @_;
790
791 return sub {
792 my ($chunk_size) = @_;
793 my $i = $j; $j += $chunk_size;
794
795 return if $i > $max;
796 return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
797 };
798 }
799
800 my $mce = MCE->new(
801 chunk_size => 15, max_workers => 4,
802 input_data => make_iterator(\@a, 100),
803
804 user_func => sub {
805 my ($mce, $chunk_ref, $chunk_id) = @_;
806 MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
807 }
808
809 )->run;
810
811 -- Output
812
813 1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
814 2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
815 3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
816 4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
817 5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
818 6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
819 7: 90 91 92 93 94 95 96 97 98 99 100
820
821 SYNTAX for SEQUENCE
822 The 1.3 release and above allows workers to loop through a sequence of
823 numbers computed mathematically without the overhead of an array. The
824 sequence can be specified separately per each user_task entry unlike
825 input_data which is applicable to the first task only.
826
827 See the seq_demo.pl example, included with this distribution, on
828 applying sequences with the user_tasks option.
829
830 Sequence can be defined using an array or a hash reference.
831
832 use MCE;
833
834 my $mce = MCE->new(
835 max_workers => 3,
836
837 # sequence => [ 10, 19, 0.7, "%4.1f" ], ## up to 4 options
838
839 sequence => {
840 begin => 10, end => 19, step => 0.7, format => "%4.1f"
841 },
842
843 user_func => sub {
844 my ($mce, $n, $chunk_id) = @_;
845 print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
846 }
847 );
848
849 $mce->run;
850
851 -- Output (sorted afterwards, notice wid and chunk_id in output)
852
853 10.0 from 1 id 1
854 10.7 from 2 id 2
855 11.4 from 3 id 3
856 12.1 from 1 id 4
857 12.8 from 2 id 5
858 13.5 from 3 id 6
859 14.2 from 1 id 7
860 14.9 from 2 id 8
861 15.6 from 3 id 9
862 16.3 from 1 id 10
863 17.0 from 2 id 11
864 17.7 from 3 id 12
865 18.4 from 1 id 13
866
867 The 1.5 release includes a new option (bounds_only). This option tells
868 the sequence engine to compute 'begin' and 'end' items only, for the
869 chunk, and not the items in between (hence boundaries only). This
870 option applies to sequence only and has no effect when chunk_size
871 equals 1.
872
873 The time to run is 0.006s below. This becomes 0.827s without the
874 bounds_only option due to computing all items in between, thus creating
875 a very large array. Basically, specify bounds_only => 1 when boundaries
876 is all you need for looping inside the block; e.g. Monte Carlo
877 simulations.
878
879 Time was measured using 1 worker to emphasize the difference.
880
881 use MCE;
882
883 my $mce = MCE->new(
884 max_workers => 1, chunk_size => 1_250_000,
885
886 sequence => { begin => 1, end => 10_000_000 },
887 bounds_only => 1,
888
889 ## For sequence, the input scalar $_ points to $chunk_ref
890 ## when chunk_size > 1, otherwise $chunk_ref->[0].
891 ##
892 ## user_func => sub {
893 ## my $begin = $_->[0]; my $end = $_->[-1];
894 ##
895 ## for ($begin .. $end) {
896 ## ...
897 ## }
898 ## },
899
900 user_func => sub {
901 my ($mce, $chunk_ref, $chunk_id) = @_;
902 ## $chunk_ref contains 2 items, not 1_250_000
903
904 my $begin = $chunk_ref->[ 0];
905 my $end = $chunk_ref->[-1]; ## or $chunk_ref->[1]
906
907 MCE->printf("%7d .. %8d\n", $begin, $end);
908 }
909 );
910
911 $mce->run;
912
913 -- Output
914
915 1 .. 1250000
916 1250001 .. 2500000
917 2500001 .. 3750000
918 3750001 .. 5000000
919 5000001 .. 6250000
920 6250001 .. 7500000
921 7500001 .. 8750000
922 8750001 .. 10000000
923
924 SYNTAX for MAX_RETRIES
925 The max_retries option, added in 1.7, allows MCE to retry a failed
926 chunk from a worker dying while processing input data or a sequence of
927 numbers.
928
929 When max_retries is set, MCE configures the on_post_exit option
930 automatically using the following code before running. Specify
931 on_post_exit explicitly for any further tailoring. The restart_worker
932 line is necessary, obviously.
933
934 on_post_exit => sub {
935 my ( $mce, $e, $retry_cnt ) = @_;
936
937 if ( $e->{id} ) {
938 my $cnt = $retry_cnt + 1;
939 my $msg = "Error: chunk $e->{id} failed";
940
941 if ( defined $mce->{init_relay} ) {
942 print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
943 if ( $retry_cnt < $mce->{max_retries} );
944 }
945 else {
946 ( $retry_cnt < $mce->{max_retries} )
947 ? print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
948 : print {*STDERR} "$msg\n";
949 }
950
951 $mce->restart_worker;
952 }
953 }
954
955 We let MCE handle on_post_exit automatically below, which is
956 essentially the same code shown above. For max_retries to work, the
957 worker must die, abnormally included, or call MCE->exit. Notice that we
958 pass the chunk_id value for the 3rd argument to MCE->exit (defaults to
959 chunk_id if omitted since MCE 1.844).
960
961 ## max_retries demonstration
962
963 use strict;
964 use warnings;
965
966 use MCE;
967
968 sub user_func {
969 my ( $mce, $chunk_ref, $chunk_id ) = @_;
970
971 # die "Died : chunk_id = 3\n" if $chunk_id == 3;
972 MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
973
974 print "$chunk_id\n";
975 }
976
977 my $mce = MCE->new(
978 max_workers => 1,
979 max_retries => 2,
980 user_func => \&user_func,
981 )->spawn;
982
983 my $input_data = [ 0..7 ];
984
985 $mce->process( { chunk_size => 1 }, $input_data );
986 $mce->shutdown;
987
988 -- Output
989
990 1
991 2
992 Error: chunk 3 failed, retrying chunk attempt # 1
993 Error: chunk 3 failed, retrying chunk attempt # 2
994 Error: chunk 3 failed
995 4
996 5
997 6
998 7
999 8
1000
1001 Orderly output with max_retries is possible since MCE 1.844. Below,
1002 chunk 3 succeeds whereas chunk 5 fails due to exceeding the number of
1003 retries. Be sure to call MCE::relay inside "user_func" and near the end
1004 of the block.
1005
1006 ## max_retries demonstration with init_relay
1007
1008 use strict;
1009 use warnings;
1010
1011 use MCE;
1012 use MCE::Shared;
1013
1014 tie my $retries1, 'MCE::Shared', 0;
1015 tie my $retries2, 'MCE::Shared', 0;
1016
1017 MCE->new(
1018 max_workers => 4,
1019 input_data => [ 1..7 ],
1020 chunk_size => 1,
1021
1022 max_retries => 2,
1023 init_relay => 0,
1024
1025 user_func => sub {
1026 if ( MCE->chunk_id == 3 ) {
1027 MCE->exit if ++$retries1 <= 2;
1028 }
1029 if ( MCE->chunk_id == 5 ) {
1030 MCE->exit if ++$retries2 <= 3;
1031 }
1032 MCE::relay {
1033 $_ += 1;
1034 print MCE->chunk_id, "\n";
1035 };
1036 }
1037 )->run;
1038
1039 print "final: ", MCE::relay_final(), "\n";
1040
1041 -- Output
1042
1043 1
1044 2
1045 Error: chunk 3 failed, retrying chunk attempt # 1
1046 Error: chunk 5 failed, retrying chunk attempt # 1
1047 Error: chunk 3 failed, retrying chunk attempt # 2
1048 Error: chunk 5 failed, retrying chunk attempt # 2
1049 3
1050 4
1051 Error: chunk 5 failed
1052 6
1053 7
1054 final: 6
1055
1056 SYNTAX for USER_BEGIN and USER_END
1057 The user_begin and user_end options, if specified, behave similarly to
1058 awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
1059 called once per worker during each run.
1060
1061 MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
1062
1063 sub user_begin { ## Called once at the beginning
1064 my ($mce, $task_id, $task_name) = @_;
1065 $mce->{wk_total_rows} = 0;
1066 }
1067
1068 sub user_func { ## Called while processing
1069 my $mce = shift;
1070 $mce->{wk_total_rows} += 1;
1071 }
1072
1073 sub user_end { ## Called once at the end
1074 my ($mce, $task_id, $task_name) = @_;
1075 printf "## %d: Processed %d rows\n",
1076 MCE->wid, $mce->{wk_total_rows};
1077 }
1078
1079 my $mce = MCE->new(
1080 user_begin => \&user_begin,
1081 user_func => \&user_func,
1082 user_end => \&user_end
1083 );
1084
1085 $mce->run;
1086
1087 SYNTAX for USER_FUNC with USE_SLURPIO => 0
1088 When processing input data, MCE can pass an array of rows or a slurped
1089 chunk. Below, a reference to an array containing the chunk data is
1090 processed.
1091
1092 e.g. $chunk_ref = [ record1, record2, record3, ... ]
1093
1094 sub user_func {
1095
1096 my ($mce, $chunk_ref, $chunk_id) = @_;
1097
1098 foreach my $row ( @{ $chunk_ref } ) {
1099 $mce->{wk_total_rows} += 1;
1100 print $row;
1101 }
1102 }
1103
1104 my $mce = MCE->new(
1105 chunk_size => 100,
1106 input_data => "/path/to/file",
1107 user_func => \&user_func,
1108 use_slurpio => 0
1109 );
1110
1111 $mce->run;
1112
1113 SYNTAX for USER_FUNC with USE_SLURPIO => 1
1114 Here, a reference to a scalar containing the raw chunk data is
1115 processed.
1116
1117 sub user_func {
1118
1119 my ($mce, $chunk_ref, $chunk_id) = @_;
1120
1121 my $count = () = $$chunk_ref =~ /abc/;
1122 }
1123
1124 my $mce = MCE->new(
1125 chunk_size => 16000,
1126 input_data => "/path/to/file",
1127 user_func => \&user_func,
1128 use_slurpio => 1
1129 );
1130
1131 $mce->run;
1132
1133 SYNTAX for USER_ERROR and USER_OUTPUT
1134 Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1135 and MCE->say can be intercepted by specifying the user_error and
1136 user_output options. MCE on receiving output will forward to user_error
1137 or user_output in a serialized fashion.
1138
1139 Handy when wanting to filter, modify, and/or direct the output
1140 elsewhere.
1141
1142 sub user_error { ## Redirect STDERR to STDOUT
1143 my $error = shift;
1144 print {*STDOUT} $error;
1145 }
1146
1147 sub user_output { ## Redirect STDOUT to STDERR
1148 my $output = shift;
1149 print {*STDERR} $output;
1150 }
1151
1152 sub user_func {
1153 my ($mce, $chunk_ref, $chunk_id) = @_;
1154 my $count = 0;
1155
1156 foreach my $row ( @{ $chunk_ref } ) {
1157 MCE->print($row);
1158 $count += 1;
1159 }
1160
1161 MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1162 }
1163
1164 my $mce = MCE->new(
1165 chunk_size => 1000,
1166 input_data => "/path/to/file",
1167 user_error => \&user_error,
1168 user_output => \&user_output,
1169 user_func => \&user_func
1170 );
1171
1172 $mce->run;
1173
1174 SYNTAX for USER_TASKS and TASK_END
1175 This option takes an array of tasks. Each task allows up to 17 options.
1176 The init_relay, input_data, RS, and use_slurpio options may be defined
1177 inside the first task or at the top level, otherwise ignored under
1178 other sub-tasks.
1179
1180 max_workers, chunk_size, input_data, interval, sequence,
1181 bounds_only, user_args, user_begin, user_end, user_func,
1182 gather, task_end, task_name, use_slurpio, use_threads,
1183 init_relay, RS
1184
1185 Sequence and chunk_size were added in 1.3. User_args was introduced in
1186 1.4. Name and input_data are new options allowed in 1.5. In addition,
1187 one can specify task_end at the top level. Task_end also receives 2
1188 additional arguments $task_id and $task_name (shown below).
1189
1190 Options not specified here will default to the same option specified at
1191 the top level. The task_end option is called by the manager process
1192 when all workers for that sub-task have completed processing.
1193
1194 Forking and threading can be intermixed among tasks unless running
1195 Cygwin. The run method will continue running until all workers have
1196 completed processing.
1197
1198 use threads;
1199 use threads::shared;
1200
1201 use MCE;
1202
1203 sub parallel_task1 { sleep 2; }
1204 sub parallel_task2 { sleep 1; }
1205
1206 my $mce = MCE->new(
1207
1208 task_end => sub {
1209 my ($mce, $task_id, $task_name) = @_;
1210 print "Task [$task_id -- $task_name] completed processing\n";
1211 },
1212
1213 user_tasks => [{
1214 task_name => 'foo',
1215 max_workers => 2,
1216 user_func => \¶llel_task1,
1217 use_threads => 0 ## Not using threads
1218
1219 },{
1220 task_name => 'bar',
1221 max_workers => 4,
1222 user_func => \¶llel_task2,
1223 use_threads => 1 ## Yes, threads
1224
1225 }]
1226 );
1227
1228 $mce->run;
1229
1230 -- Output
1231
1232 Task [1 -- bar] completed processing
1233 Task [0 -- foo] completed processing
1234
1236 Beginning with MCE 1.5, the input scalar $_ is localized prior to
1237 calling user_func for input_data and sequence of numbers. The following
1238 applies.
1239
1240 use_slurpio => 1
1241 $_ is a reference to the buffer e.g. $_ = \$_buffer;
1242 $_ is a reference regardless of whether chunk_size is 1 or greater
1243
1244 user_func => sub {
1245 # my ($mce, $chunk_ref, $chunk_id) = @_;
1246 print ${ $_ }; ## $_ is same as $chunk_ref
1247 }
1248
1249 chunk_size is greater than 1, use_slurpio => 0
1250 $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1251 $_ is same as $chunk_ref or $_[CHUNK]
1252
1253 user_func => sub {
1254 # my ($mce, $chunk_ref, $chunk_id) = @_;
1255 for my $row ( @{ $_ } ) {
1256 print $row, "\n";
1257 }
1258 }
1259
1260 use MCE const => 1;
1261
1262 user_func => sub {
1263 # my ($mce, $chunk_ref, $chunk_id) = @_;
1264 for my $row ( @{ $_[CHUNK] } ) {
1265 print $row, "\n";
1266 }
1267 }
1268
1269 chunk_size equals 1, use_slurpio => 0
1270 $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1271
1272 ## Note that $_ and $chunk_ref are not the same below.
1273 ## $chunk_ref is a reference to an array.
1274
1275 user_func => sub {
1276 # my ($mce, $chunk_ref, $chunk_id) = @_;
1277 print $_, "\n; ## Same as $chunk_ref->[0];
1278 }
1279
1280 $mce->foreach("/path/to/file", sub {
1281 # my ($mce, $chunk_ref, $chunk_id) = @_;
1282 print $_; ## Same as $chunk_ref->[0];
1283 });
1284
1285 ## However, that is not the case for the forseq method.
1286 ## Both $_ and $n_seq are the same when chunk_size => 1.
1287
1288 $mce->forseq([ 1, 9 ], sub {
1289 # my ($mce, $n_seq, $chunk_id) = @_;
1290 print $_, "\n"; ## Same as $n_seq
1291 });
1292
1293 Sequence can also be specified using an array reference. The below
1294 is the same as the example afterwards.
1295
1296 $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1297
1298 The code block receives an array containing the next 5 sequences.
1299 Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1300 to an array, same as $_, due to chunk_size being greater than 1.
1301
1302 $mce->forseq( [ 10, 40000, 2 ], { chunk_size => 5 }, sub {
1303 # my ($mce, $n_seq, $chunk_id) = @_;
1304 my @result;
1305 for my $n ( @{ $_ } ) {
1306 ... do work, append to result for 5
1307 }
1308 ... do something with result afterwards
1309 });
1310
1312 The methods listed below are callable by the main process and workers.
1313
1314 MCE->abort ( void )
1315 $mce->abort ( void )
1316 The 'abort' method is applicable when processing input_data only. This
1317 causes all workers to abort after processing the current chunk.
1318
1319 Workers write the next offset position to the queue socket for the next
1320 available worker. In essence, the 'abort' method writes the last offset
1321 position. Workers, on requesting the next offset position, will think
1322 the end of input_data has been reached and leave the chunking loop.
1323
1324 MCE->abort;
1325 $mce->abort;
1326
1327 MCE->chunk_id ( void )
1328 $mce->chunk_id ( void )
1329 Returns the chunk_id for the current chunk. The value starts at 1.
1330 Chunking applies to input_data or sequence. The value is 0 for the
1331 manager process.
1332
1333 my $chunk_id = MCE->chunk_id;
1334 my $chunk_id = $mce->chunk_id;
1335
1336 MCE->chunk_size ( void )
1337 $mce->chunk_size ( void )
1338 Getter method for chunk_size used by MCE.
1339
1340 my $chunk_size = MCE->chunk_size;
1341 my $chunk_size = $mce->chunk_size;
1342
1343 MCE->do ( 'callback_func' [, $arg1, ... ] )
1344 $mce->do ( 'callback_func' [, $arg1, ... ] )
1345 MCE serializes data transfers from a worker process via helper
1346 functions do & sendto to the manager process. The callback function can
1347 optionally return a reply. Support for calling by the manager process
1348 was enabled in MCE 1.839.
1349
1350 [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1351
1352 Passing args to a callback function using references & scalar.
1353
1354 sub callback {
1355 my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1356 ...
1357 }
1358
1359 MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1360 MCE->do('callback', \@a, \%h, \$s, 'foo');
1361
1362 MCE knows if wanting a void, list, hash, or a scalar return value.
1363
1364 MCE->do('callback' [, $arg1, ... ]);
1365
1366 my @array = MCE->do('callback' [, $arg1, ... ]);
1367 my %hash = MCE->do('callback' [, $arg1, ... ]);
1368 my $scalar = MCE->do('callback' [, $arg1, ... ]);
1369
1370 MCE->freeze ( $object_ref )
1371 $mce->freeze ( $object_ref )
1372 Calls the internal freeze method to serialize an object. The default
1373 serialization routines are handled by Sereal if available or Storable.
1374
1375 my $frozen = MCE->freeze([ 0, 2, 4 ]);
1376 my $frozen = $mce->freeze([ 0, 2, 4 ]);
1377
1378 MCE->max_retries ( void )
1379 $mce->max_retries ( void )
1380 Getter method for max_retries used by MCE.
1381
1382 my $max_retries = MCE->max_retries;
1383 my $max_retries = $mce->max_retries;
1384
1385 MCE->max_workers ( void )
1386 $mce->max_workers ( void )
1387 Getter method for max_workers used by MCE.
1388
1389 my $max_workers = MCE->max_workers;
1390 my $max_workers = $mce->max_workers;
1391
1392 MCE->pid ( void )
1393 $mce->pid ( void )
1394 Returns the Process ID. Threads have thread ID attached to the value.
1395
1396 my $pid = MCE->pid; ## 16180 (pid) ; 16180.2 (pid.tid)
1397 my $pid = $mce->pid;
1398
1399 MCE->printf ( $format, $list [, ... ] )
1400 MCE->print ( $list [, ... ] )
1401 MCE->say ( $list [, ... ] )
1402 $mce->printf ( $format, $list [, ... ] )
1403 $mce->print ( $list [, ... ] )
1404 $mce->say ( $list [, ... ] )
1405 Use the printf, print, and say methods when wanting to serialize output
1406 among workers and the manager process. These are sugar syntax for the
1407 sendto method. These behave similar to the native subroutines in Perl
1408 with the exception that barewords must be passed as a reference and
1409 require the comma after it including file handles.
1410
1411 Say is like print, but implicitly appends a newline.
1412
1413 MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1414 MCE->printf($fh, "%s: %d\n", $name, $age);
1415 MCE->printf("%s: %d\n", $name, $age);
1416
1417 MCE->print(\*STDERR, "$error_msg\n");
1418 MCE->print($fh, $log_msg."\n");
1419 MCE->print("$output_msg\n");
1420
1421 MCE->say(\*STDERR, $error_msg);
1422 MCE->say($fh, $log_msg);
1423 MCE->say($output_msg);
1424
1425 Caveat: Use the following syntax when passing a reference not a glob or
1426 file handle. Otherwise, MCE will error indicating the first argument is
1427 not a glob reference.
1428
1429 MCE->print(\*STDOUT, \@array, "\n");
1430 MCE->print("", \@array, "\n"); ## ok
1431
1432 Sending to "IO::All" { File, Pipe, STDIO } is supported since MCE
1433 1.845.
1434
1435 use IO::All;
1436
1437 my $out = io->stdout;
1438 my $err = io->stderr;
1439
1440 MCE->printf($out, "%s\n", "sent to stdout");
1441 MCE->printf($err, "%s\n", "sent to stderr");
1442
1443 MCE->print($out, "sent to stdout\n");
1444 MCE->print($err, "sent to stderr\n");
1445
1446 MCE->say($out, "sent to stdout");
1447 MCE->say($err, "sent to stderr");
1448
1449 MCE->sess_dir ( void )
1450 $mce->sess_dir ( void )
1451 Returns the session directory used by the MCE instance. This is defined
1452 during spawning and removed during shutdown.
1453
1454 my $sess_dir = MCE->sess_dir;
1455 my $sess_dir = $mce->sess_dir;
1456
1457 MCE->task_id ( void )
1458 $mce->task_id ( void )
1459 Returns the task ID. This applies to the user_tasks option (starts at
1460 0).
1461
1462 my $task_id = MCE->task_id;
1463 my $task_id = $mce->task_id;
1464
1465 MCE->task_name ( void )
1466 $mce->task_name ( void )
1467 Returns the task_name value specified via the task_name option when
1468 configuring MCE.
1469
1470 my $task_name = MCE->task_name;
1471 my $task_name = $mce->task_name;
1472
1473 MCE->task_wid ( void )
1474 $mce->task_wid ( void )
1475 Returns the task worker ID (applies to user_tasks). The value starts at
1476 1 per each task configured within user_tasks. The value is 0 for the
1477 manager process.
1478
1479 my $task_wid = MCE->task_wid;
1480 my $task_wid = $mce->task_wid;
1481
1482 MCE->thaw ( $frozen )
1483 $mce->thaw ( $frozen )
1484 Calls the internal thaw method to un-serialize the frozen object.
1485
1486 my $object_ref = MCE->thaw($frozen);
1487 my $object_ref = $mce->thaw($frozen);
1488
1489 MCE->tmp_dir ( void )
1490 $mce->tmp_dir ( void )
1491 Returns the temporary directory used by MCE.
1492
1493 my $tmp_dir = MCE->tmp_dir;
1494 my $tmp_dir = $mce->tmp_dir;
1495
1496 MCE->user_args ( void )
1497 $mce->user_args ( void )
1498 Returns the arguments specified via the user_args option.
1499
1500 my ($arg1, $arg2, $arg3) = MCE->user_args;
1501 my ($arg1, $arg2, $arg3) = $mce->user_args;
1502
1503 MCE->wid ( void )
1504 $mce->wid ( void )
1505 Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1506 is 0 for the manager process.
1507
1508 my $wid = MCE->wid;
1509 my $wid = $mce->wid;
1510
1512 Methods listed below are callable by the main process only.
1513
1514 MCE->forchunk ( $input_data [, { options } ], sub { ... } )
1515 MCE->foreach ( $input_data [, { options } ], sub { ... } )
1516 MCE->forseq ( $sequence_spec [, { options } ], sub { ... } )
1517 $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1518 $mce->foreach ( $input_data [, { options } ], sub { ... } )
1519 $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1520 Forchunk, foreach, and forseq are sugar methods and described in
1521 MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1522
1523 MCE->process ( $input_data [, { options } ] )
1524 $mce->process ( $input_data [, { options } ] )
1525 The process method will spawn workers automatically if not already
1526 spawned. It will set input_data => $input_data. It calls run(0) to not
1527 auto-shutdown workers. Specifying options is optional.
1528
1529 Allowable options { key => value, ... } are:
1530
1531 chunk_size input_data job_delay spawn_delay submit_delay
1532 flush_file flush_stderr flush_stdout stderr_file stdout_file
1533 on_post_exit on_post_run sequence user_args user_begin user_end
1534 user_func user_error user_output use_slurpio RS
1535
1536 Options remain persistent going forward unless changed. Setting
1537 user_begin, user_end, or user_func will cause already spawned workers
1538 to shut down and re-spawn automatically. Therefore, define these during
1539 instantiation.
1540
1541 The below will cause workers to re-spawn after running.
1542
1543 my $mce = MCE->new( max_workers => 'auto' );
1544
1545 $mce->process( {
1546 user_begin => sub { ## connect to DB },
1547 user_func => sub { ## process each row },
1548 user_end => sub { ## close handle to DB },
1549 }, \@input_data );
1550
1551 $mce->process( {
1552 user_begin => sub { ## connect to DB },
1553 user_func => sub { ## process each file },
1554 user_end => sub { ## close handle to DB },
1555 }, "/list/of/files" );
1556
1557 Do the following if wanting workers to persist between jobs.
1558
1559 use MCE max_workers => 'auto';
1560
1561 my $mce = MCE->new(
1562 user_begin => sub { ## connect to DB },
1563 user_func => sub { ## process each chunk or row or host },
1564 user_end => sub { ## close handle to DB },
1565 );
1566
1567 $mce->spawn; ## Spawn early if desired
1568
1569 $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1570 $mce->process(\@array_of_files_to_grep);
1571 $mce->process("/path/to/host/list");
1572
1573 $mce->process($array_ref);
1574 $mce->process($array_ref, { stdout_file => $output_file });
1575
1576 ## This was not allowed before. Fixed in 1.415.
1577 $mce->process({ sequence => { begin => 10, end => 90, step 2 } });
1578 $mce->process({ sequence => [ 10, 90, 2 ] });
1579
1580 $mce->shutdown;
1581
1582 MCE->relay_final ( void )
1583 $mce->relay_final ( void )
1584 The relay methods are described in MCE::Relay. Relay capabilities are
1585 enabled by specifying the "init_relay" MCE option.
1586
1587 MCE->restart_worker ( void )
1588 $mce->restart_worker ( void )
1589 One can restart a worker who has died or exited. The job never ends
1590 below due to restarting each time. Recommended is to call MCE->exit or
1591 $mce->exit instead of the native exit function for better handling,
1592 especially under the Windows environment.
1593
1594 The $e->{wid} argument is no longer necessary starting with the 1.5
1595 release.
1596
1597 Press [ctrl-c] to terminate the script.
1598
1599 my $mce = MCE->new(
1600
1601 on_post_exit => sub {
1602 my ($mce, $e) = @_;
1603 print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1604 # $mce->restart_worker($e->{wid}); ## MCE-1.415 and below
1605 $mce->restart_worker; ## MCE-1.500 and above
1606 },
1607
1608 user_begin => sub {
1609 my ($mce, $task_id, $task_name) = @_;
1610 ## Not interested in die messages going to STDERR,
1611 ## because the die handler calls MCE->exit(255, $_[0]).
1612 close STDERR;
1613 },
1614
1615 user_tasks => [{
1616 max_workers => 5,
1617 user_func => sub {
1618 my ($mce) = @_; sleep MCE->wid;
1619 MCE->exit(3, "exited from " . MCE->wid . "\n");
1620 }
1621 },{
1622 max_workers => 4,
1623 user_func => sub {
1624 my ($mce) = @_; sleep MCE->wid;
1625 die("died from " . MCE->wid . "\n");
1626 }
1627 }]
1628 );
1629
1630 $mce->run;
1631
1632 -- Output
1633
1634 1: PID_85388: status 3: exited from 1
1635 2: PID_85389: status 3: exited from 2
1636 1: PID_85397: status 3: exited from 1
1637 3: PID_85390: status 3: exited from 3
1638 1: PID_85399: status 3: exited from 1
1639 4: PID_85391: status 3: exited from 4
1640 2: PID_85398: status 3: exited from 2
1641 1: PID_85401: status 3: exited from 1
1642 5: PID_85392: status 3: exited from 5
1643 1: PID_85404: status 3: exited from 1
1644 6: PID_85393: status 255: died from 6
1645 3: PID_85400: status 3: exited from 3
1646 2: PID_85403: status 3: exited from 2
1647 1: PID_85406: status 3: exited from 1
1648 7: PID_85394: status 255: died from 7
1649 1: PID_85410: status 3: exited from 1
1650 8: PID_85395: status 255: died from 8
1651 4: PID_85402: status 3: exited from 4
1652 2: PID_85409: status 3: exited from 2
1653 1: PID_85412: status 3: exited from 1
1654 9: PID_85396: status 255: died from 9
1655 3: PID_85408: status 3: exited from 3
1656 1: PID_85416: status 3: exited from 1
1657
1658 ...
1659
1660 MCE->run ( [ $auto_shutdown [, { options } ] ] )
1661 $mce->run ( [ $auto_shutdown [, { options } ] ] )
1662 The run method, by default, spawns workers, processes once, and shuts
1663 down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1664 persist after running (default 1).
1665
1666 Specifying options is optional. Valid options are the same as for the
1667 process method.
1668
1669 my $mce = MCE->new( ... );
1670
1671 ## Disables auto-shutdown
1672 $mce->run(0);
1673
1674 MCE->send ( $data_ref )
1675 $mce->send ( $data_ref )
1676 The 'send' method is useful when wanting to spawn workers early to
1677 minimize memory consumption and afterwards send data individually to
1678 each worker. One cannot send more than the total workers spawned.
1679 Workers store the received data as $mce->{user_data}.
1680
1681 The data which can be sent is restricted to an ARRAY, HASH, or PDL
1682 reference. Workers begin processing immediately after receiving data.
1683 Workers set $mce->{user_data} to undef after processing. One cannot
1684 specify input_data, sequence, or user_tasks when using the "send"
1685 method.
1686
1687 Passing any options e.g. run(0, { options }) is ignored due to workers
1688 running immediately after receiving user data. There is no guarantee to
1689 which worker will receive data first. It depends on which worker is
1690 available awaiting data.
1691
1692 use MCE;
1693
1694 my $mce = MCE->new(
1695 max_workers => 5,
1696
1697 user_func => sub {
1698 my ($mce) = @_;
1699 my $data = $mce->{user_data};
1700 my $first_name = $data->{first_name};
1701 print MCE->wid, ": Hello from $first_name\n";
1702 }
1703 );
1704
1705 $mce->spawn; ## Optional, send will spawn if necessary.
1706
1707 $mce->send( { first_name => "Theresa" } );
1708 $mce->send( { first_name => "Francis" } );
1709 $mce->send( { first_name => "Padre" } );
1710 $mce->send( { first_name => "Anthony" } );
1711
1712 $mce->run; ## Wait for workers to complete processing.
1713
1714 -- Output
1715
1716 2: Hello from Theresa
1717 5: Hello from Anthony
1718 3: Hello from Francis
1719 4: Hello from Padre
1720
1721 MCE->shutdown ( void )
1722 $mce->shutdown ( void )
1723 The run method will automatically spawn workers, run once, and shutdown
1724 workers automatically. Workers persist after running below. Shutdown
1725 may be called as needed or prior to exiting.
1726
1727 my $mce = MCE->new( ... );
1728
1729 $mce->spawn;
1730
1731 $mce->process(\@input_data_1); ## Processing multiple arrays
1732 $mce->process(\@input_data_2);
1733 $mce->process(\@input_data_n);
1734
1735 $mce->shutdown;
1736
1737 $mce->process('input_file_1'); ## Processing multiple files
1738 $mce->process('input_file_2');
1739 $mce->process('input_file_n');
1740
1741 $mce->shutdown;
1742
1743 MCE->spawn ( void )
1744 $mce->spawn ( void )
1745 Workers are normally spawned automatically. The spawn method allows one
1746 to spawn workers early if so desired.
1747
1748 my $mce = MCE->new( ... );
1749
1750 $mce->spawn;
1751
1752 MCE->status ( void )
1753 $mce->status ( void )
1754 The greatest exit status is saved among workers while running. Look at
1755 the on_post_exit or on_post_run options for callback support.
1756
1757 my $mce = MCE->new( ... );
1758
1759 $mce->run;
1760
1761 my $exit_status = $mce->status;
1762
1764 Methods listed below are callable by workers only.
1765
1766 MCE->exit ( [ $status [, $message [, $id ] ] ] )
1767 $mce->exit ( [ $status [, $message [, $id ] ] ] )
1768 A worker exits from MCE entirely. $id (optional) can be used for
1769 passing the primary key or a string along with the message. Look at the
1770 on_post_exit or on_post_run options for callback support.
1771
1772 MCE->exit; ## default 0
1773 MCE->exit(1);
1774 MCE->exit(2, 'chunk failed', $chunk_id);
1775 MCE->exit(0, 'msg_foo', 'id_1000');
1776
1777 MCE->gather ( $arg1, [, $arg2, ... ] )
1778 $mce->gather ( $arg1, [, $arg2, ... ] )
1779 A worker can submit data to the location specified via the gather
1780 option by calling this method. See MCE::Flow and MCE::Loop for
1781 additional use-case.
1782
1783 use MCE;
1784
1785 my @hosts = qw(
1786 hosta hostb hostc hostd hoste
1787 );
1788
1789 my $mce = MCE->new(
1790 chunk_size => 1, max_workers => 3,
1791
1792 user_func => sub {
1793 # my ($mce, $chunk_ref, $chunk_id) = @_;
1794 my ($output, $error, $status); my $host = $_;
1795
1796 ## Do something with $host;
1797 $output = "Worker ". MCE->wid .": Hello from $host";
1798
1799 if (MCE->chunk_id % 3 == 0) {
1800 ## Simulating an error condition
1801 local $? = 1; $status = $?;
1802 $error = "Error from $host"
1803 }
1804 else {
1805 $status = 0;
1806 }
1807
1808 ## Ensure unique keys (key, value) when gathering to a
1809 ## hash.
1810 MCE->gather("$host.out", $output, "$host.sta", $status);
1811 MCE->gather("$host.err", $error) if (defined $error);
1812 }
1813 );
1814
1815 my %h; $mce->process(\@hosts, { gather => \%h });
1816
1817 foreach my $host (@hosts) {
1818 print $h{"$host.out"}, "\n";
1819 print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1820 print "Exit status: ", $h{"$host.sta"}, "\n\n";
1821 }
1822
1823 -- Output
1824
1825 Worker 2: Hello from hosta
1826 Exit status: 0
1827
1828 Worker 1: Hello from hostb
1829 Exit status: 0
1830
1831 Worker 3: Hello from hostc
1832 Error from hostc
1833 Exit status: 1
1834
1835 Worker 2: Hello from hostd
1836 Exit status: 0
1837
1838 Worker 1: Hello from hoste
1839 Exit status: 0
1840
1841 MCE->last ( void )
1842 $mce->last ( void )
1843 Worker leaves the chunking loop or user_func block immediately.
1844 Callable from inside foreach, forchunk, forseq, and user_func.
1845
1846 use MCE;
1847
1848 my $mce = MCE->new(
1849 max_workers => 5
1850 );
1851
1852 my @list = (1 .. 80);
1853
1854 $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1855
1856 my ($mce, $chunk_ref, $chunk_id) = @_;
1857 MCE->last if ($chunk_id > 4);
1858
1859 my @output = ();
1860
1861 foreach my $rec ( @{ $chunk_ref } ) {
1862 push @output, $rec, "\n";
1863 }
1864
1865 MCE->print(@output);
1866 });
1867
1868 -- Output (each chunk above consists of 2 elements)
1869
1870 3
1871 4
1872 1
1873 2
1874 7
1875 8
1876 5
1877 6
1878
1879 MCE->next ( void )
1880 $mce->next ( void )
1881 Worker starts the next iteration of the chunking loop. Callable from
1882 inside foreach, forchunk, forseq, and user_func.
1883
1884 use MCE;
1885
1886 my $mce = MCE->new(
1887 max_workers => 5
1888 );
1889
1890 my @list = (1 .. 80);
1891
1892 $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1893
1894 my ($mce, $chunk_ref, $chunk_id) = @_;
1895 MCE->next if ($chunk_id < 20);
1896
1897 my @output = ();
1898
1899 foreach my $rec ( @{ $chunk_ref } ) {
1900 push @output, $rec, "\n";
1901 }
1902
1903 MCE->print(@output);
1904 });
1905
1906 -- Output (each chunk above consists of 4 elements)
1907
1908 77
1909 78
1910 79
1911 80
1912
1913 MCE::relay { code }
1914 MCE->relay ( sub { code } )
1915 MCE->relay_recv ( void )
1916 $mce->relay ( sub { code } )
1917 $mce->relay_recv ( void )
1918 The relay methods are described in MCE::Relay. Relay capabilities are
1919 enabled by specifying the "init_relay" MCE option.
1920
1921 MCE->sendto ( $to, $arg1, ... )
1922 $mce->sendto ( $to, $arg1, ... )
1923 The sendto method is called by workers for serializing data to standard
1924 output, standard error, or end of file. The action is done by the
1925 manager process.
1926
1927 Release 1.00x supported 1 data argument, not more.
1928
1929 MCE->sendto('file', \@array, '/path/to/file');
1930 MCE->sendto('file', \$scalar, '/path/to/file');
1931 MCE->sendto('file', $scalar, '/path/to/file');
1932
1933 MCE->sendto('STDERR', \@array);
1934 MCE->sendto('STDERR', \$scalar);
1935 MCE->sendto('STDERR', $scalar);
1936
1937 MCE->sendto('STDOUT', \@array);
1938 MCE->sendto('STDOUT', \$scalar);
1939 MCE->sendto('STDOUT', $scalar);
1940
1941 Release 1.100 added the ability to pass multiple arguments. Notice the
1942 syntax change for sending to a file. Passing a reference to an array is
1943 no longer necessary.
1944
1945 MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1946 MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1947 MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1948
1949 MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1950
1951 To retain 1.00x compatibility, sendto outputs the content when a single
1952 data reference is specified. Otherwise, the reference for \@array or
1953 \$scalar is shown in 1.500, not the content.
1954
1955 MCE->sendto('STDERR', \@array); ## 1.00x behavior, content
1956 MCE->sendto('STDOUT', \$scalar);
1957 MCE->sendto('file:/path/to/file', \@array);
1958
1959 ## Output matches the print statement
1960
1961 MCE->sendto(\*STDERR, \@array); ## 1.500 behavior, reference
1962 MCE->sendto(\*STDOUT, \$scalar);
1963 MCE->sendto($fh, \@array);
1964
1965 MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1966 print {*STDOUT} \@array, "\n", \$scalar, "\n";
1967
1968 MCE 1.500 added support for sending to a glob reference, file
1969 descriptor, and file handle.
1970
1971 MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1972 MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1973 MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1974
1975 MCE->sync ( void )
1976 $mce->sync ( void )
1977 A barrier sync operation means any worker must stop at this point until
1978 all workers reach this barrier. Barrier syncing is useful for many
1979 computer algorithms.
1980
1981 Barrier synchronization is supported for task 0 only or omitting
1982 user_tasks. All workers assigned task_id 0 must call sync whenever
1983 barrier syncing.
1984
1985 use MCE;
1986
1987 sub user_func {
1988
1989 my ($mce) = @_;
1990 my $wid = MCE->wid;
1991
1992 MCE->sendto("STDOUT", "a: $wid\n"); ## MCE 1.0+
1993 MCE->sync;
1994
1995 MCE->sendto(\*STDOUT, "b: $wid\n"); ## MCE 1.5+
1996 MCE->sync;
1997
1998 MCE->print("c: $wid\n"); ## MCE 1.5+
1999 MCE->sync;
2000
2001 return;
2002 }
2003
2004 my $mce = MCE->new(
2005 max_workers => 4, user_func => \&user_func
2006 )->run;
2007
2008 -- Output (without barrier synchronization)
2009
2010 a: 1
2011 a: 2
2012 b: 1
2013 b: 2
2014 c: 1
2015 c: 2
2016 a: 3
2017 b: 3
2018 c: 3
2019 a: 4
2020 b: 4
2021 c: 4
2022
2023 -- Output (with barrier synchronization)
2024
2025 a: 1
2026 a: 2
2027 a: 4
2028 a: 3
2029 b: 2
2030 b: 1
2031 b: 3
2032 b: 4
2033 c: 1
2034 c: 4
2035 c: 2
2036 c: 3
2037
2038 Consider the following example. The MCE->sync operation is done inside
2039 a loop along with MCE->do. A stall may occur for workers calling sync
2040 the 2nd or 3rd time while other workers are sending results via MCE->do
2041 or MCE->sendto.
2042
2043 It requires another semaphore lock in MCE to solve this which was not
2044 done in order to keep resources low. Therefore, please keep this in
2045 mind when mixing MCE->sync with MCE->do or output serialization methods
2046 inside a loop.
2047
2048 sub user_func {
2049
2050 my ($mce) = @_;
2051 my @result;
2052
2053 for (1 .. 3) {
2054 ... compute algorithm ...
2055
2056 MCE->sync;
2057
2058 ... compute algorithm ...
2059
2060 MCE->sync;
2061
2062 MCE->do('aggregate_result', \@result); ## or MCE->sendto
2063
2064 MCE->sync; ## The sync operation is also needed here to
2065 ## prevent MCE from stalling.
2066 }
2067 }
2068
2069 MCE->yield ( void )
2070 $mce->yield ( void )
2071 There may be on occasion when the MCE driven app is too fast. The
2072 interval option combined with the yield method, both introduced with
2073 MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
2074 the design.
2075
2076 A use case is an app configured with 100 workers running on a 24
2077 logical way box. Data is polled from a database containing over 2.5
2078 million rows. Workers chunk away at 300 rows per chunk performing SNMP
2079 gets (300 sockets per worker) polling 25 metrics from each device. With
2080 this scenario, the load on the box may rise beyond 90+. In addition,
2081 IP_Tables may reach its contention point causing the entire application
2082 to fail.
2083
2084 The scenario above is solved by simply having workers yield among
2085 themselves in a synchronized fashion. A delay of 0.007 seconds between
2086 intervals is all that's needed. The load on the box will hover between
2087 23 ~ 27 for the duration of the run. Polling completes in under 17
2088 minutes time. This is quite fast considering the app polls 62.5 million
2089 metrics combined. The math equates to 3,676,470 per minute or rather
2090 61,275 per second from a single box.
2091
2092 ## Both max_nodes and node_id are optional (default 1).
2093
2094 interval => {
2095 delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
2096 }
2097
2098 A 4 node setup can poll 10 million devices without the additional
2099 overhead of a distribution agent. The difference between the 4 nodes
2100 are simply node_id and the where clause used to query the database. The
2101 mac addresses are random such that the data divides equally to any
2102 power of 2. The distribution key lies in the mac address itself. In
2103 fact, the 2nd character from the right is sufficient for maximizing on
2104 the power of randomness for equal distribution.
2105
2106 Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
2107 Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
2108 Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
2109 Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
2110
2111 Below, the user_tasks is configured to simulate 4 nodes. This
2112 demonstration uses 2 workers to minimize the output size. Input is from
2113 the sequence option.
2114
2115 use Time::HiRes qw(time);
2116 use MCE;
2117
2118 my $d = shift || 0.1;
2119
2120 local $| = 1;
2121
2122 sub create_task {
2123
2124 my ($node_id) = @_;
2125
2126 my $seq_size = 6;
2127 my $seq_start = ($node_id - 1) * $seq_size + 1;
2128 my $seq_end = $seq_start + $seq_size - 1;
2129
2130 return {
2131 max_workers => 2, sequence => [ $seq_start, $seq_end ],
2132 interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2133 };
2134 }
2135
2136 sub user_begin {
2137
2138 my ($mce, $task_id, $task_name) = @_;
2139
2140 ## The yield method causes this worker to wait for its next time
2141 ## interval slot before running. Yield has no effect without the
2142 ## 'interval' option.
2143
2144 ## Yielding is beneficial inside a user_begin block. A use case
2145 ## is staggering database connections among workers in order
2146 ## to not impact the DB server.
2147
2148 MCE->yield;
2149
2150 MCE->printf(
2151 "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2152 MCE->task_id + 1, time, MCE->task_wid, ''
2153 );
2154
2155 return;
2156 }
2157
2158 {
2159 my $prev_time = time;
2160
2161 sub user_func {
2162
2163 my ($mce, $seq_n, $chunk_id) = @_;
2164
2165 ## Yield simply waits for the next time interval.
2166 MCE->yield;
2167
2168 ## Calculate how long this worker has waited.
2169 my $curr_time = time;
2170 my $time_waited = $curr_time - $prev_time;
2171
2172 $prev_time = $curr_time;
2173
2174 MCE->printf(
2175 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2176 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2177 );
2178
2179 return;
2180 }
2181 }
2182
2183 ## Simulate a 4 node environment passing node_id to create_task.
2184
2185 print "Node_ID Current_Time Worker_ID Time_Waited Comment\n";
2186
2187 MCE->new(
2188 user_begin => \&user_begin,
2189 user_func => \&user_func,
2190
2191 user_tasks => [
2192 create_task(1),
2193 create_task(2),
2194 create_task(3),
2195 create_task(4)
2196 ]
2197
2198 )->run;
2199
2200 -- Output (notice Current_Time below, stays 0.10 apart)
2201
2202 Node_ID Current_Time Worker_ID Time_Waited Comment
2203 Node 1: 1374807976.74634 -- Worker 1: -- Started
2204 Node 2: 1374807976.84634 -- Worker 1: -- Started
2205 Node 3: 1374807976.94638 -- Worker 1: -- Started
2206 Node 4: 1374807977.04639 -- Worker 1: -- Started
2207 Node 1: 1374807977.14634 -- Worker 2: -- Started
2208 Node 2: 1374807977.24640 -- Worker 2: -- Started
2209 Node 3: 1374807977.34649 -- Worker 2: -- Started
2210 Node 4: 1374807977.44657 -- Worker 2: -- Started
2211 Node 1: 1374807977.54636 -- Worker 1: 0.90037 -- Seq_N 1
2212 Node 2: 1374807977.64638 -- Worker 1: 1.00040 -- Seq_N 7
2213 Node 3: 1374807977.74642 -- Worker 1: 1.10043 -- Seq_N 13
2214 Node 4: 1374807977.84643 -- Worker 1: 1.20045 -- Seq_N 19
2215 Node 1: 1374807977.94636 -- Worker 2: 1.30037 -- Seq_N 2
2216 Node 2: 1374807978.04638 -- Worker 2: 1.40040 -- Seq_N 8
2217 Node 3: 1374807978.14641 -- Worker 2: 1.50042 -- Seq_N 14
2218 Node 4: 1374807978.24644 -- Worker 2: 1.60045 -- Seq_N 20
2219 Node 1: 1374807978.34628 -- Worker 1: 0.79996 -- Seq_N 3
2220 Node 2: 1374807978.44631 -- Worker 1: 0.79996 -- Seq_N 9
2221 Node 3: 1374807978.54634 -- Worker 1: 0.79996 -- Seq_N 15
2222 Node 4: 1374807978.64636 -- Worker 1: 0.79997 -- Seq_N 21
2223 Node 1: 1374807978.74628 -- Worker 2: 0.79996 -- Seq_N 4
2224 Node 2: 1374807978.84632 -- Worker 2: 0.79997 -- Seq_N 10
2225 Node 3: 1374807978.94634 -- Worker 2: 0.79996 -- Seq_N 16
2226 Node 4: 1374807979.04636 -- Worker 2: 0.79996 -- Seq_N 22
2227 Node 1: 1374807979.14628 -- Worker 1: 0.80001 -- Seq_N 5
2228 Node 2: 1374807979.24631 -- Worker 1: 0.80000 -- Seq_N 11
2229 Node 3: 1374807979.34634 -- Worker 1: 0.80001 -- Seq_N 17
2230 Node 4: 1374807979.44636 -- Worker 1: 0.80000 -- Seq_N 23
2231 Node 1: 1374807979.54628 -- Worker 2: 0.80000 -- Seq_N 6
2232 Node 2: 1374807979.64631 -- Worker 2: 0.80000 -- Seq_N 12
2233 Node 3: 1374807979.74633 -- Worker 2: 0.80000 -- Seq_N 18
2234 Node 4: 1374807979.84636 -- Worker 2: 0.80000 -- Seq_N 24
2235
2236 The interval.pl example above is included with MCE.
2237
2239 The "progress" option takes a code block for receiving info on the
2240 progress made while processing input data; e.g. "input_data" or
2241 "sequence". To make this work, one provides the "progress" option a
2242 closure block like so, passing along the size of the input_data; e.g
2243 "scalar @array" or "-s /path/to/file".
2244
2245 Current API available since 1.813.
2246
2247 A worker, upon completing processing its chunk, notifies the manager-
2248 process with the size of the chunk. That could be the number of rows or
2249 literally the size of the chunk when processing an input file. The
2250 manager-process accumulates the size before calling the code block
2251 associated with the "progress" option.
2252
2253 When running many tasks simultaneously, via "user_tasks", the call is
2254 initiated by workers at level 0 only or rather the first task, not
2255 shown here.
2256
2257 use Time::HiRes 'sleep';
2258 use MCE;
2259
2260 sub make_progress {
2261 my ($total_size) = @_;
2262 return sub {
2263 my ($completed_size) = @_;
2264 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2265 };
2266 }
2267
2268 my @input = (1..150);
2269
2270 MCE->new(
2271 chunk_size => 10,
2272 max_workers => 4,
2273 input_data => \@input,
2274 progress => make_progress( scalar @input ),
2275 user_func => sub { sleep 1.5 }
2276 )->run();
2277
2278 -- Output
2279
2280 6.7%
2281 13.3%
2282 20.0%
2283 26.7%
2284 33.3%
2285 40.0%
2286 46.7%
2287 53.3%
2288 60.0%
2289 66.7%
2290 73.3%
2291 80.0%
2292 86.7%
2293 93.3%
2294 100.0%
2295
2296 Next is the code using MCE::Flow and ProgressBar::Stack to do the same
2297 thing, practically.
2298
2299 use Time::HiRes 'sleep';
2300 use ProgressBar::Stack;
2301 use MCE::Flow;
2302
2303 sub make_progress {
2304 my ($total_size) = @_;
2305 init_progress();
2306 return sub {
2307 my ($completed_size) = @_;
2308 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2309 };
2310 }
2311
2312 my @input = (1..150);
2313
2314 MCE::Flow->init(
2315 chunk_size => 10,
2316 max_workers => 4,
2317 progress => make_progress( scalar @input )
2318 );
2319
2320 MCE::Flow->run( sub { sleep 1.5 }, \@input );
2321 MCE::Flow->finish();
2322
2323 print "\n";
2324
2325 -- Output
2326
2327 [################ ] 80.0% ETA: 0:01
2328
2329 For sequence of numbers, using the "sequence" option, one must account
2330 for "step_size", typically set to 1 automatically.
2331
2332 use Time::HiRes 'sleep';
2333 use MCE;
2334
2335 sub make_progress {
2336 my ($total_size) = @_;
2337 return sub {
2338 my ($completed_size) = @_;
2339 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2340 };
2341 }
2342
2343 MCE->new(
2344 chunk_size => 10,
2345 max_workers => 4,
2346 sequence => [ 1, 100, 2 ],
2347 progress => make_progress( int( 100 / 2 + 0.5 ) ),
2348 user_func => sub { sleep 1.5 }
2349 )->run();
2350
2351 -- Output
2352
2353 20.0%
2354 40.0%
2355 60.0%
2356 80.0%
2357 100.0%
2358
2359 Changing "chunk_size" to 1 means workers notify the manager process
2360 more often, thus increasing granularity. Take a look at the output.
2361
2362 2.0%
2363 4.0%
2364 6.0%
2365 8.0%
2366 10.0%
2367 ...
2368 92.0%
2369 94.0%
2370 96.0%
2371 98.0%
2372 100.0%
2373
2374 Here is the same thing using MCE::Flow together with
2375 ProgressBar::Stack.
2376
2377 use Time::HiRes 'sleep';
2378 use ProgressBar::Stack;
2379 use MCE::Flow;
2380
2381 sub make_progress {
2382 my ($total_size) = @_;
2383 init_progress();
2384 return sub {
2385 my ($completed_size) = @_;
2386 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2387 };
2388 }
2389
2390 MCE::Flow->init(
2391 chunk_size => 1,
2392 max_workers => 4,
2393 progress => make_progress( int( 100 / 2 + 0.5 ) )
2394 );
2395
2396 MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2397 MCE::Flow->finish();
2398
2399 print "\n";
2400
2401 -- Output
2402
2403 [######### ] 48.0% ETA: 0:03
2404
2405 For files and file handles, workers send the actual size of the data
2406 read versus counting rows.
2407
2408 use Time::HiRes 'sleep';
2409 use MCE;
2410
2411 sub make_progress {
2412 my ($total_size) = @_;
2413 return sub {
2414 my ($completed_size) = @_;
2415 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2416 };
2417 }
2418
2419 my $input_file = "/path/to/input_file.txt";
2420
2421 MCE->new(
2422 chunk_size => 5,
2423 max_workers => 4,
2424 input_data => $input_file,
2425 progress => make_progress( -s $input_file ),
2426 user_func => sub { sleep 0.03 }
2427 )->run();
2428
2429 For consistency, here is the example using MCE::Flow, again with
2430 ProgressBar::Stack.
2431
2432 use Time::HiRes 'sleep';
2433 use ProgressBar::Stack;
2434 use MCE::Flow;
2435
2436 sub make_progress {
2437 my ($total_size) = @_;
2438 init_progress();
2439 return sub {
2440 my ($completed_size) = @_;
2441 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2442 };
2443 }
2444
2445 my $input_file = "/path/to/input_file.txt";
2446
2447 MCE::Flow->init(
2448 chunk_size => 5,
2449 max_workers => 4,
2450 progress => make_progress( -s $input_file )
2451 );
2452
2453 MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2454 MCE::Flow->finish();
2455
2456 The next demonstration processes three arrays consecutively. For this
2457 one, MCE workers persist after running. This needs MCE 1.814 or later
2458 to run. Otherwise, the progress output is not shown in MCE 1.813.
2459
2460 use Time::HiRes 'sleep';
2461 use ProgressBar::Stack;
2462 use MCE;
2463
2464 sub make_progress {
2465 my ($total_size, $message) = @_;
2466 init_progress();
2467 return sub {
2468 my ($completed_size) = @_;
2469 update_progress(
2470 sprintf("%0.1f", $completed_size / $total_size * 100),
2471 $message
2472 );
2473 };
2474 }
2475
2476 my $mce = MCE->new(
2477 chunk_size => 10,
2478 max_workers => 4,
2479 user_func => sub { sleep 0.5 }
2480 )->spawn();
2481
2482 my @a1 = ( 1 .. 200 );
2483 my @a2 = ( 1 .. 500 );
2484 my @a3 = ( 1 .. 300 );
2485
2486 $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2487
2488 print "\n";
2489
2490 $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2491
2492 print "\n";
2493
2494 $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2495
2496 print "\n";
2497
2498 $mce->shutdown;
2499
2500 -- Output
2501
2502 [####################] 100.0% ETA: 0:00 array 1
2503 [####################] 100.0% ETA: 0:00 array 2
2504 [####################] 100.0% ETA: 0:00 array 3
2505
2506 When size is not know, such as reading from "STDIN", the only thing one
2507 can do is report the size completed thus far.
2508
2509 # 1 kibibyte equals 1024 bytes
2510
2511 progress => sub {
2512 my ($completed_size) = @_;
2513 printf "%0.1f kibibytes\n", $completed_size / 1024;
2514 }
2515
2517 • MCE::Examples
2518
2520 MCE
2521
2523 Mario E. Roy, <marioeroy AT gmail DOT com>
2524
2525
2526
2527perl v5.34.0 2022-02-20 MCE::Core(3)