'migrations')
_RETRYABLE_OPERATIONAL_ERROR_CODES = (
+ 1053, # 'Server shutdown in progress'
+ 2003, # 'Can't connect to MySQL server'
2006, # Error code 2006 'MySQL server has gone away' indicates that
# the connection used was closed or dropped
2013, # 'Lost connection to MySQL server during query'
# this error code, not INSERT queries, since we don't know
# whether the query completed before or after the connection
# lost.
- 2026, # 'SSL connection error: unknown error number'
+ 2026, # 'SSL connection error: unknown error number'
)
temp_engine = sqlalchemy.create_engine(connect_url,
connect_args=self._ssl_args,
listeners=[StrictModeListener()])
- databases = temp_engine.execute('SHOW DATABASES').fetchall()
+ databases = self._ExecuteWithEngine('SHOW DATABASES',
+ temp_engine).fetchall()
if (db_name,) not in databases:
- temp_engine.execute('CREATE DATABASE %s' % db_name)
+ self._ExecuteWithEngine('CREATE DATABASE %s' % db_name, temp_engine)
logging.info('Created database %s', db_name)
temp_engine.dispose()
this database connection instance.
"""
self._meta = None
- self._GetEngine().execute('DROP DATABASE %s' % self.db_name)
+ self._Execute('DROP DATABASE %s' % self.db_name)
self._InvalidateEngine()
def QuerySchemaVersion(self):
The current schema version from the database's schema version table,
as an integer, or 0 if the table is empty or nonexistent.
"""
- tables = self._GetEngine().execute('SHOW TABLES').fetchall()
+ tables = self._Execute('SHOW TABLES').fetchall()
if (self.SCHEMA_VERSION_TABLE_NAME,) in tables:
- r = self._GetEngine().execute('SELECT MAX(%s) from %s' %
+ r = self._Execute('SELECT MAX(%s) from %s' %
(self.SCHEMA_VERSION_COL, self.SCHEMA_VERSION_TABLE_NAME))
return r.fetchone()[0] or 0
else:
script = f.read()
queries = [q.strip() for q in script.split(';') if q.strip()]
for q in queries:
+ # This is intentionally not wrapped in retries.
self._GetEngine().execute(q)
def _ReflectToMetadata(self):
return [dict(zip(columns, values)) for values in r.fetchall()]
def _Execute(self, query, *args, **kwargs):
- """Execute a query using engine, with retires.
+ """Execute a query with retries.
- This method wraps execution of a query in retries that create a new
- engine in case the engine's connection has been dropped.
+ This method executes a query using the engine credentials that
+ were set up in the constructor for this object. If necessary,
+ a new engine unique to this pid will be created.
Args:
query: Query to execute, of type string, or sqlalchemy.Executible,
Returns:
The result of .execute(...)
"""
- f = lambda: self._GetEngine().execute(query, *args, **kwargs)
+ return self._ExecuteWithEngine(query, self._GetEngine(),
+ *args, **kwargs)
+
+ def _ExecuteWithEngine(self, query, engine, *args, **kwargs):
+ """Execute a query using |engine|, with retires.
+
+ This method wraps execution of a query against an engine in retries.
+ The engine will automatically create new connections if a prior connection
+ was dropped.
+
+ Args:
+ query: Query to execute, of type string, or sqlalchemy.Executible,
+ or other sqlalchemy-executible statement (see sqlalchemy
+ docs).
+ engine: sqlalchemy.engine to use.
+ *args: Additional args passed along to .execute(...)
+ **kwargs: Additional args passed along to .execute(...)
+
+ Returns:
+ The result of .execute(...)
+ """
+ f = lambda: engine.execute(query, *args, **kwargs)
return retry_util.GenericRetry(
handler=_IsRetryableException,
max_retry=4,
elif cls._ConnectionType == cls._CONNECTION_TYPE_NONE:
return None
else:
- if cls._CachedCIDB:
- return cls._CachedCIDB
- try:
- cls._CachedCIDB = CIDBConnection(cls._ConnectionCredsPath)
- except sqlalchemy.exc.OperationalError as e:
- logging.warn('Retrying to create a database connection, due to '
- 'exception %s.', e)
+ if not cls._CachedCIDB:
cls._CachedCIDB = CIDBConnection(cls._ConnectionCredsPath)
return cls._CachedCIDB