PySpark Factorization Machines Classification Example

      Factorization machines (FM) is a predictor model that estimates parameters under high sparsity. The model combines the advantages of SVMs with factorized parameters instead of the dense parametrization used in SVMs [2]. FM is a supervised learning algorithm that can be used for classification, regression, and recommendation tasks in machine learning. The PySpark ML API provides the FMClassifier class to classify binary data with the factorization machines method.
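    For reference, a second-order FM models the target as a linear term plus pairwise feature interactions whose weights are factorized into low-dimensional vectors. This is the standard formulation from the FM literature, not PySpark-specific notation:

\hat{y}(x) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle v_i, v_j \rangle x_i x_j

    Because every interaction weight is the dot product of two learned factor vectors v_i and v_j, the model can estimate interactions even between features that rarely co-occur in the training data, which is what makes it work under high sparsity.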

    In this tutorial, you'll briefly learn how to train a PySpark FMClassifier model and use it to classify binary data. The tutorial covers:

  1. Preparing the data
  2. Prediction and accuracy check
  3. Source code listing
   We'll start by loading the required libraries for this tutorial.


from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import FMClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_breast_cancer
from pandas import DataFrame, Series
 

 
Preparing the data

    For teaching purposes, I use a simple dataset in this tutorial. If you want to evaluate the real performance of the FMClassifier model, you should prepare a large, sparse dataset.
    We use the Breast Cancer dataset to perform binary classification; it can be easily loaded from the Scikit-learn dataset module. The code below shows how to load the dataset and convert it into a pandas DataFrame.


bc = load_breast_cancer()

df_bc = DataFrame(bc.data, columns=bc.feature_names)
df_bc['label'] = Series(bc.target) 
  
print(df_bc.head())
  
   mean radius  mean texture  ...  worst fractal dimension  label
0        17.99         10.38  ...                  0.11890      0
1        20.57         17.77  ...                  0.08902      0
2        19.69         21.25  ...                  0.08758      0
3        11.42         20.38  ...                  0.17300      0
4        20.29         14.34  ...                  0.07678      0

[5 rows x 31 columns]
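
As an optional sanity check, you can look at the class balance of the target before modeling; the dataset contains 569 rows with 212 malignant (0) and 357 benign (1) samples.

print(df_bc['label'].value_counts())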
 

Next, we'll define the SQLContext and create a Spark data frame from the df_bc data. You can check the data frame schema with printSchema().
 
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_bc)
data.printSchema()
 
root
|-- mean radius: double (nullable = true)
|-- mean texture: double (nullable = true)
|-- mean perimeter: double (nullable = true)
|-- mean area: double (nullable = true)
|-- mean smoothness: double (nullable = true)
|-- mean compactness: double (nullable = true)
|-- mean concavity: double (nullable = true)
|-- mean concave points: double (nullable = true)
|-- mean symmetry: double (nullable = true)
|-- mean fractal dimension: double (nullable = true)
|-- radius error: double (nullable = true)
|-- texture error: double (nullable = true)
|-- perimeter error: double (nullable = true)
|-- area error: double (nullable = true)
|-- smoothness error: double (nullable = true)
|-- compactness error: double (nullable = true)
|-- concavity error: double (nullable = true)
|-- concave points error: double (nullable = true)
|-- symmetry error: double (nullable = true)
|-- fractal dimension error: double (nullable = true)
|-- worst radius: double (nullable = true)
|-- worst texture: double (nullable = true)
|-- worst perimeter: double (nullable = true)
|-- worst area: double (nullable = true)
|-- worst smoothness: double (nullable = true)
|-- worst compactness: double (nullable = true)
|-- worst concavity: double (nullable = true)
|-- worst concave points: double (nullable = true)
|-- worst symmetry: double (nullable = true)
|-- worst fractal dimension: double (nullable = true)
|-- label: long (nullable = true) 
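
As a side note, SQLContext is a legacy entry point; in Spark 2.0 and later the same data frame can be created through SparkSession. A minimal equivalent sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(df_bc)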
 
 
To combine all the feature columns into a single vector column and keep the 'label' column separate, we use VectorAssembler.

features = bc.feature_names

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
 
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[17.99,10.38,122....|    0|
|[20.57,17.77,132....|    0|
|[19.69,21.25,130....|    0|
+--------------------+-----+
only showing top 3 rows
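
Since FM models are trained with gradient-based optimization, the Spark documentation recommends scaling the input features. This optional sketch uses MinMaxScaler and is not applied in the rest of this tutorial:

from pyspark.ml.feature import MinMaxScaler

# rescale each feature to the [0, 1] range
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
va_df_scaled = scaler.fit(va_df).transform(va_df)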
 

Next, we'll split the data into train and test parts.

 
# split data into train and test 
(train, test) = va_df.randomSplit([0.9, 0.1])
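
Note that randomSplit() is non-deterministic, so your split (and the exact numbers below) will differ between runs. Passing a seed, here the arbitrary value 42, makes the split reproducible:

(train, test) = va_df.randomSplit([0.9, 0.1], seed=42)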
 
 

Prediction and Accuracy Check

   We'll define the factorization machines model by using the FMClassifier class and fit it on the train data. Here, we set the stepSize parameter to 0.001. To predict the test data, we use the transform() method.
 

# training 
fmc = FMClassifier(labelCol="label", stepSize=0.001)
fmc = fmc.fit(train)

# prediction
pred = fmc.transform(test)
pred.show(3)
   
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[13.03,18.42,82.6...|    1|[-15.744299067728...|[1.45324129618984...|       1.0|
|[13.08,15.71,85.6...|    1|[-10.213107316564...|[3.66849487735132...|       1.0|
|[13.28,20.28,87.3...|    0|[19.3255629862012...|[0.99999999595410...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
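
Besides stepSize, FMClassifier exposes several other tuning parameters, such as factorSize (the dimensionality of the factor vectors, 8 by default), regParam, and maxIter. The values below are illustrative, not tuned for this dataset:

# a hypothetical alternative configuration
fmc = FMClassifier(labelCol="label", factorSize=4, regParam=0.01,
                   maxIter=100, stepSize=0.001, seed=1)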
 
 
    After predicting the test data, we'll check the prediction accuracy. Here, we can use MulticlassClassificationEvaluator. A confusion matrix can be created with the confusion_matrix function of the sklearn.metrics module.

 
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc)

y_pred = pred.select("prediction").collect()
y_orig = pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.9642857142857143
Confusion Matrix:
[[21  1]
 [ 1 33]]
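
The same evaluator can report other metrics by changing metricName; for example, the F1 score:

evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1")
print("F1 Score: ", evaluator_f1.evaluate(pred))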
 
 
Finally, we'll stop the Spark context.
 
# Stop session 
sc.stop()  

 
   In this tutorial, we've briefly learned how to fit a PySpark FMClassifier model and classify data with it. The full source code is listed below.


Source code listing
 
 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import FMClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_breast_cancer
from pandas import DataFrame, Series

bc = load_breast_cancer()

df_bc = DataFrame(bc.data, columns=bc.feature_names)
df_bc['label'] = Series(bc.target)
print(df_bc.head())

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_bc)
data.printSchema()

features = bc.feature_names

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)

# split data into train and test 
(train, test) = va_df.randomSplit([0.9, 0.1])

# training 
fmc = FMClassifier(labelCol="label", stepSize=0.001)
fmc = fmc.fit(train)

# prediction
pred = fmc.transform(test)
pred.show(3)

# accuracy check
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)

# confusion matrix
y_pred = pred.select("prediction").collect()
y_orig = pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

sc.stop() 
 

 
References:
