1MCE::Stream(3)        User Contributed Perl Documentation       MCE::Stream(3)
2
3
4

NAME

6       MCE::Stream - Parallel stream model for chaining multiple maps and
7       greps
8

VERSION

10       This document describes MCE::Stream version 1.838
11

SYNOPSIS

13        ## Exports mce_stream, mce_stream_f, mce_stream_s
14        use MCE::Stream;
15
16        my (@m1, @m2, @m3);
17
18        ## Default mode is map and processed from right-to-left
19        @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
20        mce_stream \@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
21
22        ## Native Perl
23        @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000;
24
25        ## Streaming grep and map in parallel
26        mce_stream \@m3,
27           { mode => 'map',  code => sub { $_ * $_ } },
28           { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
29
30        ## Array or array_ref
31        my @a = mce_stream sub { $_ * $_ }, 1..10000;
32        my @b = mce_stream sub { $_ * $_ }, [ 1..10000 ];
33
34        ## File_path, glob_ref, or scalar_ref
35        my @c = mce_stream_f sub { chomp; $_ }, "/path/to/file";
36        my @d = mce_stream_f sub { chomp; $_ }, $file_handle;
37        my @e = mce_stream_f sub { chomp; $_ }, \$scalar;
38
39        ## Sequence of numbers (begin, end [, step, format])
40        my @f = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
41        my @g = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
42
43        my @h = mce_stream_s sub { $_ * $_ }, {
44           begin => 1, end => 10000, step => 5, format => undef
45        };
46

DESCRIPTION

48       This module allows one to stream multiple map and/or grep operations in
49       parallel. Code blocks run simultaneously from right-to-left. The
50       results are appended immediately when providing a reference to an
51       array.
52
53        ## Appends are serialized, even out-of-order ok, but immediately.
54        ## Out-of-order chunks are held temporarily until ordered chunks
55        ## arrive.
56
57        mce_stream \@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000;
58
59        ##                                                    input
60        ##                                        chunk1      input
61        ##                            chunk3      chunk2      input
62        ##                chunk2      chunk2      chunk3      input
63        ##   append1      chunk3      chunk1      chunk4      input
64        ##   append2      chunk1      chunk5      chunk5      input
65        ##   append3      chunk5      chunk4      chunk6      ...
66        ##   append4      chunk4      chunk6      ...
67        ##   append5      chunk6      ...
68        ##   append6      ...
69        ##   ...
70        ##
71
72       MCE incurs a small overhead due to passing of data. A fast code block
73       will run faster natively when chaining multiple map functions. However,
74       the overhead will likely diminish as the complexity increases for the
75       code.
76
77        ## 0.334 secs -- baseline using the native map function
78        my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000;
79
80        ## 0.427 secs -- this is quite amazing considering data passing
81        my @m2 = mce_stream
82              sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
83
84        ## 0.355 secs -- appends to @m3 immediately, not after running
85        my @m3; mce_stream \@m3,
86              sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
87
88       Even faster is mce_stream_s; useful when input data is a range of
89       numbers.  Workers generate sequences mathematically among themselves
90       without any interaction from the manager process. Two arguments are
91       required for mce_stream_s (begin, end). Step defaults to 1 if begin is
92       smaller than end, otherwise -1.
93
94        ## 0.278 secs -- numbers are generated mathematically via sequence
95        my @m4; mce_stream_s \@m4,
96              sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000;
97

OVERRIDING DEFAULTS

99       The following list options which may be overridden when loading the
100       module.
101
102        use Sereal qw( encode_sereal decode_sereal );
103        use CBOR::XS qw( encode_cbor decode_cbor );
104        use JSON::XS qw( encode_json decode_json );
105
106        use MCE::Stream
107            max_workers => 8,                # Default 'auto'
108            chunk_size => 500,               # Default 'auto'
109            tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
110            freeze => \&encode_sereal,       # \&Storable::freeze
111            thaw => \&decode_sereal,         # \&Storable::thaw
112            default_mode => 'grep',          # Default 'map'
113            fast => 1                        # Default 0 (fast dequeue)
114        ;
115
116       From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if
117       available.  Specify "Sereal =" 0> to use Storable instead.
118
119        use MCE::Stream Sereal => 0;
120

CUSTOMIZING MCE

122       MCE::Stream->init ( options )
123       MCE::Stream::init { options }
124          The init function accepts a hash of MCE options. The gather and
125          bounds_only options, if specified, are ignored due to being used
126          internally by the module (not shown below).
127
128           use MCE::Stream;
129
130           MCE::Stream::init {
131              chunk_size => 1, max_workers => 4,
132
133              user_begin => sub {
134                 print "## ", MCE->wid, " started\n";
135              },
136
137              user_end => sub {
138                 print "## ", MCE->wid, " completed\n";
139              }
140           };
141
142           my @a = mce_stream sub { $_ * $_ }, 1..100;
143
144           print "\n", "@a", "\n";
145
146           -- Output
147
148           ## 1 started
149           ## 2 started
150           ## 3 started
151           ## 4 started
152           ## 3 completed
153           ## 1 completed
154           ## 2 completed
155           ## 4 completed
156
157           1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
158           400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
159           1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
160           2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
161           3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
162           5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
163           7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
164           10000
165
166       Like with MCE::Stream::init above, MCE options may be specified using
167       an anonymous hash for the first argument. Notice how both max_workers
168       and task_name can take an anonymous array for setting values uniquely
169       per each code block.
170
171       Remember that MCE::Stream processes from right-to-left when setting the
172       individual values.
173
174        use MCE::Stream;
175
176        my @a = mce_stream {
177           task_name   => [ 'c', 'b', 'a' ],
178           max_workers => [  2,   4,   3, ],
179
180           user_end => sub {
181              my ($mce, $task_id, $task_name) = @_;
182              print "$task_id - $task_name completed\n";
183           },
184
185           task_end => sub {
186              my ($mce, $task_id, $task_name) = @_;
187              MCE->print("$task_id - $task_name ended\n");
188           }
189        },
190        sub { $_ * 4 },             ## 2 workers, named c
191        sub { $_ * 3 },             ## 4 workers, named b
192        sub { $_ * 2 }, 1..10000;   ## 3 workers, named a
193
194        -- Output
195
196        0 - a completed
197        0 - a completed
198        0 - a completed
199        0 - a ended
200        1 - b completed
201        1 - b completed
202        1 - b completed
203        1 - b completed
204        1 - b ended
205        2 - c completed
206        2 - c completed
207        2 - c ended
208
209       Note that the anonymous hash, for specifying options, also comes first
210       when passing an array reference.
211
212        my @a; mce_stream {
213           ...
214        }, \@a, sub { ... }, sub { ... }, 1..10000;
215

API DOCUMENTATION

217       Scripts using MCE::Stream can be written using the long or short form.
218       The long form becomes relevant when mixing modes. Again, processing
219       occurs from right-to-left.
220
221        my @m3 = mce_stream
222           { mode => 'map',  code => sub { $_ * $_ } },
223           { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
224
225        my @m4; mce_stream \@m4,
226           { mode => 'map',  code => sub { $_ * $_ } },
227           { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
228
229       For multiple grep blocks, the short form can be used. Simply specify
230       the default mode for the module. The two valid values for default_mode
231       is 'grep' and 'map'.
232
233        use MCE::Stream default_mode => 'grep';
234
235        my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file;
236
237       The following assumes 'map' for default_mode in order to demonstrate
238       all the possibilities for providing input data.
239
240       MCE::Stream->run ( sub { code }, list )
241       mce_stream sub { code }, list
242          Input data may be defined using a list or an array reference. Unlike
243          MCE::Loop, Flow, and Step, specifying a hash reference as input data
244          isn't allowed.
245
246           my @a = mce_stream sub { $_ * 2 }, 1..1000;
247           my @b = mce_stream sub { $_ * 2 }, \@list;
248
249           my @z = mce_stream sub { $_ * 2 }, \%hash;  # not supported
250
251       MCE::Stream->run_file ( sub { code }, file )
252       mce_stream_f sub { code }, file
253          The fastest of these is the /path/to/file. Workers communicate the
254          next offset position among themselves with zero interaction by the
255          manager process.
256
257           my @c = mce_stream_f sub { chomp; $_ . "\r\n" }, "/path/to/file";  # faster
258           my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
259           my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
260
261       MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
262       mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
263          Sequence may be defined as a list, an array reference, or a hash
264          reference.  The functions require both begin and end values to run.
265          Step and format are optional. The format is passed to sprintf (% may
266          be omitted below).
267
268           my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
269
270           my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt;
271           my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ];
272
273           my @h = mce_stream_s sub { $_ }, {
274              begin => $beg, end => $end, step => $step, format => $fmt
275           };
276
277       MCE::Stream->run ( { input_data => iterator }, sub { code } )
278       mce_stream { input_data => iterator }, sub { code }
279          An iterator reference may be specified for input_data. The only
280          other way is to specify input_data via MCE::Stream::init. This
281          prevents MCE::Stream from configuring the iterator reference as
282          another user task which will not work.
283
284          Iterators are described under section "SYNTAX for INPUT_DATA" at
285          MCE::Core.
286
287           MCE::Stream::init {
288              input_data => iterator
289           };
290
291           my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
292

MANUAL SHUTDOWN

294       MCE::Stream->finish
295       MCE::Stream::finish
296          Workers remain persistent as much as possible after running.
297          Shutdown occurs automatically when the script terminates. Call
298          finish when workers are no longer needed.
299
300           use MCE::Stream;
301
302           MCE::Stream::init {
303              chunk_size => 20, max_workers => 'auto'
304           };
305
306           my @a = mce_stream { ... } 1..100;
307
308           MCE::Stream::finish;
309

INDEX

311       MCE, MCE::Core
312

AUTHOR

314       Mario E. Roy, <marioeroy AT gmail DOT com>
315
316
317
318perl v5.28.1                      2019-01-23                    MCE::Stream(3)
Impressum