PySpark Regression Example with Factorization Machines Regressor

A factorization machine (FM) is a predictive model that can estimate parameters even under high sparsity. It combines the advantages of support vector machines (SVM) with those of factorization models: instead of the dense parametrization used in SVM, it models feature interactions with factorized parameters [2]. FM is a supervised learning algorithm and can be used for classification, regression, and recommendation tasks in machine learning. The PySpark MLlib API provides the FMRegressor class (in pyspark.ml.regression) to implement factorization machines for regression tasks.
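To make the "factorized parameters" idea concrete, here is a minimal plain-Python sketch of a degree-2 FM prediction. It is an illustration only, not Spark's implementation, and all parameter values are made-up toy numbers. The names w0, w, and V are assumptions of this sketch; they play roughly the roles of the intercept, linear, and factors attributes of a fitted FMRegressionModel.

```python
# Sketch of a degree-2 factorization machine prediction (toy values, not fitted).
# y = w0 + sum_i w[i]*x[i] + sum_{i<j} <V[i], V[j]> * x[i] * x[j]

def fm_predict_naive(x, w0, w, V):
    """Evaluate the FM equation directly with a double loop over feature pairs."""
    n = len(x)
    linear = sum(w[i] * x[i] for i in range(n))
    pairwise = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            dot = sum(a * b for a, b in zip(V[i], V[j]))
            pairwise += dot * x[i] * x[j]
    return w0 + linear + pairwise


def fm_predict_fast(x, w0, w, V):
    """Same value via the identity
    sum_{i<j} <v_i, v_j> x_i x_j
        = 0.5 * sum_f [(sum_i v_if x_i)^2 - sum_i (v_if x_i)^2],
    which costs O(k * n) instead of O(k * n^2)."""
    n, k = len(x), len(V[0])
    linear = sum(w[i] * x[i] for i in range(n))
    pairwise = 0.0
    for f in range(k):
        s = sum(V[i][f] * x[i] for i in range(n))
        s_sq = sum((V[i][f] * x[i]) ** 2 for i in range(n))
        pairwise += 0.5 * (s * s - s_sq)
    return w0 + linear + pairwise


# Hypothetical toy input: 3 features, factor size k = 2
x = [1.0, 0.0, 2.0]
w0 = 0.5
w = [0.1, -0.2, 0.3]
V = [[0.1, 0.2], [0.3, -0.1], [-0.2, 0.4]]

naive = fm_predict_naive(x, w0, w, V)
fast = fm_predict_fast(x, w0, w, V)
print(naive, fast)
```

Each feature gets a small vector V[i] rather than one weight per feature pair, so an interaction weight can still be estimated for pairs that rarely occur together in sparse data.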

In this tutorial, you'll briefly learn how to fit a regression model and predict with it by using the PySpark FMRegressor class in Python. The tutorial covers:

  1. Preparing the data
  2. Prediction and accuracy check
  3. Visualizing the results
  4. Source code listing

We'll start by loading the required libraries for this tutorial.


from pyspark.ml.regression import FMRegressor
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.datasets import load_boston
import pandas as pd
import matplotlib.pyplot as plt


Preparing the data

We use the Boston Housing Price dataset as the regression data; it can be loaded from the sklearn.datasets module. Note that load_boston was removed in scikit-learn 1.2, so this code requires an older scikit-learn version. The code below loads the dataset and converts it into a pandas DataFrame.

boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())
 

Next, we'll define an SQLContext and create a Spark data frame from the df_boston data.
 
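# getOrCreate is a class method; it reuses an active context if one exists
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_boston)
# printSchema() prints the schema itself and returns None
data.printSchema()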
 
root
|-- CRIM: double (nullable = true)
|-- ZN: double (nullable = true)
|-- INDUS: double (nullable = true)
|-- CHAS: double (nullable = true)
|-- NOX: double (nullable = true)
|-- RM: double (nullable = true)
|-- AGE: double (nullable = true)
|-- DIS: double (nullable = true)
|-- RAD: double (nullable = true)
|-- TAX: double (nullable = true)
|-- PTRATIO: double (nullable = true)
|-- B: double (nullable = true)
|-- LSTAT: double (nullable = true)
|-- target: double (nullable = true)
 
 
 
To combine all feature columns into a single vector column and keep the 'target' label column separate, we use VectorAssembler.

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)
 
+--------------------+------+
|            features|target|
+--------------------+------+
|[0.00632,18.0,2.3...|  24.0|
|[0.02731,0.0,7.07...|  21.6|
|[0.02729,0.0,7.07...|  34.7|
+--------------------+------+
only showing top 3 rows
 

Next, we'll split the data into train and test parts.

(train, test) = va_df.randomSplit([0.8, 0.2])
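
A note on how this split behaves: randomSplit assigns each row to a part at random according to the normalized weights, so the 80/20 proportions are approximate rather than exact, and the result changes between runs unless the optional seed argument is set. The plain-Python sketch below illustrates that idea under simplifying assumptions. It is not Spark's implementation, and random_split and the 1000-row input are hypothetical names and data used only for illustration.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one part by drawing a uniform number in [0, 1)
    and comparing it with the cumulative normalized weights."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    acc = 0.0
    for weight in weights:
        acc += weight / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding

    parts = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for idx, upper in enumerate(bounds):
            if u < upper:
                parts[idx].append(row)
                break
    return parts


rows = list(range(1000))  # hypothetical stand-in for the data frame rows
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```

In PySpark, the same reproducibility can be obtained by passing a seed, for example va_df.randomSplit([0.8, 0.2], seed=42).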
 
 

Prediction and accuracy check

Next, we'll define the regressor model by using the FMRegressor class. Here, we set the label column and the step size parameters as shown below. Then we fit the model on the train data. After fitting, you can check the linear coefficients and the intercept value.
 

fmr = FMRegressor(labelCol="target", stepSize=0.001)

model = fmr.fit(train)
print(model) 
 
print("Coefficients: ", model.linear)
print("Intercept: ", model.intercept)
 
Coefficients:  [-0.0031597215118115292,0.01264775286246078,
0.006751437529376054,0.06688063485997252,0.024022735933618748,
0.03552901136571402,0.01464627116414251,0.026957322766026113,
0.010074131478299764,0.013412548728266136,0.024255850712787255,
0.022587078320568228,-0.0018044787308271859]
Intercept: 0.0296902251363327 
 
   
 
After training the model, we'll predict the test data and check the accuracy. Here, we'll use RegressionEvaluator to compute the RMSE, MAE, and R-squared metrics.

tdata = model.transform(test)
tdata.show(3)

rmse = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse = rmse.evaluate(tdata)
mae = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae = mae.evaluate(tdata)
r2 = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(tdata)

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)
 
+--------------------+------+------------------+
|            features|target|        prediction|
+--------------------+------+------------------+
|[0.02729,0.0,7.07...|  34.7|28.002172179767474|
|[0.02985,0.0,2.18...|  28.7|27.391726079614333|
|[0.08829,12.5,7.8...|  22.9|22.811250744987678|
+--------------------+------+------------------+
only showing top 3 rows

RMSE: 6.920893476410719
MAE: 5.113965067177143
R-squared: 0.45681566779638527 
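
For readers who want to see what these three metrics measure, the following plain-Python sketch computes them from their standard definitions. It is an illustration, not the RegressionEvaluator implementation, and regression_metrics is a hypothetical helper name. The sample values are the three rows shown above with predictions rounded to one decimal, so the results will not match the metrics reported for the full test set.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return (RMSE, MAE, R-squared) from their standard definitions."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]

    ss_res = sum(e * e for e in errors)            # residual sum of squares
    rmse = math.sqrt(ss_res / n)                   # root mean squared error
    mae = sum(abs(e) for e in errors) / n          # mean absolute error

    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot                       # coefficient of determination
    return rmse, mae, r2


# Targets and (rounded) predictions from the three rows displayed above
y_true = [34.7, 28.7, 22.9]
y_pred = [28.0, 27.4, 22.8]

rmse, mae, r2 = regression_metrics(y_true, y_pred)
print("RMSE:", rmse)
print("MAE:", mae)
print("R-squared:", r2)
```

RMSE penalizes large errors more heavily than MAE, which is why a single badly predicted row (34.7 against roughly 28.0 here) pulls RMSE well above MAE.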
 
 
 
Visualizing the results
 
To visualize the original and predicted data, we can use the matplotlib library. First, we'll extract the original and predicted values from the 'tdata' object.

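# Collect both columns in one pass so each target stays paired with its prediction
rows = tdata.select("target", "prediction").collect()
x_ax = range(len(rows))
y_orig = [row["target"] for row in rows]
y_pred = [row["prediction"] for row in rows]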
 
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show() 
 



If you plan to run the code again, do not forget to stop the Spark context first.
 
# Stop session 
sc.stop()  

 
In this tutorial, we've briefly learned how to fit and predict regression data by using the PySpark FMRegressor model in Python. The full source code is listed below.


Source code listing
 
 
from pyspark.ml.regression import FMRegressor
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.datasets import load_boston
import pandas as pd
import matplotlib.pyplot as plt

boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_boston)
data.printSchema()

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)

(train, test) = va_df.randomSplit([0.8, 0.2])

fmr = FMRegressor(labelCol="target", stepSize=0.001)

model = fmr.fit(train)
print(model)

print("Coefficients: ", model.linear)
print("Intercept: ", model.intercept)

tdata = model.transform(test)
tdata.show(3)

rmse = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse = rmse.evaluate(tdata)
mae = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae = mae.evaluate(tdata)
r2 = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(tdata)

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)

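# Collect both columns in one pass so each target stays paired with its prediction
rows = tdata.select("target", "prediction").collect()
x_ax = range(len(rows))
y_orig = [row["target"] for row in rows]
y_pred = [row["prediction"] for row in rows]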

plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show()  

sc.stop() 
 

 
References:
