MLlib Gradient-boosted Tree Regression Example with PySpark

The PySpark MLlib library provides the GBTRegressor class for gradient-boosted tree regression. Gradient tree boosting is an ensemble method that combines many decision trees to solve regression and classification tasks in machine learning. The core idea is to build the trees sequentially, with each new tree trained to correct the errors of the trees before it.
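
To make the boosting idea concrete, here is a minimal pure-Python sketch for squared error, using one-split "stumps" as the weak learners. It only illustrates the principle and is not how Spark implements GBTRegressor; the function names, the toy data, and the `step_size` value are all made up for this example.

```python
# Minimal gradient boosting for squared error with one-split "stumps".
# Illustrative only: this is not Spark's GBTRegressor implementation.

def fit_stump(x, residuals):
    """Find the threshold on x that best splits the residuals into two means."""
    best = None
    for threshold in sorted(set(x))[:-1]:  # skip the largest value so both sides are non-empty
        left = [r for xi, r in zip(x, residuals) if xi <= threshold]
        right = [r for xi, r in zip(x, residuals) if xi > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    return best[1:]  # (threshold, left_mean, right_mean)


def predict_stump(stump, xi):
    threshold, left_mean, right_mean = stump
    return left_mean if xi <= threshold else right_mean


def fit_boosted(x, y, n_trees=10, step_size=0.1):
    """Fit each new stump to the residuals left by the ensemble so far."""
    base = sum(y) / len(y)  # start from the mean of the targets
    preds = [base] * len(y)
    stumps = []
    for _ in range(n_trees):
        residuals = [yi - pi for yi, pi in zip(y, preds)]
        stump = fit_stump(x, residuals)
        stumps.append(stump)
        # Add a damped correction from the new stump to the running prediction
        preds = [pi + step_size * predict_stump(stump, xi)
                 for xi, pi in zip(x, preds)]
    return base, stumps, preds


def mse(y, preds):
    return sum((yi - pi) ** 2 for yi, pi in zip(y, preds)) / len(y)


# Hypothetical toy data: one feature and a noisy step-like target
x = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
y = [1.2, 0.9, 1.1, 1.0, 3.1, 2.9, 3.2, 3.0]

_, _, preds_1 = fit_boosted(x, y, n_trees=1)
_, _, preds_10 = fit_boosted(x, y, n_trees=10)
print("MSE after 1 tree:  ", mse(y, preds_1))
print("MSE after 10 trees:", mse(y, preds_10))
```

The training error shrinks as stumps are added, because every stump targets what the previous ones got wrong.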

    In this tutorial, we'll briefly learn how to fit and predict regression data by using PySpark GBTRegressor in Python. The tutorial covers:

  1. Preparing the data
  2. Prediction and accuracy check
  3. Visualizing the results
  4. Source code listing

We'll start by loading the required libraries for this tutorial.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
from sklearn.datasets import load_boston
import pandas as pd 
 


Preparing the data

We use the Boston Housing Price dataset as the target regression data; it can be loaded from the sklearn.datasets module. Note that load_boston was removed in scikit-learn 1.2, so this step requires an older scikit-learn version. The code below loads the dataset and converts it into a pandas DataFrame.

boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())
 

Next, we'll define an SQLContext and create a Spark DataFrame from the df_boston data.
 
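# Reuse an existing SparkContext if one is already running
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Convert the pandas DataFrame into a Spark DataFrame
data = sqlContext.createDataFrame(df_boston)
data.printSchema()  # printSchema() prints directly and returns None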
 
root
|-- CRIM: double (nullable = true)
|-- ZN: double (nullable = true)
|-- INDUS: double (nullable = true)
|-- CHAS: double (nullable = true)
|-- NOX: double (nullable = true)
|-- RM: double (nullable = true)
|-- AGE: double (nullable = true)
|-- DIS: double (nullable = true)
|-- RAD: double (nullable = true)
|-- TAX: double (nullable = true)
|-- PTRATIO: double (nullable = true)
|-- B: double (nullable = true)
|-- LSTAT: double (nullable = true)
|-- target: double (nullable = true)
 
 
 
To combine all feature columns into a single vector column and keep the label column ('target') separate, we use VectorAssembler.

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)
 
+--------------------+------+
| features|target|
+--------------------+------+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
+--------------------+------+
only showing top 3 rows
 

Next, we'll split the data into train and test parts.

(train, test) = va_df.randomSplit([0.8, 0.2])
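
Because the split is random, the reported metrics change from run to run. randomSplit accepts an optional seed, for example randomSplit([0.8, 0.2], seed=42), which should make the split repeatable for the same input data. The weights act as proportions rather than exact counts, so the resulting sizes are only approximately 80/20. The pure-Python sketch below illustrates that behavior with a hypothetical helper, `bernoulli_split`; it is a conceptual stand-in and not Spark's actual implementation.

```python
import random


def bernoulli_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(100))  # hypothetical stand-in for the rows of a DataFrame
train_rows, test_rows = bernoulli_split(rows)

# The sizes are close to 80/20 but not guaranteed to be exact
print(len(train_rows), len(test_rows))
```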
 
 

Prediction and accuracy check

Next, we'll define the regressor model by using the GBTRegressor class. The parameters can be adjusted to suit the data; here we set maxIter, the number of boosting iterations, to 10.
 
gbtr = GBTRegressor(featuresCol='features', labelCol='target', maxIter=10)
gbtr = gbtr.fit(train)

 
After training the model, we'll predict the test data and check the accuracy metrics.

mdata = gbtr.transform(test)
mdata.show(3)
 
rmse=RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse=rmse.evaluate(mdata) 
 
mae=RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae=mae.evaluate(mdata) 
 
r2=RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
r2=r2.evaluate(mdata)

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)
 
+--------------------+------+------------------+
| features|target| prediction|
+--------------------+------+------------------+
|[0.02729,0.0,7.07...| 34.7| 33.78460260004924|
|[0.09744,0.0,5.96...| 20.0|21.706837297006956|
|[0.80271,0.0,8.14...| 20.2| 18.51482812579027|
+--------------------+------+------------------+
only showing top 3 rows
 
 
 
RMSE:  4.416076755529045
MAE:  2.826016527055411
R-squared:  0.7808863473079463
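
To see what these three metrics measure, the sketch below computes them by hand using the standard textbook definitions, which I assume match what RegressionEvaluator reports. The helper name `regression_metrics` is made up for this example. The inputs are only the three rows shown in the output above, so the results will differ from the full test-set values.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return (RMSE, MAE, R-squared) using the standard definitions."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e ** 2 for e in errors)       # residual sum of squares
    rmse = math.sqrt(ss_res / n)               # root mean squared error
    mae = sum(abs(e) for e in errors) / n      # mean absolute error
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot                 # share of variance explained
    return rmse, mae, r2


# The three rows displayed by mdata.show(3), not the whole test set
y_true = [34.7, 20.0, 20.2]
y_pred = [33.78460260004924, 21.706837297006956, 18.51482812579027]

rmse_3, mae_3, r2_3 = regression_metrics(y_true, y_pred)
print("RMSE: ", rmse_3)
print("MAE: ", mae_3)
print("R-squared: ", r2_3)
```

RMSE penalizes large errors more heavily than MAE, so RMSE is never smaller than MAE on the same data.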
 
 
 
 
Visualizing the results
 
To visualize the original and predicted data, we can use the matplotlib library. We'll extract the data from the 'mdata' DataFrame.

x_ax = range(0, mdata.count())
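# collect() returns Row objects, so pull the plain values out of each row
y_pred = [row.prediction for row in mdata.select("prediction").collect()]
y_orig = [row.target for row in mdata.select("target").collect()]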

plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show() 
 



If you plan to run the code again, don't forget to stop the Spark context first.
 
# Stop session 
sc.stop()  

 
In this tutorial, we've briefly learned how to fit and predict regression data by using the PySpark GBTRegressor model in Python. The full source code is listed below.


Source code listing
 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
from sklearn.datasets import load_boston
import pandas as pd


boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_boston)
data.printSchema()

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)

(train, test) = va_df.randomSplit([0.8, 0.2])

gbtr = GBTRegressor(featuresCol='features', labelCol='target', maxIter=10)
gbtr = gbtr.fit(train)

mdata = gbtr.transform(test)
mdata.show(3)

rmse=RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse=rmse.evaluate(mdata) 
 
mae=RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae=mae.evaluate(mdata) 
 
r2=RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
r2=r2.evaluate(mdata)

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)

x_ax = range(0, mdata.count())
y_pred = [row.prediction for row in mdata.select("prediction").collect()]
y_orig = [row.target for row in mdata.select("target").collect()]

plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show()  

sc.stop()
 

 