1MCE::Relay(3) User Contributed Perl Documentation MCE::Relay(3)
2
3
4
6 MCE::Relay - Extends Many-Core Engine with relay capabilities
7
9 This document describes MCE::Relay version 1.838
10
12 use MCE::Flow;
13
14 my $file = shift || \*STDIN;
15
16 ## Line Count #######################################
17
18 mce_flow_f {
19 max_workers => 4,
20 use_slurpio => 1,
21 init_relay => 0,
22 },
23 sub {
24 my ($mce, $slurp_ref, $chunk_id) = @_;
25 my $line_count = ($$slurp_ref =~ tr/\n//);
26
27 ## Receive and pass on updated information.
28 my $lines_read = MCE::relay { $_ += $line_count };
29
30 }, $file;
31
32 my $total_lines = MCE->relay_final;
33
34 print {*STDERR} "$total_lines\n";
35
36 ## Orderly Action ###################################
37
38 $| = 1; # Important, must flush output immediately.
39
40 mce_flow_f {
41 max_workers => 2,
42 use_slurpio => 1,
43 init_relay => 0,
44 },
45 sub {
46 my ($mce, $slurp_ref, $chunk_id) = @_;
47
48 ## The relay value is relayed and remains 0.
49 ## Writes to STDOUT orderly.
50
51 MCE->relay_lock;
52 print $$slurp_ref;
53 MCE->relay_unlock;
54
55 }, $file;
56
58 This module enables workers to receive and pass on information in order
59 with zero involvement by the manager process while running. The module
60 is loaded automatically when the MCE option "init_relay" is specified.
61
62 All workers (belonging to task_id 0) must participate when relaying
63 data.
64
65 Relaying is not meant for passing big data. The last worker will stall
66 if the relayed data exceeds the socket buffer size. Relaying no more
67 than 16 KiB - 7 bytes is safe across all platforms.
68
70 MCE->relay ( sub { code } )
71 MCE::relay { code }
72 Relay is enabled by specifying the init_relay option, which takes a
73 hash or array reference, or a scalar value. Relaying is orderly: it is
74 driven by chunk_id when processing input data, and by task_wid otherwise.
75 Omitting the code block (e.g. MCE::relay) relays the value forward unchanged.
76
77 Below, relaying multiple values via a HASH reference.
78
79 use MCE::Flow max_workers => 4;
80
81 mce_flow {
82 init_relay => { p => 0, e => 0 },
83 },
84 sub {
85 my $wid = MCE->wid;
86
87 ## do work
88 my $pass = $wid % 3;
89 my $errs = $wid % 2;
90
91 ## relay
92 my %last_rpt = MCE::relay { $_->{p} += $pass; $_->{e} += $errs };
93
94 MCE->print("$wid: passed $pass, errors $errs\n");
95
96 return;
97 };
98
99 my %results = MCE->relay_final;
100
101 print " passed $results{p}, errors $results{e} final\n\n";
102
103 -- Output
104
105 1: passed 1, errors 1
106 2: passed 2, errors 0
107 3: passed 0, errors 1
108 4: passed 1, errors 0
109 passed 4, errors 2 final
110
111 Or multiple values via an ARRAY reference.
112
113 use MCE::Flow max_workers => 4;
114
115 mce_flow {
116 init_relay => [ 0, 0 ],
117 },
118 sub {
119 my $wid = MCE->wid;
120
121 ## do work
122 my $pass = $wid % 3;
123 my $errs = $wid % 2;
124
125 ## relay
126 my @last_rpt = MCE::relay { $_->[0] += $pass; $_->[1] += $errs };
127
128 MCE->print("$wid: passed $pass, errors $errs\n");
129
130 return;
131 };
132
133 my ($pass, $errs) = MCE->relay_final;
134
135 print " passed $pass, errors $errs final\n\n";
136
137 -- Output
138
139 1: passed 1, errors 1
140 2: passed 2, errors 0
141 3: passed 0, errors 1
142 4: passed 1, errors 0
143 passed 4, errors 2 final
144
145 Or simply a scalar value.
146
147 use MCE::Flow max_workers => 4;
148
149 mce_flow {
150 init_relay => 0,
151 },
152 sub {
153 my $wid = MCE->wid;
154
155 ## do work
156 my $bytes_read = 1000 + ((MCE->wid % 3) * 3);
157
158 ## relay
159 my $last_offset = MCE::relay { $_ += $bytes_read };
160
161 ## output
162 MCE->print("$wid: $bytes_read\n");
163
164 return;
165 };
166
167 my $total = MCE->relay_final;
168
169 print " $total size\n\n";
170
171 -- Output
172
173 1: 1003
174 2: 1006
175 3: 1000
176 4: 1003
177 4012 size
178
179 MCE->relay_final ( void )
180 Call this method to obtain the final relay value(s) after running.
181 See included example findnull.pl for another use case.
182
183 use MCE max_workers => 4;
184
185 my $mce = MCE->new(
186 init_relay => [ 0, 100 ], ## initial values (two counters)
187
188 user_func => sub {
189 my ($mce) = @_;
190
191 ## do work
192 my ($acc1, $acc2) = (10, 20);
193
194 ## relay to next worker
195 MCE::relay { $_->[0] += $acc1; $_->[1] += $acc2 };
196
197 return;
198 }
199 )->run;
200
201 my ($cnt1, $cnt2) = $mce->relay_final;
202
203 print "$cnt1 : $cnt2\n";
204
205 -- Output
206
207 40 : 180
208
209 MCE->relay_recv ( void )
210 Call this method to obtain the next relay value before relaying.
211 This allows serial code to be processed in order between workers. The
212 following is a parallel demonstration of the fasta benchmark from the
213 web.
214
215 # perl fasta.pl 25000000
216
217 # The Computer Language Benchmarks game
218 # http://benchmarksgame.alioth.debian.org/
219 #
220 # contributed by Barry Walsh
221 # port of fasta.rb #6
222 #
223 # MCE::Flow version by Mario Roy
224 # requires MCE 1.807+
225 # requires MCE::Shared 1.806+
226
227 use strict;
228 use warnings;
229 use feature 'say';
230
231 use MCE::Flow;
232 use MCE::Shared;
233 use MCE::Candy;
234
235 use constant IM => 139968;
236 use constant IA => 3877;
237 use constant IC => 29573;
238
239 my $LAST = MCE::Shared->scalar( 42 );
240
241 my $alu =
242 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
243 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
244 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
245 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
246 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
247 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
248 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
249
250 my $iub = [
251 [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
252 [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
253 [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
254 [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
255 [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
256 ];
257
258 my $homosapiens = [
259 [ 'a', 0.3029549426680 ],
260 [ 'c', 0.1979883004921 ],
261 [ 'g', 0.1975473066391 ],
262 [ 't', 0.3015094502008 ]
263 ];
264
265 sub make_repeat_fasta {
266 my ( $src, $n ) = @_;
267 my $width = qr/(.{1,60})/;
268 my $l = length $src;
269 my $s = $src x ( ($n / $l) + 1 );
270 substr( $s, $n, $l ) = '';
271
272 while ( $s =~ m/$width/g ) { say $1 }
273 }
274
275 sub make_random_fasta {
276 my ( $table, $n ) = @_;
277 my $rand = undef;
278 my $width = 60;
279 my $prob = 0.0;
280 my $output = '';
281 my ( $c1, $c2, $last );
282
283 $_->[1] = ( $prob += $_->[1] ) for @$table;
284
285 $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
286 $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
287
288 my $seq = MCE::Shared->sequence(
289 { chunk_size => 2000, bounds_only => 1 },
290 1, $n / $width
291 );
292
293 my $code1 = q{
294 while ( 1 ) {
295 # --------------------------------------------
296 # Process code orderly between workers.
297 # --------------------------------------------
298
299 my $chunk_id = MCE->relay_recv;
300 my ( $begin, $end ) = $seq->next;
301
302 MCE->relay, last if ( !defined $begin );
303
304 my $last = $LAST->get;
305 my $temp = $last;
306
307 # Pre-compute $LAST value for the next worker
308 for ( 1 .. ( $end - $begin + 1 ) * $width ) {
309 $temp = ( $temp * IA + IC ) % IM;
310 }
311
312 $LAST->set( $temp );
313
314 # Increment chunk_id value
315 MCE->relay( sub { $_ += 1 } );
316
317 # --------------------------------------------
318 # Also run code in parallel between workers.
319 # --------------------------------------------
320
321 for ( $begin .. $end ) {
322 for ( 1 .. $width ) { !C! }
323 $output .= "\n";
324 }
325
326 # --------------------------------------------
327 # Display orderly.
328 # --------------------------------------------
329
330 MCE->gather( $chunk_id, $output );
331
332 $output = '';
333 }
334 };
335
336 $code1 =~ s/!C!/$c1/g;
337
338 MCE::Flow->init(
339 max_workers => 4, ## MCE::Util->get_ncpu || 4,
340 gather => MCE::Candy::out_iter_fh( \*STDOUT ),
341 init_relay => 1,
342 use_threads => 0,
343 );
344
345 MCE::Flow->run( sub { eval $code1 } );
346 MCE::Flow->finish;
347
348 $last = $LAST->get;
349
350 $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
351 $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
352
353 my $code2 = q{
354 if ( $n % $width != 0 ) {
355 for ( 1 .. $n % $width ) { !C! }
356 print "\n";
357 }
358 };
359
360 $code2 =~ s/!C!/$c2/g;
361 eval $code2;
362
363 $LAST->set( $last );
364 }
365
366 my $n = $ARGV[0] || 27;
367
368 say ">ONE Homo sapiens alu";
369 make_repeat_fasta( $alu, $n * 2 );
370
371 say ">TWO IUB ambiguity codes";
372 make_random_fasta( $iub, $n * 3 );
373
374 say ">THREE Homo sapiens frequency";
375 make_random_fasta( $homosapiens, $n * 5 );
376
377 MCE->relay_lock ( void )
378 MCE->relay_unlock ( void )
379 The "relay_lock" and "relay_unlock" methods, added in MCE 1.807, are
380 aliases for "relay_recv" and "relay", respectively. They allow one to
381 perform an exclusive action prior to the actual relaying of data.
382
383 Below, "user_func" is taken from the "cat.pl" MCE example. Relaying
384 is driven by "chunk_id", or by "task_wid" when not processing input,
385 and therefore occurs in order.
386
387 user_func => sub {
388 my ($mce, $chunk_ref, $chunk_id) = @_;
389
390 if ($n_flag) {
391 ## Relays the total lines read.
392
393 my $output = ''; my $line_count = ($$chunk_ref =~ tr/\n//);
394 my $lines_read = MCE::relay { $_ += $line_count };
395
396 open my $fh, '<', $chunk_ref;
397 $output .= sprintf "%6d\t%s", ++$lines_read, $_ while (<$fh>);
398 close $fh;
399
400 $output .= ":$chunk_id";
401 MCE->do('display_chunk', $output);
402 }
403 else {
404 ## The following is another way to have ordered output. Workers
405 ## write directly to STDOUT exclusively without any involvement
406 ## from the manager process. The statement(s) between relay_lock
407 ## and relay_unlock run serially and most important orderly.
408
409 MCE->relay_lock; # alias for MCE->relay_recv
410
411 print $$chunk_ref; # ensure $| = 1 in script
412
413 MCE->relay_unlock; # alias for MCE->relay
414 }
415
416 return;
417 }
418
419 The following is a variant of the fasta-benchmark demonstration
420 shown above. Here, workers write exclusively and orderly to
421 "STDOUT".
422
423 # perl fasta.pl 25000000
424
425 # The Computer Language Benchmarks game
426 # http://benchmarksgame.alioth.debian.org/
427 #
428 # contributed by Barry Walsh
429 # port of fasta.rb #6
430 #
431 # MCE::Flow version by Mario Roy
432 # requires MCE 1.807+
433 # requires MCE::Shared 1.806+
434
435 use strict;
436 use warnings;
437 use feature 'say';
438
439 use MCE::Flow;
440 use MCE::Shared;
441
442 use constant IM => 139968;
443 use constant IA => 3877;
444 use constant IC => 29573;
445
446 my $LAST = MCE::Shared->scalar( 42 );
447
448 my $alu =
449 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
450 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
451 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
452 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
453 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
454 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
455 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
456
457 my $iub = [
458 [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
459 [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
460 [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
461 [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
462 [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
463 ];
464
465 my $homosapiens = [
466 [ 'a', 0.3029549426680 ],
467 [ 'c', 0.1979883004921 ],
468 [ 'g', 0.1975473066391 ],
469 [ 't', 0.3015094502008 ]
470 ];
471
472 sub make_repeat_fasta {
473 my ( $src, $n ) = @_;
474 my $width = qr/(.{1,60})/;
475 my $l = length $src;
476 my $s = $src x ( ($n / $l) + 1 );
477 substr( $s, $n, $l ) = '';
478
479 while ( $s =~ m/$width/g ) { say $1 }
480 }
481
482 sub make_random_fasta {
483 my ( $table, $n ) = @_;
484 my $rand = undef;
485 my $width = 60;
486 my $prob = 0.0;
487 my $output = '';
488 my ( $c1, $c2, $last );
489
490 $_->[1] = ( $prob += $_->[1] ) for @$table;
491
492 $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
493 $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
494
495 my $seq = MCE::Shared->sequence(
496 { chunk_size => 2000, bounds_only => 1 },
497 1, $n / $width
498 );
499
500 my $code1 = q{
501 $| = 1; # Important, must flush output immediately.
502
503 while ( 1 ) {
504 # --------------------------------------------
505 # Process code orderly between workers.
506 # --------------------------------------------
507
508 MCE->relay_lock;
509
510 my ( $begin, $end ) = $seq->next;
511 print( $output ), $output = '' if ( length $output );
512
513 MCE->relay_unlock, last if ( !defined $begin );
514
515 my $last = $LAST->get;
516 my $temp = $last;
517
518 # Pre-compute $LAST value for the next worker
519 for ( 1 .. ( $end - $begin + 1 ) * $width ) {
520 $temp = ( $temp * IA + IC ) % IM;
521 }
522
523 $LAST->set( $temp );
524
525 MCE->relay_unlock;
526
527 # --------------------------------------------
528 # Also run code in parallel.
529 # --------------------------------------------
530
531 for ( $begin .. $end ) {
532 for ( 1 .. $width ) { !C! }
533 $output .= "\n";
534 }
535 }
536 };
537
538 $code1 =~ s/!C!/$c1/g;
539
540 MCE::Flow->init(
541 max_workers => 4, ## MCE::Util->get_ncpu || 4,
542 init_relay => 0,
543 use_threads => 0,
544 );
545
546 MCE::Flow->run( sub { eval $code1 } );
547 MCE::Flow->finish;
548
549 $last = $LAST->get;
550
551 $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
552 $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
553
554 my $code2 = q{
555 if ( $n % $width != 0 ) {
556 for ( 1 .. $n % $width ) { !C! }
557 print "\n";
558 }
559 };
560
561 $code2 =~ s/!C!/$c2/g;
562 eval $code2;
563
564 $LAST->set( $last );
565 }
566
567 my $n = $ARGV[0] || 27;
568
569 say ">ONE Homo sapiens alu";
570 make_repeat_fasta( $alu, $n * 2 );
571
572 say ">TWO IUB ambiguity codes";
573 make_random_fasta( $iub, $n * 3 );
574
575 say ">THREE Homo sapiens frequency";
576 make_random_fasta( $homosapiens, $n * 5 );
577
579 I received a request from John Martel to process a large flat file and
580 expand each record into many records by splitting out the items in
581 field 4, which are delimited by semicolons. Each row in the output is given a
582 unique ID, starting with one, while preserving output order.
583
584 Input File, possibly larger than 500 GiB in size
585 foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
586 bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
587 baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
588 ...
589
590 Output File
591 000000000000001|item1|foo|field2|field3|field5|field6|field7
592 000000000000002|item2|foo|field2|field3|field5|field6|field7
593 000000000000003|item3|foo|field2|field3|field5|field6|field7
594 000000000000004|item4|foo|field2|field3|field5|field6|field7
595 000000000000005|itemN|foo|field2|field3|field5|field6|field7
596 000000000000006|item1|bar|field2|field3|field5|field6|field7
597 000000000000007|item2|bar|field2|field3|field5|field6|field7
598 000000000000008|item3|bar|field2|field3|field5|field6|field7
599 000000000000009|item4|bar|field2|field3|field5|field6|field7
600 000000000000010|itemN|bar|field2|field3|field5|field6|field7
601 000000000000011|item1|baz|field2|field3|field5|field6|field7
602 000000000000012|item2|baz|field2|field3|field5|field6|field7
603 000000000000013|item3|baz|field2|field3|field5|field6|field7
604 000000000000014|item4|baz|field2|field3|field5|field6|field7
605 000000000000015|itemN|baz|field2|field3|field5|field6|field7
606 ...
607
608 Example One
609 This example configures a custom function for preserving output
610 order. Unfortunately, the sprintf function alone consumes extra CPU
611 time, causing the manager process to fall behind. Thus, workers may
612 sit idle while waiting for the manager process to respond to the gather
613 request.
614
615 use strict;
616 use warnings;
617
618 use MCE::Loop;
619
620 my $infile = shift or die "Usage: $0 infile\n";
621 my $newfile = 'output.dat';
622
623 open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
624
625 sub preserve_order {
626 my ($fh) = @_;
627 my ($order_id, $start_idx, $idx, %tmp) = (1, 1);
628
629 return sub {
630 my ($chunk_id, $aref) = @_;
631 $tmp{ $chunk_id } = $aref;
632
633 while ( my $aref = delete $tmp{ $order_id } ) {
634 foreach my $line ( @{ $aref } ) {
635 $idx = sprintf "%015d", $start_idx++;
636 print $fh $idx, $line;
637 }
638 $order_id++;
639 }
640 }
641 }
642
643 MCE::Loop::init {
644 chunk_size => 'auto', max_workers => 3,
645 gather => preserve_order($fh_out)
646 };
647
648 mce_loop_f {
649 my ($mce, $chunk_ref, $chunk_id) = @_;
650 my @buf;
651
652 foreach my $line (@{ $chunk_ref }) {
653 $line =~ s/\r//g; chomp $line;
654
655 my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
656 my @items_array = split /;/, $items;
657
658 foreach my $item (@items_array) {
659 push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
660 }
661 }
662
663 MCE->gather($chunk_id, \@buf);
664
665 } $infile;
666
667 MCE::Loop::finish();
668 close $fh_out;
669
670 Example Two
671 In this example, workers obtain the current ID value, then increment
672 and relay it for the next worker, ordered by chunk ID behind the
673 scenes. Workers call sprintf in parallel, allowing the manager
674 process (out_iter_fh) to accommodate up to 32 workers without falling
675 behind.
676
677 Relay accounts for the worker handling the next chunk_id value.
678 Therefore, do not call relay more than once per chunk. Doing so will
679 cause IPC to stall.
680
681 use strict;
682 use warnings;
683
684 use MCE::Loop;
685 use MCE::Candy;
686
687 my $infile = shift or die "Usage: $0 infile\n";
688 my $newfile = 'output.dat';
689
690 open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
691
692 MCE::Loop::init {
693 chunk_size => 'auto', max_workers => 8,
694 gather => MCE::Candy::out_iter_fh($fh_out),
695 init_relay => 1
696 };
697
698 mce_loop_f {
699 my ($mce, $chunk_ref, $chunk_id) = @_;
700 my @lines;
701
702 foreach my $line (@{ $chunk_ref }) {
703 $line =~ s/\r//g; chomp $line;
704
705 my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
706 my @items_array = split /;/, $items;
707
708 foreach my $item (@items_array) {
709 push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
710 }
711 }
712
713 my $idx = MCE::relay { $_ += scalar @lines };
714 my $buf = '';
715
716 foreach my $line ( @lines ) {
717 $buf .= sprintf "%015d|%s", $idx++, $line
718 }
719
720 MCE->gather($chunk_id, $buf);
721
722 } $infile;
723
724 MCE::Loop::finish();
725 close $fh_out;
726
728 MCE, MCE::Core
729
731 Mario E. Roy, <marioeroy AT gmail DOT com>
732
733
734
735perl v5.28.1 2019-01-23 MCE::Relay(3)