From 1e0c069a39040c78697b466fd7ff9ea173276176 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 5 Jul 2018 14:15:51 -0600 Subject: [PATCH 1/2] Add TCP keepalive values to the connection string in postgres --- dbt/adapters/postgres/impl.py | 12 +++++++++++- dbt/adapters/redshift/impl.py | 3 +++ dbt/contracts/connection.py | 6 ++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/postgres/impl.py b/dbt/adapters/postgres/impl.py index f9a69ad386c..89fa0ea538b 100644 --- a/dbt/adapters/postgres/impl.py +++ b/dbt/adapters/postgres/impl.py @@ -15,6 +15,8 @@ class PostgresAdapter(dbt.adapters.default.DefaultAdapter): + DEFAULT_TCP_KEEPALIVE = 0 # 0 means to use the default value + @classmethod @contextmanager def exception_handler(cls, profile, sql, model_name=None, @@ -67,6 +69,13 @@ def open_connection(cls, connection): base_credentials = connection.get('credentials', {}) credentials = cls.get_credentials(base_credentials.copy()) + kwargs = {} + keepalives_idle = credentials.get('keepalives_idle', + cls.DEFAULT_TCP_KEEPALIVE) + # we don't want to pass 0 along to connect() as postgres will try to + # call an invalid setsockopt() call (contrary to the docs). + if keepalives_idle: + kwargs['keepalives_idle'] = keepalives_idle try: handle = psycopg2.connect( @@ -75,7 +84,8 @@ def open_connection(cls, connection): host=credentials.get('host'), password=credentials.get('pass'), port=credentials.get('port'), - connect_timeout=10) + connect_timeout=10, + **kwargs) result['handle'] = handle result['state'] = 'open' diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index 319edf2bf45..d908a3da298 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -9,6 +9,9 @@ class RedshiftAdapter(PostgresAdapter): + + DEFAULT_TCP_KEEPALIVE = 240 + @classmethod def type(cls): return 'redshift' diff --git a/dbt/contracts/connection.py b/dbt/contracts/connection.py index 2183c5d891b..111450fc407 100644 --- a/dbt/contracts/connection.py +++ b/dbt/contracts/connection.py @@ -32,6 +32,9 @@ 'schema': { 'type': 'string', }, + 'keepalives_idle': { + 'type': 'integer', + }, }, 'required': ['dbname', 'host', 'user', 'pass', 'port', 'schema'], } @@ -87,6 +90,9 @@ 'If using IAM auth, the ttl for the temporary credentials' ) }, + 'keepalives_idle': { + 'type': 'integer', + }, 'required': ['dbname', 'host', 'user', 'port', 'schema'] } } From b37b33fc6b11b1b3c62df7dfd7a6e4b3909a78f8 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Fri, 6 Jul 2018 07:29:37 -0600 Subject: [PATCH 2/2] add tests --- test/unit/test_postgres_adapter.py | 42 ++++++++++++ test/unit/test_redshift_adapter.py | 100 ++++++++++++++++------------- 2 files changed, 99 insertions(+), 43 deletions(-) diff --git a/test/unit/test_postgres_adapter.py b/test/unit/test_postgres_adapter.py index 3c6148a6d24..c5b6d05b39c 100644 --- a/test/unit/test_postgres_adapter.py +++ b/test/unit/test_postgres_adapter.py @@ -1,7 +1,9 @@ +import mock import unittest import dbt.flags as flags +import dbt.adapters from dbt.adapters.postgres import PostgresAdapter from dbt.exceptions import ValidationException from dbt.logger import GLOBAL_LOGGER as logger # noqa @@ -37,3 +39,43 @@ def test_acquire_connection(self): self.assertEquals(connection.get('state'), 'open') self.assertNotEquals(connection.get('handle'), None) + + @mock.patch('dbt.adapters.postgres.impl.psycopg2') + def test_default_keepalive(self, psycopg2): + connection = PostgresAdapter.acquire_connection(self.profile, 'dummy') + + psycopg2.connect.assert_called_once_with( + dbname='postgres', + user='root', + host='database', + password='password', + port=5432, + connect_timeout=10) + + @mock.patch('dbt.adapters.postgres.impl.psycopg2') + def test_changed_keepalive(self, psycopg2): + self.profile['keepalives_idle'] = 256 + connection = PostgresAdapter.acquire_connection(self.profile, 'dummy') + + psycopg2.connect.assert_called_once_with( + dbname='postgres', + user='root', + host='database', + password='password', + port=5432, + connect_timeout=10, + keepalives_idle=256) + + @mock.patch('dbt.adapters.postgres.impl.psycopg2') + def test_set_zero_keepalive(self, psycopg2): + self.profile['keepalives_idle'] = 0 + connection = PostgresAdapter.acquire_connection(self.profile, 'dummy') + + psycopg2.connect.assert_called_once_with( + dbname='postgres', + user='root', + host='database', + password='password', + port=5432, + connect_timeout=10) + diff --git a/test/unit/test_redshift_adapter.py b/test/unit/test_redshift_adapter.py index 505ae4b536d..f8b4432e3bd 100644 --- a/test/unit/test_redshift_adapter.py +++ b/test/unit/test_redshift_adapter.py @@ -1,6 +1,7 @@ import unittest import mock +import dbt.adapters import dbt.flags as flags import dbt.utils @@ -20,8 +21,7 @@ class TestRedshiftAdapter(unittest.TestCase): def setUp(self): flags.STRICT_MODE = True - def test_implicit_database_conn(self): - implicit_database_profile = { + self.profile = { 'dbname': 'redshift', 'user': 'root', 'host': 'database', @@ -30,69 +30,83 @@ def test_implicit_database_conn(self): 'schema': 'public' } - creds = RedshiftAdapter.get_credentials(implicit_database_profile) - self.assertEquals(creds, implicit_database_profile) + def test_implicit_database_conn(self): + creds = RedshiftAdapter.get_credentials(self.profile) + self.assertEquals(creds, self.profile) def test_explicit_database_conn(self): - explicit_database_profile = { - 'method': 'database', - 'dbname': 'redshift', - 'user': 'root', - 'host': 'database', - 'pass': 'password', - 'port': 5439, - 'schema': 'public' - } + self.profile['method'] = 'database' - creds = RedshiftAdapter.get_credentials(explicit_database_profile) - self.assertEquals(creds, explicit_database_profile) + creds = RedshiftAdapter.get_credentials(self.profile) + self.assertEquals(creds, self.profile) def test_explicit_iam_conn(self): - explicit_iam_profile = { + self.profile.update({ 'method': 'iam', 'cluster_id': 'my_redshift', 'iam_duration_s': 1200, - 'dbname': 'redshift', - 'user': 'root', - 'host': 'database', - 'port': 5439, - 'schema': 'public', - } + }) with mock.patch.object(RedshiftAdapter, 'fetch_cluster_credentials', new=fetch_cluster_credentials): - creds = RedshiftAdapter.get_credentials(explicit_iam_profile) + creds = RedshiftAdapter.get_credentials(self.profile) - expected_creds = dbt.utils.merge(explicit_iam_profile, {'pass': 'tmp_password'}) + expected_creds = dbt.utils.merge(self.profile, {'pass': 'tmp_password'}) self.assertEquals(creds, expected_creds) def test_invalid_auth_method(self): - invalid_profile = { - 'method': 'badmethod', - 'dbname': 'redshift', - 'user': 'root', - 'host': 'database', - 'pass': 'password', - 'port': 5439, - 'schema': 'public' - } + self.profile['method'] = 'badmethod' with self.assertRaises(dbt.exceptions.FailedToConnectException) as context: with mock.patch.object(RedshiftAdapter, 'fetch_cluster_credentials', new=fetch_cluster_credentials): - RedshiftAdapter.get_credentials(invalid_profile) + RedshiftAdapter.get_credentials(self.profile) self.assertTrue('badmethod' in context.exception.msg) def test_invalid_iam_no_cluster_id(self): - invalid_profile = { - 'method': 'iam', - 'dbname': 'redshift', - 'user': 'root', - 'host': 'database', - 'port': 5439, - 'schema': 'public' - } + self.profile['method'] = 'iam' with self.assertRaises(dbt.exceptions.FailedToConnectException) as context: with mock.patch.object(RedshiftAdapter, 'fetch_cluster_credentials', new=fetch_cluster_credentials): - RedshiftAdapter.get_credentials(invalid_profile) + RedshiftAdapter.get_credentials(self.profile) self.assertTrue("'cluster_id' must be provided" in context.exception.msg) + + + @mock.patch('dbt.adapters.postgres.impl.psycopg2') + def test_default_keepalive(self, psycopg2): + connection = RedshiftAdapter.acquire_connection(self.profile, 'dummy') + + psycopg2.connect.assert_called_once_with( + dbname='redshift', + user='root', + host='database', + password='password', + port=5439, + connect_timeout=10, + keepalives_idle=RedshiftAdapter.DEFAULT_TCP_KEEPALIVE) + + @mock.patch('dbt.adapters.postgres.impl.psycopg2') + def test_changed_keepalive(self, psycopg2): + self.profile['keepalives_idle'] = 256 + connection = RedshiftAdapter.acquire_connection(self.profile, 'dummy') + + psycopg2.connect.assert_called_once_with( + dbname='redshift', + user='root', + host='database', + password='password', + port=5439, + connect_timeout=10, + keepalives_idle=256) + + @mock.patch('dbt.adapters.postgres.impl.psycopg2') + def test_set_zero_keepalive(self, psycopg2): + self.profile['keepalives_idle'] = 0 + connection = RedshiftAdapter.acquire_connection(self.profile, 'dummy') + + psycopg2.connect.assert_called_once_with( + dbname='redshift', + user='root', + host='database', + password='password', + port=5439, + connect_timeout=10)