1IO::Async::Stream(3) User Contributed Perl Documentation IO::Async::Stream(3)
2
3
4
6 "IO::Async::Stream" - read and write buffers around an IO handle
7
9 use IO::Socket::INET;
10 use IO::Async::Stream;
11
12 use IO::Async::Loop;
13 my $loop = IO::Async::Loop->new();
14
15 my $socket = IO::Socket::INET->new(
16 PeerHost => "some.other.host",
17 PeerPort => 12345,
18 Blocking => 0, # This line is very important
19 );
20
21 my $stream = IO::Async::Stream->new(
22 handle => $socket,
23
24 on_read => sub {
25 my ( $self, $buffref, $closed ) = @_;
26
27 if( $$buffref =~ s/^(.*\n)// ) {
28 print "Received a line $1";
29
30 return 1;
31 }
32
33 if( $closed ) {
34 print "Closed; last partial line is $$buffref\n";
35 }
36
37 return 0;
38 }
39 );
40
41 $stream->write( "An initial line here\n" );
42
43 Or
44
45 my $record_stream = IO::Async::Stream->new(
46 handle => ...,
47
48 on_read => sub {
49 my ( $self, $buffref, $closed ) = @_;
50
51 if( length $$buffref >= 16 ) {
52 my $record = substr( $$buffref, 0, 16, "" );
53 print "Received a 16-byte record: $record\n";
54
55 return 1;
56 }
57
58 if( $closed and length $$buffref ) {
59 print "Closed: a partial record still exists\n";
60 }
61
62 return 0;
63 }
64 );
65
66 Or
67
68 use IO::Handle;
69
70 my $stream = IO::Async::Stream->new(
71 read_handle => \*STDIN,
72 write_handle => \*STDOUT,
73 ...
74 );
75
77 This module provides a subclass of "IO::Async::Handle" which implements
78 asynchronous communications buffers around stream handles. It provides
79 buffering for both incoming and outgoing data, which are transferred to
80 or from the actual OS-level filehandle as controlled by the containing
81 Loop.
82
83 Data can be added to the outgoing buffer at any time using the
84 "write()" method, and will be flushed whenever the underlying handle is
85 notified as being write-ready. Whenever the handle is notified as being
86 read-ready, the data is read in from the handle, and the "on_read" code
87 is called to indicate the data is available. The code can then inspect
88 the buffer and possibly consume any input it considers ready.
89
90 This object may be used in one of two ways; with a callback function,
91 or as a base class.
92
93 Callbacks
94 If certain keys are supplied to the constructor, they should
95 contain CODE references to callback functions that will be called
96 in the following manner:
97
98 $ret = $on_read->( $self, \$buffer, $handleclosed )
99
100 $on_read_error->( $self, $errno )
101
102 $on_outgoing_empty->( $self )
103
104 $on_write_error->( $self, $errno )
105
106 A reference to the calling "IO::Async::Stream" object is passed as
107 the first argument, so that the callback can access it.
108
109 Base Class
110 If a subclass is built, then it can override the "on_read" or
111 "on_outgoing_empty" methods, which will be called in the following
112 manner:
113
114 $ret = $self->on_read( \$buffer, $handleclosed )
115
116 $self->on_read_error( $errno )
117
118 $self->on_outgoing_empty()
119
120 $self->on_write_error( $errno )
121
122 The first argument to the "on_read()" callback is a reference to a
123 plain perl string. The code should inspect and remove any data it
124 likes, but is not required to remove all, or indeed any of the data.
125 Any data remaining in the buffer will be preserved for the next call,
126 the next time more data is received from the handle.
127
128 In this way, it is easy to implement code that reads records of some
129 form when completed, but ignores partially-received records, until all
130 the data is present. If the method is confident no more useful data
131 remains, it should return 0. If not, it should return 1, and the method
132 will be called again. This makes it easy to implement code that handles
133 multiple incoming records at the same time. See the examples at the end
134 of this documentation for more detail.
135
136 The second argument to the "on_read()" method is a scalar indicating
137 whether the handle has been closed. Normally it is false, but will
138 become true once the handle closes. A reference to the buffer is passed
139 to the method in the usual way, so it may inspect data contained in it.
140 Once the method returns a false value, it will not be called again, as
141 the handle is now closed and no more data can arrive.
142
143 The "on_read()" code may also dynamically replace itself with a new
144 callback by returning a CODE reference instead of 0 or 1. The original
145 callback or method that the object first started with may be restored
146 by returning "undef". Whenever the callback is changed in this way, the
147 new code is called again; even if the read buffer is currently empty.
148 See the examples at the end of this documentation for more detail.
149
150 The "on_read_error" and "on_write_error" callbacks are passed the value
151 of $! at the time the error occured. (The $! variable itself, by its
152 nature, may have changed from the original error by the time this
153 callback runs so it should always use the value passed in).
154
155 If an error occurs when the corresponding error callback is not
156 supplied, and there is not a subclass method for it, then the "close()"
157 method is called instead.
158
159 The "on_outgoing_empty" callback is not passed any arguments.
160
162 The following named parameters may be passed to "new" or "configure":
163
164 read_handle => IO
165 The IO handle to read from. Must implement "fileno" and
166 "sysread" methods.
167
168 write_handle => IO
169 The IO handle to write to. Must implement "fileno" and
170 "syswrite" methods.
171
172 handle => IO
173 Shortcut to specifying the same IO handle for both of the
174 above.
175
176 on_read => CODE
177 A CODE reference for when more data is available in the
178 internal receiving buffer.
179
180 on_read_error => CODE
181 A CODE reference for when the "sysread()" method on the read
182 handle fails.
183
184 on_outgoing_empty => CODE
185 A CODE reference for when the writing data buffer becomes
186 empty.
187
188 on_write_error => CODE
189 A CODE reference for when the "syswrite()" method on the write
190 handle fails.
191
192 autoflush => BOOL
193 Optional. If true, the "write" method will attempt to write
194 data to the operating system immediately, without waiting for
195 the loop to indicate the filehandle is write-ready. This is
196 useful, for example, on streams that should contain up-to-date
197 logging or console information.
198
199 It currently defaults to false for any file handle, but future
200 versions of "IO::Async" may enable this by default on STDOUT
201 and STDERR.
202
203 read_all => BOOL
204 Optional. If true, attempt to read as much data from the kernel
205 as possible when the handle becomes readable. By default this
206 is turned off, meaning at most a fixed-size buffer of 8 KiB is
207 read. If there is still more data in the kernel's buffer, the
208 handle will still be readable, and will be read from again.
209 This behaviour allows multiple streams to be multiplexed
210 simultaneously, meaning that a large bulk transfer on one
211 stream cannot starve other filehandles of processing time.
212 Turning this option on may improve bulk data transfer rate, at
213 the risk of delaying or stalling processing on other
214 filehandles.
215
216 write_all => BOOL
217 Optional. Analogous to the "read_all" option, but for writing
218 to filehandles into the kernel buffer. When "autoflush" is
219 enabled, option only affects deferred writing if the initial
220 attempt failed due to buffer space.
221
222 If a read handle is given, it is required that either an "on_read"
223 callback reference is passed, or that the object provides an "on_read"
224 method. It is optional whether either is true for "on_outgoing_empty";
225 if neither is supplied then no action will be taken when the writing
226 buffer becomes empty.
227
228 An "on_read" callback may be supplied even if no read handle is yet
229 given, to be used when a read handle is eventually provided by the
230 "set_handles" method.
231
233 $stream->close
234 A synonym for "close_when_empty". This should not be used when the
235 deferred wait behaviour is required, as the behaviour of "close" may
236 change in a future version of "IO::Async". Instead, call
237 "close_when_empty" directly.
238
239 $stream->close_when_empty
240 If the write buffer is empty, this method calls "close" on the
241 underlying IO handles, and removes the stream from its containing loop.
242 If the write buffer still contains data, then this is deferred until
243 the buffer is empty. This is intended for "write-then-close" one-shot
244 streams.
245
246 $stream->write( "Here is my final data\n" );
247 $stream->close_when_empty;
248
249 Because of this deferred nature, it may not be suitable for error
250 handling. See instead the "close_now" method.
251
252 $stream->close_now
253 This method immediately closes the underlying IO handles and removes
254 the stream from the containing loop. It will not wait to flush the
255 remaining data in the write buffer.
256
257 $stream->write( $data )
258 This method adds data to the outgoing data queue. The data is not yet
259 sent to the handle; this will be done later in the "on_write_ready()"
260 method.
261
262 $data A scalar containing data to write
263
264 If the "autoflush" option is set, this method will try immediately to
265 write the data to the underlying filehandle. If this completes
266 successfully then it will have been written by the time this method
267 returns. If it fails to write completely, then the data is queued as if
268 "autoflush" were not set, and will be flushed later as normal by the
269 "on_write_ready()" method.
270
272 A line-based "on_read()" method
273 The following "on_read()" method accepts incoming "\n"-terminated lines
274 and prints them to the program's "STDOUT" stream.
275
276 sub on_read
277 {
278 my $self = shift;
279 my ( $buffref, $handleclosed ) = @_;
280
281 if( $$buffref =~ s/^(.*\n)// ) {
282 print "Received a line: $1";
283 return 1;
284 }
285
286 return 0;
287 }
288
289 Because a reference to the buffer itself is passed, it is simple to use
290 a "s///" regular expression on the scalar it points at, to both check
291 if data is ready (i.e. a whole line), and to remove it from the buffer.
292 If no data is available then 0 is returned, to indicate it should not
293 be tried again. If a line was successfully extracted, then 1 is
294 returned, to indicate it should try again in case more lines exist in
295 the buffer.
296
297 Dynamic replacement of "on_read()"
298 Consider the following protocol (inspired by IMAP), which consists of
299 "\n"-terminated lines that may have an optional data block attached.
300 The presence of such a data block, as well as its size, is indicated by
301 the line prefix.
302
303 sub on_read
304 {
305 my $self = shift;
306 my ( $buffref, $handleclosed ) = @_;
307
308 if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
309 my $length = $1;
310 my $line = $2;
311
312 return sub {
313 my $self = shift;
314 my ( $buffref, $handleclosed ) = @_;
315
316 return 0 unless length $$buffref >= $length;
317
318 # Take and remove the data from the buffer
319 my $data = substr( $$buffref, 0, $length, "" );
320
321 print "Received a line $line with some data ($data)\n";
322
323 return undef; # Restore the original method
324 }
325 }
326 elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
327 my $line = $1;
328
329 print "Received a line $line with no data\n";
330
331 return 1;
332 }
333 else {
334 print STDERR "Unrecognised input\n";
335 # Handle it somehow
336 }
337 }
338
339 In the case where trailing data is supplied, a new temporary
340 "on_read()" callback is provided in a closure. This closure captures
341 the $length variable so it knows how much data to expect. It also
342 captures the $line variable so it can use it in the event report. When
343 this method has finished reading the data, it reports the event, then
344 restores the original method by returning "undef".
345
347 ยท IO::Handle - Supply object methods for I/O handles
348
350 Paul Evans <leonerd@leonerd.org.uk>
351
352
353
354perl v5.12.1 2010-06-09 IO::Async::Stream(3)