1MCE::Core(3) User Contributed Perl Documentation MCE::Core(3)
2
3
4
6 MCE::Core - Documentation describing the core MCE API
7
9 This document describes MCE::Core version 1.884
10
12 This is a simplistic use case of MCE running with 5 workers.
13
14 # Construction using the Core API
15
16 use MCE;
17
18 my $mce = MCE->new(
19 max_workers => 5,
20 user_func => sub {
21 my ($mce) = @_;
22 $mce->say("Hello from " . $mce->wid);
23 }
24 );
25
26 $mce->run;
27
28 # Construction using a MCE model
29
30 use MCE::Flow max_workers => 5;
31
32 mce_flow sub {
33 my ($mce) = @_;
34 MCE->say("Hello from " . MCE->wid);
35 };
36
37 -- Output
38
39 Hello from 2
40 Hello from 4
41 Hello from 5
42 Hello from 1
43 Hello from 3
44
45 MCE->new ( [ options ] )
46 Below, a new instance is configured with all available options.
47
48 use MCE;
49
50 my $mce = MCE->new(
51
52 max_workers => 8, # Default 1
53
54 # Number of workers to spawn.
55
56 # MCE sets an upper-limit of 8 for 'auto'. MCE 1.521+.
57 # max_workers => 'auto', # # of lcores, 8 maximum
58 # max_workers => 'auto-1', # 7 on HW with 16 lcores
59 # max_workers => 'auto-1', # 3 on HW with 4 lcores
60
61 # Specify a percentage. MCE 1.875+.
62 # max_workers => '25%', # 4 on HW with 16 lcores
63 # max_workers => '50%', # 8 on HW with 16 lcores
64
65 # Run on all logical cores.
66 # max_workers => MCE::Util::get_ncpu(),
67
68 chunk_size => 2000, # Default 1
69
70 # Can also take a suffix; k (kibiBytes) or m (mebiBytes).
71 # The default is 1 when using the Core API and 'auto' for
72 # MCE Models. For arrays or queues, chunk_size means the
73 # number of records per chunk. For iterators, MCE will not
74 # use chunk_size, though the iterator may use it to determine
75 # how much to return per iteration. For files, smaller than or
76 # equal to 8192 is the number of records. Greater than 8192
77 # is the number of bytes. MCE reads until the end of record
78 # before calling user_func.
79
80 # chunk_size => 1, # Consists of 1 record
81 # chunk_size => 1000, # Consists of 1000 records
82 # chunk_size => '16k', # Approximate 16 kibiBytes (KiB)
83 # chunk_size => '20m', # Approximate 20 mebiBytes (MiB)
84
85 tmp_dir => $tmp_dir, # Default $MCE::Signal::tmp_dir
86
87 # Default is $MCE::Signal::tmp_dir which points to
88 # $ENV{TEMP} if defined. Otherwise, tmp_dir points
89 # to a location under /tmp.
90
91 freeze => \&encode_sereal, # Default \&Storable::freeze
92 thaw => \&decode_sereal, # Default \&Storable::thaw
93
94 # Release 1.412 allows freeze and thaw to be overridden.
95 # Simply include a serialization module prior to loading
96 # MCE. Configure freeze/thaw options.
97
98 # use Sereal qw( encode_sereal decode_sereal );
99 # use CBOR::XS qw( encode_cbor decode_cbor );
100 # use JSON::XS qw( encode_json decode_json );
101 #
102 # use MCE;
103
104 gather => \@a, # Default undef
105
106 # Release 1.5 allows for gathering of data to an array or
107 # hash reference, a MCE::Queue/Thread::Queue object, or code
108 # reference. One invokes gathering by calling the gather
109 # method as often as needed.
110
111 # gather => \@array,
112 # gather => \%hash,
113 # gather => $queue,
114 # gather => \&order,
115
116 init_relay => 0, # Default undef
117
118 # For specifying the initial relay value. Allowed values
119 # are array_ref, hash_ref, or scalar. The MCE::Relay module
120 # is loaded automatically when specified.
121
122 # init_relay => \@array,
123 # init_relay => \%hash,
124 # init_relay => scalar,
125
126 input_data => $input_file, # Default undef
127 RS => "\n>", # Default undef
128
129 # input_data => '/path/to/file' # Process file
130 # input_data => \@array # Process array
131 # input_data => \*FILE_HNDL # Process file handle
132 # input_data => $io # Process IO::All { File, Pipe, STDIO }
133 # input_data => \$scalar # Treated like a file
134 # input_data => \&iterator # User specified iterator
135
136 # The RS option (for input record separator) applies to files
137 # and file handles.
138
139 # MCE applies additional logic when RS begins with a newline
140 # character; e.g. RS => "\n>". It trims away characters after
141 # the newline and prepends them to the next record.
142 #
143 # Typically, the left side is what happens for $/ = "\n>".
144 # The right side is what user_func receives.
145 #
146 # All records begin with > and end with \n
147 # Record 1: >seq1 ... \n> (to) >seq1 ... \n
148 # Record 2: seq2 ... \n> >seq2 ... \n
149 # Record 3: seq3 ... \n> >seq3 ... \n
150 # Last Rec: seqN ... \n >seqN ... \n
151
152 loop_timeout => 20, # Default 0
153
154 # Added in 1.7; enables the manager process to time out a read
155 # operation on channel 0 (UNIX platforms only). The manager process
156 # decrements the total workers running for any worker which has
157 # died in an uncontrollable manner. Specify this option if on
158 # occasion a worker dies unexpectedly (e.g. from an XS module).
159
160 # Option works with init_relay on UNIX platforms since MCE 1.844.
161 # A number smaller than 5 is silently increased to 5.
162
163 max_retries => 2, # Default 0
164
165 # This option, added in 1.7, causes MCE to retry a failed
166 # chunk from a worker dying while processing input data or
167 # sequence of numbers.
168
169 parallel_io => 1, # Default 0
170 posix_exit => 1, # Default 0
171 use_slurpio => 1, # Default 0
172
173 # The parallel_io option enables parallel reads during large
174 # slurpio, useful when reading from fast storage. Do not enable
175 # parallel_io when running MCE on many nodes with input coming
176 # from shared storage.
177
178 # Set posix_exit to avoid all END and destructor processing.
179 # Constructing MCE inside a thread implies 1, as does the presence of CGI,
180 # FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
181 # Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
182
183 # Enable slurpio to pass the raw chunk (scalar ref) to the user
184 # function when reading input files.
185
186 use_threads => 1, # Auto 0 or 1
187
188 # By default MCE spawns child processes on UNIX platforms and
189 # threads on Windows (i.e. $^O eq 'MSWin32').
190
191 # MCE supports threads via two threading libraries if threads
192 # is preferred over child processes. The use of threads requires
193 # a thread library prior to loading MCE, causing the use_threads
194 # option to default to 1. Specify 0 for child processes.
195 #
196 # use threads; use forks;
197 # use threads::shared; use forks::shared;
198 # use MCE (or) use MCE; (or) use MCE;
199
200 spawn_delay => 0.045, # Default undef
201 submit_delay => 0.015, # Default undef
202 job_delay => 0.060, # Default undef
203
204 # Time to wait in fractional seconds after spawning a worker,
205 # after submitting parameters to worker (MCE->run, MCE->process),
206 # and worker running (one time staggered delay).
207
208 # Specify job_delay to stagger workers connecting to a database.
209
210 on_post_exit => \&on_post_exit, # Default undef
211 on_post_run => \&on_post_run, # Default undef
212
213 # Execute the code block after a worker exits or dies.
214 # (i.e. MCE->exit, exit, die)
215
216 # Execute the code block after running.
217 # (i.e. MCE->process, MCE->run)
218
219 progress => sub { ... }, # Default undef
220
221 # A code block for receiving info on the progress made.
222 # See section labeled "MCE PROGRESS DEMONSTRATIONS" at the
223 # end of this document.
224
225 user_args => { env => 'test' }, # Default undef
226
227 # MCE release 1.4 added a new parameter to allow one to
228 # specify arbitrary arguments such as a string, an ARRAY
229 # or HASH reference. Workers can access this directly.
230 # (i.e. my $args = $mce->{user_args} or MCE->user_args)
231
232 user_begin => \&user_begin, # Default undef
233 user_func => \&user_func, # Default undef
234 user_end => \&user_end, # Default undef
235
236 # Think of user_begin, user_func, and user_end as in
237 # the awk scripting language:
238 # awk 'BEGIN { begin } { func } { func } ... END { end }'
239
240 # MCE workers call user_begin once at the start of a job,
241 # then user_func repeatedly until no chunks remain.
242 # Afterwards, user_end is called.
243
244 user_error => \&user_error, # Default undef
245 user_output => \&user_output, # Default undef
246
247 # MCE will forward data to user_error/user_output,
248 # when defined, for the following methods.
249
250 # MCE->sendto(\*STDERR, "sent to user_error\n");
251 # MCE->printf(\*STDERR, "%s\n", "sent to user_error");
252 # MCE->print(\*STDERR, "sent to user_error\n");
253 # MCE->say(\*STDERR, "sent to user_error");
254
255 # MCE->sendto(\*STDOUT, "sent to user_output\n");
256 # MCE->printf("%s\n", "sent to user_output");
257 # MCE->print("sent to user_output\n");
258 # MCE->say("sent to user_output");
259
260 stderr_file => 'err_file', # Default STDERR
261 stdout_file => 'out_file', # Default STDOUT
262
263 # Or to file; user_error and user_output take precedence.
264
265 flush_file => 0, # Default 1
266 flush_stderr => 0, # Default 1
267 flush_stdout => 0, # Default 1
268
269 # Flush sendto file, standard error, or standard output.
270
271 interval => {
272 delay => 0.007 [, max_nodes => 4, node_id => 1 ]
273 },
274
275 # For use with the yield method introduced in MCE 1.5.
276 # Both max_nodes & node_id are optional and default to 1.
277 # Delay is the amount of time between intervals.
278
279 # interval => 0.007 # Shorter; MCE 1.506+
280
281 sequence => { # Default undef
282 begin => -1, end => 1 [, step => 0.1 [, format => "%4.1f" ] ]
283 },
284
285 bounds_only => 1, # Default undef
286
287 # For looping through a sequence of numbers in parallel.
288 # STEP, if omitted, defaults to 1 if BEGIN is smaller than
289 # END or -1 if BEGIN is greater than END. The FORMAT string
290 # is passed to sprintf behind the scenes (% may be omitted).
291 # e.g. $seq_n_formatted = sprintf("%4.1f", $seq_n);
292
293 # Do not specify both options; input_data and sequence.
294 # Release 1.4 allows one to specify an array reference.
295 # e.g. sequence => [ -1, 1, 0.1, "%4.1f" ]
296
297 # The bounds_only => 1 option will compute the 'begin' and
298 # 'end' items only for the chunk and not the items in between
299 # (hence boundaries only). This option has no effect when
300 # sequence is not specified or chunk_size equals 1.
301
302 # my $begin = $chunk_ref->[0]; my $end = $chunk_ref->[1];
303
304 task_end => \&task_end, # Default undef
305
306 # This is called by the manager process after the task
307 # has completed processing. MCE 1.5 allows this option
308 # to be specified at the top level.
309
310 task_name => 'string', # Default 'MCE'
311
312 # Added in MCE 1.5 and mainly beneficial for user_tasks.
313 # One may specify a unique name per each sub-task.
314 # The string is passed as the 3rd arg to task_end.
315
316 user_tasks => [ # Default undef
317 { ... }, # Options for task 0
318 { ... }, # Options for task 1
319 { ... }, # Options for task 2
320 ],
321
322 # Takes a list of hash references, each allowing up to 17
323 # options. All other MCE options are ignored. The init_relay,
324 # input_data, RS, and use_slurpio options are applicable to
325 # the first task only.
326
327 # max_workers, chunk_size, input_data, interval, sequence,
328 # bounds_only, user_args, user_begin, user_end, user_func,
329 # gather, task_end, task_name, use_slurpio, use_threads,
330 # init_relay, RS
331
332 # Options not specified here will default to same option
333 # specified at the top level.
334 );
335
336 EXPORT_CONST, CONST
337 There are 3 constants which are exportable. Using the constants in lieu
338 of 0,1,2 makes it more legible when accessing the user_func arguments
339 directly.
340
341 SELF CHUNK CID - MCE CONSTANTS
342
343 Exports SELF => 0, CHUNK => 1, and CID => 2.
344
345 use MCE export_const => 1;
346 use MCE const => 1; # Shorter; MCE 1.415+
347
348 user_func => sub {
349 # my ($mce, $chunk_ref, $chunk_id) = @_;
350 print "Hello from ", $_[SELF]->wid, "\n";
351 }
352
353 MCE 1.5 allows all public methods to be called directly.
354
355 use MCE;
356
357 user_func => sub {
358 # my ($mce, $chunk_ref, $chunk_id) = @_;
359 print "Hello from ", MCE->wid, "\n";
360 }
361
362 OVERRIDING DEFAULTS
363 The following lists options which may be overridden when loading the
364 module.
365
366 use Sereal qw( encode_sereal decode_sereal );
367 use CBOR::XS qw( encode_cbor decode_cbor );
368 use JSON::XS qw( encode_json decode_json );
369
370 use MCE
371 max_workers => 4, # Default 1
372 chunk_size => 100, # Default 1
373 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
374 freeze => \&encode_sereal, # \&Storable::freeze
375 thaw => \&decode_sereal, # \&Storable::thaw
376 init_relay => 0, # Default undef; MCE 1.882+
377 use_threads => 0, # Default undef; MCE 1.882+
378 ;
379
380 my $mce = MCE->new( ... );
381
382 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
383 available. Specify "Sereal => 0" to use Storable instead.
384
385 use MCE Sereal => 0;
386
387 RUNNING
388 Run calls spawn, submits the job; workers call user_begin, user_func,
389 and user_end. Run shuts down workers afterwards. Call spawn whenever
390 the need arises for large data structures prior to running.
391
392 $mce->spawn; # Call early if desired
393
394 $mce->run; # Call run or process below
395
396 # Acquire data arrays and/or input_files. Workers persist after
397 # processing.
398
399 $mce->process(\@input_data_1); # Process array
400 $mce->process(\@input_data_2);
401 $mce->process(\@input_data_n);
402
403 $mce->process(\%input_hash_1); # Process hash, current API
404 $mce->process(\%input_hash_2); # available since 1.828
405 $mce->process(\%input_hash_n);
406
407 $mce->process('input_file_1'); # Process file
408 $mce->process('input_file_2');
409 $mce->process('input_file_n');
410
411 $mce->shutdown; # Shutdown workers
412
413 SYNTAX for ON_POST_EXIT
414 Often times, one may want to capture the exit status. The on_post_exit
415 option, if defined, is executed immediately by the manager process
416 after a worker exits via exit (children only), MCE->exit (children and
417 threads), or die.
418
419 The format of $e->{pid} is PID_123 for children and THR_123 for
420 threads.
421
422 my $restart_flag = 1;
423
424 sub on_post_exit {
425 my ($mce, $e) = @_;
426
427 # Display all possible hash elements.
428 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
429
430 # Restart this worker if desired.
431 if ($restart_flag && $e->{wid} == 2) {
432 $mce->restart_worker;
433 $restart_flag = 0;
434 }
435 }
436
437 sub user_func {
438 my ($mce) = @_;
439 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); # Args, not necessary
440 }
441
442 my $mce = MCE->new(
443 on_post_exit => \&on_post_exit,
444 user_func => \&user_func,
445 max_workers => 3
446 );
447
448 $mce->run;
449
450 -- Output (child processes)
451
452 2: PID_33223: 0: msg_foo: 1002
453 1: PID_33222: 0: msg_foo: 1001
454 3: PID_33224: 0: msg_foo: 1003
455 2: PID_33225: 0: msg_foo: 1002
456
457 -- Output (running with threads)
458
459 3: TID_3: 0: msg_foo: 1003
460 2: TID_2: 0: msg_foo: 1002
461 1: TID_1: 0: msg_foo: 1001
462 2: TID_4: 0: msg_foo: 1002
463
464 SYNTAX for ON_POST_RUN
465 The on_post_run option, if defined, is executed immediately by the
466 manager process after running MCE->process or MCE->run. This option
467 receives an array reference of hashes.
468
469 The difference between on_post_exit and on_post_run is that the former
470 is called immediately whereas the latter is called after all workers
471 have completed running.
472
473 sub on_post_run {
474 my ($mce, $status_ref) = @_;
475 foreach my $e ( @{ $status_ref } ) {
476 # Display all possible hash elements.
477 print "$e->{wid}: $e->{pid}: $e->{status}: $e->{msg}: $e->{id}\n";
478 }
479 }
480
481 sub user_func {
482 my ($mce) = @_;
483 MCE->exit(0, 'msg_foo', 1000 + MCE->wid); # Args, not necessary
484 }
485
486 my $mce = MCE->new(
487 on_post_run => \&on_post_run,
488 user_func => \&user_func,
489 max_workers => 3
490 );
491
492 $mce->run;
493
494 -- Output (child processes)
495
496 3: PID_33174: 0: msg_foo: 1003
497 1: PID_33172: 0: msg_foo: 1001
498 2: PID_33173: 0: msg_foo: 1002
499
500 -- Output (running with threads)
501
502 2: TID_2: 0: msg_foo: 1002
503 3: TID_3: 0: msg_foo: 1003
504 1: TID_1: 0: msg_foo: 1001
505
506 SYNTAX for INPUT_DATA
507 MCE supports many ways to specify input_data. Support for iterators was
508 added in MCE 1.505. The RS option allows one to specify the record
509 separator when processing files.
510
511 MCE is a chunking engine. Therefore, chunk_size is applicable to
512 input_data. Specifying 1 for use_slurpio causes user_func to receive a
513 scalar reference containing the raw data (applicable to files only)
514 instead of an array reference.
515
516 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
517
518 input_data => '/path/to/file', # process file
519 input_data => \@array, # process array
520 input_data => \%hash, # process hash, API since 1.828
521 input_data => \*FILE_HNDL, # process file handle
522 input_data => $fh, # open $fh, "<", "file"
523 input_data => $fh, # IO::File "file", "r"
524 input_data => $fh, # IO::Uncompress::Gunzip "file.gz"
525 input_data => $io, # IO::All { File, Pipe, STDIO }
526 input_data => \$scalar, # treated like a file
527 input_data => \&iterator, # user specified iterator
528
529 chunk_size => 1, # >1 means looping inside user_func
530 use_slurpio => 1, # $chunk_ref is a scalar ref
531 RS => "\n>", # input record separator
532
533 The chunk_size value determines the chunking mode to use when
534 processing files. Otherwise, chunk_size is the number of elements for
535 arrays. For files, a chunk size value of <= 8192 is how many records to
536 read. Greater than 8192 is how many bytes to read. MCE appends (the
537 rest) up to the next record separator.
538
539 chunk_size => 8192, # Consists of 8192 records
540 chunk_size => 8193, # Approximate 8193 bytes for files
541
542 chunk_size => 1, # Consists of 1 record or element
543 chunk_size => 1000, # Consists of 1000 records
544 chunk_size => '16k', # Approximate 16 kibiBytes (KiB)
545 chunk_size => '20m', # Approximate 20 mebiBytes (MiB)
546
547 The construction for user_func when chunk_size > 1 and assuming
548 use_slurpio equals 0.
549
550 user_func => sub {
551 my ($mce, $chunk_ref, $chunk_id) = @_;
552
553 # $_ is $chunk_ref->[0] when chunk_size equals 1
554 # $_ is $chunk_ref otherwise; $_ can be used below
555
556 for my $record ( @{ $chunk_ref } ) {
557 print "$chunk_id: $record\n";
558 }
559 }
560
561 # input_data => \%hash
562 # current API available since 1.828
563
564 user_func => sub {
565 my ($mce, $chunk_ref, $chunk_id) = @_;
566
567 # $_ points to $chunk_ref regardless of chunk_size
568
569 for my $key ( keys %{ $chunk_ref } ) {
570 print "$key: ", $chunk_ref->{$key}, "\n";
571 }
572 }
573
574 Specifying a value for input_data is straightforward for arrays and
575 files. The next several examples specify an iterator reference for
576 input_data.
577
578 use MCE;
579
580 # A factory function which creates a closure (the iterator itself)
581 # for generating a sequence of numbers. The external variables
582 # ($n, $max, $step) are used for keeping state across successive
583 # calls to the closure. The iterator simply returns when $n > max.
584
585 sub input_iterator {
586 my ($n, $max, $step) = @_;
587
588 return sub {
589 return if $n > $max;
590
591 my $current = $n;
592 $n += $step;
593
594 return $current;
595 };
596 }
597
598 # Run user_func in parallel. Input data can be specified during
599 # the construction or as an argument to the process method.
600
601 my $mce = MCE->new(
602
603 # input_data => input_iterator(10, 30, 2),
604 chunk_size => 1, max_workers => 4,
605
606 user_func => sub {
607 my ($mce, $chunk_ref, $chunk_id) = @_;
608 MCE->print("$_: ", $_ * 2, "\n");
609 }
610
611 )->spawn;
612
613 $mce->process( input_iterator(10, 30, 2) );
614
615 -- Output Note that output order is not guaranteed
616 Take a look at iterator.pl for ordered output
617
618 10: 20
619 12: 24
620 16: 32
621 20: 40
622 14: 28
623 22: 44
624 18: 36
625 24: 48
626 26: 52
627 28: 56
628 30: 60
629
630 The following example queries the DB for the next 1000 rows. Notice the
631 use of fetchall_arrayref. The iterator function itself receives one
632 argument which is chunk_size (added in MCE 1.510) to determine how much
633 to return per iteration. The default is 1 for the Core API and MCE
634 Models.
635
636 use DBI;
637 use MCE;
638
639 sub db_iter {
640
641 my $dsn = "DBI:Oracle:host=db_server;port=db_port;sid=db_name";
642
643 my $dbh = DBI->connect($dsn, 'db_user', 'db_passwd') ||
644 die "Could not connect to database: $DBI::errstr";
645
646 my $sth = $dbh->prepare('select color, desc from table');
647
648 $sth->execute;
649
650 return sub {
651 my ($chunk_size) = @_;
652
653 if (my $aref = $sth->fetchall_arrayref(undef, $chunk_size)) {
654 return @{ $aref };
655 }
656
657 return;
658 };
659 }
660
661 # Let's enumerate column indexes for easy column retrieval.
662 my ($i_color, $i_desc) = (0 .. 1);
663
664 my $mce = MCE->new(
665 max_workers => 3, chunk_size => 1000,
666 input_data => db_iter(),
667
668 user_func => sub {
669 my ($mce, $chunk_ref, $chunk_id) = @_;
670 my $ret = '';
671
672 foreach my $row (@{ $chunk_ref }) {
673 $ret .= $row->[$i_color] .": ". $row->[$i_desc] ."\n";
674 }
675
676 MCE->print($ret);
677 }
678 );
679
680 $mce->run;
681
682 There are many modules on CPAN which return an iterator reference.
683 Showing one such example below. The demonstration ensures MCE workers
684 are spawned before obtaining the iterator. Note the worker_id value
685 (left column) in the output.
686
687 use Path::Iterator::Rule;
688 use MCE;
689
690 my $start_dir = shift
691 or die "Please specify a starting directory";
692
693 -d $start_dir
694 or die "Cannot open ($start_dir): No such file or directory";
695
696 my $mce = MCE->new(
697 max_workers => 'auto',
698 user_func => sub { MCE->say( MCE->wid . ": $_" ) }
699 )->spawn;
700
701 my $rule = Path::Iterator::Rule->new->file->name( qr/[.](pm)$/ );
702
703 my $iterator = $rule->iter(
704 $start_dir, { follow_symlinks => 0, depthfirst => 1 }
705 );
706
707 $mce->process( $iterator );
708
709 -- Output
710
711 8: lib/MCE/Core/Input/Generator.pm
712 5: lib/MCE/Core/Input/Handle.pm
713 6: lib/MCE/Core/Input/Iterator.pm
714 2: lib/MCE/Core/Input/Request.pm
715 3: lib/MCE/Core/Manager.pm
716 4: lib/MCE/Core/Input/Sequence.pm
717 7: lib/MCE/Core/Validation.pm
718 1: lib/MCE/Core/Worker.pm
719 8: lib/MCE/Flow.pm
720 5: lib/MCE/Grep.pm
721 6: lib/MCE/Loop.pm
722 2: lib/MCE/Map.pm
723 3: lib/MCE/Queue.pm
724 4: lib/MCE/Signal.pm
725 7: lib/MCE/Stream.pm
726 1: lib/MCE/Subs.pm
727 8: lib/MCE/Util.pm
728 5: lib/MCE.pm
729
730 Although MCE supports arrays, extra measures are needed to use a "lazy"
731 array as input data. The reason for this is that MCE needs the size of
732 the array before processing which may be unknown for lazy arrays.
733 Therefore, closures provide an excellent mechanism for this.
734
735 The code block belonging to the lazy array must return undef after
736 exhausting its input data. Otherwise, the process will never end.
737
738 use Tie::Array::Lazy;
739 use MCE;
740
741 tie my @a, 'Tie::Array::Lazy', [], sub {
742 my $i = $_[0]->index;
743
744 return ($i < 10) ? $i : undef;
745 };
746
747 sub make_iterator {
748 my $i = 0; my $a_ref = shift;
749
750 return sub {
751 return $a_ref->[$i++];
752 };
753 }
754
755 my $mce = MCE->new(
756 max_workers => 4, input_data => make_iterator(\@a),
757
758 user_func => sub {
759 my ($mce, $chunk_ref, $chunk_id) = @_;
760 MCE->say($_);
761 }
762
763 )->run;
764
765 -- Output
766
767 0
768 1
769 2
770 3
771 4
772 6
773 7
774 8
775 5
776 9
777
778 The following demonstrates how to retrieve a chunk from the lazy array
779 per each successive call. Here, the iterator block returns nothing
780 once $i is greater than $max. Iterators may optionally use chunk_size
781 to determine how much to return per iteration.
782
783 use Tie::Array::Lazy;
784 use MCE;
785
786 tie my @a, 'Tie::Array::Lazy', [], sub {
787 $_[0]->index;
788 };
789
790 sub make_iterator {
791 my $j = 0; my ($a_ref, $max) = @_;
792
793 return sub {
794 my ($chunk_size) = @_;
795 my $i = $j; $j += $chunk_size;
796
797 return if $i > $max;
798 return $j <= $max ? @$a_ref[$i .. $j - 1] : @$a_ref[$i .. $max];
799 };
800 }
801
802 my $mce = MCE->new(
803 chunk_size => 15, max_workers => 4,
804 input_data => make_iterator(\@a, 100),
805
806 user_func => sub {
807 my ($mce, $chunk_ref, $chunk_id) = @_;
808 MCE->say("$chunk_id: " . join(' ', @{ $chunk_ref }));
809 }
810
811 )->run;
812
813 -- Output
814
815 1: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
816 2: 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
817 3: 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
818 4: 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
819 5: 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
820 6: 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
821 7: 90 91 92 93 94 95 96 97 98 99 100
822
823 SYNTAX for SEQUENCE
824 The 1.3 release and above allows workers to loop through a sequence of
825 numbers computed mathematically without the overhead of an array. The
826 sequence can be specified separately per each user_task entry unlike
827 input_data which is applicable to the first task only.
828
829 See the seq_demo.pl example, included with this distribution, on
830 applying sequences with the user_tasks option.
831
832 Sequence can be defined using an array or a hash reference.
833
834 use MCE;
835
836 my $mce = MCE->new(
837 max_workers => 3,
838
839 # sequence => [ 10, 19, 0.7, "%4.1f" ], # up to 4 options
840
841 sequence => {
842 begin => 10, end => 19, step => 0.7, format => "%4.1f"
843 },
844
845 user_func => sub {
846 my ($mce, $n, $chunk_id) = @_;
847 print $n, " from ", MCE->wid, " id ", $chunk_id, "\n";
848 }
849 );
850
851 $mce->run;
852
853 -- Output (sorted afterwards, notice wid and chunk_id in output)
854
855 10.0 from 1 id 1
856 10.7 from 2 id 2
857 11.4 from 3 id 3
858 12.1 from 1 id 4
859 12.8 from 2 id 5
860 13.5 from 3 id 6
861 14.2 from 1 id 7
862 14.9 from 2 id 8
863 15.6 from 3 id 9
864 16.3 from 1 id 10
865 17.0 from 2 id 11
866 17.7 from 3 id 12
867 18.4 from 1 id 13
868
869 The 1.5 release includes a new option (bounds_only). This option tells
870 the sequence engine to compute 'begin' and 'end' items only, for the
871 chunk, and not the items in between (hence boundaries only). This
872 option applies to sequence only and has no effect when chunk_size
873 equals 1.
874
875 The time to run is 0.006s below. This becomes 0.827s without the
876 bounds_only option due to computing all items in between, thus creating
877 a very large array. Basically, specify bounds_only => 1 when boundaries
878 are all you need for looping inside the block; e.g. Monte Carlo
879 simulations.
880
881 Time was measured using 1 worker to emphasize the difference.
882
883 use MCE;
884
885 my $mce = MCE->new(
886 max_workers => 1, chunk_size => 1_250_000,
887
888 sequence => { begin => 1, end => 10_000_000 },
889 bounds_only => 1,
890
891 # For sequence, the input scalar $_ points to $chunk_ref
892 # when chunk_size > 1, otherwise $chunk_ref->[0].
893 #
894 # user_func => sub {
895 # my $begin = $_->[0]; my $end = $_->[-1];
896 #
897 # for ($begin .. $end) {
898 # ...
899 # }
900 # },
901
902 user_func => sub {
903 my ($mce, $chunk_ref, $chunk_id) = @_;
904 # $chunk_ref contains 2 items, not 1_250_000
905
906 my $begin = $chunk_ref->[ 0];
907 my $end = $chunk_ref->[-1]; # or $chunk_ref->[1]
908
909 MCE->printf("%7d .. %8d\n", $begin, $end);
910 }
911 );
912
913 $mce->run;
914
915 -- Output
916
917 1 .. 1250000
918 1250001 .. 2500000
919 2500001 .. 3750000
920 3750001 .. 5000000
921 5000001 .. 6250000
922 6250001 .. 7500000
923 7500001 .. 8750000
924 8750001 .. 10000000
925
926 SYNTAX for MAX_RETRIES
927 The max_retries option, added in 1.7, allows MCE to retry a failed
928 chunk from a worker dying while processing input data or a sequence of
929 numbers.
930
931 When max_retries is set, MCE configures the on_post_exit option
932 automatically using the following code before running. Specify
933 on_post_exit explicitly for any further tailoring. The restart_worker
934 line is necessary, obviously.
935
936 on_post_exit => sub {
937 my ( $mce, $e, $retry_cnt ) = @_;
938
939 if ( $e->{id} ) {
940 my $cnt = $retry_cnt + 1;
941 my $msg = "Error: chunk $e->{id} failed";
942
943 if ( defined $mce->{init_relay} ) {
944 print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
945 if ( $retry_cnt < $mce->{max_retries} );
946 }
947 else {
948 ( $retry_cnt < $mce->{max_retries} )
949 ? print {*STDERR} "$msg, retrying chunk attempt # $cnt\n"
950 : print {*STDERR} "$msg\n";
951 }
952
953 $mce->restart_worker;
954 }
955 }
956
957 We let MCE handle on_post_exit automatically below, which is
958 essentially the same code shown above. For max_retries to work, the
959 worker must die (abnormally included) or call MCE->exit. Notice that we
960 pass the chunk_id value for the 3rd argument to MCE->exit (defaults to
961 chunk_id if omitted since MCE 1.844).
962
963 # max_retries demonstration
964
965 use strict;
966 use warnings;
967
968 use MCE;
969
970 sub user_func {
971 my ( $mce, $chunk_ref, $chunk_id ) = @_;
972
973 # die "Died : chunk_id = 3\n" if $chunk_id == 3;
974 MCE->exit(1, undef, $chunk_id) if $chunk_id == 3;
975
976 print "$chunk_id\n";
977 }
978
979 my $mce = MCE->new(
980 max_workers => 1,
981 max_retries => 2,
982 user_func => \&user_func,
983 )->spawn;
984
985 my $input_data = [ 0..7 ];
986
987 $mce->process( { chunk_size => 1 }, $input_data );
988 $mce->shutdown;
989
990 -- Output
991
992 1
993 2
994 Error: chunk 3 failed, retrying chunk attempt # 1
995 Error: chunk 3 failed, retrying chunk attempt # 2
996 Error: chunk 3 failed
997 4
998 5
999 6
1000 7
1001 8
1002
1003 Orderly output with max_retries is possible since MCE 1.844. Below,
1004 chunk 3 succeeds whereas chunk 5 fails due to exceeding the number of
1005 retries. Be sure to call MCE::relay inside "user_func" and near the end
1006 of the block.
1007
1008 # max_retries demonstration with init_relay
1009
1010 use strict;
1011 use warnings;
1012
1013 use MCE;
1014 use MCE::Shared;
1015
1016 tie my $retries1, 'MCE::Shared', 0;
1017 tie my $retries2, 'MCE::Shared', 0;
1018
1019 MCE->new(
1020 max_workers => 4,
1021 input_data => [ 1..7 ],
1022 chunk_size => 1,
1023
1024 max_retries => 2,
1025 init_relay => 0,
1026
1027 user_func => sub {
1028 if ( MCE->chunk_id == 3 ) {
1029 MCE->exit if ++$retries1 <= 2;
1030 }
1031 if ( MCE->chunk_id == 5 ) {
1032 MCE->exit if ++$retries2 <= 3;
1033 }
1034 MCE::relay {
1035 $_ += 1;
1036 print MCE->chunk_id, "\n";
1037 };
1038 }
1039 )->run;
1040
1041 print "final: ", MCE::relay_final(), "\n";
1042
1043 -- Output
1044
1045 1
1046 2
1047 Error: chunk 3 failed, retrying chunk attempt # 1
1048 Error: chunk 5 failed, retrying chunk attempt # 1
1049 Error: chunk 3 failed, retrying chunk attempt # 2
1050 Error: chunk 5 failed, retrying chunk attempt # 2
1051 3
1052 4
1053 Error: chunk 5 failed
1054 6
1055 7
1056 final: 6
1057
1058 SYNTAX for USER_BEGIN and USER_END
1059 The user_begin and user_end options, if specified, behave similarly to
1060 awk 'BEGIN { begin } { func } { func } ... END { end }'. These are
1061 called once per worker during each run.
1062
1063 MCE 1.510 passes 2 additional parameters ($task_id and $task_name).
1064
1065 sub user_begin { # Called once at the beginning
1066 my ($mce, $task_id, $task_name) = @_;
1067 $mce->{wk_total_rows} = 0;
1068 }
1069
1070 sub user_func { # Called while processing
1071 my $mce = shift;
1072 $mce->{wk_total_rows} += 1;
1073 }
1074
1075 sub user_end { # Called once at the end
1076 my ($mce, $task_id, $task_name) = @_;
1077 printf "## %d: Processed %d rows\n",
1078 MCE->wid, $mce->{wk_total_rows};
1079 }
1080
1081 my $mce = MCE->new(
1082 user_begin => \&user_begin,
1083 user_func => \&user_func,
1084 user_end => \&user_end
1085 );
1086
1087 $mce->run;
1088
1089 SYNTAX for USER_FUNC with USE_SLURPIO => 0
1090 When processing input data, MCE can pass an array of rows or a slurped
1091 chunk. Below, a reference to an array containing the chunk data is
1092 processed.
1093
1094 e.g. $chunk_ref = [ record1, record2, record3, ... ]
1095
1096 sub user_func {
1097
1098 my ($mce, $chunk_ref, $chunk_id) = @_;
1099
1100 foreach my $row ( @{ $chunk_ref } ) {
1101 $mce->{wk_total_rows} += 1;
1102 print $row;
1103 }
1104 }
1105
1106 my $mce = MCE->new(
1107 chunk_size => 100,
1108 input_data => "/path/to/file",
1109 user_func => \&user_func,
1110 use_slurpio => 0
1111 );
1112
1113 $mce->run;
1114
1115 SYNTAX for USER_FUNC with USE_SLURPIO => 1
1116 Here, a reference to a scalar containing the raw chunk data is
1117 processed.
1118
1119 sub user_func {
1120
1121 my ($mce, $chunk_ref, $chunk_id) = @_;
1122
1123 my $count = () = $$chunk_ref =~ /abc/;
1124 }
1125
1126 my $mce = MCE->new(
1127 chunk_size => 16000,
1128 input_data => "/path/to/file",
1129 user_func => \&user_func,
1130 use_slurpio => 1
1131 );
1132
1133 $mce->run;
1134
1135 SYNTAX for USER_ERROR and USER_OUTPUT
1136 Output from MCE->sendto('STDERR/STDOUT', ...), MCE->printf, MCE->print,
1137 and MCE->say can be intercepted by specifying the user_error and
1138 user_output options. MCE on receiving output will forward to user_error
1139 or user_output in a serialized fashion.
1140
1141 Handy when wanting to filter, modify, and/or direct the output
1142 elsewhere.
1143
1144 sub user_error { # Redirect STDERR to STDOUT
1145 my $error = shift;
1146 print {*STDOUT} $error;
1147 }
1148
1149 sub user_output { # Redirect STDOUT to STDERR
1150 my $output = shift;
1151 print {*STDERR} $output;
1152 }
1153
1154 sub user_func {
1155 my ($mce, $chunk_ref, $chunk_id) = @_;
1156 my $count = 0;
1157
1158 foreach my $row ( @{ $chunk_ref } ) {
1159 MCE->print($row);
1160 $count += 1;
1161 }
1162
1163 MCE->print(\*STDERR, "$chunk_id: processed $count rows\n");
1164 }
1165
1166 my $mce = MCE->new(
1167 chunk_size => 1000,
1168 input_data => "/path/to/file",
1169 user_error => \&user_error,
1170 user_output => \&user_output,
1171 user_func => \&user_func
1172 );
1173
1174 $mce->run;
1175
1176 SYNTAX for USER_TASKS and TASK_END
1177 This option takes an array of tasks. Each task allows up to 17 options.
1178 The init_relay, input_data, RS, and use_slurpio options may be defined
1179 inside the first task or at the top level, otherwise ignored under
1180 other sub-tasks.
1181
1182 max_workers, chunk_size, input_data, interval, sequence,
1183 bounds_only, user_args, user_begin, user_end, user_func,
1184 gather, task_end, task_name, use_slurpio, use_threads,
1185 init_relay, RS
1186
1187 Sequence and chunk_size were added in 1.3. User_args was introduced in
1188 1.4. Name and input_data are new options allowed in 1.5. In addition,
1189 one can specify task_end at the top level. Task_end also receives 2
1190 additional arguments $task_id and $task_name (shown below).
1191
1192 Options not specified here will default to the same option specified at
1193 the top level. The task_end option is called by the manager process
1194 when all workers for that sub-task have completed processing.
1195
1196 Forking and threading can be intermixed among tasks unless running
1197 Cygwin. The run method will continue running until all workers have
1198 completed processing.
1199
1200 use threads;
1201 use threads::shared;
1202
1203 use MCE;
1204
1205 sub parallel_task1 { sleep 2; }
1206 sub parallel_task2 { sleep 1; }
1207
1208 my $mce = MCE->new(
1209
1210 task_end => sub {
1211 my ($mce, $task_id, $task_name) = @_;
1212 print "Task [$task_id -- $task_name] completed processing\n";
1213 },
1214
1215 user_tasks => [{
1216 task_name => 'foo',
1217 max_workers => 2,
1218 user_func => \&parallel_task1,
1219 use_threads => 0 # Not using threads
1220
1221 },{
1222 task_name => 'bar',
1223 max_workers => 4,
1224 user_func => \&parallel_task2,
1225 use_threads => 1 # Yes, threads
1226
1227 }]
1228 );
1229
1230 $mce->run;
1231
1232 -- Output
1233
1234 Task [1 -- bar] completed processing
1235 Task [0 -- foo] completed processing
1236
1238 Beginning with MCE 1.5, the input scalar $_ is localized prior to
1239 calling user_func for input_data and sequence of numbers. The following
1240 applies.
1241
1242 use_slurpio => 1
1243 $_ is a reference to the buffer e.g. $_ = \$_buffer;
1244 $_ is a reference regardless of whether chunk_size is 1 or greater
1245
1246 user_func => sub {
1247 # my ($mce, $chunk_ref, $chunk_id) = @_;
1248 print ${ $_ }; # $_ is same as $chunk_ref
1249 }
1250
1251 chunk_size is greater than 1, use_slurpio => 0
1252 $_ is a reference to an array. $_ = \@_records; $_ = \@_seq_n;
1253 $_ is same as $chunk_ref or $_[CHUNK]
1254
1255 user_func => sub {
1256 # my ($mce, $chunk_ref, $chunk_id) = @_;
1257 for my $row ( @{ $_ } ) {
1258 print $row, "\n";
1259 }
1260 }
1261
1262 use MCE const => 1;
1263
1264 user_func => sub {
1265 # my ($mce, $chunk_ref, $chunk_id) = @_;
1266 for my $row ( @{ $_[CHUNK] } ) {
1267 print $row, "\n";
1268 }
1269 }
1270
1271 chunk_size equals 1, use_slurpio => 0
1272 $_ contains the actual value. $_ = $_buffer; $_ = $seq_n;
1273
1274 # Note that $_ and $chunk_ref are not the same below.
1275 # $chunk_ref is a reference to an array.
1276
1277 user_func => sub {
1278 # my ($mce, $chunk_ref, $chunk_id) = @_;
1279 print $_, "\n"; # Same as $chunk_ref->[0];
1280 }
1281
1282 $mce->foreach("/path/to/file", sub {
1283 # my ($mce, $chunk_ref, $chunk_id) = @_;
1284 print $_; # Same as $chunk_ref->[0];
1285 });
1286
1287 # However, that is not the case for the forseq method.
1288 # Both $_ and $n_seq are the same when chunk_size => 1.
1289
1290 $mce->forseq([ 1, 9 ], sub {
1291 # my ($mce, $n_seq, $chunk_id) = @_;
1292 print $_, "\n"; # Same as $n_seq
1293 });
1294
1295 Sequence can also be specified using an array reference. The below
1296 is the same as the example afterwards.
1297
1298 $mce->forseq( { begin => 10, end => 40, step => 2 }, ... );
1299
1300 The code block receives an array containing the next 5 sequences.
1301 Chunk 1 (chunk_id 1) contains 10,12,14,16,18. $n_seq is a reference
1302 to an array, same as $_, due to chunk_size being greater than 1.
1303
1304 $mce->forseq( [ 10, 40, 2 ], { chunk_size => 5 }, sub {
1305 # my ($mce, $n_seq, $chunk_id) = @_;
1306 my @result;
1307 for my $n ( @{ $_ } ) {
1308 ... do work, append to result for 5
1309 }
1310 ... do something with result afterwards
1311 });
1312
1314 The methods listed below are callable by the main process and workers.
1315
1316 MCE->abort ( void )
1317 $mce->abort ( void )
1318 The 'abort' method is applicable when processing input_data only. This
1319 causes all workers to abort after processing the current chunk.
1320
1321 Workers write the next offset position to the queue socket for the next
1322 available worker. In essence, the 'abort' method writes the last offset
1323 position. Workers, on requesting the next offset position, will think
1324 the end of input_data has been reached and leave the chunking loop.
1325
1326 MCE->abort;
1327 $mce->abort;
1328
1329 MCE->chunk_id ( void )
1330 $mce->chunk_id ( void )
1331 Returns the chunk_id for the current chunk. The value starts at 1.
1332 Chunking applies to input_data or sequence. The value is 0 for the
1333 manager process.
1334
1335 my $chunk_id = MCE->chunk_id;
1336 my $chunk_id = $mce->chunk_id;
1337
1338 MCE->chunk_size ( void )
1339 $mce->chunk_size ( void )
1340 Getter method for chunk_size used by MCE.
1341
1342 my $chunk_size = MCE->chunk_size;
1343 my $chunk_size = $mce->chunk_size;
1344
1345 MCE->do ( 'callback_func' [, $arg1, ... ] )
1346 $mce->do ( 'callback_func' [, $arg1, ... ] )
1347 MCE serializes data transfers from a worker process via helper
1348 functions do & sendto to the manager process. The callback function can
1349 optionally return a reply. Support for calling by the manager process
1350 was enabled in MCE 1.839.
1351
1352 [ $reply = ] MCE->do('callback' [, $arg1, ... ]);
1353
1354 Passing args to a callback function using references & scalar.
1355
1356 sub callback {
1357 my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
1358 ...
1359 }
1360
1361 MCE->do('main::callback', \@a, \%h, \$s, 'foo');
1362 MCE->do('callback', \@a, \%h, \$s, 'foo');
1363
1364 MCE knows if wanting a void, list, hash, or a scalar return value.
1365
1366 MCE->do('callback' [, $arg1, ... ]);
1367
1368 my @array = MCE->do('callback' [, $arg1, ... ]);
1369 my %hash = MCE->do('callback' [, $arg1, ... ]);
1370 my $scalar = MCE->do('callback' [, $arg1, ... ]);
1371
1372 MCE->freeze ( $object_ref )
1373 $mce->freeze ( $object_ref )
1374 Calls the internal freeze method to serialize an object. The default
1375 serialization routines are handled by Sereal if available or Storable.
1376
1377 my $frozen = MCE->freeze([ 0, 2, 4 ]);
1378 my $frozen = $mce->freeze([ 0, 2, 4 ]);
1379
1380 MCE->max_retries ( void )
1381 $mce->max_retries ( void )
1382 Getter method for max_retries used by MCE.
1383
1384 my $max_retries = MCE->max_retries;
1385 my $max_retries = $mce->max_retries;
1386
1387 MCE->max_workers ( void )
1388 $mce->max_workers ( void )
1389 Getter method for max_workers used by MCE.
1390
1391 my $max_workers = MCE->max_workers;
1392 my $max_workers = $mce->max_workers;
1393
1394 MCE->pid ( void )
1395 $mce->pid ( void )
1396 Returns the Process ID. Threads have thread ID attached to the value.
1397
1398 my $pid = MCE->pid; # 16180 (pid) ; 16180.2 (pid.tid)
1399 my $pid = $mce->pid;
1400
1401 MCE->printf ( $format, $list [, ... ] )
1402 MCE->print ( $list [, ... ] )
1403 MCE->say ( $list [, ... ] )
1404 $mce->printf ( $format, $list [, ... ] )
1405 $mce->print ( $list [, ... ] )
1406 $mce->say ( $list [, ... ] )
1407 Use the printf, print, and say methods when wanting to serialize output
1408 among workers and the manager process. These are sugar syntax for the
1409 sendto method. These behave similarly to the native subroutines in Perl
1410 with the exception that barewords must be passed as a reference and
1411 require the comma after it including file handles.
1412
1413 Say is like print, but implicitly appends a newline.
1414
1415 MCE->printf(\*STDOUT, "%s: %d\n", $name, $age);
1416 MCE->printf($fh, "%s: %d\n", $name, $age);
1417 MCE->printf("%s: %d\n", $name, $age);
1418
1419 MCE->print(\*STDERR, "$error_msg\n");
1420 MCE->print($fh, $log_msg."\n");
1421 MCE->print("$output_msg\n");
1422
1423 MCE->say(\*STDERR, $error_msg);
1424 MCE->say($fh, $log_msg);
1425 MCE->say($output_msg);
1426
1427 Caveat: Use the following syntax when passing a reference not a glob or
1428 file handle. Otherwise, MCE will error indicating the first argument is
1429 not a glob reference.
1430
1431 MCE->print(\*STDOUT, \@array, "\n");
1432 MCE->print("", \@array, "\n"); # ok
1433
1434 Sending to "IO::All" { File, Pipe, STDIO } is supported since MCE
1435 1.845.
1436
1437 use IO::All;
1438
1439 my $out = io->stdout;
1440 my $err = io->stderr;
1441
1442 MCE->printf($out, "%s\n", "sent to stdout");
1443 MCE->printf($err, "%s\n", "sent to stderr");
1444
1445 MCE->print($out, "sent to stdout\n");
1446 MCE->print($err, "sent to stderr\n");
1447
1448 MCE->say($out, "sent to stdout");
1449 MCE->say($err, "sent to stderr");
1450
1451 MCE->sess_dir ( void )
1452 $mce->sess_dir ( void )
1453 Returns the session directory used by the MCE instance. This is defined
1454 during spawning and removed during shutdown.
1455
1456 my $sess_dir = MCE->sess_dir;
1457 my $sess_dir = $mce->sess_dir;
1458
1459 MCE->task_id ( void )
1460 $mce->task_id ( void )
1461 Returns the task ID. This applies to the user_tasks option (starts at
1462 0).
1463
1464 my $task_id = MCE->task_id;
1465 my $task_id = $mce->task_id;
1466
1467 MCE->task_name ( void )
1468 $mce->task_name ( void )
1469 Returns the task_name value specified via the task_name option when
1470 configuring MCE.
1471
1472 my $task_name = MCE->task_name;
1473 my $task_name = $mce->task_name;
1474
1475 MCE->task_wid ( void )
1476 $mce->task_wid ( void )
1477 Returns the task worker ID (applies to user_tasks). The value starts at
1478 1 per each task configured within user_tasks. The value is 0 for the
1479 manager process.
1480
1481 my $task_wid = MCE->task_wid;
1482 my $task_wid = $mce->task_wid;
1483
1484 MCE->thaw ( $frozen )
1485 $mce->thaw ( $frozen )
1486 Calls the internal thaw method to un-serialize the frozen object.
1487
1488 my $object_ref = MCE->thaw($frozen);
1489 my $object_ref = $mce->thaw($frozen);
1490
1491 MCE->tmp_dir ( void )
1492 $mce->tmp_dir ( void )
1493 Returns the temporary directory used by MCE.
1494
1495 my $tmp_dir = MCE->tmp_dir;
1496 my $tmp_dir = $mce->tmp_dir;
1497
1498 MCE->user_args ( void )
1499 $mce->user_args ( void )
1500 Returns the arguments specified via the user_args option.
1501
1502 my ($arg1, $arg2, $arg3) = MCE->user_args;
1503 my ($arg1, $arg2, $arg3) = $mce->user_args;
1504
1505 MCE->wid ( void )
1506 $mce->wid ( void )
1507 Returns the MCE worker ID. Starts at 1 per each MCE instance. The value
1508 is 0 for the manager process.
1509
1510 my $wid = MCE->wid;
1511 my $wid = $mce->wid;
1512
1514 Methods listed below are callable by the main process only.
1515
1516 MCE->forchunk ( $input_data [, { options } ], sub { ... } )
1517 MCE->foreach ( $input_data [, { options } ], sub { ... } )
1518 MCE->forseq ( $sequence_spec [, { options } ], sub { ... } )
1519 $mce->forchunk ( $input_data [, { options } ], sub { ... } )
1520 $mce->foreach ( $input_data [, { options } ], sub { ... } )
1521 $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
1522 Forchunk, foreach, and forseq are sugar methods and described in
1523 MCE::Candy. Stubs exist in MCE which load MCE::Candy automatically.
1524
1525 MCE->process ( $input_data [, { options } ] )
1526 $mce->process ( $input_data [, { options } ] )
1527 The process method will spawn workers automatically if not already
1528 spawned. It will set input_data => $input_data. It calls run(0) to not
1529 auto-shutdown workers. Specifying options is optional.
1530
1531 Allowable options { key => value, ... } are:
1532
1533 chunk_size input_data job_delay spawn_delay submit_delay
1534 flush_file flush_stderr flush_stdout stderr_file stdout_file
1535 on_post_exit on_post_run sequence user_args user_begin user_end
1536 user_func user_error user_output use_slurpio RS
1537
1538 Options remain persistent going forward unless changed. Setting
1539 user_begin, user_end, or user_func will cause already spawned workers
1540 to shut down and re-spawn automatically. Therefore, define these during
1541 instantiation.
1542
1543 The below will cause workers to re-spawn after running.
1544
1545 my $mce = MCE->new( max_workers => 'auto' );
1546
1547 $mce->process( {
1548 user_begin => sub { # connect to DB },
1549 user_func => sub { # process each row },
1550 user_end => sub { # close handle to DB },
1551 }, \@input_data );
1552
1553 $mce->process( {
1554 user_begin => sub { # connect to DB },
1555 user_func => sub { # process each file },
1556 user_end => sub { # close handle to DB },
1557 }, "/list/of/files" );
1558
1559 Do the following if wanting workers to persist between jobs.
1560
1561 use MCE max_workers => 'auto';
1562
1563 my $mce = MCE->new(
1564 user_begin => sub { # connect to DB },
1565 user_func => sub { # process each chunk or row or host },
1566 user_end => sub { # close handle to DB },
1567 );
1568
1569 $mce->spawn; # Spawn early if desired
1570
1571 $mce->process("/one/very_big_file/_mce_/will_chunk_in_parallel");
1572 $mce->process(\@array_of_files_to_grep);
1573 $mce->process("/path/to/host/list");
1574
1575 $mce->process($array_ref);
1576 $mce->process($array_ref, { stdout_file => $output_file });
1577
1578 # This was not allowed before. Fixed in 1.415.
1579 $mce->process({ sequence => { begin => 10, end => 90, step => 2 } });
1580 $mce->process({ sequence => [ 10, 90, 2 ] });
1581
1582 $mce->shutdown;
1583
1584 MCE->relay_final ( void )
1585 $mce->relay_final ( void )
1586 The relay methods are described in MCE::Relay. Relay capabilities are
1587 enabled by specifying the "init_relay" MCE option.
1588
1589 MCE->restart_worker ( void )
1590 $mce->restart_worker ( void )
1591 One can restart a worker who has died or exited. The job never ends
1592 below due to restarting each time. It is recommended to call MCE->exit or
1593 $mce->exit instead of the native exit function for better handling,
1594 especially under the Windows environment.
1595
1596 The $e->{wid} argument is no longer necessary starting with the 1.5
1597 release.
1598
1599 Press [ctrl-c] to terminate the script.
1600
1601 my $mce = MCE->new(
1602
1603 on_post_exit => sub {
1604 my ($mce, $e) = @_;
1605 print "$e->{wid}: $e->{pid}: status $e->{status}: $e->{msg}";
1606 # $mce->restart_worker($e->{wid}); # MCE-1.415 and below
1607 $mce->restart_worker; # MCE-1.500 and above
1608 },
1609
1610 user_begin => sub {
1611 my ($mce, $task_id, $task_name) = @_;
1612 # Not interested in die messages going to STDERR,
1613 # because the die handler calls MCE->exit(255, $_[0]).
1614 close STDERR;
1615 },
1616
1617 user_tasks => [{
1618 max_workers => 5,
1619 user_func => sub {
1620 my ($mce) = @_; sleep MCE->wid;
1621 MCE->exit(3, "exited from " . MCE->wid . "\n");
1622 }
1623 },{
1624 max_workers => 4,
1625 user_func => sub {
1626 my ($mce) = @_; sleep MCE->wid;
1627 die("died from " . MCE->wid . "\n");
1628 }
1629 }]
1630 );
1631
1632 $mce->run;
1633
1634 -- Output
1635
1636 1: PID_85388: status 3: exited from 1
1637 2: PID_85389: status 3: exited from 2
1638 1: PID_85397: status 3: exited from 1
1639 3: PID_85390: status 3: exited from 3
1640 1: PID_85399: status 3: exited from 1
1641 4: PID_85391: status 3: exited from 4
1642 2: PID_85398: status 3: exited from 2
1643 1: PID_85401: status 3: exited from 1
1644 5: PID_85392: status 3: exited from 5
1645 1: PID_85404: status 3: exited from 1
1646 6: PID_85393: status 255: died from 6
1647 3: PID_85400: status 3: exited from 3
1648 2: PID_85403: status 3: exited from 2
1649 1: PID_85406: status 3: exited from 1
1650 7: PID_85394: status 255: died from 7
1651 1: PID_85410: status 3: exited from 1
1652 8: PID_85395: status 255: died from 8
1653 4: PID_85402: status 3: exited from 4
1654 2: PID_85409: status 3: exited from 2
1655 1: PID_85412: status 3: exited from 1
1656 9: PID_85396: status 255: died from 9
1657 3: PID_85408: status 3: exited from 3
1658 1: PID_85416: status 3: exited from 1
1659
1660 ...
1661
1662 MCE->run ( [ $auto_shutdown [, { options } ] ] )
1663 $mce->run ( [ $auto_shutdown [, { options } ] ] )
1664 The run method, by default, spawns workers, processes once, and shuts
1665 down afterwards. Specify 0 for $auto_shutdown when wanting workers to
1666 persist after running (default 1).
1667
1668 Specifying options is optional. Valid options are the same as for the
1669 process method.
1670
1671 my $mce = MCE->new( ... );
1672
1673 # Disables auto-shutdown
1674 $mce->run(0);
1675
1676 MCE->send ( $data_ref )
1677 $mce->send ( $data_ref )
1678 The 'send' method is useful when wanting to spawn workers early to
1679 minimize memory consumption and afterwards send data individually to
1680 each worker. One cannot send more than the total workers spawned.
1681 Workers store the received data as $mce->{user_data}.
1682
1683 The data which can be sent is restricted to an ARRAY, HASH, or PDL
1684 reference. Workers begin processing immediately after receiving data.
1685 Workers set $mce->{user_data} to undef after processing. One cannot
1686 specify input_data, sequence, or user_tasks when using the "send"
1687 method.
1688
1689 Passing any options e.g. run(0, { options }) is ignored due to workers
1690 running immediately after receiving user data. There is no guarantee as to
1691 which worker will receive data first. It depends on which worker is
1692 available awaiting data.
1693
1694 use MCE;
1695
1696 my $mce = MCE->new(
1697 max_workers => 5,
1698
1699 user_func => sub {
1700 my ($mce) = @_;
1701 my $data = $mce->{user_data};
1702 my $first_name = $data->{first_name};
1703 print MCE->wid, ": Hello from $first_name\n";
1704 }
1705 );
1706
1707 $mce->spawn; # Optional, send will spawn if necessary.
1708
1709 $mce->send( { first_name => "Theresa" } );
1710 $mce->send( { first_name => "Francis" } );
1711 $mce->send( { first_name => "Padre" } );
1712 $mce->send( { first_name => "Anthony" } );
1713
1714 $mce->run; # Wait for workers to complete processing.
1715
1716 -- Output
1717
1718 2: Hello from Theresa
1719 5: Hello from Anthony
1720 3: Hello from Francis
1721 4: Hello from Padre
1722
1723 MCE->shutdown ( void )
1724 $mce->shutdown ( void )
1725 The run method will automatically spawn workers, run once, and shut down
1726 workers automatically. Workers persist after running below. Shutdown
1727 may be called as needed or prior to exiting.
1728
1729 my $mce = MCE->new( ... );
1730
1731 $mce->spawn;
1732
1733 $mce->process(\@input_data_1); # Processing multiple arrays
1734 $mce->process(\@input_data_2);
1735 $mce->process(\@input_data_n);
1736
1737 $mce->shutdown;
1738
1739 $mce->process('input_file_1'); # Processing multiple files
1740 $mce->process('input_file_2');
1741 $mce->process('input_file_n');
1742
1743 $mce->shutdown;
1744
1745 MCE->spawn ( void )
1746 $mce->spawn ( void )
1747 Workers are normally spawned automatically. The spawn method allows one
1748 to spawn workers early if so desired.
1749
1750 my $mce = MCE->new( ... );
1751
1752 $mce->spawn;
1753
1754 MCE->status ( void )
1755 $mce->status ( void )
1756 The greatest exit status is saved among workers while running. Look at
1757 the on_post_exit or on_post_run options for callback support.
1758
1759 my $mce = MCE->new( ... );
1760
1761 $mce->run;
1762
1763 my $exit_status = $mce->status;
1764
1766 Methods listed below are callable by workers only.
1767
1768 MCE->exit ( [ $status [, $message [, $id ] ] ] )
1769 $mce->exit ( [ $status [, $message [, $id ] ] ] )
1770 A worker exits from MCE entirely. $id (optional) can be used for
1771 passing the primary key or a string along with the message. Look at the
1772 on_post_exit or on_post_run options for callback support.
1773
1774 MCE->exit; # default 0
1775 MCE->exit(1);
1776 MCE->exit(2, 'chunk failed', $chunk_id);
1777 MCE->exit(0, 'msg_foo', 'id_1000');
1778
1779 MCE->gather ( $arg1 [, $arg2, ... ] )
1780 $mce->gather ( $arg1 [, $arg2, ... ] )
1781 A worker can submit data to the location specified via the gather
1782 option by calling this method. See MCE::Flow and MCE::Loop for
1783 additional use cases.
1784
1785 use MCE;
1786
1787 my @hosts = qw(
1788 hosta hostb hostc hostd hoste
1789 );
1790
1791 my $mce = MCE->new(
1792 chunk_size => 1, max_workers => 3,
1793
1794 user_func => sub {
1795 # my ($mce, $chunk_ref, $chunk_id) = @_;
1796 my ($output, $error, $status); my $host = $_;
1797
1798 # Do something with $host;
1799 $output = "Worker ". MCE->wid .": Hello from $host";
1800
1801 if (MCE->chunk_id % 3 == 0) {
1802 # Simulating an error condition
1803 local $? = 1; $status = $?;
1804 $error = "Error from $host"
1805 }
1806 else {
1807 $status = 0;
1808 }
1809
1810 # Ensure unique keys (key, value) when gathering to a
1811 # hash.
1812 MCE->gather("$host.out", $output, "$host.sta", $status);
1813 MCE->gather("$host.err", $error) if (defined $error);
1814 }
1815 );
1816
1817 my %h; $mce->process(\@hosts, { gather => \%h });
1818
1819 foreach my $host (@hosts) {
1820 print $h{"$host.out"}, "\n";
1821 print $h{"$host.err"}, "\n" if (exists $h{"$host.err"});
1822 print "Exit status: ", $h{"$host.sta"}, "\n\n";
1823 }
1824
1825 -- Output
1826
1827 Worker 2: Hello from hosta
1828 Exit status: 0
1829
1830 Worker 1: Hello from hostb
1831 Exit status: 0
1832
1833 Worker 3: Hello from hostc
1834 Error from hostc
1835 Exit status: 1
1836
1837 Worker 2: Hello from hostd
1838 Exit status: 0
1839
1840 Worker 1: Hello from hoste
1841 Exit status: 0
1842
1843 MCE->last ( void )
1844 $mce->last ( void )
1845 Worker leaves the chunking loop or user_func block immediately.
1846 Callable from inside foreach, forchunk, forseq, and user_func.
1847
1848 use MCE;
1849
1850 my $mce = MCE->new(
1851 max_workers => 5
1852 );
1853
1854 my @list = (1 .. 80);
1855
1856 $mce->forchunk(\@list, { chunk_size => 2 }, sub {
1857
1858 my ($mce, $chunk_ref, $chunk_id) = @_;
1859 MCE->last if ($chunk_id > 4);
1860
1861 my @output = ();
1862
1863 foreach my $rec ( @{ $chunk_ref } ) {
1864 push @output, $rec, "\n";
1865 }
1866
1867 MCE->print(@output);
1868 });
1869
1870 -- Output (each chunk above consists of 2 elements)
1871
1872 3
1873 4
1874 1
1875 2
1876 7
1877 8
1878 5
1879 6
1880
1881 MCE->next ( void )
1882 $mce->next ( void )
1883 Worker starts the next iteration of the chunking loop. Callable from
1884 inside foreach, forchunk, forseq, and user_func.
1885
1886 use MCE;
1887
1888 my $mce = MCE->new(
1889 max_workers => 5
1890 );
1891
1892 my @list = (1 .. 80);
1893
1894 $mce->forchunk(\@list, { chunk_size => 4 }, sub {
1895
1896 my ($mce, $chunk_ref, $chunk_id) = @_;
1897 MCE->next if ($chunk_id < 20);
1898
1899 my @output = ();
1900
1901 foreach my $rec ( @{ $chunk_ref } ) {
1902 push @output, $rec, "\n";
1903 }
1904
1905 MCE->print(@output);
1906 });
1907
1908 -- Output (each chunk above consists of 4 elements)
1909
1910 77
1911 78
1912 79
1913 80
1914
1915 MCE::relay { code }
1916 MCE->relay ( sub { code } )
1917 MCE->relay_recv ( void )
1918 $mce->relay ( sub { code } )
1919 $mce->relay_recv ( void )
1920 The relay methods are described in MCE::Relay. Relay capabilities are
1921 enabled by specifying the "init_relay" MCE option.
1922
1923 MCE->sendto ( $to, $arg1, ... )
1924 $mce->sendto ( $to, $arg1, ... )
1925 The sendto method is called by workers for serializing data to standard
1926 output, standard error, or the end of a file. The action is done by the
1927 manager process.
1928
1929 Release 1.00x supported 1 data argument, not more.
1930
1931 MCE->sendto('file', \@array, '/path/to/file');
1932 MCE->sendto('file', \$scalar, '/path/to/file');
1933 MCE->sendto('file', $scalar, '/path/to/file');
1934
1935 MCE->sendto('STDERR', \@array);
1936 MCE->sendto('STDERR', \$scalar);
1937 MCE->sendto('STDERR', $scalar);
1938
1939 MCE->sendto('STDOUT', \@array);
1940 MCE->sendto('STDOUT', \$scalar);
1941 MCE->sendto('STDOUT', $scalar);
1942
1943 Release 1.100 added the ability to pass multiple arguments. Notice the
1944 syntax change for sending to a file. Passing a reference to an array is
1945 no longer necessary.
1946
1947 MCE->sendto('file:/path/to/file', $arg1 [, $arg2, ... ]);
1948 MCE->sendto('STDERR', $arg1 [, $arg2, ... ]);
1949 MCE->sendto('STDOUT', $arg1 [, $arg2, ... ]);
1950
1951 MCE->sendto('STDOUT', @a, "\n", %h, "\n", $s, "\n");
1952
1953 To retain 1.00x compatibility, sendto outputs the content when a single
1954 data reference is specified. Otherwise, the reference for \@array or
1955 \$scalar is shown in 1.500, not the content.
1956
1957 MCE->sendto('STDERR', \@array); # 1.00x behavior, content
1958 MCE->sendto('STDOUT', \$scalar);
1959 MCE->sendto('file:/path/to/file', \@array);
1960
1961 # Output matches the print statement
1962
1963 MCE->sendto(\*STDERR, \@array); # 1.500 behavior, reference
1964 MCE->sendto(\*STDOUT, \$scalar);
1965 MCE->sendto($fh, \@array);
1966
1967 MCE->sendto('STDOUT', \@array, "\n", \$scalar, "\n");
1968 print {*STDOUT} \@array, "\n", \$scalar, "\n";
1969
1970 MCE 1.500 added support for sending to a glob reference, file
1971 descriptor, and file handle.
1972
1973 MCE->sendto(\*STDERR, "foo\n", \@array, \$scalar, "\n");
1974 MCE->sendto('fd:2', "foo\n", \@array, \$scalar, "\n");
1975 MCE->sendto($fh, "foo\n", \@array, \$scalar, "\n");
1976
1977 MCE->sync ( void )
1978 $mce->sync ( void )
1979 A barrier sync operation means any worker must stop at this point until
1980 all workers reach this barrier. Barrier syncing is useful for many
1981 computer algorithms.
1982
1983 Barrier synchronization is supported for task 0 only, or when omitting
1984 user_tasks. All workers assigned task_id 0 must call sync whenever
1985 barrier syncing.
1986
1987 use MCE;
1988
1989 sub user_func {
1990
1991 my ($mce) = @_;
1992 my $wid = MCE->wid;
1993
1994 MCE->sendto("STDOUT", "a: $wid\n"); # MCE 1.0+
1995 MCE->sync;
1996
1997 MCE->sendto(\*STDOUT, "b: $wid\n"); # MCE 1.5+
1998 MCE->sync;
1999
2000 MCE->print("c: $wid\n"); # MCE 1.5+
2001 MCE->sync;
2002
2003 return;
2004 }
2005
2006 my $mce = MCE->new(
2007 max_workers => 4, user_func => \&user_func
2008 )->run;
2009
2010 -- Output (without barrier synchronization)
2011
2012 a: 1
2013 a: 2
2014 b: 1
2015 b: 2
2016 c: 1
2017 c: 2
2018 a: 3
2019 b: 3
2020 c: 3
2021 a: 4
2022 b: 4
2023 c: 4
2024
2025 -- Output (with barrier synchronization)
2026
2027 a: 1
2028 a: 2
2029 a: 4
2030 a: 3
2031 b: 2
2032 b: 1
2033 b: 3
2034 b: 4
2035 c: 1
2036 c: 4
2037 c: 2
2038 c: 3
2039
2040 Consider the following example. The MCE->sync operation is done inside
2041 a loop along with MCE->do. A stall may occur for workers calling sync
2042 the 2nd or 3rd time while other workers are sending results via MCE->do
2043 or MCE->sendto.
2044
2045 It requires another semaphore lock in MCE to solve this which was not
2046 done in order to keep resources low. Therefore, please keep this in
2047 mind when mixing MCE->sync with MCE->do or output serialization methods
2048 inside a loop.
2049
2050 sub user_func {
2051
2052 my ($mce) = @_;
2053 my @result;
2054
2055 for (1 .. 3) {
2056 ... compute algorithm ...
2057
2058 MCE->sync;
2059
2060 ... compute algorithm ...
2061
2062 MCE->sync;
2063
2064 MCE->do('aggregate_result', \@result); # or MCE->sendto
2065
2066 MCE->sync; # The sync operation is also needed here to
2067 # prevent MCE from stalling.
2068 }
2069 }
2070
2071 MCE->yield ( void )
2072 $mce->yield ( void )
2073 There may be occasions when the MCE-driven app is too fast. The
2074 interval option combined with the yield method, both introduced with
2075 MCE 1.5, allows one to throttle the app. It adds a "grace" factor to
2076 the design.
2077
2078 A use case is an app configured with 100 workers running on a 24
2079 logical-core box. Data is polled from a database containing over 2.5
2080 million rows. Workers chunk away at 300 rows per chunk performing SNMP
2081 gets (300 sockets per worker) polling 25 metrics from each device. With
2082 this scenario, the load on the box may rise beyond 90+. In addition,
2083 IP_Tables may reach its contention point causing the entire application
2084 to fail.
2085
2086 The scenario above is solved by simply having workers yield among
2087 themselves in a synchronized fashion. A delay of 0.007 seconds between
2088 intervals is all that's needed. The load on the box will hover between
2089 23 ~ 27 for the duration of the run. Polling completes in under 17
2090 minutes time. This is quite fast considering the app polls 62.5 million
2091 metrics combined. The math equates to 3,676,470 per minute or rather
2092 61,275 per second from a single box.
2093
2094 # Both max_nodes and node_id are optional (default 1).
2095
2096 interval => {
2097 delay => 0.007, max_nodes => $max_nodes, node_id => $node_id
2098 }
2099
2100 A 4 node setup can poll 10 million devices without the additional
2101 overhead of a distribution agent. The only differences among the 4 nodes
2102 are node_id and the where clause used to query the database. The
2103 mac addresses are random such that the data divides equally to any
2104 power of 2. The distribution key lies in the mac address itself. In
2105 fact, the 2nd character from the right is sufficient for maximizing on
2106 the power of randomness for equal distribution.
2107
2108 Query NodeID 1: ... AND substr(MAC, -2, 1) IN ('0', '1', '2', '3')
2109 Query NodeID 2: ... AND substr(MAC, -2, 1) IN ('4', '5', '6', '7')
2110 Query NodeID 3: ... AND substr(MAC, -2, 1) IN ('8', '9', 'A', 'B')
2111 Query NodeID 4: ... AND substr(MAC, -2, 1) IN ('C', 'D', 'E', 'F')
2112
2113 Below, the user_tasks option is configured to simulate 4 nodes. This
2114 demonstration uses 2 workers per node to minimize the output size. Input comes from
2115 the sequence option.
2116
2117 use Time::HiRes qw(time);
2118 use MCE;
2119
2120 my $d = shift || 0.1;
2121
2122 local $| = 1;
2123
2124 sub create_task {
2125
2126 my ($node_id) = @_;
2127
2128 my $seq_size = 6;
2129 my $seq_start = ($node_id - 1) * $seq_size + 1;
2130 my $seq_end = $seq_start + $seq_size - 1;
2131
2132 return {
2133 max_workers => 2, sequence => [ $seq_start, $seq_end ],
2134 interval => { delay => $d, max_nodes => 4, node_id => $node_id }
2135 };
2136 }
2137
2138 sub user_begin {
2139
2140 my ($mce, $task_id, $task_name) = @_;
2141
2142 # The yield method causes this worker to wait for its next time
2143 # interval slot before running. Yield has no effect without the
2144 # 'interval' option.
2145
2146 # Yielding is beneficial inside a user_begin block. A use case
2147 # is staggering database connections among workers in order
2148 # to not impact the DB server.
2149
2150 MCE->yield;
2151
2152 MCE->printf(
2153 "Node %2d: %0.5f -- Worker %2d: %12s -- Started\n",
2154 MCE->task_id + 1, time, MCE->task_wid, ''
2155 );
2156
2157 return;
2158 }
2159
2160 {
2161 my $prev_time = time;
2162
2163 sub user_func {
2164
2165 my ($mce, $seq_n, $chunk_id) = @_;
2166
2167 # Yield simply waits for the next time interval.
2168 MCE->yield;
2169
2170 # Calculate how long this worker has waited.
2171 my $curr_time = time;
2172 my $time_waited = $curr_time - $prev_time;
2173
2174 $prev_time = $curr_time;
2175
2176 MCE->printf(
2177 "Node %2d: %0.5f -- Worker %2d: %12.5f -- Seq_N %3d\n",
2178 MCE->task_id + 1, time, MCE->task_wid, $time_waited, $seq_n
2179 );
2180
2181 return;
2182 }
2183 }
2184
2185 # Simulate a 4 node environment passing node_id to create_task.
2186
2187 print "Node_ID Current_Time Worker_ID Time_Waited Comment\n";
2188
2189 MCE->new(
2190 user_begin => \&user_begin,
2191 user_func => \&user_func,
2192
2193 user_tasks => [
2194 create_task(1),
2195 create_task(2),
2196 create_task(3),
2197 create_task(4)
2198 ]
2199
2200 )->run;
2201
2202 -- Output (notice that the Current_Time values below stay 0.10 apart)
2203
2204 Node_ID Current_Time Worker_ID Time_Waited Comment
2205 Node 1: 1374807976.74634 -- Worker 1: -- Started
2206 Node 2: 1374807976.84634 -- Worker 1: -- Started
2207 Node 3: 1374807976.94638 -- Worker 1: -- Started
2208 Node 4: 1374807977.04639 -- Worker 1: -- Started
2209 Node 1: 1374807977.14634 -- Worker 2: -- Started
2210 Node 2: 1374807977.24640 -- Worker 2: -- Started
2211 Node 3: 1374807977.34649 -- Worker 2: -- Started
2212 Node 4: 1374807977.44657 -- Worker 2: -- Started
2213 Node 1: 1374807977.54636 -- Worker 1: 0.90037 -- Seq_N 1
2214 Node 2: 1374807977.64638 -- Worker 1: 1.00040 -- Seq_N 7
2215 Node 3: 1374807977.74642 -- Worker 1: 1.10043 -- Seq_N 13
2216 Node 4: 1374807977.84643 -- Worker 1: 1.20045 -- Seq_N 19
2217 Node 1: 1374807977.94636 -- Worker 2: 1.30037 -- Seq_N 2
2218 Node 2: 1374807978.04638 -- Worker 2: 1.40040 -- Seq_N 8
2219 Node 3: 1374807978.14641 -- Worker 2: 1.50042 -- Seq_N 14
2220 Node 4: 1374807978.24644 -- Worker 2: 1.60045 -- Seq_N 20
2221 Node 1: 1374807978.34628 -- Worker 1: 0.79996 -- Seq_N 3
2222 Node 2: 1374807978.44631 -- Worker 1: 0.79996 -- Seq_N 9
2223 Node 3: 1374807978.54634 -- Worker 1: 0.79996 -- Seq_N 15
2224 Node 4: 1374807978.64636 -- Worker 1: 0.79997 -- Seq_N 21
2225 Node 1: 1374807978.74628 -- Worker 2: 0.79996 -- Seq_N 4
2226 Node 2: 1374807978.84632 -- Worker 2: 0.79997 -- Seq_N 10
2227 Node 3: 1374807978.94634 -- Worker 2: 0.79996 -- Seq_N 16
2228 Node 4: 1374807979.04636 -- Worker 2: 0.79996 -- Seq_N 22
2229 Node 1: 1374807979.14628 -- Worker 1: 0.80001 -- Seq_N 5
2230 Node 2: 1374807979.24631 -- Worker 1: 0.80000 -- Seq_N 11
2231 Node 3: 1374807979.34634 -- Worker 1: 0.80001 -- Seq_N 17
2232 Node 4: 1374807979.44636 -- Worker 1: 0.80000 -- Seq_N 23
2233 Node 1: 1374807979.54628 -- Worker 2: 0.80000 -- Seq_N 6
2234 Node 2: 1374807979.64631 -- Worker 2: 0.80000 -- Seq_N 12
2235 Node 3: 1374807979.74633 -- Worker 2: 0.80000 -- Seq_N 18
2236 Node 4: 1374807979.84636 -- Worker 2: 0.80000 -- Seq_N 24
2237
2238 The interval.pl example above is included with MCE.
2239
2241 The "progress" option takes a code block for receiving info on the
2242 progress made while processing input data; e.g. "input_data" or
2243 "sequence". To make this work, provide the "progress" option with a
2244 closure, as shown below, passing along the size of the input data; e.g.
2245 "scalar @array" or "-s /path/to/file".
2246
2247 The current API is available since MCE 1.813.
2248
2249 A worker, upon completing its chunk, notifies the manager-
2250 process with the size of the chunk. That could be the number of rows or
2251 literally the size of the chunk when processing an input file. The
2252 manager-process accumulates the size before calling the code block
2253 associated with the "progress" option.
2254
2255 When running many tasks simultaneously via "user_tasks", the call is
2256 initiated only by workers at level 0, i.e. the first task (not
2257 shown here).
2258
2259 use Time::HiRes 'sleep';
2260 use MCE;
2261
2262 sub make_progress {
2263 my ($total_size) = @_;
2264 return sub {
2265 my ($completed_size) = @_;
2266 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2267 };
2268 }
2269
2270 my @input = (1..150);
2271
2272 MCE->new(
2273 chunk_size => 10,
2274 max_workers => 4,
2275 input_data => \@input,
2276 progress => make_progress( scalar @input ),
2277 user_func => sub { sleep 1.5 }
2278 )->run();
2279
2280 -- Output
2281
2282 6.7%
2283 13.3%
2284 20.0%
2285 26.7%
2286 33.3%
2287 40.0%
2288 46.7%
2289 53.3%
2290 60.0%
2291 66.7%
2292 73.3%
2293 80.0%
2294 86.7%
2295 93.3%
2296 100.0%
2297
2298 Next is code using MCE::Flow and ProgressBar::Stack to do practically
2299 the same thing.
2300
2301 use Time::HiRes 'sleep';
2302 use ProgressBar::Stack;
2303 use MCE::Flow;
2304
2305 sub make_progress {
2306 my ($total_size) = @_;
2307 init_progress();
2308 return sub {
2309 my ($completed_size) = @_;
2310 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2311 };
2312 }
2313
2314 my @input = (1..150);
2315
2316 MCE::Flow->init(
2317 chunk_size => 10,
2318 max_workers => 4,
2319 progress => make_progress( scalar @input )
2320 );
2321
2322 MCE::Flow->run( sub { sleep 1.5 }, \@input );
2323 MCE::Flow->finish();
2324
2325 print "\n";
2326
2327 -- Output
2328
2329 [################ ] 80.0% ETA: 0:01
2330
2331 For a sequence of numbers, using the "sequence" option, one must account
2332 for "step_size" (1 by default) when computing the total size.
2333
2334 use Time::HiRes 'sleep';
2335 use MCE;
2336
2337 sub make_progress {
2338 my ($total_size) = @_;
2339 return sub {
2340 my ($completed_size) = @_;
2341 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2342 };
2343 }
2344
2345 MCE->new(
2346 chunk_size => 10,
2347 max_workers => 4,
2348 sequence => [ 1, 100, 2 ],
2349 progress => make_progress( int( 100 / 2 + 0.5 ) ),
2350 user_func => sub { sleep 1.5 }
2351 )->run();
2352
2353 -- Output
2354
2355 20.0%
2356 40.0%
2357 60.0%
2358 80.0%
2359 100.0%
2360
2361 Changing "chunk_size" to 1 means workers notify the manager process
2362 more often, thus increasing granularity. Take a look at the output.
2363
2364 2.0%
2365 4.0%
2366 6.0%
2367 8.0%
2368 10.0%
2369 ...
2370 92.0%
2371 94.0%
2372 96.0%
2373 98.0%
2374 100.0%
2375
2376 Here is the same thing using MCE::Flow together with
2377 ProgressBar::Stack.
2378
2379 use Time::HiRes 'sleep';
2380 use ProgressBar::Stack;
2381 use MCE::Flow;
2382
2383 sub make_progress {
2384 my ($total_size) = @_;
2385 init_progress();
2386 return sub {
2387 my ($completed_size) = @_;
2388 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2389 };
2390 }
2391
2392 MCE::Flow->init(
2393 chunk_size => 1,
2394 max_workers => 4,
2395 progress => make_progress( int( 100 / 2 + 0.5 ) )
2396 );
2397
2398 MCE::Flow->run_seq( sub { sleep 0.5 }, 1, 100, 2 );
2399 MCE::Flow->finish();
2400
2401 print "\n";
2402
2403 -- Output
2404
2405 [######### ] 48.0% ETA: 0:03
2406
2407 For files and file handles, workers send the actual size of the data
2408 read rather than the number of rows.
2409
2410 use Time::HiRes 'sleep';
2411 use MCE;
2412
2413 sub make_progress {
2414 my ($total_size) = @_;
2415 return sub {
2416 my ($completed_size) = @_;
2417 printf "%0.1f%%\n", $completed_size / $total_size * 100;
2418 };
2419 }
2420
2421 my $input_file = "/path/to/input_file.txt";
2422
2423 MCE->new(
2424 chunk_size => 5,
2425 max_workers => 4,
2426 input_data => $input_file,
2427 progress => make_progress( -s $input_file ),
2428 user_func => sub { sleep 0.03 }
2429 )->run();
2430
2431 For consistency, here is the example using MCE::Flow, again with
2432 ProgressBar::Stack.
2433
2434 use Time::HiRes 'sleep';
2435 use ProgressBar::Stack;
2436 use MCE::Flow;
2437
2438 sub make_progress {
2439 my ($total_size) = @_;
2440 init_progress();
2441 return sub {
2442 my ($completed_size) = @_;
2443 update_progress sprintf("%0.1f", $completed_size / $total_size * 100);
2444 };
2445 }
2446
2447 my $input_file = "/path/to/input_file.txt";
2448
2449 MCE::Flow->init(
2450 chunk_size => 5,
2451 max_workers => 4,
2452 progress => make_progress( -s $input_file )
2453 );
2454
2455 MCE::Flow->run_file( sub { sleep 0.03 }, $input_file );
2456 MCE::Flow->finish();
2457
2458 The next demonstration processes three arrays consecutively. For this
2459 one, MCE workers persist between runs. This requires MCE 1.814 or later;
2460 with MCE 1.813, the progress output is not shown.
2461
2462 use Time::HiRes 'sleep';
2463 use ProgressBar::Stack;
2464 use MCE;
2465
2466 sub make_progress {
2467 my ($total_size, $message) = @_;
2468 init_progress();
2469 return sub {
2470 my ($completed_size) = @_;
2471 update_progress(
2472 sprintf("%0.1f", $completed_size / $total_size * 100),
2473 $message
2474 );
2475 };
2476 }
2477
2478 my $mce = MCE->new(
2479 chunk_size => 10,
2480 max_workers => 4,
2481 user_func => sub { sleep 0.5 }
2482 )->spawn();
2483
2484 my @a1 = ( 1 .. 200 );
2485 my @a2 = ( 1 .. 500 );
2486 my @a3 = ( 1 .. 300 );
2487
2488 $mce->process({ progress => make_progress(scalar(@a1), "array 1") }, \@a1);
2489
2490 print "\n";
2491
2492 $mce->process({ progress => make_progress(scalar(@a2), "array 2") }, \@a2);
2493
2494 print "\n";
2495
2496 $mce->process({ progress => make_progress(scalar(@a3), "array 3") }, \@a3);
2497
2498 print "\n";
2499
2500 $mce->shutdown;
2501
2502 -- Output
2503
2504 [####################] 100.0% ETA: 0:00 array 1
2505 [####################] 100.0% ETA: 0:00 array 2
2506 [####################] 100.0% ETA: 0:00 array 3
2507
2508 When the size is not known, such as when reading from "STDIN", the only thing one
2509 can do is report the size completed thus far.
2510
2511 # 1 kibibyte equals 1024 bytes
2512
2513 progress => sub {
2514 my ($completed_size) = @_;
2515 printf "%0.1f kibibytes\n", $completed_size / 1024;
2516 }
2517
2519 • MCE::Examples
2520
2522 MCE
2523
2525 Mario E. Roy, <marioeroy AT gmail DOT com>
2526
2527
2528
2529perl v5.36.0 2023-01-20 MCE::Core(3)