1MCE::Channel(3)       User Contributed Perl Documentation      MCE::Channel(3)
2
3
4

NAME

6       MCE::Channel - Queue-like and two-way communication capability
7

VERSION

9       This document describes MCE::Channel version 1.862
10

SYNOPSIS

12        use MCE::Channel;
13
14        ########################
15        # Construction
16        ########################
17
18        # A single producer and many consumers supporting processes and threads
19
20        my $c1 = MCE::Channel->new( impl => 'Mutex' );    # default implementation
21        my $c2 = MCE::Channel->new( impl => 'Threads' );  # threads::shared locking
22
23        # Set the mp flag if two or more workers (many producers) will be calling
24        # enqueue/send or recv2/recv2_nb on the left end of the channel
25
26        my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
27        my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
28
29        # Tuned for one producer and one consumer, no locking
30
31        my $c5 = MCE::Channel->new( impl => 'Simple' );
32
33        ########################
34        # Queue-like behavior
35        ########################
36
37        # Send data to consumers
38        $c1->enqueue('item');
39        $c1->enqueue(qw/item1 item2 item3 itemN/);
40
41        # Receive data
42        my $item  = $c1->dequeue();      # item
43        my @items = $c1->dequeue(2);     # (item1, item2)
44
45        # Receive, non-blocking
46        my $item  = $c1->dequeue_nb();   # item
47        my @items = $c1->dequeue_nb(2);  # (item1, item2)
48
49        # Signal that there is no more work to be sent
50        $c1->end();
51
52        ########################
53        # Two-way communication
54        ########################
55
56        # Producer(s) sending data
57        $c3->send('message');
58        $c3->send(qw/arg1 arg2 arg3/);
59
60        # Consumer(s) receiving data
61        my $mesg = $c3->recv();          # message
62        my @args = $c3->recv();          # (arg1, arg2, arg3)
63
64        # Alternatively, non-blocking
65        my $mesg = $c3->recv_nb();       # message
66        my @args = $c3->recv_nb();       # (arg1, arg2, arg3)
67
68        # A producer signaling no more work to be sent
69        $c3->end();
70
71        # Consumer(s) sending data
72        $c3->send2('message');
73        $c3->send2(qw/arg1 arg2 arg3/);
74
75        # Producer(s) receiving data
76        my $mesg = $c3->recv2();         # message
77        my @args = $c3->recv2();         # (arg1, arg2, arg3)
78
79        # Alternatively, non-blocking
80        my $mesg = $c3->recv2_nb();      # message
81        my @args = $c3->recv2_nb();      # (arg1, arg2, arg3)
82

DESCRIPTION

84       An MCE::Channel object is a container for sending and receiving data
85       using socketpair handles. Serialization is provided by Sereal if
86       available.  Defaults to Storable otherwise. Excluding the "Simple"
87       implementation, both ends of the "channel" support many workers
88       concurrently (with mp => 1).
89
90   new ( impl => STRING, mp => BOOLEAN )
91       This creates a new channel. Three implementations are provided: "Mutex"
92       (default), "Threads", and "Simple", indicating the locking mechanism to
93       use: "MCE::Mutex", "threads::shared", and no locking, respectively.
94
95        $chnl = MCE::Channel->new();     # default: impl => 'Mutex', mp => 0
96
97       The "Mutex" channel implementation supports processes and threads
98       whereas the "Threads" channel implementation is suited for threads
99       only.
100
101        $chnl = MCE::Channel->new( impl => 'Mutex' );    # MCE::Mutex locking
102        $chnl = MCE::Channel->new( impl => 'Threads' );  # threads::shared locking
103
104       Set the "mp" (m)any (p)roducers option to a true value if there will be
105       two or more workers calling "enqueue", "send", "recv2", or "recv2_nb"
106       on the left end of the channel. This is important to avoid a race
107       condition.
108
109        $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
110        $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
111
112       The "Simple" implementation is optimized for one producer and one
113       consumer max.  It omits locking for maximum performance. This
114       implementation is preferred for parent to child communication not
115       shared by another worker.
116
117        $chnl = MCE::Channel->new( impl => 'Simple' );
118

QUEUE-LIKE BEHAVIOR

120   enqueue ( ITEM1 [, ITEM2, ... ] )
121       Appends a list of items onto the left end of the channel. This will
122       block once the internal socket buffer becomes full (i.e. awaiting
123       workers to dequeue on the other end). This prevents producer(s) from
124       running faster than consumer(s).
125
126       Object (de)serialization is handled automatically using Sereal if
127       available or defaults to Storable otherwise.
128
129        $chnl->enqueue('item1');
130        $chnl->enqueue(qw/item2 item3 .../);
131
132        $chnl->enqueue([ array_ref1 ]);
133        $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
134
135        $chnl->enqueue({ hash_ref1 });
136        $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
137
138   dequeue
139   dequeue ( COUNT )
140       Removes the requested number of items (default 1) from the right end of
141       the channel. If the channel contains fewer than the requested number of
142       items, the method will block (i.e. until other producer(s) enqueue more
143       items).
144
145        $item  = $chnl->dequeue();       # item1
146        @items = $chnl->dequeue(2);      # ( item2, item3 )
147
148   dequeue_nb
149   dequeue_nb ( COUNT )
150       Removes the requested number of items (default 1) from the right end of
151       the channel. If the channel contains fewer than the requested number of
152       items, the method will return immediately with whatever it was able to
153       retrieve.  If the channel is empty, it returns "an empty list" in
154       list context or "undef" in scalar context.
155
156        $item  = $chnl->dequeue_nb();    # array_ref1
157        @items = $chnl->dequeue_nb(2);   # ( array_ref2, array_ref3 )
158
159   end
160       This is called by a producer to signal that there is no more work to be
161       sent.  Once ended, no more items may be sent by the producer. Calling
162       "end" by multiple producers is not supported.
163
164        $chnl->end;
165

TWO-WAY IPC - PRODUCER TO CONSUMER

167   send ( ARG1 [, ARG2, ... ] )
168       Append data onto the left end of the channel. Unlike "enqueue", the
169       values are kept together for the receiving consumer, similarly to
170       calling a method.  Object (de)serialization is handled automatically.
171
172        $chnl->send('item');
173        $chnl->send([ list_ref ]);
174        $chnl->send({ hash_ref });
175
176        $chnl->send(qw/item1 item2 .../);
177        $chnl->send($id, [ list_ref ]);
178        $chnl->send($id, { hash_ref });
179
180   recv
181   recv_nb
182       Blocking and non-blocking fetch methods from the right end of the
183       channel.  For the latter and when the channel is empty, returns "an
184       empty list" in list context or "undef" in scalar context.
185
186        $item      = $chnl->recv();
187        $array_ref = $chnl->recv();
188        $hash_ref  = $chnl->recv();
189
190        ($item1, $item2)  = $chnl->recv_nb();
191        ($id, $array_ref) = $chnl->recv_nb();
192        ($id, $hash_ref)  = $chnl->recv_nb();
193

TWO-WAY IPC - CONSUMER TO PRODUCER

195   send2 ( ARG1 [, ARG2, ... ] )
196       Append data onto the right end of the channel. Unlike "enqueue", the
197       values are kept together for the receiving producer, similarly to
198       calling a method.  Object (de)serialization is handled automatically.
199
200        $chnl->send2('item');
201        $chnl->send2([ list_ref ]);
202        $chnl->send2({ hash_ref });
203
204        $chnl->send2(qw/item1 item2 .../);
205        $chnl->send2($id, [ list_ref ]);
206        $chnl->send2($id, { hash_ref });
207
208   recv2
209   recv2_nb
210       Blocking and non-blocking fetch methods from the left end of the
211       channel.  For the latter and when the channel is empty, returns "an
212       empty list" in list context or "undef" in scalar context.
213
214        $item      = $chnl->recv2();
215        $array_ref = $chnl->recv2();
216        $hash_ref  = $chnl->recv2();
217
218        ($item1, $item2)  = $chnl->recv2_nb();
219        ($id, $array_ref) = $chnl->recv2_nb();
220        ($id, $hash_ref)  = $chnl->recv2_nb();
221

DEMONSTRATIONS

223   Example 1 - threads
224       "MCE::Channel" was made to work efficiently with threads. This comes
225       from using threads::shared for locking instead of MCE::Mutex.
226
227        use strict;
228        use warnings;
229
230        use threads;
231        use MCE::Channel;
232
233        my $queue = MCE::Channel->new( impl => 'Threads' );
234        my $num_consumers = 10;
235
236        sub consumer {
237           my $count = 0;
238
239           # receive items
240           while ( my ($item1, $item2) = $queue->dequeue(2) ) {
241              $count += 2;
242           }
243
244           # send result
245           $queue->send2( threads->tid => $count );
246        }
247
248        threads->create('consumer') for 1 .. $num_consumers;
249
250        ## producer
251
252        $queue->enqueue($_, $_ * 2) for 1 .. 40000;
253        $queue->end;
254
255        my %results;
256        my $total = 0;
257
258        for ( 1 .. $num_consumers ) {
259           my ($id, $count) = $queue->recv2;
260           $results{$id} = $count;
261           $total += $count;
262        }
263
264        $_->join for threads->list;
265
266        print $results{$_}, "\n" for keys %results;
267        print "$total total\n\n";
268
269        __END__
270
271        # output
272
273        8034
274        8008
275        8036
276        8058
277        7990
278        7948
279        8068
280        7966
281        7960
282        7932
283        80000 total
284
285   Example 2 - MCE::Child
286       The following is a similar, threads-like example for Perl lacking
287       threads support.  It spawns processes instead and thus requires the
288       "Mutex" channel implementation, which is the default if omitted.
289
290        use strict;
291        use warnings;
292
293        use MCE::Child;
294        use MCE::Channel;
295
296        my $queue = MCE::Channel->new( impl => 'Mutex' );
297        my $num_consumers = 10;
298
299        sub consumer {
300           my $count = 0;
301
302           # receive items
303           while ( my ($item1, $item2) = $queue->dequeue(2) ) {
304              $count += 2;
305           }
306
307           # send result
308           $queue->send2( MCE::Child->pid => $count );
309        }
310
311        MCE::Child->create('consumer') for 1 .. $num_consumers;
312
313        ## producer
314
315        $queue->enqueue($_, $_ * 2) for 1 .. 40000;
316        $queue->end;
317
318        my %results;
319        my $total = 0;
320
321        for ( 1 .. $num_consumers ) {
322           my ($id, $count) = $queue->recv2;
323           $results{$id} = $count;
324           $total += $count;
325        }
326
327        $_->join for MCE::Child->list;
328
329        print $results{$_}, "\n" for keys %results;
330        print "$total total\n\n";
331
332   Example 3 - Consumer requests item
333       Like the previous example, but have the manager process await a
334       notification from the consumer before inserting into the queue. This
335       allows the producer to end the channel early (i.e. exit loop).
336
337        use strict;
338        use warnings;
339
340        use MCE::Child;
341        use MCE::Channel;
342
343        my $queue = MCE::Channel->new( impl => 'Mutex' );
344        my $num_consumers = 10;
345
346        sub consumer {
347           # receive items
348           my $count = 0;
349
350           while () {
351              # Notify the manager process to send items. This allows the
352              # manager process to enqueue only when requested. The benefit
353              # is being able to end the channel immediately.
354
355              $queue->send2( MCE::Child->pid ); # channel is bi-directional
356
357              my ($item1, $item2) = $queue->dequeue(2);
358              last unless ( defined $item1 );   # channel ended
359
360              $count += 2;
361           }
362
363           # result
364           return ( MCE::Child->pid => $count );
365        }
366
367        MCE::Child->create('consumer') for 1 .. $num_consumers;
368
369        ## producer
370
371        for my $num (1 .. 40000) {
372           # Await worker notification before inserting (blocking).
373           my $consumer_pid = $queue->recv2;
374           $queue->enqueue($num, $num * 2);
375        }
376
377        $queue->end;
378
379        my %results;
380        my $total = 0;
381
382        for my $child ( MCE::Child->list ) {
383           my ($id, $count) = $child->join;
384           $results{$id} = $count;
385           $total += $count;
386        }
387
388        print $results{$_}, "\n" for keys %results;
389        print "$total total\n\n";
390
391   Example 4 - Many producers
392       Running with 2 or more producers requires setting the "mp" option.
393       Internally, this enables locking support for the left end of the
394       channel. The "mp" option applies to "Mutex" and "Threads" channel
395       implementations only.
396
397       Here, the MCE gather facility is used to collect the final counts.
398
399        use strict;
400        use warnings;
401
402        use MCE::Flow;
403        use MCE::Channel;
404
405        my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
406        my $num_consumers = 10;
407
408        sub consumer {
409           # receive items
410           my $count = 0;
411           while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
412              $count += 2;
413           }
414           # send result
415           MCE->gather( MCE->wid => $count );
416        }
417
418        sub producer {
419           $queue->enqueue($_, $_ * 2) for 1 .. 20000;
420        }
421
422        ## run 2 producers and many consumers
423
424        MCE::Flow::init(
425           max_workers => [ 2, $num_consumers ],
426           task_name   => [ 'producer', 'consumer' ],
427           task_end    => sub {
428              my ($mce, $task_id, $task_name) = @_;
429              if ( $task_name eq 'producer' ) {
430                 $queue->end;
431              }
432           }
433        );
434
435        # consumers call gather above (i.e. send a key-value pair),
436        # have MCE append to a hash
437
438        my %results = mce_flow \&producer, \&consumer;
439
440        MCE::Flow::finish;
441
442        my $total = 0;
443
444        for ( keys %results ) {
445           $total += $results{$_};
446           print $results{$_}, "\n";
447        }
448
449        print "$total total\n\n";
450
451   Example 5 - Many channels
452       This demonstration configures a channel per consumer, plus a common
453       channel for consumers to request the next input item. The "Simple"
454       implementation is specified for the individual channels whereas locking
455       may be necessary for the $ready channel. However, consumers only write
456       to it (never read) and what is written is very small (i.e. an atomic
457       write is guaranteed by the OS). Thus, the "Simple" implementation is
458       safely chosen over "Mutex".
459
460        use strict;
461        use warnings;
462
463        use MCE::Flow;
464        use MCE::Channel;
465
466        my $prog_name  = $0; $prog_name =~ s{^.*[\\/]}{}g;
467        my $input_size = shift || 3000;
468
469        unless ($input_size =~ /\A\d+\z/) {
470           print {*STDERR} "usage: $prog_name [ size ]\n";
471           exit 1;
472        }
473
474        my $consumers = 4;
475
476        my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
477
478        my $ready =       MCE::Channel->new( impl => 'Simple' );
479
480        sub producer {
481           my $id = 0;
482
483           # send the next input item upon request
484           for ( 0 .. $input_size - 1 ) {
485              my $chnl_num = $ready->recv2;
486              $chnls[ $chnl_num ]->send( ++$id, $_ );
487           }
488
489           # signal no more work
490           $_->send( 0, undef ) for @chnls;
491        }
492
493        sub consumer {
494           my $chnl_num = MCE->task_wid - 1;
495
496           while () {
497              # notify the producer ready for input
498              $ready->send2( $chnl_num );
499
500              # retrieve input data
501              my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
502
503              # leave loop if no more work
504              last unless $id;
505
506              # compute and send the result to the manager process
507              # ordered output requires an id (must be 1st argument)
508              MCE->gather( $id, [ $item, sqrt($item) ] );
509           }
510        }
511
512        # A custom 'ordered' output iterator for MCE's gather facility.
513        # It returns a closure block, expecting an ID for 1st argument.
514
515        sub output_iterator {
516           my %tmp; my $order_id = 1;
517
518           return sub {
519              my ( $id, $result ) = @_;
520              $tmp{ $id } = $result;
521
522              while () {
523                 last unless exists $tmp{ $order_id };
524                 $result = delete $tmp{ $order_id };
525                 printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
526                 $order_id++;
527              }
528           };
529        }
530
531        # Run one producer and many consumers.
532        # Output to be sent orderly to STDOUT.
533
534        MCE::Flow->init(
535           gather => output_iterator(),
536           max_workers => [ 1, $consumers ],
537        );
538
539        MCE::Flow->run( \&producer, \&consumer );
540        MCE::Flow->finish;
541
542        __END__
543
544        # Output
545
546        n: 0 sqrt(n): 0.000000
547        n: 1 sqrt(n): 1.000000
548        n: 2 sqrt(n): 1.414214
549        n: 3 sqrt(n): 1.732051
550        n: 4 sqrt(n): 2.000000
551        n: 5 sqrt(n): 2.236068
552        n: 6 sqrt(n): 2.449490
553        n: 7 sqrt(n): 2.645751
554        n: 8 sqrt(n): 2.828427
555        n: 9 sqrt(n): 3.000000
556        ...
557

SEE ALSO

559       ·  <https://github.com/marioroy/mce-examples/tree/master/chameneos>
560
561       ·  threads::lite
562

AUTHOR

564       Mario E. Roy, <marioeroy AT gmail DOT com>
565
567       Copyright (C) 2019 by Mario E. Roy
568
569       MCE::Channel is released under the same license as Perl.
570
571       See <http://dev.perl.org/licenses/> for more information.
572
573
574
575perl v5.30.0                      2019-09-19                   MCE::Channel(3)
Impressum