1MCE::Stream(3)        User Contributed Perl Documentation       MCE::Stream(3)
2
3
4

NAME

6       MCE::Stream - Parallel stream model for chaining multiple maps and
7       greps
8

VERSION

10       This document describes MCE::Stream version 1.889
11

SYNOPSIS

13        ## Exports mce_stream, mce_stream_f, mce_stream_s
14        use MCE::Stream;
15
16        my (@m1, @m2, @m3);
17
18        ## Default mode is map and processed from right-to-left
19        @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
20        mce_stream \@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
21
22        ## Native Perl
23        @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000;
24
25        ## Streaming grep and map in parallel
26        mce_stream \@m3,
27           { mode => 'map',  code => sub { $_ * $_ } },
28           { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
29
30        ## Array or array_ref
31        my @a = mce_stream sub { $_ * $_ }, 1..10000;
32        my @b = mce_stream sub { $_ * $_ }, \@list;
33
34        ## Important; pass an array_ref for deeply nested input data
35        my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
36        my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
37
38        ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
39        ## Workers read directly and do not involve the manager process
40        my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file"; # efficient
41
42        ## Involves the manager process, therefore slower
43        my @f = mce_stream_f sub { chomp; $_ }, $file_handle;
44        my @g = mce_stream_f sub { chomp; $_ }, $io;
45        my @h = mce_stream_f sub { chomp; $_ }, \$scalar;
46
47        ## Sequence of numbers (begin, end [, step, format])
48        my @i = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
49        my @j = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
50
51        my @k = mce_stream_s sub { $_ * $_ }, {
52           begin => 1, end => 10000, step => 5, format => undef
53        };
54

DESCRIPTION

56       This module allows one to stream multiple map and/or grep operations in
57       parallel. Code blocks run simultaneously from right-to-left. The
58       results are appended immediately when providing a reference to an
59       array.
60
61        ## Appends are serialized and happen immediately, even when chunks
62        ## arrive out of order. Out-of-order chunks are held temporarily
63        ## until the earlier chunks arrive.
64
65        mce_stream \@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000;
66
67        ##                                                    input
68        ##                                        chunk1      input
69        ##                            chunk3      chunk2      input
70        ##                chunk2      chunk2      chunk3      input
71        ##   append1      chunk3      chunk1      chunk4      input
72        ##   append2      chunk1      chunk5      chunk5      input
73        ##   append3      chunk5      chunk4      chunk6      ...
74        ##   append4      chunk4      chunk6      ...
75        ##   append5      chunk6      ...
76        ##   append6      ...
77        ##   ...
78        ##
79
80       MCE incurs a small overhead due to passing of data. A fast code block
81       will run faster natively when chaining multiple map functions. However,
82       the overhead will likely diminish as the complexity of the code
83       increases.
84
85        ## 0.334 secs -- baseline using the native map function
86        my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000;
87
88        ## 0.427 secs -- this is quite amazing considering data passing
89        my @m2 = mce_stream
90              sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
91
92        ## 0.355 secs -- appends to @m3 immediately, not after running
93        my @m3; mce_stream \@m3,
94              sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
95
96       Even faster is mce_stream_s; useful when input data is a range of
97       numbers.  Workers generate sequences mathematically among themselves
98       without any interaction from the manager process. Two arguments are
99       required for mce_stream_s (begin, end). Step defaults to 1 if begin is
100       smaller than end, otherwise -1.
101
102        ## 0.278 secs -- numbers are generated mathematically via sequence
103        my @m4; mce_stream_s \@m4,
104              sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000;
105

OVERRIDING DEFAULTS

107       The following lists the options which may be overridden when loading
108       the module.  The fast option is obsolete in 1.867 onwards; ignored if
109       specified.
110
111        use Sereal qw( encode_sereal decode_sereal );
112        use CBOR::XS qw( encode_cbor decode_cbor );
113        use JSON::XS qw( encode_json decode_json );
114
115        use MCE::Stream
116            max_workers => 8,                # Default 'auto'
117            chunk_size => 500,               # Default 'auto'
118            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
119            freeze => \&encode_sereal,       # \&Storable::freeze
120            thaw => \&decode_sereal,         # \&Storable::thaw
121            init_relay => 0,                 # Default undef; MCE 1.882+
122            use_threads => 0,                # Default undef; MCE 1.882+
123            default_mode => 'grep',          # Default 'map'
124        ;
125
126       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
127       available.  Specify "Sereal => 0" to use Storable instead.
128
129        use MCE::Stream Sereal => 0;
130

CUSTOMIZING MCE

132       MCE::Stream->init ( options )
133       MCE::Stream::init { options }
134
135       The init function accepts a hash of MCE options. The gather and
136       bounds_only options, if specified, are ignored due to being used
137       internally by the module (not shown below).
138
139        use MCE::Stream;
140
141        MCE::Stream->init(
142           chunk_size => 1, max_workers => 4,
143
144           user_begin => sub {
145              print "## ", MCE->wid, " started\n";
146           },
147
148           user_end => sub {
149              print "## ", MCE->wid, " completed\n";
150           }
151        );
152
153        my @a = mce_stream sub { $_ * $_ }, 1..100;
154
155        print "\n", "@a", "\n";
156
157        -- Output
158
159        ## 1 started
160        ## 2 started
161        ## 3 started
162        ## 4 started
163        ## 3 completed
164        ## 1 completed
165        ## 2 completed
166        ## 4 completed
167
168        1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
169        400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
170        1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
171        2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
172        3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
173        5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
174        7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
175        10000
176
177       Like with MCE::Stream->init above, MCE options may be specified using
178       an anonymous hash for the first argument. Notice how both max_workers
179       and task_name can take an anonymous array for setting values uniquely
180       for each code block.
181
182       Remember that MCE::Stream processes from right-to-left when setting the
183       individual values.
184
185        use MCE::Stream;
186
187        my @a = mce_stream {
188           task_name   => [ 'c', 'b', 'a' ],
189           max_workers => [  2,   4,   3, ],
190
191           user_end => sub {
192              my ($mce, $task_id, $task_name) = @_;
193              print "$task_id - $task_name completed\n";
194           },
195
196           task_end => sub {
197              my ($mce, $task_id, $task_name) = @_;
198              MCE->print("$task_id - $task_name ended\n");
199           }
200        },
201        sub { $_ * 4 },             ## 2 workers, named c
202        sub { $_ * 3 },             ## 4 workers, named b
203        sub { $_ * 2 }, 1..10000;   ## 3 workers, named a
204
205        -- Output
206
207        0 - a completed
208        0 - a completed
209        0 - a completed
210        0 - a ended
211        1 - b completed
212        1 - b completed
213        1 - b completed
214        1 - b completed
215        1 - b ended
216        2 - c completed
217        2 - c completed
218        2 - c ended
219
220       Note that the anonymous hash, for specifying options, also comes first
221       when passing an array reference.
222
223        my @a; mce_stream {
224           ...
225        }, \@a, sub { ... }, sub { ... }, 1..10000;
226

API DOCUMENTATION

228       Scripts using MCE::Stream can be written using the long or short form.
229       The long form becomes relevant when mixing modes. Again, processing
230       occurs from right-to-left.
231
232        my @m3 = mce_stream
233           { mode => 'map',  code => sub { $_ * $_ } },
234           { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
235
236        my @m4; mce_stream \@m4,
237           { mode => 'map',  code => sub { $_ * $_ } },
238           { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
239
240       For multiple grep blocks, the short form can be used. Simply specify
241       the default mode for the module. The two valid values for default_mode
242       are 'grep' and 'map'.
243
244        use MCE::Stream default_mode => 'grep';
245
246        my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file;
247
248       The following assumes 'map' for default_mode in order to demonstrate
249       all the possibilities for providing input data.
250
251       MCE::Stream->run ( sub { code }, list )
252       mce_stream sub { code }, list
253
254       Input data may be defined using a list or an array reference. Unlike
255       MCE::Loop, Flow, and Step, specifying a hash reference as input data
256       isn't allowed.
257
258        ## Array or array_ref
259        my @a = mce_stream sub { $_ * 2 }, 1..1000;
260        my @b = mce_stream sub { $_ * 2 }, \@list;
261
262        ## Important; pass an array_ref for deeply nested input data
263        my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
264        my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
265
266        ## Not supported
267        my @z = mce_stream sub { ... }, \%hash;
268
269       MCE::Stream->run_file ( sub { code }, file )
270       mce_stream_f sub { code }, file
271
272       The fastest of these is the /path/to/file. Workers communicate the next
273       offset position among themselves with zero interaction by the manager
274       process.
275
276       "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
277
278        my @c = mce_stream_f sub { chomp; $_ . "\r\n" }, "/path/to/file";  # faster
279        my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
280        my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, $io;              # IO::All
281        my @f = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
282
283       MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
284       mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
285
286       Sequence may be defined as a list, an array reference, or a hash
287       reference.  The functions require both begin and end values to run.
288       Step and format are optional. The format is passed to sprintf (% may be
289       omitted below).
290
291        my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
292
293        my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt;
294        my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ];
295
296        my @h = mce_stream_s sub { $_ }, {
297           begin => $beg, end => $end, step => $step, format => $fmt
298        };
299
300       MCE::Stream->run ( { input_data => iterator }, sub { code } )
301       mce_stream { input_data => iterator }, sub { code }
302
303       An iterator reference may be specified for input_data. The only other
304       way is to specify input_data via MCE::Stream->init. This prevents
305       MCE::Stream from configuring the iterator reference as another user
306       task, which will not work.
307
308       Iterators are described under section "SYNTAX for INPUT_DATA" at
309       MCE::Core.
310
311        MCE::Stream->init(
312           input_data => iterator
313        );
314
315        my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
316

MANUAL SHUTDOWN

318       MCE::Stream->finish
319       MCE::Stream::finish
320
321       Workers remain persistent as much as possible after running. Shutdown
322       occurs automatically when the script terminates. Call finish when
323       workers are no longer needed.
324
325        use MCE::Stream;
326
327        MCE::Stream->init(
328           chunk_size => 20, max_workers => 'auto'
329        );
330
331        my @a = mce_stream sub { ... }, 1..100;
332
333        MCE::Stream->finish;
334

INDEX

336       MCE, MCE::Core
337

AUTHOR

339       Mario E. Roy, <marioeroy AT gmail DOT com>
340
341
342
343perl v5.38.0                      2023-09-14                    MCE::Stream(3)
Impressum