Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / third_party / chromite / lib / cidb.py
1 # Copyright 2014 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Continuous Integration Database Library."""
6
7 import datetime
8 import glob
9 import logging
10 import os
11 import re
12 import sqlalchemy
13 import sqlalchemy.exc
14 import sqlalchemy.interfaces
15 from sqlalchemy import MetaData
16 import time
17
18 from chromite.cbuildbot import constants
19
20 CIDB_MIGRATIONS_DIR = os.path.join(constants.CHROMITE_DIR, 'cidb',
21                                    'migrations')
22
23 class DBException(Exception):
24   """General exception class for this module."""
25
26
27 class UnsupportedMethodException(DBException):
28   """Raised when a call is made that the database does not support."""
29
30
31 def minimum_schema(min_version):
32   """Generate a decorator to specify a minimum schema version for a method.
33
34   This decorator should be applied only to instance methods of
35   SchemaVersionedMySQLConnection objects.
36   """
37
38   def decorator(f):
39     def wrapper(self, *args, **kwargs):
40       if self.schema_version < min_version:
41         raise UnsupportedMethodException()
42       return f(self, *args, **kwargs)
43     return wrapper
44   return decorator
45
46
47 class StrictModeListener(sqlalchemy.interfaces.PoolListener):
48   """This listener ensures that STRICT_ALL_TABLES for all connections."""
49   # pylint: disable-msg=W0613
50   def connect(self, dbapi_con, *args, **kwargs):
51     cur = dbapi_con.cursor()
52     cur.execute("SET SESSION sql_mode='STRICT_ALL_TABLES'")
53     cur.close()
54
55
56 class SchemaVersionedMySQLConnection(object):
57   """Connection to a database that is aware of its schema version."""
58
59   SCHEMA_VERSION_TABLE_NAME = 'schemaVersionTable'
60   SCHEMA_VERSION_COL = 'schemaVersion'
61
62   def __init__(self, db_name, db_migrations_dir, db_credentials_dir):
63     """SchemaVersionedMySQLConnection constructor.
64
65     Args:
66       db_name: Name of the database to connect to.
67       db_migrations_dir: Absolute path to directory of migration scripts
68                          for this database.
69       db_credentials_dir: Absolute path to directory containing connection
70                           information to the database. Specifically, this
71                           directory should contain files names user.txt,
72                           password.txt, host.txt, client-cert.pem,
73                           client-key.pem, and server-ca.pem
74     """
75     # None, or a sqlalchemy.MetaData instance
76     self._meta = None
77
78     # pid of process on which _engine was created
79     self._engine_pid = None
80
81     self._engine = None
82
83     self.db_migrations_dir = db_migrations_dir
84     self.db_credentials_dir = db_credentials_dir
85     self.db_name = db_name
86
87     with open(os.path.join(db_credentials_dir, 'password.txt')) as f:
88       password = f.read().strip()
89     with open(os.path.join(db_credentials_dir, 'host.txt')) as f:
90       host = f.read().strip()
91     with open(os.path.join(db_credentials_dir, 'user.txt')) as f:
92       user = f.read().strip()
93
94     cert = os.path.join(db_credentials_dir, 'client-cert.pem')
95     key = os.path.join(db_credentials_dir, 'client-key.pem')
96     ca = os.path.join(db_credentials_dir, 'server-ca.pem')
97     self._ssl_args = {'ssl': {'cert': cert, 'key': key, 'ca': ca}}
98
99     connect_url = sqlalchemy.engine.url.URL('mysql', username=user,
100                                             password=password,
101                                             host=host)
102
103     # Create a temporary engine to connect to the mysql instance, and check if
104     # a database named |db_name| exists. If not, create one. We use a temporary
105     # engine here because the real engine will be opened with a default
106     # database name given by |db_name|.
107     temp_engine = sqlalchemy.create_engine(connect_url,
108                                            connect_args=self._ssl_args,
109                                            listeners=[StrictModeListener()])
110     databases = temp_engine.execute('SHOW DATABASES').fetchall()
111     if (db_name,) not in databases:
112       temp_engine.execute('CREATE DATABASE %s' % db_name)
113       logging.info('Created database %s', db_name)
114
115     temp_engine.dispose()
116
117     # Now create the persistent connection to the database named |db_name|.
118     # If there is a schema version table, read the current schema version
119     # from it. Otherwise, assume schema_version 0.
120     self._connect_url = sqlalchemy.engine.url.URL('mysql', username=user,
121                                                   password=password,
122                                                   host=host, database=db_name)
123
124     self.schema_version = self.QuerySchemaVersion()
125
126   def DropDatabase(self):
127     """Delete all data and tables from database, and drop database.
128
129     Use with caution. All data in database will be deleted. Invalidates
130     this database connection instance.
131     """
132     self._meta = None
133     self._GetEngine().execute('DROP DATABASE %s' % self.db_name)
134     self._InvalidateEngine()
135
136   def QuerySchemaVersion(self):
137     """Query the database for its current schema version number.
138
139     Returns:
140       The current schema version from the database's schema version table,
141       as an integer, or 0 if the table is empty or nonexistent.
142     """
143     tables = self._GetEngine().execute('SHOW TABLES').fetchall()
144     if (self.SCHEMA_VERSION_TABLE_NAME,) in tables:
145       r = self._GetEngine().execute('SELECT MAX(%s) from %s' %
146           (self.SCHEMA_VERSION_COL, self.SCHEMA_VERSION_TABLE_NAME))
147       return r.fetchone()[0] or 0
148     else:
149       return 0
150
151   def _GetMigrationScripts(self):
152     """Look for migration scripts and return their versions and paths."
153
154     Returns:
155       A list of (schema_version, script_path) tuples of the migration
156       scripts for this database, sorted in ascending schema_version order.
157     """
158     # Look for migration script files in the migration script directory,
159     # with names of the form [number]*.sql, and sort these by number.
160     migration_scripts = glob.glob(os.path.join(self.db_migrations_dir, '*.sql'))
161     migrations = []
162     for script in migration_scripts:
163       match = re.match(r'([0-9]*).*', os.path.basename(script))
164       if match:
165         migrations.append((int(match.group(1)), script))
166
167     migrations.sort()
168     return migrations
169
170   def ApplySchemaMigrations(self, maxVersion=None):
171     """Apply pending migration scripts to database, in order.
172
173     Args:
174       maxVersion: The highest version migration script to apply. If
175                   unspecified, all migrations found will be applied.
176     """
177     migrations = self._GetMigrationScripts()
178
179     # Execute the migration scripts in order, asserting that each one
180     # updates the schema version to the expected number. If maxVersion
181     # is specified stop early.
182     for (number, script) in migrations:
183       if maxVersion is not None and number > maxVersion:
184         break
185
186       if number > self.schema_version:
187         # Invalidate self._meta, then run script and ensure that schema
188         # version was increased.
189         self._meta = None
190         logging.info('Running migration script %s', script)
191         self.RunQueryScript(script)
192         self.schema_version = self.QuerySchemaVersion()
193         if self.schema_version != number:
194           raise DBException('Migration script %s did not update '
195                             'schema version to %s as expected. ' % (number,
196                                                                     script))
197
198   def RunQueryScript(self, script_path):
199     """Run a .sql script file located at |script_path| on the database."""
200     with open(script_path, 'r') as f:
201       script = f.read()
202     queries = [q.strip() for q in script.split(';') if q.strip()]
203     for q in queries:
204       self._GetEngine().execute(q)
205
206   def _ReflectToMetadata(self):
207     """Use sqlalchemy reflection to construct MetaData model of database.
208
209     If self._meta is already populated, this does nothing.
210     """
211     if self._meta is not None:
212       return
213     self._meta = MetaData()
214     self._meta.reflect(bind=self._GetEngine())
215
216   def _Insert(self, table, values):
217     """Create and execute an INSERT query.
218
219     Args:
220       table: Table name to insert to.
221       values: Dictionary of column values to insert. Or, list of
222               value dictionaries to insert multiple rows.
223
224     Returns:
225       Integer primary key of the last inserted row.
226     """
227     self._ReflectToMetadata()
228     ins = self._meta.tables[table].insert()
229     r = self._Execute(ins, values)
230     return r.inserted_primary_key[0]
231
232   def _InsertMany(self, table, values):
233     """Create and execute an multi-row INSERT query.
234
235     Args:
236       table: Table name to insert to.
237       values: A list of value dictionaries to insert multiple rows.
238
239     Returns:
240       The number of inserted rows.
241     """
242     self._ReflectToMetadata()
243     ins = self._meta.tables[table].insert()
244     r = self._Execute(ins, values)
245     return r.rowcount
246
247   def _GetPrimaryKey(self, table):
248     """Gets the primary key column of |table|.
249
250     This function requires that the given table have a 1-column promary key.
251
252     Args:
253       table: Name of table to primary key for.
254
255     Returns:
256       A sqlalchemy.sql.schema.Column representing the primary key column.
257
258     Raises:
259       DBException if the table does not have a single column primary key.
260    """
261     self._ReflectToMetadata()
262     t = self._meta.tables[table]
263     key_columns = t.primary_key.columns.values()
264     if len(key_columns) != 1:
265       raise DBException('Table %s does not have a 1-column primary '
266                         'key.' % table)
267     return key_columns[0]
268
269   def _Update(self, table, row_id, values):
270     """Create and execute an UPDATE query by primary key.
271
272     Args:
273       table: Table name to update.
274       row_id: Primary key value of row to update.
275       values: Dictionary of column values to update.
276
277     Returns:
278       The number of rows that were updated (0 or 1).
279     """
280     self._ReflectToMetadata()
281     primary_key = self._GetPrimaryKey(table)
282     upd = self._meta.tables[table].update().where(primary_key==row_id)
283     r = self._Execute(upd, values)
284     return r.rowcount
285
286   def _Execute(self, query, *args, **kwargs):
287     """Execute a query using engine, with retires.
288
289     This method wraps execution of a query in a single retry in case the
290     engine's connection has been dropped.
291
292     Args:
293       query: Query to execute, of type string, or sqlalchemy.Executible,
294              or other sqlalchemy-executible statement (see sqlalchemy
295              docs).
296       *args: Additional args passed along to .execute(...)
297       **kwargs: Additional args passed along to .execute(...)
298
299     Returns:
300       The result of .execute(...)
301     """
302     try:
303       return self._GetEngine().execute(query, *args, **kwargs)
304     except sqlalchemy.exc.OperationalError as e:
305       error_code = e.orig.args[0]
306       # Error coded 2006 'MySQL server has gone away' indicates that the
307       # connection used was closed or dropped.
308       if error_code == 2006:
309         logging.debug('Retrying a query on engine %s, due to dropped '
310                       'connection.', self._GetEngine())
311         return self._GetEngine().execute(query, *args, **kwargs)
312       else:
313         raise
314
315   def _GetEngine(self):
316     """Get the sqlalchemy engine for this process.
317
318     This method creates a new sqlalchemy engine if necessary, and
319     returns an engine that is unique to this process.
320
321     Returns:
322       An sqlalchemy.engine instance for this database.
323     """
324     pid = os.getpid()
325     if pid == self._engine_pid and self._engine:
326       return self._engine
327     else:
328       e = sqlalchemy.create_engine(self._connect_url,
329                                    connect_args=self._ssl_args,
330                                    listeners=[StrictModeListener()])
331       self._engine = e
332       self._engine_pid = pid
333       logging.debug('Created cidb engine %s@%s for pid %s', e.url.username,
334                     e.url.host, pid)
335       return self._engine
336
337   def _InvalidateEngine(self):
338     """Dispose of an sqlalchemy engine."""
339     try:
340       pid = os.getpid()
341       if pid == self._engine_pid and self._engine:
342         self._engine.dispose()
343     finally:
344       self._engine = None
345       self._meta = None
346
347
348 class CIDBConnection(SchemaVersionedMySQLConnection):
349   """Connection to a Continuous Integration database."""
350   def __init__(self, db_credentials_dir):
351     super(CIDBConnection, self).__init__('cidb', CIDB_MIGRATIONS_DIR,
352                                          db_credentials_dir)
353
354   @minimum_schema(2)
355   def InsertBuild(self, builder_name, waterfall, build_number,
356                   build_config, bot_hostname, start_time=None,
357                   master_build_id=None):
358     """Insert a build row.
359
360     Args:
361       builder_name: buildbot builder name.
362       waterfall: buildbot waterfall name.
363       build_number: buildbot build number.
364       build_config: cbuildbot config of build
365       bot_hostname: hostname of bot running the build
366       start_time: (Optional) Unix timestamp of build start time. If None,
367                   current time will be used.
368       master_build_id: (Optional) primary key of master build to this build.
369     """
370     start_time = start_time or time.mktime()
371     dt = datetime.datetime.fromtimestamp(start_time)
372
373     return self._Insert('buildTable', {'builder_name': builder_name,
374                                        'buildbot_generation':
375                                          constants.BUILDBOT_GENERATION,
376                                        'waterfall': waterfall,
377                                        'build_number': build_number,
378                                        'build_config' : build_config,
379                                        'bot_hostname': bot_hostname,
380                                        'start_time' : dt,
381                                        'master_build_id' : master_build_id}
382                         )
383
384   @minimum_schema(3)
385   def InsertCLActions(self, build_id, cl_actions):
386     """Insert a list of |cl_actions|.
387
388     If |cl_actions| is empty, this function does nothing.
389
390     Args:
391       build_id: primary key of build that performed these actions.
392       cl_actions: A list of cl_action tuples.
393
394     Returns:
395       Number of actions inserted.
396     """
397     if not cl_actions:
398       return 0
399
400     values = []
401     # TODO(akeshet): Refactor to use either cl action tuples out of the
402     # metadata dict (as now) OR CLActionTuple objects.
403     for cl_action in cl_actions:
404       change_source = 'internal' if cl_action[0]['internal'] else 'external'
405       change_number = cl_action[0]['gerrit_number']
406       patch_number = cl_action[0]['patch_number']
407       action = cl_action[1]
408       timestamp = cl_action[2]
409       reason = cl_action[3]
410       values.append({
411           'build_id' : build_id,
412           'change_source' : change_source,
413           'change_number': change_number,
414           'patch_number' : patch_number,
415           'action' : action,
416           'timestamp' : datetime.datetime.fromtimestamp(timestamp),
417           'reason' : reason})
418
419     return self._InsertMany('clActionTable', values)
420
421   @minimum_schema(4)
422   def InsertBuildStage(self, build_id, stage_name, board, status,
423                        log_url, duration_seconds, summary):
424     """Insert a build stage into buildStageTable.
425
426     Args:
427       build_id: id of responsible build
428       stage_name: name of stage
429       board: board that stage ran for
430       status: 'pass' or 'fail'
431       log_url: URL of stage log
432       duration_seconds: run time of stage, in seconds
433       summary: summary message of stage
434
435     Returns:
436       Primary key of inserted stage.
437     """
438     return self._Insert('buildStageTable',
439                         {'build_id': build_id,
440                          'name': stage_name,
441                          'board': board,
442                          'status': status,
443                          'log_url': log_url,
444                          'duration_seconds': duration_seconds,
445                          'summary': summary})
446
447   @minimum_schema(4)
448   def InsertBuildStages(self, stages):
449     """For testing only. Insert multiple build stages into buildStageTable.
450
451     This method allows integration tests to more quickly populate build
452     stages into the database, from test data. Normal builder operations are
453     expected to insert build stage rows one at a time, using InsertBuildStage.
454
455     Args:
456       stages: A list of dictionaries, each dictionary containing keys
457               build_id, name, board, status, log_url, duration_seconds, and
458               summary.
459
460     Returns:
461       The number of build stage rows inserted.
462     """
463     if not stages:
464       return 0
465     return self._InsertMany('buildStageTable',
466                             stages)
467
468   @minimum_schema(2)
469   def UpdateMetadata(self, build_id, metadata):
470     """Update the given metadata row in database.
471
472     Args:
473       build_id: id of row to update.
474       metadata: CBuildbotMetadata instance to update with.
475
476     Returns:
477       The number of build rows that were updated (0 or 1).
478     """
479     d = metadata.GetDict()
480     versions = d.get('version') or {}
481     return self._Update('buildTable', build_id,
482                         {'chrome_version': versions.get('chrome'),
483                          'milestone_version': versions.get('milestone'),
484                          'platform_version': versions.get('platform'),
485                          'full_version': versions.get('full'),
486                          'sdk_version': d.get('sdk-versions'),
487                          'toolchain_url': d.get('toolchain-url'),
488                          'build_type': d.get('build_type'),
489                          'metadata_json': metadata.GetJSON()})
490
491   @minimum_schema(2)
492   def FinishBuild(self, build_id, finish_time=None, status=None,
493                   status_pickle=None):
494     """Update the given build row, marking it as finished.
495
496     This should be called once per build, as the last update to the build.
497     This will also mark the row's final=True.
498
499     Args:
500       build_id: id of row to update.
501       finish_time: Unix timestamp of build finish time. If None, current time
502                    will be used.
503       status: Final build status, one of
504               manifest_version.BuilderStatus.COMPLETED_STATUSES.
505       status_pickle: Pickled manifest_version.BuilderStatus.
506     """
507     self._ReflectToMetadata()
508     finish_time = finish_time or time.mktime()
509     dt = datetime.datetime.fromtimestamp(finish_time)
510
511     # TODO(akeshet) atomically update the final field of metadata to
512     # True
513     self._Update('buildTable', build_id, {'finish_time' : dt,
514                                           'status' : status,
515                                           'status_pickle' : status_pickle,
516                                           'final' : True})
517
518
519 def GetCIDBConnectionForBuilder(builder_run):
520   """Get a CIDBConnection.
521
522   Args:
523     builder_run: BuildRun instance for this builder.
524
525   Returns:
526     A CIDBConnection instance.
527   """
528   if builder_run.options.debug:
529     return CIDBConnection(constants.CIDB_DEBUG_BOT_CREDS)
530   else:
531     return CIDBConnection(constants.CIDB_PROD_BOT_CREDS)
532