Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add on_kill equivalent to Databricks SQL Hook to cancel timed out queries #42668

Open
wants to merge 208 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
208 commits
Select commit Hold shift + click to select a range
e849825
add on_kill override to databricks workflow operator
Aug 15, 2024
52c00f9
on_kill equivalent for DatabricksSqlOperator
Aug 31, 2024
5f148b2
add tests for create_timeout_thread
Sep 5, 2024
9faed81
add note for on_kill in DatabricksCopyIntoOperator
Sep 6, 2024
e877d26
chore: static checks
Sep 9, 2024
c31b23d
remove changes for databricks_workflow.py for PR isolated to databric…
Oct 2, 2024
0c52cbc
remove file accidently picked up rebasing with main
Oct 2, 2024
25b0ba9
Revert "remove file accidently picked up rebasing with main"
Oct 2, 2024
75f8c05
remove file from tracking
Oct 2, 2024
74dae27
adding updated test_databricks_workflow file
Oct 3, 2024
fd5c274
add indent to execution_timeout param in docstring of DatabricksSqlHo…
Oct 3, 2024
c8d474c
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 3, 2024
21bbe89
Update airflow/providers/databricks/hooks/databricks_sql.py
R7L208 Oct 4, 2024
619e8f8
add new exceptions
Oct 4, 2024
a671901
add new exceptions
Oct 4, 2024
991acf6
fix typing warning
Oct 4, 2024
107392d
fix ruff error
Oct 4, 2024
282bc35
fix ruff error
Oct 4, 2024
46a9cdc
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 5, 2024
2bba888
move new exceptions to Databricks provider
Oct 7, 2024
c1f64a6
ruff formatting
Oct 7, 2024
fecee52
add tests for new exceptions
Oct 7, 2024
e6d2e8a
chore - static checks
Oct 7, 2024
ae58115
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 8, 2024
10e92c4
Update airflow/providers/databricks/hooks/databricks_sql.py
R7L208 Oct 10, 2024
09ac253
make new exceptions databricks specific
Oct 17, 2024
c629c6d
Simplify code for recent dbt provider change (#42840)
kaxil Oct 8, 2024
465553d
- Fixes #42432 (#42770)
harjeevanmaan Oct 9, 2024
ea3c1f0
AIP-84 Migrate views /object/historical_metrics_data to Fast API (#42…
bugraoz93 Oct 9, 2024
55afad6
Add support to filter by last dagrun state in UI. (#42779)
tirkarthi Oct 9, 2024
f2b4a44
feat(datasets): make strict_dataset_uri_validation default to True (#…
Lee-W Oct 9, 2024
c1ef8c5
Standard provider bash operator (#42252)
gopidesupavan Oct 9, 2024
bf09c39
Fix mark as success when pod fails while fetching log (#42815)
romsharon98 Oct 9, 2024
44f0faf
Fix spelling; `Airlfow` -> `Airflow` (#42855)
jbampton Oct 9, 2024
f316273
Docs: Add Template field related info for python operators (#42847)
rawwar Oct 9, 2024
a343e78
AIP-84 Get Variable (#42834)
pierrejeambrun Oct 9, 2024
ddc9d80
Move test of DagRun.update_state to better place (#42845)
dstandish Oct 9, 2024
d047fa8
Revert "Remove `sqlalchemy-redshift` dependency from Amazon provider …
mobuchowski Oct 9, 2024
814786a
Prepare docs for Oct 1st adhoc wave of providers (#42862)
eladkal Oct 9, 2024
f8a4fc6
fix more PT004 PyDocStyle checks (#42841)
dannyl1u Oct 9, 2024
eb0a028
Revert Asset to Dataset for Core Extension Doc (#42867)
kaxil Oct 9, 2024
66a3380
Split providers out of the main "airflow/" tree into a UV workspace p…
ashb Oct 9, 2024
875b700
Improving validation of task retries to handle None values (#42532)
sonu4578 Oct 9, 2024
154d70c
Fix deprecated stage names for Pre-commit (#42872)
kaxil Oct 9, 2024
d75a543
Fix ui lint pre-commit interactive mode (#42854)
pierrejeambrun Oct 9, 2024
50f1b42
update k8s tests urllib3 retry config status_forcelist and allowed_me…
gopidesupavan Oct 9, 2024
17c6382
Bump `blacken-docs` pre-commit (#42880)
kaxil Oct 9, 2024
98d354b
Exclude "not-ready" providers when building docs (#42873)
kaxil Oct 9, 2024
c3a8b33
Move Hooks to Standard provider (#42794)
gopidesupavan Oct 10, 2024
40a863a
Fix dag warning documentation (#42858)
pierrejeambrun Oct 10, 2024
a9ab2c6
Fix issue generation for provider after folder restructure (#42883)
eladkal Oct 10, 2024
fe86dfa
AIP-84 Migrate the public endpoint Get DAG to FastAPI (#42848)
omkar-foss Oct 10, 2024
78b5a01
AIP-84 Migrate GET Dag Run endpoint to FastAPI (#42725)
rawwar Oct 10, 2024
92f135c
add majorosdonat as bosch user (#42890)
majorosdonat Oct 10, 2024
be261df
Move the session auth backend to FAB auth manager (#42878)
vincbeck Oct 10, 2024
9373ec4
Move user and roles schemas to fab provider (#42869)
vincbeck Oct 10, 2024
c3d2b9f
Add support for run conf to backfill (#42865)
dstandish Oct 10, 2024
2447b97
Fix main (#42903)
pierrejeambrun Oct 10, 2024
6b9921d
Fix main: js/types/api-generated.ts (#42906)
dstandish Oct 10, 2024
ba28e36
mark TestGKEStartKueueInsideClusterOperator tests with flaky decorato…
gopidesupavan Oct 10, 2024
108f992
Remove dag.run() method (#42761)
dstandish Oct 10, 2024
3bcc005
Add test for behavior for paused backfill (#42837)
dstandish Oct 10, 2024
bc5b57e
Use url_from_endpoint inside HttpHook. (#42785)
simi Oct 10, 2024
bf39566
Fix typo in Breeze (#42919)
kaxil Oct 10, 2024
4a85104
Docs: Add templating info to TaskFlow tutorial (#42887)
infused-kim Oct 11, 2024
24a35fa
Add possibility to override the conn type for Druid (#42793)
Rasnar Oct 11, 2024
baadbbf
fix: HttpSensorTrigger to include `method` when serializing (#42925)
rawwar Oct 11, 2024
6aca278
AIP-84 Fix dag display name search (#42863)
pierrejeambrun Oct 11, 2024
0ef7419
Check _is_canary_run/pr condition in is_legacy_ui_api_labeled method …
gopidesupavan Oct 11, 2024
34fba8b
Allow python 3.12 for the breeze release commands (#42936)
ashb Oct 11, 2024
43a8693
Render errors when getting a list of dags (#42897)
bbovenzi Oct 11, 2024
a9c8a87
Remove the referrer from Webserver to Scarf (#42901)
kaxil Oct 11, 2024
a9ac157
Split providers out of the main "airflow/" tree into a UV workspace p…
vlieven Oct 11, 2024
f4835f3
increase backoff_factor and add try/catch in k8s tests (#42940)
gopidesupavan Oct 11, 2024
e2552c8
Consistent python version checks and troubleshooting (#42944)
potiuk Oct 11, 2024
85a6175
Update README.rst (#42941)
AndreMiranda-dev Oct 11, 2024
52b41fd
Add a "backfill create" command (#42922)
dstandish Oct 11, 2024
2845cfc
Fix SNOWFLAKE_CONN_ID and DAG_ID in Snowpark system tests (#42952)
sfc-gh-jdu Oct 12, 2024
fac5376
Fix broken links in Release Management docs (#42958)
kaxil Oct 12, 2024
46759d2
Restrict looker-sdk version 24.18.0 and microsoft-kiota-http 1.3.4 (#…
gopidesupavan Oct 12, 2024
9a3d385
upgrade trove-classifiers (#42950)
gopidesupavan Oct 12, 2024
b1f33d8
Chart: fix VCT for scheduler in local and persistent mode (#42946)
Aakcht Oct 12, 2024
31118a7
fix: use instance base_container_name to fetch logs on trigger_reentr…
peloyeje Oct 12, 2024
f558d79
fix: 03_contributors_quick_start_docs (#42927)
kgw7401 Oct 12, 2024
957c186
Add skeleton project for task-sdk (#42904)
kaxil Oct 12, 2024
349cbec
Upgrade Trove classifier to `2024.10.12` (#42961)
kaxil Oct 12, 2024
e8dd5cd
Add missed brackets for our dev script for Spell checks (#42965)
kaxil Oct 12, 2024
11ed8aa
FIX: Don't raise a warning in ExecutorSafeguard when execute is calle…
dabla Oct 12, 2024
0b1b150
Upgrade Helm Chart dependencies to latest released (#42816)
potiuk Oct 13, 2024
23cbeef
Fix PythonOperator DAG error when DAG has hyphen in name (#42902)
jason810496 Oct 13, 2024
0feb1f2
fIX handling removal of dependencies (#42967)
potiuk Oct 13, 2024
c7db2aa
mark test_task_workflow_trigger_success as flaky (#42972)
gopidesupavan Oct 13, 2024
cdb4852
Create a User Settings button with light/dark mode toggle as a menu i…
AryanK1511 Oct 13, 2024
f4af0f4
AIP-84 Migrate delete Dag Run endpoint to FastAPI (#42910)
rawwar Oct 13, 2024
b19a945
Make datStats endpoint dag_ids parameter optional (#42955)
michaeljs-c Oct 13, 2024
4a152b0
trove classifier upgrade (#42979)
gopidesupavan Oct 13, 2024
5930331
Change directory used by simple auth manager to store generated passw…
vincbeck Oct 13, 2024
a8c7080
Disable flaky mssql based integration tests (#42811)
gopidesupavan Oct 13, 2024
1568623
Remove BackfillJobRunner class (#42943)
dstandish Oct 14, 2024
147e59b
Add early job_id xcom_push for google provider Beam Pipeline operator…
olegkachur-e Oct 14, 2024
8e2cbba
Add logic to mark backfills as complete (#42683)
dstandish Oct 14, 2024
8c13f86
fix mypy check failure on main (#42976)
rawwar Oct 14, 2024
a4d7125
mark test_setup_constraint_mapped_task_upstream_removed_and_success a…
gopidesupavan Oct 14, 2024
f4f4a0e
Update providers metadata 2024-10-14 (#42995)
eladkal Oct 14, 2024
eae01b7
Update json schema pre-commit to have draft7 schema in file (#43005)
ephraimbuddy Oct 14, 2024
141e562
AIP-84 Patch Variable (#42929)
pierrejeambrun Oct 14, 2024
da4ed78
Commit the session between writing and deletion of RTIF (#42928)
ephraimbuddy Oct 14, 2024
1840ec6
Refactor FastApi Dag and DagRun endpoints tests (#42949)
rawwar Oct 14, 2024
e67b5b2
Minor updates in UI contributing docs (#43013)
kaxil Oct 14, 2024
651df6b
AIP-84 post variable (#42948)
pierrejeambrun Oct 14, 2024
8fa8c51
Feature: Added event_handler parameter in MSGraphAsyncOperator (#42539)
dabla Oct 15, 2024
f177da1
uv version bump to 0.4.20 (#42905)
dirrao Oct 15, 2024
07c180e
Removed deprecated Chainable type from BaseOperator (#42776)
dirrao Oct 15, 2024
acb779a
pin in providers (#43001)
rawwar Oct 15, 2024
ce97729
Move tests_common from "dev" to top-level. (#42985)
potiuk Oct 15, 2024
94c14ae
Add AssetActive model (#42612)
uranusjr Oct 15, 2024
dabc4f2
Migrate the public endpoint Delete DAG to FastAPI (#42914)
omkar-foss Oct 15, 2024
613c7d9
Clarifying PLUGINS_FOLDER permissions by DAG authors (#43022)
amoghrajesh Oct 15, 2024
b21e83e
kubernetes package version bump to 31.0.0 (#42907)
dirrao Oct 15, 2024
0898199
Make google provider pyarrow dependency explicit (#42996)
saucoide Oct 15, 2024
b3611f0
Add search by dag_display_name_pattern on dag list page with rebase (…
luyangliuable Oct 15, 2024
b477033
Deprecate session auth backend (#42909)
vincbeck Oct 15, 2024
96916fc
fix: Change CustomSecurityManager method name (#43034)
kgw7401 Oct 15, 2024
37a5414
Update to 0.4.21 of UV (#43032)
potiuk Oct 15, 2024
8fb844c
Add retry on error 502 and 504 (#42994)
majorosdonat Oct 15, 2024
1824f18
AIP-84 Migrate get connections to FastAPI API #42571 (#42782)
bugraoz93 Oct 15, 2024
dca0534
Make `RedshiftDataOperator` handle multiple queries (#42900)
jroachgolf84 Oct 15, 2024
c322700
Bump `uv` to `0.4.21` in other places (#43033)
kaxil Oct 15, 2024
d8c1edc
require 1.2.1 common.compat for openlineage provider (#43039)
mobuchowski Oct 15, 2024
ae5985e
Detect system color theme and add active state to nav button (#43041)
bbovenzi Oct 15, 2024
2640fe2
Migrate health info to fastapi (#42938)
bbovenzi Oct 15, 2024
915169d
Add upperbound to microsoft-kiota-abstractions (#43021)
rawwar Oct 15, 2024
ac9aa62
fix path to providers dir for mount sources breeze flag (#43042)
mobuchowski Oct 15, 2024
17e4e35
Create Operators for Google Cloud Vertex AI Context Caching (#43008)
CYarros10 Oct 15, 2024
9882506
add lowerbount to requests-toolbelt and replace requests_toolbelt wit…
rawwar Oct 15, 2024
07b96c5
✨ Allow node_selector templating in KPO (#43051)
bdsoha Oct 15, 2024
efc54ff
Rename views to routes for FastAPI apps (#43057)
kaxil Oct 15, 2024
bf50fb8
Follow-up SLA purge (#42808)
ferruzzi Oct 15, 2024
538368f
Bump `uv` to `0.4.22` (#43056)
kaxil Oct 16, 2024
71eaaee
Add 'name' and 'group' to public Asset class (#42812)
uranusjr Oct 16, 2024
90fa32f
Remove zombie ti from its executor (#42932)
uranusjr Oct 16, 2024
771188d
Drop unneeded unique() call on SQL (#43064)
uranusjr Oct 16, 2024
ba9c075
Allow building docs for not-ready providers (#43071)
kaxil Oct 16, 2024
9fdee64
Typo correction in 07_local_virtualenv.rst - "way" (#43072)
biswa-b Oct 16, 2024
b557565
Enable explicit namespaces (#42951)
ferruzzi Oct 16, 2024
bfc0ac3
Replace Mateusz with Kalyan in the triage team (#43074)
potiuk Oct 16, 2024
456c2f6
Reorganize ``api_fastapi`` folder into apps (#43062)
kaxil Oct 16, 2024
1be768e
Tweak AssetAlias to match Asset for AIP-74 additions (#42814)
uranusjr Oct 16, 2024
1f1c90e
Fix pytest from working outside breeze (#43082)
kaxil Oct 16, 2024
943d5b9
Fix typo in ``pyproject.toml`` (#43077)
shahar1 Oct 16, 2024
67d28b4
Remove DAG.get_num_active_runs (#43067)
dstandish Oct 16, 2024
8876c04
Fix UI lint pre-commit hook (#43086)
pierrejeambrun Oct 16, 2024
9f9bb8b
Fixed failing static checks (#43087)
kaxil Oct 16, 2024
2ffd0bf
Added task_instance_mutation_hook for mapped operator index 0 (#42661)
AutomationDev85 Oct 16, 2024
96fbda9
Fix selective checks when only pyproject.toml changes (#43088)
potiuk Oct 16, 2024
dbf3c7f
fix(providers/mongo): prevent applying lower method on boolean field …
josix Oct 16, 2024
4d93ae5
add kubernetes_conn_id to templated fields (#42786)
gopidesupavan Oct 16, 2024
3a7b96b
Add min version to cloudpickle (#43066)
rawwar Oct 16, 2024
674554b
Initialize dashboard page with health (#43090)
bbovenzi Oct 16, 2024
368c780
Update constraints to broken canary (#43095)
jscheffl Oct 16, 2024
106a542
Fix flaky `test_get_dags` in FastAPI routes (#43100)
kaxil Oct 16, 2024
cbe4a2f
SSHHook expose auth_timeout parameter (#43048)
gyandeeps Oct 17, 2024
76fda15
vertex ai training operators: add display_name to rendered fields (#4…
WSHoekstra Oct 17, 2024
8af1dc7
Print the key name when max_length is exceeded (#43061)
jabbera Oct 17, 2024
b799d40
Revert "Fix flaky `test_get_dags` in FastAPI routes (#43100)" (#43108)
pierrejeambrun Oct 17, 2024
79d5af9
Ensure stable secondary ordering (#43085)
pierrejeambrun Oct 17, 2024
97431aa
Add simple "Task Execution" API server (#43015)
kaxil Oct 17, 2024
e8d12b8
Add ClientConnectorError to be a retryable error in databricks provid…
rawwar Oct 17, 2024
46959de
passing the filetype for SlackAPIFileOperator (#43069)
Bowrna Oct 17, 2024
ac1dbba
chore(docs): add required import of BranchDayOfWeekOperator (#43053)
sfirke Oct 17, 2024
702135e
Update unit_tests.rst links error (#43114)
yangyulely Oct 17, 2024
f282eea
added MultipleFilesWebHdfsSensor (#43045)
eilon246810 Oct 17, 2024
0de7b10
Bump to `0.4.23` to fix a uv bug (#43121)
kaxil Oct 17, 2024
90bbe13
Clean up some eslint rules to better fit what we do (#43093)
bbovenzi Oct 17, 2024
89cc08b
Fixed failing static checks & provider tests (#43122)
kaxil Oct 17, 2024
2d15c16
Add min version to python-ldap (#43104)
rawwar Oct 17, 2024
40670b6
feat(providers/fab): Use asset in common provider (#43112)
Lee-W Oct 17, 2024
17cf408
Add `bugraoz93` in the triage team (#43124)
kaxil Oct 17, 2024
64b1d1b
add on_kill override to databricks workflow operator
Aug 15, 2024
cafc23a
on_kill equivalent for DatabricksSqlOperator
Aug 31, 2024
0cab9ab
add tests for create_timeout_thread
Sep 5, 2024
c3e7ecb
add note for on_kill in DatabricksCopyIntoOperator
Sep 6, 2024
c29e506
chore: static checks
Sep 9, 2024
104983c
remove changes for databricks_workflow.py for PR isolated to databric…
Oct 2, 2024
1054626
add indent to execution_timeout param in docstring of DatabricksSqlHo…
Oct 3, 2024
408d8fb
Update airflow/providers/databricks/hooks/databricks_sql.py
R7L208 Oct 4, 2024
30ecf43
add new exceptions
Oct 4, 2024
930794d
add new exceptions
Oct 4, 2024
e662230
fix typing warning
Oct 4, 2024
894466c
fix ruff error
Oct 4, 2024
0ae987d
fix ruff error
Oct 4, 2024
d2f37a1
move new exceptions to Databricks provider
Oct 7, 2024
9d5a3cf
ruff formatting
Oct 7, 2024
2d64d32
add tests for new exceptions
Oct 7, 2024
75d6995
chore - static checks
Oct 7, 2024
abe980c
Update airflow/providers/databricks/hooks/databricks_sql.py
R7L208 Oct 10, 2024
e649f1a
make new exceptions databricks specific
Oct 17, 2024
d18ba69
accidentally removed necessary changes when resolving conflicts
Oct 17, 2024
900cb34
remove redefinition of mock_workflow_run_metadata
Oct 17, 2024
4373704
merge main into this branch bc rebase failed
Oct 17, 2024
03a6b19
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 17, 2024
024b45c
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 18, 2024
3cc4b50
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 18, 2024
8fc1f79
Update providers/tests/databricks/hooks/test_databricks_sql.py
R7L208 Oct 18, 2024
6bd88bd
Update providers/tests/databricks/hooks/test_databricks_sql.py
R7L208 Oct 18, 2024
bcf4b68
make execution_timeout last arg and keyword arg
Oct 18, 2024
de71f76
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 18, 2024
115d845
Merge branch 'main' into lorin/databricks-sql-operator-on_kill-equiva…
R7L208 Oct 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions providers/src/airflow/providers/databricks/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Note: Any AirflowException raised is expected to cause the TaskInstance
# to be marked in an ERROR state
"""Exceptions used by Databricks Provider."""

from __future__ import annotations

from airflow.exceptions import AirflowException


class DatabricksSqlExecutionError(AirflowException):
"""Raised when there is an error in sql execution."""


class DatabricksSqlExecutionTimeout(DatabricksSqlExecutionError):
"""Raised when a sql execution times out."""
41 changes: 39 additions & 2 deletions providers/src/airflow/providers/databricks/hooks/databricks_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations

import threading
import warnings
from collections import namedtuple
from contextlib import closing
from copy import copy
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -35,8 +37,12 @@

from databricks import sql # type: ignore[attr-defined]

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import (
AirflowException,
AirflowProviderDeprecationWarning,
)
from airflow.providers.common.sql.hooks.sql import DbApiHook, return_single_query_results
from airflow.providers.databricks.exceptions import DatabricksSqlExecutionError, DatabricksSqlExecutionTimeout
from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook

if TYPE_CHECKING:
Expand All @@ -49,6 +55,16 @@
T = TypeVar("T")


def create_timeout_thread(cur, execution_timeout: timedelta | None) -> threading.Timer | None:
if execution_timeout is not None:
seconds_to_timeout = execution_timeout.total_seconds()
t = threading.Timer(seconds_to_timeout, cur.connection.cancel)
else:
t = None

return t


class DatabricksSqlHook(BaseDatabricksHook, DbApiHook):
"""
Hook to interact with Databricks SQL.
Expand Down Expand Up @@ -184,6 +200,7 @@ def run(
handler: None = ...,
split_statements: bool = ...,
return_last: bool = ...,
execution_timeout: timedelta | None = None,
) -> None: ...

@overload
Expand All @@ -195,6 +212,7 @@ def run(
handler: Callable[[Any], T] = ...,
split_statements: bool = ...,
return_last: bool = ...,
execution_timeout: timedelta | None = None,
) -> tuple | list[tuple] | list[list[tuple] | tuple] | None: ...

def run(
Expand All @@ -205,6 +223,7 @@ def run(
handler: Callable[[Any], T] | None = None,
split_statements: bool = True,
return_last: bool = True,
execution_timeout: timedelta | None = None,
) -> tuple | list[tuple] | list[list[tuple] | tuple] | None:
"""
Run a command or a list of commands.
Expand All @@ -224,6 +243,8 @@ def run(
:param return_last: Whether to return result for only last statement or for all after split
:return: return only result of the LAST SQL expression if handler was provided unless return_last
is set to False.
:param execution_timeout: max time allowed for the execution of this task instance, if it goes beyond
it will raise and fail.
"""
self.descriptions = []
if isinstance(sql, str):
Expand All @@ -248,7 +269,23 @@ def run(
self.set_autocommit(conn, autocommit)

with closing(conn.cursor()) as cur:
self._run_command(cur, sql_statement, parameters) # type: ignore[attr-defined]
t = create_timeout_thread(cur, execution_timeout)

# TODO: adjust this to make testing easier
try:
self._run_command(cur, sql_statement, parameters) # type: ignore[attr-defined]
except Exception as e:
if t is None or t.is_alive():
raise DatabricksSqlExecutionError(
f"Error running SQL statement: {sql_statement}. {str(e)}"
)
raise DatabricksSqlExecutionTimeout(
f"Timeout threshold exceeded for SQL statement: {sql_statement} was cancelled."
)
finally:
if t is not None:
t.cancel()

if handler is not None:
raw_result = handler(cur)
if self.return_tuple:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,8 @@ def execute(self, context: Context) -> Any:
self.log.info("Executing: %s", sql)
hook = self._get_hook()
hook.run(sql)

def on_kill(self) -> None:
# NB: on_kill isn't required for this operator since query cancelling gets
# handled in `DatabricksSqlHook.run()` method which is called in `execute()`
...
Loading