1MCE::Stream(3) User Contributed Perl Documentation MCE::Stream(3)
2
3
4
6 MCE::Stream - Parallel stream model for chaining multiple maps and
7 greps
8
10 This document describes MCE::Stream version 1.879
11
13 ## Exports mce_stream, mce_stream_f, mce_stream_s
14 use MCE::Stream;
15
16 my (@m1, @m2, @m3);
17
18 ## Default mode is map and processed from right-to-left
19 @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
20 mce_stream \@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
21
22 ## Native Perl
23 @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000;
24
25 ## Streaming grep and map in parallel
26 mce_stream \@m3,
27 { mode => 'map', code => sub { $_ * $_ } },
28 { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
29
30 ## Array or array_ref
31 my @a = mce_stream sub { $_ * $_ }, 1..10000;
32 my @b = mce_stream sub { $_ * $_ }, \@list;
33
34 ## Important; pass an array_ref for deeply input data
35 my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
36 my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
37
38 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
39 ## Workers read directly and not involve the manager process
40 my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file"; # efficient
41
42 ## Involves the manager process, therefore slower
43 my @f = mce_stream_f sub { chomp; $_ }, $file_handle;
44 my @g = mce_stream_f sub { chomp; $_ }, $io;
45 my @h = mce_stream_f sub { chomp; $_ }, \$scalar;
46
47 ## Sequence of numbers (begin, end [, step, format])
48 my @i = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
49 my @j = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
50
51 my @k = mce_stream_s sub { $_ * $_ }, {
52 begin => 1, end => 10000, step => 5, format => undef
53 };
54
56 This module allows one to stream multiple map and/or grep operations in
57 parallel. Code blocks run simultaneously from right-to-left. The
58 results are appended immediately when providing a reference to an
59 array.
60
61 ## Appends are serialized, even out-of-order ok, but immediately.
62 ## Out-of-order chunks are held temporarily until ordered chunks
63 ## arrive.
64
65 mce_stream \@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000;
66
67 ## input
68 ## chunk1 input
69 ## chunk3 chunk2 input
70 ## chunk2 chunk2 chunk3 input
71 ## append1 chunk3 chunk1 chunk4 input
72 ## append2 chunk1 chunk5 chunk5 input
73 ## append3 chunk5 chunk4 chunk6 ...
74 ## append4 chunk4 chunk6 ...
75 ## append5 chunk6 ...
76 ## append6 ...
77 ## ...
78 ##
79
80 MCE incurs a small overhead due to passing of data. A fast code block
81 will run faster natively when chaining multiple map functions. However,
82 the overhead will likely diminish as the complexity increases for the
83 code.
84
85 ## 0.334 secs -- baseline using the native map function
86 my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000;
87
88 ## 0.427 secs -- this is quite amazing considering data passing
89 my @m2 = mce_stream
90 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
91
92 ## 0.355 secs -- appends to @m3 immediately, not after running
93 my @m3; mce_stream \@m3,
94 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
95
96 Even faster is mce_stream_s; useful when input data is a range of
97 numbers. Workers generate sequences mathematically among themselves
98 without any interaction from the manager process. Two arguments are
99 required for mce_stream_s (begin, end). Step defaults to 1 if begin is
100 smaller than end, otherwise -1.
101
102 ## 0.278 secs -- numbers are generated mathematically via sequence
103 my @m4; mce_stream_s \@m4,
104 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000;
105
107 The following list options which may be overridden when loading the
108 module. The fast option is obsolete in 1.867 onwards; ignored if
109 specified.
110
111 use Sereal qw( encode_sereal decode_sereal );
112 use CBOR::XS qw( encode_cbor decode_cbor );
113 use JSON::XS qw( encode_json decode_json );
114
115 use MCE::Stream
116 max_workers => 8, # Default 'auto'
117 chunk_size => 500, # Default 'auto'
118 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
119 freeze => \&encode_sereal, # \&Storable::freeze
120 thaw => \&decode_sereal, # \&Storable::thaw
121 default_mode => 'grep', # Default 'map'
122 fast => 1 # Default 0 (fast dequeue)
123 ;
124
125 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
126 available. Specify "Sereal => 0" to use Storable instead.
127
128 use MCE::Stream Sereal => 0;
129
131 MCE::Stream->init ( options )
132 MCE::Stream::init { options }
133
134 The init function accepts a hash of MCE options. The gather and
135 bounds_only options, if specified, are ignored due to being used
136 internally by the module (not shown below).
137
138 use MCE::Stream;
139
140 MCE::Stream->init(
141 chunk_size => 1, max_workers => 4,
142
143 user_begin => sub {
144 print "## ", MCE->wid, " started\n";
145 },
146
147 user_end => sub {
148 print "## ", MCE->wid, " completed\n";
149 }
150 );
151
152 my @a = mce_stream sub { $_ * $_ }, 1..100;
153
154 print "\n", "@a", "\n";
155
156 -- Output
157
158 ## 1 started
159 ## 2 started
160 ## 3 started
161 ## 4 started
162 ## 3 completed
163 ## 1 completed
164 ## 2 completed
165 ## 4 completed
166
167 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
168 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
169 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
170 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
171 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
172 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
173 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
174 10000
175
176 Like with MCE::Stream->init above, MCE options may be specified using
177 an anonymous hash for the first argument. Notice how both max_workers
178 and task_name can take an anonymous array for setting values uniquely
179 per each code block.
180
181 Remember that MCE::Stream processes from right-to-left when setting the
182 individual values.
183
184 use MCE::Stream;
185
186 my @a = mce_stream {
187 task_name => [ 'c', 'b', 'a' ],
188 max_workers => [ 2, 4, 3, ],
189
190 user_end => sub {
191 my ($mce, $task_id, $task_name) = @_;
192 print "$task_id - $task_name completed\n";
193 },
194
195 task_end => sub {
196 my ($mce, $task_id, $task_name) = @_;
197 MCE->print("$task_id - $task_name ended\n");
198 }
199 },
200 sub { $_ * 4 }, ## 2 workers, named c
201 sub { $_ * 3 }, ## 4 workers, named b
202 sub { $_ * 2 }, 1..10000; ## 3 workers, named a
203
204 -- Output
205
206 0 - a completed
207 0 - a completed
208 0 - a completed
209 0 - a ended
210 1 - b completed
211 1 - b completed
212 1 - b completed
213 1 - b completed
214 1 - b ended
215 2 - c completed
216 2 - c completed
217 2 - c ended
218
219 Note that the anonymous hash, for specifying options, also comes first
220 when passing an array reference.
221
222 my @a; mce_stream {
223 ...
224 }, \@a, sub { ... }, sub { ... }, 1..10000;
225
227 Scripts using MCE::Stream can be written using the long or short form.
228 The long form becomes relevant when mixing modes. Again, processing
229 occurs from right-to-left.
230
231 my @m3 = mce_stream
232 { mode => 'map', code => sub { $_ * $_ } },
233 { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
234
235 my @m4; mce_stream \@m4,
236 { mode => 'map', code => sub { $_ * $_ } },
237 { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
238
239 For multiple grep blocks, the short form can be used. Simply specify
240 the default mode for the module. The two valid values for default_mode
241 is 'grep' and 'map'.
242
243 use MCE::Stream default_mode => 'grep';
244
245 my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file;
246
247 The following assumes 'map' for default_mode in order to demonstrate
248 all the possibilities for providing input data.
249
250 MCE::Stream->run ( sub { code }, list )
251 mce_stream sub { code }, list
252
253 Input data may be defined using a list or an array reference. Unlike
254 MCE::Loop, Flow, and Step, specifying a hash reference as input data
255 isn't allowed.
256
257 ## Array or array_ref
258 my @a = mce_stream sub { $_ * 2 }, 1..1000;
259 my @b = mce_stream sub { $_ * 2 }, \@list;
260
261 ## Important; pass an array_ref for deeply input data
262 my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
263 my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
264
265 ## Not supported
266 my @z = mce_stream sub { ... }, \%hash;
267
268 MCE::Stream->run_file ( sub { code }, file )
269 mce_stream_f sub { code }, file
270
271 The fastest of these is the /path/to/file. Workers communicate the next
272 offset position among themselves with zero interaction by the manager
273 process.
274
275 "IO::All" { File, Pipe, STDIO } is supported since MCE 1.845.
276
277 my @c = mce_stream_f sub { chomp; $_ . "\r\n" }, "/path/to/file"; # faster
278 my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
279 my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, $io; # IO::All
280 my @f = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
281
282 MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
283 mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
284
285 Sequence may be defined as a list, an array reference, or a hash
286 reference. The functions require both begin and end values to run.
287 Step and format are optional. The format is passed to sprintf (% may be
288 omitted below).
289
290 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
291
292 my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt;
293 my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ];
294
295 my @h = mce_stream_s sub { $_ }, {
296 begin => $beg, end => $end, step => $step, format => $fmt
297 };
298
299 MCE::Stream->run ( { input_data => iterator }, sub { code } )
300 mce_stream { input_data => iterator }, sub { code }
301
302 An iterator reference may be specified for input_data. The only other
303 way is to specify input_data via MCE::Stream->init. This prevents
304 MCE::Stream from configuring the iterator reference as another user
305 task which will not work.
306
307 Iterators are described under section "SYNTAX for INPUT_DATA" at
308 MCE::Core.
309
310 MCE::Stream->init(
311 input_data => iterator
312 );
313
314 my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
315
317 MCE::Stream->finish
318 MCE::Stream::finish
319
320 Workers remain persistent as much as possible after running. Shutdown
321 occurs automatically when the script terminates. Call finish when
322 workers are no longer needed.
323
324 use MCE::Stream;
325
326 MCE::Stream->init(
327 chunk_size => 20, max_workers => 'auto'
328 );
329
330 my @a = mce_stream { ... } 1..100;
331
332 MCE::Stream->finish;
333
335 MCE, MCE::Core
336
338 Mario E. Roy, <marioeroy AT gmail DOT com>
339
340
341
342perl v5.36.0 2022-07-22 MCE::Stream(3)