1 # -*- test-case-name: twisted.test.test_adbapi -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 An asynchronous mapping to U{DB-API 2.0<http://www.python.org/topics/database/DatabaseAPI-2.0.html>}.
11 from twisted.internet import threads
12 from twisted.python import reflect, log
13 from twisted.python.deprecate import deprecated
14 from twisted.python.versions import Version
18 class ConnectionLost(Exception):
20 This exception means that a db connection has been lost. Client code may
26 class Connection(object):
28 A wrapper for a DB-API connection instance.
30 The wrapper passes almost everything to the wrapped connection and so has
31 the same API. However, the Connection knows about its pool and also
32 handle reconnecting should when the real connection dies.
35 def __init__(self, pool):
37 self._connection = None
41 # The way adbapi works right now means that closing a connection is
42 # a really bad thing as it leaves a dead connection associated with
43 # a thread in the thread pool.
44 # Really, I think closing a pooled connection should return it to the
45 # pool but that's handled by the runWithConnection method already so,
46 # rather than upsetting anyone by raising an exception, let's ignore
51 if not self._pool.reconnect:
52 self._connection.rollback()
56 self._connection.rollback()
57 curs = self._connection.cursor()
58 curs.execute(self._pool.good_sql)
60 self._connection.commit()
63 log.err(None, "Rollback failed")
65 self._pool.disconnect(self._connection)
68 log.msg("Connection lost.")
70 raise ConnectionLost()
73 if self._connection is not None:
74 self._pool.disconnect(self._connection)
75 self._connection = self._pool.connect()
77 def __getattr__(self, name):
78 return getattr(self._connection, name)
82 """A lightweight wrapper for a DB-API 'cursor' object.
84 Relays attribute access to the DB cursor. That is, you can call
85 execute(), fetchall(), etc., and they will be called on the
86 underlying DB-API cursor object. Attributes will also be
91 def __init__(self, pool, connection):
93 self._connection = connection
97 _cursor = self._cursor
102 if self._cursor is not None:
106 self._cursor = self._connection.cursor()
109 if not self._pool.reconnect:
112 log.err(None, "Cursor creation failed")
115 log.msg('Connection lost, reconnecting')
118 self._cursor = self._connection.cursor()
121 self._connection.reconnect()
124 def __getattr__(self, name):
125 return getattr(self._cursor, name)
128 class ConnectionPool:
130 Represent a pool of connections to a DB-API 2.0 compliant database.
132 @ivar connectionFactory: factory for connections, default to L{Connection}.
133 @type connectionFactory: any callable.
135 @ivar transactionFactory: factory for transactions, default to
137 @type transactionFactory: any callable
139 @ivar shutdownID: C{None} or a handle on the shutdown event trigger
140 which will be used to stop the connection pool workers when the
143 @ivar _reactor: The reactor which will be used to schedule startup and
145 @type _reactor: L{IReactorCore} provider
148 CP_ARGS = "min max name noisy openfun reconnect good_sql".split()
150 noisy = False # if true, generate informational log messages
151 min = 3 # minimum number of connections in pool
152 max = 5 # maximum number of connections in pool
153 name = None # Name to assign to thread pool for debugging
154 openfun = None # A function to call on new connections
155 reconnect = False # reconnect when connections fail
156 good_sql = 'select 1' # a query which should always succeed
158 running = False # true when the pool is operating
159 connectionFactory = Connection
160 transactionFactory = Transaction
162 # Initialize this to None so it's available in close() even if start()
166 def __init__(self, dbapiName, *connargs, **connkw):
167 """Create a new ConnectionPool.
169 Any positional or keyword arguments other than those documented here
170 are passed to the DB-API object when connecting. Use these arguments to
171 pass database names, usernames, passwords, etc.
173 @param dbapiName: an import string to use to obtain a DB-API compatible
174 module (e.g. 'pyPgSQL.PgSQL')
176 @param cp_min: the minimum number of connections in pool (default 3)
178 @param cp_max: the maximum number of connections in pool (default 5)
180 @param cp_noisy: generate informational log messages during operation
183 @param cp_openfun: a callback invoked after every connect() on the
184 underlying DB-API object. The callback is passed a
185 new DB-API connection object. This callback can
186 setup per-connection state such as charset,
189 @param cp_reconnect: detect connections which have failed and reconnect
190 (default False). Failed connections may result in
191 ConnectionLost exceptions, which indicate the
192 query may need to be re-sent.
194 @param cp_good_sql: an sql query which should always succeed and change
195 no state (default 'select 1')
197 @param cp_reactor: use this reactor instead of the global reactor
198 (added in Twisted 10.2).
199 @type cp_reactor: L{IReactorCore} provider
202 self.dbapiName = dbapiName
203 self.dbapi = reflect.namedModule(dbapiName)
205 if getattr(self.dbapi, 'apilevel', None) != '2.0':
206 log.msg('DB API module not DB API 2.0 compliant.')
208 if getattr(self.dbapi, 'threadsafety', 0) < 1:
209 log.msg('DB API module not sufficiently thread-safe.')
211 reactor = connkw.pop('cp_reactor', None)
213 from twisted.internet import reactor
214 self._reactor = reactor
216 self.connargs = connargs
219 for arg in self.CP_ARGS:
220 cp_arg = 'cp_%s' % arg
221 if connkw.has_key(cp_arg):
222 setattr(self, arg, connkw[cp_arg])
225 self.min = min(self.min, self.max)
226 self.max = max(self.min, self.max)
228 self.connections = {} # all connections, hashed on thread id
230 # these are optional so import them here
231 from twisted.python import threadpool
234 self.threadID = thread.get_ident
235 self.threadpool = threadpool.ThreadPool(self.min, self.max)
236 self.startID = self._reactor.callWhenRunning(self._start)
246 Start the connection pool.
248 If you are using the reactor normally, this function does *not*
252 self.threadpool.start()
253 self.shutdownID = self._reactor.addSystemEventTrigger(
254 'during', 'shutdown', self.finalClose)
258 def runWithConnection(self, func, *args, **kw):
260 Execute a function with a database connection and return the result.
262 @param func: A callable object of one argument which will be executed
263 in a thread with a connection from the pool. It will be passed as
264 its first argument a L{Connection} instance (whose interface is
265 mostly identical to that of a connection object for your DB-API
266 module of choice), and its results will be returned as a Deferred.
267 If the method raises an exception the transaction will be rolled
268 back. Otherwise, the transaction will be committed. B{Note} that
269 this function is B{not} run in the main thread: it must be
272 @param *args: positional arguments to be passed to func
274 @param **kw: keyword arguments to be passed to func
276 @return: a Deferred which will fire the return value of
277 C{func(Transaction(...), *args, **kw)}, or a Failure.
279 from twisted.internet import reactor
280 return threads.deferToThreadPool(reactor, self.threadpool,
281 self._runWithConnection,
285 def _runWithConnection(self, func, *args, **kw):
286 conn = self.connectionFactory(self)
288 result = func(conn, *args, **kw)
292 excType, excValue, excTraceback = sys.exc_info()
296 log.err(None, "Rollback failed")
297 raise excType, excValue, excTraceback
300 def runInteraction(self, interaction, *args, **kw):
302 Interact with the database and return the result.
304 The 'interaction' is a callable object which will be executed
305 in a thread using a pooled connection. It will be passed an
306 L{Transaction} object as an argument (whose interface is
307 identical to that of the database cursor for your DB-API
308 module of choice), and its results will be returned as a
309 Deferred. If running the method raises an exception, the
310 transaction will be rolled back. If the method returns a
311 value, the transaction will be committed.
313 NOTE that the function you pass is *not* run in the main
314 thread: you may have to worry about thread-safety in the
315 function you pass to this if it tries to use non-local
318 @param interaction: a callable object whose first argument
319 is an L{adbapi.Transaction}.
321 @param *args: additional positional arguments to be passed
324 @param **kw: keyword arguments to be passed to interaction
326 @return: a Deferred which will fire the return value of
327 'interaction(Transaction(...), *args, **kw)', or a Failure.
329 from twisted.internet import reactor
330 return threads.deferToThreadPool(reactor, self.threadpool,
331 self._runInteraction,
332 interaction, *args, **kw)
335 def runQuery(self, *args, **kw):
336 """Execute an SQL query and return the result.
338 A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
339 The exact nature of the arguments will depend on the specific flavor
340 of DB-API being used, but the first argument in *args be an SQL
341 statement. The result of a subsequent cursor.fetchall() will be
342 fired to the Deferred which is returned. If either the 'execute' or
343 'fetchall' methods raise an exception, the transaction will be rolled
344 back and a Failure returned.
346 The *args and **kw arguments will be passed to the DB-API cursor's
349 @return: a Deferred which will fire the return value of a DB-API
350 cursor's 'fetchall' method, or a Failure.
352 return self.runInteraction(self._runQuery, *args, **kw)
355 def runOperation(self, *args, **kw):
356 """Execute an SQL query and return None.
358 A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
359 The exact nature of the arguments will depend on the specific flavor
360 of DB-API being used, but the first argument in *args will be an SQL
361 statement. This method will not attempt to fetch any results from the
362 query and is thus suitable for INSERT, DELETE, and other SQL statements
363 which do not return values. If the 'execute' method raises an
364 exception, the transaction will be rolled back and a Failure returned.
366 The args and kw arguments will be passed to the DB-API cursor's
369 return: a Deferred which will fire None or a Failure.
371 return self.runInteraction(self._runOperation, *args, **kw)
376 Close all pool connections and shutdown the pool.
379 self._reactor.removeSystemEventTrigger(self.shutdownID)
380 self.shutdownID = None
382 self._reactor.removeSystemEventTrigger(self.startID)
386 def finalClose(self):
387 """This should only be called by the shutdown trigger."""
389 self.shutdownID = None
390 self.threadpool.stop()
392 for conn in self.connections.values():
394 self.connections.clear()
397 """Return a database connection when one becomes available.
399 This method blocks and should be run in a thread from the internal
400 threadpool. Don't call this method directly from non-threaded code.
401 Using this method outside the external threadpool may exceed the
402 maximum number of connections in the pool.
404 @return: a database connection from the pool.
407 tid = self.threadID()
408 conn = self.connections.get(tid)
411 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName,
414 conn = self.dbapi.connect(*self.connargs, **self.connkw)
415 if self.openfun != None:
417 self.connections[tid] = conn
420 def disconnect(self, conn):
421 """Disconnect a database connection associated with this pool.
423 Note: This function should only be used by the same thread which
424 called connect(). As with connect(), this function is not used
425 in normal non-threaded twisted code.
427 tid = self.threadID()
428 if conn is not self.connections.get(tid):
429 raise Exception("wrong connection for thread")
432 del self.connections[tid]
435 def _close(self, conn):
437 log.msg('adbapi closing: %s' % (self.dbapiName,))
441 log.err(None, "Connection close failed")
444 def _runInteraction(self, interaction, *args, **kw):
445 conn = self.connectionFactory(self)
446 trans = self.transactionFactory(self, conn)
448 result = interaction(trans, *args, **kw)
453 excType, excValue, excTraceback = sys.exc_info()
457 log.err(None, "Rollback failed")
458 raise excType, excValue, excTraceback
461 def _runQuery(self, trans, *args, **kw):
462 trans.execute(*args, **kw)
463 return trans.fetchall()
465 def _runOperation(self, trans, *args, **kw):
466 trans.execute(*args, **kw)
468 def __getstate__(self):
469 return {'dbapiName': self.dbapiName,
473 'reconnect': self.reconnect,
474 'good_sql': self.good_sql,
475 'connargs': self.connargs,
476 'connkw': self.connkw}
478 def __setstate__(self, state):
479 self.__dict__ = state
480 self.__init__(self.dbapiName, *self.connargs, **self.connkw)
483 __all__ = ['Transaction', 'ConnectionPool']