Prep 2.2.6 #308

Merged: 11 commits, May 12, 2020
3 changes: 1 addition & 2 deletions arrow_result.pyx
@@ -96,8 +96,7 @@ cdef class ArrowResult:
else self._connection._chunk_downloader_class(
chunks, self._connection, self._cursor, qrmk, chunk_headers,
query_result_format='arrow',
prefetch_threads=self._connection.client_prefetch_threads,
use_ijson=False)
prefetch_threads=self._connection.client_prefetch_threads)

def __iter__(self):
return self
23 changes: 6 additions & 17 deletions chunk_downloader.py
@@ -8,13 +8,11 @@
import time
from collections import namedtuple
from gzip import GzipFile
from io import BytesIO
from logging import getLogger
from multiprocessing.pool import ThreadPool
from threading import Condition, Lock

from snowflake.connector.gzip_decoder import decompress_raw_data
from snowflake.connector.util_text import split_rows_from_stream

from .arrow_context import ArrowConverterContext
from .errorcode import ER_CHUNK_DOWNLOAD_FAILED
@@ -51,9 +49,7 @@ class SnowflakeChunkDownloader(object):

def _pre_init(self, chunks, connection, cursor, qrmk, chunk_headers,
query_result_format='JSON',
prefetch_threads=DEFAULT_CLIENT_PREFETCH_THREADS,
use_ijson=False):
self._use_ijson = use_ijson
prefetch_threads=DEFAULT_CLIENT_PREFETCH_THREADS):
self._query_result_format = query_result_format

self._downloader_error = None
@@ -97,12 +93,10 @@ def _pre_init(self, chunks, connection, cursor, qrmk, chunk_headers,

def __init__(self, chunks, connection, cursor, qrmk, chunk_headers,
query_result_format='JSON',
prefetch_threads=DEFAULT_CLIENT_PREFETCH_THREADS,
use_ijson=False):
prefetch_threads=DEFAULT_CLIENT_PREFETCH_THREADS):
self._pre_init(chunks, connection, cursor, qrmk, chunk_headers,
query_result_format=query_result_format,
prefetch_threads=prefetch_threads,
use_ijson=use_ijson)
prefetch_threads=prefetch_threads)
logger.debug('Chunk Downloader in memory')
for idx in range(self._effective_threads):
self._pool.apply_async(self._download_chunk, [idx])
@@ -257,8 +251,7 @@ def _fetch_chunk(self, url, headers):
"""
Fetch the chunk from S3.
"""
handler = JsonBinaryHandler(is_raw_binary_iterator=True,
use_ijson=self._use_ijson) \
handler = JsonBinaryHandler(is_raw_binary_iterator=True) \
if self._query_result_format == 'json' else \
ArrowBinaryHandler(self._cursor, self._connection)

@@ -299,9 +292,8 @@ class JsonBinaryHandler(RawBinaryDataHandler):
"""
Convert result chunk in json format into iterator
"""
def __init__(self, is_raw_binary_iterator, use_ijson):
def __init__(self, is_raw_binary_iterator):
self._is_raw_binary_iterator = is_raw_binary_iterator
self._use_ijson = use_ijson

def to_iterator(self, raw_data_fd, download_time):
parse_start_time = get_time_millis()
@@ -310,10 +302,7 @@ def to_iterator(self, raw_data_fd, download_time):
).decode('utf-8', 'replace')
if not self._is_raw_binary_iterator:
ret = json.loads(raw_data)
elif not self._use_ijson:
ret = iter(json.loads(raw_data))
else:
ret = split_rows_from_stream(BytesIO(raw_data.encode('utf-8')))
ret = iter(json.loads(raw_data))

parse_end_time = get_time_millis()

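Note on the chunk_downloader.py change: with use_ijson removed, JSON result chunks are always parsed eagerly via json.loads and exposed as an iterator. The sketch below is a standalone illustration of that remaining path, not the connector's actual JsonBinaryHandler; the helper name, and the assumption that chunks arrive gzip-compressed without the enclosing JSON array brackets, are mine.

import gzip
import json

def json_chunk_to_iterator(raw_gzip_bytes: bytes):
    # Decompress the downloaded chunk; the connector does this through
    # snowflake.connector.gzip_decoder.decompress_raw_data.
    raw = gzip.decompress(raw_gzip_bytes).decode('utf-8', 'replace')
    # Assumption: chunks hold comma-separated rows without the enclosing
    # JSON array brackets, so wrap them before parsing.
    wrapped = '[' + raw + ']'
    # Eager parse: the streaming ijson path no longer exists.
    return iter(json.loads(wrapped))

# Usage sketch:
rows = json_chunk_to_iterator(gzip.compress(b'["Mark", 10], ["Luke", 20]'))
print(list(rows))  # [['Mark', 10], ['Luke', 20]]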
2 changes: 1 addition & 1 deletion ci/build_pyarrow.bat
@@ -31,7 +31,7 @@ if %errorlevel% neq 0 goto :error
python -m pip install --upgrade pip
if %errorlevel% neq 0 goto :error

pip install --upgrade setuptools wheel Cython pyarrow==0.16.0 numpy
pip install --upgrade setuptools wheel Cython pyarrow==0.17.0 numpy
if %errorlevel% neq 0 goto :error

cd %CONNECTOR_DIR%
2 changes: 1 addition & 1 deletion ci/build_pyarrow_darwin.sh
@@ -19,7 +19,7 @@ function build_connector_with_python() {
rm -f generated_version.py || true
fi
# This needs to be kept in sync with setup.py
pip install -U pyarrow==0.16.0 Cython flake8
pip install -U pyarrow==0.17.0 Cython flake8
flake8
MACOSX_DEPLOYMENT_TARGET=10.12 python setup.py bdist_wheel -d $CONNECTOR_DIR/dist/
unset ENABLE_EXT_MODULES
2 changes: 1 addition & 1 deletion ci/install.bat
@@ -9,7 +9,7 @@ call env\Scripts\activate
# https://github.com/pypa/pip/issues/6566
python -m pip install --upgrade pip
:: These versions have to be kept in sync with what is pinned in setup.py manually
pip install "pyarrow==0.16.0"
pip install "pyarrow==0.17.0"
pip install wheel
pip install Cython
set ENABLE_EXT_MODULES=true
2 changes: 1 addition & 1 deletion ci/install.sh
@@ -30,7 +30,7 @@ source ./venv/bin/activate
if [ "$TRAVIS_OS_NAME" == "osx" ]; then
export ENABLE_EXT_MODULES=true
cd $THIS_DIR/..
pip install Cython pyarrow==0.16.0 wheel
pip install Cython pyarrow==0.17.0 wheel
python setup.py bdist_wheel
unset ENABLE_EXT_MODULES
CONNECTOR_WHL=$(ls $THIS_DIR/../dist/snowflake_connector_python*.whl | sort -r | head -n 1)
39 changes: 13 additions & 26 deletions connection.py
@@ -142,12 +142,12 @@ def DefaultConverterClass():
u'support_negative_year': True, # snowflake
u'log_max_query_length': LOG_MAX_QUERY_LENGTH, # snowflake
u'disable_request_pooling': False, # snowflake
# Enable temporary credential file for Linux, default false. Mac/Win will overlook this
u'client_store_temporary_credential': False,
u'client_store_temporary_credential': False, # enable temporary credential file for Linux, default false. Mac/Win will overlook this
'use_openssl_only': False, # only use openssl instead of python only crypto modules
}

APPLICATION_RE = re.compile(r'^[\w.-]+$')
APPLICATION_RE = re.compile(r'[\w\d_]+')

# adding the exception class to Connection class
for m in [method for method in dir(errors) if
callable(getattr(errors, method))]:
@@ -159,21 +159,6 @@ def DefaultConverterClass():
logger = getLogger(__name__)


def verify_application(val: str) -> None:
"""Raise ProgrammingError if invalid application name"""
if len(val) > 50:
raise ProgrammingError(
msg="Application name is too long: {}".format(val),
errno=0
)

if not APPLICATION_RE.match(val):
raise ProgrammingError(
msg='Invalid application name: {}'.format(val),
errno=0
)


class SnowflakeConnection(object):
u"""
Implementation of the connection object for the Snowflake Database. Use
@@ -417,10 +402,6 @@ def application(self):
"""
return self._application

@application.setter
def application(self, val: str) -> None:
verify_application(val)

@property
def errorhandler(self):
u"""
@@ -720,7 +701,7 @@ def __open_connection(self):
# enable storing temporary credential in a file
self._session_parameters[
PARAMETER_CLIENT_STORE_TEMPORARY_CREDENTIAL] = \
self._client_store_temporary_credential if IS_LINUX else True
self._client_store_temporary_credential if IS_LINUX else True

auth = Auth(self.rest)
if not auth.read_temporary_credential(
@@ -746,10 +727,16 @@ def __config(self, **kwargs):
if name == u'sequence_counter':
self.sequence_counter = value
elif name == u'application':
verify_application(value)
setattr(self, '_' + name, value)
if not APPLICATION_RE.match(value):
msg = u'Invalid application name: {}'.format(value)
raise ProgrammingError(
msg=msg,
errno=0
)
else:
setattr(self, u'_' + name, value)
else:
setattr(self, '_' + name, value)
setattr(self, u'_' + name, value)

if self._numpy:
try:
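Side note on the application-name check restored above: APPLICATION_RE is unanchored, so re.match() only requires the name to start with a word character. A small standalone sketch of that behavior; the helper name and ValueError are illustrative, since the connector performs the check inline in __config() and raises ProgrammingError.

import re

# Pattern restored by this diff; unanchored, so match() only checks the start.
APPLICATION_RE = re.compile(r'[\w\d_]+')

def check_application_name(value: str) -> str:
    # Illustrative helper mirroring the inline check in __config().
    if not APPLICATION_RE.match(value):
        raise ValueError('Invalid application name: {}'.format(value))
    return value

print(check_application_name('PythonConnector'))   # accepted
print(check_application_name('my_app_2'))          # accepted
# check_application_name('!bad')                   # would raise: no leading word character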
10 changes: 5 additions & 5 deletions cursor.py
@@ -583,7 +583,7 @@ def execute(self, command, params=None, timeout=None,
u'total'] if u'data' in ret and u'total' in ret[
u'data'] else -1
return data
self._init_result_and_meta(data, _use_ijson)
self._init_result_and_meta(data)
else:
self._total_rowcount = ret[u'data'][
u'total'] if u'data' in ret and u'total' in ret[u'data'] else -1
@@ -611,7 +611,7 @@ def _is_dml(self, data):
and int(data[u'statementTypeId']) in \
STATEMENT_TYPE_ID_DML_SET

def _init_result_and_meta(self, data, use_ijson=False):
def _init_result_and_meta(self, data):
is_dml = self._is_dml(data)
self._query_result_format = data.get(u'queryResultFormat', u'json')
logger.debug(u"Query result format: %s", self._query_result_format)
@@ -636,7 +636,7 @@ def _init_result_and_meta(self, data, use_ijson=False):
self.check_can_use_arrow_resultset()
self._result = ArrowResult(data, self, use_dict_result=self._use_dict_result)
else:
self._result = self._json_result_class(data, self, use_ijson)
self._result = self._json_result_class(data, self)

if is_dml:
updated_rows = 0
@@ -694,7 +694,7 @@ def check_can_use_pandas(self):
}
)

def query_result(self, qid, _use_ijson=False):
def query_result(self, qid):
url = '/queries/{qid}/result'.format(qid=qid)
ret = self._connection.rest.request(url=url, method='get')
self._sfqid = ret[u'data'][
@@ -707,7 +707,7 @@ def query_result(self, qid, _use_ijson=False):

if ret.get(u'success'):
data = ret.get(u'data')
self._init_result_and_meta(data, _use_ijson)
self._init_result_and_meta(data)
else:
logger.info(u'failed')
logger.debug(ret)
2 changes: 1 addition & 1 deletion docker/manylinux2010/scripts/build_virtualenvs.sh
@@ -39,7 +39,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do
"$(cpython_path $PYTHON ${U_WIDTH})/bin/virtualenv" -p ${PYTHON_INTERPRETER} --no-download /home/user/venv-build-${PYTHON}
source /home/user/venv-build-${PYTHON}/bin/activate
pip install -U pip
pip install "cython==0.29.15" "setuptools" "flake8" "wheel" "pyarrow==0.16.0"
pip install "cython==0.29.15" "setuptools" "flake8" "wheel" "pyarrow==0.17.0"
deactivate
done

13 changes: 6 additions & 7 deletions json_result.py
@@ -16,13 +16,13 @@


class JsonResult:
def __init__(self, raw_response, cursor, use_ijson=False):
def __init__(self, raw_response, cursor):
self._reset()
self._cursor = cursor
self._connection = cursor.connection
self._init_from_meta(raw_response, use_ijson)
self._init_from_meta(raw_response)

def _init_from_meta(self, data, use_ijson):
def _init_from_meta(self, data):
self._total_row_index = -1 # last fetched number of rows
self._chunk_index = 0
self._chunk_count = 0
@@ -59,8 +59,7 @@ def _init_from_meta(self, data, use_ijson):
self._chunk_downloader = self._connection._chunk_downloader_class(
chunks, self._connection, self._cursor, qrmk, chunk_headers,
query_result_format='json',
prefetch_threads=self._connection.client_prefetch_threads,
use_ijson=use_ijson)
prefetch_threads=self._connection.client_prefetch_threads)

def __iter__(self):
return self
@@ -166,8 +165,8 @@ def _reset(self):

class DictJsonResult(JsonResult):

def __init__(self, raw_response, cursor, use_ijson):
JsonResult.__init__(self, raw_response, cursor, use_ijson)
def __init__(self, raw_response, cursor):
JsonResult.__init__(self, raw_response, cursor)

def _row_to_python(self, row):
# see the base class
22 changes: 11 additions & 11 deletions pandas_tools.py
@@ -48,10 +48,10 @@ def write_pandas(conn: 'SnowflakeConnection',
:Example:

import pandas
from snowflake.connector.pandas_tools import write_pandas_all
from snowflake.connector.pandas_tools import write_pandas

df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance'])
success, nchunks, nrows, _ = write_pandas_all(cnx, df, 'customers')
success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers')

@param conn: connection to be used to communicate with Snowflake
@param df: Dataframe we'd like to write back
@@ -68,7 +68,7 @@ def write_pandas(conn: 'SnowflakeConnection',
@return: tuple of whether all chunks were ingested correctly, # of chunks, # of ingested rows, and ingest's output
"""
if database is not None and schema is None:
raise ProgrammingError("Schema has to be provided to write_pandas_all when a database is provided")
raise ProgrammingError("Schema has to be provided to write_pandas when a database is provided")
# This dictionary maps the compression algorithm to Snowflake put copy into command type
# https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#type-parquet
compression_map = {
@@ -90,11 +90,11 @@
while True:
try:
stage_name = ''.join(random.choice(string.ascii_lowercase) for _ in range(5))
cursor.execute('create temporary stage /* Python:snowflake.connector.pandas_tools.write.pandas_all() */ '
cursor.execute('create temporary stage /* Python:snowflake.connector.pandas_tools.write_pandas() */ '
'"{stage_name}"'.format(stage_name=stage_name), _is_internal=True).fetchall()
break
except ProgrammingError as pe:
if pe.msg.endswith('already exists.'.format(stage_name)):
if pe.msg.endswith('already exists.'):
continue
raise

@@ -104,7 +104,7 @@ def write_pandas(conn: 'SnowflakeConnection',
# Dump chunk into parquet file
chunk.to_parquet(chunk_path, compression=compression)
# Upload parquet file
cursor.execute('PUT /* Python:snowflake.connector.pandas_tools.write.pandas_all() */ '
cursor.execute('PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ '
'file://{path} @"{stage_name}" PARALLEL={parallel}'.format(
path=chunk_path,
stage_name=stage_name,
@@ -113,7 +113,7 @@ def write_pandas(conn: 'SnowflakeConnection',
# Remove chunk file
os.remove(chunk_path)
copy_results = cursor.execute((
'COPY INTO {location} /* Python:snowflake.connector.pandas_tools.write.pandas_all() */ '
'COPY INTO {location} /* Python:snowflake.connector.pandas_tools.write_pandas() */ '
'FROM @"{stage_name}" FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression}) '
'MATCH_BY_COLUMN_NAME=CASE_SENSITIVE PURGE=TRUE ON_ERROR={on_error}'
).format(
@@ -134,13 +134,13 @@ def pd_writer(table: pandas.io.sql.SQLTable,
keys: Iterable,
data_iter: Iterable) -> None:
"""
This is a wrapper on top of write_pandas_all to make it compatible with to_sql method in pandas.
This is a wrapper on top of write_pandas to make it compatible with to_sql method in pandas.
:Example:

import pandas as pd
from snowflake.connector.pandas_utils import pf_writer
from snowflake.connector.pandas_tools import pd_writer

sf_connector_version_df = pd.DataFrame([('snowflake-connector-python',)], columns=['NAME', 'NEWEST_VERSION'])
sf_connector_version_df = pd.DataFrame([('snowflake-connector-python', '1.0')], columns=['NAME', 'NEWEST_VERSION'])
sf_connector_version_df.to_sql('driver_versions', engine, index=False, method=pd_writer)
@param table: Pandas package's table object
@param conn: SQLAlchemy engine object to talk to Snowflake
@@ -152,6 +152,6 @@ def pd_writer(table: pandas.io.sql.SQLTable,
df = pandas.DataFrame(data_iter, columns=keys)
write_pandas(conn=sf_connection,
df=df,
# Note: Our sqlalchemy connector creates table in the case insensitive way
# Note: Our sqlalchemy connector creates tables case insensitively
table_name=table.name.upper(),
schema=table.schema)
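For anyone landing on this diff, a short usage sketch with the corrected write_pandas name, mirroring the docstring example fixed above. The connection parameters are placeholders, and the target table is assumed to already exist with matching, case-sensitive column names (the COPY uses MATCH_BY_COLUMN_NAME=CASE_SENSITIVE).

import pandas
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# Placeholder credentials; fill in real values for your account.
cnx = snowflake.connector.connect(
    user='<user>', password='<password>', account='<account>',
    database='<database>', schema='<schema>', warehouse='<warehouse>',
)

df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance'])

# Returns: (all chunks ingested ok, number of chunks, number of rows, COPY output).
success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers')
print(success, nchunks, nrows)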