MLlib Linear Regression Example with PySpark

Apache Spark is an analytics engine for processing large-scale data, with built-in tools such as Spark SQL, MLlib, and others. PySpark is the Python API for writing and running Spark applications.

In this tutorial, we'll briefly learn how to fit a regression model and make predictions with PySpark and the MLlib LinearRegression model. The tutorial covers:

  1. Preparing the data
  2. Fitting and accuracy check
  3. Visualizing the results
  4. Source code listing
   We'll start by loading the required libraries for this tutorial.

 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
# load_boston was removed in scikit-learn 1.2, so this tutorial needs scikit-learn < 1.2
from sklearn.datasets import load_boston
import pandas as pd
 


Preparing the data

We use the Boston Housing Price dataset from scikit-learn. We'll load the dataset, convert it into a pandas DataFrame, and then combine the feature columns into a single vector column with VectorAssembler, which is the input format expected by the LinearRegression class of the PySpark ML library.

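# Load the Boston data and put the features into a pandas DataFrame
boston = load_boston()
df_boston = pd.DataFrame(boston.data, columns=boston.feature_names)
# Add the house price (MEDV) as the 'target' column
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())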
 

Next, we'll create a SQLContext and build a Spark DataFrame from the df_boston pandas DataFrame.
 
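# Reuse the running SparkContext if there is one, otherwise start a new one
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Convert the pandas DataFrame into a Spark DataFrame
data = sqlContext.createDataFrame(df_boston)
# printSchema() prints the schema itself and returns None, so no print() is needed
data.printSchema()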
 
root
|-- CRIM: double (nullable = true)
|-- ZN: double (nullable = true)
|-- INDUS: double (nullable = true)
|-- CHAS: double (nullable = true)
|-- NOX: double (nullable = true)
|-- RM: double (nullable = true)
|-- AGE: double (nullable = true)
|-- DIS: double (nullable = true)
|-- RAD: double (nullable = true)
|-- TAX: double (nullable = true)
|-- PTRATIO: double (nullable = true)
|-- B: double (nullable = true)
|-- LSTAT: double (nullable = true)
|-- target: double (nullable = true)
 
 
 
To combine all feature columns into a single 'features' vector column, while keeping 'target' as the label column, we use VectorAssembler.

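features = boston.feature_names.tolist()

# Pack the 13 feature columns into one vector column named 'features'
va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
# Keep only the assembled features and the label
va_df = va_df.select(['features', 'target'])
va_df.show(3)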
 
+--------------------+------+
|            features|target|
+--------------------+------+
|[0.00632,18.0,2.3...|  24.0|
|[0.02731,0.0,7.07...|  21.6|
|[0.02729,0.0,7.07...|  34.7|
+--------------------+------+
only showing top 3 rows
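Conceptually, VectorAssembler packs the selected columns of each row into one vector, in the order given by inputCols, and the select() call then keeps that vector next to the label. The plain-Python sketch below imitates this on dictionaries; assemble_features is a hypothetical helper, and only the first three columns of the first Boston row are used. Spark itself stores the result in its own ML vector type rather than a Python list.

```python
def assemble_features(rows, input_cols, label_col):
    """Hypothetical helper: mimic VectorAssembler + select on plain dicts.

    Each row becomes a (feature_list, label) pair, with features taken
    in the order listed in input_cols.
    """
    return [([row[c] for c in input_cols], row[label_col]) for row in rows]


# First Boston row, restricted to three feature columns plus the label
first_row = {"CRIM": 0.00632, "ZN": 18.0, "INDUS": 2.31, "target": 24.0}
assembled = assemble_features([first_row], ["CRIM", "ZN", "INDUS"], "target")
print(assembled)  # [([0.00632, 18.0, 2.31], 24.0)]
```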
 


Fitting and Accuracy Check

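Next, we'll define the regression model with the LinearRegression class. The parameters can be tuned for your data: regParam sets the overall regularization strength, and elasticNetParam mixes the L1 and L2 penalties (0 means a pure L2/ridge penalty, 1 a pure L1/lasso penalty). After fitting the model, we can check the coefficients and the intercept.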
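For reference, the Spark ML documentation describes elastic-net regularized linear regression as minimizing a least-squares loss plus a mixed penalty, with λ = regParam and α = elasticNetParam. The form below is a sketch of that objective; note that Spark also standardizes the features by default (standardization=True) before the penalty is applied.

```latex
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2
  + \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
```

With regParam=0.3 and elasticNetParam=0.8, the L1 term is weighted by 0.3 × 0.8 = 0.24 and the squared L2 term by 0.3 × 0.2 / 2 = 0.03.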

lr = LinearRegression(featuresCol='features', labelCol='target',
                      regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(va_df)

print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)
 
Coefficients:  [-0.034024229130007695,0.009359015936752714,
0.0,2.247564189644528,-7.230786173732827,4.348712110587842,
0.0,-0.603564999831066,0.0,0.0,-0.8220712024477692,
0.00808024431913416,-0.5034480504252381]
Intercept: 20.025217329865892
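Several coefficients above are exactly 0.0 (INDUS, AGE, RAD and TAX, going by the order of boston.feature_names). This is the effect of the L1 part of the elastic-net penalty. The plain-Python sketch below is an illustration only, not Spark's solver: elastic_net_penalty and soft_threshold are hypothetical helpers. Soft-thresholding is the standard shrinkage step many L1 solvers use, and it shows how a small coefficient is pushed to exactly zero instead of merely becoming small.

```python
def elastic_net_penalty(coefs, reg_param=0.3, elastic_net_param=0.8):
    """Penalty reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in coefs)
    l2_squared = sum(w * w for w in coefs)
    return reg_param * (elastic_net_param * l1
                        + (1 - elastic_net_param) / 2 * l2_squared)


def soft_threshold(z, threshold):
    """Shrink z toward zero by `threshold`; values in [-threshold, threshold] become 0."""
    if z > threshold:
        return z - threshold
    if z < -threshold:
        return z + threshold
    return 0.0


# Illustrative threshold equal to the L1 weight regParam * elasticNetParam
l1_weight = 0.3 * 0.8
print(elastic_net_penalty([1.0, -2.0]))
print(soft_threshold(0.1, l1_weight))  # 0.0 (small value is zeroed out)
print(soft_threshold(1.0, l1_weight))  # larger value is only shrunk
```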
 
 

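Now, we check the accuracy of the fitted model. The model's summary attribute reports common regression metrics. Note that these metrics are computed on the same data the model was trained on, so they measure training fit rather than performance on unseen data.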
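The three reported metrics follow the usual definitions. This plain-Python sketch (regression_metrics is a hypothetical helper) computes them from lists of actual and predicted values, using a tiny made-up example rather than the Boston data.

```python
def regression_metrics(y_true, y_pred):
    """Return (MSE, MAE, R-squared) for two equal-length sequences."""
    n = len(y_true)
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(r * r for r in residuals)           # residual sum of squares
    mse = ss_res / n                                 # mean squared error
    mae = sum(abs(r) for r in residuals) / n         # mean absolute error
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot                         # 1 - SS_res / SS_tot
    return mse, mae, r2


# Toy example: only the last prediction is off, by 1.0
mse, mae, r2 = regression_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 4.0])
print(mse, mae, r2)
```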

print("MSE: ", lr_model.summary.meanSquaredError)
print("MAE: ", lr_model.summary.meanAbsoluteError)
print("R-squared: ", lr_model.summary.r2) 
  
MSE:  23.832602753248327
MAE: 3.3409807187310054
R-squared: 0.7176886039395777
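Because these scores come from the training data, they are likely to be optimistic. A common remedy is to hold out part of the data for testing. In PySpark this is usually done with va_df.randomSplit([0.8, 0.2], seed=42), fitting on the first part and calling lr_model.evaluate() on the second, which returns a summary with the same r2, meanSquaredError and meanAbsoluteError attributes. randomSplit assigns rows randomly, so the split sizes are only approximately 80/20. The plain-Python sketch below mimics that per-row random assignment; holdout_split is a hypothetical helper, not a Spark function.

```python
import random


def holdout_split(rows, test_fraction=0.2, seed=42):
    """Assign each row to the test set independently with probability test_fraction.

    Hypothetical helper: because the assignment is per row, the test set
    size is only approximately test_fraction * len(rows).
    """
    rng = random.Random(seed)  # fixed seed -> reproducible split
    train, test = [], []
    for row in rows:
        if rng.random() < test_fraction:
            test.append(row)
        else:
            train.append(row)
    return train, test


# Example: 506 row indices, the same number of rows as the Boston dataset
train_idx, test_idx = holdout_split(list(range(506)))
print(len(train_idx), len(test_idx))
```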
 

 
Visualizing the results
 
We can use the matplotlib library to visualize the original and predicted target values. First, we apply the fitted model to the data with lr_model.transform(), which adds a 'prediction' column.

mdata = lr_model.transform(va_df)
mdata.show(3)
 
+--------------------+------+------------------+
|            features|target|        prediction|
+--------------------+------+------------------+
|[0.00632,18.0,2.3...|  24.0|30.554831691938382|
|[0.02731,0.0,7.07...|  21.6| 25.47215641847489|
|[0.02729,0.0,7.07...|  34.7|  31.3186615896002|
+--------------------+------+------------------+
only showing top 3 rows
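A quick way to read this table is through the residual (target minus prediction) of each row. The sketch below simply recomputes it in plain Python from the three rows printed above; in PySpark you could add the same column with mdata.withColumn('residual', mdata['target'] - mdata['prediction']).

```python
# Target and prediction values copied from the three rows shown above
targets = [24.0, 21.6, 34.7]
predictions = [30.554831691938382, 25.47215641847489, 31.3186615896002]

# Residual = actual - predicted; a negative value means the model over-predicted
residuals = [t - p for t, p in zip(targets, predictions)]
for t, p, r in zip(targets, predictions, residuals):
    print(f"target={t:5.1f}  prediction={p:7.3f}  residual={r:+.3f}")
```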
 
 
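# Collect target and prediction in one pass so the two lists stay aligned row by row
rows = mdata.select("target", "prediction").collect()
y_orig = [row["target"] for row in rows]
y_pred = [row["prediction"] for row in rows]
x_ax = range(len(rows))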

 
Finally, we'll visualize the original and predicted data in a plot.
 
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston original and predicted data")
plt.xlabel('Sample index')
plt.ylabel('Median home value ($1000s)')
plt.legend(loc='best', fancybox=True, shadow=True)
plt.grid(True)
plt.show()  
 

When you are finished, do not forget to stop the SparkContext so that its resources are released before you run the code again.
 
# Stop session 
sc.stop()  

 
In this tutorial, we've briefly learned how to fit a regression model and make predictions with PySpark and the MLlib LinearRegression model. The full source code is listed below.


Source code listing
 
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
# load_boston was removed in scikit-learn 1.2, so this tutorial needs scikit-learn < 1.2
from sklearn.datasets import load_boston
import pandas as pd


boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_boston)
data.printSchema()

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)

lr = LinearRegression(featuresCol='features', labelCol='target',
                      regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(va_df)

print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)

print("MSE: ", lr_model.summary.meanSquaredError)
print("MAE: ", lr_model.summary.meanAbsoluteError)
print("R-squared: ", lr_model.summary.r2)

mdata = lr_model.transform(va_df)
mdata.show(3)

rows = mdata.select("target", "prediction").collect()
y_orig = [row["target"] for row in rows]
y_pred = [row["prediction"] for row in rows]
x_ax = range(len(rows))

plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston original and predicted data")
plt.xlabel('Sample index')
plt.ylabel('Median home value ($1000s)')
plt.legend(loc='best', fancybox=True, shadow=True)
plt.grid(True)
plt.show()  

# Stop session 
sc.stop()   
 
