1MCE::Channel(3) User Contributed Perl Documentation MCE::Channel(3)
2
3
4
6 MCE::Channel - Queue-like and two-way communication capability
7
9 This document describes MCE::Channel version 1.884
10
12 use MCE::Channel;
13
14 ########################
15 # Construction
16 ########################
17
18 # A single producer and many consumers supporting processes and threads
19
20 my $c1 = MCE::Channel->new( impl => 'Mutex' ); # default implementation
21 my $c2 = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
22
23 # Set the mp flag if two or more workers (many producers) will be calling
24 # enqueue/send or recv2/recv2_nb on the left end of the channel
25
26 my $c3 = MCE::Channel->new( impl => 'Mutex', mp => 1 );
27 my $c4 = MCE::Channel->new( impl => 'Threads', mp => 1 );
28
29 # Tuned for one producer and one consumer, no locking
30
31 my $c5 = MCE::Channel->new( impl => 'Simple' );
32
33 ########################
34 # Queue-like behavior
35 ########################
36
37 # Send data to consumers
38 $c1->enqueue('item');
39 $c1->enqueue(qw/item1 item2 item3 itemN/);
40
41 # Receive data
42 my $item = $c1->dequeue(); # item
43 my @items = $c1->dequeue(2); # (item1, item2)
44
45 # Receive, non-blocking
46 my $item = $c1->dequeue_nb(); # item
47 my @items = $c1->dequeue_nb(2); # (item1, item2)
48
49 # Signal that there is no more work to be sent
50 $c1->end();
51
52 ########################
53 # Two-way communication
54 ########################
55
56 # Producer(s) sending data
57 $c3->send('message');
58 $c3->send(qw/arg1 arg2 arg3/);
59
60 # Consumer(s) receiving data
61 my $mesg = $c3->recv(); # message
62 my @args = $c3->recv(); # (arg1, arg2, arg3)
63
64 # Alternatively, non-blocking
65 my $mesg = $c3->recv_nb(); # message
66 my @args = $c3->recv_nb(); # (arg1, arg2, arg3)
67
68 # A producer signaling no more work to be sent
69 $c3->end();
70
71 # Consumer(s) sending data
72 $c3->send2('message');
73 $c3->send2(qw/arg1 arg2 arg3/);
74
75 # Producer(s) receiving data
76 my $mesg = $c3->recv2(); # message
77 my @args = $c3->recv2(); # (arg1, arg2, arg3)
78
79 # Alternatively, non-blocking
80 my $mesg = $c3->recv2_nb(); # message
81 my @args = $c3->recv2_nb(); # (arg1, arg2, arg3)
82
84 An MCE::Channel object is a container for sending and receiving data
85 using socketpair handles. Serialization is provided by Sereal if
86 available. Defaults to Storable otherwise. Excluding the "Simple"
87 implementation, both ends of the "channel" support many workers
88 concurrently (with mp => 1).
89
90 new ( impl => STRING, mp => BOOLEAN )
91 This creates a new channel. Three implementations are provided: "Mutex",
92 "Threads", and "Simple", indicating the locking mechanism to use:
93 "MCE::Mutex", "threads::shared", and no locking, respectively.
94
95 $chnl = MCE::Channel->new(); # default: impl => 'Mutex', mp => 0
96 # default: impl => 'Threads' on Windows
97
98 The "Mutex" implementation supports processes and threads whereas the
99 "Threads" implementation is suited for Windows and threads only.
100
101 $chnl = MCE::Channel->new( impl => 'Mutex' ); # MCE::Mutex locking
102 $chnl = MCE::Channel->new( impl => 'Threads' ); # threads::shared locking
103
104 # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
105
106 Set the "mp" (m)any (p)roducers option to a true value if there will be
107 two or more workers calling "enqueue", "send", "recv2", or "recv2_nb"
108 on the left end of the channel. This is important to avoid a race
109 condition.
110
111 $chnl = MCE::Channel->new( impl => 'Mutex', mp => 1 );
112 $chnl = MCE::Channel->new( impl => 'Threads', mp => 1 );
113
114 # on Windows, silently becomes impl => 'Threads' when specifying 'Mutex'
115
116 The "Simple" implementation is optimized for one producer and one
117 consumer max. It omits locking for maximum performance. This
118 implementation is preferred for parent to child communication not
119 shared by another worker.
120
121 $chnl = MCE::Channel->new( impl => 'Simple' );
122
124 enqueue ( ITEM1 [, ITEM2, ... ] )
125 Appends a list of items onto the left end of the channel. This will
126 block once the internal socket buffer becomes full (i.e. awaiting
127 workers to dequeue on the other end). This prevents producer(s) from
128 running faster than consumer(s).
129
130 Object (de)serialization is handled automatically using Sereal if
131 available or defaults to Storable otherwise.
132
133 $chnl->enqueue('item1');
134 $chnl->enqueue(qw/item2 item3 .../);
135
136 $chnl->enqueue([ array_ref1 ]);
137 $chnl->enqueue([ array_ref2 ], [ array_ref3 ], ...);
138
139 $chnl->enqueue({ hash_ref1 });
140 $chnl->enqueue({ hash_ref2 }, { hash_ref3 }, ...);
141
142 dequeue
143 dequeue ( COUNT )
144 Removes the requested number of items (default 1) from the right end of
145 the channel. If the channel contains fewer than the requested number of
146 items, the method will block (i.e. until other producer(s) enqueue more
147 items).
148
149 $item = $chnl->dequeue(); # item1
150 @items = $chnl->dequeue(2); # ( item2, item3 )
151
152 dequeue_nb
153 dequeue_nb ( COUNT )
154 Removes the requested number of items (default 1) from the right end of
155 the channel. If the channel contains fewer than the requested number of
156 items, the method will immediately return what it was able to
157 retrieve. If the channel is empty, then returns "an empty list" in
158 list context or "undef" in scalar context.
159
160 $item = $chnl->dequeue_nb(); # array_ref1
161 @items = $chnl->dequeue_nb(2); # ( array_ref2, array_ref3 )
162
163 end
164 This is called by a producer to signal that there is no more work to be
165 sent. Once ended, no more items may be sent by the producer. Calling
166 "end" by multiple producers is not supported.
167
168 $chnl->end;
169
171 send ( ARG1 [, ARG2, ... ] )
172 Append data onto the left end of the channel. Unlike "enqueue", the
173 values are kept together for the receiving consumer, similarly to
174 calling a method. Object (de)serialization is handled automatically.
175
176 $chnl->send('item');
177 $chnl->send([ list_ref ]);
178 $chnl->send({ hash_ref });
179
180 $chnl->send(qw/item1 item2 .../);
181 $chnl->send($id, [ list_ref ]);
182 $chnl->send($id, { hash_ref });
183
184 The fast channel implementations, introduced in MCE 1.877, support one
185 item for "send". If you want to pass multiple arguments, simply join
186 the arguments into a string. That means the receiver will need to split
187 the string.
188
189 $chnl = MCE::Channel->new(impl => "SimpleFast");
190
191 $chnl->send(join(" ", qw/item1 item2 item3/));
192 my ($item1, $item2, $item3) = split " ", $chnl->recv();
193
194 recv
195 recv_nb
196 Blocking and non-blocking fetch methods from the right end of the
197 channel. For the latter and when the channel is empty, returns "an
198 empty list" in list context or "undef" in scalar context.
199
200 $item = $chnl->recv();
201 $array_ref = $chnl->recv();
202 $hash_ref = $chnl->recv();
203
204 ($item1, $item2) = $chnl->recv_nb();
205 ($id, $array_ref) = $chnl->recv_nb();
206 ($id, $hash_ref) = $chnl->recv_nb();
207
209 send2 ( ARG1 [, ARG2, ... ] )
210 Append data onto the right end of the channel. Unlike "enqueue", the
211 values are kept together for the receiving producer, similarly to
212 calling a method. Object (de)serialization is handled automatically.
213
214 $chnl->send2('item');
215 $chnl->send2([ list_ref ]);
216 $chnl->send2({ hash_ref });
217
218 $chnl->send2(qw/item1 item2 .../);
219 $chnl->send2($id, [ list_ref ]);
220 $chnl->send2($id, { hash_ref });
221
222 The fast channel implementations, introduced in MCE 1.877, support one
223 item for "send2". If you want to pass multiple arguments, simply join
224 the arguments into a string. Remember that the receiver must then split
225 the string.
226
227 $chnl = MCE::Channel->new(impl => "MutexFast");
228
229 $chnl->send2(join(" ", qw/item1 item2 item3/));
230 my ($item1, $item2, $item3) = split " ", $chnl->recv2();
231
232 recv2
233 recv2_nb
234 Blocking and non-blocking fetch methods from the left end of the
235 channel. For the latter and when the channel is empty, returns "an
236 empty list" in list context or "undef" in scalar context.
237
238 $item = $chnl->recv2();
239 $array_ref = $chnl->recv2();
240 $hash_ref = $chnl->recv2();
241
242 ($item1, $item2) = $chnl->recv2_nb();
243 ($id, $array_ref) = $chnl->recv2_nb();
244 ($id, $hash_ref) = $chnl->recv2_nb();
245
247 Example 1 - threads
248 "MCE::Channel" was made to work efficiently with threads. The reason
249 comes from using threads::shared for locking versus MCE::Mutex.
250
251 use strict;
252 use warnings;
253
254 use threads;
255 use MCE::Channel;
256
257 my $queue = MCE::Channel->new( impl => 'Threads' );
258 my $num_consumers = 10;
259
260 sub consumer {
261 my $count = 0;
262
263 # receive items
264 while ( my ($item1, $item2) = $queue->dequeue(2) ) {
265 $count += 2;
266 }
267
268 # send result
269 $queue->send2( threads->tid => $count );
270 }
271
272 threads->create('consumer') for 1 .. $num_consumers;
273
274 ## producer
275
276 $queue->enqueue($_, $_ * 2) for 1 .. 40000;
277 $queue->end;
278
279 my %results;
280 my $total = 0;
281
282 for ( 1 .. $num_consumers ) {
283 my ($id, $count) = $queue->recv2;
284 $results{$id} = $count;
285 $total += $count;
286 }
287
288 $_->join for threads->list;
289
290 print $results{$_}, "\n" for keys %results;
291 print "$total total\n\n";
292
293 __END__
294
295 # output
296
297 8034
298 8008
299 8036
300 8058
301 7990
302 7948
303 8068
304 7966
305 7960
306 7932
307 80000 total
308
309 Example 2 - MCE::Child
310 The following is a similar, threads-like example for Perl lacking threads
311 support. It spawns processes instead and thus requires the "Mutex"
312 channel implementation, which is the default if omitted.
313
314 use strict;
315 use warnings;
316
317 use MCE::Child;
318 use MCE::Channel;
319
320 my $queue = MCE::Channel->new( impl => 'Mutex' );
321 my $num_consumers = 10;
322
323 sub consumer {
324 my $count = 0;
325
326 # receive items
327 while ( my ($item1, $item2) = $queue->dequeue(2) ) {
328 $count += 2;
329 }
330
331 # send result
332 $queue->send2( MCE::Child->pid => $count );
333 }
334
335 MCE::Child->create('consumer') for 1 .. $num_consumers;
336
337 ## producer
338
339 $queue->enqueue($_, $_ * 2) for 1 .. 40000;
340 $queue->end;
341
342 my %results;
343 my $total = 0;
344
345 for ( 1 .. $num_consumers ) {
346 my ($id, $count) = $queue->recv2;
347 $results{$id} = $count;
348 $total += $count;
349 }
350
351 $_->join for MCE::Child->list;
352
353 print $results{$_}, "\n" for keys %results;
354 print "$total total\n\n";
355
356 Example 3 - Consumer requests item
357 Like the previous example, but have the manager process await a
358 notification from the consumer before inserting into the queue. This
359 allows the producer to end the channel early (i.e. exit loop).
360
361 use strict;
362 use warnings;
363
364 use MCE::Child;
365 use MCE::Channel;
366
367 my $queue = MCE::Channel->new( impl => 'Mutex' );
368 my $num_consumers = 10;
369
370 sub consumer {
371 # receive items
372 my $count = 0;
373
374 while () {
375 # Notify the manager process to send items. This allows the
376 # manager process to enqueue only when requested. The benefit
377 # is being able to end the channel immediately.
378
379 $queue->send2( MCE::Child->pid ); # channel is bi-directional
380
381 my ($item1, $item2) = $queue->dequeue(2);
382 last unless ( defined $item1 ); # channel ended
383
384 $count += 2;
385 }
386
387 # result
388 return ( MCE::Child->pid => $count );
389 }
390
391 MCE::Child->create('consumer') for 1 .. $num_consumers;
392
393 ## producer
394
395 for my $num (1 .. 40000) {
396 # Await worker notification before inserting (blocking).
397 my $consumer_pid = $queue->recv2;
398 $queue->enqueue($num, $num * 2);
399 }
400
401 $queue->end;
402
403 my %results;
404 my $total = 0;
405
406 for my $child ( MCE::Child->list ) {
407 my ($id, $count) = $child->join;
408 $results{$id} = $count;
409 $total += $count;
410 }
411
412 print $results{$_}, "\n" for keys %results;
413 print "$total total\n\n";
414
415 Example 4 - Many producers
416 Running with 2 or more producers requires setting the "mp" option.
417 Internally, this enables locking support for the left end of the
418 channel. The "mp" option applies to "Mutex" and "Threads" channel
419 implementations only.
420
421 Here, the MCE gather facility is used to collect the final count.
422
423 use strict;
424 use warnings;
425
426 use MCE::Flow;
427 use MCE::Channel;
428
429 my $queue = MCE::Channel->new( impl => 'Mutex', mp => 1 );
430 my $num_consumers = 10;
431
432 sub consumer {
433 # receive items
434 my $count = 0;
435 while ( my ( $item1, $item2 ) = $queue->dequeue(2) ) {
436 $count += 2;
437 }
438 # send result
439 MCE->gather( MCE->wid => $count );
440 }
441
442 sub producer {
443 $queue->enqueue($_, $_ * 2) for 1 .. 20000;
444 }
445
446 ## run 2 producers and many consumers
447
448 MCE::Flow->init(
449 max_workers => [ 2, $num_consumers ],
450 task_name => [ 'producer', 'consumer' ],
451 task_end => sub {
452 my ($mce, $task_id, $task_name) = @_;
453 if ( $task_name eq 'producer' ) {
454 $queue->end;
455 }
456 }
457 );
458
459 # consumers call gather above (i.e. send a key-value pair),
460 # have MCE append to a hash
461
462 my %results = mce_flow \&producer, \&consumer;
463
464 MCE::Flow->finish;
465
466 my $total = 0;
467
468 for ( keys %results ) {
469 $total += $results{$_};
470 print $results{$_}, "\n";
471 }
472
473 print "$total total\n\n";
474
475 Example 5 - Many channels
476 This demonstration configures a channel per consumer, plus a common
477 channel for consumers to request the next input item. The "Simple"
478 implementation is specified for the individual channels, whereas locking
479 might seem necessary for the $ready channel. However, consumers never
480 read from it and what is written is very small (i.e. atomic write is
481 guaranteed by the OS). Thus, it is safe to choose the "Simple"
482 implementation over "Mutex".
483
484 use strict;
485 use warnings;
486
487 use MCE::Flow;
488 use MCE::Channel;
489
490 my $prog_name = $0; $prog_name =~ s{^.*[\\/]}{}g;
491 my $input_size = shift || 3000;
492
493 unless ($input_size =~ /\A\d+\z/) {
494 print {*STDERR} "usage: $prog_name [ size ]\n";
495 exit 1;
496 }
497
498 my $consumers = 4;
499
500 my @chnls = map { MCE::Channel->new( impl => 'Simple' ) } 1 .. $consumers;
501
502 my $ready = MCE::Channel->new( impl => 'Simple' );
503
504 sub producer {
505 my $id = 0;
506
507 # send the next input item upon request
508 for ( 0 .. $input_size - 1 ) {
509 my $chnl_num = $ready->recv2;
510 $chnls[ $chnl_num ]->send( ++$id, $_ );
511 }
512
513 # signal no more work
514 $_->send( 0, undef ) for @chnls;
515 }
516
517 sub consumer {
518 my $chnl_num = MCE->task_wid - 1;
519
520 while () {
521 # notify the producer ready for input
522 $ready->send2( $chnl_num );
523
524 # retrieve input data
525 my ( $id, $item ) = $chnls[ $chnl_num ]->recv;
526
527 # leave loop if no more work
528 last unless $id;
529
530 # compute and send the result to the manager process
531 # ordered output requires an id (must be 1st argument)
532 MCE->gather( $id, [ $item, sqrt($item) ] );
533 }
534 }
535
536 # A custom 'ordered' output iterator for MCE's gather facility.
537 # It returns a closure block, expecting an ID for 1st argument.
538
539 sub output_iterator {
540 my %tmp; my $order_id = 1;
541
542 return sub {
543 my ( $id, $result ) = @_;
544 $tmp{ $id } = $result;
545
546 while () {
547 last unless exists $tmp{ $order_id };
548 $result = delete $tmp{ $order_id };
549 printf "n: %d sqrt(n): %f\n", $result->[0], $result->[1];
550 $order_id++;
551 }
552 };
553 }
554
555 # Run one producer and many consumers.
556 # Output to be sent orderly to STDOUT.
557
558 MCE::Flow->init(
559 gather => output_iterator(),
560 max_workers => [ 1, $consumers ],
561 );
562
563 MCE::Flow->run( \&producer, \&consumer );
564 MCE::Flow->finish;
565
566 __END__
567
568 # Output
569
570 n: 0 sqrt(n): 0.000000
571 n: 1 sqrt(n): 1.000000
572 n: 2 sqrt(n): 1.414214
573 n: 3 sqrt(n): 1.732051
574 n: 4 sqrt(n): 2.000000
575 n: 5 sqrt(n): 2.236068
576 n: 6 sqrt(n): 2.449490
577 n: 7 sqrt(n): 2.645751
578 n: 8 sqrt(n): 2.828427
579 n: 9 sqrt(n): 3.000000
580 ...
581
583 • <https://github.com/marioroy/mce-examples/tree/master/chameneos>
584
585 • threads::lite
586
588 Mario E. Roy, <marioeroy AT gmail DOT com>
589
591 Copyright (C) 2019-2023 by Mario E. Roy
592
593 MCE::Channel is released under the same license as Perl.
594
595 See <https://dev.perl.org/licenses/> for more information.
596
597
598
599perl v5.36.0 2023-01-20 MCE::Channel(3)