Watson Studio, formerly Data Science Experience (DSX), is IBM's software platform for data science. It provides a workspace that brings together collaboration features and open-source tools for data science work.
# ETL - load the file, transform the data and store the result
# Install the required libraries (PySpark, TensorFlow, Plotly)
!pip install tensorflow
!pip install pyspark==2.4.5
!pip install plotly
import pandas as pd
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
from pyspark.sql.types import StructType, StructField, IntegerType
import os
from io import StringIO
import plotly.express as px
# The input data set is stored in a git repository
!git clone https://github.com/spribylova/Coursera_IBM_Watson
# View all data sets available in the repository; the input file is in CSV format
! ls Coursera_IBM_Watson
# get the list of files in the cloned Coursera_IBM_Watson folder
file_list = os.listdir('Coursera_IBM_Watson')
file_list
pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv').dtypes
pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv')
# Replace missing values with the column median (other imputation methods such as ffill, bfill and pad are sketched after the imputation code below)
df = pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv')
df.head()
df.isnull().sum()
UID 31
Category 30
KW 0
Web 7
Emp_count 50
Sales 56
Note 55
KW_search 3
FB_likes 51
FB 29
Position_CH 23
Impressions 23
Visits 23
Value_visitor 44
Value 23
Links 25
Site_count 30
Capital 55
Headquarter 0
City 18
Registered 22
dtype: int64
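# It can also help to look at the share of missing values per column before imputing;
# a quick check on the same df:
missing_pct = df.isnull().mean().mul(100).sort_values(ascending=False)
print(missing_pct.round(1))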
impute_cols = ['Sales', 'Visits', 'Links', 'Emp_count', 'KW_search', 'FB_likes',
               'Impressions', 'Value_visitor', 'Value', 'Site_count', 'Capital']
for c in impute_cols:
    median = df[c].median()
    df[c].fillna(median, inplace=True)
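# For reference, a minimal sketch of the alternative imputation methods mentioned above:
# forward fill (ffill/pad) and backward fill (bfill) propagate neighbouring values instead of
# the median. Applied to fresh copies so the median-imputed df above stays untouched.
df_ffill = pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv').fillna(method='ffill')
df_bfill = pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv').fillna(method='bfill')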
df_imputed = df
# provide descriptive statistics
df_imputed.describe()
df = df_imputed
# plot a few visualisations
df1 = df.nlargest(10, 'Emp_count')
fig = px.bar(df1, x='KW', y='Emp_count')
fig.show()

df1 = df.nlargest(10, 'KW_search')
fig = px.bar(df1, x='KW', y='KW_search')
fig.show()

df1 = df.nlargest(10, 'Sales')
fig = px.bar(df1, x='KW', y='Sales')
fig.show()
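# Optionally, a scatter plot can relate two of the numeric columns to each other; a quick
# exploratory sketch (Emp_count vs. Sales, with the keyword shown on hover):
fig = px.scatter(df, x='Emp_count', y='Sales', hover_name='KW')
fig.show()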
# random split
train=df.sample(frac=0.8,random_state=200) #random state is a seed value
test=df.drop(train.index)
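# Quick sanity check on the split: the two parts are disjoint and together cover all rows
print(len(train), len(test), len(df))
assert train.index.intersection(test.index).empty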
# initialize Spark
!pip install findspark
import findspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import lit
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.linalg import Vectors
# Get an existing SparkSession or create a new one; findspark points Python at the
# pip-installed PySpark before the session is built.
findspark.init()
spark = SparkSession.builder.getOrCreate()

# Move the data into Spark. Only the imputed numeric columns are carried over here; the
# free-text columns (KW, Web, Note, City, ...) would need indexing/encoding before they
# could be used as model features.
sdf = spark.createDataFrame(df[impute_cols])
train = spark.createDataFrame(train[impute_cols])
test = spark.createDataFrame(test[impute_cols])
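# If specific options are wanted (an application name, driver memory, ...), they can be set on
# the builder before getOrCreate(); getOrCreate() simply reuses the session created above.
# The name and the 2g memory setting below are illustrative values only.
spark = SparkSession.builder \
    .appName("Coursera_IBM_Watson_capstone") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()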
# Example of defining an explicit schema (kept for reference; it is not applied to this CSV,
# whose schema Spark infers from the pandas DataFrame)
schema = StructType([
    StructField("x", IntegerType(), True),
    StructField("y", IntegerType(), True),
    StructField("z", IntegerType(), True)])
# Register the Spark DataFrame as a temporary view so it can be queried with SQL
sdf.createOrReplaceTempView("df")
spark.sql("SELECT * FROM df").show()
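# The CSV could also be read straight into Spark instead of going through pandas; a sketch
# using schema inference, since the x/y/z example schema above does not match this file:
sdf_direct = spark.read.csv('Coursera_IBM_Watson/Data_IBM_Watson.csv', header=True, inferSchema=True)
sdf_direct.printSchema()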
# Spark ML expects all features in a single vector column, so the first stage of the workflow
# is a VectorAssembler. The data set has no categorical "class" column to run through a
# StringIndexer, so the regression label is taken directly from a numeric column instead.
# Assumption: 'Value' is the quantity being predicted; substitute another numeric column if needed.
label_col = 'Value'
train = train.withColumnRenamed(label_col, 'label')
test = test.withColumnRenamed(label_col, 'label')
sdf = sdf.withColumnRenamed(label_col, 'label')
feature_list = []
for c in train.columns:
    if c == 'label':
        continue
    feature_list.append(c)
vectorAssembler = VectorAssembler(inputCols=feature_list, outputCol="features")
normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
# The only inputs for the random forest model are the label and the feature vector; the
# hyper-parameters are assigned in the tuning step below.
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
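# A quick look at what the assembler produces: each row's numeric features collapsed into a
# single vector column (this is only a transform, nothing is fitted yet).
vectorAssembler.transform(train).select("features").show(3, truncate=False)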
# we put our simple, two-stage workflow (feature assembly + random forest) into an ML pipeline.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, rf])
model = pipeline.fit(train)

# To evaluate the model we define the corresponding "grid" of parameter values
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start=10, stop=50, num=3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start=5, stop=25, num=3)]) \
    .build()

# cross-validate the pipeline over the parameter grid and score the held-out test set
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)
cvModel = crossval.fit(train)
predictions = cvModel.transform(test)
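# TrainValidationSplit (imported above) is a cheaper alternative to CrossValidator: it scores
# each parameter combination on a single random train/validation split instead of k folds.
# A sketch using an 80/20 split:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8)
tvsModel = tvs.fit(train)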
# evaluate RMSE on the held-out test predictions, then score the full data set for plotting
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
rfPred = cvModel.transform(sdf)
rfResult = rfPred.toPandas()
plt.plot(rfResult.label, rfResult.prediction, 'bo')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()
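# RMSE is only one view of the fit; the same evaluator can report other regression metrics on
# the test predictions by switching metricName:
for metric in ['mae', 'r2']:
    print(metric, '=', evaluator.setMetricName(metric).evaluate(predictions))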
# feature importance of the best model found by cross-validation
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]
importances = bestModel.featureImportances.toArray()
x_values = list(range(len(importances)))
plt.bar(x_values, importances)
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')
plt.show()
# best parameters
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
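# The average cross-validation RMSE for every parameter combination is also available; pairing
# it with the grid shows how numTrees and maxDepth influenced the score.
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, '->', round(metric, 3))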