From 109f89dd4e831600e4c553c4fad3abc1362e5a00 Mon Sep 17 00:00:00 2001 From: Stanislav Lysikov Date: Mon, 20 Mar 2023 15:54:09 +0300 Subject: [PATCH] check reselt for every query in multiquery --- dbt/adapters/vertica/connections.py | 52 +++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/dbt/adapters/vertica/connections.py b/dbt/adapters/vertica/connections.py index 5c1a916..94513a6 100644 --- a/dbt/adapters/vertica/connections.py +++ b/dbt/adapters/vertica/connections.py @@ -55,7 +55,7 @@ class verticaCredentials(Credentials): # backup_server_node: Optional[str] = None # additional_info = { - # 'password': str, + # 'password': str, # 'backup_server_node': list# invalid value to be set in a connection string # } @@ -97,9 +97,9 @@ def open(cls, connection): 'connection_load_balance':credentials.connection_load_balance, 'session_label': f'dbt_{credentials.username}', 'retries': credentials.retries, - + 'backup_server_node':credentials.backup_server_node, - + } # if credentials.ssl.lower() in {'true', 'yes', 'please'}: @@ -119,7 +119,7 @@ def open(cls, connection): context = ssl.create_default_context() conn_info['ssl'] = context logger.debug(f'SSL is on') - + def connect(): handle = vertica_python.connect(**conn_info) logger.debug(f':P Connection work {handle}') @@ -127,8 +127,8 @@ def connect(): connection.handle = handle logger.debug(f':P Connected to database: {credentials.database} at {credentials.host} at {handle}') return handle - - + + except Exception as exc: @@ -184,6 +184,45 @@ def cancel(self, connection): logger.debug(':P Cancel query') connection.handle.cancel() + @classmethod + def get_result_from_cursor(cls, cursor: Any) -> agate.Table: + data: List[Any] = [] + column_names: List[str] = [] + + if cursor.description is not None: + column_names = [col[0] for col in cursor.description] + rows = cursor.fetchall() + + # check result for every query if there are some queries with ; separator + while cursor.nextset(): + check = cursor._message + if isinstance(check, ErrorResponse): + logger.debug(f'Cursor message is: {check}') + self.release() + raise dbt.exceptions.DatabaseException(str(check)) + + data = cls.process_results(column_names, rows) + + return dbt.clients.agate_helper.table_from_data_flat(data, column_names) + + def execute( + self, sql: str, auto_begin: bool = False, fetch: bool = False + ) -> Tuple[AdapterResponse, agate.Table]: + sql = self._add_query_comment(sql) + _, cursor = self.add_query(sql, auto_begin) + response = self.get_response(cursor) + if fetch: + table = self.get_result_from_cursor(cursor) + else: + table = dbt.clients.agate_helper.empty_table() + while cursor.nextset(): + check = cursor._message + if isinstance(check, vertica_python.vertica.messages.ErrorResponse): + logger.debug(f'Cursor message is: {check}') + self.release() + raise dbt.exceptions.DatabaseException(str(check)) + return response, table + @contextmanager def exception_handler(self, sql): @@ -197,4 +236,3 @@ def exception_handler(self, sql): logger.debug(f':P Error: {exc}') self.release() raise dbt.exceptions.RuntimeException(str(exc)) -