1DBIAgent(3) User Contributed Perl Documentation DBIAgent(3)
2
3
4
6 POE::Component::DBIAgent - POE Component for running asynchronous DBI
7 calls.
8
10 sub _start {
11 my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
12
13 $heap->{helper} = POE::Component::DBIAgent->new( DSN => [$dsn,
14 $username,
15 $password
16 ],
17 Queries => $self->make_queries,
18 Count => 3,
19 Debug => 1,
20 );
21
22 # Queries takes a hashref of the form:
23 # { query_name => 'select blah from table where x = ?',
24 # other_query => 'select blah_blah from big_view',
25 # etc.
26 # }
27
28 $heap->{helper}->query(query_name =>
29 { cookie => 'starting_query' },
30 session => 'get_row_from_dbiagent');
31
32 }
33
34 sub get_row_from_dbiagent {
35 my ($kernel, $self, $heap, $row, $cookie) = @_[KERNEL, OBJECT, HEAP, ARG0, ARG1];
36 if ($row ne 'EOF') {
37
38 # {{{ PROCESS A ROW
39
40 #row is a listref of columns
41
42 # }}} PROCESS A ROW
43
44 } else {
45
46 # {{{ NO MORE ROWS
47
48 #cleanup code here
49
50 # }}} NO MORE ROWS
51
52 }
53
54 }
55
57 DBIAgent is your answer to non-blocking DBI in POE.
58
59 It fires off a configurable number child processes (defaults to 3) and
60 feeds database queries to it via two-way pipe (or sockets ... however
61 POE::Component::Wheel::Run is able to manage it). The primary method
62 is "query".
63
64 Usage
65 After initializing a DBIAgent and storing it in a session's heap, one
66 executes a "query" (or "query_slow") with the query name, destination
67 session (name or id) and destination state (as well as any query
68 parameters, optionally) as arguments. As each row of data comes back
69 from the query, the destination state (in the destination session) is
70 invoked with that row of data in its $_[ARG0] slot. When there are no
71 more rows to return, the data in $_[ARG0] is the string 'EOF'.
72
73 Not EVERY query should run through the DBIAgent. If you need to run a
74 short lookup from within a state, sometimes it can be a hassle to have
75 to define a whole seperate state to receive its value, and resume
76 processing from there.. The determining factor, of course, is how long
77 your query will take to execute. If you are trying to retrieve one row
78 from a properly indexed table, use "$dbh->selectrow_array()". If
79 there's a join involved, or multiple rows, or a view, you probably want
80 to use DBIAgent. If it's a longish query and startup costs (time)
81 don't matter to you, go ahead and do it inline.. but remember the whole
82 of your program suspends waiting for the result. If startup costs DO
83 matter, use DBIAgent.
84
85 Return Values
86 The destination state in the destination session (specified in the call
87 to query()) will receive the return values from the query in its
88 $_[ARG0] parameter. DBIAgent invokes DBI's "fetch" method internally,
89 so the value will be a reference to an array. If your query returns
90 multiple rows, then your state will be invoked multiple times, once per
91 row. ADDITIONALLY, your state will be called one time with $_[ARG0]
92 containing the string 'EOF'. 'EOF' is returned even if the query
93 doesn't return any other rows. This is also what to expect for DML
94 (INSERT, UPDATE, DELETE) queries. A way to utilise this might be as
95 follows:
96
97 sub some_state {
98 #...
99 if ($enough_values_to_begin_updating) {
100
101 $heap->{dbiagent}->query(update_values_query =>
102 this_session =>
103 update_next_value =>
104 shift @{$heap->{values_to_be_updated}}
105 );
106 }
107 }
108
109 sub update_next_value {
110 my ($self, $heap) = @_[OBJECT, HEAP];
111 # we got 'EOF' in ARG0 here but we don't care... we know that an
112 # update has been executed.
113
114 for (1..3) { # Do three at a time!
115 my $value;
116 last unless defined ($value = shift @{$heap->{values_to_be_updated}});
117 $heap->{dbiagent}->query(update_values =>
118 this_session =>
119 update_next_value =>
120 $value
121 );
122 }
123
124 }
125
126 new()
127 Creating an instance creates a POE::Session to manage communication
128 with the Helper processes. Queue management is transparent and
129 automatic. The constructor is named new() (surprised, eh? Yeah, me
130 too). The parameters are as follows:
131
132 DSN An arrayref of parameters to pass to DBI->connect (usually a dsn,
133 username, and password).
134
135 Queries
136 A hashref of the form Query_Name => "$SQL". For example:
137
138 {
139 sysdate => "select sysdate from dual",
140 employee_record => "select * from emp where id = ?",
141 increase_inventory => "update inventory
142 set count = count + ?
143 where item_id = ?",
144 }
145
146 As the example indicates, DBI placeholders are supported, as are
147 DML statements.
148
149 Count
150 The number of helper processes to spawn. Defaults to 3. The
151 optimal value for this parameter will depend on several factors,
152 such as: how many different queries your program will be running,
153 how much RAM you have, how often you run queries, and most
154 importantly, how many queries you intend to run simultaneously.
155
156 ErrorState
157 An listref containing a session and event name to receive error
158 messages from the DBI. The message arrives in ARG0.
159
160 query($query_name, [ \%args, ] $session, $state, [ @parameters ])
161 The query() method takes at least three parameters, plus any bind
162 values for the specific query you are executing.
163
164 $query_name
165 This parameter must be one of the keys to the Queries hashref you
166 passed to the constructor. It is used to indicate which query you
167 wish to execute.
168
169 \%args
170 This is an OPTIONAL hashref of arguments to pass to the query.
171
172 Currently supported arguments:
173
174 hash
175 Return rows hash references instead of array references.
176
177 cookie
178 A cookie to pass to this query. This is passed back unchanged
179 to the destination state in $_[ARG1]. Can be any scalar
180 (including references, and even POE postbacks, so be careful!).
181 You can use this as an identifier if you have one destination
182 state handling multiple different queries or sessions.
183
184 delay
185 Insert a 1ms delay between each row of output.
186
187 I know what you're thinking: "WHY would you want to slow down
188 query responses?!?!?" It has to do with CONCURRENCY. When a
189 response (finally) comes in from the agent after running the
190 query, it floods the input channel with response data. This
191 has the effect of monopolizing POE's attention, so that any
192 other handles (network sockets, pipes, file descriptors) keep
193 getting pushed further back on the queue, and to all other
194 processes EXCEPT the agent, your POE program looks hung for the
195 amount of time it takes to process all of the incoming query
196 data.
197
198 So, we insert 1ms of time via Time::HiRes's "usleep" function.
199 In human terms, this is essentially negligible. But it is just
200 enough time to allow competing handles (sockets, files) to
201 trigger select(), and get handled by the POE::Kernel, in
202 situations where concurrency has priority over transfer rate.
203
204 Naturally, the Time::HiRes module is required for this
205 functionality. If Time::HiRes is not installed, the delay is
206 ignored.
207
208 group
209 Sends the return event back when "group" rows are retrieved
210 from the database, to avoid event spam when selecting lots of
211 rows. NB: using group means that $row will be an arrayref of
212 rows, not just a single row.
213
214 $session, $state
215 These parameters indicate the POE state that is to receive the data
216 returned from the database. The state indicated will receive the
217 data in its $_[ARG0] parameter. PLEASE make sure this is a valid
218 state, otherwise you will spend a LOT of time banging your head
219 against the wall wondering where your query data is.
220
221 @parameters
222 These are any parameters your query requires. WARNING: You must
223 supply exactly as many parameters as your query has placeholders!
224 This means that if your query has NO placeholders, then you should
225 pass NO extra parameters to "query".
226
227 Suggestions to improve this syntax are welcome.
228
229 finish()
230 The finish() method tells DBIAgent that the program is finished sending
231 queries. DBIAgent will shut its helpers down gracefully after they
232 complete any pending queries. If there are no pending queries, the
233 DBIAgent will shut down immediately.
234
236 • Error handling is practically non-existent.
237
238 • The calling syntax is still pretty weak... but improving. We may
239 eventually add an optional attributes hash so that each query can
240 be called with its own individual characteristics.
241
242 • I might eventually want to support returning hashrefs, if there is
243 any demand.
244
245 • Every query is prepared at Helper startup. This could potentially
246 be pretty expensive. Perhaps a cached or deferred loading might be
247 better? This is considering that not every helper is going to run
248 every query, especially if you have a lot of miscellaneous queries.
249
250 Suggestions welcome! Diffs more welcome! :-)
251
253 This module has been fine-tuned and packaged by Rob Bloodgood
254 <robb@empire2.com>. However, most of the queuing code originated with
255 Fletch <fletch@phydeaux.org>, either directly or via his ideas. Thank
256 you for making this module a reality, Fletch!
257
258 However, I own all of the bugs.
259
260 This module is free software; you may redistribute it and/or modify it
261 under the same terms as Perl itself.
262
263
264
265perl v5.36.0 2023-01-20 DBIAgent(3)