1MCE::Channel(3)       User Contributed Perl Documentation      MCE::Channel(3)
2
3
4

NAME

6       MCE::Channel - Queue-like and two-way communication capability
7

VERSION

9       This document describes MCE::Channel version 1.889
10

SYNOPSIS

12        use MCE::Channel;
13
14        ########################
15        # Construction
16        ########################
17
18        # A single producer and many consumers supporting processes and threads
19
20        my $c1 = MCE::Channel->new( impl => 'Mutex' );    # default implementation
21        my $c2 = MCE::Channel->new( impl => 'Threads' );  # threads::shared locking
22
23        # Set the mp flag if two or more workers (many producers) will be calling
24        # enqueue/send or recv2/recv2_nb on the left end of the channel
25
26        my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
27        my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
28
29        # Tuned for one producer and one consumer, no locking
30
31        my $c5 = MCE::Channel->new( impl => 'Simple' );
32
33        ########################
34        # Queue-like behavior
35        ########################
36
37        # Send data to consumers
38        $c1->enqueue('item');
39        $c1->enqueue(qw/item1 item2 item3 itemN/);
40
41        # Receive data
42        my $item  = $c1->dequeue();      # item
43        my @items = $c1->dequeue(2);     # (item1, item2)
44
45        # Receive, non-blocking
46        my $item  = $c1->dequeue_nb();   # item
47        my @items = $c1->dequeue_nb(2);  # (item1, item2)
48
49        # Signal that there is no more work to be sent
50        $c1->end();
51
52        ########################
53        # Two-way communication
54        ########################
55
56        # Producer(s) sending data
57        $c3->send('message');
58        $c3->send(qw/arg1 arg2 arg3/);
59
60        # Consumer(s) receiving data
61        my $mesg = $c3->recv();          # message
62        my @args = $c3->recv();          # (arg1, arg2, arg3)
63
64        # Alternatively, non-blocking
65        my $mesg = $c3->recv_nb();       # message
66        my @args = $c3->recv_nb();       # (arg1, arg2, arg3)
67
68        # A producer signaling no more work to be sent
69        $c3->end();
70
71        # Consumer(s) sending data
72        $c3->send2('message');
73        $c3->send2(qw/arg1 arg2 arg3/);
74
75        # Producer(s) receiving data
76        my $mesg = $c3->recv2();         # message
77        my @args = $c3->recv2();         # (arg1, arg2, arg3)
78
79        # Alternatively, non-blocking
80        my $mesg = $c3->recv2_nb();      # message
81        my @args = $c3->recv2_nb();      # (arg1, arg2, arg3)
82

DESCRIPTION

84       An MCE::Channel object is a container for sending and receiving data
85       using socketpair handles. Serialization is provided by Sereal if
86       available, defaulting to Storable otherwise. Excluding the "Simple"
87       implementation, both ends of the "channel" support many workers
88       concurrently (with mp => 1).
89
90   new ( impl => STRING, mp => BOOLEAN )
91       This creates a new channel. Three implementations are provided: "Mutex",
92       "Threads", and "Simple", indicating the locking mechanism to use:
93       "MCE::Mutex", "threads::shared", and no locking, respectively.
94
95        $chnl = MCE::Channel->new();     # default: impl => 'Mutex', mp => 0
96                                         # default: impl => 'Threads' on Windows
97
98       The "Mutex" implementation supports processes and threads whereas the
99       "Threads" implementation is suited for Windows and threads only.
100
101        $chnl = MCE::Channel->new( impl => 'Mutex' );    # MCE::Mutex locking
102        $chnl = MCE::Channel->new( impl => 'Threads' );  # threads::shared locking
103
104        # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
105
106       Set the "mp" (m)any (p)roducers option to a true value if there will be
107       two or more workers calling "enqueue", "send", "recv2", or "recv2_nb"
108       on the left end of the channel. This is important to not incur a race
109       condition.
110
111        $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
112        $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
113
114        # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
115
116       The "Simple" implementation is optimized for one producer and one
117       consumer max.  It omits locking for maximum performance. This
118       implementation is preferred for parent to child communication not
119       shared by another worker.
120
121        $chnl = MCE::Channel->new( impl => 'Simple' );
122

QUEUE-LIKE BEHAVIOR

124   enqueue ( ITEM1 [, ITEM2, ... ] )
125       Appends a list of items onto the left end of the channel. This will
126       block once the internal socket buffer becomes full (i.e. awaiting
127       workers to dequeue on the other end). This prevents producer(s) from
128       running faster than consumer(s).
129
130       Object (de)serialization is handled automatically using Sereal if
131       available or defaults to Storable otherwise.
132
133        $chnl->enqueue('item1');
134        $chnl->enqueue(qw/item2 item3 .../);
135
136        $chnl->enqueue([ array_ref1 ]);
137        $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
138
139        $chnl->enqueue({ hash_ref1 });
140        $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
141
142   dequeue
143   dequeue ( COUNT )
144       Removes the requested number of items (default 1) from the right end of
145       the channel. If the channel contains fewer than the requested number of
146       items, the method will block (i.e. until other producer(s) enqueue more
147       items).
148
149        $item  = $chnl->dequeue();       # item1
150        @items = $chnl->dequeue(2);      # ( item2, item3 )
151
152   dequeue_nb
153   dequeue_nb ( COUNT )
154       Removes the requested number of items (default 1) from the right end of
155       the channel. If the channel contains fewer than the requested number of
156       items, the method will return immediately with whatever it was able to
157       retrieve.  If the channel is empty, then returns "an empty list" in
158       list context or "undef" in scalar context.
159
160        $item  = $chnl->dequeue_nb();    # array_ref1
161        @items = $chnl->dequeue_nb(2);   # ( array_ref2, array_ref3 )
162
163   end
164       This is called by a producer to signal that there is no more work to be
165       sent.  Once ended, no more items may be sent by the producer. Calling
166       "end" by multiple producers is not supported.
167
168        $chnl->end;
169

TWO-WAY IPC - PRODUCER TO CONSUMER

171   send ( ARG1 [, ARG2, ... ] )
172       Append data onto the left end of the channel. Unlike "enqueue", the
173       values are kept together for the receiving consumer, similarly to
174       calling a method.  Object (de)serialization is handled automatically.
175
176        $chnl->send('item');
177        $chnl->send([ list_ref ]);
178        $chnl->send({ hash_ref });
179
180        $chnl->send(qw/item1 item2 .../);
181        $chnl->send($id, [ list_ref ]);
182        $chnl->send($id, { hash_ref });
183
184       The fast channel implementations, introduced in MCE 1.877, support one
185       item for "send". If you want to pass multiple arguments, simply join
186       the arguments into a string. That means the receiver will need to split
187       the string.
188
189        $chnl = MCE::Channel->new(impl => "SimpleFast");
190
191        $chnl->send(join(" ", qw/item1 item2 item3/));
192        my ($item1, $item2, $item3) = split " ", $chnl->recv();
193
194   recv
195   recv_nb
196       Blocking and non-blocking fetch methods from the right end of the
197       channel.  For the latter and when the channel is empty, returns "an
198       empty list" in list context or "undef" in scalar context.
199
200        $item      = $chnl->recv();
201        $array_ref = $chnl->recv();
202        $hash_ref  = $chnl->recv();
203
204        ($item1, $item2)  = $chnl->recv_nb();
205        ($id, $array_ref) = $chnl->recv_nb();
206        ($id, $hash_ref)  = $chnl->recv_nb();
207

TWO-WAY IPC - CONSUMER TO PRODUCER

209   send2 ( ARG1 [, ARG2, ... ] )
210       Append data onto the right end of the channel. Unlike "enqueue", the
211       values are kept together for the receiving producer, similarly to
212       calling a method.  Object (de)serialization is handled automatically.
213
214        $chnl->send2('item');
215        $chnl->send2([ list_ref ]);
216        $chnl->send2({ hash_ref });
217
218        $chnl->send2(qw/item1 item2 .../);
219        $chnl->send2($id, [ list_ref ]);
220        $chnl->send2($id, { hash_ref });
221
222       The fast channel implementations, introduced in MCE 1.877, support one
223       item for "send2". If you want to pass multiple arguments, simply join
224       the arguments into a string. Not to forget, the receiver must split the
225       string as well.
226
227        $chnl = MCE::Channel->new(impl => "MutexFast");
228
229        $chnl->send2(join(" ", qw/item1 item2 item3/));
230        my ($item1, $item2, $item3) = split " ", $chnl->recv2();
231
232   recv2
233   recv2_nb
234       Blocking and non-blocking fetch methods from the left end of the
235       channel.  For the latter and when the channel is empty, returns "an
236       empty list" in list context or "undef" in scalar context.
237
238        $item      = $chnl->recv2();
239        $array_ref = $chnl->recv2();
240        $hash_ref  = $chnl->recv2();
241
242        ($item1, $item2)  = $chnl->recv2_nb();
243        ($id, $array_ref) = $chnl->recv2_nb();
244        ($id, $hash_ref)  = $chnl->recv2_nb();
245

DEMONSTRATIONS

247   Example 1 - threads
248       "MCE::Channel" was made to work efficiently with threads. The reason
249       comes from using threads::shared for locking versus MCE::Mutex.
250
251        use strict;
252        use warnings;
253
254        use threads;
255        use MCE::Channel;
256
257        my $queue = MCE::Channel->new( impl => 'Threads' );
258        my $num_consumers = 10;
259
260        sub consumer {
261           my $count = 0;
262
263           # receive items
264           while ( my ($item1, $item2) = $queue->dequeue(2) ) {
265              $count += 2;
266           }
267
268           # send result
269           $queue->send2( threads->tid => $count );
270        }
271
272        threads->create('consumer') for 1 .. $num_consumers;
273
274        ## producer
275
276        $queue->enqueue($_, $_ * 2) for 1 .. 40000;
277        $queue->end;
278
279        my %results;
280        my $total = 0;
281
282        for ( 1 .. $num_consumers ) {
283           my ($id, $count) = $queue->recv2;
284           $results{$id} = $count;
285           $total += $count;
286        }
287
288        $_->join for threads->list;
289
290        print $results{$_}, "\n" for keys %results;
291        print "$total total\n\n";
292
293        __END__
294
295        # output
296
297        8034
298        8008
299        8036
300        8058
301        7990
302        7948
303        8068
304        7966
305        7960
306        7932
307        80000 total
308
309   Example 2 - MCE::Child
310       The following is a similar, threads-like example for Perl lacking
311       threads support.  It spawns processes instead and thus requires the
312       "Mutex" channel implementation, which is the default if omitted.
313
314        use strict;
315        use warnings;
316
317        use MCE::Child;
318        use MCE::Channel;
319
320        my $queue = MCE::Channel->new( impl => 'Mutex' );
321        my $num_consumers = 10;
322
323        sub consumer {
324           my $count = 0;
325
326           # receive items
327           while ( my ($item1, $item2) = $queue->dequeue(2) ) {
328              $count += 2;
329           }
330
331           # send result
332           $queue->send2( MCE::Child->pid => $count );
333        }
334
335        MCE::Child->create('consumer') for 1 .. $num_consumers;
336
337        ## producer
338
339        $queue->enqueue($_, $_ * 2) for 1 .. 40000;
340        $queue->end;
341
342        my %results;
343        my $total = 0;
344
345        for ( 1 .. $num_consumers ) {
346           my ($id, $count) = $queue->recv2;
347           $results{$id} = $count;
348           $total += $count;
349        }
350
351        $_->join for MCE::Child->list;
352
353        print $results{$_}, "\n" for keys %results;
354        print "$total total\n\n";
355
356   Example 3 - Consumer requests item
357       Like the previous example, but have the manager process await a
358       notification from the consumer before inserting into the queue. This
359       allows the producer to end the channel early (i.e. exit loop).
360
361        use strict;
362        use warnings;
363
364        use MCE::Child;
365        use MCE::Channel;
366
367        my $queue = MCE::Channel->new( impl => 'Mutex' );
368        my $num_consumers = 10;
369
370        sub consumer {
371           # receive items
372           my $count = 0;
373
374           while () {
375              # Notify the manager process to send items. This allows the
376              # manager process to enqueue only when requested. The benefit
377              # is being able to end the channel immediately.
378
379              $queue->send2( MCE::Child->pid ); # channel is bi-directional
380
381              my ($item1, $item2) = $queue->dequeue(2);
382              last unless ( defined $item1 );   # channel ended
383
384              $count += 2;
385           }
386
387           # result
388           return ( MCE::Child->pid => $count );
389        }
390
391        MCE::Child->create('consumer') for 1 .. $num_consumers;
392
393        ## producer
394
395        for my $num (1 .. 40000) {
396           # Await worker notification before inserting (blocking).
397           my $consumer_pid = $queue->recv2;
398           $queue->enqueue($num, $num * 2);
399        }
400
401        $queue->end;
402
403        my %results;
404        my $total = 0;
405
406        for my $child ( MCE::Child->list ) {
407           my ($id, $count) = $child->join;
408           $results{$id} = $count;
409           $total += $count;
410        }
411
412        print $results{$_}, "\n" for keys %results;
413        print "$total total\n\n";
414
415   Example 4 - Many producers
416       Running with 2 or more producers requires setting the "mp" option.
417       Internally, this enables locking support for the left end of the
418       channel. The "mp" option applies to "Mutex" and "Threads" channel
419       implementations only.
420
421       Here, using the MCE facility for gathering the final count.
422
423        use strict;
424        use warnings;
425
426        use MCE::Flow;
427        use MCE::Channel;
428
429        my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
430        my $num_consumers = 10;
431
432        sub consumer {
433           # receive items
434           my $count = 0;
435           while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
436              $count += 2;
437           }
438           # send result
439           MCE->gather( MCE->wid => $count );
440        }
441
442        sub producer {
443           $queue->enqueue($_, $_ * 2) for 1 .. 20000;
444        }
445
446        ## run 2 producers and many consumers
447
448        MCE::Flow->init(
449           max_workers => [ 2, $num_consumers ],
450           task_name   => [ 'producer', 'consumer' ],
451           task_end    => sub {
452              my ($mce, $task_id, $task_name) = @_;
453              if ( $task_name eq 'producer' ) {
454                 $queue->end;
455              }
456           }
457        );
458
459        # consumers call gather above (i.e. send a key-value pair),
460        # have MCE append to a hash
461
462        my %results = mce_flow \&producer, \&consumer;
463
464        MCE::Flow->finish;
465
466        my $total = 0;
467
468        for ( keys %results ) {
469           $total += $results{$_};
470           print $results{$_}, "\n";
471        }
472
473        print "$total total\n\n";
474
475   Example 5 - Many channels
476       This demonstration configures a channel per consumer, plus a common
477       channel for consumers to request the next input item. The "Simple"
478       implementation is specified for the individual channels, whereas
479       locking may be necessary for the $ready channel. However, consumers
480       never read from the $ready channel and what they write is very small
481       (i.e. an atomic write is guaranteed by the OS). Thus, the "Simple"
482       implementation is safely chosen over "Mutex".
483
484        use strict;
485        use warnings;
486
487        use MCE::Flow;
488        use MCE::Channel;
489
490        my $prog_name  = $0; $prog_name =~ s{^.*[\\/]}{}g;
491        my $input_size = shift || 3000;
492
493        unless ($input_size =~ /\A\d+\z/) {
494           print {*STDERR} "usage: $prog_name [ size ]\n";
495           exit 1;
496        }
497
498        my $consumers = 4;
499
500        my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
501
502        my $ready =       MCE::Channel->new( impl => 'Simple' );
503
504        sub producer {
505           my $id = 0;
506
507           # send the next input item upon request
508           for ( 0 .. $input_size - 1 ) {
509              my $chnl_num = $ready->recv2;
510              $chnls[ $chnl_num ]->send( ++$id, $_ );
511           }
512
513           # signal no more work
514           $_->send( 0, undef ) for @chnls;
515        }
516
517        sub consumer {
518           my $chnl_num = MCE->task_wid - 1;
519
520           while () {
521              # notify the producer ready for input
522              $ready->send2( $chnl_num );
523
524              # retrieve input data
525              my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
526
527              # leave loop if no more work
528              last unless $id;
529
530              # compute and send the result to the manager process
531              # ordered output requires an id (must be 1st argument)
532              MCE->gather( $id, [ $item, sqrt($item) ] );
533           }
534        }
535
536        # A custom 'ordered' output iterator for MCE's gather facility.
537        # It returns a closure block, expecting an ID for 1st argument.
538
539        sub output_iterator {
540           my %tmp; my $order_id = 1;
541
542           return sub {
543              my ( $id, $result ) = @_;
544              $tmp{ $id } = $result;
545
546              while () {
547                 last unless exists $tmp{ $order_id };
548                 $result = delete $tmp{ $order_id };
549                 printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
550                 $order_id++;
551              }
552           };
553        }
554
555        # Run one producer and many consumers.
556        # Output to be sent orderly to STDOUT.
557
558        MCE::Flow->init(
559           gather => output_iterator(),
560           max_workers => [ 1, $consumers ],
561        );
562
563        MCE::Flow->run( \&producer, \&consumer );
564        MCE::Flow->finish;
565
566        __END__
567
568        # Output
569
570        n: 0 sqrt(n): 0.000000
571        n: 1 sqrt(n): 1.000000
572        n: 2 sqrt(n): 1.414214
573        n: 3 sqrt(n): 1.732051
574        n: 4 sqrt(n): 2.000000
575        n: 5 sqrt(n): 2.236068
576        n: 6 sqrt(n): 2.449490
577        n: 7 sqrt(n): 2.645751
578        n: 8 sqrt(n): 2.828427
579        n: 9 sqrt(n): 3.000000
580        ...
581

SEE ALSO

583       •  <https://github.com/marioroy/mce-examples/tree/master/chameneos>
584
585       •  threads::lite
586

AUTHOR

588       Mario E. Roy, <marioeroy AT gmail DOT com>
589

COPYRIGHT AND LICENSE

591       Copyright (C) 2019-2023 by Mario E. Roy
592
593       MCE::Channel is released under the same license as Perl.
594
595       See <https://dev.perl.org/licenses/> for more information.
596
597
598
599perl v5.38.0                      2023-09-14                   MCE::Channel(3)
Impressum