From 00671c5786f3194b07bf7548e7a8806bd5d58553 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Mon, 3 Dec 2018 23:06:26 +0100 Subject: [PATCH 01/16] LGBMRegressor wrapper and score methods (fully tested) --- dask_lightgbm/core.py | 58 +++++++++++++- dask_lightgbm/tests/test_core.py | 125 ++++++++++++++++++++++++++----- 2 files changed, 161 insertions(+), 22 deletions(-) diff --git a/dask_lightgbm/core.py b/dask_lightgbm/core.py index 5bf8ae5..124cca4 100644 --- a/dask_lightgbm/core.py +++ b/dask_lightgbm/core.py @@ -1,13 +1,13 @@ import logging from collections import defaultdict +import dask.array as da +import dask.dataframe as dd import lightgbm import numpy as np import pandas as pd from lightgbm.basic import _safe_call, _LIB from toolz import first, assoc -import dask.dataframe as dd -import dask.array as da try: import sparse @@ -85,7 +85,6 @@ def _fit_local(params, model_factory, list_of_parts, worker_addresses, return_mo return None - def train(client, X, y, params, model_factory, sample_weight=None, **kwargs): data_parts = X.to_delayed() label_parts = y.to_delayed() @@ -215,6 +214,12 @@ def predict_proba(self, X, client=None, **kwargs): return predict(client, self.to_local(), X, proba=True, dtype=self.classes_[0].dtype, **kwargs) predict_proba.__doc__ = lightgbm.LGBMClassifier.predict_proba.__doc__ + def score(self, X, y, client=None, compute=True): + from dask_ml.metrics import accuracy_score + X = X.values if isinstance(X, dd._Frame) else X + y = y.values if isinstance(y, dd._Frame) else y + return accuracy_score(y, self.predict(X, client=client), compute=compute) + def to_local(self): model = lightgbm.LGBMClassifier(**self.get_params()) model._Booster = self._Booster @@ -227,3 +232,50 @@ def to_local(self): model._best_score = self._best_score return model + + +class LGBMRegressor(lightgbm.LGBMRegressor): + + def fit(self, X, y=None, sample_weight=None, client=None, **kwargs): + if client is None: + client = default_client() + model_factory = lightgbm.LGBMRegressor + params = self.get_params(True) + + model = train(client, X, y, params, model_factory, sample_weight, **kwargs) + self.set_params(**model.get_params()) + self._Booster = model._Booster + self._n_features = model._n_features + self._evals_result = model._evals_result + self._best_iteration = model._best_iteration + self._best_score = model._best_score + + return self + fit.__doc__ = lightgbm.LGBMRegressor.fit.__doc__ + + def _network_params(self): + return { + "machines": self.machines + } + + def predict(self, X, client=None, **kwargs): + if client is None: + client = default_client() + return predict(client, self.to_local(), X, **kwargs) + predict.__doc__ = lightgbm.LGBMRegressor.predict.__doc__ + + def score(self, X, y, client=None, compute=True): + from dask_ml.metrics import r2_score + X = X.values if isinstance(X, dd._Frame) else X + y = y.values if isinstance(y, dd._Frame) else y + return r2_score(y, self.predict(X, client=client), compute=compute) + + def to_local(self): + model = lightgbm.LGBMRegressor(**self.get_params()) + model._Booster = self._Booster + model._n_features = self._n_features + model._evals_result = self._evals_result + model._best_iteration = self._best_iteration + model._best_score = self._best_score + + return model diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index 031c2bf..a439f4a 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -1,30 +1,34 @@ +# Workaround for conflict with distributed 1.23.0 +# 
https://github.com/dask/dask-xgboost/pull/27#issuecomment-417474734 +from concurrent.futures import ThreadPoolExecutor + +import dask.array as da +import dask.dataframe as dd +import distributed.comm.utils +import lightgbm import numpy as np import pandas as pd -import sparse -import lightgbm -import scipy.sparse import pytest - -import dask.array as da +import scipy.sparse +import sparse from dask.array.utils import assert_eq -import dask.dataframe as dd from dask.distributed import Client -from sklearn.datasets import make_blobs -from distributed.utils_test import gen_cluster, loop, cluster # noqa +from distributed.utils_test import gen_cluster, cluster # noqa +from sklearn.datasets import make_blobs, make_regression from sklearn.metrics import confusion_matrix import dask_lightgbm.core as dlgbm -# Workaround for conflict with distributed 1.23.0 -# https://github.com/dask/dask-xgboost/pull/27#issuecomment-417474734 -from concurrent.futures import ThreadPoolExecutor -import distributed.comm.utils - distributed.comm.utils._offload_executor = ThreadPoolExecutor(max_workers=2) -def _create_data(n_samples=100, centers=2, output="array", chunk_size=50): - X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42) +def _create_data(objective, n_samples=100, centers=2, output="array", chunk_size=50): + if objective == 'classification': + X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42) + elif objective == 'regression': + X, y = make_regression(n_samples=n_samples, random_state=42) + else: + raise ValueError(objective) rnd = np.random.RandomState(42) w = rnd.rand(X.shape[0])*0.01 @@ -63,19 +67,22 @@ def _create_data(n_samples=100, centers=2, output="array", chunk_size=50): def test_classifier(loop, output, listen_port, centers): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: - X, y, w, dX, dy, dw = _create_data(output=output, centers=centers) + X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) a = dlgbm.LGBMClassifier(local_listen_port=listen_port) a = a.fit(dX, dy, sample_weight=dw) p1 = a.predict(dX, client=client) p1 = p1.compute() + s1 = a.score(dX, dy) b = lightgbm.LGBMClassifier() b.fit(X, y, sample_weight=w) p2 = b.predict(X) + s2 = b.score(X, y) print(confusion_matrix(y, p1)) print(confusion_matrix(y, p2)) + assert_eq(s1, s2) assert_eq(p1, p2) assert_eq(y, p1) assert_eq(y, p2) @@ -94,7 +101,7 @@ def test_classifier(loop, output, listen_port, centers): def test_classifier_proba(loop, output, listen_port, centers): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: - X, y, w, dX, dy, dw = _create_data(output=output, centers=centers) + X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers) a = dlgbm.LGBMClassifier(local_listen_port=listen_port) a = a.fit(dX, dy, sample_weight=dw) @@ -111,9 +118,9 @@ def test_classifier_proba(loop, output, listen_port, centers): def test_classifier_local_predict(loop): #noqa with cluster() as (s, [a, b]): with Client(s['address'], loop=loop): - X, y, w, dX, dy, dw = _create_data(output="array") + X, y, w, dX, dy, dw = _create_data('classification', output="array") - a = dlgbm.LGBMClassifier(local_listen_port=11400) + a = dlgbm.LGBMClassifier(local_listen_port=10400) a = a.fit(dX, dy, sample_weight=dw) p1 = a.to_local().predict(dX) @@ -126,6 +133,86 @@ def test_classifier_local_predict(loop): #noqa assert_eq(y, p2) +@pytest.mark.parametrize("output, listen_port", [ + ('array', 31400), + 
('scipy_csr_matrix', 32400), + ('sparse', 33400), + # ('dataframe', 34400), # Dataframes are not supported by dask_ml.metrics +]) +def test_regressor(loop, output, listen_port): + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as client: + X, y, w, dX, dy, dw = _create_data('regression', output=output) + + a = dlgbm.LGBMRegressor(local_listen_port=listen_port, seed=42) + a = a.fit(dX, dy, client=client, sample_weight=dw) + s1 = a.score(dX, dy, client=client) + p1 = a.predict(dX, client=client).compute() + + b = lightgbm.LGBMRegressor(seed=42) + b.fit(X, y, sample_weight=w) + s2 = b.score(X, y) + p2 = b.predict(X) + + # Scores should be the same + assert_eq(s1, s2, atol=.01) + + # Predictions should be roughly the same + assert_eq(y, p1, rtol=1., atol=50.) + assert_eq(y, p2, rtol=1., atol=50.) + + +@pytest.mark.parametrize("output, listen_port, alpha", [ + ('array', 41400, .1), + ('array', 42400, .5), + ('array', 43400, .9), + ('scipy_csr_matrix', 44400, .1), + ('scipy_csr_matrix', 45400, .5), + ('scipy_csr_matrix', 46400, .9), + ('sparse', 47400, .1), + ('sparse', 48400, .5), + ('sparse', 49400, .9), + # ('dataframe', 50400, .1), + # ('dataframe', 51400, .5), + # ('dataframe', 52400, .9), +]) +def test_regressor_quantile(loop, output, listen_port, alpha): + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as client: + X, y, w, dX, dy, dw = _create_data('regression', output=output) + + a = dlgbm.LGBMRegressor(local_listen_port=listen_port, seed=42, objective='quantile', alpha=alpha) + a = a.fit(dX, dy, client=client, sample_weight=dw) + p1 = a.predict(dX, client=client).compute() + q1 = np.count_nonzero(y < p1) / y.shape[0] + + b = lightgbm.LGBMRegressor(seed=42, objective='quantile', alpha=alpha) + b.fit(X, y, sample_weight=w) + p2 = b.predict(X) + q2 = np.count_nonzero(y < p2) / y.shape[0] + + # Quantiles should be right + np.isclose(q1, alpha, atol=.1) + np.isclose(q2, alpha, atol=.1) + + +def test_regressor_local_predict(loop): + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop): + X, y, w, dX, dy, dw = _create_data('regression', output="array") + + a = dlgbm.LGBMRegressor(local_listen_port=30400, seed=42) + a = a.fit(dX, dy, sample_weight=dw) + p1 = a.predict(dX).compute() + p2 = a.to_local().predict(X) + s1 = a.score(dX, dy) + s2 = a.to_local().score(X, y) + + # Predictions and scores should be the same + assert_eq(p1, p2, rtol=.01, atol=.1) + np.isclose(s1, s2, atol=.01) + + def test_build_network_params(): workers_ips = [ "tcp://192.168.0.1:34545", From cf5c71e78b84ee038a641f5595949e3873fea5d8 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 5 Dec 2018 23:06:53 +0100 Subject: [PATCH 02/16] Regression system test + Remove dependency to dask-ml for scoring --- dask_lightgbm/core.py | 23 +++++++++++++++++------ dask_lightgbm/tests/test_core.py | 16 ++++++++++------ system_tests/test_fit_predict.py | 21 ++++++++++++++++++--- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/dask_lightgbm/core.py b/dask_lightgbm/core.py index 124cca4..d839b9a 100644 --- a/dask_lightgbm/core.py +++ b/dask_lightgbm/core.py @@ -215,10 +215,11 @@ def predict_proba(self, X, client=None, **kwargs): predict_proba.__doc__ = lightgbm.LGBMClassifier.predict_proba.__doc__ def score(self, X, y, client=None, compute=True): - from dask_ml.metrics import accuracy_score - X = X.values if isinstance(X, dd._Frame) else X - y = y.values if isinstance(y, dd._Frame) else y - return accuracy_score(y, self.predict(X, client=client), 
compute=compute) + # Source: dask_ml.metrics.accuracy_score + result = (y == self.predict(X, client=client)).mean() + if compute: + result = result.compute() + return result def to_local(self): model = lightgbm.LGBMClassifier(**self.get_params()) @@ -265,10 +266,20 @@ def predict(self, X, client=None, **kwargs): predict.__doc__ = lightgbm.LGBMRegressor.predict.__doc__ def score(self, X, y, client=None, compute=True): - from dask_ml.metrics import r2_score + # Source: dask_ml.metrics.r2_score + # Ensure compatibility with Dataframes and Series X = X.values if isinstance(X, dd._Frame) else X y = y.values if isinstance(y, dd._Frame) else y - return r2_score(y, self.predict(X, client=client), compute=compute) + + yp = self.predict(X, client=client) + + numerator = ((y - yp) ** 2).sum() + denominator = ((y - yp.mean()) ** 2).sum() + + result = 1 - numerator / denominator + if compute: + result = result.compute() + return result def to_local(self): model = lightgbm.LGBMRegressor(**self.get_params()) diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index a439f4a..6b549ef 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -83,6 +83,8 @@ def test_classifier(loop, output, listen_port, centers): print(confusion_matrix(y, p2)) assert_eq(s1, s2) + print(s1) + assert_eq(p1, p2) assert_eq(y, p1) assert_eq(y, p2) @@ -137,7 +139,7 @@ def test_classifier_local_predict(loop): #noqa ('array', 31400), ('scipy_csr_matrix', 32400), ('sparse', 33400), - # ('dataframe', 34400), # Dataframes are not supported by dask_ml.metrics + ('dataframe', 34400), ]) def test_regressor(loop, output, listen_port): with cluster() as (s, [a, b]): @@ -156,6 +158,7 @@ def test_regressor(loop, output, listen_port): # Scores should be the same assert_eq(s1, s2, atol=.01) + print(s1) # Predictions should be roughly the same assert_eq(y, p1, rtol=1., atol=50.) 
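To make the inlined scoring logic above easy to check in isolation, here is a minimal, self-contained sketch of the same lazy computations on plain dask arrays. The toy values are illustrative only; note that the conventional R^2 denominator is centred on the mean of the observed values y:

import dask.array as da
import numpy as np

y = da.from_array(np.array([3.0, -0.5, 2.0, 7.0]), chunks=2)   # observed targets
yp = da.from_array(np.array([2.5, 0.0, 2.0, 8.0]), chunks=2)   # lazy predictions

# Accuracy (classification case): the mean of a lazy element-wise comparison
accuracy = (y == yp).mean()                  # still a dask scalar at this point

# R^2 (regression case): 1 - SS_res / SS_tot, with SS_tot taken around the
# mean of the observed values y, as in the conventional definition
ss_res = ((y - yp) ** 2).sum()
ss_tot = ((y - y.mean()) ** 2).sum()
r2 = 1 - ss_res / ss_tot

print(accuracy.compute(), r2.compute())      # concrete floats only after compute()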
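And a quick end-to-end usage sketch of the wrapper as it stands after this patch. The local cluster and random data below are made up for illustration, not taken from the test suite:

import dask.array as da
from dask.distributed import Client

import dask_lightgbm.core as dlgbm

client = Client()                                    # throwaway local cluster
X = da.random.random((1000, 10), chunks=(100, 10))
y = X.sum(axis=1) + da.random.normal(0, 0.1, size=(1000,), chunks=(100,))

reg = dlgbm.LGBMRegressor(n_estimators=20, local_listen_port=12400)
reg = reg.fit(X, y, client=client)                   # distributed training over chunks
print(reg.predict(X, client=client).compute()[:5])   # predictions are lazy dask arrays
print(reg.score(X, y, client=client))                # the lazy R^2 shown above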
@@ -172,9 +175,9 @@ def test_regressor(loop, output, listen_port): ('sparse', 47400, .1), ('sparse', 48400, .5), ('sparse', 49400, .9), - # ('dataframe', 50400, .1), - # ('dataframe', 51400, .5), - # ('dataframe', 52400, .9), + ('dataframe', 50400, .1), + ('dataframe', 51400, .5), + ('dataframe', 52400, .9), ]) def test_regressor_quantile(loop, output, listen_port, alpha): with cluster() as (s, [a, b]): @@ -207,10 +210,11 @@ def test_regressor_local_predict(loop): p2 = a.to_local().predict(X) s1 = a.score(dX, dy) s2 = a.to_local().score(X, y) + print(s1) # Predictions and scores should be the same - assert_eq(p1, p2, rtol=.01, atol=.1) - np.isclose(s1, s2, atol=.01) + assert_eq(p1, p2) + np.isclose(s1, s2) def test_build_network_params(): diff --git a/system_tests/test_fit_predict.py b/system_tests/test_fit_predict.py index 9037afc..8b487fe 100644 --- a/system_tests/test_fit_predict.py +++ b/system_tests/test_fit_predict.py @@ -27,11 +27,26 @@ def test_classify_newsread(self): dX = data.iloc[:, :-1] dy = data.iloc[:, -1] - d_classif = dlgbm.LGBMClassifier(n_estimators=50) + d_classif = dlgbm.LGBMClassifier(n_estimators=50, local_listen_port=12400) d_classif.fit(dX, dy) - dy_pred = d_classif.predict(dX) + dy_pred = d_classif.predict(dX, client=self.client) print(confusion_matrix(dy.compute(), dy_pred.compute())) - self.assertGreaterEqual((dy == dy_pred).sum()/len(dy), 0.9) + s1 = (dy == dy_pred).sum()/len(dy) + s2 = d_classif.score(dX, dy, client=self.client) + self.assertEqual(s1, s2) + self.assertGreaterEqual(s2, 0.8) + + def test_regress_newsread(self): + data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) + dX = data.iloc[:, 1:] + dy = data.iloc[:, 0] + + d_regress = dlgbm.LGBMRegressor(n_estimators=50, local_listen_port=13400) + d_regress.fit(dX, dy) + + d_regress.predict(dX, client=self.client) + + self.assertGreaterEqual(d_regress.score(dX, dy, client=self.client), 0.8) From f983f6dcd121a152e02cf1de681660cfeba0ad59 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 5 Dec 2018 23:09:55 +0100 Subject: [PATCH 03/16] Fix removed import --- dask_lightgbm/tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index 6b549ef..c77e9fa 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -13,7 +13,7 @@ import sparse from dask.array.utils import assert_eq from dask.distributed import Client -from distributed.utils_test import gen_cluster, cluster # noqa +from distributed.utils_test import gen_cluster, loop, cluster # noqa from sklearn.datasets import make_blobs, make_regression from sklearn.metrics import confusion_matrix From 171f3acf5d91e6b5e7980c5d99a8dc34ff068062 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 12 Dec 2018 09:18:55 +0100 Subject: [PATCH 04/16] Same imports as in LightGBM + Fix predict part without rows --- dask_lightgbm/__init__.py | 1 + dask_lightgbm/core.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dask_lightgbm/__init__.py b/dask_lightgbm/__init__.py index e69de29..8dbff73 100644 --- a/dask_lightgbm/__init__.py +++ b/dask_lightgbm/__init__.py @@ -0,0 +1 @@ +from .core import LGBMRegressor, LGBMClassifier \ No newline at end of file diff --git a/dask_lightgbm/core.py b/dask_lightgbm/core.py index d839b9a..b8b31df 100644 --- a/dask_lightgbm/core.py +++ b/dask_lightgbm/core.py @@ -143,7 +143,9 @@ def _predict_part(part, model, proba, **kwargs): X = part.values else: X = part - if 
proba: + if not X.shape[0]: + result = [] + elif proba: result = model.predict_proba(X, **kwargs) else: result = model.predict(X, **kwargs) From c24a50ad4f42fb7936e3662686b5eaf0ba3e640c Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 01:33:57 +0100 Subject: [PATCH 05/16] Remove implementations of score --- dask_lightgbm/core.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/dask_lightgbm/core.py b/dask_lightgbm/core.py index b8b31df..5e920a3 100644 --- a/dask_lightgbm/core.py +++ b/dask_lightgbm/core.py @@ -216,13 +216,6 @@ def predict_proba(self, X, client=None, **kwargs): return predict(client, self.to_local(), X, proba=True, dtype=self.classes_[0].dtype, **kwargs) predict_proba.__doc__ = lightgbm.LGBMClassifier.predict_proba.__doc__ - def score(self, X, y, client=None, compute=True): - # Source: dask_ml.metrics.accuracy_score - result = (y == self.predict(X, client=client)).mean() - if compute: - result = result.compute() - return result - def to_local(self): model = lightgbm.LGBMClassifier(**self.get_params()) model._Booster = self._Booster @@ -267,22 +260,6 @@ def predict(self, X, client=None, **kwargs): return predict(client, self.to_local(), X, **kwargs) predict.__doc__ = lightgbm.LGBMRegressor.predict.__doc__ - def score(self, X, y, client=None, compute=True): - # Source: dask_ml.metrics.r2_score - # Ensure compatibility with Dataframes and Series - X = X.values if isinstance(X, dd._Frame) else X - y = y.values if isinstance(y, dd._Frame) else y - - yp = self.predict(X, client=client) - - numerator = ((y - yp) ** 2).sum() - denominator = ((y - yp.mean()) ** 2).sum() - - result = 1 - numerator / denominator - if compute: - result = result.compute() - return result - def to_local(self): model = lightgbm.LGBMRegressor(**self.get_params()) model._Booster = self._Booster From 674ab9bcb831fd3c67c88d48959e649cb4ab72de Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 01:34:14 +0100 Subject: [PATCH 06/16] Fix tests --- dask_lightgbm/tests/test_core.py | 18 +++++++++++------- docker-compose.yml | 2 ++ setup.py | 1 + system_tests/test_fit_predict.py | 27 ++++++++++++++++++++------- 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index c77e9fa..8a048cf 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -13,6 +13,7 @@ import sparse from dask.array.utils import assert_eq from dask.distributed import Client +from dask_ml.metrics import accuracy_score, r2_score from distributed.utils_test import gen_cluster, loop, cluster # noqa from sklearn.datasets import make_blobs, make_regression from sklearn.metrics import confusion_matrix @@ -72,8 +73,8 @@ def test_classifier(loop, output, listen_port, centers): a = dlgbm.LGBMClassifier(local_listen_port=listen_port) a = a.fit(dX, dy, sample_weight=dw) p1 = a.predict(dX, client=client) + s1 = accuracy_score(dy, p1) p1 = p1.compute() - s1 = a.score(dX, dy) b = lightgbm.LGBMClassifier() b.fit(X, y, sample_weight=w) @@ -148,8 +149,10 @@ def test_regressor(loop, output, listen_port): a = dlgbm.LGBMRegressor(local_listen_port=listen_port, seed=42) a = a.fit(dX, dy, client=client, sample_weight=dw) - s1 = a.score(dX, dy, client=client) - p1 = a.predict(dX, client=client).compute() + p1 = a.predict(dX, client=client) + if output != 'dataframe': + s1 = r2_score(dy, p1) + p1 = p1.compute() b = lightgbm.LGBMRegressor(seed=42) b.fit(X, y, sample_weight=w) @@ -157,8 +160,8 @@ def 
test_regressor(loop, output, listen_port): p2 = b.predict(X) # Scores should be the same - assert_eq(s1, s2, atol=.01) - print(s1) + if output != 'dataframe': + assert_eq(s1, s2, atol=.01) # Predictions should be roughly the same assert_eq(y, p1, rtol=1., atol=50.) @@ -206,9 +209,10 @@ def test_regressor_local_predict(loop): a = dlgbm.LGBMRegressor(local_listen_port=30400, seed=42) a = a.fit(dX, dy, sample_weight=dw) - p1 = a.predict(dX).compute() + p1 = a.predict(dX) p2 = a.to_local().predict(X) - s1 = a.score(dX, dy) + s1 = r2_score(dy, p1) + p1 = p1.compute() s2 = a.to_local().score(X, y) print(s1) diff --git a/docker-compose.yml b/docker-compose.yml index 748ea68..ccc5c47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,8 @@ services: dockerfile: Dockerfile.test image: dask-lightgbm hostname: dask-worker + environment: + - SCHEDULER=tcp://scheduler:8786 command: ["dask-worker", "--nprocs", "2", "--nthreads", "2", "scheduler:8786"] client: diff --git a/setup.py b/setup.py index 71e0c8b..e80dc67 100755 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ "pytest>=3.9.0", "pandas>=0.23.0", 'dask[dataframe]', + 'dask-ml', 'requests' ], "sparse": [ diff --git a/system_tests/test_fit_predict.py b/system_tests/test_fit_predict.py index 8b487fe..18d7405 100644 --- a/system_tests/test_fit_predict.py +++ b/system_tests/test_fit_predict.py @@ -1,3 +1,4 @@ +import os import gzip import unittest @@ -20,7 +21,11 @@ def load_data(path): class FitPredictTest(unittest.TestCase): def setUp(self): - self.client = Client("scheduler:8786") + # Test with either distributed scheduler or threaded scheduler (easier to setup) + if os.getenv('SCHEDULER'): + self.client = Client(os.getenv('SCHEDULER')) + else: + self.client = Client() def test_classify_newsread(self): data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) @@ -34,10 +39,11 @@ def test_classify_newsread(self): print(confusion_matrix(dy.compute(), dy_pred.compute())) - s1 = (dy == dy_pred).sum()/len(dy) - s2 = d_classif.score(dX, dy, client=self.client) - self.assertEqual(s1, s2) - self.assertGreaterEqual(s2, 0.8) + acc_score = (dy == dy_pred).sum()/len(dy) + acc_score = acc_score.compute() + print(acc_score) + + self.assertGreaterEqual(acc_score, 0.8) def test_regress_newsread(self): data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) @@ -47,6 +53,13 @@ def test_regress_newsread(self): d_regress = dlgbm.LGBMRegressor(n_estimators=50, local_listen_port=13400) d_regress.fit(dX, dy) - d_regress.predict(dX, client=self.client) + dy_pred = d_regress.predict(dX, client=self.client) + + # The dask_ml.metrics.r2_score method fails with dataframes so we compute the R2 score ourselves + numerator = ((dy - dy_pred) ** 2).sum() + denominator = ((dy - dy.mean()) ** 2).sum() + r2_score = 1 - numerator / denominator + r2_score = r2_score.compute() + print(r2_score) - self.assertGreaterEqual(d_regress.score(dX, dy, client=self.client), 0.8) + self.assertGreaterEqual(r2_score, 0.8) From 3add0df9434e730ac3dd63be34cb837ea789bcbf Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 01:36:09 +0100 Subject: [PATCH 07/16] Sparse tests broken by latest sparse release (https://github.com/pydata/sparse/issues/218) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index e80dc67..f40b938 100755 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ 'requests' ], "sparse": [ - "sparse>=0.5.0", + "sparse==0.5.0", "scipy>=1.0.0" ] } From 
5ef81079821f431e59ef190644c02a80e37f652e Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 02:22:23 +0100 Subject: [PATCH 08/16] Fix warning "Unknown parameter: listen_time_out" --- dask_lightgbm/core.py | 12 +++++------- dask_lightgbm/tests/test_core.py | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dask_lightgbm/core.py b/dask_lightgbm/core.py index 5e920a3..acdb7db 100644 --- a/dask_lightgbm/core.py +++ b/dask_lightgbm/core.py @@ -32,12 +32,12 @@ def parse_host_port(address): return host, port -def build_network_params(worker_addresses, local_worker_ip, local_listen_port, listen_time_out): +def build_network_params(worker_addresses, local_worker_ip, local_listen_port, time_out): addr_port_map = {addr: (local_listen_port + i) for i, addr in enumerate(worker_addresses)} params = { "machines": ",".join([parse_host_port(addr)[0] + ":" + str(port) for addr, port in addr_port_map.items()]), "local_listen_port": addr_port_map[local_worker_ip], - "listen_time_out": listen_time_out, + "time_out": time_out, "num_machines": len(addr_port_map) } return params @@ -56,10 +56,8 @@ def concat(L): raise TypeError("Data must be either numpy arrays or pandas dataframes. Got %s" % type(L[0])) -def _fit_local(params, model_factory, list_of_parts, worker_addresses, return_model, local_listen_port=12400, listen_time_out=120, - **kwargs): - network_params = build_network_params(worker_addresses, get_worker().address, local_listen_port, - listen_time_out) +def _fit_local(params, model_factory, list_of_parts, worker_addresses, return_model, local_listen_port=12400, time_out=120): + network_params = build_network_params(worker_addresses, get_worker().address, local_listen_port, time_out) params = {**params, **network_params} # Prepare data @@ -127,7 +125,7 @@ def train(client, X, y, params, model_factory, sample_weight=None, **kwargs): list_of_parts=list_of_parts, worker_addresses=list(worker_map.keys()), local_listen_port=params.get("local_listen_port", 12400), - listen_time_out=params.get("listen_time_out", 120), + time_out=params.get("time_out", 120), return_model=worker==master_worker, **kwargs) for worker, list_of_parts in worker_map.items()] diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index 8a048cf..6ef9fb7 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -233,7 +233,7 @@ def test_build_network_params(): "machines": "192.168.0.1:12400,192.168.0.2:12401,192.168.0.3:12402", "local_listen_port": 12401, "num_machines": len(workers_ips), - "listen_time_out": 120 + "time_out": 120 } assert exp_params == params From c89639dcfb447ec8ff228b27cafd96d56a69d409 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 12:50:40 +0100 Subject: [PATCH 09/16] Revert "Fix warning "Unknown parameter: listen_time_out"" This reverts commit 5ef81079 --- dask_lightgbm/core.py | 12 +++++++----- dask_lightgbm/tests/test_core.py | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dask_lightgbm/core.py b/dask_lightgbm/core.py index acdb7db..5e920a3 100644 --- a/dask_lightgbm/core.py +++ b/dask_lightgbm/core.py @@ -32,12 +32,12 @@ def parse_host_port(address): return host, port -def build_network_params(worker_addresses, local_worker_ip, local_listen_port, time_out): +def build_network_params(worker_addresses, local_worker_ip, local_listen_port, listen_time_out): addr_port_map = {addr: (local_listen_port + i) for i, addr in enumerate(worker_addresses)} params = { "machines": 
",".join([parse_host_port(addr)[0] + ":" + str(port) for addr, port in addr_port_map.items()]), "local_listen_port": addr_port_map[local_worker_ip], - "time_out": time_out, + "listen_time_out": listen_time_out, "num_machines": len(addr_port_map) } return params @@ -56,8 +56,10 @@ def concat(L): raise TypeError("Data must be either numpy arrays or pandas dataframes. Got %s" % type(L[0])) -def _fit_local(params, model_factory, list_of_parts, worker_addresses, return_model, local_listen_port=12400, time_out=120): - network_params = build_network_params(worker_addresses, get_worker().address, local_listen_port, time_out) +def _fit_local(params, model_factory, list_of_parts, worker_addresses, return_model, local_listen_port=12400, listen_time_out=120, + **kwargs): + network_params = build_network_params(worker_addresses, get_worker().address, local_listen_port, + listen_time_out) params = {**params, **network_params} # Prepare data @@ -125,7 +127,7 @@ def train(client, X, y, params, model_factory, sample_weight=None, **kwargs): list_of_parts=list_of_parts, worker_addresses=list(worker_map.keys()), local_listen_port=params.get("local_listen_port", 12400), - time_out=params.get("time_out", 120), + listen_time_out=params.get("listen_time_out", 120), return_model=worker==master_worker, **kwargs) for worker, list_of_parts in worker_map.items()] diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index 6ef9fb7..8a048cf 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -233,7 +233,7 @@ def test_build_network_params(): "machines": "192.168.0.1:12400,192.168.0.2:12401,192.168.0.3:12402", "local_listen_port": 12401, "num_machines": len(workers_ips), - "time_out": 120 + "listen_time_out": 120 } assert exp_params == params From 96ab542ba4190db28c75009e2cbe5e546c400506 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 12:58:24 +0100 Subject: [PATCH 10/16] Pytest system tests --- system_tests/test_fit_predict.py | 74 +++++++++++++------------------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/system_tests/test_fit_predict.py b/system_tests/test_fit_predict.py index 18d7405..c8ffd1c 100644 --- a/system_tests/test_fit_predict.py +++ b/system_tests/test_fit_predict.py @@ -1,65 +1,51 @@ import os -import gzip -import unittest import dask.dataframe as dd -import numpy as np -from dask import delayed from dask.distributed import Client from sklearn.metrics import confusion_matrix import dask_lightgbm.core as dlgbm +if os.getenv('SCHEDULER'): + client = Client(os.getenv('SCHEDULER')) +else: + client = Client() -@delayed -def load_data(path): - with gzip.open(path) as fp: - X = np.loadtxt(fp, delimiter=",") - return X +def test_classify_newsread(): + data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) + dX = data.iloc[:, :-1] + dy = data.iloc[:, -1] -class FitPredictTest(unittest.TestCase): + d_classif = dlgbm.LGBMClassifier(n_estimators=50, local_listen_port=12400) + d_classif.fit(dX, dy) - def setUp(self): - # Test with either distributed scheduler or threaded scheduler (easier to setup) - if os.getenv('SCHEDULER'): - self.client = Client(os.getenv('SCHEDULER')) - else: - self.client = Client() + dy_pred = d_classif.predict(dX, client=client) - def test_classify_newsread(self): - data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) - dX = data.iloc[:, :-1] - dy = data.iloc[:, -1] + print(confusion_matrix(dy.compute(), dy_pred.compute())) - d_classif = 
dlgbm.LGBMClassifier(n_estimators=50, local_listen_port=12400) - d_classif.fit(dX, dy) + acc_score = (dy == dy_pred).sum()/len(dy) + acc_score = acc_score.compute() + print(acc_score) - dy_pred = d_classif.predict(dX, client=self.client) + assert acc_score > 0.8 - print(confusion_matrix(dy.compute(), dy_pred.compute())) - acc_score = (dy == dy_pred).sum()/len(dy) - acc_score = acc_score.compute() - print(acc_score) +def test_regress_newsread(): + data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) + dX = data.iloc[:, 1:] + dy = data.iloc[:, 0] - self.assertGreaterEqual(acc_score, 0.8) + d_regress = dlgbm.LGBMRegressor(n_estimators=50, local_listen_port=13400) + d_regress.fit(dX, dy) - def test_regress_newsread(self): - data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) - dX = data.iloc[:, 1:] - dy = data.iloc[:, 0] + dy_pred = d_regress.predict(dX, client=client) - d_regress = dlgbm.LGBMRegressor(n_estimators=50, local_listen_port=13400) - d_regress.fit(dX, dy) + # The dask_ml.metrics.r2_score method fails with dataframes so we compute the R2 score ourselves + numerator = ((dy - dy_pred) ** 2).sum() + denominator = ((dy - dy.mean()) ** 2).sum() + r2_score = 1 - numerator / denominator + r2_score = r2_score.compute() + print(r2_score) - dy_pred = d_regress.predict(dX, client=self.client) - - # The dask_ml.metrics.r2_score method fails with dataframes so we compute the R2 score ourselves - numerator = ((dy - dy_pred) ** 2).sum() - denominator = ((dy - dy.mean()) ** 2).sum() - r2_score = 1 - numerator / denominator - r2_score = r2_score.compute() - print(r2_score) - - self.assertGreaterEqual(r2_score, 0.8) + assert r2_score > 0.8 From fb2fcba4235990ec20593f25a1f3049e2aadd393 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Wed, 2 Jan 2019 15:39:14 +0100 Subject: [PATCH 11/16] Remove unnecessary confusion matrix (no assertion) --- system_tests/test_fit_predict.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/system_tests/test_fit_predict.py b/system_tests/test_fit_predict.py index c8ffd1c..17fc93f 100644 --- a/system_tests/test_fit_predict.py +++ b/system_tests/test_fit_predict.py @@ -2,7 +2,6 @@ import dask.dataframe as dd from dask.distributed import Client -from sklearn.metrics import confusion_matrix import dask_lightgbm.core as dlgbm @@ -22,9 +21,7 @@ def test_classify_newsread(): dy_pred = d_classif.predict(dX, client=client) - print(confusion_matrix(dy.compute(), dy_pred.compute())) - - acc_score = (dy == dy_pred).sum()/len(dy) + acc_score = (dy == dy_pred).sum() / len(dy) acc_score = acc_score.compute() print(acc_score) From b08cd2e741bde54d07e015d6e43bbecf902343c3 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Thu, 3 Jan 2019 17:13:15 +0100 Subject: [PATCH 12/16] Dask client as pytest fixture --- system_tests/test_fit_predict.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/system_tests/test_fit_predict.py b/system_tests/test_fit_predict.py index 17fc93f..4919db9 100644 --- a/system_tests/test_fit_predict.py +++ b/system_tests/test_fit_predict.py @@ -1,17 +1,18 @@ import os +import pytest import dask.dataframe as dd from dask.distributed import Client import dask_lightgbm.core as dlgbm -if os.getenv('SCHEDULER'): - client = Client(os.getenv('SCHEDULER')) -else: - client = Client() +@pytest.fixture(scope='module') +def client(): + return Client(os.getenv('SCHEDULER')) if os.getenv('SCHEDULER') else Client() -def test_classify_newsread(): + +def 
test_classify_newsread(client): data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) dX = data.iloc[:, :-1] dy = data.iloc[:, -1] @@ -28,7 +29,7 @@ def test_classify_newsread(): assert acc_score > 0.8 -def test_regress_newsread(): +def test_regress_newsread(client): data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) dX = data.iloc[:, 1:] dy = data.iloc[:, 0] From 6cad879a5b1642d811e895e985d061c976840e28 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Fri, 4 Jan 2019 11:27:27 +0100 Subject: [PATCH 13/16] Pytest rerun on failure to avoid lightgbm socket transient errors --- .travis.yml | 4 ++-- docker-compose.yml | 2 +- setup.py | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2a460ce..39bc76c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,5 +16,5 @@ install: - docker build -t ${DOCKER_REPOSITORY}:${TRAVIS_COMMIT} -f Dockerfile.test --pull=true . script: - - docker run ${DOCKER_REPOSITORY}:${TRAVIS_COMMIT} pytest /app/dask_lightgbm - - docker-compose up --abort-on-container-exit \ No newline at end of file + - docker run ${DOCKER_REPOSITORY}:${TRAVIS_COMMIT} pytest /app/dask_lightgbm --reruns 3 --reruns-delay 30 + - docker-compose up --abort-on-container-exit diff --git a/docker-compose.yml b/docker-compose.yml index ccc5c47..4ed28c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,4 +28,4 @@ services: dockerfile: Dockerfile.test image: dask-lightgbm hostname: dask-client - command: ["pytest", "/app/system_tests"] + command: ["pytest", "/app/system_tests", "--reruns", "3", "--reruns-delay", "30"] diff --git a/setup.py b/setup.py index f40b938..61a6c2a 100755 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ extras_require = { "dev": [ "pytest>=3.9.0", + "pytest-rerunfailures>=5.0", "pandas>=0.23.0", 'dask[dataframe]', 'dask-ml', From ab1979e7348409f816a3e7e5eca5f03985366773 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Mon, 7 Jan 2019 15:14:57 +0100 Subject: [PATCH 14/16] Listen port as pytest fixture (no overlapping between core and system tests) --- dask_lightgbm/tests/test_core.py | 89 ++++++++++++++++++-------------- system_tests/test_fit_predict.py | 17 ++++-- 2 files changed, 62 insertions(+), 44 deletions(-) diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index 8a048cf..03e2730 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -23,6 +23,15 @@ distributed.comm.utils._offload_executor = ThreadPoolExecutor(max_workers=2) +@pytest.fixture() +def listen_port(): + listen_port.port += 10 + return listen_port.port + + +listen_port.port = 13000 + + def _create_data(objective, n_samples=100, centers=2, output="array", chunk_size=50): if objective == 'classification': X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42) @@ -55,15 +64,15 @@ def _create_data(objective, n_samples=100, centers=2, output="array", chunk_size return X, y, w, dX, dy, dw -@pytest.mark.parametrize("output, listen_port, centers", [ #noqa - ('array', 11400, [[-4, -4], [4, 4]]), - ('array', 12400, [[-4, -4], [4, 4], [-4, 4]]), - ('scipy_csr_matrix', 13400, [[-4, -4], [4, 4]]), - ('scipy_csr_matrix', 14400, [[-4, -4], [4, 4], [-4, 4]]), - ('sparse', 15400, [[-4, -4], [4, 4]]), - ('sparse', 16400, [[-4, -4], [4, 4], [-4, 4]]), - ('dataframe', 17400, [[-4, -4], [4, 4]]), - ('dataframe', 18400, [[-4, -4], [4, 4], [-4, 4]]) +@pytest.mark.parametrize("output, centers", [ #noqa + ('array', [[-4, -4], [4, 4]]), + 
('array', [[-4, -4], [4, 4], [-4, 4]]), + ('scipy_csr_matrix', [[-4, -4], [4, 4]]), + ('scipy_csr_matrix', [[-4, -4], [4, 4], [-4, 4]]), + ('sparse', [[-4, -4], [4, 4]]), + ('sparse', [[-4, -4], [4, 4], [-4, 4]]), + ('dataframe', [[-4, -4], [4, 4]]), + ('dataframe', [[-4, -4], [4, 4], [-4, 4]]) ]) # noqa def test_classifier(loop, output, listen_port, centers): with cluster() as (s, [a, b]): @@ -91,15 +100,15 @@ def test_classifier(loop, output, listen_port, centers): assert_eq(y, p2) -@pytest.mark.parametrize("output, listen_port, centers", [ #noqa - ('array', 21400, [[-4, -4], [4, 4]]), - ('array', 22400, [[-4, -4], [4, 4], [-4, 4]]), - ('scipy_csr_matrix', 23400, [[-4, -4], [4, 4]]), - ('scipy_csr_matrix', 24400, [[-4, -4], [4, 4], [-4, 4]]), - ('sparse', 25400, [[-4, -4], [4, 4]]), - ('sparse', 26400, [[-4, -4], [4, 4], [-4, 4]]), - ('dataframe', 27400, [[-4, -4], [4, 4]]), - ('dataframe', 28400, [[-4, -4], [4, 4], [-4, 4]]) +@pytest.mark.parametrize("output, centers", [ #noqa + ('array', [[-4, -4], [4, 4]]), + ('array', [[-4, -4], [4, 4], [-4, 4]]), + ('scipy_csr_matrix', [[-4, -4], [4, 4]]), + ('scipy_csr_matrix', [[-4, -4], [4, 4], [-4, 4]]), + ('sparse', [[-4, -4], [4, 4]]), + ('sparse', [[-4, -4], [4, 4], [-4, 4]]), + ('dataframe', [[-4, -4], [4, 4]]), + ('dataframe', [[-4, -4], [4, 4], [-4, 4]]) ]) # noqa def test_classifier_proba(loop, output, listen_port, centers): with cluster() as (s, [a, b]): @@ -118,12 +127,12 @@ def test_classifier_proba(loop, output, listen_port, centers): assert_eq(p1, p2, atol=0.3) -def test_classifier_local_predict(loop): #noqa +def test_classifier_local_predict(loop, listen_port): #noqa with cluster() as (s, [a, b]): with Client(s['address'], loop=loop): X, y, w, dX, dy, dw = _create_data('classification', output="array") - a = dlgbm.LGBMClassifier(local_listen_port=10400) + a = dlgbm.LGBMClassifier(local_listen_port=listen_port) a = a.fit(dX, dy, sample_weight=dw) p1 = a.to_local().predict(dX) @@ -136,11 +145,11 @@ def test_classifier_local_predict(loop): #noqa assert_eq(y, p2) -@pytest.mark.parametrize("output, listen_port", [ - ('array', 31400), - ('scipy_csr_matrix', 32400), - ('sparse', 33400), - ('dataframe', 34400), +@pytest.mark.parametrize("output", [ + ('array', ), + ('scipy_csr_matrix', ), + ('sparse', ), + ('dataframe', ), ]) def test_regressor(loop, output, listen_port): with cluster() as (s, [a, b]): @@ -168,19 +177,19 @@ def test_regressor(loop, output, listen_port): assert_eq(y, p2, rtol=1., atol=50.) 
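As an aside on the two testing patterns combined in the hunks around here, stacked @pytest.mark.parametrize decorators and the counter-backed listen_port fixture, a tiny self-contained illustration (the test body is a hypothetical stand-in):

import pytest

@pytest.fixture()
def listen_port():
    # A function attribute serves as a simple module-level counter: every
    # test invocation gets a fresh, non-overlapping port range.
    listen_port.port += 10
    return listen_port.port

listen_port.port = 13000

# Stacked decorators yield the full cross-product: 4 outputs x 2 center
# layouts = 8 generated test cases, each receiving its own port.
@pytest.mark.parametrize('output', ['array', 'scipy_csr_matrix', 'sparse', 'dataframe'])
@pytest.mark.parametrize('centers', [2, 3])
def test_each_case_gets_a_fresh_port(listen_port, output, centers):
    assert listen_port >= 13010 and listen_port % 10 == 0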
-@pytest.mark.parametrize("output, listen_port, alpha", [ - ('array', 41400, .1), - ('array', 42400, .5), - ('array', 43400, .9), - ('scipy_csr_matrix', 44400, .1), - ('scipy_csr_matrix', 45400, .5), - ('scipy_csr_matrix', 46400, .9), - ('sparse', 47400, .1), - ('sparse', 48400, .5), - ('sparse', 49400, .9), - ('dataframe', 50400, .1), - ('dataframe', 51400, .5), - ('dataframe', 52400, .9), +@pytest.mark.parametrize("output, alpha", [ + ('array', .1), + ('array', .5), + ('array', .9), + ('scipy_csr_matrix', .1), + ('scipy_csr_matrix', .5), + ('scipy_csr_matrix', .9), + ('sparse', .1), + ('sparse', .5), + ('sparse', .9), + ('dataframe', .1), + ('dataframe', .5), + ('dataframe', .9), ]) def test_regressor_quantile(loop, output, listen_port, alpha): with cluster() as (s, [a, b]): @@ -202,12 +211,12 @@ def test_regressor_quantile(loop, output, listen_port, alpha): np.isclose(q2, alpha, atol=.1) -def test_regressor_local_predict(loop): +def test_regressor_local_predict(loop, listen_port): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop): X, y, w, dX, dy, dw = _create_data('regression', output="array") - a = dlgbm.LGBMRegressor(local_listen_port=30400, seed=42) + a = dlgbm.LGBMRegressor(local_listen_port=listen_port, seed=42) a = a.fit(dX, dy, sample_weight=dw) p1 = a.predict(dX) p2 = a.to_local().predict(X) diff --git a/system_tests/test_fit_predict.py b/system_tests/test_fit_predict.py index 4919db9..41bbcfa 100644 --- a/system_tests/test_fit_predict.py +++ b/system_tests/test_fit_predict.py @@ -12,12 +12,21 @@ def client(): return Client(os.getenv('SCHEDULER')) if os.getenv('SCHEDULER') else Client() -def test_classify_newsread(client): +@pytest.fixture() +def listen_port(): + listen_port.port += 10 + return listen_port.port + + +listen_port.port = 12400 + + +def test_classify_newsread(client, listen_port): data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) dX = data.iloc[:, :-1] dy = data.iloc[:, -1] - d_classif = dlgbm.LGBMClassifier(n_estimators=50, local_listen_port=12400) + d_classif = dlgbm.LGBMClassifier(n_estimators=50, local_listen_port=listen_port) d_classif.fit(dX, dy) dy_pred = d_classif.predict(dX, client=client) @@ -29,12 +38,12 @@ def test_classify_newsread(client): assert acc_score > 0.8 -def test_regress_newsread(client): +def test_regress_newsread(client, listen_port): data = dd.read_csv("./system_tests/data/*.gz", compression="gzip", blocksize=None) dX = data.iloc[:, 1:] dy = data.iloc[:, 0] - d_regress = dlgbm.LGBMRegressor(n_estimators=50, local_listen_port=13400) + d_regress = dlgbm.LGBMRegressor(n_estimators=50, local_listen_port=listen_port) d_regress.fit(dX, dy) dy_pred = d_regress.predict(dX, client=client) From a62f7011e2fabcfc3023a115b57c15189a376c13 Mon Sep 17 00:00:00 2001 From: mlemainque Date: Mon, 7 Jan 2019 15:30:53 +0100 Subject: [PATCH 15/16] Cleaner parametrize (fix test_regressor) --- dask_lightgbm/tests/test_core.py | 47 +++++--------------------------- 1 file changed, 7 insertions(+), 40 deletions(-) diff --git a/dask_lightgbm/tests/test_core.py b/dask_lightgbm/tests/test_core.py index 03e2730..e3befab 100644 --- a/dask_lightgbm/tests/test_core.py +++ b/dask_lightgbm/tests/test_core.py @@ -64,16 +64,8 @@ def _create_data(objective, n_samples=100, centers=2, output="array", chunk_size return X, y, w, dX, dy, dw -@pytest.mark.parametrize("output, centers", [ #noqa - ('array', [[-4, -4], [4, 4]]), - ('array', [[-4, -4], [4, 4], [-4, 4]]), - ('scipy_csr_matrix', [[-4, -4], [4, 4]]), - ('scipy_csr_matrix', 
[[-4, -4], [4, 4], [-4, 4]]), - ('sparse', [[-4, -4], [4, 4]]), - ('sparse', [[-4, -4], [4, 4], [-4, 4]]), - ('dataframe', [[-4, -4], [4, 4]]), - ('dataframe', [[-4, -4], [4, 4], [-4, 4]]) - ]) # noqa +@pytest.mark.parametrize('output', ['array', 'scipy_csr_matrix', 'sparse', 'dataframe']) +@pytest.mark.parametrize('centers', [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]) # noqa def test_classifier(loop, output, listen_port, centers): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: @@ -100,16 +92,8 @@ def test_classifier(loop, output, listen_port, centers): assert_eq(y, p2) -@pytest.mark.parametrize("output, centers", [ #noqa - ('array', [[-4, -4], [4, 4]]), - ('array', [[-4, -4], [4, 4], [-4, 4]]), - ('scipy_csr_matrix', [[-4, -4], [4, 4]]), - ('scipy_csr_matrix', [[-4, -4], [4, 4], [-4, 4]]), - ('sparse', [[-4, -4], [4, 4]]), - ('sparse', [[-4, -4], [4, 4], [-4, 4]]), - ('dataframe', [[-4, -4], [4, 4]]), - ('dataframe', [[-4, -4], [4, 4], [-4, 4]]) - ]) # noqa +@pytest.mark.parametrize('output', ['array', 'scipy_csr_matrix', 'sparse', 'dataframe']) +@pytest.mark.parametrize('centers', [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]) # noqa def test_classifier_proba(loop, output, listen_port, centers): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: @@ -145,12 +129,7 @@ def test_classifier_local_predict(loop, listen_port): #noqa assert_eq(y, p2) -@pytest.mark.parametrize("output", [ - ('array', ), - ('scipy_csr_matrix', ), - ('sparse', ), - ('dataframe', ), -]) +@pytest.mark.parametrize('output', ['array', 'scipy_csr_matrix', 'sparse', 'dataframe']) def test_regressor(loop, output, listen_port): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: @@ -177,20 +156,8 @@ def test_regressor(loop, output, listen_port): assert_eq(y, p2, rtol=1., atol=50.) -@pytest.mark.parametrize("output, alpha", [ - ('array', .1), - ('array', .5), - ('array', .9), - ('scipy_csr_matrix', .1), - ('scipy_csr_matrix', .5), - ('scipy_csr_matrix', .9), - ('sparse', .1), - ('sparse', .5), - ('sparse', .9), - ('dataframe', .1), - ('dataframe', .5), - ('dataframe', .9), -]) +@pytest.mark.parametrize('output', ['array', 'scipy_csr_matrix', 'sparse', 'dataframe']) +@pytest.mark.parametrize('alpha', [.1, .5, .9]) def test_regressor_quantile(loop, output, listen_port, alpha): with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as client: From 14810265a74dc47011cf58bd2bfbe1c86deb072d Mon Sep 17 00:00:00 2001 From: mlemainque Date: Mon, 7 Jan 2019 15:40:29 +0100 Subject: [PATCH 16/16] Remove pytest rerun --- .travis.yml | 2 +- docker-compose.yml | 2 +- setup.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 39bc76c..6d1cb06 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,5 +16,5 @@ install: - docker build -t ${DOCKER_REPOSITORY}:${TRAVIS_COMMIT} -f Dockerfile.test --pull=true . 
script: - - docker run ${DOCKER_REPOSITORY}:${TRAVIS_COMMIT} pytest /app/dask_lightgbm --reruns 3 --reruns-delay 30 + - docker run ${DOCKER_REPOSITORY}:${TRAVIS_COMMIT} pytest /app/dask_lightgbm - docker-compose up --abort-on-container-exit diff --git a/docker-compose.yml b/docker-compose.yml index 4ed28c1..ccc5c47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,4 +28,4 @@ services: dockerfile: Dockerfile.test image: dask-lightgbm hostname: dask-client - command: ["pytest", "/app/system_tests", "--reruns", "3", "--reruns-delay", "30"] + command: ["pytest", "/app/system_tests"] diff --git a/setup.py b/setup.py index 61a6c2a..f40b938 100755 --- a/setup.py +++ b/setup.py @@ -14,7 +14,6 @@ extras_require = { "dev": [ "pytest>=3.9.0", - "pytest-rerunfailures>=5.0", "pandas>=0.23.0", 'dask[dataframe]', 'dask-ml',
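A few closing sketches for pieces this series touches repeatedly. First, the LightGBM network bootstrap: mirroring test_build_network_params, this condensed re-statement of build_network_params shows how each Dask worker address is mapped to a deterministic LightGBM port. It is a sketch under the assumptions visible in the diffs, with parse_host_port simplified:

def parse_host_port(address):
    # "tcp://192.168.0.1:34545" -> ("192.168.0.1", 34545)
    host, port = address.split("://")[-1].split(":")
    return host, int(port)

def build_network_params(worker_addresses, local_worker_ip, local_listen_port, listen_time_out):
    addr_port_map = {addr: local_listen_port + i for i, addr in enumerate(worker_addresses)}
    return {
        "machines": ",".join(parse_host_port(addr)[0] + ":" + str(port)
                             for addr, port in addr_port_map.items()),
        "local_listen_port": addr_port_map[local_worker_ip],
        "listen_time_out": listen_time_out,
        "num_machines": len(addr_port_map),
    }

params = build_network_params(
    ["tcp://192.168.0.1:34545", "tcp://192.168.0.2:34545", "tcp://192.168.0.3:34545"],
    "tcp://192.168.0.2:34545", 12400, 120)
assert params["machines"] == "192.168.0.1:12400,192.168.0.2:12401,192.168.0.3:12402"
assert params["local_listen_port"] == 12401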
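Second, a note the quantile tests gesture at: np.isclose returns a boolean rather than raising, so the coverage checks in test_regressor_quantile only take effect when wrapped in an assert. A synthetic illustration of the intended check, with made-up data:

import numpy as np

rng = np.random.RandomState(42)
y = rng.normal(size=1000)
alpha = 0.9

# An ideal quantile model predicts the alpha-quantile as a constant
pred = np.full_like(y, np.quantile(y, alpha))

q_hat = np.count_nonzero(y < pred) / y.shape[0]   # empirical coverage
assert np.isclose(q_hat, alpha, atol=0.1)         # a bare np.isclose(...) checks nothing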
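Finally, the zero-row guard from patch 04: Dask collections can legitimately contain empty partitions, which the commit message says broke prediction, so _predict_part now short-circuits them. A condensed sketch of the same idea, with proba handling and output dtype omitted:

import numpy as np
import pandas as pd

def predict_part_sketch(part, model):
    X = part.values if isinstance(part, pd.DataFrame) else part
    if not X.shape[0]:
        # Empty partition: return an empty result instead of calling LightGBM
        return np.array([])
    return model.predict(X)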