1MCE::Shared(3) User Contributed Perl Documentation MCE::Shared(3)
2
3
4
6 MCE::Shared - MCE extension for sharing data supporting threads and
7 processes
8
10 This document describes MCE::Shared version 1.839
11
13 # OO construction.
14
15 use MCE::Shared;
16
17 my $ar = MCE::Shared->array( @list );
18 my $ca = MCE::Shared->cache( max_keys => 500, max_age => 60 );
19 my $cv = MCE::Shared->condvar( 0 );
20 my $fh = MCE::Shared->handle( '>>', \*STDOUT ) or die "$!";
21 my $ha = MCE::Shared->hash( @pairs );
22 my $oh = MCE::Shared->ordhash( @pairs );
23 my $db = MCE::Shared->minidb();
24 my $qu = MCE::Shared->queue( await => 1, fast => 0 );
25 my $va = MCE::Shared->scalar( $value );
26 my $se = MCE::Shared->sequence( $begin, $end, $step, $fmt );
27 my $ob = MCE::Shared->share( $blessed_object );
28
29 # The Perl-like mce_open function is available since 1.002.
30
31 mce_open my $fh, ">>", "/foo/bar.log" or die "open error: $!";
32
33 # Tie construction. The module API option is available since 1.825.
34
35 use v5.10;
36 use MCE::Flow;
37 use MCE::Shared;
38
39 my %args = ( max_keys => 500, max_age => 60 );
40 my @pairs = ( foo => 'bar', woo => 'baz' );
41 my @list = ( 'a' .. 'z' );
42
43 tie my $va1, 'MCE::Shared', { module => 'MCE::Shared::Scalar' }, 'foo';
44 tie my @ar1, 'MCE::Shared', { module => 'MCE::Shared::Array' }, @list;
45 tie my %ca1, 'MCE::Shared', { module => 'MCE::Shared::Cache' }, %args;
46 tie my %ha1, 'MCE::Shared', { module => 'MCE::Shared::Hash' }, @pairs;
47 tie my %oh1, 'MCE::Shared', { module => 'MCE::Shared::Ordhash' }, @pairs;
48 tie my %oh2, 'MCE::Shared', { module => 'Hash::Ordered' }, @pairs;
49 tie my %oh3, 'MCE::Shared', { module => 'Tie::IxHash' }, @pairs;
50 tie my $cy1, 'MCE::Shared', { module => 'Tie::Cycle' }, [ 1 .. 8 ];
51 tie my $va2, 'MCE::Shared', { module => 'Tie::StdScalar' }, 'hello';
52 tie my @ar3, 'MCE::Shared', { module => 'Tie::StdArray' }, @list;
53 tie my %ha2, 'MCE::Shared', { module => 'Tie::StdHash' }, @pairs;
54 tie my %ha3, 'MCE::Shared', { module => 'Tie::ExtraHash' }, @pairs;
55
56 tie my @ary, 'MCE::Shared', qw( a list of values );
57 tie my %ca, 'MCE::Shared', { max_keys => 500, max_age => 60 };
58 tie my %ha, 'MCE::Shared', key1 => 'val1', key2 => 'val2';
59 tie my %oh, 'MCE::Shared', { ordered => 1 }, key1 => 'value';
60
61 tie my $cnt, 'MCE::Shared', 0;
62 tie my @foo, 'MCE::Shared';
63 tie my %bar, 'MCE::Shared';
64
65 my $mutex = MCE::Mutex->new;
66
67 mce_flow {
68 max_workers => 4
69 },
70 sub {
71 my ( $mce ) = @_;
72 my ( $pid, $wid ) = ( MCE->pid, MCE->wid );
73
74 # Locking is necessary when multiple workers update the same
75 # element. The reason is that it may involve 2 trips to the
76 # shared-manager process: fetch and store in this case.
77
78 $mutex->enter( sub { $cnt += 1 } );
79
80 # Otherwise, locking is optional for unique elements.
81
82 $foo[ $wid - 1 ] = $pid;
83 $bar{ $pid } = $wid;
84
85 return;
86 };
87
88 say "scalar : $cnt";
89 say " array : $_" for (@foo);
90 say " hash : $_ => $bar{$_}" for (sort keys %bar);
91
92 __END__
93
94 # Output
95
96 scalar : 4
97 array : 37847
98 array : 37848
99 array : 37849
100 array : 37850
101 hash : 37847 => 1
102 hash : 37848 => 2
103 hash : 37849 => 3
104 hash : 37850 => 4
105
107 This module provides data sharing capabilities for MCE supporting
108 threads and processes. MCE::Hobo provides threads-like parallelization
109 for running code asynchronously.
110
112 MCE::Shared enables extra functionality on systems with IO::FDPass
113 installed. Without it, MCE::Shared is unable to send file descriptors
114 to the shared-manager process. The use applies to Condvar, Queue, and
115 Handle (mce_open). IO::FDpass isn't used for anything else.
116
117 use MCE::Shared;
118
119 # One may want to start the shared-manager early.
120
121 MCE::Shared->start();
122
123 # Typically, the shared-manager is started automatically when
124 # constructing a shared object.
125
126 my $ca = MCE::Shared->cache( max_keys => 500 );
127
128 # IO::FDPass is necessary for constructing a shared condvar or queue
129 # while the manager is running in order to send file descriptors
130 # associated with the object.
131
132 # Workers block using a socket handle for ->wait and ->timedwait.
133
134 my $cv = MCE::Shared->condvar();
135
136 # Workers block using a socket handle for ->dequeue and ->await.
137
138 my $q1 = MCE::Shared->queue();
139 my $q2 = MCE::Shared->queue( await => 1 );
140
141 For platforms where IO::FDPass isn't possible (e.g. Cygwin), construct
142 "condvar" and "queue" before other classes. The shared-manager process
143 will be delayed until sharing other classes (e.g. Array, Hash) or
144 starting explicitly.
145
146 use MCE::Shared;
147
148 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
149
150 my $cv = MCE::Shared->condvar( 0 );
151 my $que = MCE::Shared->queue( fast => 1 );
152
153 MCE::Shared->start() unless $has_IO_FDPass;
154
155 my $ha = MCE::Shared->hash(); # started implicitly
156
157 Note that MCE starts the shared-manager, prior to spawning workers, if
158 not yet started. Ditto for MCE::Hobo.
159
160 Regarding mce_open, "IO::FDPass" is needed for constructing a shared-
161 handle from a non-shared handle not yet available inside the shared-
162 manager process. The workaround is to have the non-shared handle made
163 before the shared-manager is started. Passing a file by reference is
164 fine for the three STD* handles.
165
166 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
167
168 mce_open my $shared_in, "<", \*STDIN; # ok
169 mce_open my $shared_out, ">>", \*STDOUT; # ok
170 mce_open my $shared_err, ">>", \*STDERR; # ok
171 mce_open my $shared_fh1, "<", "/path/to/sequence.fasta"; # ok
172 mce_open my $shared_fh2, ">>", "/path/to/results.log"; # ok
173
174 mce_open my $shared_fh, ">>", \*NON_SHARED_FH; # requires IO::FDPass
175
176 The IO::FDPass module is known to work reliably on most platforms.
177 Install 1.1 or later to rid of limitations described above.
178
179 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
180
182 · array MCE::Shared::Array
183
184 · cache MCE::Shared::Cache
185
186 · condvar MCE::Shared::Condvar
187
188 · handle MCE::Shared::Handle
189
190 · hash MCE::Shared::Hash
191
192 · minidb MCE::Shared::Minidb
193
194 · ordhash MCE::Shared::Ordhash
195
196 · queue MCE::Shared::Queue
197
198 · scalar MCE::Shared::Scalar
199
200 · sequence MCE::Shared::Sequence
201
202 Below, synopsis for sharing classes included with MCE::Shared.
203
204 use MCE::Shared;
205
206 # short form
207
208 $ar = MCE::Shared->array( @list );
209 $ca = MCE::Shared->cache( max_keys => 500, max_age => 60 );
210 $cv = MCE::Shared->condvar( 0 );
211 $fh = MCE::Shared->handle( ">>", \*STDOUT ); # see mce_open below
212 $ha = MCE::Shared->hash( @pairs );
213 $db = MCE::Shared->minidb();
214 $oh = MCE::Shared->ordhash( @pairs );
215 $qu = MCE::Shared->queue( await => 1, fast => 0 );
216 $va = MCE::Shared->scalar( $value );
217 $se = MCE::Shared->sequence( $begin, $end, $step, $fmt );
218
219 mce_open my $fh, ">>", \*STDOUT or die "open error: $!";
220
221 # long form
222
223 $ar = MCE::Shared->share( { module => 'MCE::Shared::Array' }, ... );
224 $ca = MCE::Shared->share( { module => 'MCE::Shared::Cache' }, ... );
225 $cv = MCE::Shared->share( { module => 'MCE::Shared::Condvar' }, ... );
226 $fh = MCE::Shared->share( { module => 'MCE::Shared::Handle' }, ... );
227 $ha = MCE::Shared->share( { module => 'MCE::Shared::Hash' }, ... );
228 $db = MCE::Shared->share( { module => 'MCE::Shared::Minidb' }, ... );
229 $oh = MCE::Shared->share( { module => 'MCE::Shared::Ordhash' }, ... );
230 $qu = MCE::Shared->share( { module => 'MCE::Shared::Queue' }, ... );
231 $va = MCE::Shared->share( { module => 'MCE::Shared::Scalar' }, ... );
232 $se = MCE::Shared->share( { module => 'MCE::Shared::Sequence' }, ... );
233
234 The restriction for sharing classes not included with MCE::Shared is
235 that the object must not have file-handles nor code-blocks.
236
237 $oh = MCE::Shared->share( { module => 'Hash::Ordered' }, ... );
238
239 open ( filehandle, expr )
240 open ( filehandle, mode, expr )
241 open ( filehandle, mode, reference )
242 In version 1.002 and later, constructs a new object by opening the
243 file whose filename is given by "expr", and associates it with
244 "filehandle". When omitting error checking at the application
245 level, MCE::Shared emits a message and stop if open fails.
246
247 See MCE::Shared::Handle for chunk IO demonstrations.
248
249 {
250 use MCE::Shared::Handle;
251
252 # "non-shared" or "local construction" for use by a single process
253 MCE::Shared::Handle->open( my $fh, "<", "file.log" ) or die "$!";
254 MCE::Shared::Handle::open my $fh, "<", "file.log" or die "$!";
255
256 # mce_open is an alias for MCE::Shared::Handle::open
257 mce_open my $fh, "<", "file.log" or die "$!";
258 }
259
260 {
261 use MCE::Shared;
262
263 # construction for "sharing" with other threads and processes
264 MCE::Shared->open( my $fh, "<", "file.log" ) or die "$!";
265 MCE::Shared::open my $fh, "<", "file.log" or die "$!";
266
267 # mce_open is an alias for MCE::Shared::open
268 mce_open my $fh, "<", "file.log" or die "$!";
269 }
270
271 Simple examples to open a file for reading:
272
273 use MCE::Shared;
274
275 # mce_open is exported by MCE::Shared or MCE::Shared::Handle.
276 # It creates a shared file handle with MCE::Shared present
277 # or a non-shared handle otherwise.
278
279 mce_open my $fh, "< input.txt" or die "open error: $!";
280 mce_open my $fh, "<", "input.txt" or die "open error: $!";
281 mce_open my $fh, "<", \*STDIN or die "open error: $!";
282
283 and for writing:
284
285 mce_open my $fh, "> output.txt" or die "open error: $!";
286 mce_open my $fh, ">", "output.txt" or die "open error: $!";
287 mce_open my $fh, ">", \*STDOUT or die "open error: $!";
288
289 num_sequence
290 "num_sequence" is an alias for "sequence".
291
293 The following is a demonstration for a shared tied-hash variable.
294 Before venturing into the actual code, notice the dump function making
295 a call to "export" explicitly for objects of type
296 "MCE::Shared::Object". This is necessary in order to retrieve the data
297 from the shared-manager process.
298
299 The "export" method is described later under the Common API section.
300
301 use MCE::Shared;
302
303 sub _dump {
304 require Data::Dumper unless $INC{'Data/Dumper.pm'};
305 no warnings 'once';
306
307 local $Data::Dumper::Varname = 'VAR';
308 local $Data::Dumper::Deepcopy = 1;
309 local $Data::Dumper::Indent = 1;
310 local $Data::Dumper::Purity = 1;
311 local $Data::Dumper::Sortkeys = 0;
312 local $Data::Dumper::Terse = 0;
313
314 ( ref $_[0] eq 'MCE::Shared::Object' )
315 ? print Data::Dumper::Dumper( $_[0]->export ) . "\n"
316 : print Data::Dumper::Dumper( $_[0] ) . "\n";
317 }
318
319 tie my %abc, 'MCE::Shared';
320
321 my @parents = qw( a b c );
322 my @children = qw( 1 2 3 4 );
323
324 for my $parent ( @parents ) {
325 for my $child ( @children ) {
326 $abc{ $parent }{ $child } = 1;
327 }
328 }
329
330 _dump( tied( %abc ) );
331
332 __END__
333
334 # Output
335
336 $VAR1 = bless( {
337 'c' => bless( {
338 '1' => '1',
339 '4' => '1',
340 '3' => '1',
341 '2' => '1'
342 }, 'MCE::Shared::Hash' ),
343 'a' => bless( {
344 '1' => '1',
345 '4' => '1',
346 '3' => '1',
347 '2' => '1'
348 }, 'MCE::Shared::Hash' ),
349 'b' => bless( {
350 '1' => '1',
351 '4' => '1',
352 '3' => '1',
353 '2' => '1'
354 }, 'MCE::Shared::Hash' )
355 }, 'MCE::Shared::Hash' );
356
357 Dereferencing provides hash-like behavior for "hash" and "ordhash".
358 Array-like behavior is allowed for "array", not shown below.
359
360 use MCE::Shared;
361 use Data::Dumper;
362
363 my $abc = MCE::Shared->hash;
364
365 my @parents = qw( a b c );
366 my @children = qw( 1 2 3 4 );
367
368 for my $parent ( @parents ) {
369 for my $child ( @children ) {
370 $abc->{ $parent }{ $child } = 1;
371 }
372 }
373
374 print Dumper( $abc->export({ unbless => 1 }) ), "\n";
375
376 Each level in a deeply structure requires a separate trip to the
377 shared-manager process. The included "MCE::Shared::Minidb" module
378 provides optimized methods for working with hash of hashes "HoH" and
379 hash of arrays "HoA".
380
381 use MCE::Shared;
382
383 my $abc = MCE::Shared->minidb;
384
385 my @parents = qw( a b c );
386 my @children = qw( 1 2 3 4 );
387
388 for my $parent ( @parents ) {
389 for my $child ( @children ) {
390 $abc->hset($parent, $child, 1);
391 }
392 }
393
394 _dump( $abc );
395
396 For further reading, see MCE::Shared::Minidb.
397
399 share
400 This class method transfers the blessed-object to the shared-manager
401 process and returns a "MCE::Shared::Object" containing the
402 "SHARED_ID". Starting with the 1.827 release, the "module" option
403 sends parameters to the shared-manager, where the object is then
404 constructed. This is useful for classes involving XS code or a file
405 handle.
406
407 use MCE::Shared;
408
409 {
410 use Math::BigFloat try => 'GMP';
411 use Math::BigInt try => 'GMP';
412
413 my $bf = MCE::Shared->share({ module => 'Math::BigFloat' }, 0);
414 my $bi = MCE::Shared->share({ module => 'Math::BigInt' }, 0);
415 my $y = 1e9;
416
417 $bf->badd($y); # addition (add $y to shared BigFloat object)
418 $bi->badd($y); # addition (add $y to shared BigInt object)
419 }
420
421 {
422 use Bio::Seq;
423 use Bio::SeqIO;
424
425 my $seq_io = MCE::Shared->share({ module => 'Bio::SeqIO' },
426 -file => ">/path/to/fasta/file.fa",
427 -format => 'Fasta',
428 -verbose => -1,
429 );
430
431 my $seq_obj = Bio::Seq->new(
432 -display_id => "name", -desc => "desc", -seq => "seq",
433 -alphabet => "dna"
434 );
435
436 $seq_io->write_seq($seq_obj); # write to shared SeqIO handle
437 }
438
439 {
440 my $oh1 = MCE::Shared->share({ module => 'MCE::Shared::Ordhash' });
441 my $oh2 = MCE::Shared->ordhash(); # same thing
442
443 $oh1->assign( @pairs );
444 $oh2->assign( @pairs );
445 }
446
447 {
448 my ($ho_shared, $ho_nonshared);
449
450 $ho_shared = MCE::Shared->share({ module => 'Hash::Ordered' });
451 $ho_shared->push( @pairs );
452
453 $ho_nonshared = $ho_shared->export(); # back to non-shared
454 $ho_nonshared = $ho_shared->destroy(); # including shared destruction
455 }
456
457 The following provides long and short forms for constructing a
458 shared array, hash, or scalar object.
459
460 use MCE::Shared;
461
462 my $a1 = MCE::Shared->share( { module => 'MCE::Shared::Array' }, @list );
463 my $a2 = MCE::Shared->share( [ @list ] );
464 my $a3 = MCE::Shared->array( @list );
465
466 my $h1 = MCE::Shared->share( { module => 'MCE::Shared::Hash' }, @pairs );
467 my $h2 = MCE::Shared->share( { @pairs } );
468 my $h3 = MCE::Shared->hash( @pairs );
469
470 my $s1 = MCE::Shared->share( { module => 'MCE::Shared::Scalar' }, 20 );
471 my $s2 = MCE::Shared->share( \do{ my $o = 20 } );
472 my $s3 = MCE::Shared->scalar( 20 );
473
474 When the "module" option is given, one may optionally specify the
475 constructor function via the "new" option. This is necessary for the
476 CDB_File module, which provides two different objects. One is
477 created by new (default), and accessed by insert and finish. The
478 other is created by TIEHASH, and accessed by FETCH.
479
480 use MCE::Hobo;
481 use MCE::Shared;
482
483 # populate CDB file
484 my $cdb = MCE::Shared->share({ module => 'CDB_File' }, 't.cdb', "t.cdb.$$")
485 or die "$!\n";
486
487 $cdb->insert( $_ => $_ ) for ('aa'..'zz');
488 $cdb->finish;
489
490 # use CDB file
491 my $cdb1 = tie my %hash, 'MCE::Shared', { module => 'CDB_File' }, 't.cdb';
492
493 # same thing, without involving TIE and extra hash variable
494 my $cdb2 = MCE::Shared->share(
495 { module => 'CDB_File', new => 'TIEHASH' }, 't.cdb'
496 );
497
498 print $hash{'aa'}, "\n";
499 print $cdb1->FETCH('bb'), "\n";
500 print $cdb2->FETCH('cc'), "\n";
501
502 # rewind may be omitted on first use for parallel iteration
503 $cdb2->rewind;
504
505 for ( 1 .. 3 ) {
506 mce_async {
507 while ( my ($k,$v) = $cdb2->next ) {
508 print "[$$] $k => $v\n";
509 }
510 };
511 }
512
513 MCE::Hobo->waitall;
514
516 Construting a shared DBM object is possible starting with the 1.827
517 release. Supported modules are AnyDBM_File, BerkeleyDB, CDB_File,
518 DB_File, GDBM_File, NDBM_File, ODBM_File, SDBM_File, SQLite_File,
519 Tie::Array::DBD, and Tie::Hash::DBD. The list includes Tokyo Cabinet
520 <http://fallabs.com/tokyocabinet/> and Kyoto Cabinet
521 <http://fallabs.com/kyotocabinet/>. Also, see forked version by Altice
522 Labs <https://github.com/alticelabs/kyoto>. It contains an updated
523 "kyotocabinet" folder that builds successfully with recent compilers.
524
525 Freeze-thaw during "STORE"-"FETCH" (for complex data) is handled
526 automatically using Serial 3.015+ (if available) or Storable. Below,
527 are constructions for sharing various DBM modules. The construction for
528 "CDB_File" is given in the prior section.
529
530 AnyDBM_File
531 use MCE::Shared;
532 use Fcntl;
533 use AnyDBM_File;
534
535 BEGIN { @AnyDBM_File::ISA = qw( DB_File GDBM_File NDBM_File ODBM_File ); }
536
537 tie my %h1, 'MCE::Shared', { module => 'AnyDBM_File' },
538 'foo_a', O_CREAT|O_RDWR or die "open error: $!";
539
540 BerkeleyDB
541 use MCE::Shared;
542 use BerkeleyDB;
543
544 tie my %h1, 'MCE::Shared', { module => 'BerkeleyDB::Hash' },
545 -Filename => 'foo_a', -Flags => DB_CREATE
546 or die "open error: $!";
547
548 tie my %h2, 'MCE::Shared', { module => 'BerkeleyDB::Btree' },
549 -Filename => 'foo_b', -Flags => DB_CREATE
550 or die "open error: $!";
551
552 tie my @a1, 'MCE::Shared', { module => 'BerkeleyDB::Queue' },
553 -Filename => 'foo_c', -Flags => DB_CREATE
554 or die "open error: $!";
555
556 tie my @a2, 'MCE::Shared', { module => 'BerkeleyDB::Recno' },
557 -Filename => 'foo_d', -Flags => DB_CREATE -Len => 20
558 or die "open error: $!";
559
560 DB_File
561 use MCE::Shared;
562 use Fcntl;
563 use DB_File;
564
565 # Use pre-defined references ( $DB_HASH, $DB_BTREE, $DB_RECNO ).
566
567 tie my %h1, 'MCE::Shared', { module => 'DB_File' },
568 'foo_a', O_CREAT|O_RDWR, 0640, $DB_HASH or die "open error: $!";
569
570 tie my %h2, 'MCE::Shared', { module => 'DB_File' },
571 'foo_b', O_CREAT|O_RDWR, 0640, $DB_BTREE or die "open error: $!";
572
573 tie my @a1, 'MCE::Shared', { module => 'DB_File' },
574 'foo_c', O_CREAT|O_RDWR, 0640, $DB_RECNO or die "open error: $!";
575
576 # Changing defaults - see DB_File for valid options.
577
578 my $opt_h = DB_File::HASHINFO->new();
579 my $opt_b = DB_File::BTREEINFO->new();
580 my $opt_r = DB_File::RECNOINFO->new();
581
582 $opt_h->{'cachesize'} = 12345;
583
584 tie my %h3, 'MCE::Shared', { module => 'DB_File' },
585 'foo_d', O_CREAT|O_RDWR, 0640, $opt_h or die "open error: $!";
586
587 KyotoCabinet
588 TokyoCabinet
589 use MCE::Shared;
590 use KyotoCabinet;
591 use TokyoCabinet;
592
593 # file extension denotes hash database
594
595 tie my %h1, 'MCE::Shared', { module => 'KyotoCabinet::DB' }, 'foo.kch',
596 KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE
597 or die "open error: $!";
598
599 tie my %h2, 'MCE::Shared', { module => 'TokyoCabinet::HDB' }, 'foo.tch',
600 TokyoCabinet::HDB::OWRITER | TokyoCabinet::HDB::OCREAT
601 or die "open error: $!";
602
603 # file extension denotes tree database
604
605 tie my %h3, 'MCE::Shared', { module => 'KyotoCabinet::DB' }, 'foo.kct',
606 KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE
607 or die "open error: $!";
608
609 tie my %h4, 'MCE::Shared', { module => 'TokyoCabinet::BDB' }, 'foo.tcb',
610 TokyoCabinet::BDB::OWRITER | TokyoCabinet::BDB::OCREAT
611 or die "open error: $!";
612
613 # on-memory hash database
614
615 tie my %h5, 'MCE::Shared', { module => 'KyotoCabinet::DB' }, '*';
616 tie my %h6, 'MCE::Shared', { module => 'TokyoCabinet::ADB' }, '*';
617
618 # on-memory tree database
619
620 tie my %h7, 'MCE::Shared', { module => 'KyotoCabinet::DB' }, '%#pccap=256m';
621 tie my %h8, 'MCE::Shared', { module => 'TokyoCabinet::ADB' }, '+';
622
623 Tie::Array::DBD
624 Tie::Hash::DBD
625 use MCE::Shared;
626 use Tie::Array::DBD;
627 use Tie::Hash::DBD;
628
629 # A valid string is required for the DSN argument, not a DBI handle.
630 # Do not specify the 'str' option for Tie::(Array|Hash)::DBD.
631 # Instead, see encoder-decoder methods described under Common API.
632
633 tie my @a1, 'MCE::Shared', { module => 'Tie::Array::DBD' },
634 'dbi:SQLite:dbname=foo_a.db', {
635 tbl => 't_tie_analysis',
636 key => 'h_key',
637 fld => 'h_value'
638 };
639
640 tie my %h1, 'MCE::Shared', { module => 'Tie::Hash::DBD' },
641 'dbi:SQLite:dbname=foo_h.db', {
642 tbl => 't_tie_analysis',
643 key => 'h_key',
644 fld => 'h_value'
645 };
646
647 tie my %h2, 'MCE::Shared', { module => 'Tie::Hash::DBD'},
648 'dbi:CSV:f_dir=.;f_ext=.csv/r;csv_null=1;csv_decode_utf8=0', {
649 tbl => 'mytable',
650 key => 'h_key',
651 fld => 'h_value'
652 };
653
654 # By default, Sereal 3.015+ is used for serialization if available.
655 # This overrides serialization from Sereal-or-Storable to JSON::XS.
656
657 use JSON::XS ();
658
659 tied(%ha2)->encoder( \&JSON::XS::encode_json );
660 tied(%ha2)->decoder( \&JSON::XS::decode_json );
661
662 $h2{'foo'} = 'plain value';
663 $h2{'bar'} = { @pairs };
664 $h2{'baz'} = [ @list ];
665
667 DB cursors, filters, and duplicate keys are not supported, just plain
668 array and hash functionality. The OO interface provides better
669 performance when needed. Use "iterator" or "next" for iterating over
670 the elements.
671
672 use MCE::Hobo;
673 use MCE::Shared;
674 use Fcntl;
675 use DB_File;
676
677 unlink 'foo_a';
678
679 my $ob = tie my %h1, 'MCE::Shared', { module => 'DB_File' },
680 'foo_a', O_CREAT|O_RDWR, 0640, $DB_HASH or die "open error: $!";
681
682 $h1{key} = 'value';
683 my $val = $h1{key};
684
685 while ( my ($k, $v) = each %h1 ) {
686 print "1: $k => $v\n";
687 }
688
689 # object oriented fashion, faster
690
691 tied(%h1)->STORE( key1 => 'value1' );
692 my $val1 = tied(%h1)->FETCH('key1');
693
694 $ob->STORE( key2 => 'value2' );
695 my $val2 = $ob->FETCH('key2');
696
697 # non-parallel iteration
698
699 my $iter = $ob->iterator;
700 while ( my ($k, $v) = $iter->() ) {
701 print "2: $k => $v\n";
702 }
703
704 # parallel iteration
705
706 sub task {
707 while ( my ($k, $v) = $ob->next ) {
708 print "[$$] $k => $v\n";
709 sleep 1;
710 }
711 }
712
713 MCE::Hobo->create(\&task) for 1 .. 3;
714 MCE::Hobo->waitall;
715
716 $ob->rewind;
717
718 # undef $ob and $iter before %h1 when destroying manually
719
720 undef $ob;
721 undef $iter;
722
723 untie %h1;
724
725 See also Tie::File Demonstration, at the end of the documentation.
726
728 · pdl_byte
729
730 · pdl_short
731
732 · pdl_ushort
733
734 · pdl_long
735
736 · pdl_longlong
737
738 · pdl_float
739
740 · pdl_double
741
742 · pdl_ones
743
744 · pdl_sequence
745
746 · pdl_zeroes
747
748 · pdl_indx
749
750 · pdl
751
752 "pdl_byte", "pdl_short", "pdl_ushort", "pdl_long", "pdl_longlong",
753 "pdl_float", "pdl_double", "pdl_ones", "pdl_sequence", "pdl_zeroes",
754 "pdl_indx", and "pdl" are sugar syntax for PDL construction take place
755 under the shared-manager process.
756
757 use PDL; # must load PDL before MCE::Shared
758 use MCE::Shared;
759
760 # makes extra copy/transfer and unnecessary destruction
761 my $ob1 = MCE::Shared->share( zeroes( 256, 256 ) );
762
763 # do this instead, efficient
764 my $ob1 = MCE::Shared->zeroes( 256, 256 );
765
766 ins_inplace
767 The "ins_inplace" method applies to shared PDL objects. It supports
768 three forms for writing elements back to the PDL object, residing
769 under the shared-manager process.
770
771 # --- action taken by the shared-manager process
772 # ins_inplace( 1 arg ): ins( inplace( $this ), $what, 0, 0 );
773 # ins_inplace( 2 args ): $this->slice( $arg1 ) .= $arg2;
774 # ins_inplace( >2 args ): ins( inplace( $this ), $what, @coords );
775
776 # --- use case
777 $o->ins_inplace( $result ); # 1 arg
778 $o->ins_inplace( ":,$start:$stop", $result ); # 2 args
779 $o->ins_inplace( $result, 0, $seq_n ); # >2 args
780
781 Operations such as " + 5 " will not work on shared PDL objects. At
782 this time, the OO interface is the only mechanism for communicating
783 with the PDL piddle. For example, call "slice", "sever", or "copy"
784 to fetch elements. Call "ins_inplace" to update elements.
785
786 # make a shared PDL piddle
787 my $b = MCE::Shared->pdl_sequence(20,20);
788
789 # fetch, add 10 to row 2 only
790 my $res1 = $b->slice(":,1:1") + 10;
791 $b->ins_inplace($res1, 0, 1);
792
793 # fetch, add 10 to rows 4 and 5
794 my $res2 = $b->slice(":,3:4") + 10;
795 $b->ins_inplace($res2, 0, 3);
796
797 # make non-shared object, export-destroy the shared object
798 $b = $b->destroy;
799
800 print "$b\n";
801
802 The following provides parallel demonstrations using "MCE::Flow".
803
804 use PDL; # must load PDL before MCE::Shared
805
806 use MCE::Flow;
807 use MCE::Shared;
808
809 my $a = MCE::Shared->pdl_sequence(20,20);
810 my $b = MCE::Shared->pdl_zeroes(20,20);
811
812 # with chunking disabled
813
814 mce_flow_s {
815 max_workers => 4, chunk_size => 1
816 },
817 sub {
818 my $row = $_;
819 my $result = $a->slice(":,$row:$row") + 5;
820 $b->ins_inplace($result, 0, $row);
821 }, 0, 20 - 1;
822
823 # with chunking enabled
824
825 mce_flow_s {
826 max_workers => 4, chunk_size => 5, bounds_only => 1
827 },
828 sub {
829 my ($row1, $row2) = @{ $_ };
830 my $result = $a->slice(":,$row1:$row2") + 5;
831 $b->ins_inplace($result, 0, $row1);
832 }, 0, 20 - 1;
833
834 # make non-shared object, export-destroy the shared object
835
836 $b = $b->destroy;
837
838 print "$b\n";
839
840 See also PDL::ParallelCPU and PDL::Parallel::threads. For further
841 reading, the MCE-Cookbook on Github provides two PDL demonstrations.
842
843 <https://github.com/marioroy/mce-cookbook>
844
846 blessed
847 Returns the real "blessed" name, provided by the shared-manager
848 process.
849
850 use MCE::Shared;
851 use Scalar::Util qw(blessed);
852
853 my $oh1 = MCE::Shared->share({ module => 'MCE::Shared::Ordhash' });
854 my $oh2 = MCE::Shared->share({ module => 'Hash::Ordered' });
855
856 print blessed($oh1), "\n"; # MCE::Shared::Object
857 print blessed($oh2), "\n"; # MCE::Shared::Object
858
859 print $oh1->blessed(), "\n"; # MCE::Shared::Ordhash
860 print $oh2->blessed(), "\n"; # Hash::Ordered
861
862 destroy ( { unbless => 1 } )
863 destroy
864 Exports optionally, but destroys the shared object entirely from the
865 shared-manager process. The unbless option is passed to export.
866
867 my $exported_ob = $shared_ob->destroy();
868
869 $shared_ob; # becomes undef
870
871 encoder ( CODE )
872 decoder ( CODE )
873 Override freeze/thaw routines. Applies to STORE and FETCH only,
874 particularly for TIE'd objects. These are called internally for
875 shared DB objects.
876
877 Current API available since 1.827.
878
879 use MCE::Shared;
880 use BerkeleyDB;
881 use DB_File;
882
883 my $file1 = 'file1.db';
884 my $file2 = 'file2.db';
885
886 tie my @db1, 'MCE::Shared', { module => 'DB_File' }, $file1,
887 O_RDWR|O_CREAT, 0640 or die "open error '$file1': $!";
888
889 tie my %db2, 'MCE::Shared', { module => 'BerkeleyDB::Hash' },
890 -Filename => $file2, -Flags => DB_CREATE
891 or die "open error '$file2': $!";
892
893 # Called automatically by MCE::Shared for DB files.
894 # tied(@db1)->encoder( MCE::Shared::Server::_get_freeze );
895 # tied(@db1)->decoder( MCE::Shared::Server::_get_thaw );
896 # tied(%db2)->encoder( MCE::Shared::Server::_get_freeze );
897 # tied(%db2)->decoder( MCE::Shared::Server::_get_thaw );
898 # et cetera.
899
900 $db1[0] = 'foo'; # store plain and complex structure
901 $db1[1] = { key => 'value' };
902 $db1[2] = [ 'complex' ];
903
904 $db2{key} = 'foo'; # ditto, plain and complex structure
905 $db2{sun} = [ 'complex' ];
906
907 export ( { unbless => 1 }, keys )
908 export
909 Exports the shared object as a non-shared object. One must export
910 the shared object when passing into any dump routine. Otherwise, the
911 "shared_id value" and "blessed name" is all one will see. The
912 unbless option unblesses any shared Array, Hash, and Scalar object
913 to a non-blessed array, hash, and scalar respectively.
914
915 use MCE::Shared;
916 use MCE::Shared::Ordhash;
917
918 sub _dump {
919 require Data::Dumper unless $INC{'Data/Dumper.pm'};
920 no warnings 'once';
921
922 local $Data::Dumper::Varname = 'VAR';
923 local $Data::Dumper::Deepcopy = 1;
924 local $Data::Dumper::Indent = 1;
925 local $Data::Dumper::Purity = 1;
926 local $Data::Dumper::Sortkeys = 0;
927 local $Data::Dumper::Terse = 0;
928
929 print Data::Dumper::Dumper($_[0]) . "\n";
930 }
931
932 my $oh1 = MCE::Shared->share({ module => 'MCE::Shared::Ordhash' });
933 my $oh2 = MCE::Shared->ordhash(); # same thing
934
935 _dump($oh1);
936 # bless( [ 1, 'MCE::Shared::Ordhash' ], 'MCE::Shared::Object' )
937
938 _dump($oh2);
939 # bless( [ 2, 'MCE::Shared::Ordhash' ], 'MCE::Shared::Object' )
940
941 _dump( $oh1->export ); # dumps object structure and content
942 _dump( $oh2->export ); # ditto
943
944 "export" can optionally take a list of indices/keys for what to
945 export. This applies to shared array, hash, and ordhash.
946
947 use MCE::Shared;
948
949 # shared hash
950 my $h1 = MCE::Shared->hash(
951 qw/ I Heard The Bluebirds Sing by Marty Robbins /
952 # k v k v k v k v
953 );
954
955 # non-shared hash
956 my $h2 = $h1->export( qw/ I The / );
957
958 _dump($h2);
959
960 __END__
961
962 # Output
963
964 $VAR1 = bless( {
965 'I' => 'Heard',
966 'The' => 'Bluebirds'
967 }, 'MCE::Shared::Hash' );
968
969 Specifying the unbless option exports a non-blessed data structure
970 instead. The unbless option applies to shared MCE::Shared::{ Array,
971 Hash, and Scalar } objects.
972
973 my $h2 = $h1->export( { unbless => 1 }, qw/ I The / );
974 my $h3 = $h1->export( { unbless => 1 } );
975
976 _dump($h2);
977 _dump($h3);
978
979 __END__
980
981 # Output
982
983 $VAR1 = {
984 'The' => 'Bluebirds',
985 'I' => 'Heard'
986 };
987
988 $VAR1 = {
989 'Marty' => 'Robbins',
990 'Sing' => 'by',
991 'The' => 'Bluebirds',
992 'I' => 'Heard'
993 };
994
995 next
996 The "next" method provides parallel iteration between workers for
997 shared "array", "hash", "ordhash", and "sequence". In list context,
998 returns the next key-value pair or beg-end pair for sequence. In
999 scalar context, returns the next item. The "undef" value is returned
1000 after the iteration has completed.
1001
1002 Internally, the list of keys to return is set when the closure is
1003 constructed. Later keys added to the shared array or hash are not
1004 included. Subsequently, the "undef" value is returned for deleted
1005 keys.
1006
1007 The following example iterates through a shared array in parallel.
1008
1009 use MCE::Hobo;
1010 use MCE::Shared;
1011
1012 my $ar = MCE::Shared->array( 'a' .. 'j' );
1013
1014 sub demo1 {
1015 my ( $wid ) = @_;
1016 while ( my ( $index, $value ) = $ar->next ) {
1017 print "$wid: [ $index ] $value\n";
1018 sleep 1;
1019 }
1020 }
1021
1022 sub demo2 {
1023 my ( $wid ) = @_;
1024 while ( defined ( my $value = $ar->next ) ) {
1025 print "$wid: $value\n";
1026 sleep 1;
1027 }
1028 }
1029
1030 $ar->rewind();
1031
1032 MCE::Hobo->new( \&demo1, $_ ) for 1 .. 3;
1033 MCE::Hobo->waitall(), print "\n";
1034
1035 $ar->rewind();
1036
1037 MCE::Hobo->new( \&demo2, $_ ) for 1 .. 3;
1038 MCE::Hobo->waitall(), print "\n";
1039
1040 __END__
1041
1042 # Output
1043
1044 1: [ 0 ] a
1045 2: [ 1 ] b
1046 3: [ 2 ] c
1047 1: [ 3 ] d
1048 2: [ 5 ] f
1049 3: [ 4 ] e
1050 2: [ 8 ] i
1051 3: [ 6 ] g
1052 1: [ 7 ] h
1053 2: [ 9 ] j
1054
1055 1: a
1056 2: b
1057 3: c
1058 2: e
1059 3: f
1060 1: d
1061 3: g
1062 1: i
1063 2: h
1064 1: j
1065
1066 The form is similar for "sequence". For large sequences, the
1067 "bounds_only" option is recommended. Also, specify "chunk_size"
1068 accordingly. This reduces the amount of traffic to and from the
1069 shared-manager process.
1070
1071 use MCE::Hobo;
1072 use MCE::Shared;
1073
1074 my $N = shift || 4_000_000;
1075 my $pi = MCE::Shared->scalar( 0.0 );
1076
1077 my $seq = MCE::Shared->sequence(
1078 { chunk_size => 200_000, bounds_only => 1 }, 0, $N - 1
1079 );
1080
1081 sub compute_pi {
1082 my ( $wid ) = @_;
1083
1084 # Optionally, also receive the chunk_id value
1085 # while ( my ( $beg, $end, $chunk_id ) = $seq->next ) { ... }
1086
1087 while ( my ( $beg, $end ) = $seq->next ) {
1088 my ( $_pi, $t ) = ( 0.0 );
1089 for my $i ( $beg .. $end ) {
1090 $t = ( $i + 0.5 ) / $N;
1091 $_pi += 4.0 / ( 1.0 + $t * $t );
1092 }
1093 $pi->incrby( $_pi );
1094 }
1095
1096 return;
1097 }
1098
1099 MCE::Hobo->create( \&compute_pi, $_ ) for ( 1 .. 8 );
1100
1101 # ... do other stuff ...
1102
1103 MCE::Hobo->waitall();
1104
1105 printf "pi = %0.13f\n", $pi->get / $N;
1106
1107 __END__
1108
1109 # Output
1110
1111 3.1415926535898
1112
1113 rewind ( index, [, index, ... ] )
1114 rewind ( key, [, key, ... ] )
1115 rewind ( "query string" )
1116 rewind ( )
1117 Rewinds the parallel iterator for MCE::Shared::Array,
1118 MCE::Shared::Hash, or MCE::Shared::Ordhash when no arguments are
1119 given. Otherwise, resets the iterator with given criteria. The
1120 syntax for "query string" is described in the shared module.
1121
1122 # array
1123 $ar->rewind;
1124
1125 $ar->rewind( 0, 1 );
1126 $ar->rewind( "val eq some_value" );
1127 $ar->rewind( "key >= 50 :AND val =~ /sun|moon|air|wind/" );
1128 $ar->rewind( "val eq sun :OR val eq moon :OR val eq foo" );
1129 $ar->rewind( "key =~ /$pattern/" );
1130
1131 while ( my ( $index, $value ) = $ar->next ) {
1132 ...
1133 }
1134
1135 # hash, ordhash
1136 $oh->rewind;
1137
1138 $oh->rewind( "key1", "key2" );
1139 $oh->rewind( "val eq some_value" );
1140 $oh->rewind( "key eq some_key :AND val =~ /sun|moon|air|wind/" );
1141 $oh->rewind( "val eq sun :OR val eq moon :OR val eq foo" );
1142 $oh->rewind( "key =~ /$pattern/" );
1143
1144 while ( my ( $key, $value ) = $oh->next ) {
1145 ...
1146 }
1147
1148 rewind ( { options }, begin, end [, step, format ] )
1149 rewind ( begin, end [, step, format ] )
1150 rewind ( )
1151 Rewinds the parallel iterator for MCE::Shared::Sequence when no
1152 arguments are given. Otherwise, resets the iterator with given
1153 criteria.
1154
1155 # sequence
1156 $seq->rewind;
1157
1158 $seq->rewind( { chunk_size => 10, bounds_only => 1 }, 1, 100 );
1159
1160 while ( my ( $beg, $end ) = $seq->next ) {
1161 for my $i ( $beg .. $end ) {
1162 ...
1163 }
1164 }
1165
1166 $seq->rewind( 1, 100 );
1167
1168 while ( defined ( my $num = $seq->next ) ) {
1169 ...
1170 }
1171
1172 store ( key, value )
1173 Deep-sharing a non-blessed structure recursively is possible with
1174 "store", an alias to "STORE".
1175
1176 use MCE::Shared;
1177
1178 my $h1 = MCE::Shared->hash();
1179 my $h2 = MCE::Shared->hash();
1180
1181 # auto-shares deeply
1182 $h1->store('key', [ 0, 2, 5, { 'foo' => 'bar' } ]);
1183 $h2->{key}[3]{foo} = 'baz'; # via auto-vivification
1184
1185 my $v1 = $h1->get('key')->get(3)->get('foo'); # bar
1186 my $v2 = $h2->get('key')->get(3)->get('foo'); # baz
1187 my $v3 = $h2->{key}[3]{foo}; # baz
1188
1190 init
1191 This method is called by each MCE and Hobo worker automatically
1192 after spawning. The effect is extra parallelism and decreased
1193 latency during inter-process communication to the shared-manager
1194 process. The optional ID (an integer) is modded internally in a
1195 round-robin fashion.
1196
1197 MCE::Shared->init();
1198 MCE::Shared->init( ID );
1199
1200 start
1201 Starts the shared-manager process. This is done automatically unless
1202 Perl lacks IO::FDPass, needed to share "condvar" or "queue" while
1203 the shared-manager is running.
1204
1205 MCE::Shared->start();
1206
1207 stop
1208 Stops the shared-manager process, wiping all shared data content.
1209 This is called by the "END" block automatically when the script
1210 terminates.
1211
1212 MCE::Shared->stop();
1213
1215 Application-level advisory locking is possible with MCE::Mutex.
1216
1217 use MCE::Hobo;
1218 use MCE::Mutex;
1219 use MCE::Shared;
1220
1221 my $mutex = MCE::Mutex->new();
1222
1223 tie my $cntr, 'MCE::Shared', 0;
1224
1225 sub work {
1226 for ( 1 .. 1000 ) {
1227 $mutex->lock;
1228
1229 # Incrementing involves 2 IPC ops ( FETCH and STORE ).
1230 # Thus, locking is required.
1231 $cntr++;
1232
1233 $mutex->unlock;
1234 }
1235 }
1236
1237 MCE::Hobo->create('work') for ( 1 .. 8 );
1238 MCE::Hobo->waitall;
1239
1240 print $cntr, "\n"; # 8000
1241
1242 Typically, locking is not necessary using the OO interface. The reason
1243 is that MCE::Shared is implemented using a single-point of entry for
1244 commands sent to the shared-manager process. Furthermore, the shared
1245 classes include sugar methods for combining set and get in a single
1246 operation.
1247
1248 use MCE::Hobo;
1249 use MCE::Shared;
1250
1251 my $cntr = MCE::Shared->scalar( 0 );
1252
1253 sub work {
1254 for ( 1 .. 1000 ) {
1255 # The next statement increments the value without having
1256 # to call set and get explicitly.
1257 $cntr->incr;
1258 }
1259 }
1260
1261 MCE::Hobo->create('work') for ( 1 .. 8 );
1262 MCE::Hobo->waitall;
1263
1264 print $cntr->get, "\n"; # 8000
1265
1266 Another possibility when running threads is locking via
1267 threads::shared.
1268
1269 use threads;
1270 use threads::shared;
1271
1272 use MCE::Flow;
1273 use MCE::Shared;
1274
1275 my $mutex : shared;
1276
1277 tie my $cntr, 'MCE::Shared', 0;
1278
1279 sub work {
1280 for ( 1 .. 1000 ) {
1281 lock $mutex;
1282
1283 # the next statement involves 2 IPC ops ( get and set )
1284 # thus, locking is required
1285 $cntr++;
1286 }
1287 }
1288
1289 MCE::Flow->run( { max_workers => 8 }, \&work );
1290 MCE::Flow->finish;
1291
1292 print $cntr, "\n"; # 8000
1293
1294 Of the three demonstrations, the OO interface yields the best
1295 performance. This is from the lack of locking at the application
1296 level. The results were obtained from a MacBook Pro (Haswell) running
1297 at 2.6 GHz, 1600 MHz RAM.
1298
1299 CentOS 7.2 VM
1300
1301 -- Perl v5.16.3
1302 MCE::Mutex .... : 0.528 secs.
1303 OO Interface .. : 0.062 secs.
1304 threads::shared : 0.545 secs.
1305
1306 FreeBSD 10.0 VM
1307
1308 -- Perl v5.16.3
1309 MCE::Mutex .... : 0.367 secs.
1310 OO Interface .. : 0.083 secs.
1311 threads::shared : 0.593 secs.
1312
1313 Mac OS X 10.11.6 ( Host OS )
1314
1315 -- Perl v5.18.2
1316 MCE::Mutex .... : 0.397 secs.
1317 OO Interface .. : 0.070 secs.
1318 threads::shared : 0.463 secs.
1319
1320 Solaris 11.2 VM
1321
1322 -- Perl v5.12.5 installed with the OS
1323 MCE::Mutex .... : 0.895 secs.
1324 OO Interface .. : 0.099 secs.
1325 threads::shared : Perl not built to support threads
1326
1327 -- Perl v5.22.2 built with threads support
1328 MCE::Mutex .... : 0.788 secs.
1329 OO Interface .. : 0.086 secs.
1330 threads::shared : 0.895 secs.
1331
1332 Windows 7 VM
1333
1334 -- Perl v5.22.2
1335 MCE::Mutex .... : 1.045 secs.
1336 OO Interface .. : 0.312 secs.
1337 threads::shared : 1.061 secs.
1338
1339 Beginning with MCE::Shared 1.809, the "pipeline" method provides
1340 another way. Included in "Array", "Cache", "Hash", "Minidb", and
1341 "Ordhash", it combines multiple commands for the object to be processed
1342 serially. For shared objects, the call is made atomically due to single
1343 IPC to the shared-manager process.
1344
1345 The "pipeline" method is fully "wantarray"-aware and receives a list of
1346 commands and their arguments. In scalar or list context, it returns
1347 data from the last command in the pipeline.
1348
1349 use MCE::Mutex;
1350 use MCE::Shared;
1351
1352 my $mutex = MCE::Mutex->new();
1353 my $oh = MCE::Shared->ordhash();
1354 my @vals;
1355
1356 # mutex locking
1357
1358 $mutex->lock;
1359 $oh->set( foo => "a_a" );
1360 $oh->set( bar => "b_b" );
1361 $oh->set( baz => "c_c" );
1362 @vals = $oh->mget( qw/ foo bar baz / );
1363 $mutex->unlock;
1364
1365 # pipeline, same thing done atomically
1366
1367 @vals = $oh->pipeline(
1368 [ "set", foo => "a_a" ],
1369 [ "set", bar => "b_b" ],
1370 [ "set", baz => "c_c" ],
1371 [ "mget", qw/ foo bar baz / ]
1372 );
1373
1374 # ( "a_a", "b_b", "c_c" )
1375
1376 There is also "pipeline_ex", same as "pipeline", but returns data for
1377 every command in the pipeline.
1378
1379 @vals = $oh->pipeline_ex(
1380 [ "set", foo => "a_a" ],
1381 [ "set", bar => "b_b" ],
1382 [ "set", baz => "c_c" ]
1383 );
1384
1385 # ( "a_a", "b_b", "c_c" )
1386
1388 Sharing a Python class is possible, starting with the 1.827 release.
1389 The construction is simply calling share with the module option.
1390 Methods are accessible via the OO interface.
1391
1392 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1393 # Share Python class. Requires MCE::Shared 1.827 or later.
1394 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1395
1396 use strict;
1397 use warnings;
1398
1399 use MCE::Hobo;
1400 use MCE::Shared;
1401
1402 my $py1 = MCE::Shared->share({ module => 'My::Class' });
1403 my $py2 = MCE::Shared->share({ module => 'My::Class' });
1404
1405 MCE::Shared->start;
1406
1407 $py1->set(0, 100);
1408 $py2->set(1, 200);
1409
1410 die "Ooops" unless $py1->get(0) eq '100';
1411 die "Ooops" unless $py2->get(1) eq '200';
1412
1413 sub task {
1414 $py1->incr(0) for 1 .. 50000;
1415 $py2->incr(1) for 1 .. 50000;
1416 }
1417
1418 MCE::Hobo->create(\&task) for 1 .. 3;
1419 MCE::Hobo->waitall;
1420
1421 print $py1->get(0), "\n"; # 150100
1422 print $py2->get(1), "\n"; # 150200
1423
1424 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1425 # Python class.
1426 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1427
1428 package My::Class;
1429
1430 use strict;
1431 use warnings;
1432
1433 use Inline::Python qw( py_eval py_bind_class );
1434
1435 py_eval ( <<'END_OF_PYTHON_CLASS' );
1436
1437 class MyClass:
1438 def __init__(self):
1439 self.data = [0,0]
1440
1441 def set (self, key, value):
1442 self.data[key] = value
1443
1444 def get (self, key):
1445 try: return self.data[key]
1446 except KeyError: return None
1447
1448 def incr (self, key):
1449 try: self.data[key] = self.data[key] + 1
1450 except KeyError: self.data[key] = 1
1451
1452 END_OF_PYTHON_CLASS
1453
1454 # Register methods for best performance.
1455
1456 py_bind_class(
1457 'My::Class', '__main__', 'MyClass',
1458 'set', 'get', 'incr'
1459 );
1460
1461 1;
1462
1464 Often, the requirement may call for concurrent logging by many workers.
1465 Calling localtime or gmtime per each log entry is expensive. This uses
1466 the old time-stamp value until one second has elapsed.
1467
1468 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1469 # Concurrent logger demo. Requires MCE::Shared 1.827 or later.
1470 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1471
1472 use strict;
1473 use warnings;
1474
1475 use MCE::Hobo;
1476 use MCE::Shared;
1477
1478 my $file = "log.txt";
1479 my $pid = $$;
1480
1481 my $ob = MCE::Shared->share( { module => 'My::Logger' }, path => $file )
1482 or die "open error '$file': $!";
1483
1484 # $ob->autoflush(1); # optional, flush writes immediately
1485
1486 sub work {
1487 my $id = shift;
1488 for ( 1 .. 250_000 ) {
1489 $ob->log("Hello from $id: $_");
1490 }
1491 }
1492
1493 MCE::Hobo->create('work', $_) for 1 .. 4;
1494 MCE::Hobo->waitall;
1495
1496 # Threads and multi-process safety for closing the handle.
1497
1498 sub CLONE { $pid = 0; }
1499
1500 END { $ob->close if $ob && $pid == $$; }
1501
1502 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1503 # Logger class.
1504 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1505
1506 package My::Logger;
1507
1508 use strict;
1509 use warnings;
1510
1511 use Time::HiRes qw( time );
1512
1513 # construction
1514
1515 sub new {
1516 my ( $class, %self ) = @_;
1517
1518 open $self{fh}, ">>", $self{path} or return '';
1519 binmode $self{fh};
1520
1521 $self{stamp} = localtime; # or gmtime
1522 $self{time } = time;
1523
1524 bless \%self, $class;
1525 }
1526
1527 # $ob->log("message");
1528
1529 sub log {
1530 my ( $self, $stamp ) = ( shift );
1531
1532 if ( time - $self->{time} > 1.0 ) {
1533 $self->{stamp} = $stamp = localtime; # or gmtime
1534 $self->{time } = time;
1535 }
1536 else {
1537 $stamp = $self->{stamp};
1538 }
1539
1540 print {$self->{fh}} "$stamp --- @_\n";
1541 }
1542
1543 # $ob->autoflush(0);
1544 # $ob->autoflush(1);
1545
1546 sub autoflush {
1547 my ( $self, $flag ) = @_;
1548
1549 if ( defined fileno($self->{fh}) ) {
1550 $flag ? select(( select($self->{fh}), $| = 1 )[0])
1551 : select(( select($self->{fh}), $| = 0 )[0]);
1552
1553 return 1;
1554 }
1555
1556 return;
1557 }
1558
1559 # $ob->binmode($layer);
1560 # $ob->binmode();
1561
1562 sub binmode {
1563 my ( $self, $layer ) = @_;
1564
1565 if ( defined fileno($self->{fh}) ) {
1566 CORE::binmode $self->{fh}, $layer // ':raw';
1567
1568 return 1;
1569 }
1570
1571 return;
1572 }
1573
1574 # $ob->close()
1575
1576 sub close {
1577 my ( $self ) = @_;
1578
1579 if ( defined fileno($self->{fh}) ) {
1580 close $self->{'fh'};
1581 }
1582
1583 return;
1584 }
1585
1586 # $ob->flush()
1587
1588 sub flush {
1589 my ( $self ) = @_;
1590
1591 if ( defined fileno($self->{fh}) ) {
1592 my $old_fh = select $self->{fh};
1593 my $old_af = $|; $| = 1; $| = $old_af;
1594 select $old_fh;
1595
1596 return 1;
1597 }
1598
1599 return;
1600 }
1601
1602 1;
1603
1605 The following presents a concurrent Tie::File demonstration. Each
1606 element in the array corresponds to a record in the text file. JSON,
1607 being readable, seems appropiate for encoding complex objects.
1608
1609 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1610 # The MCE::Mutex module isn't needed unless IPC involves two or
1611 # more trips for the underlying action.
1612 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1613
1614 use strict;
1615 use warnings;
1616
1617 use MCE::Hobo;
1618 use MCE::Mutex;
1619 use MCE::Shared;
1620
1621 use JSON::MaybeXS;
1622
1623 # Safety for data having line breaks.
1624 use constant EOL => "\x{0a}~\x{0a}";
1625
1626 my $file = 'file.txt';
1627 my $mutex = MCE::Mutex->new();
1628 my $pid = $$;
1629
1630 my $ob = tie my @db, 'MCE::Shared', { module => 'My::File' }, $file,
1631 recsep => EOL or die "open error '$file': $!";
1632
1633 $ob->encoder( \&JSON::MaybeXS::encode_json );
1634 $ob->decoder( \&JSON::MaybeXS::decode_json );
1635
1636 $db[20] = 0; # a counter at offset 20 into the array
1637 $db[21] = [ qw/ foo bar / ]; # store complex structure
1638
1639 sub task {
1640 my $id = sprintf "%02s", shift;
1641 my $row = int($id) - 1;
1642 my $chr = sprintf "%c", 97 + $id - 1;
1643
1644 # A mutex isn't necessary when storing a value.
1645 # Ditto for fetching a value.
1646
1647 $db[$row] = "Hello from $id: "; # 1 trip
1648 my $val = length $db[$row]; # 1 trip
1649
1650 # A mutex may be necessary for updates involving 2 or
1651 # more trips (FETCH and STORE) during IPC, from and to
1652 # the shared-manager process, unless a unique row.
1653
1654 for ( 1 .. 40 ) {
1655 # $db[$row] .= $id; # 2 trips, unique row - okay
1656 $ob->append($row, $chr); # 1 trip via the OO interface
1657
1658 # $mu->lock;
1659 # $db[20] += 1; # incrementing counter, 2 trips
1660 # $mu->unlock;
1661
1662 $ob->incr(20); # same thing via OO, 1 trip
1663 }
1664
1665 my $len = length $db[$row]; # 1 trip
1666
1667 printf "hobo %2d : %d\n", $id, $len;
1668 }
1669
1670 MCE::Hobo->create('task', $_) for 1 .. 20;
1671 MCE::Hobo->waitall;
1672
1673 printf "counter : %d\n", $db[20];
1674 print $db[21]->[0], "\n"; # foo
1675
1676 # Threads and multi-process safety for closing the handle.
1677
1678 sub CLONE { $pid = 0; }
1679
1680 END {
1681 if ( $pid == $$ ) {
1682 undef $ob; # important, undef $ob before @db
1683 untie @db; # untie @db to flush pending writes
1684 }
1685 }
1686
1687 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1688 # Class extending Tie::File with two sugar methods.
1689 # Requires MCE::Shared 1.827 or later.
1690 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1691
1692 package My::File;
1693
1694 use strict;
1695 use warnings;
1696
1697 use Tie::File;
1698
1699 our @ISA = 'Tie::File';
1700
1701 # $ob->append('string');
1702
1703 sub append {
1704 my ($self, $key) = @_;
1705 my $val = $self->FETCH($key); $val .= $_[2];
1706 $self->STORE($key, $val);
1707 length $val;
1708 }
1709
1710 # $ob->incr($key);
1711
1712 sub incr {
1713 my ( $self, $key ) = @_;
1714 my $val = $self->FETCH($key); $val += 1;
1715 $self->STORE($key, $val);
1716 $val;
1717 }
1718
1719 1;
1720
1722 MCE::Shared requires Perl 5.10.1 or later. The IO::FDPass module is
1723 highly recommended on UNIX and Windows. This module does not install it
1724 by default.
1725
1727 The source, cookbook, and examples are hosted at GitHub.
1728
1729 · <https://github.com/marioroy/mce-shared>
1730
1731 · <https://github.com/marioroy/mce-cookbook>
1732
1733 · <https://github.com/marioroy/mce-examples>
1734
1736 MCE, MCE::Hobo
1737
1739 Mario E. Roy, <marioeroy AT gmail DOT com>
1740
1742 Copyright (C) 2016-2018 by Mario E. Roy
1743
1744 MCE::Shared is released under the same license as Perl.
1745
1746 See <http://dev.perl.org/licenses/> for more information.
1747
1748
1749
1750perl v5.28.0 2018-08-25 MCE::Shared(3)