1 #!/usr/bin/env python2.7
2 # Copyright 2015 gRPC authors.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from __future__ import print_function
23 from apiclient import discovery
24 from apiclient.errors import HttpError
25 from oauth2client.client import GoogleCredentials
27 # 30 days in milliseconds
28 _EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
def create_big_query():
    """Authenticates with the cloud platform and returns a BigQuery service object.

    Uses Application Default Credentials, so the environment must provide
    credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS or a GCE metadata server).
    """
    creds = GoogleCredentials.get_application_default()
    # cache_discovery=False avoids the noisy file-cache warning emitted when
    # oauth2client >= 4.0 is installed.
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)
def create_dataset(biq_query, project_id, dataset_id):
    """Creates a BigQuery dataset in the given project.

    Args:
      biq_query: BigQuery service object from create_big_query().
      project_id: GCP project that will own the dataset.
      dataset_id: name of the dataset to create.

    Returns:
      True on success or if the dataset already exists; False otherwise.
    """
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = biq_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        # 409 Conflict means the dataset already exists, which is fine.
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success
def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a non-partitioned BigQuery table.

    table_schema is a list of (column name, column type, description) tuples;
    it is converted to the REST API's field-dict schema and delegated to
    create_table2().
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)
def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created with each partition lasting 30 days after it was last modified.

    table_schema is a list of (column name, column type, description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)
def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a BigQuery table from an already-built field-dict schema.

    Args:
      fields_schema: list of dicts with 'name', 'type' and 'description' keys.
      partition_type/expiration_ms: when both are truthy, a timePartitioning
        clause is added to the request body.

    Returns:
      True on success or if the table already exists; False otherwise.
    """
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        # 409 Conflict means the table already exists, which is fine.
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success
def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Patches an existing table's schema with the given field dicts.

    Returns True on success, False if the patch request failed.
    """
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        # Fixed: the original message said "creating" in this patch helper.
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows into a table via tabledata().insertAll.

    rows_list is a list of row dicts as produced by make_row(). Returns True
    only if the request succeeded and BigQuery reported no per-row insert
    errors.
    """
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        # insertAll can return 200 with partial failures; check explicitly.
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False
    return is_success
def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous BigQuery query.

    Args:
      query: SQL query string.
      timeout: server-side wait in milliseconds before returning.

    Returns:
      The jobs().query() response dict, or None if the request raised an
      HttpError.
    """
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
# Note: the table_schema arguments accepted above are lists of
# (column name, column type, description) tuples.
def make_row(unique_row_id, row_values_dict):
    """Builds one entry for a tabledata().insertAll request.

    Args:
      unique_row_id: deduplication id used by BigQuery ('insertId').
      row_values_dict: dictionary mapping column names to column values.

    Returns:
      A dict in the {'insertId': ..., 'json': ...} shape insert_rows() expects.
    """
    return {'insertId': unique_row_id, 'json': row_values_dict}