1MCE::Stream(3) User Contributed Perl Documentation MCE::Stream(3)
2
3
4
6 MCE::Stream - Parallel stream model for chaining multiple maps and
7 greps
8
10 This document describes MCE::Stream version 1.837
11
13 ## Exports mce_stream, mce_stream_f, mce_stream_s
14 use MCE::Stream;
15
16 my (@m1, @m2, @m3);
17
18 ## Default mode is map and processed from right-to-left
19 @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
20 mce_stream \@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
21
22 ## Native Perl
23 @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000;
24
25 ## Streaming grep and map in parallel
26 mce_stream \@m3,
27 { mode => 'map', code => sub { $_ * $_ } },
28 { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
29
30 ## Array or array_ref
31 my @a = mce_stream sub { $_ * $_ }, 1..10000;
32 my @b = mce_stream sub { $_ * $_ }, [ 1..10000 ];
33
34 ## File_path, glob_ref, or scalar_ref
35 my @c = mce_stream_f sub { chomp; $_ }, "/path/to/file";
36 my @d = mce_stream_f sub { chomp; $_ }, $file_handle;
37 my @e = mce_stream_f sub { chomp; $_ }, \$scalar;
38
39 ## Sequence of numbers (begin, end [, step, format])
40 my @f = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
41 my @g = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
42
43 my @h = mce_stream_s sub { $_ * $_ }, {
44 begin => 1, end => 10000, step => 5, format => undef
45 };
46
48 This module allows one to stream multiple map and/or grep operations in
49 parallel. Code blocks run simultaneously from right-to-left. The
50 results are appended immediately when providing a reference to an
51 array.
52
53 ## Appends are serialized and happen immediately. Chunks may complete
54 ## out of order; such chunks are held temporarily until the chunks
55 ## ordered before them arrive.
56
57 mce_stream \@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000;
58
59 ## input
60 ## chunk1 input
61 ## chunk3 chunk2 input
62 ## chunk2 chunk2 chunk3 input
63 ## append1 chunk3 chunk1 chunk4 input
64 ## append2 chunk1 chunk5 chunk5 input
65 ## append3 chunk5 chunk4 chunk6 ...
66 ## append4 chunk4 chunk6 ...
67 ## append5 chunk6 ...
68 ## append6 ...
69 ## ...
70 ##
71
72 MCE incurs a small overhead due to passing of data. A fast code block
73 will run faster natively when chaining multiple map functions. However,
74 the overhead will likely diminish as the complexity increases for the
75 code.
76
77 ## 0.334 secs -- baseline using the native map function
78 my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000;
79
80 ## 0.427 secs -- this is quite amazing considering data passing
81 my @m2 = mce_stream
82 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
83
84 ## 0.355 secs -- appends to @m3 immediately, not after running
85 my @m3; mce_stream \@m3,
86 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
87
88 Even faster is mce_stream_s; useful when input data is a range of
89 numbers. Workers generate sequences mathematically among themselves
90 without any interaction from the manager process. Two arguments are
91 required for mce_stream_s (begin, end). Step defaults to 1 if begin is
92 smaller than end, otherwise -1.
93
94 ## 0.278 secs -- numbers are generated mathematically via sequence
95 my @m4; mce_stream_s \@m4,
96 sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000;
97
99 The following lists the options that may be overridden when loading the
100 module.
101
102 use Sereal qw( encode_sereal decode_sereal );
103 use CBOR::XS qw( encode_cbor decode_cbor );
104 use JSON::XS qw( encode_json decode_json );
105
106 use MCE::Stream
107 max_workers => 8, # Default 'auto'
108 chunk_size => 500, # Default 'auto'
109 tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
110 freeze => \&encode_sereal, # \&Storable::freeze
111 thaw => \&decode_sereal, # \&Storable::thaw
112 default_mode => 'grep', # Default 'map'
113 fast => 1 # Default 0 (fast dequeue)
114 ;
115
116 From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
117 available. Specify "Sereal => 0" to use Storable instead.
118
119 use MCE::Stream Sereal => 0;
120
122 MCE::Stream->init ( options )
123 MCE::Stream::init { options }
124 The init function accepts a hash of MCE options. The gather and
125 bounds_only options, if specified, are ignored due to being used
126 internally by the module (not shown below).
127
128 use MCE::Stream;
129
130 MCE::Stream::init {
131 chunk_size => 1, max_workers => 4,
132
133 user_begin => sub {
134 print "## ", MCE->wid, " started\n";
135 },
136
137 user_end => sub {
138 print "## ", MCE->wid, " completed\n";
139 }
140 };
141
142 my @a = mce_stream sub { $_ * $_ }, 1..100;
143
144 print "\n", "@a", "\n";
145
146 -- Output
147
148 ## 1 started
149 ## 2 started
150 ## 3 started
151 ## 4 started
152 ## 3 completed
153 ## 1 completed
154 ## 2 completed
155 ## 4 completed
156
157 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
158 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
159 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
160 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
161 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
162 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
163 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
164 10000
165
166 Like with MCE::Stream::init above, MCE options may be specified using
167 an anonymous hash for the first argument. Notice how both max_workers
168 and task_name can take an anonymous array for setting values uniquely
169 for each code block.
170
171 Remember that MCE::Stream processes from right-to-left when setting the
172 individual values.
173
174 use MCE::Stream;
175
176 my @a = mce_stream {
177 task_name => [ 'c', 'b', 'a' ],
178 max_workers => [ 2, 4, 3, ],
179
180 user_end => sub {
181 my ($mce, $task_id, $task_name) = @_;
182 print "$task_id - $task_name completed\n";
183 },
184
185 task_end => sub {
186 my ($mce, $task_id, $task_name) = @_;
187 MCE->print("$task_id - $task_name ended\n");
188 }
189 },
190 sub { $_ * 4 }, ## 2 workers, named c
191 sub { $_ * 3 }, ## 4 workers, named b
192 sub { $_ * 2 }, 1..10000; ## 3 workers, named a
193
194 -- Output
195
196 0 - a completed
197 0 - a completed
198 0 - a completed
199 0 - a ended
200 1 - b completed
201 1 - b completed
202 1 - b completed
203 1 - b completed
204 1 - b ended
205 2 - c completed
206 2 - c completed
207 2 - c ended
208
209 Note that the anonymous hash, for specifying options, also comes first
210 when passing an array reference.
211
212 my @a; mce_stream {
213 ...
214 }, \@a, sub { ... }, sub { ... }, 1..10000;
215
217 Scripts using MCE::Stream can be written using the long or short form.
218 The long form becomes relevant when mixing modes. Again, processing
219 occurs from right-to-left.
220
221 my @m3 = mce_stream
222 { mode => 'map', code => sub { $_ * $_ } },
223 { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
224
225 my @m4; mce_stream \@m4,
226 { mode => 'map', code => sub { $_ * $_ } },
227 { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
228
229 For multiple grep blocks, the short form can be used. Simply specify
230 the default mode for the module. The two valid values for default_mode
231 are 'grep' and 'map'.
232
233 use MCE::Stream default_mode => 'grep';
234
235 my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file;
236
237 The following assumes 'map' for default_mode in order to demonstrate
238 all the possibilities for providing input data.
239
240 MCE::Stream->run ( sub { code }, list )
241 mce_stream sub { code }, list
242 Input data may be defined using a list or an array reference. Unlike
243 MCE::Loop, Flow, and Step, specifying a hash reference as input data
244 isn't allowed.
245
246 my @a = mce_stream sub { $_ * 2 }, 1..1000;
247 my @b = mce_stream sub { $_ * 2 }, \@list;
248
249 my @z = mce_stream sub { $_ * 2 }, \%hash; # not supported
250
251 MCE::Stream->run_file ( sub { code }, file )
252 mce_stream_f sub { code }, file
253 The fastest of these is the /path/to/file. Workers communicate the
254 next offset position among themselves with zero interaction by the
255 manager process.
256
257 my @c = mce_stream_f sub { chomp; $_ . "\r\n" }, "/path/to/file"; # faster
258 my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
259 my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
260
261 MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
262 mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
263 Sequence may be defined as a list, an array reference, or a hash
264 reference. The functions require both begin and end values to run.
265 Step and format are optional. The format is passed to sprintf (% may
266 be omitted below).
267
268 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
269
270 my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt;
271 my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ];
272
273 my @h = mce_stream_s sub { $_ }, {
274 begin => $beg, end => $end, step => $step, format => $fmt
275 };
276
277 MCE::Stream->run ( { input_data => iterator }, sub { code } )
278 mce_stream { input_data => iterator }, sub { code }
279 An iterator reference may be specified for input_data. The only
280 other way is to specify input_data via MCE::Stream::init. This
281 prevents MCE::Stream from configuring the iterator reference as
282 another user task, which will not work.
283
284 Iterators are described under section "SYNTAX for INPUT_DATA" at
285 MCE::Core.
286
287 MCE::Stream::init {
288 input_data => iterator
289 };
290
291 my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
292
294 MCE::Stream->finish
295 MCE::Stream::finish
296 Workers remain persistent as much as possible after running.
297 Shutdown occurs automatically when the script terminates. Call
298 finish when workers are no longer needed.
299
300 use MCE::Stream;
301
302 MCE::Stream::init {
303 chunk_size => 20, max_workers => 'auto'
304 };
305
306 my @a = mce_stream { ... } 1..100;
307
308 MCE::Stream::finish;
309
311 MCE, MCE::Core
312
314 Mario E. Roy, <marioeroy AT gmail DOT com>
315
316
317
318perl v5.28.0 2018-08-25 MCE::Stream(3)