1MCE::Relay(3)         User Contributed Perl Documentation        MCE::Relay(3)
2
3
4

NAME

6       MCE::Relay - Extends Many-Core Engine with relay capabilities
7

VERSION

9       This document describes MCE::Relay version 1.838
10

SYNOPSIS

12        use MCE::Flow;
13
14        my $file = shift || \*STDIN;
15
16        ## Line Count #######################################
17
18        mce_flow_f {
19           max_workers => 4,
20           use_slurpio => 1,
21           init_relay  => 0,
22        },
23        sub {
24           my ($mce, $slurp_ref, $chunk_id) = @_;
25           my $line_count = ($$slurp_ref =~ tr/\n//);
26
27           ## Receive and pass on updated information.
28           my $lines_read = MCE::relay { $_ += $line_count };
29
30        }, $file;
31
32        my $total_lines = MCE->relay_final;
33
34        print {*STDERR} "$total_lines\n";
35
36        ## Orderly Action ###################################
37
38        $| = 1; # Important, must flush output immediately.
39
40        mce_flow_f {
41           max_workers => 2,
42           use_slurpio => 1,
43           init_relay  => 0,
44        },
45        sub {
46           my ($mce, $slurp_ref, $chunk_id) = @_;
47
48           ## The relay value is relayed and remains 0.
49           ## Writes to STDOUT orderly.
50
51           MCE->relay_lock;
52           print $$slurp_ref;
53           MCE->relay_unlock;
54
55        }, $file;
56

DESCRIPTION

58       This module enables workers to receive and pass on information in
59       order, with no involvement by the manager process while running. The
60       module loads automatically when MCE option "init_relay" is specified.
61
62       All workers (belonging to task_id 0) must participate when relaying
63       data.
64
65       Relaying is not meant for passing big data. The last worker will stall
66       if the relayed data exceeds the socket buffer size. Staying within
67       16 KiB - 7 bytes is safe across all platforms.
68

API DOCUMENTATION

70       MCE->relay ( sub { code } )
71       MCE::relay { code }
72          Relay is enabled by specifying the init_relay option, which takes a
73          hash or array reference, or a scalar value. Relaying occurs in order,
74          driven by chunk_id when processing data, otherwise by task_wid.
75          Omitting the code block (e.g. MCE::relay) relays the value unchanged.
76
77          Below, relaying multiple values via a HASH reference.
78
79           use MCE::Flow max_workers => 4;
80
81           mce_flow {
82              init_relay => { p => 0, e => 0 },
83           },
84           sub {
85              my $wid = MCE->wid;
86
87              ## do work
88              my $pass = $wid % 3;
89              my $errs = $wid % 2;
90
91              ## relay
92              my %last_rpt = MCE::relay { $_->{p} += $pass; $_->{e} += $errs };
93
94              MCE->print("$wid: passed $pass, errors $errs\n");
95
96              return;
97           };
98
99           my %results = MCE->relay_final;
100
101           print "   passed $results{p}, errors $results{e} final\n\n";
102
103           -- Output
104
105           1: passed 1, errors 1
106           2: passed 2, errors 0
107           3: passed 0, errors 1
108           4: passed 1, errors 0
109              passed 4, errors 2 final
110
111          Or multiple values via an ARRAY reference.
112
113           use MCE::Flow max_workers => 4;
114
115           mce_flow {
116              init_relay => [ 0, 0 ],
117           },
118           sub {
119              my $wid = MCE->wid;
120
121              ## do work
122              my $pass = $wid % 3;
123              my $errs = $wid % 2;
124
125              ## relay
126              my @last_rpt = MCE::relay { $_->[0] += $pass; $_->[1] += $errs };
127
128              MCE->print("$wid: passed $pass, errors $errs\n");
129
130              return;
131           };
132
133           my ($pass, $errs) = MCE->relay_final;
134
135           print "   passed $pass, errors $errs final\n\n";
136
137           -- Output
138
139           1: passed 1, errors 1
140           2: passed 2, errors 0
141           3: passed 0, errors 1
142           4: passed 1, errors 0
143              passed 4, errors 2 final
144
145          Or simply a scalar value.
146
147           use MCE::Flow max_workers => 4;
148
149           mce_flow {
150              init_relay => 0,
151           },
152           sub {
153              my $wid = MCE->wid;
154
155              ## do work
156              my $bytes_read = 1000 + ((MCE->wid % 3) * 3);
157
158              ## relay
159              my $last_offset = MCE::relay { $_ += $bytes_read };
160
161              ## output
162              MCE->print("$wid: $bytes_read\n");
163
164              return;
165           };
166
167           my $total = MCE->relay_final;
168
169           print "   $total size\n\n";
170
171           -- Output
172
173           1: 1003
174           2: 1006
175           3: 1000
176           4: 1003
177              4012 size
178
179       MCE->relay_final ( void )
180          Call this method to obtain the final relay value(s) after running.
181          See included example findnull.pl for another use case.
182
183           use MCE max_workers => 4;
184
185           my $mce = MCE->new(
186              init_relay => [ 0, 100 ],       ## initial values (two counters)
187
188              user_func => sub {
189                 my ($mce) = @_;
190
191                 ## do work
192                 my ($acc1, $acc2) = (10, 20);
193
194                 ## relay to next worker
195                 MCE::relay { $_->[0] += $acc1; $_->[1] += $acc2 };
196
197                 return;
198              }
199           )->run;
200
201           my ($cnt1, $cnt2) = $mce->relay_final;
202
203           print "$cnt1 : $cnt2\n";
204
205           -- Output
206
207           40 : 180
208
209       MCE->relay_recv ( void )
210          Call this method to obtain the next relay value before relaying.
211          This allows serial code to be processed in order between workers. The
212          following is a parallel demonstration for the fasta-benchmark on the
213          web.
214
215           # perl fasta.pl 25000000
216
217           # The Computer Language Benchmarks game
218           # http://benchmarksgame.alioth.debian.org/
219           #
220           # contributed by Barry Walsh
221           # port of fasta.rb #6
222           #
223           # MCE::Flow version by Mario Roy
224           # requires MCE 1.807+
225           # requires MCE::Shared 1.806+
226
227           use strict;
228           use warnings;
229           use feature 'say';
230
231           use MCE::Flow;
232           use MCE::Shared;
233           use MCE::Candy;
234
235           use constant IM => 139968;
236           use constant IA => 3877;
237           use constant IC => 29573;
238
239           my $LAST = MCE::Shared->scalar( 42 );
240
241           my $alu =
242              'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
243              'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
244              'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
245              'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
246              'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
247              'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
248              'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
249
250           my $iub = [
251              [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
252              [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
253              [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
254              [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
255              [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
256           ];
257
258           my $homosapiens = [
259              [ 'a', 0.3029549426680 ],
260              [ 'c', 0.1979883004921 ],
261              [ 'g', 0.1975473066391 ],
262              [ 't', 0.3015094502008 ]
263           ];
264
265           sub make_repeat_fasta {
266              my ( $src, $n ) = @_;
267              my $width = qr/(.{1,60})/;
268              my $l     = length $src;
269              my $s     = $src x ( ($n / $l) + 1 );
270              substr( $s, $n, $l ) = '';
271
272              while ( $s =~ m/$width/g ) { say $1 }
273           }
274
275           sub make_random_fasta {
276              my ( $table, $n ) = @_;
277              my $rand   = undef;
278              my $width  = 60;
279              my $prob   = 0.0;
280              my $output = '';
281              my ( $c1, $c2, $last );
282
283              $_->[1] = ( $prob += $_->[1] ) for @$table;
284
285              $c1  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
286              $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
287
288              my $seq = MCE::Shared->sequence(
289                 { chunk_size => 2000, bounds_only => 1 },
290                 1, $n / $width
291              );
292
293              my $code1 = q{
294                 while ( 1 ) {
295                    # --------------------------------------------
296                    # Process code orderly between workers.
297                    # --------------------------------------------
298
299                    my $chunk_id = MCE->relay_recv;
300                    my ( $begin, $end ) = $seq->next;
301
302                    MCE->relay, last if ( !defined $begin );
303
304                    my $last = $LAST->get;
305                    my $temp = $last;
306
307                    # Pre-compute $LAST value for the next worker
308                    for ( 1 .. ( $end - $begin + 1 ) * $width ) {
309                       $temp = ( $temp * IA + IC ) % IM;
310                    }
311
312                    $LAST->set( $temp );
313
314                    # Increment chunk_id value
315                    MCE->relay( sub { $_ += 1 } );
316
317                    # --------------------------------------------
318                    # Also run code in parallel between workers.
319                    # --------------------------------------------
320
321                    for ( $begin .. $end ) {
322                       for ( 1 .. $width ) { !C! }
323                       $output .= "\n";
324                    }
325
326                    # --------------------------------------------
327                    # Display orderly.
328                    # --------------------------------------------
329
330                    MCE->gather( $chunk_id, $output );
331
332                    $output = '';
333                 }
334              };
335
336              $code1 =~ s/!C!/$c1/g;
337
338              MCE::Flow->init(
339                 max_workers => 4, ## MCE::Util->get_ncpu || 4,
340                 gather      => MCE::Candy::out_iter_fh( \*STDOUT ),
341                 init_relay  => 1,
342                 use_threads => 0,
343              );
344
345              MCE::Flow->run( sub { eval $code1 } );
346              MCE::Flow->finish;
347
348              $last = $LAST->get;
349
350              $c2  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
351              $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
352
353              my $code2 = q{
354                 if ( $n % $width != 0 ) {
355                    for ( 1 .. $n % $width ) { !C! }
356                    print "\n";
357                 }
358              };
359
360              $code2 =~ s/!C!/$c2/g;
361              eval $code2;
362
363              $LAST->set( $last );
364           }
365
366           my $n = $ARGV[0] || 27;
367
368           say ">ONE Homo sapiens alu";
369           make_repeat_fasta( $alu, $n * 2 );
370
371           say ">TWO IUB ambiguity codes";
372           make_random_fasta( $iub, $n * 3 );
373
374           say ">THREE Homo sapiens frequency";
375           make_random_fasta( $homosapiens, $n * 5 );
376
377       MCE->relay_lock ( void )
378       MCE->relay_unlock ( void )
379          The "relay_lock" and "relay_unlock" methods, added to MCE 1.807, are
380          aliases for "relay_recv" and "relay" respectively. They allow one to
381          perform an exclusive action prior to actual relaying of data.
382
383          Below, "user_func" is taken from the "cat.pl" MCE example. Relaying
384          is driven by "chunk_id", or by "task_wid" when not processing input,
385          and thus occurs in order.
386
387           user_func => sub {
388              my ($mce, $chunk_ref, $chunk_id) = @_;
389
390              if ($n_flag) {
391                 ## Relays the total lines read.
392
393                 my $output = ''; my $line_count = ($$chunk_ref =~ tr/\n//);
394                 my $lines_read = MCE::relay { $_ += $line_count };
395
396                 open my $fh, '<', $chunk_ref;
397                 $output .= sprintf "%6d\t%s", ++$lines_read, $_ while (<$fh>);
398                 close $fh;
399
400                 $output .= ":$chunk_id";
401                 MCE->do('display_chunk', $output);
402              }
403              else {
404                 ## The following is another way to have ordered output. Workers
405                 ## write directly to STDOUT exclusively without any involvement
406                 ## from the manager process. The statement(s) between relay_lock
407                 ## and relay_unlock run serially and, most importantly, in order.
408
409                 MCE->relay_lock;      # alias for MCE->relay_recv
410
411                 print $$chunk_ref;    # ensure $| = 1 in script
412
413                 MCE->relay_unlock;    # alias for MCE->relay
414              }
415
416              return;
417           }
418
419          The following is a variant of the fasta-benchmark demonstration
420          shown above.  Here, workers write exclusively and orderly to
421          "STDOUT".
422
423           # perl fasta.pl 25000000
424
425           # The Computer Language Benchmarks game
426           # http://benchmarksgame.alioth.debian.org/
427           #
428           # contributed by Barry Walsh
429           # port of fasta.rb #6
430           #
431           # MCE::Flow version by Mario Roy
432           # requires MCE 1.807+
433           # requires MCE::Shared 1.806+
434
435           use strict;
436           use warnings;
437           use feature 'say';
438
439           use MCE::Flow;
440           use MCE::Shared;
441
442           use constant IM => 139968;
443           use constant IA => 3877;
444           use constant IC => 29573;
445
446           my $LAST = MCE::Shared->scalar( 42 );
447
448           my $alu =
449              'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
450              'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
451              'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
452              'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
453              'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
454              'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
455              'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
456
457           my $iub = [
458              [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
459              [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
460              [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
461              [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
462              [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
463           ];
464
465           my $homosapiens = [
466              [ 'a', 0.3029549426680 ],
467              [ 'c', 0.1979883004921 ],
468              [ 'g', 0.1975473066391 ],
469              [ 't', 0.3015094502008 ]
470           ];
471
472           sub make_repeat_fasta {
473              my ( $src, $n ) = @_;
474              my $width = qr/(.{1,60})/;
475              my $l     = length $src;
476              my $s     = $src x ( ($n / $l) + 1 );
477              substr( $s, $n, $l ) = '';
478
479              while ( $s =~ m/$width/g ) { say $1 }
480           }
481
482           sub make_random_fasta {
483              my ( $table, $n ) = @_;
484              my $rand   = undef;
485              my $width  = 60;
486              my $prob   = 0.0;
487              my $output = '';
488              my ( $c1, $c2, $last );
489
490              $_->[1] = ( $prob += $_->[1] ) for @$table;
491
492              $c1  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
493              $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
494
495              my $seq = MCE::Shared->sequence(
496                 { chunk_size => 2000, bounds_only => 1 },
497                 1, $n / $width
498              );
499
500              my $code1 = q{
501                 $| = 1; # Important, must flush output immediately.
502
503                 while ( 1 ) {
504                    # --------------------------------------------
505                    # Process code orderly between workers.
506                    # --------------------------------------------
507
508                    MCE->relay_lock;
509
510                    my ( $begin, $end ) = $seq->next;
511                    print( $output ), $output = '' if ( length $output );
512
513                    MCE->relay_unlock, last if ( !defined $begin );
514
515                    my $last = $LAST->get;
516                    my $temp = $last;
517
518                    # Pre-compute $LAST value for the next worker
519                    for ( 1 .. ( $end - $begin + 1 ) * $width ) {
520                       $temp = ( $temp * IA + IC ) % IM;
521                    }
522
523                    $LAST->set( $temp );
524
525                    MCE->relay_unlock;
526
527                    # --------------------------------------------
528                    # Also run code in parallel.
529                    # --------------------------------------------
530
531                    for ( $begin .. $end ) {
532                       for ( 1 .. $width ) { !C! }
533                       $output .= "\n";
534                    }
535                 }
536              };
537
538              $code1 =~ s/!C!/$c1/g;
539
540              MCE::Flow->init(
541                 max_workers => 4, ## MCE::Util->get_ncpu || 4,
542                 init_relay  => 0,
543                 use_threads => 0,
544              );
545
546              MCE::Flow->run( sub { eval $code1 } );
547              MCE::Flow->finish;
548
549              $last = $LAST->get;
550
551              $c2  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
552              $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
553
554              my $code2 = q{
555                 if ( $n % $width != 0 ) {
556                    for ( 1 .. $n % $width ) { !C! }
557                    print "\n";
558                 }
559              };
560
561              $code2 =~ s/!C!/$c2/g;
562              eval $code2;
563
564              $LAST->set( $last );
565           }
566
567           my $n = $ARGV[0] || 27;
568
569           say ">ONE Homo sapiens alu";
570           make_repeat_fasta( $alu, $n * 2 );
571
572           say ">TWO IUB ambiguity codes";
573           make_random_fasta( $iub, $n * 3 );
574
575           say ">THREE Homo sapiens frequency";
576           make_random_fasta( $homosapiens, $n * 5 );
577

GATHER AND RELAY DEMONSTRATIONS

579       I received a request from John Martel to process a large flat file and
580       expand each record to many records based on splitting out items in
581       field 4 delimited by semicolons. Each row in the output is given a
582       unique ID starting with one while preserving output order.
583
584       Input File, possibly larger than 500 GiB in size
585           foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
586           bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
587           baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
588           ...
589
590       Output File
591           000000000000001|item1|foo|field2|field3|field5|field6|field7
592           000000000000002|item2|foo|field2|field3|field5|field6|field7
593           000000000000003|item3|foo|field2|field3|field5|field6|field7
594           000000000000004|item4|foo|field2|field3|field5|field6|field7
595           000000000000005|itemN|foo|field2|field3|field5|field6|field7
596           000000000000006|item1|bar|field2|field3|field5|field6|field7
597           000000000000007|item2|bar|field2|field3|field5|field6|field7
598           000000000000008|item3|bar|field2|field3|field5|field6|field7
599           000000000000009|item4|bar|field2|field3|field5|field6|field7
600           000000000000010|itemN|bar|field2|field3|field5|field6|field7
601           000000000000011|item1|baz|field2|field3|field5|field6|field7
602           000000000000012|item2|baz|field2|field3|field5|field6|field7
603           000000000000013|item3|baz|field2|field3|field5|field6|field7
604           000000000000014|item4|baz|field2|field3|field5|field6|field7
605           000000000000015|itemN|baz|field2|field3|field5|field6|field7
606           ...
607
608       Example One
609          This example configures a custom gather function for preserving
610          output order. Unfortunately, calling sprintf in the manager process
611          costs extra CPU time, causing the manager to fall behind. Thus,
612          workers may idle while waiting for the manager process to respond to
613          the gather request.
614
615           use strict;
616           use warnings;
617
618           use MCE::Loop;
619
620           my $infile  = shift or die "Usage: $0 infile\n";
621           my $newfile = 'output.dat';
622
623           open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
624
625           sub preserve_order {
626               my ($fh) = @_;
627               my ($order_id, $start_idx, $idx, %tmp) = (1, 1);
628
629               return sub {
630                   my ($chunk_id, $aref) = @_;
631                   $tmp{ $chunk_id } = $aref;
632
633                   while ( my $aref = delete $tmp{ $order_id } ) {
634                       foreach my $line ( @{ $aref } ) {
635                           $idx = sprintf "%015d", $start_idx++;
636                           print $fh $idx, $line;
637                       }
638                       $order_id++;
639                   }
640               }
641           }
642
643           MCE::Loop::init {
644               chunk_size => 'auto', max_workers => 3,
645               gather => preserve_order($fh_out)
646           };
647
648           mce_loop_f {
649               my ($mce, $chunk_ref, $chunk_id) = @_;
650               my @buf;
651
652               foreach my $line (@{ $chunk_ref }) {
653                   $line =~ s/\r//g; chomp $line;
654
655                   my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
656                   my @items_array = split /;/, $items;
657
658                   foreach my $item (@items_array) {
659                       push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
660                   }
661               }
662
663               MCE->gather($chunk_id, \@buf);
664
665           } $infile;
666
667           MCE::Loop::finish();
668           close $fh_out;
669
670       Example Two
671          In this example, workers obtain the current ID value, then
672          increment and relay it for the next worker, ordered by chunk ID behind
673          the scenes. Workers call sprintf in parallel, allowing the manager
674          process (out_iter_fh) to accommodate up to 32 workers and not fall
675          behind.
676
677          Relay accounts for the worker handling the next chunk_id value.
678          Therefore, do not call relay more than once per chunk. Doing so will
679          cause IPC to stall.
680
681           use strict;
682           use warnings;
683
684           use MCE::Loop;
685           use MCE::Candy;
686
687           my $infile  = shift or die "Usage: $0 infile\n";
688           my $newfile = 'output.dat';
689
690           open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
691
692           MCE::Loop::init {
693               chunk_size => 'auto', max_workers => 8,
694               gather => MCE::Candy::out_iter_fh($fh_out),
695               init_relay => 1
696           };
697
698           mce_loop_f {
699               my ($mce, $chunk_ref, $chunk_id) = @_;
700               my @lines;
701
702               foreach my $line (@{ $chunk_ref }) {
703                   $line =~ s/\r//g; chomp $line;
704
705                   my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
706                   my @items_array = split /;/, $items;
707
708                   foreach my $item (@items_array) {
709                       push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
710                   }
711               }
712
713               my $idx = MCE::relay { $_ += scalar @lines };
714               my $buf = '';
715
716               foreach my $line ( @lines ) {
717                   $buf .= sprintf "%015d|%s", $idx++, $line
718               }
719
720               MCE->gather($chunk_id, $buf);
721
722           } $infile;
723
724           MCE::Loop::finish();
725           close $fh_out;
726

INDEX

728       MCE, MCE::Core
729

AUTHOR

731       Mario E. Roy, <marioeroy AT gmail DOT com>
732
733
734
735perl v5.28.1                      2019-01-23                     MCE::Relay(3)
Impressum