1MCE::Channel(3) User Contributed Perl Documentation MCE::Channel(3)
2
3
4
6 MCE::Channel - Queue-like and two-way communication capability
7
9 This document describes MCE::Channel version 1.874
10
12 use MCE::Channel;
13
14 ########################
15 # Construction
16 ########################
17
18 # A single producer and many consumers supporting processes and threads
19
20 my $c1 = MCE::Channel->new( impl => 'Mutex' ); # default implementation
21 my $c2 = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
22
23 # Set the mp flag if two or more workers (many producers) will be calling
24 # enqueue/send or recv2/recv2_nb on the left end of the channel
25
26 my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
27 my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
28
29 # Tuned for one producer and one consumer, no locking
30
31 my $c5 = MCE::Channel->new( impl => 'Simple' );
32
33 ########################
34 # Queue-like behavior
35 ########################
36
37 # Send data to consumers
38 $c1->enqueue('item');
39 $c1->enqueue(qw/item1 item2 item3 itemN/);
40
41 # Receive data
42 my $item = $c1->dequeue(); # item
43 my @items = $c1->dequeue(2); # (item1, item2)
44
45 # Receive, non-blocking
46 my $item = $c1->dequeue_nb(); # item
47 my @items = $c1->dequeue_nb(2); # (item1, item2)
48
49 # Signal that there is no more work to be sent
50 $c1->end();
51
52 ########################
53 # Two-way communication
54 ########################
55
56 # Producer(s) sending data
57 $c3->send('message');
58 $c3->send(qw/arg1 arg2 arg3/);
59
60 # Consumer(s) receiving data
61 my $mesg = $c3->recv(); # message
62 my @args = $c3->recv(); # (arg1, arg2, arg3)
63
64 # Alternatively, non-blocking
65 my $mesg = $c3->recv_nb(); # message
66 my @args = $c3->recv_nb(); # (arg1, arg2, arg3)
67
68 # A producer signaling no more work to be sent
69 $c3->end();
70
71 # Consumers(s) sending data
72 $c3->send2('message');
73 $c3->send2(qw/arg1 arg2 arg3/);
74
75 # Producer(s) receiving data
76 my $mesg = $c3->recv2(); # message
77 my @args = $c3->recv2(); # (arg1, arg2, arg3)
78
79 # Alternatively, non-blocking
80 my $mesg = $c3->recv2_nb(); # message
81 my @args = $c3->recv2_nb(); # (arg1, arg2, arg3)
82
84 A MCE::Channel object is a container for sending and receiving data
85 using socketpair handles. Serialization is provided by Sereal if
86 available. Defaults to Storable otherwise. Excluding the "Simple"
87 implementation, both ends of the "channel" support many workers
88 concurrently (with mp => 1).
89
90 new ( impl => STRING, mp => BOOLEAN )
91 This creates a new channel. Three implementations are provided "Mutex",
92 "Threads", and "Simple" indicating the locking mechanism to use
93 "MCE::Mutex", "threads::shared", and no locking respectively.
94
95 $chnl = MCE::Channel->new(); # default: impl => 'Mutex', mp => 0
96 # default: impl => 'Threads' on Windows
97
98 The "Mutex" implementation supports processes and threads whereas the
99 "Threads" implementation is suited for Windows and threads only.
100
101 $chnl = MCE::Channel->new( impl => 'Mutex' ); # MCE::Mutex locking
102 $chnl = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
103
104 # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
105
106 Set the "mp" (m)any (p)roducers option to a true value if there will be
107 two or more workers calling "enqueue", <send>, "recv2", or "recv2_nb"
108 on the left end of the channel. This is important to not incur a race
109 condition.
110
111 $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
112 $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
113
114 # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
115
116 The "Simple" implementation is optimized for one producer and one
117 consumer max. It omits locking for maximum performance. This
118 implementation is preferred for parent to child communication not
119 shared by another worker.
120
121 $chnl = MCE::Channel->new( impl => 'Simple' );
122
124 enqueue ( ITEM1 [, ITEM2, ... ] )
125 Appends a list of items onto the left end of the channel. This will
126 block once the internal socket buffer becomes full (i.e. awaiting
127 workers to dequeue on the other end). This prevents producer(s) from
128 running faster than consumer(s).
129
130 Object (de)serialization is handled automatically using Sereal if
131 available or defaults to Storable otherwise.
132
133 $chnl->enqueue('item1');
134 $chnl->enqueue(qw/item2 item3 .../);
135
136 $chnl->enqueue([ array_ref1 ]);
137 $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
138
139 $chnl->enqueue({ hash_ref1 });
140 $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
141
142 dequeue
143 dequeue ( COUNT )
144 Removes the requested number of items (default 1) from the right end of
145 the channel. If the channel contains fewer than the requested number of
146 items, the method will block (i.e. until other producer(s) enqueue more
147 items).
148
149 $item = $chnl->dequeue(); # item1
150 @items = $chnl->dequeue(2); # ( item2, item3 )
151
152 dequeue_nb
153 dequeue_nb ( COUNT )
154 Removes the requested number of items (default 1) from the right end of
155 the channel. If the channel contains fewer than the requested number of
156 items, the method will return what it was able to retrieve and return
157 immediately. If the channel is empty, then returns "an empty list" in
158 list context or "undef" in scalar context.
159
160 $item = $chnl->dequeue_nb(); # array_ref1
161 @items = $chnl->dequeue_nb(2); # ( array_ref2, array_ref3 )
162
163 end
164 This is called by a producer to signal that there is no more work to be
165 sent. Once ended, no more items may be sent by the producer. Calling
166 "end" by multiple producers is not supported.
167
168 $chnl->end;
169
171 send ( ARG1 [, ARG2, ... ] )
172 Append data onto the left end of the channel. Unlike "enqueue", the
173 values are kept together for the receiving consumer, similarly to
174 calling a method. Object (de)serialization is handled automatically.
175
176 $chnl->send('item');
177 $chnl->send([ list_ref ]);
178 $chnl->send([ hash_ref ]);
179
180 $chnl->send(qw/item1 item2 .../);
181 $chnl->send($id, [ list_ref ]);
182 $chnl->send($id, { hash_ref });
183
184 recv
185 recv_nb
186 Blocking and non-blocking fetch methods from the right end of the
187 channel. For the latter and when the channel is empty, returns "an
188 empty list" in list context or "undef" in scalar context.
189
190 $item = $chnl->recv();
191 $array_ref = $chnl->recv();
192 $hash_ref = $chnl->recv();
193
194 ($item1, $item2) = $chnl->recv_nb();
195 ($id, $array_ref) = $chnl->recv_nb();
196 ($id, $hash_ref) = $chnl->recv_nb();
197
199 send2 ( ARG1 [, ARG2, ... ] )
200 Append data onto the right end of the channel. Unlike "enqueue", the
201 values are kept together for the receiving producer, similarly to
202 calling a method. Object (de)serialization is handled automatically.
203
204 $chnl->send2('item');
205 $chnl->send2([ list_ref ]);
206 $chnl->send2([ hash_ref ]);
207
208 $chnl->send2(qw/item1 item2 .../);
209 $chnl->send2($id, [ list_ref ]);
210 $chnl->send2($id, { hash_ref });
211
212 recv2
213 recv2_nb
214 Blocking and non-blocking fetch methods from the left end of the
215 channel. For the latter and when the channel is empty, returns "an
216 empty list" in list context or "undef" in scalar context.
217
218 $item = $chnl->recv2();
219 $array_ref = $chnl->recv2();
220 $hash_ref = $chnl->recv2();
221
222 ($item1, $item2) = $chnl->recv2_nb();
223 ($id, $array_ref) = $chnl->recv2_nb();
224 ($id, $hash_ref) = $chnl->recv2_nb();
225
227 Example 1 - threads
228 "MCE::Channel" was made to work efficiently with threads. The reason is
229 from using threads::shared for locking versus MCE::Mutex.
230
231 use strict;
232 use warnings;
233
234 use threads;
235 use MCE::Channel;
236
237 my $queue = MCE::Channel->new( impl => 'Threads' );
238 my $num_consumers = 10;
239
240 sub consumer {
241 my $count = 0;
242
243 # receive items
244 while ( my ($item1, $item2) = $queue->dequeue(2) ) {
245 $count += 2;
246 }
247
248 # send result
249 $queue->send2( threads->tid => $count );
250 }
251
252 threads->create('consumer') for 1 .. $num_consumers;
253
254 ## producer
255
256 $queue->enqueue($_, $_ * 2) for 1 .. 40000;
257 $queue->end;
258
259 my %results;
260 my $total = 0;
261
262 for ( 1 .. $num_consumers ) {
263 my ($id, $count) = $queue->recv2;
264 $results{$id} = $count;
265 $total += $count;
266 }
267
268 $_->join for threads->list;
269
270 print $results{$_}, "\n" for keys %results;
271 print "$total total\n\n";
272
273 __END__
274
275 # output
276
277 8034
278 8008
279 8036
280 8058
281 7990
282 7948
283 8068
284 7966
285 7960
286 7932
287 80000 total
288
289 Example 2 - MCE::Child
290 The following is similarly threads-like for Perl lacking threads
291 support. It spawns processes instead, thus requires the "Mutex"
292 channel implementation which is the default if omitted.
293
294 use strict;
295 use warnings;
296
297 use MCE::Child;
298 use MCE::Channel;
299
300 my $queue = MCE::Channel->new( impl => 'Mutex' );
301 my $num_consumers = 10;
302
303 sub consumer {
304 my $count = 0;
305
306 # receive items
307 while ( my ($item1, $item2) = $queue->dequeue(2) ) {
308 $count += 2;
309 }
310
311 # send result
312 $queue->send2( MCE::Child->pid => $count );
313 }
314
315 MCE::Child->create('consumer') for 1 .. $num_consumers;
316
317 ## producer
318
319 $queue->enqueue($_, $_ * 2) for 1 .. 40000;
320 $queue->end;
321
322 my %results;
323 my $total = 0;
324
325 for ( 1 .. $num_consumers ) {
326 my ($id, $count) = $queue->recv2;
327 $results{$id} = $count;
328 $total += $count;
329 }
330
331 $_->join for MCE::Child->list;
332
333 print $results{$_}, "\n" for keys %results;
334 print "$total total\n\n";
335
336 Example 3 - Consumer requests item
337 Like the previous example, but have the manager process await a
338 notification from the consumer before inserting into the queue. This
339 allows the producer to end the channel early (i.e. exit loop).
340
341 use strict;
342 use warnings;
343
344 use MCE::Child;
345 use MCE::Channel;
346
347 my $queue = MCE::Channel->new( impl => 'Mutex' );
348 my $num_consumers = 10;
349
350 sub consumer {
351 # receive items
352 my $count = 0;
353
354 while () {
355 # Notify the manager process to send items. This allows the
356 # manager process to enqueue only when requested. The benefit
357 # is being able to end the channel immediately.
358
359 $queue->send2( MCE::Child->pid ); # channel is bi-directional
360
361 my ($item1, $item2) = $queue->dequeue(2);
362 last unless ( defined $item1 ); # channel ended
363
364 $count += 2;
365 }
366
367 # result
368 return ( MCE::Child->pid => $count );
369 }
370
371 MCE::Child->create('consumer') for 1 .. $num_consumers;
372
373 ## producer
374
375 for my $num (1 .. 40000) {
376 # Await worker notification before inserting (blocking).
377 my $consumer_pid = $queue->recv2;
378 $queue->enqueue($num, $num * 2);
379 }
380
381 $queue->end;
382
383 my %results;
384 my $total = 0;
385
386 for my $child ( MCE::Child->list ) {
387 my ($id, $count) = $child->join;
388 $results{$id} = $count;
389 $total += $count;
390 }
391
392 print $results{$_}, "\n" for keys %results;
393 print "$total total\n\n";
394
395 Example 4 - Many producers
396 Running with 2 or more producers requires setting the "mp" option.
397 Internally, this enables locking support for the left end of the
398 channel. The "mp" option applies to "Mutex" and "Threads" channel
399 implementations only.
400
401 Here, using the MCE facility for gathering the final count.
402
403 use strict;
404 use warnings;
405
406 use MCE::Flow;
407 use MCE::Channel;
408
409 my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
410 my $num_consumers = 10;
411
412 sub consumer {
413 # receive items
414 my $count = 0;
415 while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
416 $count += 2;
417 }
418 # send result
419 MCE->gather( MCE->wid => $count );
420 }
421
422 sub producer {
423 $queue->enqueue($_, $_ * 2) for 1 .. 20000;
424 }
425
426 ## run 2 producers and many consumers
427
428 MCE::Flow->init(
429 max_workers => [ 2, $num_consumers ],
430 task_name => [ 'producer', 'consumer' ],
431 task_end => sub {
432 my ($mce, $task_id, $task_name) = @_;
433 if ( $task_name eq 'producer' ) {
434 $queue->end;
435 }
436 }
437 );
438
439 # consumers call gather above (i.e. send a key-value pair),
440 # have MCE append to a hash
441
442 my %results = mce_flow \&producer, \&consumer;
443
444 MCE::Flow->finish;
445
446 my $total = 0;
447
448 for ( keys %results ) {
449 $total += $results{$_};
450 print $results{$_}, "\n";
451 }
452
453 print "$total total\n\n";
454
455 Example 5 - Many channels
456 This demonstration configures a channel per consumer. Plus, a common
457 channel for consumers to request the next input item. The "Simple"
458 implementation is specified for the individual channels whereas locking
459 may be necessary for the $ready channel. However, consumers do not
460 incur reading and what is written is very small (i.e. atomic write is
461 guaranteed by the OS). Thus, am safely choosing the "Simple"
462 implementation versus "Mutex".
463
464 use strict;
465 use warnings;
466
467 use MCE::Flow;
468 use MCE::Channel;
469
470 my $prog_name = $0; $prog_name =~ s{^.*[\\/]}{}g;
471 my $input_size = shift || 3000;
472
473 unless ($input_size =~ /\A\d+\z/) {
474 print {*STDERR} "usage: $prog_name [ size ]\n";
475 exit 1;
476 }
477
478 my $consumers = 4;
479
480 my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
481
482 my $ready = MCE::Channel->new( impl => 'Simple' );
483
484 sub producer {
485 my $id = 0;
486
487 # send the next input item upon request
488 for ( 0 .. $input_size - 1 ) {
489 my $chnl_num = $ready->recv2;
490 $chnls[ $chnl_num ]->send( ++$id, $_ );
491 }
492
493 # signal no more work
494 $_->send( 0, undef ) for @chnls;
495 }
496
497 sub consumer {
498 my $chnl_num = MCE->task_wid - 1;
499
500 while () {
501 # notify the producer ready for input
502 $ready->send2( $chnl_num );
503
504 # retrieve input data
505 my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
506
507 # leave loop if no more work
508 last unless $id;
509
510 # compute and send the result to the manager process
511 # ordered output requires an id (must be 1st argument)
512 MCE->gather( $id, [ $item, sqrt($item) ] );
513 }
514 }
515
516 # A custom 'ordered' output iterator for MCE's gather facility.
517 # It returns a closure block, expecting an ID for 1st argument.
518
519 sub output_iterator {
520 my %tmp; my $order_id = 1;
521
522 return sub {
523 my ( $id, $result ) = @_;
524 $tmp{ $id } = $result;
525
526 while () {
527 last unless exists $tmp{ $order_id };
528 $result = delete $tmp{ $order_id };
529 printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
530 $order_id++;
531 }
532 };
533 }
534
535 # Run one producer and many consumers.
536 # Output to be sent orderly to STDOUT.
537
538 MCE::Flow->init(
539 gather => output_iterator(),
540 max_workers => [ 1, $consumers ],
541 );
542
543 MCE::Flow->run( \&producer, \&consumer );
544 MCE::Flow->finish;
545
546 __END__
547
548 # Output
549
550 n: 0 sqrt(n): 0.000000
551 n: 1 sqrt(n): 1.000000
552 n: 2 sqrt(n): 1.414214
553 n: 3 sqrt(n): 1.732051
554 n: 4 sqrt(n): 2.000000
555 n: 5 sqrt(n): 2.236068
556 n: 6 sqrt(n): 2.449490
557 n: 7 sqrt(n): 2.645751
558 n: 8 sqrt(n): 2.828427
559 n: 9 sqrt(n): 3.000000
560 ...
561
563 • <https://github.com/marioroy/mce-examples/tree/master/chameneos>
564
565 • threads::lite
566
568 Mario E. Roy, <marioeroy AT gmail DOT com>
569
571 Copyright (C) 2019-2020 by Mario E. Roy
572
573 MCE::Channel is released under the same license as Perl.
574
575 See <http://dev.perl.org/licenses/> for more information.
576
577
578
579perl v5.34.0 2021-07-22 MCE::Channel(3)