1MCE::Relay(3) User Contributed Perl Documentation MCE::Relay(3)
2
3
4
6 MCE::Relay - Extends Many-Core Engine with relay capabilities
7
9 This document describes MCE::Relay version 1.874
10
12 use MCE::Flow;
13
14 my $file = shift || \*STDIN;
15
16 ## Line Count #######################################
17
18 mce_flow_f {
19 max_workers => 4,
20 use_slurpio => 1,
21 init_relay => 0,
22 },
23 sub {
24 my ($mce, $slurp_ref, $chunk_id) = @_;
25 my $line_count = ($$slurp_ref =~ tr/\n//);
26
27 ## Receive and pass on updated information.
28 my $lines_read = MCE::relay { $_ += $line_count };
29
30 }, $file;
31
32 my $total_lines = MCE->relay_final;
33
34 print {*STDERR} "$total_lines\n";
35
36 ## Orderly Action ###################################
37
38 $| = 1; # Important, must flush output immediately.
39
40 mce_flow_f {
41 max_workers => 2,
42 use_slurpio => 1,
43 init_relay => 0,
44 },
45 sub {
46 my ($mce, $slurp_ref, $chunk_id) = @_;
47
48 ## The relay value is relayed and remains 0.
49 ## Writes to STDOUT orderly.
50
51 MCE->relay_lock;
52 print $$slurp_ref;
53 MCE->relay_unlock;
54
55 }, $file;
56
58 This module enables workers to receive and pass on information orderly
59 with zero involvement by the manager process while running. The module
60 is loaded automatically when MCE option "init_relay" is specified.
61
62 All workers (belonging to task_id 0) must participate when relaying
63 data.
64
65 Relaying is not meant for passing big data. The last worker will stall
66 if the relayed data exceeds the buffer size for the socket. Staying at
67 or below 16 KiB - 7 bytes is safe across all platforms.
68
70 MCE::relay { code }
71 MCE->relay ( sub { code } )
72 $mce->relay ( sub { code } )
73
74 Relay is enabled by specifying the init_relay option which takes a hash
75 or array reference, or a scalar value. Relaying is orderly and driven
76 by chunk_id when processing data, otherwise task_wid. Omitting the code
77 block (e.g. MCE::relay) relays forward.
78
79 Below, relaying multiple values via a HASH reference.
80
81 use MCE::Flow max_workers => 4;
82
83 mce_flow {
84 init_relay => { p => 0, e => 0 },
85 },
86 sub {
87 my $wid = MCE->wid;
88
89 ## do work
90 my $pass = $wid % 3;
91 my $errs = $wid % 2;
92
93 ## relay
94 my %last_rpt = MCE::relay { $_->{p} += $pass; $_->{e} += $errs };
95
96 MCE->print("$wid: passed $pass, errors $errs\n");
97
98 return;
99 };
100
101 my %results = MCE->relay_final;
102
103 print " passed $results{p}, errors $results{e} final\n\n";
104
105 -- Output
106
107 1: passed 1, errors 1
108 2: passed 2, errors 0
109 3: passed 0, errors 1
110 4: passed 1, errors 0
111 passed 4, errors 2 final
112
113 Or multiple values via an ARRAY reference.
114
115 use MCE::Flow max_workers => 4;
116
117 mce_flow {
118 init_relay => [ 0, 0 ],
119 },
120 sub {
121 my $wid = MCE->wid;
122
123 ## do work
124 my $pass = $wid % 3;
125 my $errs = $wid % 2;
126
127 ## relay
128 my @last_rpt = MCE::relay { $_->[0] += $pass; $_->[1] += $errs };
129
130 MCE->print("$wid: passed $pass, errors $errs\n");
131
132 return;
133 };
134
135 my ($pass, $errs) = MCE->relay_final;
136
137 print " passed $pass, errors $errs final\n\n";
138
139 -- Output
140
141 1: passed 1, errors 1
142 2: passed 2, errors 0
143 3: passed 0, errors 1
144 4: passed 1, errors 0
145 passed 4, errors 2 final
146
147 Or simply a scalar value.
148
149 use MCE::Flow max_workers => 4;
150
151 mce_flow {
152 init_relay => 0,
153 },
154 sub {
155 my $wid = MCE->wid;
156
157 ## do work
158 my $bytes_read = 1000 + ((MCE->wid % 3) * 3);
159
160 ## relay
161 my $last_offset = MCE::relay { $_ += $bytes_read };
162
163 ## output
164 MCE->print("$wid: $bytes_read\n");
165
166 return;
167 };
168
169 my $total = MCE->relay_final;
170
171 print " $total size\n\n";
172
173 -- Output
174
175 1: 1003
176 2: 1006
177 3: 1000
178 4: 1003
179 4012 size
180
181 MCE->relay_final ( void )
182 $mce->relay_final ( void )
183
184 Call this method to obtain the final relay value(s) after running. See
185 included example findnull.pl for another use case.
186
187 use MCE max_workers => 4;
188
189 my $mce = MCE->new(
190 init_relay => [ 0, 100 ], ## initial values (two counters)
191
192 user_func => sub {
193 my ($mce) = @_;
194
195 ## do work
196 my ($acc1, $acc2) = (10, 20);
197
198 ## relay to next worker
199 MCE::relay { $_->[0] += $acc1; $_->[1] += $acc2 };
200
201 return;
202 }
203 )->run;
204
205 my ($cnt1, $cnt2) = $mce->relay_final;
206
207 print "$cnt1 : $cnt2\n";
208
209 -- Output
210
211 40 : 180
212
213 MCE->relay_recv ( void )
214 $mce->relay_recv ( void )
215
216 Call this method to obtain the next relay value before relaying. This
217 allows serial-code to be processed orderly between workers. The
218 following is a parallel demonstration for the fasta-benchmark on the
219 web.
220
221 # perl fasta.pl 25000000
222
223 # The Computer Language Benchmarks game
224 # http://benchmarksgame.alioth.debian.org/
225 #
226 # contributed by Barry Walsh
227 # port of fasta.rb #6
228 #
229 # MCE::Flow version by Mario Roy
230 # requires MCE 1.807+
231 # requires MCE::Shared 1.806+
232
233 use strict;
234 use warnings;
235 use feature 'say';
236
237 use MCE::Flow;
238 use MCE::Shared;
239 use MCE::Candy;
240
241 use constant IM => 139968;
242 use constant IA => 3877;
243 use constant IC => 29573;
244
245 my $LAST = MCE::Shared->scalar( 42 );
246
247 my $alu =
248 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
249 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
250 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
251 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
252 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
253 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
254 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
255
256 my $iub = [
257 [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
258 [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
259 [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
260 [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
261 [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
262 ];
263
264 my $homosapiens = [
265 [ 'a', 0.3029549426680 ],
266 [ 'c', 0.1979883004921 ],
267 [ 'g', 0.1975473066391 ],
268 [ 't', 0.3015094502008 ]
269 ];
270
271 sub make_repeat_fasta {
272 my ( $src, $n ) = @_;
273 my $width = qr/(.{1,60})/;
274 my $l = length $src;
275 my $s = $src x ( ($n / $l) + 1 );
276 substr( $s, $n, $l ) = '';
277
278 while ( $s =~ m/$width/g ) { say $1 }
279 }
280
281 sub make_random_fasta {
282 my ( $table, $n ) = @_;
283 my $rand = undef;
284 my $width = 60;
285 my $prob = 0.0;
286 my $output = '';
287 my ( $c1, $c2, $last );
288
289 $_->[1] = ( $prob += $_->[1] ) for @$table;
290
291 $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
292 $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
293
294 my $seq = MCE::Shared->sequence(
295 { chunk_size => 2000, bounds_only => 1 },
296 1, $n / $width
297 );
298
299 my $code1 = q{
300 while ( 1 ) {
301 # --------------------------------------------
302 # Process code orderly between workers.
303 # --------------------------------------------
304
305 my $chunk_id = MCE->relay_recv;
306 my ( $begin, $end ) = $seq->next;
307
308 MCE->relay, last if ( !defined $begin );
309
310 my $last = $LAST->get;
311 my $temp = $last;
312
313 # Pre-compute $LAST value for the next worker
314 for ( 1 .. ( $end - $begin + 1 ) * $width ) {
315 $temp = ( $temp * IA + IC ) % IM;
316 }
317
318 $LAST->set( $temp );
319
320 # Increment chunk_id value
321 MCE->relay( sub { $_ += 1 } );
322
323 # --------------------------------------------
324 # Also run code in parallel between workers.
325 # --------------------------------------------
326
327 for ( $begin .. $end ) {
328 for ( 1 .. $width ) { !C! }
329 $output .= "\n";
330 }
331
332 # --------------------------------------------
333 # Display orderly.
334 # --------------------------------------------
335
336 MCE->gather( $chunk_id, $output );
337
338 $output = '';
339 }
340 };
341
342 $code1 =~ s/!C!/$c1/g;
343
344 MCE::Flow->init(
345 max_workers => 4, ## MCE::Util->get_ncpu || 4,
346 gather => MCE::Candy::out_iter_fh( \*STDOUT ),
347 init_relay => 1,
348 use_threads => 0,
349 );
350
351 MCE::Flow->run( sub { eval $code1 } );
352 MCE::Flow->finish;
353
354 $last = $LAST->get;
355
356 $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
357 $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
358
359 my $code2 = q{
360 if ( $n % $width != 0 ) {
361 for ( 1 .. $n % $width ) { !C! }
362 print "\n";
363 }
364 };
365
366 $code2 =~ s/!C!/$c2/g;
367 eval $code2;
368
369 $LAST->set( $last );
370 }
371
372 my $n = $ARGV[0] || 27;
373
374 say ">ONE Homo sapiens alu";
375 make_repeat_fasta( $alu, $n * 2 );
376
377 say ">TWO IUB ambiguity codes";
378 make_random_fasta( $iub, $n * 3 );
379
380 say ">THREE Homo sapiens frequency";
381 make_random_fasta( $homosapiens, $n * 5 );
382
383 MCE->relay_lock ( void )
384 MCE->relay_unlock ( void )
385 $mce->relay_lock ( void )
386 $mce->relay_unlock ( void )
387
388 The "relay_lock" and "relay_unlock" methods, added in MCE 1.807, are
389 aliases for "relay_recv" and "relay" respectively. Together, they allow
390 one to perform an exclusive action prior to actual relaying of data.
391
392 Relaying is driven by "chunk_id", or by "task_wid" when not processing
393 input, as seen here.
394
395 MCE->new(
396 max_workers => 8,
397 init_relay => 0,
398 user_func => sub {
399 MCE->relay_lock;
400 MCE->say("wid: ", MCE->task_wid);
401 MCE->relay_unlock( sub {
402 $_ += 2;
403 });
404 }
405 )->run;
406
407 MCE->say("sum: ", MCE->relay_final);
408
409 __END__
410
411 wid: 1
412 wid: 2
413 wid: 3
414 wid: 4
415 wid: 5
416 wid: 6
417 wid: 7
418 wid: 8
419 sum: 16
420
421 As described above, "relay" takes a code block and combines "relay_lock"
422 and "relay_unlock" into a single call. To make this more interesting, I
423 define "init_relay" to a hash containing two key-value pairs.
424
425 MCE->new(
426 max_workers => 8,
427 init_relay => { count => 0, total => 0 },
428 user_func => sub {
429 MCE->relay_lock;
430 MCE->say("wid: ", MCE->task_wid);
431 MCE->relay_unlock( sub {
432 $_->{count} += 1;
433 $_->{total} += 2;
434 });
435 }
436 )->run;
437
438 my %results = MCE->relay_final;
439
440 MCE->say("count: ", $results{count});
441 MCE->say("total: ", $results{total});
442
443 __END__
444
445 wid: 1
446 wid: 2
447 wid: 3
448 wid: 4
449 wid: 5
450 wid: 6
451 wid: 7
452 wid: 8
453 count: 8
454 total: 16
455
456 Below, "user_func" is taken from the "cat.pl" MCE example. Incrementing
457 the count is done only when the "-n" switch is passed to the script.
458 Otherwise, output is displayed orderly, and it is not necessary to
459 update the $_ value if exclusive locking is all you need.
460
461 user_func => sub {
462 my ($mce, $chunk_ref, $chunk_id) = @_;
463
464 if ($n_flag) {
465 ## Relays the total lines read.
466
467 my $output = ''; my $line_count = ($$chunk_ref =~ tr/\n//);
468 my $lines_read = MCE::relay { $_ += $line_count };
469
470 open my $fh, '<', $chunk_ref;
471 $output .= sprintf "%6d\t%s", ++$lines_read, $_ while (<$fh>);
472 close $fh;
473
474 $output .= ":$chunk_id";
475 MCE->do('display_chunk', $output);
476 }
477 else {
478 ## The following is another way to have ordered output. Workers
479 ## write directly to STDOUT exclusively without any involvement
480 ## from the manager process. The statement(s) between relay_lock
481 ## and relay_unlock run serially and most important orderly.
482
483 MCE->relay_lock; # alias for MCE->relay_recv
484 print $$chunk_ref; # ensure $| = 1 in script
485 MCE->relay_unlock; # alias for MCE->relay
486 }
487
488 return;
489 }
490
491 The following is a variant of the fasta-benchmark demonstration shown
492 above. Here, workers write exclusively and orderly to "STDOUT".
493
494 # perl fasta.pl 25000000
495
496 # The Computer Language Benchmarks game
497 # http://benchmarksgame.alioth.debian.org/
498 #
499 # contributed by Barry Walsh
500 # port of fasta.rb #6
501 #
502 # MCE::Flow version by Mario Roy
503 # requires MCE 1.807+
504 # requires MCE::Shared 1.806+
505
506 use strict;
507 use warnings;
508 use feature 'say';
509
510 use MCE::Flow;
511 use MCE::Shared;
512
513 use constant IM => 139968;
514 use constant IA => 3877;
515 use constant IC => 29573;
516
517 my $LAST = MCE::Shared->scalar( 42 );
518
519 my $alu =
520 'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
521 'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
522 'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
523 'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
524 'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
525 'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
526 'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
527
528 my $iub = [
529 [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
530 [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
531 [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
532 [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
533 [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
534 ];
535
536 my $homosapiens = [
537 [ 'a', 0.3029549426680 ],
538 [ 'c', 0.1979883004921 ],
539 [ 'g', 0.1975473066391 ],
540 [ 't', 0.3015094502008 ]
541 ];
542
543 sub make_repeat_fasta {
544 my ( $src, $n ) = @_;
545 my $width = qr/(.{1,60})/;
546 my $l = length $src;
547 my $s = $src x ( ($n / $l) + 1 );
548 substr( $s, $n, $l ) = '';
549
550 while ( $s =~ m/$width/g ) { say $1 }
551 }
552
553 sub make_random_fasta {
554 my ( $table, $n ) = @_;
555 my $rand = undef;
556 my $width = 60;
557 my $prob = 0.0;
558 my $output = '';
559 my ( $c1, $c2, $last );
560
561 $_->[1] = ( $prob += $_->[1] ) for @$table;
562
563 $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
564 $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
565
566 my $seq = MCE::Shared->sequence(
567 { chunk_size => 2000, bounds_only => 1 },
568 1, $n / $width
569 );
570
571 my $code1 = q{
572 $| = 1; # Important, must flush output immediately.
573
574 while ( 1 ) {
575 # --------------------------------------------
576 # Process code orderly between workers.
577 # --------------------------------------------
578
579 MCE->relay_lock;
580
581 my ( $begin, $end ) = $seq->next;
582 print( $output ), $output = '' if ( length $output );
583
584 MCE->relay_unlock, last if ( !defined $begin );
585
586 my $last = $LAST->get;
587 my $temp = $last;
588
589 # Pre-compute $LAST value for the next worker
590 for ( 1 .. ( $end - $begin + 1 ) * $width ) {
591 $temp = ( $temp * IA + IC ) % IM;
592 }
593
594 $LAST->set( $temp );
595
596 MCE->relay_unlock;
597
598 # --------------------------------------------
599 # Also run code in parallel.
600 # --------------------------------------------
601
602 for ( $begin .. $end ) {
603 for ( 1 .. $width ) { !C! }
604 $output .= "\n";
605 }
606 }
607 };
608
609 $code1 =~ s/!C!/$c1/g;
610
611 MCE::Flow->init(
612 max_workers => 4, ## MCE::Util->get_ncpu || 4,
613 init_relay => 0,
614 use_threads => 0,
615 );
616
617 MCE::Flow->run( sub { eval $code1 } );
618 MCE::Flow->finish;
619
620 $last = $LAST->get;
621
622 $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
623 $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
624
625 my $code2 = q{
626 if ( $n % $width != 0 ) {
627 for ( 1 .. $n % $width ) { !C! }
628 print "\n";
629 }
630 };
631
632 $code2 =~ s/!C!/$c2/g;
633 eval $code2;
634
635 $LAST->set( $last );
636 }
637
638 my $n = $ARGV[0] || 27;
639
640 say ">ONE Homo sapiens alu";
641 make_repeat_fasta( $alu, $n * 2 );
642
643 say ">TWO IUB ambiguity codes";
644 make_random_fasta( $iub, $n * 3 );
645
646 say ">THREE Homo sapiens frequency";
647 make_random_fasta( $homosapiens, $n * 5 );
648
650 I received a request from John Martel to process a large flat file and
651 expand each record to many records based on splitting out items in
652 field 4 delimited by semicolons. Each row in the output is given a
653 unique ID starting with one while preserving output order.
654
655 Input File, possibly larger than 500 GiB in size
656 foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
657 bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
658 baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
659 ...
660
661 Output File
662 000000000000001|item1|foo|field2|field3|field5|field6|field7
663 000000000000002|item2|foo|field2|field3|field5|field6|field7
664 000000000000003|item3|foo|field2|field3|field5|field6|field7
665 000000000000004|item4|foo|field2|field3|field5|field6|field7
666 000000000000005|itemN|foo|field2|field3|field5|field6|field7
667 000000000000006|item1|bar|field2|field3|field5|field6|field7
668 000000000000007|item2|bar|field2|field3|field5|field6|field7
669 000000000000008|item3|bar|field2|field3|field5|field6|field7
670 000000000000009|item4|bar|field2|field3|field5|field6|field7
671 000000000000010|itemN|bar|field2|field3|field5|field6|field7
672 000000000000011|item1|baz|field2|field3|field5|field6|field7
673 000000000000012|item2|baz|field2|field3|field5|field6|field7
674 000000000000013|item3|baz|field2|field3|field5|field6|field7
675 000000000000014|item4|baz|field2|field3|field5|field6|field7
676 000000000000015|itemN|baz|field2|field3|field5|field6|field7
677 ...
678
679 Example One
680
681 This example configures a custom function for preserving output order.
682 Unfortunately, the sprintf function alone involves extra CPU time
683 causing the manager process to fall behind. Thus, workers may idle
684 while waiting for the manager process to respond to the gather request.
685
686 use strict;
687 use warnings;
688
689 use MCE::Loop;
690
691 my $infile = shift or die "Usage: $0 infile\n";
692 my $newfile = 'output.dat';
693
694 open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
695
696 sub preserve_order {
697 my ($fh) = @_;
698 my ($order_id, $start_idx, $idx, %tmp) = (1, 1);
699
700 return sub {
701 my ($chunk_id, $aref) = @_;
702 $tmp{ $chunk_id } = $aref;
703
704 while ( my $aref = delete $tmp{ $order_id } ) {
705 foreach my $line ( @{ $aref } ) {
706 $idx = sprintf "%015d", $start_idx++;
707 print $fh $idx, $line;
708 }
709 $order_id++;
710 }
711 }
712 }
713
714 MCE::Loop->init(
715 chunk_size => 'auto', max_workers => 3,
716 gather => preserve_order($fh_out)
717 );
718
719 mce_loop_f {
720 my ($mce, $chunk_ref, $chunk_id) = @_;
721 my @buf;
722
723 foreach my $line (@{ $chunk_ref }) {
724 $line =~ s/\r//g; chomp $line;
725
726 my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
727 my @items_array = split /;/, $items;
728
729 foreach my $item (@items_array) {
730 push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
731 }
732 }
733
734 MCE->gather($chunk_id, \@buf);
735
736 } $infile;
737
738 MCE::Loop->finish();
739 close $fh_out;
740
741 Example Two
742
743 In this example, workers obtain the current ID value and
744 increment/relay for the next worker, ordered by chunk ID behind the
745 scenes. Workers call sprintf in parallel, allowing the manager process
746 (out_iter_fh) to accommodate up to 32 workers and not fall behind.
747
748 Relay accounts for the worker handling the next chunk_id value.
749 Therefore, do not call relay more than once per chunk. Doing so will
750 cause IPC to stall.
751
752 use strict;
753 use warnings;
754
755 use MCE::Loop;
756 use MCE::Candy;
757
758 my $infile = shift or die "Usage: $0 infile\n";
759 my $newfile = 'output.dat';
760
761 open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
762
763 MCE::Loop->init(
764 chunk_size => 'auto', max_workers => 8,
765 gather => MCE::Candy::out_iter_fh($fh_out),
766 init_relay => 1
767 );
768
769 mce_loop_f {
770 my ($mce, $chunk_ref, $chunk_id) = @_;
771 my @lines;
772
773 foreach my $line (@{ $chunk_ref }) {
774 $line =~ s/\r//g; chomp $line;
775
776 my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
777 my @items_array = split /;/, $items;
778
779 foreach my $item (@items_array) {
780 push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
781 }
782 }
783
784 my $idx = MCE::relay { $_ += scalar @lines };
785 my $buf = '';
786
787 foreach my $line ( @lines ) {
788 $buf .= sprintf "%015d|%s", $idx++, $line
789 }
790
791 MCE->gather($chunk_id, $buf);
792
793 } $infile;
794
795 MCE::Loop->finish();
796 close $fh_out;
797
799 MCE, MCE::Core
800
802 Mario E. Roy, <marioeroy AT gmail DOT com>
803
804
805
806perl v5.34.0 2021-07-22 MCE::Relay(3)