#!/usr/bin/env python2.7
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with the cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table; table_schema is a list of (column name, column type, description) tuples."""
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table.

    By default, a date-partitioned table is created and each partition
    expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


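# Illustrative note (not in the original module): with the defaults above,
# create_partitioned_table ends up sending a table body whose partitioning
# section looks like
#     {'timePartitioning': {'type': 'DAY', 'expirationMs': 2592000000}}
# i.e. daily partitions that expire 30 days (2,592,000,000 ms) after they
# were last modified.

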
def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows into table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job


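# Sketch (added for illustration, not part of the upstream utilities):
# flattening the rows of a completed synchronous query. This assumes the
# standard BigQuery v2 jobs.query response layout, where each row is
# {'f': [{'v': <value>}, ...]}.
def _example_read_query_rows(query_job):
    if not query_job or not query_job.get('jobComplete'):
        return []
    return [[cell['v'] for cell in row.get('f', [])]
            for row in query_job.get('rows', [])]

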
def make_row(unique_row_id, row_values_dict):
    """Builds an insertAll row; row_values_dict maps column names to values."""
    return {'insertId': unique_row_id, 'json': row_values_dict}
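

# Usage sketch (added for illustration): how these helpers are typically
# composed. The project, dataset and table ids and the schema below are
# placeholder values, not ones used by the gRPC tooling.
def _example_usage():
    bq = create_big_query()
    create_dataset(bq, 'my-project', 'example_dataset')
    schema = [
        ('run_id', 'STRING', 'Unique id of the run'),
        ('latency_ms', 'FLOAT', 'Observed latency in milliseconds'),
    ]
    create_table(bq, 'my-project', 'example_dataset', 'example_results',
                 schema, 'Example results table')
    rows = [make_row(str(uuid.uuid4()), {'run_id': 'run-1', 'latency_ms': 3.2})]
    insert_rows(bq, 'my-project', 'example_dataset', 'example_results', rows)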