Classification Example with PySpark Gradient-boosted Tree Classifier

Gradient tree boosting is an ensemble learning method used for regression and classification tasks in machine learning. It builds decision trees sequentially, with each new tree fitted to correct the errors of the trees before it, which improves the quality of fit and prediction. The PySpark MLlib library provides the GBTClassifier model to implement gradient-boosted tree classification.
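The boosting idea can be sketched in a few lines of plain Python. This is only an illustration under simplifying assumptions: it uses one-split "stumps" on a single feature and squared-error residuals, and the helper names (fit_stump, fit_boosted) are made up for this example. It is not how GBTClassifier is implemented internally.

```python
# Minimal gradient-boosting sketch on 1-D data using decision stumps.
# Illustrative only; the helper names are hypothetical.

def fit_stump(xs, residuals):
    """Find the single threshold split that minimizes squared error on the residuals."""
    best = None
    # Skip the largest value so the right-hand side is never empty.
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = (sum((r - left_mean) ** 2 for r in left)
                 + sum((r - right_mean) ** 2 for r in right))
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def fit_boosted(xs, ys, n_rounds=20, learning_rate=0.5):
    """Add stumps one at a time, each fitted to the current residuals."""
    base = sum(ys) / len(ys)          # start from the mean of the targets
    preds = [base] * len(xs)
    stumps = []
    for _ in range(n_rounds):
        residuals = [y - p for y, p in zip(ys, preds)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        preds = [p + learning_rate * stump(x) for x, p in zip(xs, preds)]
    return lambda x: base + learning_rate * sum(s(x) for s in stumps)

xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [0, 0, 0, 1, 1, 1]
model = fit_boosted(xs, ys)
labels = [1 if model(x) >= 0.5 else 0 for x in xs]
print(labels)
```

Each round shrinks the remaining error a little, which is why the number of rounds (maxIter in GBTClassifier) affects how closely the ensemble fits the training data.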

In this tutorial, you'll briefly learn how to train a PySpark GBTClassifier model and use it to classify binary data. The tutorial covers:

  1. Preparing the data
  2. Prediction and accuracy check
  3. Source code listing

We'll start by loading the required libraries for this tutorial.


from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_breast_cancer
from pandas import DataFrame, Series 
 

 
Preparing the data

We use the Breast Cancer dataset for binary classification; it can be loaded easily from the Scikit-learn datasets module. The code below loads the dataset and converts it into a pandas DataFrame.


bc = load_breast_cancer()

df_bc = DataFrame(bc.data, columns=bc.feature_names)
df_bc['label'] = Series(bc.target) 
  
print(df_bc.head())
  
   mean radius  mean texture  ...  worst fractal dimension  label
0        17.99         10.38  ...                  0.11890      0
1        20.57         17.77  ...                  0.08902      0
2        19.69         21.25  ...                  0.08758      0
3        11.42         20.38  ...                  0.17300      0
4        20.29         14.34  ...                  0.07678      0

[5 rows x 31 columns]
 

Next, we'll define an SQLContext and create a Spark DataFrame from the df_bc data. We can then check the DataFrame schema.
 
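# getOrCreate() is a class method: it reuses the active context or creates a new one
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_bc)
# printSchema() prints directly and returns None, so no print() wrapper is needed
data.printSchema()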
 
root
|-- mean radius: double (nullable = true)
|-- mean texture: double (nullable = true)
|-- mean perimeter: double (nullable = true)
|-- mean area: double (nullable = true)
|-- mean smoothness: double (nullable = true)
|-- mean compactness: double (nullable = true)
|-- mean concavity: double (nullable = true)
|-- mean concave points: double (nullable = true)
|-- mean symmetry: double (nullable = true)
|-- mean fractal dimension: double (nullable = true)
|-- radius error: double (nullable = true)
|-- texture error: double (nullable = true)
|-- perimeter error: double (nullable = true)
|-- area error: double (nullable = true)
|-- smoothness error: double (nullable = true)
|-- compactness error: double (nullable = true)
|-- concavity error: double (nullable = true)
|-- concave points error: double (nullable = true)
|-- symmetry error: double (nullable = true)
|-- fractal dimension error: double (nullable = true)
|-- worst radius: double (nullable = true)
|-- worst texture: double (nullable = true)
|-- worst perimeter: double (nullable = true)
|-- worst area: double (nullable = true)
|-- worst smoothness: double (nullable = true)
|-- worst compactness: double (nullable = true)
|-- worst concavity: double (nullable = true)
|-- worst concave points: double (nullable = true)
|-- worst symmetry: double (nullable = true)
|-- worst fractal dimension: double (nullable = true)
|-- label: long (nullable = true) 
 
 
To combine all feature columns into a single vector column and keep the 'label' column separate, we use VectorAssembler.

features = bc.feature_names

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
 
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[17.99,10.38,122....|    0|
|[20.57,17.77,132....|    0|
|[19.69,21.25,130....|    0|
+--------------------+-----+
only showing top 3 rows
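Conceptually, the assembling step packs the chosen input columns of each row into one vector, in the order given by inputCols. The plain-Python sketch below mimics that on two rows with two of the thirty features; the assemble helper is hypothetical and only illustrates the idea, not the VectorAssembler implementation.

```python
# Two sample rows using values from the output above (only two features shown).
rows = [
    {"mean radius": 17.99, "mean texture": 10.38, "label": 0},
    {"mean radius": 20.57, "mean texture": 17.77, "label": 0},
]
feature_cols = ["mean radius", "mean texture"]

def assemble(row, input_cols):
    """Pack the selected columns into one 'features' list and keep the label."""
    return {"features": [row[c] for c in input_cols], "label": row["label"]}

assembled = [assemble(r, feature_cols) for r in rows]
for item in assembled:
    print(item)
```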
 

Next, we'll split the data into training and test sets.

 
# split data into train and test 
(train, test) = va_df.randomSplit([0.9, 0.1])
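Note that randomSplit assigns rows at random, so the 0.9 / 0.1 weights give approximate rather than exact split sizes, and the result changes between runs unless a seed is passed (randomSplit accepts an optional seed argument). The sketch below imitates this behaviour in plain Python; the random_split function is a hypothetical stand-in, not Spark's implementation, and 569 is the number of rows in the scikit-learn breast cancer dataset.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by comparing a random draw to cumulative weights."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total              # normalize so the weights sum to 1
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(569))               # stand-in for the 569 dataset rows
train, test = random_split(rows, [0.9, 0.1], seed=42)
print(len(train), len(test))          # sizes are close to, but not exactly, 90% / 10%
```

Fixing the seed makes the split repeatable, which helps when comparing accuracy figures across runs.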
 
 

Prediction and Accuracy Check

We'll define the gradient-boosted tree classifier by using the GBTClassifier class and fit it on the training data. Here, we set the maxIter parameter to 20. To predict the test data, we use the transform() method.
 

# training  
gbtc = GBTClassifier(labelCol="label", maxIter=20)
gbtc = gbtc.fit(train)

# prediction
pred = gbtc.transform(test)
pred.show(3)
   
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[9.504,12.44,60.3...|    1|[-1.3227570335086...|[0.06626603644018...|       1.0|
|[10.95,21.35,71.9...|    0|[-0.5325503906771...|[0.25633589604367...|       1.0|
|[13.0,21.82,87.5,...|    0|[1.33986281860457...|[0.93581964699694...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
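The three output columns are related. The numbers shown above are consistent with rawPrediction holding [-margin, margin] and probability being a logistic transform of twice the margin, with the prediction being the class with the higher probability. The sketch below checks this against the (truncated) values printed above; the gbt_probability helper is a hypothetical name, and the formula is stated as an assumption that matches this output rather than as a documented guarantee.

```python
import math

def gbt_probability(margin):
    """Assumed transform: P(class 1) = 1 / (1 + exp(-2 * margin))."""
    p1 = 1.0 / (1.0 + math.exp(-2.0 * margin))
    return [1.0 - p1, p1]

# First elements of rawPrediction and probability from the three rows above.
raw_first = [-1.3227570335086, -0.5325503906771, 1.33986281860457]
shown_prob_first = [0.06626603644018, 0.25633589604367, 0.93581964699694]

computed = []
for raw0, shown in zip(raw_first, shown_prob_first):
    margin = -raw0                        # rawPrediction is assumed to be [-margin, margin]
    prob = gbt_probability(margin)
    predicted = float(prob.index(max(prob)))
    computed.append((prob[0], predicted))
    print(round(prob[0], 6), round(shown, 6), predicted)
```

Under this reading, the second row is a misclassification: its label is 0, but the model gives class 0 a probability of only about 0.26.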
 
 
After predicting the test data, we'll check the prediction accuracy with MulticlassClassificationEvaluator. A confusion matrix can be created with the confusion_matrix function of the sklearn.metrics module.

 
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(pred)
 
print("Prediction Accuracy: ", acc)

y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm) 


Prediction Accuracy:  0.9166666666666666
Confusion Matrix:
[[23  1]
 [ 4 32]]
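The accuracy figure can be recovered from the confusion matrix by hand. In the scikit-learn convention, rows are true labels and columns are predicted labels, so the diagonal holds the correct predictions. The short sketch below redoes the arithmetic for the matrix printed above; the per-class precision and recall lines are an extra illustration that the original evaluation does not compute.

```python
# Confusion matrix from the run above: rows = true label, columns = predicted label.
cm = [[23, 1],
      [4, 32]]

total = sum(sum(row) for row in cm)              # number of test rows
correct = cm[0][0] + cm[1][1]                    # diagonal = correct predictions
accuracy = correct / total

# Extra illustration for class 1:
precision_1 = cm[1][1] / (cm[0][1] + cm[1][1])   # of rows predicted 1, share truly 1
recall_1 = cm[1][1] / (cm[1][0] + cm[1][1])      # of rows truly 1, share predicted 1

print("Test rows:", total)
print("Accuracy:", accuracy)
print("Precision (class 1):", precision_1)
print("Recall (class 1):", recall_1)
```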
 
 
Finally, we'll stop the Spark context.
 
# Stop session 
sc.stop()  

 
   In this tutorial, we've briefly learned how to fit and classify data by using PySpark GBTClassifier class. The full source code is listed below.


Source code listing
 
 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_breast_cancer
from pandas import DataFrame, Series

bc = load_breast_cancer()

df_bc = DataFrame(bc.data, columns=bc.feature_names)
df_bc['label'] = Series(bc.target)
print(df_bc.head())

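# getOrCreate() is a class method: it reuses the active context or creates a new one
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_bc)
# printSchema() prints directly and returns None, so no print() wrapper is needed
data.printSchema()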

features = bc.feature_names

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)

# split data into train and test 
(train, test) = va_df.randomSplit([0.9, 0.1])

# training 
gbtc = GBTClassifier(labelCol="label", maxIter=20)
gbtc = gbtc.fit(train)

# prediction
pred = gbtc.transform(test)
pred.show(3)

# accuracy check
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)

# confusion matrix
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

sc.stop() 
 

 