1 # Copyright 2014 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Continuous Integration Database Library."""
14 import sqlalchemy.interfaces
15 from sqlalchemy import MetaData
18 from chromite.cbuildbot import constants
# Absolute path to the directory holding this database's migration scripts.
# NOTE(review): the continuation/closing of this os.path.join call (likely a
# final 'migrations' component and closing paren) appears to be elided from
# this view of the file — confirm against the full source.
CIDB_MIGRATIONS_DIR = os.path.join(constants.CHROMITE_DIR, 'cidb',
class DBException(Exception):
  """Base class for all exceptions raised by this module."""
class UnsupportedMethodException(DBException):
  """Raised when a caller invokes a method this database cannot support."""
def minimum_schema(min_version):
  """Generate a decorator to specify a minimum schema version for a method.

  This decorator should be applied only to instance methods of
  SchemaVersionedMySQLConnection objects.

  Args:
    min_version: Lowest schema version (integer) at which the decorated
                 method is supported.
  """
  # NOTE(review): the inner decorator function (e.g. `def decorator(f):`) and
  # the trailing `return wrapper` / `return decorator` lines appear to be
  # elided from this view of the file; `f` below is otherwise unbound.
  # Confirm against the full source.
  def wrapper(self, *args, **kwargs):
    # Refuse the call outright if the connected database's schema is too old
    # to support this method.
    if self.schema_version < min_version:
      raise UnsupportedMethodException()
    return f(self, *args, **kwargs)
class StrictModeListener(sqlalchemy.interfaces.PoolListener):
  """Ensures STRICT_ALL_TABLES sql_mode is set on all pooled connections."""
  # pylint: disable-msg=W0613
  def connect(self, dbapi_con, *args, **kwargs):
    """Pool hook invoked for each new DBAPI connection."""
    cur = dbapi_con.cursor()
    # Enable strict mode for this session so MySQL rejects invalid values
    # instead of silently coercing them.
    cur.execute("SET SESSION sql_mode='STRICT_ALL_TABLES'")
    # NOTE(review): a trailing line (possibly `cur.close()`) may be elided
    # from this view of the file — confirm against the full source.
class SchemaVersionedMySQLConnection(object):
  """Connection to a database that is aware of its schema version."""

  # Table/column where the database records its current schema version.
  SCHEMA_VERSION_TABLE_NAME = 'schemaVersionTable'
  SCHEMA_VERSION_COL = 'schemaVersion'

  def __init__(self, db_name, db_migrations_dir, db_credentials_dir):
    """SchemaVersionedMySQLConnection constructor.

    Args:
      db_name: Name of the database to connect to.
      db_migrations_dir: Absolute path to directory of migration scripts
                         for this database.
      db_credentials_dir: Absolute path to directory containing connection
                          information to the database. Specifically, this
                          directory should contain files named user.txt,
                          password.txt, host.txt, client-cert.pem,
                          client-key.pem, and server-ca.pem
    """
    # None, or a sqlalchemy.MetaData instance
    # NOTE(review): the attribute assignments these comments describe (e.g.
    # self._meta = None and self._engine = None) appear to be elided from
    # this view of the file — confirm against the full source.
    # pid of process on which _engine was created
    self._engine_pid = None

    self.db_migrations_dir = db_migrations_dir
    self.db_credentials_dir = db_credentials_dir
    self.db_name = db_name

    # Read connection credentials from the credentials directory.
    with open(os.path.join(db_credentials_dir, 'password.txt')) as f:
      password = f.read().strip()
    with open(os.path.join(db_credentials_dir, 'host.txt')) as f:
      host = f.read().strip()
    with open(os.path.join(db_credentials_dir, 'user.txt')) as f:
      user = f.read().strip()

    # SSL cert/key/CA paths, passed through to the MySQL driver via
    # connect_args below.
    cert = os.path.join(db_credentials_dir, 'client-cert.pem')
    key = os.path.join(db_credentials_dir, 'client-key.pem')
    ca = os.path.join(db_credentials_dir, 'server-ca.pem')
    self._ssl_args = {'ssl': {'cert': cert, 'key': key, 'ca': ca}}

    # NOTE(review): this call's continuation lines (presumably password=...
    # and host=..., plus the closing paren) appear to be elided from this
    # view of the file; as written the statement is incomplete.
    connect_url = sqlalchemy.engine.url.URL('mysql', username=user,

    # Create a temporary engine to connect to the mysql instance, and check if
    # a database named |db_name| exists. If not, create one. We use a temporary
    # engine here because the real engine will be opened with a default
    # database name given by |db_name|.
    temp_engine = sqlalchemy.create_engine(connect_url,
                                           connect_args=self._ssl_args,
                                           listeners=[StrictModeListener()])
    databases = temp_engine.execute('SHOW DATABASES').fetchall()
    if (db_name,) not in databases:
      temp_engine.execute('CREATE DATABASE %s' % db_name)
      logging.info('Created database %s', db_name)

    temp_engine.dispose()

    # Now create the persistent connection to the database named |db_name|.
    # If there is a schema version table, read the current schema version
    # from it. Otherwise, assume schema_version 0.
    # NOTE(review): a continuation line (likely password=...) appears to be
    # elided here — confirm against the full source.
    self._connect_url = sqlalchemy.engine.url.URL('mysql', username=user,
                                                  host=host, database=db_name)

    self.schema_version = self.QuerySchemaVersion()

  def DropDatabase(self):
    """Delete all data and tables from database, and drop database.

    Use with caution. All data in database will be deleted. Invalidates
    this database connection instance.
    """
    self._GetEngine().execute('DROP DATABASE %s' % self.db_name)
    self._InvalidateEngine()

  def QuerySchemaVersion(self):
    """Query the database for its current schema version number.

    Returns:
      The current schema version from the database's schema version table,
      as an integer, or 0 if the table is empty or nonexistent.
    """
    tables = self._GetEngine().execute('SHOW TABLES').fetchall()
    if (self.SCHEMA_VERSION_TABLE_NAME,) in tables:
      # MAX() over an empty table yields NULL (None); `or 0` normalizes that.
      r = self._GetEngine().execute('SELECT MAX(%s) from %s' %
                                    (self.SCHEMA_VERSION_COL, self.SCHEMA_VERSION_TABLE_NAME))
      return r.fetchone()[0] or 0
    # NOTE(review): the fall-through branch (presumably `return 0` when the
    # version table does not exist) appears to be elided from this view.

  def _GetMigrationScripts(self):
    """Look for migration scripts and return their versions and paths.

    Returns:
      A list of (schema_version, script_path) tuples of the migration
      scripts for this database, sorted in ascending schema_version order.
    """
    # Look for migration script files in the migration script directory,
    # with names of the form [number]*.sql, and sort these by number.
    migration_scripts = glob.glob(os.path.join(self.db_migrations_dir, '*.sql'))
    # NOTE(review): initialization of `migrations` (and possibly a guard on
    # the regex match) appears to be elided from this view of the file;
    # `migrations` below is otherwise unbound.
    for script in migration_scripts:
      match = re.match(r'([0-9]*).*', os.path.basename(script))
      migrations.append((int(match.group(1)), script))
    # NOTE(review): the sort and `return migrations` appear to be elided
    # from this view of the file.

  def ApplySchemaMigrations(self, maxVersion=None):
    """Apply pending migration scripts to database, in order.

    Args:
      maxVersion: The highest version migration script to apply. If
                  unspecified, all migrations found will be applied.
    """
    migrations = self._GetMigrationScripts()

    # Execute the migration scripts in order, asserting that each one
    # updates the schema version to the expected number. If maxVersion
    # is specified stop early.
    for (number, script) in migrations:
      if maxVersion is not None and number > maxVersion:
        # NOTE(review): the loop-exit statement (presumably `break`) appears
        # to be elided from this view of the file.
      if number > self.schema_version:
        # Invalidate self._meta, then run script and ensure that schema
        # version was increased.
        # NOTE(review): the statement clearing self._meta appears to be
        # elided from this view of the file.
        logging.info('Running migration script %s', script)
        self.RunQueryScript(script)
        self.schema_version = self.QuerySchemaVersion()
        if self.schema_version != number:
          # NOTE(review): this raise's trailing argument(s) and closing paren
          # appear to be elided; as visible, |number| is fed to the script-name
          # %s — confirm the intended arguments against the full source.
          raise DBException('Migration script %s did not update '
                            'schema version to %s as expected. ' % (number,

  def RunQueryScript(self, script_path):
    """Run a .sql script file located at |script_path| on the database."""
    with open(script_path, 'r') as f:
      # NOTE(review): the read of the file into `script` (e.g.
      # `script = f.read()`) appears to be elided from this view.
      queries = [q.strip() for q in script.split(';') if q.strip()]
    # NOTE(review): the loop header (`for q in queries:`) appears to be
    # elided from this view of the file.
      self._GetEngine().execute(q)

  def _ReflectToMetadata(self):
    """Use sqlalchemy reflection to construct MetaData model of database.

    If self._meta is already populated, this does nothing.
    """
    if self._meta is not None:
      # NOTE(review): the early `return` appears to be elided from this view.
    self._meta = MetaData()
    self._meta.reflect(bind=self._GetEngine())

  def _Insert(self, table, values):
    """Create and execute an INSERT query.

    Args:
      table: Table name to insert to.
      values: Dictionary of column values to insert. Or, list of
              value dictionaries to insert multiple rows.

    Returns:
      Integer primary key of the last inserted row.
    """
    self._ReflectToMetadata()
    ins = self._meta.tables[table].insert()
    r = self._Execute(ins, values)
    return r.inserted_primary_key[0]

  def _InsertMany(self, table, values):
    """Create and execute an multi-row INSERT query.

    Args:
      table: Table name to insert to.
      values: A list of value dictionaries to insert multiple rows.

    Returns:
      The number of inserted rows.
    """
    self._ReflectToMetadata()
    ins = self._meta.tables[table].insert()
    r = self._Execute(ins, values)
    # NOTE(review): the return statement (likely `return r.rowcount`) appears
    # to be elided from this view of the file.

  def _GetPrimaryKey(self, table):
    """Gets the primary key column of |table|.

    This function requires that the given table have a 1-column primary key.

    Args:
      table: Name of table to get the primary key for.

    Returns:
      A sqlalchemy.sql.schema.Column representing the primary key column.

    Raises:
      DBException if the table does not have a single column primary key.
    """
    self._ReflectToMetadata()
    t = self._meta.tables[table]
    key_columns = t.primary_key.columns.values()
    if len(key_columns) != 1:
      # NOTE(review): this raise's continuation (closing string and paren)
      # appears to be elided from this view of the file.
      raise DBException('Table %s does not have a 1-column primary '
    return key_columns[0]

  def _Update(self, table, row_id, values):
    """Create and execute an UPDATE query by primary key.

    Args:
      table: Table name to update.
      row_id: Primary key value of row to update.
      values: Dictionary of column values to update.

    Returns:
      The number of rows that were updated (0 or 1).
    """
    self._ReflectToMetadata()
    primary_key = self._GetPrimaryKey(table)
    upd = self._meta.tables[table].update().where(primary_key==row_id)
    r = self._Execute(upd, values)
    # NOTE(review): the return statement (likely `return r.rowcount`) appears
    # to be elided from this view of the file.

  def _Execute(self, query, *args, **kwargs):
    """Execute a query using engine, with retries.

    This method wraps execution of a query in a single retry in case the
    engine's connection has been dropped.

    Args:
      query: Query to execute, of type string, or sqlalchemy.Executable,
             or other sqlalchemy-executable statement (see sqlalchemy
             documentation).
      *args: Additional args passed along to .execute(...)
      **kwargs: Additional args passed along to .execute(...)

    Returns:
      The result of .execute(...)
    """
    # NOTE(review): the opening `try:` for this return appears to be elided
    # from this view of the file.
      return self._GetEngine().execute(query, *args, **kwargs)
    except sqlalchemy.exc.OperationalError as e:
      error_code = e.orig.args[0]
      # Error coded 2006 'MySQL server has gone away' indicates that the
      # connection used was closed or dropped.
      if error_code == 2006:
        logging.debug('Retrying a query on engine %s, due to dropped '
                      'connection.', self._GetEngine())
        return self._GetEngine().execute(query, *args, **kwargs)
      # NOTE(review): the re-raise branch for other error codes appears to be
      # elided from this view of the file.

  def _GetEngine(self):
    """Get the sqlalchemy engine for this process.

    This method creates a new sqlalchemy engine if necessary, and
    returns an engine that is unique to this process.

    Returns:
      An sqlalchemy.engine instance for this database.
    """
    # NOTE(review): the assignment of `pid` (presumably os.getpid()) and the
    # early return of the cached engine appear to be elided from this view
    # of the file; `pid` below is otherwise unbound.
    if pid == self._engine_pid and self._engine:
    e = sqlalchemy.create_engine(self._connect_url,
                                 connect_args=self._ssl_args,
                                 listeners=[StrictModeListener()])
    self._engine_pid = pid
    # NOTE(review): this debug call's continuation, the caching assignment of
    # the new engine, and its return appear to be elided from this view.
    logging.debug('Created cidb engine %s@%s for pid %s', e.url.username,

  def _InvalidateEngine(self):
    """Dispose of an sqlalchemy engine."""
    # NOTE(review): the assignment of `pid` (presumably os.getpid()) and the
    # reset of self._engine / self._engine_pid appear to be elided from this
    # view of the file; `pid` below is otherwise unbound.
    if pid == self._engine_pid and self._engine:
      self._engine.dispose()
class CIDBConnection(SchemaVersionedMySQLConnection):
  """Connection to a Continuous Integration database."""

  def __init__(self, db_credentials_dir):
    # Delegate to the schema-versioned base, fixing the database name and
    # migrations directory for CIDB.
    # NOTE(review): this super() call's continuation (passing
    # db_credentials_dir and the closing paren) appears to be elided from
    # this view of the file.
    super(CIDBConnection, self).__init__('cidb', CIDB_MIGRATIONS_DIR,

  def InsertBuild(self, builder_name, waterfall, build_number,
                  build_config, bot_hostname, start_time=None,
                  master_build_id=None):
    """Insert a build row.

    Args:
      builder_name: buildbot builder name.
      waterfall: buildbot waterfall name.
      build_number: buildbot build number.
      build_config: cbuildbot config of build
      bot_hostname: hostname of bot running the build
      start_time: (Optional) Unix timestamp of build start time. If None,
                  current time will be used.
      master_build_id: (Optional) primary key of master build to this build.
    """
    # NOTE(review): time.mktime() with no argument raises TypeError — this
    # likely should be time.time() (current Unix timestamp). Confirm against
    # the full source.
    start_time = start_time or time.mktime()
    dt = datetime.datetime.fromtimestamp(start_time)

    # NOTE(review): the 'start_time' dict entry and this call's closing paren
    # appear to be elided from this view of the file.
    return self._Insert('buildTable', {'builder_name': builder_name,
                                       'buildbot_generation':
                                       constants.BUILDBOT_GENERATION,
                                       'waterfall': waterfall,
                                       'build_number': build_number,
                                       'build_config' : build_config,
                                       'bot_hostname': bot_hostname,
                                       'master_build_id' : master_build_id}

  def InsertCLActions(self, build_id, cl_actions):
    """Insert a list of |cl_actions|.

    If |cl_actions| is empty, this function does nothing.

    Args:
      build_id: primary key of build that performed these actions.
      cl_actions: A list of cl_action tuples.

    Returns:
      Number of actions inserted.
    """
    # NOTE(review): initialization of `values` (a list of row dicts) appears
    # to be elided from this view of the file.
    # TODO(akeshet): Refactor to use either cl action tuples out of the
    # metadata dict (as now) OR CLActionTuple objects.
    for cl_action in cl_actions:
      # Each cl_action is indexed as (patch_dict, action, timestamp, reason).
      change_source = 'internal' if cl_action[0]['internal'] else 'external'
      change_number = cl_action[0]['gerrit_number']
      patch_number = cl_action[0]['patch_number']
      action = cl_action[1]
      timestamp = cl_action[2]
      reason = cl_action[3]
      # NOTE(review): the `values.append({` opening, plus the 'action' and
      # 'reason' entries and the dict's closing, appear to be elided from
      # this view of the file.
          'build_id' : build_id,
          'change_source' : change_source,
          'change_number': change_number,
          'patch_number' : patch_number,
          'timestamp' : datetime.datetime.fromtimestamp(timestamp),

    return self._InsertMany('clActionTable', values)

  def InsertBuildStage(self, build_id, stage_name, board, status,
                       log_url, duration_seconds, summary):
    """Insert a build stage into buildStageTable.

    Args:
      build_id: id of responsible build
      stage_name: name of stage
      board: board that stage ran for
      status: 'pass' or 'fail'
      log_url: URL of stage log
      duration_seconds: run time of stage, in seconds
      summary: summary message of stage

    Returns:
      Primary key of inserted stage.
    """
    # NOTE(review): several dict entries (presumably name/board/status/
    # log_url/summary) and the call's closing paren appear to be elided from
    # this view of the file.
    return self._Insert('buildStageTable',
                        {'build_id': build_id,
                         'duration_seconds': duration_seconds,

  def InsertBuildStages(self, stages):
    """For testing only. Insert multiple build stages into buildStageTable.

    This method allows integration tests to more quickly populate build
    stages into the database, from test data. Normal builder operations are
    expected to insert build stage rows one at a time, using InsertBuildStage.

    Args:
      stages: A list of dictionaries, each dictionary containing keys
              build_id, name, board, status, log_url, duration_seconds, and
              summary.

    Returns:
      The number of build stage rows inserted.
    """
    # NOTE(review): this call's second argument (the stage row list) and its
    # closing paren appear to be elided from this view of the file.
    return self._InsertMany('buildStageTable',

  def UpdateMetadata(self, build_id, metadata):
    """Update the given metadata row in database.

    Args:
      build_id: id of row to update.
      metadata: CBuildbotMetadata instance to update with.

    Returns:
      The number of build rows that were updated (0 or 1).
    """
    d = metadata.GetDict()
    # d.get('version') may exist but be None; `or {}` normalizes both the
    # missing-key and None cases so the .get() calls below are safe.
    versions = d.get('version') or {}
    return self._Update('buildTable', build_id,
                        {'chrome_version': versions.get('chrome'),
                         'milestone_version': versions.get('milestone'),
                         'platform_version': versions.get('platform'),
                         'full_version': versions.get('full'),
                         'sdk_version': d.get('sdk-versions'),
                         'toolchain_url': d.get('toolchain-url'),
                         'build_type': d.get('build_type'),
                         'metadata_json': metadata.GetJSON()})

  # NOTE(review): this signature's continuation (likely `status_pickle=None):`)
  # appears to be elided from this view of the file; `status_pickle` is
  # referenced in the body below.
  def FinishBuild(self, build_id, finish_time=None, status=None,
    """Update the given build row, marking it as finished.

    This should be called once per build, as the last update to the build.
    This will also mark the row's final=True.

    Args:
      build_id: id of row to update.
      finish_time: Unix timestamp of build finish time. If None, current time
                   will be used.
      status: Final build status, one of
              manifest_version.BuilderStatus.COMPLETED_STATUSES.
      status_pickle: Pickled manifest_version.BuilderStatus.
    """
    self._ReflectToMetadata()
    # NOTE(review): time.mktime() with no argument raises TypeError — this
    # likely should be time.time(). Confirm against the full source.
    finish_time = finish_time or time.mktime()
    dt = datetime.datetime.fromtimestamp(finish_time)

    # TODO(akeshet) atomically update the final field of metadata to
    # NOTE(review): the remainder of the TODO above, the 'status' and 'final'
    # dict entries, and the call's closing paren appear to be elided from
    # this view of the file.
    self._Update('buildTable', build_id, {'finish_time' : dt,
                                          'status_pickle' : status_pickle,
def GetCIDBConnectionForBuilder(builder_run):
  """Return a CIDBConnection appropriate for |builder_run|.

  Debug builders are connected with the debug-bot credentials; all other
  builders use the production-bot credentials.

  Args:
    builder_run: BuildRun instance for this builder.

  Returns:
    A CIDBConnection instance.
  """
  creds_dir = (constants.CIDB_DEBUG_BOT_CREDS if builder_run.options.debug
               else constants.CIDB_PROD_BOT_CREDS)
  return CIDBConnection(creds_dir)