1MCE::Channel(3)       User Contributed Perl Documentation      MCE::Channel(3)
2
3
4

NAME

6       MCE::Channel - Queue-like and two-way communication capability
7

VERSION

9       This document describes MCE::Channel version 1.874
10

SYNOPSIS

12        use MCE::Channel;
13
14        ########################
15        # Construction
16        ########################
17
18        # A single producer and many consumers supporting processes and threads
19
20        my $c1 = MCE::Channel->new( impl => 'Mutex' );    # default implementation
21        my $c2 = MCE::Channel->new( impl => 'Threads' );  # threads::shared locking
22
23        # Set the mp flag if two or more workers (many producers) will be calling
24        # enqueue/send or recv2/recv2_nb on the left end of the channel
25
26        my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
27        my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
28
29        # Tuned for one producer and one consumer, no locking
30
31        my $c5 = MCE::Channel->new( impl => 'Simple' );
32
33        ########################
34        # Queue-like behavior
35        ########################
36
37        # Send data to consumers
38        $c1->enqueue('item');
39        $c1->enqueue(qw/item1 item2 item3 itemN/);
40
41        # Receive data
42        my $item  = $c1->dequeue();      # item
43        my @items = $c1->dequeue(2);     # (item1, item2)
44
45        # Receive, non-blocking
46        my $item  = $c1->dequeue_nb();   # item
47        my @items = $c1->dequeue_nb(2);  # (item1, item2)
48
49        # Signal that there is no more work to be sent
50        $c1->end();
51
52        ########################
53        # Two-way communication
54        ########################
55
56        # Producer(s) sending data
57        $c3->send('message');
58        $c3->send(qw/arg1 arg2 arg3/);
59
60        # Consumer(s) receiving data
61        my $mesg = $c3->recv();          # message
62        my @args = $c3->recv();          # (arg1, arg2, arg3)
63
64        # Alternatively, non-blocking
65        my $mesg = $c3->recv_nb();       # message
66        my @args = $c3->recv_nb();       # (arg1, arg2, arg3)
67
68        # A producer signaling no more work to be sent
69        $c3->end();
70
71        # Consumer(s) sending data
72        $c3->send2('message');
73        $c3->send2(qw/arg1 arg2 arg3/);
74
75        # Producer(s) receiving data
76        my $mesg = $c3->recv2();         # message
77        my @args = $c3->recv2();         # (arg1, arg2, arg3)
78
79        # Alternatively, non-blocking
80        my $mesg = $c3->recv2_nb();      # message
81        my @args = $c3->recv2_nb();      # (arg1, arg2, arg3)
82

DESCRIPTION

84       An MCE::Channel object is a container for sending and receiving data
85       using socketpair handles. Serialization is provided by Sereal if
86       available.  Defaults to Storable otherwise. Excluding the "Simple"
87       implementation, both ends of the "channel" support many workers
88       concurrently (with mp => 1).
89
90   new ( impl => STRING, mp => BOOLEAN )
91       This creates a new channel. Three implementations are provided: "Mutex",
92       "Threads", and "Simple", indicating the locking mechanism to use:
93       "MCE::Mutex", "threads::shared", and no locking, respectively.
94
95        $chnl = MCE::Channel->new();     # default: impl => 'Mutex', mp => 0
96                                         # default: impl => 'Threads' on Windows
97
98       The "Mutex" implementation supports processes and threads whereas the
99       "Threads" implementation is suited for Windows and threads only.
100
101        $chnl = MCE::Channel->new( impl => 'Mutex' );    # MCE::Mutex locking
102        $chnl = MCE::Channel->new( impl => 'Threads' );  # threads::shared locking
103
104        # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
105
106       Set the "mp" (m)any (p)roducers option to a true value if there will be
107       two or more workers calling "enqueue", "send", "recv2", or "recv2_nb"
108       on the left end of the channel. This is important to avoid a race
109       condition.
110
111        $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
112        $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
113
114        # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
115
116       The "Simple" implementation is optimized for one producer and one
117       consumer max.  It omits locking for maximum performance. This
118       implementation is preferred for parent to child communication not
119       shared by another worker.
120
121        $chnl = MCE::Channel->new( impl => 'Simple' );
122

QUEUE-LIKE BEHAVIOR

124   enqueue ( ITEM1 [, ITEM2, ... ] )
125       Appends a list of items onto the left end of the channel. This will
126       block once the internal socket buffer becomes full (i.e. awaiting
127       workers to dequeue on the other end). This prevents producer(s) from
128       running faster than consumer(s).
129
130       Object (de)serialization is handled automatically using Sereal if
131       available or defaults to Storable otherwise.
132
133        $chnl->enqueue('item1');
134        $chnl->enqueue(qw/item2 item3 .../);
135
136        $chnl->enqueue([ array_ref1 ]);
137        $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
138
139        $chnl->enqueue({ hash_ref1 });
140        $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
141
142   dequeue
143   dequeue ( COUNT )
144       Removes the requested number of items (default 1) from the right end of
145       the channel. If the channel contains fewer than the requested number of
146       items, the method will block (i.e. until other producer(s) enqueue more
147       items).
148
149        $item  = $chnl->dequeue();       # item1
150        @items = $chnl->dequeue(2);      # ( item2, item3 )
151
152   dequeue_nb
153   dequeue_nb ( COUNT )
154       Removes the requested number of items (default 1) from the right end of
155       the channel. If the channel contains fewer than the requested number of
156       items, the method will return immediately with whatever it was able to
157       retrieve.  If the channel is empty, it returns "an empty list" in
158       list context or "undef" in scalar context.
159
160        $item  = $chnl->dequeue_nb();    # array_ref1
161        @items = $chnl->dequeue_nb(2);   # ( array_ref2, array_ref3 )
162
163   end
164       This is called by a producer to signal that there is no more work to be
165       sent.  Once ended, no more items may be sent by the producer. Calling
166       "end" by multiple producers is not supported.
167
168        $chnl->end;
169

TWO-WAY IPC - PRODUCER TO CONSUMER

171   send ( ARG1 [, ARG2, ... ] )
172       Append data onto the left end of the channel. Unlike "enqueue", the
173       values are kept together for the receiving consumer, similarly to
174       calling a method.  Object (de)serialization is handled automatically.
175
176        $chnl->send('item');
177        $chnl->send([ list_ref ]);
178        $chnl->send({ hash_ref });
179
180        $chnl->send(qw/item1 item2 .../);
181        $chnl->send($id, [ list_ref ]);
182        $chnl->send($id, { hash_ref });
183
184   recv
185   recv_nb
186       Blocking and non-blocking fetch methods from the right end of the
187       channel.  The latter, when the channel is empty, returns "an
188       empty list" in list context or "undef" in scalar context.
189
190        $item      = $chnl->recv();
191        $array_ref = $chnl->recv();
192        $hash_ref  = $chnl->recv();
193
194        ($item1, $item2)  = $chnl->recv_nb();
195        ($id, $array_ref) = $chnl->recv_nb();
196        ($id, $hash_ref)  = $chnl->recv_nb();
197

TWO-WAY IPC - CONSUMER TO PRODUCER

199   send2 ( ARG1 [, ARG2, ... ] )
200       Append data onto the right end of the channel. Unlike "enqueue", the
201       values are kept together for the receiving producer, similarly to
202       calling a method.  Object (de)serialization is handled automatically.
203
204        $chnl->send2('item');
205        $chnl->send2([ list_ref ]);
206        $chnl->send2({ hash_ref });
207
208        $chnl->send2(qw/item1 item2 .../);
209        $chnl->send2($id, [ list_ref ]);
210        $chnl->send2($id, { hash_ref });
211
212   recv2
213   recv2_nb
214       Blocking and non-blocking fetch methods from the left end of the
215       channel.  The latter, when the channel is empty, returns "an
216       empty list" in list context or "undef" in scalar context.
217
218        $item      = $chnl->recv2();
219        $array_ref = $chnl->recv2();
220        $hash_ref  = $chnl->recv2();
221
222        ($item1, $item2)  = $chnl->recv2_nb();
223        ($id, $array_ref) = $chnl->recv2_nb();
224        ($id, $hash_ref)  = $chnl->recv2_nb();
225

DEMONSTRATIONS

227   Example 1 - threads
228       "MCE::Channel" was made to work efficiently with threads. This comes
229       from using threads::shared for locking instead of MCE::Mutex.
230
231        use strict;
232        use warnings;
233
234        use threads;
235        use MCE::Channel;
236
237        my $queue = MCE::Channel->new( impl => 'Threads' );
238        my $num_consumers = 10;
239
240        sub consumer {
241           my $count = 0;
242
243           # receive items
244           while ( my ($item1, $item2) = $queue->dequeue(2) ) {
245              $count += 2;
246           }
247
248           # send result
249           $queue->send2( threads->tid => $count );
250        }
251
252        threads->create('consumer') for 1 .. $num_consumers;
253
254        ## producer
255
256        $queue->enqueue($_, $_ * 2) for 1 .. 40000;
257        $queue->end;
258
259        my %results;
260        my $total = 0;
261
262        for ( 1 .. $num_consumers ) {
263           my ($id, $count) = $queue->recv2;
264           $results{$id} = $count;
265           $total += $count;
266        }
267
268        $_->join for threads->list;
269
270        print $results{$_}, "\n" for keys %results;
271        print "$total total\n\n";
272
273        __END__
274
275        # output
276
277        8034
278        8008
279        8036
280        8058
281        7990
282        7948
283        8068
284        7966
285        7960
286        7932
287        80000 total
288
289   Example 2 - MCE::Child
290       The following is similar to the threads example, for Perl lacking
291       threads support.  It spawns processes instead and thus requires the
292       "Mutex" channel implementation, which is the default if omitted.
293
294        use strict;
295        use warnings;
296
297        use MCE::Child;
298        use MCE::Channel;
299
300        my $queue = MCE::Channel->new( impl => 'Mutex' );
301        my $num_consumers = 10;
302
303        sub consumer {
304           my $count = 0;
305
306           # receive items
307           while ( my ($item1, $item2) = $queue->dequeue(2) ) {
308              $count += 2;
309           }
310
311           # send result
312           $queue->send2( MCE::Child->pid => $count );
313        }
314
315        MCE::Child->create('consumer') for 1 .. $num_consumers;
316
317        ## producer
318
319        $queue->enqueue($_, $_ * 2) for 1 .. 40000;
320        $queue->end;
321
322        my %results;
323        my $total = 0;
324
325        for ( 1 .. $num_consumers ) {
326           my ($id, $count) = $queue->recv2;
327           $results{$id} = $count;
328           $total += $count;
329        }
330
331        $_->join for MCE::Child->list;
332
333        print $results{$_}, "\n" for keys %results;
334        print "$total total\n\n";
335
336   Example 3 - Consumer requests item
337       Like the previous example, but have the manager process await a
338       notification from the consumer before inserting into the queue. This
339       allows the producer to end the channel early (i.e. exit loop).
340
341        use strict;
342        use warnings;
343
344        use MCE::Child;
345        use MCE::Channel;
346
347        my $queue = MCE::Channel->new( impl => 'Mutex' );
348        my $num_consumers = 10;
349
350        sub consumer {
351           # receive items
352           my $count = 0;
353
354           while () {
355              # Notify the manager process to send items. This allows the
356              # manager process to enqueue only when requested. The benefit
357              # is being able to end the channel immediately.
358
359              $queue->send2( MCE::Child->pid ); # channel is bi-directional
360
361              my ($item1, $item2) = $queue->dequeue(2);
362              last unless ( defined $item1 );   # channel ended
363
364              $count += 2;
365           }
366
367           # result
368           return ( MCE::Child->pid => $count );
369        }
370
371        MCE::Child->create('consumer') for 1 .. $num_consumers;
372
373        ## producer
374
375        for my $num (1 .. 40000) {
376           # Await worker notification before inserting (blocking).
377           my $consumer_pid = $queue->recv2;
378           $queue->enqueue($num, $num * 2);
379        }
380
381        $queue->end;
382
383        my %results;
384        my $total = 0;
385
386        for my $child ( MCE::Child->list ) {
387           my ($id, $count) = $child->join;
388           $results{$id} = $count;
389           $total += $count;
390        }
391
392        print $results{$_}, "\n" for keys %results;
393        print "$total total\n\n";
394
395   Example 4 - Many producers
396       Running with 2 or more producers requires setting the "mp" option.
397       Internally, this enables locking support for the left end of the
398       channel. The "mp" option applies to "Mutex" and "Threads" channel
399       implementations only.
400
401       Here, the MCE gather facility is used for collecting the final count.
402
403        use strict;
404        use warnings;
405
406        use MCE::Flow;
407        use MCE::Channel;
408
409        my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
410        my $num_consumers = 10;
411
412        sub consumer {
413           # receive items
414           my $count = 0;
415           while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
416              $count += 2;
417           }
418           # send result
419           MCE->gather( MCE->wid => $count );
420        }
421
422        sub producer {
423           $queue->enqueue($_, $_ * 2) for 1 .. 20000;
424        }
425
426        ## run 2 producers and many consumers
427
428        MCE::Flow->init(
429           max_workers => [ 2, $num_consumers ],
430           task_name   => [ 'producer', 'consumer' ],
431           task_end    => sub {
432              my ($mce, $task_id, $task_name) = @_;
433              if ( $task_name eq 'producer' ) {
434                 $queue->end;
435              }
436           }
437        );
438
439        # consumers call gather above (i.e. send a key-value pair),
440        # have MCE append to a hash
441
442        my %results = mce_flow \&producer, \&consumer;
443
444        MCE::Flow->finish;
445
446        my $total = 0;
447
448        for ( keys %results ) {
449           $total += $results{$_};
450           print $results{$_}, "\n";
451        }
452
453        print "$total total\n\n";
454
455   Example 5 - Many channels
456       This demonstration configures a channel per consumer. Plus, a common
457       channel for consumers to request the next input item. The "Simple"
458       implementation is specified for the individual channels whereas locking
459       may be necessary for the $ready channel. However, consumers never
460       read from it and what is written is very small (i.e. atomic write is
461       guaranteed by the OS). Thus, the "Simple" implementation can safely
462       be chosen over "Mutex".
463
464        use strict;
465        use warnings;
466
467        use MCE::Flow;
468        use MCE::Channel;
469
470        my $prog_name  = $0; $prog_name =~ s{^.*[\\/]}{}g;
471        my $input_size = shift || 3000;
472
473        unless ($input_size =~ /\A\d+\z/) {
474           print {*STDERR} "usage: $prog_name [ size ]\n";
475           exit 1;
476        }
477
478        my $consumers = 4;
479
480        my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
481
482        my $ready =       MCE::Channel->new( impl => 'Simple' );
483
484        sub producer {
485           my $id = 0;
486
487           # send the next input item upon request
488           for ( 0 .. $input_size - 1 ) {
489              my $chnl_num = $ready->recv2;
490              $chnls[ $chnl_num ]->send( ++$id, $_ );
491           }
492
493           # signal no more work
494           $_->send( 0, undef ) for @chnls;
495        }
496
497        sub consumer {
498           my $chnl_num = MCE->task_wid - 1;
499
500           while () {
501              # notify the producer ready for input
502              $ready->send2( $chnl_num );
503
504              # retrieve input data
505              my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
506
507              # leave loop if no more work
508              last unless $id;
509
510              # compute and send the result to the manager process
511              # ordered output requires an id (must be 1st argument)
512              MCE->gather( $id, [ $item, sqrt($item) ] );
513           }
514        }
515
516        # A custom 'ordered' output iterator for MCE's gather facility.
517        # It returns a closure block, expecting an ID for 1st argument.
518
519        sub output_iterator {
520           my %tmp; my $order_id = 1;
521
522           return sub {
523              my ( $id, $result ) = @_;
524              $tmp{ $id } = $result;
525
526              while () {
527                 last unless exists $tmp{ $order_id };
528                 $result = delete $tmp{ $order_id };
529                 printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
530                 $order_id++;
531              }
532           };
533        }
534
535        # Run one producer and many consumers.
536        # Output to be sent orderly to STDOUT.
537
538        MCE::Flow->init(
539           gather => output_iterator(),
540           max_workers => [ 1, $consumers ],
541        );
542
543        MCE::Flow->run( \&producer, \&consumer );
544        MCE::Flow->finish;
545
546        __END__
547
548        # Output
549
550        n: 0 sqrt(n): 0.000000
551        n: 1 sqrt(n): 1.000000
552        n: 2 sqrt(n): 1.414214
553        n: 3 sqrt(n): 1.732051
554        n: 4 sqrt(n): 2.000000
555        n: 5 sqrt(n): 2.236068
556        n: 6 sqrt(n): 2.449490
557        n: 7 sqrt(n): 2.645751
558        n: 8 sqrt(n): 2.828427
559        n: 9 sqrt(n): 3.000000
560        ...
561

SEE ALSO

563       •  <https://github.com/marioroy/mce-examples/tree/master/chameneos>
564
565       •  threads::lite
566

AUTHOR

568       Mario E. Roy, <marioeroy AT gmail DOT com>
569
571       Copyright (C) 2019-2020 by Mario E. Roy
572
573       MCE::Channel is released under the same license as Perl.
574
575       See <http://dev.perl.org/licenses/> for more information.
576
577
578
579perl v5.32.1                      2021-01-27                   MCE::Channel(3)
Impressum