PySpark Generalized Linear Regression Example

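    Generalized linear regression extends linear regression to response variables that are not necessarily normally distributed. The response is assumed to follow a distribution from the exponential family, and its mean is related to a linear combination of the features through a link function. PySpark provides a GeneralizedLinearRegression class that supports the Gaussian, binomial, Poisson, gamma, and Tweedie families.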
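
    A link function connects the mean of the response to the linear combination of the features. As a minimal sketch in plain Python (not PySpark code), the block below pairs four families with the link that Spark uses by default for each, and checks that every inverse link undoes its link. The names LINKS and round_trip are chosen for illustration only.

```python
import math

# Each family maps to (link, inverse_link). The link g turns the mean mu of
# the response into the linear predictor eta = g(mu); the inverse maps back.
LINKS = {
    "gaussian": (lambda mu: mu, lambda eta: eta),                # identity
    "poisson": (math.log, math.exp),                             # log
    "binomial": (lambda mu: math.log(mu / (1.0 - mu)),
                 lambda eta: 1.0 / (1.0 + math.exp(-eta))),      # logit
    "gamma": (lambda mu: 1.0 / mu, lambda eta: 1.0 / eta),       # inverse
}


def round_trip(family, mu):
    """Apply the link and then its inverse; the result should equal mu."""
    link, inverse_link = LINKS[family]
    return inverse_link(link(mu))


for family in LINKS:
    print(family, round_trip(family, 0.25))
```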

    In this tutorial, we'll briefly learn how to fit and predict regression data by using PySpark GeneralizedLinearRegression in Python. The tutorial covers:

  1. Preparing the data
  2. Prediction and accuracy check
  3. Visualizing the results
  4. Source code listing

   We'll start by loading the required libraries for this tutorial.

from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
# load_boston was removed in scikit-learn 1.2, so this import needs an older release
from sklearn.datasets import load_boston
import pandas as pd
import matplotlib.pyplot as plt 
 


Preparing the data

   We use the Boston Housing Price dataset as the regression data; it can be loaded from the sklearn.datasets module. The code below loads the dataset and converts it into a pandas DataFrame.

boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())
 

Next, we'll create an SQLContext and use it to build a Spark DataFrame from the df_boston data.
 
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_boston)
data.printSchema()
 
root
|-- CRIM: double (nullable = true)
|-- ZN: double (nullable = true)
|-- INDUS: double (nullable = true)
|-- CHAS: double (nullable = true)
|-- NOX: double (nullable = true)
|-- RM: double (nullable = true)
|-- AGE: double (nullable = true)
|-- DIS: double (nullable = true)
|-- RAD: double (nullable = true)
|-- TAX: double (nullable = true)
|-- PTRATIO: double (nullable = true)
|-- B: double (nullable = true)
|-- LSTAT: double (nullable = true)
|-- target: double (nullable = true)
 
 
 
To combine all feature columns into a single vector column and keep the 'target' column as the label, we use VectorAssembler.

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols=features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)
 
+--------------------+------+
| features|target|
+--------------------+------+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
+--------------------+------+
only showing top 3 rows
 

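Next, we'll split the data into train and test parts, roughly 80% and 20% respectively. Since no seed is passed to randomSplit, the split changes from run to run, so the numbers you get may differ from the outputs shown here.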

(train, test) = va_df.randomSplit([0.8, 0.2])
 
 

Prediction and accuracy check

   Next, we'll define the regressor by using the GeneralizedLinearRegression class. The parameters can be adjusted to suit the data: the family parameter selects the error distribution ('gaussian', 'binomial', 'poisson', 'gamma', or 'tweedie'), maxIter sets the maximum number of iterations, and regParam sets the regularization strength. Then, we'll train the model on the train data and check the coefficients and the intercept. The 'summary' property provides additional statistics of the trained model.
 

glr=GeneralizedLinearRegression(labelCol="target",family="poisson",maxIter=10,regParam=0.3)

model = glr.fit(train)
 
print("Coefficients: ", model.coefficients)
print("Intercept: ", model.intercept)
 
Coefficients:  [-0.010148363658164322,0.0014127521546288084,
0.0007822237455972935,0.020449846569659914,-0.004984395856161968,
0.05813269428464953,0.0009707035105313463,-0.03081832471491933,
0.015948434951052172,-0.0006842140427757848,-0.035875216756448974,
0.00045811775930736033,-0.03975363325270691]
Intercept: 3.8737122493159895
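
Because this model uses the Poisson family with its default log link, the coefficients act on the logarithm of the predicted value. The sketch below, in plain Python with values rounded from the output above, shows two consequences: the intercept alone corresponds to a prediction of exp(intercept), and a one-unit increase in a feature multiplies the prediction by exp(coefficient). The helper name predict_mean and the single-feature setup are hypothetical and only serve the illustration.

```python
import math


def predict_mean(intercept, coefficients, features):
    """Poisson GLM prediction with a log link: exp(intercept + x . beta)."""
    eta = intercept + sum(c * x for c, x in zip(coefficients, features))
    return math.exp(eta)


intercept = 3.8737      # rounded from the printed intercept
lstat_coef = -0.0398    # rounded from the printed LSTAT coefficient

# Prediction when the feature equals zero (only the intercept contributes).
baseline = predict_mean(intercept, [lstat_coef], [0.0])

# Multiplicative effect of raising LSTAT by one unit, everything else fixed.
ratio = predict_mean(intercept, [lstat_coef], [1.0]) / baseline

print("baseline prediction:", round(baseline, 2))
print("effect of +1 LSTAT:", round(ratio, 4))
```

A ratio below 1 means the predicted price shrinks by that factor for each additional unit of LSTAT, which matches the negative sign of the coefficient.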
 
  
print(model.summary)
 
Coefficients:
Feature Estimate Std Error T Value P Value
(Intercept) 3.8737 0.1746 22.1852 0.0000
CRIM -0.0101 0.0023 -4.4504 0.0000
ZN 0.0014 0.0006 2.4772 0.0132
INDUS 0.0008 0.0031 0.2560 0.7979
CHAS 0.0204 0.0173 1.1841 0.2364
NOX -0.0050 0.0189 -0.2636 0.7921
RM 0.0581 0.0138 4.2259 0.0000
AGE 0.0010 0.0006 1.6094 0.1075
DIS -0.0308 0.0082 -3.7760 0.0002
RAD 0.0159 0.0035 4.5758 0.0000
TAX -0.0007 0.0002 -3.4265 0.0006
PTRATIO -0.0359 0.0057 -6.2886 0.0000
B 0.0005 0.0002 2.7437 0.0061
LSTAT -0.0398 0.0025 -15.6616 0.0000

(Dispersion parameter for poisson family taken to be 1.0000)
Null deviance: 1515.6486 on 395 degrees of freedom
Residual deviance: 329.3122 on 395 degrees of freedom
AIC: 2358.2324
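
Two quick checks help when reading this summary. The T Value column is the estimate divided by its standard error, and the two deviance figures can be combined into the fraction of deviance explained, 1 - residual / null, a rough analogue of R-squared for generalized linear models. The sketch below recomputes both from the rounded numbers printed above, so the t-value differs slightly from the one in the table. The function names are illustrative and not part of PySpark.

```python
def t_value(estimate, std_error):
    """Wald statistic: how many standard errors the estimate is from zero."""
    return estimate / std_error


def deviance_explained(null_deviance, residual_deviance):
    """Share of the null deviance that the fitted model accounts for."""
    return 1.0 - residual_deviance / null_deviance


# Values copied from the summary output above (printed with 4 decimals).
print("t-value for LSTAT:", round(t_value(-0.0398, 0.0025), 2))
print("deviance explained:", round(deviance_explained(1515.6486, 329.3122), 4))
```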
 
 
    After training the model, we'll predict the test data and check the accuracy metrics.

tdata = model.transform(test)
tdata.show(3)

rmse = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse = rmse.evaluate(tdata)
mae = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae = mae.evaluate(tdata)
r2 = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(tdata)

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)
  
+--------------------+------+------------------+
| features|target| prediction|
+--------------------+------+------------------+
|[0.09378,12.5,7.8...| 21.7|19.731003924394102|
|[0.11747,12.5,7.8...| 18.9| 22.36646093334018|
|[0.17004,12.5,7.8...| 18.9|18.943575559905906|
+--------------------+------+------------------+
only showing top 3 rows

RMSE: 4.009492752595149
MAE: 3.054586317287038
R-squared: 0.7574608722630409 
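
For reference, the three metrics reduce to short formulas. The following plain-Python sketch implements them and, as a usage example, applies RMSE and MAE to the three rows displayed above, so the values will not match the full test-set metrics. The function names are illustrative; in the tutorial, RegressionEvaluator computes these metrics on the Spark DataFrame.

```python
import math


def root_mean_squared_error(y_true, y_pred):
    """Square root of the mean squared difference."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))


def mean_absolute_error(y_true, y_pred):
    """Mean of the absolute differences."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# The three rows shown by tdata.show(3) above.
target = [21.7, 18.9, 18.9]
prediction = [19.731003924394102, 22.36646093334018, 18.943575559905906]

print("RMSE (3 rows):", root_mean_squared_error(target, prediction))
print("MAE (3 rows):", mean_absolute_error(target, prediction))
```

R-squared is left out of the usage example because three rows are too few for it to be informative.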
 
 
 
Visualizing the results
 
    To visualize the original and predicted data, we can use the 'matplotlib' library. We'll extract the values from the 'tdata' object.

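# Collect both columns in one pass so the values stay aligned row by row
rows = tdata.select("target", "prediction").collect()
x_ax = range(len(rows))
y_orig = [row["target"] for row in rows]
y_pred = [row["prediction"] for row in rows]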
 
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show() 
 


If you plan to run the code again, do not forget to stop the Spark context at the end of the session.
 
# Stop session 
sc.stop()  

 
   In this tutorial, we've briefly learned how to fit and predict regression data by using the PySpark GeneralizedLinearRegression model in Python. The full source code is listed below.


Source code listing
 
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
# load_boston was removed in scikit-learn 1.2, so this import needs an older release
from sklearn.datasets import load_boston
import pandas as pd
import matplotlib.pyplot as plt


boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = sqlContext.createDataFrame(df_boston)
data.printSchema()

features = boston.feature_names.tolist()

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)

(train, test) = va_df.randomSplit([0.8, 0.2])

glr=GeneralizedLinearRegression(labelCol="target",family="poisson",maxIter=10,regParam=0.3)

model = glr.fit(train)

print("Coefficients: ", model.coefficients)
print("Intercept: ", model.intercept)
print(str(model.summary))

tdata = model.transform(test)
tdata.show(3)

rmse = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse = rmse.evaluate(tdata)
mae = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="mae")
mae = mae.evaluate(tdata)
r2 = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(tdata)

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)

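# Collect both columns in one pass so the values stay aligned row by row
rows = tdata.select("target", "prediction").collect()
x_ax = range(len(rows))
y_orig = [row["target"] for row in rows]
y_pred = [row["prediction"] for row in rows]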

plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show()  


sc.stop() 
 

 