1MCE::Channel(3) User Contributed Perl Documentation MCE::Channel(3)
2
3
4
6 MCE::Channel - Queue-like and two-way communication capability
7
9 This document describes MCE::Channel version 1.862
10
12 use MCE::Channel;
13
14 ########################
15 # Construction
16 ########################
17
18 # A single producer and many consumers supporting processes and threads
19
20 my $c1 = MCE::Channel->new( impl => 'Mutex' ); # default implementation
21 my $c2 = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
22
23 # Set the mp flag if two or more workers (many producers) will be calling
24 # enqueue/send or recv2/recv2_nb on the left end of the channel
25
26 my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
27 my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
28
29 # Tuned for one producer and one consumer, no locking
30
31 my $c5 = MCE::Channel->new( impl => 'Simple' );
32
33 ########################
34 # Queue-like behavior
35 ########################
36
37 # Send data to consumers
38 $c1->enqueue('item');
39 $c1->enqueue(qw/item1 item2 item3 itemN/);
40
41 # Receive data
42 my $item = $c1->dequeue(); # item
43 my @items = $c1->dequeue(2); # (item1, item2)
44
45 # Receive, non-blocking
46 my $item = $c1->dequeue_nb(); # item
47 my @items = $c1->dequeue_nb(2); # (item1, item2)
48
49 # Signal that there is no more work to be sent
50 $c1->end();
51
52 ########################
53 # Two-way communication
54 ########################
55
56 # Producer(s) sending data
57 $c3->send('message');
58 $c3->send(qw/arg1 arg2 arg3/);
59
60 # Consumer(s) receiving data
61 my $mesg = $c3->recv(); # message
62 my @args = $c3->recv(); # (arg1, arg2, arg3)
63
64 # Alternatively, non-blocking
65 my $mesg = $c3->recv_nb(); # message
66 my @args = $c3->recv_nb(); # (arg1, arg2, arg3)
67
68 # A producer signaling no more work to be sent
69 $c3->end();
70
71 # Consumers(s) sending data
72 $c3->send2('message');
73 $c3->send2(qw/arg1 arg2 arg3/);
74
75 # Producer(s) receiving data
76 my $mesg = $c3->recv2(); # message
77 my @args = $c3->recv2(); # (arg1, arg2, arg3)
78
79 # Alternatively, non-blocking
80 my $mesg = $c3->recv2_nb(); # message
81 my @args = $c3->recv2_nb(); # (arg1, arg2, arg3)
82
84 A MCE::Channel object is a container for sending and receiving data
85 using socketpair handles. Serialization is provided by Sereal if
86 available. Defaults to Storable otherwise. Excluding the "Simple"
87 implementation, both ends of the "channel" support many workers
88 concurrently (with mp => 1).
89
90 new ( impl => STRING, mp => BOOLEAN )
91 This creates a new channel. Three implementations are provided "Mutex"
92 (default), "Threads", and "Simple" indicating the locking mechanism to
93 use "MCE::Mutex", "threads::shared", and no locking respectively.
94
95 $chnl = MCE::Channel->new(); # default: impl => 'Mutex', mp => 0
96
97 The "Mutex" channel implementation supports processes and threads
98 whereas the "Threads" channel implementation is suited for threads
99 only.
100
101 $chnl = MCE::Channel->new( impl => 'Mutex' ); # MCE::Mutex locking
102 $chnl = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
103
104 Set the "mp" (m)any (p)roducers option to a true value if there will be
105 two or more workers calling "enqueue", <send>, "recv2", or "recv2_nb"
106 on the left end of the channel. This is important to not incur a race
107 condition.
108
109 $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
110 $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
111
112 The "Simple" implementation is optimized for one producer and one
113 consumer max. It omits locking for maximum performance. This
114 implementation is preferred for parent to child communication not
115 shared by another worker.
116
117 $chnl = MCE::Channel->new( impl => 'Simple' );
118
120 enqueue ( ITEM1 [, ITEM2, ... ] )
121 Appends a list of items onto the left end of the channel. This will
122 block once the internal socket buffer becomes full (i.e. awaiting
123 workers to dequeue on the other end). This prevents producer(s) from
124 running faster than consumer(s).
125
126 Object (de)serialization is handled automatically using Sereal if
127 available or defaults to Storable otherwise.
128
129 $chnl->enqueue('item1');
130 $chnl->enqueue(qw/item2 item3 .../);
131
132 $chnl->enqueue([ array_ref1 ]);
133 $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
134
135 $chnl->enqueue({ hash_ref1 });
136 $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
137
138 dequeue
139 dequeue ( COUNT )
140 Removes the requested number of items (default 1) from the right end of
141 the channel. If the channel contains fewer than the requested number of
142 items, the method will block (i.e. until other producer(s) enqueue more
143 items).
144
145 $item = $chnl->dequeue(); # item1
146 @items = $chnl->dequeue(2); # ( item2, item3 )
147
148 dequeue_nb
149 dequeue_nb ( COUNT )
150 Removes the requested number of items (default 1) from the right end of
151 the channel. If the channel contains fewer than the requested number of
152 items, the method will return what it was able to retrieve and return
153 immediately. If the channel is empty, then returns "an empty list" in
154 list context or "undef" in scalar context.
155
156 $item = $chnl->dequeue_nb(); # array_ref1
157 @items = $chnl->dequeue_nb(2); # ( array_ref2, array_ref3 )
158
159 end
160 This is called by a producer to signal that there is no more work to be
161 sent. Once ended, no more items may be sent by the producer. Calling
162 "end" by multiple producers is not supported.
163
164 $chnl->end;
165
167 send ( ARG1 [, ARG2, ... ] )
168 Append data onto the left end of the channel. Unlike "enqueue", the
169 values are kept together for the receiving consumer, similarly to
170 calling a method. Object (de)serialization is handled automatically.
171
172 $chnl->send('item');
173 $chnl->send([ list_ref ]);
174 $chnl->send([ hash_ref ]);
175
176 $chnl->send(qw/item1 item2 .../);
177 $chnl->send($id, [ list_ref ]);
178 $chnl->send($id, { hash_ref });
179
180 recv
181 recv_nb
182 Blocking and non-blocking fetch methods from the right end of the
183 channel. For the latter and when the channel is empty, returns "an
184 empty list" in list context or "undef" in scalar context.
185
186 $item = $chnl->recv();
187 $array_ref = $chnl->recv();
188 $hash_ref = $chnl->recv();
189
190 ($item1, $item2) = $chnl->recv_nb();
191 ($id, $array_ref) = $chnl->recv_nb();
192 ($id, $hash_ref) = $chnl->recv_nb();
193
195 send2 ( ARG1 [, ARG2, ... ] )
196 Append data onto the right end of the channel. Unlike "enqueue", the
197 values are kept together for the receiving producer, similarly to
198 calling a method. Object (de)serialization is handled automatically.
199
200 $chnl->send2('item');
201 $chnl->send2([ list_ref ]);
202 $chnl->send2([ hash_ref ]);
203
204 $chnl->send2(qw/item1 item2 .../);
205 $chnl->send2($id, [ list_ref ]);
206 $chnl->send2($id, { hash_ref });
207
208 recv2
209 recv2_nb
210 Blocking and non-blocking fetch methods from the left end of the
211 channel. For the latter and when the channel is empty, returns "an
212 empty list" in list context or "undef" in scalar context.
213
214 $item = $chnl->recv2();
215 $array_ref = $chnl->recv2();
216 $hash_ref = $chnl->recv2();
217
218 ($item1, $item2) = $chnl->recv2_nb();
219 ($id, $array_ref) = $chnl->recv2_nb();
220 ($id, $hash_ref) = $chnl->recv2_nb();
221
223 Example 1 - threads
224 "MCE::Channel" was made to work efficiently with threads. The reason is
225 from using threads::shared for locking versus MCE::Mutex.
226
227 use strict;
228 use warnings;
229
230 use threads;
231 use MCE::Channel;
232
233 my $queue = MCE::Channel->new( impl => 'Threads' );
234 my $num_consumers = 10;
235
236 sub consumer {
237 my $count = 0;
238
239 # receive items
240 while ( my ($item1, $item2) = $queue->dequeue(2) ) {
241 $count += 2;
242 }
243
244 # send result
245 $queue->send2( threads->tid => $count );
246 }
247
248 threads->create('consumer') for 1 .. $num_consumers;
249
250 ## producer
251
252 $queue->enqueue($_, $_ * 2) for 1 .. 40000;
253 $queue->end;
254
255 my %results;
256 my $total = 0;
257
258 for ( 1 .. $num_consumers ) {
259 my ($id, $count) = $queue->recv2;
260 $results{$id} = $count;
261 $total += $count;
262 }
263
264 $_->join for threads->list;
265
266 print $results{$_}, "\n" for keys %results;
267 print "$total total\n\n";
268
269 __END__
270
271 # output
272
273 8034
274 8008
275 8036
276 8058
277 7990
278 7948
279 8068
280 7966
281 7960
282 7932
283 80000 total
284
285 Example 2 - MCE::Child
286 The following is similarly threads-like for Perl lacking threads
287 support. It spawns processes instead, thus requires the "Mutex"
288 channel implementation which is the default if omitted.
289
290 use strict;
291 use warnings;
292
293 use MCE::Child;
294 use MCE::Channel;
295
296 my $queue = MCE::Channel->new( impl => 'Mutex' );
297 my $num_consumers = 10;
298
299 sub consumer {
300 my $count = 0;
301
302 # receive items
303 while ( my ($item1, $item2) = $queue->dequeue(2) ) {
304 $count += 2;
305 }
306
307 # send result
308 $queue->send2( MCE::Child->pid => $count );
309 }
310
311 MCE::Child->create('consumer') for 1 .. $num_consumers;
312
313 ## producer
314
315 $queue->enqueue($_, $_ * 2) for 1 .. 40000;
316 $queue->end;
317
318 my %results;
319 my $total = 0;
320
321 for ( 1 .. $num_consumers ) {
322 my ($id, $count) = $queue->recv2;
323 $results{$id} = $count;
324 $total += $count;
325 }
326
327 $_->join for MCE::Child->list;
328
329 print $results{$_}, "\n" for keys %results;
330 print "$total total\n\n";
331
332 Example 3 - Consumer requests item
333 Like the previous example, but have the manager process await a
334 notification from the consumer before inserting into the queue. This
335 allows the producer to end the channel early (i.e. exit loop).
336
337 use strict;
338 use warnings;
339
340 use MCE::Child;
341 use MCE::Channel;
342
343 my $queue = MCE::Channel->new( impl => 'Mutex' );
344 my $num_consumers = 10;
345
346 sub consumer {
347 # receive items
348 my $count = 0;
349
350 while () {
351 # Notify the manager process to send items. This allows the
352 # manager process to enqueue only when requested. The benefit
353 # is being able to end the channel immediately.
354
355 $queue->send2( MCE::Child->pid ); # channel is bi-directional
356
357 my ($item1, $item2) = $queue->dequeue(2);
358 last unless ( defined $item1 ); # channel ended
359
360 $count += 2;
361 }
362
363 # result
364 return ( MCE::Child->pid => $count );
365 }
366
367 MCE::Child->create('consumer') for 1 .. $num_consumers;
368
369 ## producer
370
371 for my $num (1 .. 40000) {
372 # Await worker notification before inserting (blocking).
373 my $consumer_pid = $queue->recv2;
374 $queue->enqueue($num, $num * 2);
375 }
376
377 $queue->end;
378
379 my %results;
380 my $total = 0;
381
382 for my $child ( MCE::Child->list ) {
383 my ($id, $count) = $child->join;
384 $results{$id} = $count;
385 $total += $count;
386 }
387
388 print $results{$_}, "\n" for keys %results;
389 print "$total total\n\n";
390
391 Example 4 - Many producers
392 Running with 2 or more producers requires setting the "mp" option.
393 Internally, this enables locking support for the left end of the
394 channel. The "mp" option applies to "Mutex" and "Threads" channel
395 implementations only.
396
397 Here, using the MCE facility for gathering the final count.
398
399 use strict;
400 use warnings;
401
402 use MCE::Flow;
403 use MCE::Channel;
404
405 my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
406 my $num_consumers = 10;
407
408 sub consumer {
409 # receive items
410 my $count = 0;
411 while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
412 $count += 2;
413 }
414 # send result
415 MCE->gather( MCE->wid => $count );
416 }
417
418 sub producer {
419 $queue->enqueue($_, $_ * 2) for 1 .. 20000;
420 }
421
422 ## run 2 producers and many consumers
423
424 MCE::Flow::init(
425 max_workers => [ 2, $num_consumers ],
426 task_name => [ 'producer', 'consumer' ],
427 task_end => sub {
428 my ($mce, $task_id, $task_name) = @_;
429 if ( $task_name eq 'producer' ) {
430 $queue->end;
431 }
432 }
433 );
434
435 # consumers call gather above (i.e. send a key-value pair),
436 # have MCE append to a hash
437
438 my %results = mce_flow \&producer, \&consumer;
439
440 MCE::Flow::finish;
441
442 my $total = 0;
443
444 for ( keys %results ) {
445 $total += $results{$_};
446 print $results{$_}, "\n";
447 }
448
449 print "$total total\n\n";
450
451 Example 5 - Many channels
452 This demonstration configures a channel per consumer. Plus, a common
453 channel for consumers to request the next input item. The "Simple"
454 implementation is specified for the individual channels whereas locking
455 may be necessary for the $ready channel. However, consumers do not
456 incur reading and what is written is very small (i.e. atomic write is
457 guaranteed by the OS). Thus, am safely choosing the "Simple"
458 implementation versus "Mutex".
459
460 use strict;
461 use warnings;
462
463 use MCE::Flow;
464 use MCE::Channel;
465
466 my $prog_name = $0; $prog_name =~ s{^.*[\\/]}{}g;
467 my $input_size = shift || 3000;
468
469 unless ($input_size =~ /\A\d+\z/) {
470 print {*STDERR} "usage: $prog_name [ size ]\n";
471 exit 1;
472 }
473
474 my $consumers = 4;
475
476 my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
477
478 my $ready = MCE::Channel->new( impl => 'Simple' );
479
480 sub producer {
481 my $id = 0;
482
483 # send the next input item upon request
484 for ( 0 .. $input_size - 1 ) {
485 my $chnl_num = $ready->recv2;
486 $chnls[ $chnl_num ]->send( ++$id, $_ );
487 }
488
489 # signal no more work
490 $_->send( 0, undef ) for @chnls;
491 }
492
493 sub consumer {
494 my $chnl_num = MCE->task_wid - 1;
495
496 while () {
497 # notify the producer ready for input
498 $ready->send2( $chnl_num );
499
500 # retrieve input data
501 my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
502
503 # leave loop if no more work
504 last unless $id;
505
506 # compute and send the result to the manager process
507 # ordered output requires an id (must be 1st argument)
508 MCE->gather( $id, [ $item, sqrt($item) ] );
509 }
510 }
511
512 # A custom 'ordered' output iterator for MCE's gather facility.
513 # It returns a closure block, expecting an ID for 1st argument.
514
515 sub output_iterator {
516 my %tmp; my $order_id = 1;
517
518 return sub {
519 my ( $id, $result ) = @_;
520 $tmp{ $id } = $result;
521
522 while () {
523 last unless exists $tmp{ $order_id };
524 $result = delete $tmp{ $order_id };
525 printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
526 $order_id++;
527 }
528 };
529 }
530
531 # Run one producer and many consumers.
532 # Output to be sent orderly to STDOUT.
533
534 MCE::Flow->init(
535 gather => output_iterator(),
536 max_workers => [ 1, $consumers ],
537 );
538
539 MCE::Flow->run( \&producer, \&consumer );
540 MCE::Flow->finish;
541
542 __END__
543
544 # Output
545
546 n: 0 sqrt(n): 0.000000
547 n: 1 sqrt(n): 1.000000
548 n: 2 sqrt(n): 1.414214
549 n: 3 sqrt(n): 1.732051
550 n: 4 sqrt(n): 2.000000
551 n: 5 sqrt(n): 2.236068
552 n: 6 sqrt(n): 2.449490
553 n: 7 sqrt(n): 2.645751
554 n: 8 sqrt(n): 2.828427
555 n: 9 sqrt(n): 3.000000
556 ...
557
559 · <https://github.com/marioroy/mce-examples/tree/master/chameneos>
560
561 · threads::lite
562
564 Mario E. Roy, <marioeroy AT gmail DOT com>
565
567 Copyright (C) 2019 by Mario E. Roy
568
569 MCE::Channel is released under the same license as Perl.
570
571 See <http://dev.perl.org/licenses/> for more information.
572
573
574
575perl v5.30.0 2019-09-19 MCE::Channel(3)