MLlib Naive Bayes Classification Example with PySpark

    The PySpark MLlib API provides a NaiveBayes class to classify data with the Naive Bayes method. Naive Bayes, based on Bayes' theorem, is a supervised learning technique for solving classification problems. The model estimates each class's prior probability and the conditional probability of the features given the class, then assigns new samples to the most probable class.
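
    For reference, Bayes' theorem gives the posterior probability of a class y for a feature vector x as P(y|x) = P(x|y) * P(y) / P(x). The "naive" part of the method is the assumption that the features are conditionally independent given the class, so P(x|y) factorizes into a product of per-feature probabilities, and the class with the highest posterior probability is selected.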

    In this tutorial, you'll briefly learn how to train and classify data by using the PySpark NaiveBayes model. The tutorial covers:
  1. Preparing the data
  2. Prediction and accuracy check
  3. Source code listing
   We'll start by loading the required libraries for this tutorial.

 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd
 
 
Preparing the data

   We use the Iris dataset to perform classification; it can be easily loaded from the Scikit-learn datasets module. The code below loads the dataset and transforms it into a pandas data frame.

iris = load_iris()
df_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
df_iris['label'] = pd.Series(iris.target)
 
print(df_iris.head())
 
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0
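
   The label column holds the class indices 0, 1, and 2; the matching class names come from iris.target_names, as the snippet below shows.
 
print(iris.target_names)
 
['setosa' 'versicolor' 'virginica']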
 
 

Next, we'll define an SQLContext and create a Spark DataFrame from the df_iris data.
 
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_iris)
data.printSchema()
 
root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- label: long (nullable = true)
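
 
   Note that SQLContext is the older entry point for working with data frames; on Spark 2.0 and later, the same steps are usually written with a SparkSession. A minimal sketch of the alternative (the app name here is arbitrary):
 
from pyspark.sql import SparkSession

# SparkSession wraps the SparkContext and SQLContext functionality
spark = SparkSession.builder.appName("naive-bayes-iris").getOrCreate()
data = spark.createDataFrame(df_iris)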
 
 
To combine all of the feature columns into a single vector column, we use VectorAssembler. MLlib estimators expect the features in one vector column (named 'features' by default), so we keep only the assembled 'features' column and the 'label' column.

features = iris.feature_names

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
  
+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
+-----------------+-----+
only showing top 3 rows
 

Next, we'll split the data into train and test parts.

 
(train, test) = va_df.randomSplit([0.9, 0.1])
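
 
   randomSplit() samples rows randomly, so the exact split differs between runs. To make the split reproducible, you can pass a seed value (42 here is arbitrary):
 
# Fixing the seed makes the train/test split identical across runs
(train, test) = va_df.randomSplit([0.9, 0.1], seed=42)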
 
 

Prediction and Accuracy Check

   Next, we'll define the classifier model by using the NaiveBayes class and fit it on the train data. We can then predict the test data by using the transform() method.
 
 
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb = nb.fit(train)

pred = nb.transform(test)
pred.show(3)

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[5.8,4.0,1.2,0.2]|    0|[-12.573605204378...|[0.85127877467313...|       0.0|
|[5.2,3.4,1.4,0.2]|    0|[-11.878572925715...|[0.75529148493192...|       0.0|
|[5.6,2.5,3.9,1.1]|    1|[-19.267419970902...|[0.08139949546395...|       1.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
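
 
   Here, smoothing applies additive (Laplace) smoothing and modelType selects the event model. Besides "multinomial", PySpark's NaiveBayes also accepts "bernoulli", and on Spark 3.0 and later "gaussian" and "complement". Since the iris features are continuous measurements, a Gaussian model can also be a natural fit; a minimal sketch, assuming Spark 3.0+:
 
# Gaussian Naive Bayes models each feature as normally distributed per class
nb_gauss = NaiveBayes(modelType="gaussian")
pred_gauss = nb_gauss.fit(train).transform(test)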
 
 
 
    After predicting the test data, we'll check the accuracy metrics. Here, we can use MulticlassClassificationEvaluator to compute the accuracy; note that its default metric is the F1 score, so we request "accuracy" explicitly. The confusion matrix can be created by using the confusion_matrix function of the sklearn.metrics module.

# the default metric is f1, so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              metricName="accuracy")
acc = evaluator.evaluate(pred)
 
print("Prediction Accuracy: ", acc)
 
y_pred = pred.select("prediction").collect()
y_orig = pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.9166666666666666
Confusion Matrix:
[[2 0 0]
 [0 4 0]
 [0 1 5]]
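
 
   Note that collect() returns a list of Row objects, which scikit-learn has to flatten; a cleaner route is to pull both columns into pandas first. A minimal sketch of the alternative:
 
# Convert the label and prediction columns to pandas Series for sklearn
pred_pd = pred.select("label", "prediction").toPandas()
cm = confusion_matrix(pred_pd["label"], pred_pd["prediction"])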
 
 
Finally, we'll stop the Spark context.
 
# Stop session 
sc.stop()  

 
   In this tutorial, we've briefly learned how to fit and classify data by using the PySpark NaiveBayes class. The full source code is listed below.


Source code listing
 
 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd


iris = load_iris()
df_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
df_iris['label'] = pd.Series(iris.target)
print(df_iris.head())

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_iris)
data.printSchema()

features = iris.feature_names

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)

(train, test) = va_df.randomSplit([0.9, 0.1])

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb = nb.fit(train)

pred = nb.transform(test)
pred.show(3)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)

y_pred = pred.select("prediction").collect()
y_orig = pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

sc.stop() 
 

 