IO::Async::Stream(3)  User Contributed Perl Documentation IO::Async::Stream(3)

NAME

       "IO::Async::Stream" - read and write buffers around an IO handle

SYNOPSIS

        use IO::Socket::INET;
        use IO::Async::Stream;

        use IO::Async::Loop;
        my $loop = IO::Async::Loop->new();

        my $socket = IO::Socket::INET->new(
           PeerHost => "some.other.host",
           PeerPort => 12345,
           Blocking => 0,                   # This line is very important
        );

        my $stream = IO::Async::Stream->new(
           handle => $socket,

           on_read => sub {
              my ( $self, $buffref, $closed ) = @_;

              if( $$buffref =~ s/^(.*\n)// ) {
                 print "Received a line $1";

                 return 1;
              }

              if( $closed ) {
                 print "Closed; last partial line is $$buffref\n";
              }

              return 0;
           }
        );

        $stream->write( "An initial line here\n" );

       Or

        my $record_stream = IO::Async::Stream->new(
           handle => ...,

           on_read => sub {
              my ( $self, $buffref, $closed ) = @_;

              if( length $$buffref >= 16 ) {
                 my $record = substr( $$buffref, 0, 16, "" );
                 print "Received a 16-byte record: $record\n";

                 return 1;
              }

              if( $closed and length $$buffref ) {
                 print "Closed: a partial record still exists\n";
              }

              return 0;
           }
        );

       Or

        use IO::Handle;

        my $stream = IO::Async::Stream->new(
           read_handle  => \*STDIN,
           write_handle => \*STDOUT,
           ...
        );

DESCRIPTION

       This module provides a subclass of "IO::Async::Handle" which implements
       asynchronous communications buffers around stream handles. It provides
       buffering for both incoming and outgoing data, which are transferred to
       or from the actual OS-level filehandle as controlled by the containing
       Loop.

       Data can be added to the outgoing buffer at any time using the
       "write()" method, and will be flushed whenever the underlying handle is
       notified as being write-ready. Whenever the handle is notified as being
       read-ready, the data is read in from the handle, and the "on_read" code
       is called to indicate the data is available. The code can then inspect
       the buffer and possibly consume any input it considers ready.

       This object may be used in one of two ways: with callback functions,
       or as a base class.

       Callbacks
           If certain keys are supplied to the constructor, they should
           contain CODE references to callback functions that will be called
           in the following manner:

            $ret = $on_read->( $self, \$buffer, $handleclosed )

            $on_read_error->( $self, $errno )

            $on_outgoing_empty->( $self )

            $on_write_error->( $self, $errno )

           A reference to the calling "IO::Async::Stream" object is passed as
           the first argument, so that the callback can access it.

       Base Class
           If a subclass is built, then it can override the "on_read",
           "on_read_error", "on_outgoing_empty" or "on_write_error" methods,
           which will be called in the following manner:

            $ret = $self->on_read( \$buffer, $handleclosed )

            $self->on_read_error( $errno )

            $self->on_outgoing_empty()

            $self->on_write_error( $errno )

       The first argument to the "on_read()" method (the second argument to
       the callback form, after $self) is a reference to a plain Perl string
       holding the buffered data. The code should inspect and remove any data
       it likes, but is not required to remove all, or indeed any of the data.
       Any data remaining in the buffer will be preserved for the next call,
       the next time more data is received from the handle.

       In this way, it is easy to implement code that reads records of some
       form when completed, but ignores partially-received records, until all
       the data is present. If the method is confident no more useful data
       remains, it should return 0. If not, it should return 1, and the method
       will be called again. This makes it easy to implement code that handles
       multiple incoming records at the same time. See the examples at the end
       of this documentation for more detail.
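
       As a minimal sketch of the return-value convention described above, the
       following standalone script drives an "on_read" callback by hand. The
       feed() helper is a hypothetical stand-in for the stream's internals,
       not the module's actual implementation, and the 4-byte record size is
       chosen only for illustration.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the stream's internals: keep calling the
# callback for as long as it returns a true value.
sub feed {
    my ( $on_read, $buffref, $closed ) = @_;
    1 while $on_read->( undef, $buffref, $closed );
}

my @records;

my $on_read = sub {
    my ( $self, $buffref, $closed ) = @_;

    if( length $$buffref >= 4 ) {
        # Remove one complete record from the front of the buffer
        push @records, substr( $$buffref, 0, 4, "" );
        return 1;    # more complete records may remain
    }

    return 0;        # only a partial record is left; wait for more data
};

my $buffer = "";
for my $chunk ( "AAAABB", "BBCC", "CC" ) {
    $buffer .= $chunk;             # data arrives in arbitrary pieces
    feed( $on_read, \$buffer, 0 );
}

print join( ",", @records ), "\n";
```

       Partial records stay in the buffer between calls, so the callback only
       ever has to deal with whole records.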

       The second argument to the "on_read()" method is a scalar indicating
       whether the handle has been closed. Normally it is false, but will
       become true once the handle closes. A reference to the buffer is passed
       to the method in the usual way, so it may inspect data contained in it.
       Once the method returns a false value, it will not be called again, as
       the handle is now closed and no more data can arrive.

       The "on_read()" code may also dynamically replace itself with a new
       callback by returning a CODE reference instead of 0 or 1. The original
       callback or method that the object first started with may be restored
       by returning "undef". Whenever the callback is changed in this way, the
       new code is called again, even if the read buffer is currently empty.
       See the examples at the end of this documentation for more detail.

       The "on_read_error" and "on_write_error" callbacks are passed the value
       of $! at the time the error occurred. (The $! variable itself, by its
       nature, may have changed from the original error by the time this
       callback runs, so the callback should always use the value passed in.)
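
       The point about $! can be illustrated without the module at all. In
       this sketch the callback is invoked by hand and the assignment to $!
       inside it merely simulates some later call overwriting the variable;
       both are assumptions made for demonstration, not part of the module.

```perl
use strict;
use warnings;
use Errno qw( ENOENT EACCES );

my @log;

# The argument list follows the callback form shown in this documentation;
# the body is purely illustrative.
my $on_read_error = sub {
    my ( $self, $errno ) = @_;    # copy of the error taken at call time

    $! = EACCES;                  # simulate later code clobbering $!

    push @log, $errno + 0;        # the copy still holds the original error
};

$! = ENOENT;                      # pretend a sysread() has just failed
$on_read_error->( undef, $! );

print "Error number seen by callback: $log[0]\n";
```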

       If an error occurs when the corresponding error callback is not
       supplied, and there is not a subclass method for it, then the "close()"
       method is called instead.

       The "on_outgoing_empty" callback is not passed any arguments.

PARAMETERS

       The following named parameters may be passed to "new" or "configure":

       read_handle => IO
               The IO handle to read from. Must implement "fileno" and
               "sysread" methods.

       write_handle => IO
               The IO handle to write to. Must implement "fileno" and
               "syswrite" methods.

       handle => IO
               Shortcut to specifying the same IO handle for both of the
               above.

       on_read => CODE
               A CODE reference for when more data is available in the
               internal receiving buffer.

       on_read_error => CODE
               A CODE reference for when the "sysread()" method on the read
               handle fails.

       on_outgoing_empty => CODE
               A CODE reference for when the writing data buffer becomes
               empty.

       on_write_error => CODE
               A CODE reference for when the "syswrite()" method on the write
               handle fails.

       autoflush => BOOL
               Optional. If true, the "write" method will attempt to write
               data to the operating system immediately, without waiting for
               the loop to indicate the filehandle is write-ready. This is
               useful, for example, on streams that should contain up-to-date
               logging or console information.

               It currently defaults to false for any file handle, but future
               versions of "IO::Async" may enable this by default on STDOUT
               and STDERR.

       read_all => BOOL
               Optional. If true, attempt to read as much data from the kernel
               as possible when the handle becomes readable. By default this
               is turned off, meaning at most a fixed-size buffer of 8 KiB is
               read. If there is still more data in the kernel's buffer, the
               handle will still be readable, and will be read from again.
               This behaviour allows multiple streams to be multiplexed
               simultaneously, meaning that a large bulk transfer on one
               stream cannot starve other filehandles of processing time.
               Turning this option on may improve bulk data transfer rate, at
               the risk of delaying or stalling processing on other
               filehandles.

       write_all => BOOL
               Optional. Analogous to the "read_all" option, but for writing
               data from the outgoing buffer into the kernel buffer. When
               "autoflush" is enabled, this option only affects deferred
               writing if the initial attempt failed due to buffer space.

       If a read handle is given, it is required that either an "on_read"
       callback reference is passed, or that the object provides an "on_read"
       method. It is optional whether either is true for "on_outgoing_empty";
       if neither is supplied then no action will be taken when the writing
       buffer becomes empty.

       An "on_read" callback may be supplied even if no read handle is yet
       given, to be used when a read handle is eventually provided by the
       "set_handles" method.
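
       As a rough sketch of how these parameters combine, the fragment below
       constructs two streams using only the keys listed above. It assumes
       the module is installed; the variable names and the choice of STDERR
       and STDIN are illustrative only, and the "on_read" body is a
       placeholder rather than useful processing.

```perl
use strict;
use warnings;
use IO::Async::Stream;

# Console-style output: ask write() to push data to the OS straight away
my $console = IO::Async::Stream->new(
   write_handle => \*STDERR,
   autoflush    => 1,
);

# Bulk input: drain the kernel buffer on every read-ready event, accepting
# that other filehandles may wait longer for their turn
my $bulk = IO::Async::Stream->new(
   read_handle => \*STDIN,
   read_all    => 1,

   on_read => sub {
      my ( $self, $buffref, $closed ) = @_;

      $$buffref = "";   # placeholder: discard whatever arrived
      return 0;
   },
);
```

       Neither stream does anything until it is added to a containing loop.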

METHODS

   $stream->close
       A synonym for "close_when_empty". This should not be used when the
       deferred wait behaviour is required, as the behaviour of "close" may
       change in a future version of "IO::Async". Instead, call
       "close_when_empty" directly.

   $stream->close_when_empty
       If the write buffer is empty, this method calls "close" on the
       underlying IO handles, and removes the stream from its containing loop.
       If the write buffer still contains data, then this is deferred until
       the buffer is empty. This is intended for "write-then-close" one-shot
       streams.

        $stream->write( "Here is my final data\n" );
        $stream->close_when_empty;

       Because of this deferred nature, it may not be suitable for error
       handling. See instead the "close_now" method.

   $stream->close_now
       This method immediately closes the underlying IO handles and removes
       the stream from the containing loop. It will not wait to flush the
       remaining data in the write buffer.
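
       One place where an immediate close seems appropriate is an error
       callback. The fragment below is a sketch under that assumption, using
       only the constructor keys and methods documented here; writing to
       STDOUT and the wording of the message are illustrative choices.

```perl
use strict;
use warnings;
use IO::Async::Stream;

my $stream = IO::Async::Stream->new(
   write_handle => \*STDOUT,

   on_write_error => sub {
      my ( $self, $errno ) = @_;

      # Report the error value that was passed in, not $! itself
      print STDERR "Write failed: $errno\n";

      # Pending data cannot be delivered, so do not wait for the buffer
      $self->close_now;
   },
);
```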

   $stream->write( $data )
       This method adds data to the outgoing data queue. The data is not yet
       sent to the handle; this will be done later in the "on_write_ready()"
       method.

       $data   A scalar containing data to write

       If the "autoflush" option is set, this method will try immediately to
       write the data to the underlying filehandle. If this completes
       successfully then it will have been written by the time this method
       returns. If it fails to write completely, then the data is queued as if
       "autoflush" were not set, and will be flushed later as normal by the
       "on_write_ready()" method.

EXAMPLES

   A line-based "on_read()" method
       The following "on_read()" method accepts incoming "\n"-terminated lines
       and prints them to the program's "STDOUT" stream.

        sub on_read
        {
           my $self = shift;
           my ( $buffref, $handleclosed ) = @_;

           if( $$buffref =~ s/^(.*\n)// ) {
              print "Received a line: $1";
              return 1;
           }

           return 0;
        }

       Because a reference to the buffer itself is passed, it is simple to use
       a "s///" regular expression on the scalar it points at, to both check
       if data is ready (i.e. a whole line), and to remove it from the buffer.
       If no data is available then 0 is returned, to indicate it should not
       be tried again. If a line was successfully extracted, then 1 is
       returned, to indicate it should try again in case more lines exist in
       the buffer.

   Dynamic replacement of "on_read()"
       Consider the following protocol (inspired by IMAP), which consists of
       "\n"-terminated lines that may have an optional data block attached.
       The presence of such a data block, as well as its size, is indicated by
       the line prefix.

        sub on_read
        {
           my $self = shift;
           my ( $buffref, $handleclosed ) = @_;

           if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
              my $length = $1;
              my $line   = $2;

              return sub {
                 my $self = shift;
                 my ( $buffref, $handleclosed ) = @_;

                 return 0 unless length $$buffref >= $length;

                 # Take and remove the data from the buffer
                 my $data = substr( $$buffref, 0, $length, "" );

                 print "Received a line $line with some data ($data)\n";

                 return undef; # Restore the original method
              }
           }
           elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
              my $line = $1;

              print "Received a line $line with no data\n";

              return 1;
           }
           else {
              print STDERR "Unrecognised input\n";
              # Handle it somehow

              return 0; # Do not fall through with print's true return value
           }
        }

       In the case where trailing data is supplied, a new temporary
       "on_read()" callback is provided in a closure. This closure captures
       the $length variable so it knows how much data to expect. It also
       captures the $line variable so it can use it in the event report. When
       this method has finished reading the data, it reports the event, then
       restores the original method by returning "undef".
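
       To see the replacement rules in action without a real handle, the
       following standalone sketch feeds the same protocol through a
       hand-written dispatcher. The make_dispatcher() helper is hypothetical
       and only mimics the documented behaviour (a CODE reference replaces
       the handler, "undef" restores the original, and a changed handler is
       called again); it is not the module's implementation.

```perl
use strict;
use warnings;

# Hypothetical dispatcher that follows the documented return-value rules
sub make_dispatcher {
    my ( $original ) = @_;
    my $current = $original;

    return sub {
        my ( $buffref, $closed ) = @_;

        while( 1 ) {
            my $ret = $current->( undef, $buffref, $closed );

            if( ref $ret eq "CODE" ) { $current = $ret;      next } # replaced
            if( !defined $ret )      { $current = $original; next } # restored
            last unless $ret;                  # 0: wait for more data
        }
    };
}

my @events;

my $on_read = sub {
    my ( $self, $buffref, $closed ) = @_;

    if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
        my ( $length, $line ) = ( $1, $2 );

        # Temporary handler: wait for $length bytes, then restore
        return sub {
            my ( $self, $buffref, $closed ) = @_;

            return 0 unless length $$buffref >= $length;

            push @events, "$line+" . substr( $$buffref, 0, $length, "" );
            return undef;
        };
    }

    if( $$buffref =~ s/^LINE:(.*)\n// ) {
        push @events, $1;
        return 1;
    }

    return 0;
};

my $dispatch = make_dispatcher( $on_read );
my $buffer   = "";

# The data block is deliberately split across two chunks
for my $chunk ( "LINE:hello\nDATA 5:blob\nab", "cdeLINE:bye\n" ) {
    $buffer .= $chunk;
    $dispatch->( \$buffer, 0 );
}

print "$_\n" for @events;
```

       The temporary handler stays installed between chunks, which is what
       lets the split data block be reassembled before the original handler
       resumes.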

SEE ALSO

       ·   IO::Handle - Supply object methods for I/O handles

AUTHOR

       Paul Evans <leonerd@leonerd.org.uk>

perl v5.12.1                      2010-06-09              IO::Async::Stream(3)