Skip to content

Commit

Permalink
Merge pull request #76 from analyst-collective/fix/non-superuser-support
Browse files Browse the repository at this point in the history
show helpful messages if permissions denied in db
  • Loading branch information
drewbanin authored Jul 26, 2016
2 parents e4c0cb2 + 6da2748 commit d1f1010
Showing 1 changed file with 52 additions and 5 deletions.
57 changes: 52 additions & 5 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@
from dbt.source import Source
from dbt.utils import find_model_by_name

SCHEMA_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the schema '{schema}'.
Either create the schema manually, or adjust the permissions of the '{user}' user."""

RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' \
in the schema '{schema}'.\nPlease adjust the permissions of the '{user}' user on the '{schema}' schema.
With a superuser account, execute the following commands, then re-run dbt.
grant usage, create on schema "{schema}" to "{user}";
grant select on all tables in schema "{schema}" to "{user}";
"""

class Runner:
def __init__(self, project, target_path, run_mode):
self.project = project
Expand All @@ -32,12 +43,34 @@ def deserialize_graph(self):

return linker

def create_schema(self):
target_cfg = self.project.run_environment()
def create_schema(self, schema_name):
target = self.get_target()
with target.get_handle() as handle:
with handle.cursor() as cursor:
cursor.execute('create schema if not exists "{}"'.format(schema_name))

def get_schemas(self):
target = self.get_target()
existing = []
with target.get_handle() as handle:
with handle.cursor() as cursor:
cursor.execute('create schema if not exists "{}"'.format(target_cfg['schema']))
cursor.execute('select nspname from pg_catalog.pg_namespace')

existing = [name for (name,) in cursor.fetchall()]
return existing

def create_schema_or_exit(self, schema_name):

target_cfg = self.project.run_environment()
user = target_cfg['user']

try:
self.create_schema(schema_name)
except psycopg2.ProgrammingError as e:
if "permission denied for" in e.diag.message_primary:
raise RuntimeError(SCHEMA_PERMISSION_DENIED_MESSAGE.format(schema=schema_name, user=user))
else:
raise e

def query_for_existing(self, cursor, schema):
sql = """
Expand Down Expand Up @@ -98,7 +131,13 @@ def execute_models(self, linker, models, limit_to=None):
handle.commit()

print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name))
self.__do_execute(cursor, model.contents, model)
try:
self.__do_execute(cursor, model.contents, model)
except psycopg2.ProgrammingError as e:
if "permission denied for" in e.diag.message_primary:
raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user))
else:
raise e
handle.commit()
yield model

Expand All @@ -117,8 +156,16 @@ def run(self, specified_models=None):
print("ERROR: {}".format(str(e)))
print("Exiting")
return

target_cfg = self.project.run_environment()
schema_name = target_cfg['schema']

try:
self.create_schema()
schemas = self.get_schemas()

if schema_name not in schemas:
self.create_schema_or_exit(schema_name)

for model in self.execute_models(linker, compiled_models, limit_to):
yield model, True
except psycopg2.OperationalError as e:
Expand Down

0 comments on commit d1f1010

Please sign in to comment.