1MCE::Relay(3)         User Contributed Perl Documentation        MCE::Relay(3)
2
3
4

NAME

6       MCE::Relay - Extends Many-Core Engine with relay capabilities
7

VERSION

9       This document describes MCE::Relay version 1.874
10

SYNOPSIS

12        use MCE::Flow;
13
14        my $file = shift || \*STDIN;
15
16        ## Line Count #######################################
17
18        mce_flow_f {
19           max_workers => 4,
20           use_slurpio => 1,
21           init_relay  => 0,
22        },
23        sub {
24           my ($mce, $slurp_ref, $chunk_id) = @_;
25           my $line_count = ($$slurp_ref =~ tr/\n//);
26
27           ## Receive and pass on updated information.
28           my $lines_read = MCE::relay { $_ += $line_count };
29
30        }, $file;
31
32        my $total_lines = MCE->relay_final;
33
34        print {*STDERR} "$total_lines\n";
35
36        ## Orderly Action ###################################
37
38        $| = 1; # Important, must flush output immediately.
39
40        mce_flow_f {
41           max_workers => 2,
42           use_slurpio => 1,
43           init_relay  => 0,
44        },
45        sub {
46           my ($mce, $slurp_ref, $chunk_id) = @_;
47
48           ## The relay value is relayed and remains 0.
49           ## Writes to STDOUT orderly.
50
51           MCE->relay_lock;
52           print $$slurp_ref;
53           MCE->relay_unlock;
54
55        }, $file;
56

DESCRIPTION

58       This module enables workers to receive and pass on information orderly
59       with zero involvement by the manager process while running. The module
60       is loaded automatically when MCE option "init_relay" is specified.
61
62       All workers (belonging to task_id 0) must participate when relaying
63       data.
64
65       Relaying is not meant for passing big data. The last worker will stall
66       if the relayed data exceeds the buffer size for the socket. Relaying
67       no more than 16 KiB - 7 is safe across all platforms.
68

API DOCUMENTATION

70       MCE::relay { code }
71       MCE->relay ( sub { code } )
72       $mce->relay ( sub { code } )
73
74       Relay is enabled by specifying the init_relay option which takes a hash
75       or array reference, or a scalar value. Relaying is orderly and driven
76       by chunk_id when processing data, otherwise task_wid. Omitting the code
77       block (e.g. MCE::relay) relays forward.
78
79       Below, relaying multiple values via a HASH reference.
80
81        use MCE::Flow max_workers => 4;
82
83        mce_flow {
84           init_relay => { p => 0, e => 0 },
85        },
86        sub {
87           my $wid = MCE->wid;
88
89           ## do work
90           my $pass = $wid % 3;
91           my $errs = $wid % 2;
92
93           ## relay
94           my %last_rpt = MCE::relay { $_->{p} += $pass; $_->{e} += $errs };
95
96           MCE->print("$wid: passed $pass, errors $errs\n");
97
98           return;
99        };
100
101        my %results = MCE->relay_final;
102
103        print "   passed $results{p}, errors $results{e} final\n\n";
104
105        -- Output
106
107        1: passed 1, errors 1
108        2: passed 2, errors 0
109        3: passed 0, errors 1
110        4: passed 1, errors 0
111           passed 4, errors 2 final
112
113       Or multiple values via an ARRAY reference.
114
115        use MCE::Flow max_workers => 4;
116
117        mce_flow {
118           init_relay => [ 0, 0 ],
119        },
120        sub {
121           my $wid = MCE->wid;
122
123           ## do work
124           my $pass = $wid % 3;
125           my $errs = $wid % 2;
126
127           ## relay
128           my @last_rpt = MCE::relay { $_->[0] += $pass; $_->[1] += $errs };
129
130           MCE->print("$wid: passed $pass, errors $errs\n");
131
132           return;
133        };
134
135        my ($pass, $errs) = MCE->relay_final;
136
137        print "   passed $pass, errors $errs final\n\n";
138
139        -- Output
140
141        1: passed 1, errors 1
142        2: passed 2, errors 0
143        3: passed 0, errors 1
144        4: passed 1, errors 0
145           passed 4, errors 2 final
146
147       Or simply a scalar value.
148
149        use MCE::Flow max_workers => 4;
150
151        mce_flow {
152           init_relay => 0,
153        },
154        sub {
155           my $wid = MCE->wid;
156
157           ## do work
158           my $bytes_read = 1000 + ((MCE->wid % 3) * 3);
159
160           ## relay
161           my $last_offset = MCE::relay { $_ += $bytes_read };
162
163           ## output
164           MCE->print("$wid: $bytes_read\n");
165
166           return;
167        };
168
169        my $total = MCE->relay_final;
170
171        print "   $total size\n\n";
172
173        -- Output
174
175        1: 1003
176        2: 1006
177        3: 1000
178        4: 1003
179           4012 size
180
181       MCE->relay_final ( void )
182       $mce->relay_final ( void )
183
184       Call this method to obtain the final relay value(s) after running. See
185       included example findnull.pl for another use case.
186
187        use MCE max_workers => 4;
188
189        my $mce = MCE->new(
190           init_relay => [ 0, 100 ],       ## initial values (two counters)
191
192           user_func => sub {
193              my ($mce) = @_;
194
195              ## do work
196              my ($acc1, $acc2) = (10, 20);
197
198              ## relay to next worker
199              MCE::relay { $_->[0] += $acc1; $_->[1] += $acc2 };
200
201              return;
202           }
203        )->run;
204
205        my ($cnt1, $cnt2) = $mce->relay_final;
206
207        print "$cnt1 : $cnt2\n";
208
209        -- Output
210
211        40 : 180
212
213       MCE->relay_recv ( void )
214       $mce->relay_recv ( void )
215
216       Call this method to obtain the next relay value before relaying. This
217       allows serial-code to be processed orderly between workers. The
218       following is a parallel demonstration for the fasta-benchmark on the
219       web.
220
221        # perl fasta.pl 25000000
222
223        # The Computer Language Benchmarks game
224        # http://benchmarksgame.alioth.debian.org/
225        #
226        # contributed by Barry Walsh
227        # port of fasta.rb #6
228        #
229        # MCE::Flow version by Mario Roy
230        # requires MCE 1.807+
231        # requires MCE::Shared 1.806+
232
233        use strict;
234        use warnings;
235        use feature 'say';
236
237        use MCE::Flow;
238        use MCE::Shared;
239        use MCE::Candy;
240
241        use constant IM => 139968;
242        use constant IA => 3877;
243        use constant IC => 29573;
244
245        my $LAST = MCE::Shared->scalar( 42 );
246
247        my $alu =
248           'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
249           'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
250           'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
251           'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
252           'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
253           'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
254           'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
255
256        my $iub = [
257           [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
258           [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
259           [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
260           [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
261           [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
262        ];
263
264        my $homosapiens = [
265           [ 'a', 0.3029549426680 ],
266           [ 'c', 0.1979883004921 ],
267           [ 'g', 0.1975473066391 ],
268           [ 't', 0.3015094502008 ]
269        ];
270
271        sub make_repeat_fasta {
272           my ( $src, $n ) = @_;
273           my $width = qr/(.{1,60})/;
274           my $l     = length $src;
275           my $s     = $src x ( ($n / $l) + 1 );
276           substr( $s, $n, $l ) = '';
277
278           while ( $s =~ m/$width/g ) { say $1 }
279        }
280
281        sub make_random_fasta {
282           my ( $table, $n ) = @_;
283           my $rand   = undef;
284           my $width  = 60;
285           my $prob   = 0.0;
286           my $output = '';
287           my ( $c1, $c2, $last );
288
289           $_->[1] = ( $prob += $_->[1] ) for @$table;
290
291           $c1  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
292           $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
293
294           my $seq = MCE::Shared->sequence(
295              { chunk_size => 2000, bounds_only => 1 },
296              1, $n / $width
297           );
298
299           my $code1 = q{
300              while ( 1 ) {
301                 # --------------------------------------------
302                 # Process code orderly between workers.
303                 # --------------------------------------------
304
305                 my $chunk_id = MCE->relay_recv;
306                 my ( $begin, $end ) = $seq->next;
307
308                 MCE->relay, last if ( !defined $begin );
309
310                 my $last = $LAST->get;
311                 my $temp = $last;
312
313                 # Pre-compute $LAST value for the next worker
314                 for ( 1 .. ( $end - $begin + 1 ) * $width ) {
315                    $temp = ( $temp * IA + IC ) % IM;
316                 }
317
318                 $LAST->set( $temp );
319
320                 # Increment chunk_id value
321                 MCE->relay( sub { $_ += 1 } );
322
323                 # --------------------------------------------
324                 # Also run code in parallel between workers.
325                 # --------------------------------------------
326
327                 for ( $begin .. $end ) {
328                    for ( 1 .. $width ) { !C! }
329                    $output .= "\n";
330                 }
331
332                 # --------------------------------------------
333                 # Display orderly.
334                 # --------------------------------------------
335
336                 MCE->gather( $chunk_id, $output );
337
338                 $output = '';
339              }
340           };
341
342           $code1 =~ s/!C!/$c1/g;
343
344           MCE::Flow->init(
345              max_workers => 4, ## MCE::Util->get_ncpu || 4,
346              gather      => MCE::Candy::out_iter_fh( \*STDOUT ),
347              init_relay  => 1,
348              use_threads => 0,
349           );
350
351           MCE::Flow->run( sub { eval $code1 } );
352           MCE::Flow->finish;
353
354           $last = $LAST->get;
355
356           $c2  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
357           $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
358
359           my $code2 = q{
360              if ( $n % $width != 0 ) {
361                 for ( 1 .. $n % $width ) { !C! }
362                 print "\n";
363              }
364           };
365
366           $code2 =~ s/!C!/$c2/g;
367           eval $code2;
368
369           $LAST->set( $last );
370        }
371
372        my $n = $ARGV[0] || 27;
373
374        say ">ONE Homo sapiens alu";
375        make_repeat_fasta( $alu, $n * 2 );
376
377        say ">TWO IUB ambiguity codes";
378        make_random_fasta( $iub, $n * 3 );
379
380        say ">THREE Homo sapiens frequency";
381        make_random_fasta( $homosapiens, $n * 5 );
382
383       MCE->relay_lock ( void )
384       MCE->relay_unlock ( void )
385       $mce->relay_lock ( void )
386       $mce->relay_unlock ( void )
387
388       The "relay_lock" and "relay_unlock" methods, added in MCE 1.807, are
389       aliases for "relay_recv" and "relay" respectively. Together, they allow
390       one to perform an exclusive action prior to actual relaying of data.
391
392       Relaying is driven by "chunk_id", or by "task_wid" when not processing
393       input, as seen here.
394
395        MCE->new(
396           max_workers => 8,
397           init_relay => 0,
398           user_func => sub {
399              MCE->relay_lock;
400              MCE->say("wid: ", MCE->task_wid);
401              MCE->relay_unlock( sub {
402                 $_ += 2;
403              });
404           }
405        )->run;
406
407        MCE->say("sum: ", MCE->relay_final);
408
409        __END__
410
411        wid: 1
412        wid: 2
413        wid: 3
414        wid: 4
415        wid: 5
416        wid: 6
417        wid: 7
418        wid: 8
419        sum: 16
420
421       Described above, "relay" takes a code block and combines "relay_lock"
422       and "relay_unlock" into a single call. To make this more interesting, I
423       define "init_relay" to a hash containing two key-value pairs.
424
425        MCE->new(
426           max_workers => 8,
427           init_relay => { count => 0, total => 0 },
428           user_func => sub {
429              MCE->relay_lock;
430              MCE->say("wid: ", MCE->task_wid);
431              MCE->relay_unlock( sub {
432                 $_->{count} += 1;
433                 $_->{total} += 2;
434              });
435           }
436        )->run;
437
438        my %results = MCE->relay_final;
439
440        MCE->say("count: ", $results{count});
441        MCE->say("total: ", $results{total});
442
443        __END__
444
445        wid: 1
446        wid: 2
447        wid: 3
448        wid: 4
449        wid: 5
450        wid: 6
451        wid: 7
452        wid: 8
453        count: 8
454        total: 16
455
456       Below, "user_func" is taken from the "cat.pl" MCE example. Incrementing
457       the count is done only when the "-n" switch is passed to the script.
458       Otherwise, output is displayed orderly, and updating the $_ value is
459       not necessary if exclusive locking is all you need.
460
461        user_func => sub {
462           my ($mce, $chunk_ref, $chunk_id) = @_;
463
464           if ($n_flag) {
465              ## Relays the total lines read.
466
467              my $output = ''; my $line_count = ($$chunk_ref =~ tr/\n//);
468              my $lines_read = MCE::relay { $_ += $line_count };
469
470              open my $fh, '<', $chunk_ref;
471              $output .= sprintf "%6d\t%s", ++$lines_read, $_ while (<$fh>);
472              close $fh;
473
474              $output .= ":$chunk_id";
475              MCE->do('display_chunk', $output);
476           }
477           else {
478              ## The following is another way to have ordered output. Workers
479              ## write directly to STDOUT exclusively without any involvement
480              ## from the manager process. The statement(s) between relay_lock
481              ## and relay_unlock run serially and most important orderly.
482
483              MCE->relay_lock;      # alias for MCE->relay_recv
484              print $$chunk_ref;    # ensure $| = 1 in script
485              MCE->relay_unlock;    # alias for MCE->relay
486           }
487
488           return;
489        }
490
491       The following is a variant of the fasta-benchmark demonstration shown
492       above.  Here, workers write exclusively and orderly to "STDOUT".
493
494        # perl fasta.pl 25000000
495
496        # The Computer Language Benchmarks game
497        # http://benchmarksgame.alioth.debian.org/
498        #
499        # contributed by Barry Walsh
500        # port of fasta.rb #6
501        #
502        # MCE::Flow version by Mario Roy
503        # requires MCE 1.807+
504        # requires MCE::Shared 1.806+
505
506        use strict;
507        use warnings;
508        use feature 'say';
509
510        use MCE::Flow;
511        use MCE::Shared;
512
513        use constant IM => 139968;
514        use constant IA => 3877;
515        use constant IC => 29573;
516
517        my $LAST = MCE::Shared->scalar( 42 );
518
519        my $alu =
520           'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
521           'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
522           'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
523           'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
524           'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
525           'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
526           'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
527
528        my $iub = [
529           [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
530           [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
531           [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
532           [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
533           [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
534        ];
535
536        my $homosapiens = [
537           [ 'a', 0.3029549426680 ],
538           [ 'c', 0.1979883004921 ],
539           [ 'g', 0.1975473066391 ],
540           [ 't', 0.3015094502008 ]
541        ];
542
543        sub make_repeat_fasta {
544           my ( $src, $n ) = @_;
545           my $width = qr/(.{1,60})/;
546           my $l     = length $src;
547           my $s     = $src x ( ($n / $l) + 1 );
548           substr( $s, $n, $l ) = '';
549
550           while ( $s =~ m/$width/g ) { say $1 }
551        }
552
553        sub make_random_fasta {
554           my ( $table, $n ) = @_;
555           my $rand   = undef;
556           my $width  = 60;
557           my $prob   = 0.0;
558           my $output = '';
559           my ( $c1, $c2, $last );
560
561           $_->[1] = ( $prob += $_->[1] ) for @$table;
562
563           $c1  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
564           $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
565
566           my $seq = MCE::Shared->sequence(
567              { chunk_size => 2000, bounds_only => 1 },
568              1, $n / $width
569           );
570
571           my $code1 = q{
572              $| = 1; # Important, must flush output immediately.
573
574              while ( 1 ) {
575                 # --------------------------------------------
576                 # Process code orderly between workers.
577                 # --------------------------------------------
578
579                 MCE->relay_lock;
580
581                 my ( $begin, $end ) = $seq->next;
582                 print( $output ), $output = '' if ( length $output );
583
584                 MCE->relay_unlock, last if ( !defined $begin );
585
586                 my $last = $LAST->get;
587                 my $temp = $last;
588
589                 # Pre-compute $LAST value for the next worker
590                 for ( 1 .. ( $end - $begin + 1 ) * $width ) {
591                    $temp = ( $temp * IA + IC ) % IM;
592                 }
593
594                 $LAST->set( $temp );
595
596                 MCE->relay_unlock;
597
598                 # --------------------------------------------
599                 # Also run code in parallel.
600                 # --------------------------------------------
601
602                 for ( $begin .. $end ) {
603                    for ( 1 .. $width ) { !C! }
604                    $output .= "\n";
605                 }
606              }
607           };
608
609           $code1 =~ s/!C!/$c1/g;
610
611           MCE::Flow->init(
612              max_workers => 4, ## MCE::Util->get_ncpu || 4,
613              init_relay  => 0,
614              use_threads => 0,
615           );
616
617           MCE::Flow->run( sub { eval $code1 } );
618           MCE::Flow->finish;
619
620           $last = $LAST->get;
621
622           $c2  = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
623           $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
624
625           my $code2 = q{
626              if ( $n % $width != 0 ) {
627                 for ( 1 .. $n % $width ) { !C! }
628                 print "\n";
629              }
630           };
631
632           $code2 =~ s/!C!/$c2/g;
633           eval $code2;
634
635           $LAST->set( $last );
636        }
637
638        my $n = $ARGV[0] || 27;
639
640        say ">ONE Homo sapiens alu";
641        make_repeat_fasta( $alu, $n * 2 );
642
643        say ">TWO IUB ambiguity codes";
644        make_random_fasta( $iub, $n * 3 );
645
646        say ">THREE Homo sapiens frequency";
647        make_random_fasta( $homosapiens, $n * 5 );
648

GATHER AND RELAY DEMONSTRATIONS

650       I received a request from John Martel to process a large flat file and
651       expand each record to many records based on splitting out items in
652       field 4 delimited by semicolons. Each row in the output is given a
653       unique ID starting with one while preserving output order.
654
655       Input File, possibly larger than 500 GiB in size
656           foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
657           bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
658           baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
659           ...
660
661       Output File
662           000000000000001|item1|foo|field2|field3|field5|field6|field7
663           000000000000002|item2|foo|field2|field3|field5|field6|field7
664           000000000000003|item3|foo|field2|field3|field5|field6|field7
665           000000000000004|item4|foo|field2|field3|field5|field6|field7
666           000000000000005|itemN|foo|field2|field3|field5|field6|field7
667           000000000000006|item1|bar|field2|field3|field5|field6|field7
668           000000000000007|item2|bar|field2|field3|field5|field6|field7
669           000000000000008|item3|bar|field2|field3|field5|field6|field7
670           000000000000009|item4|bar|field2|field3|field5|field6|field7
671           000000000000010|itemN|bar|field2|field3|field5|field6|field7
672           000000000000011|item1|baz|field2|field3|field5|field6|field7
673           000000000000012|item2|baz|field2|field3|field5|field6|field7
674           000000000000013|item3|baz|field2|field3|field5|field6|field7
675           000000000000014|item4|baz|field2|field3|field5|field6|field7
676           000000000000015|itemN|baz|field2|field3|field5|field6|field7
677           ...
678
679       Example One
680
681       This example configures a custom function for preserving output order.
682       Unfortunately, the sprintf function alone involves extra CPU time
683       causing the manager process to fall behind. Thus, workers may idle
684       while waiting for the manager process to respond to the gather request.
685
686        use strict;
687        use warnings;
688
689        use MCE::Loop;
690
691        my $infile  = shift or die "Usage: $0 infile\n";
692        my $newfile = 'output.dat';
693
694        open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
695
696        sub preserve_order {
697            my ($fh) = @_;
698            my ($order_id, $start_idx, $idx, %tmp) = (1, 1);
699
700            return sub {
701                my ($chunk_id, $aref) = @_;
702                $tmp{ $chunk_id } = $aref;
703
704                while ( my $aref = delete $tmp{ $order_id } ) {
705                    foreach my $line ( @{ $aref } ) {
706                        $idx = sprintf "%015d", $start_idx++;
707                        print $fh $idx, $line;
708                    }
709                    $order_id++;
710                }
711            }
712        }
713
714        MCE::Loop->init(
715            chunk_size => 'auto', max_workers => 3,
716            gather => preserve_order($fh_out)
717        );
718
719        mce_loop_f {
720            my ($mce, $chunk_ref, $chunk_id) = @_;
721            my @buf;
722
723            foreach my $line (@{ $chunk_ref }) {
724                $line =~ s/\r//g; chomp $line;
725
726                my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
727                my @items_array = split /;/, $items;
728
729                foreach my $item (@items_array) {
730                    push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
731                }
732            }
733
734            MCE->gather($chunk_id, \@buf);
735
736        } $infile;
737
738        MCE::Loop->finish();
739        close $fh_out;
740
741       Example Two
742
743       In this example, workers obtain the current ID value and
744       increment/relay for the next worker, ordered by chunk ID behind the
745       scenes. Workers call sprintf in parallel, allowing the manager process
746       (out_iter_fh) to accommodate up to 32 workers and not fall behind.
747
748       Relay accounts for the worker handling the next chunk_id value.
749       Therefore, do not call relay more than once per chunk. Doing so will
750       cause IPC to stall.
751
752        use strict;
753        use warnings;
754
755        use MCE::Loop;
756        use MCE::Candy;
757
758        my $infile  = shift or die "Usage: $0 infile\n";
759        my $newfile = 'output.dat';
760
761        open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
762
763        MCE::Loop->init(
764            chunk_size => 'auto', max_workers => 8,
765            gather => MCE::Candy::out_iter_fh($fh_out),
766            init_relay => 1
767        );
768
769        mce_loop_f {
770            my ($mce, $chunk_ref, $chunk_id) = @_;
771            my @lines;
772
773            foreach my $line (@{ $chunk_ref }) {
774                $line =~ s/\r//g; chomp $line;
775
776                my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
777                my @items_array = split /;/, $items;
778
779                foreach my $item (@items_array) {
780                    push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
781                }
782            }
783
784            my $idx = MCE::relay { $_ += scalar @lines };
785            my $buf = '';
786
787            foreach my $line ( @lines ) {
788                $buf .= sprintf "%015d|%s", $idx++, $line
789            }
790
791            MCE->gather($chunk_id, $buf);
792
793        } $infile;
794
795        MCE::Loop->finish();
796        close $fh_out;
797

INDEX

799       MCE, MCE::Core
800

AUTHOR

802       Mario E. Roy, <marioeroy AT gmail DOT com>
803
804
805
806perl v5.32.1                      2021-01-27                     MCE::Relay(3)
Impressum