Skip to content

Commit

Permalink
Merge pull request #1673 from sjwhitworth/bq-priority
Browse files Browse the repository at this point in the history
Support job priority in BigQuery
  • Loading branch information
drewbanin authored Aug 9, 2019
2 parents ffcaac5 + be53b67 commit da7c950
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
11 changes: 11 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
'location': {
'type': 'string',
},
'priority': {
'enum': ['interactive', 'batch'],
},
},
'required': ['method', 'database', 'schema'],
}
Expand Down Expand Up @@ -192,6 +195,14 @@ def raw_execute(self, sql, fetch=False):

job_config = google.cloud.bigquery.QueryJobConfig()
job_config.use_legacy_sql = False

priority = conn.credentials.get('priority', 'interactive')
if priority == "batch":
job_config.priority = google.cloud.bigquery.QueryPriority.BATCH
else:
job_config.priority = \
google.cloud.bigquery.QueryPriority.INTERACTIVE

query_job = client.query(sql, job_config)

# this blocks until the query has completed
Expand Down
17 changes: 17 additions & 0 deletions test/unit/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def setUp(self):
'schema': 'dummy_schema',
'threads': 1,
'location': 'Luna Station',
'priority': 'batch',
},
},
'target': 'oauth',
Expand Down Expand Up @@ -104,6 +105,22 @@ def test_acquire_connection_service_account_validations(self, mock_open_connecti

mock_open_connection.assert_called_once()

@patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
def test_acquire_connection_priority(self, mock_open_connection):
adapter = self.get_adapter('loc')
try:
connection = adapter.acquire_connection('dummy')
self.assertEqual(connection.get('type'), 'bigquery')
self.assertEqual(connection.credentials.get('priority'), 'batch')

except dbt.exceptions.ValidationException as e:
self.fail('got ValidationException: {}'.format(str(e)))

except BaseException as e:
raise

mock_open_connection.assert_called_once()

def test_cancel_open_connections_empty(self):
adapter = self.get_adapter('oauth')
self.assertEqual(adapter.cancel_open_connections(), None)
Expand Down

0 comments on commit da7c950

Please sign in to comment.