
AI - Apache Spark


Watson Studio, formerly Data Science Experience (DSX), is IBM’s software platform for data science. It provides a workspace that combines collaboration features with open-source data science tools such as Jupyter notebooks and Apache Spark.




# ETL - load the file, transform it and store the result

# Install the required packages (Apache Spark via pyspark, plus supporting libraries)

!pip install tensorflow

!pip install pyspark==2.4.5

!pip install plotly

import pandas as pd

import numpy as np

from pathlib import Path

import matplotlib.pyplot as plt

from pyspark.sql.types import StructType, StructField, IntegerType

import os

from io import StringIO

import plotly.express as px

# The input data set is stored in a git repository

!git clone https://github.com/spribylova/Coursera_IBM_Watson

# View the datasets available in the repository; the input file is in CSV format

!ls Coursera_IBM_Watson

# get the list of files in the cloned Coursera_IBM_Watson folder

file_list = os.listdir('Coursera_IBM_Watson')

file_list

# inspect the column types and preview the data

pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv').dtypes

pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv')

# Replace missing values with the column median (other imputation strategies include ffill, bfill, pad, ...)

df = pd.read_csv('Coursera_IBM_Watson/Data_IBM_Watson.csv')

df.head()

# count the missing values per column

df.isnull().sum()

UID              31
Category         30
KW                0
Web               7
Emp_count        50
Sales            56
Note             55
KW_search         3
FB_likes         51
FB               29
Position_CH      23
Impressions      23
Visits           23
Value_visitor    44
Value            23
Links            25
Site_count       30
Capital          55
Headquarter       0
City             18
Registered       22
dtype: int64

# impute each numeric column with its median
for col in ['Sales', 'Visits', 'Links', 'Emp_count', 'KW_search', 'FB_likes',
            'Impressions', 'Value_visitor', 'Value', 'Site_count', 'Capital']:
    median = df[col].median()
    df[col].fillna(median, inplace=True)
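# The comment above mentions ffill, bfill and pad as alternative imputation strategies;
# a minimal sketch of what those would look like on the same DataFrame (illustrative
# only - the names df_ffill / df_bfill are not used further in this notebook):

df_ffill = df.fillna(method='ffill')  # propagate the last valid observation forward ('pad' is an alias)
df_bfill = df.fillna(method='bfill')  # fill backwards with the next valid observation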

df_imputed = df

# provide descriptive statistics

df_imputed.describe()

df = df_imputed

# plot a few visualisations

# bar charts of the top 10 rows by Emp_count, KW_search and Sales
for col in ['Emp_count', 'KW_search', 'Sales']:
    df1 = df.nlargest(10, col)
    fig = px.bar(df1, x='KW', y=col)
    fig.show()

# random 80/20 train/test split

train = df.sample(frac=0.8, random_state=200)  # random_state is a seed value

test = df.drop(train.index)
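# The split above produces pandas DataFrames; Spark also offers a native split. A
# minimal sketch of the equivalent done in Spark - it assumes the Spark DataFrame sdf
# created in the Spark section below, so it would only run after that cell, and
# spark_train / spark_test are illustrative names not used afterwards:

spark_train, spark_test = sdf.randomSplit([0.8, 0.2], seed=200)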

# import the Spark and Spark ML components used below

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import lit
from pyspark.ml.tuning import TrainValidationSplit, CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.linalg import Vectors

!pip install findspark

import findspark

# create (or get) the Spark session; SparkSession.builder.getOrCreate() returns an
# existing session or builds a new one from the configured options

spark = SparkSession.builder.getOrCreate()

# example of an explicit schema definition (kept for reference; the schema of this
# dataset is inferred automatically below)

schema = StructType([
    StructField("x", IntegerType(), True),
    StructField("y", IntegerType(), True),
    StructField("z", IntegerType(), True)])

# convert the pandas DataFrames to Spark DataFrames (columns that still contain
# missing values may need extra cleaning first, depending on the data) and register
# a temporary view so the data can be queried with SQL

sdf = spark.createDataFrame(df)
train = spark.createDataFrame(train)
test = spark.createDataFrame(test)

sdf.createOrReplaceTempView("df")

spark.sql("SELECT * FROM df").show()

# The features must be packed into a single vector column, so the first stage of the
# workflow is a VectorAssembler. VectorAssembler needs numeric, non-null inputs, so the
# feature list is restricted to the imputed numeric columns.
# NOTE: the original template indexed a 'class' column, which does not exist in this
# dataset; here the numeric column 'Value' is assumed to be the regression target -
# adjust label_col if a different target is intended.
# indexer = StringIndexer(inputCol="class", outputCol="label")   # unused template stage

label_col = 'Value'

sdf = sdf.withColumnRenamed(label_col, 'label')
train = train.withColumnRenamed(label_col, 'label')
test = test.withColumnRenamed(label_col, 'label')

feature_list = [c for c in ['Sales', 'Visits', 'Links', 'Emp_count', 'KW_search',
                            'FB_likes', 'Impressions', 'Value_visitor', 'Value',
                            'Site_count', 'Capital'] if c != label_col]

vectorAssembler = VectorAssembler(inputCols=feature_list, outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
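# Note: the MinMaxScaler defined above is not part of the pipeline used below. A
# minimal sketch of a pipeline that would apply it (pipeline_scaled is an illustrative
# name, not part of the original workflow; the forest then reads the scaled column):

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

pipeline_scaled = Pipeline(stages=[vectorAssembler, normalizer,
                                   RandomForestRegressor(labelCol="label",
                                                         featuresCol="features_norm")])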

# The only inputs for the Random Forest model are the label and features; the
# hyperparameters are assigned in the tuning step below.

from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="label", featuresCol="features")

# Put the two-stage workflow (vector assembly + random forest) into an ML Pipeline.

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vectorAssembler, rf])

model = pipeline.fit(train)  # a single, un-tuned fit for reference; the tuned cvModel below is what gets evaluated

# Define the "grid" of hyperparameter values over which the model is tuned.

from pyspark.ml.tuning import ParamGridBuilder
import numpy as np

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start=10, stop=50, num=3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start=5, stop=25, num=3)]) \
    .build()

# Cross-validate the pipeline over the parameter grid, fit it on the training split
# and score the held-out test split.

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)

cvModel = crossval.fit(train)

predictions = cvModel.transform(test)

# evaluate

import matplotlib.pyplot as plt

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

rfPred = cvModel.transform(sdf)  # score the full dataset (the Spark DataFrame built above)

rfResult = rfPred.toPandas()

plt.plot(rfResult.label, rfResult.prediction, 'bo')

plt.xlabel('Label')

plt.ylabel('Prediction')

plt.suptitle("Model Performance RMSE: %f" % rmse)

plt.show()
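# RegressionEvaluator supports other metrics besides RMSE; a quick, illustrative check
# of the same predictions with MAE and R2:

mae = RegressionEvaluator(labelCol="label", predictionCol="prediction",
                          metricName="mae").evaluate(predictions)
r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction",
                         metricName="r2").evaluate(predictions)
print("MAE: %f, R2: %f" % (mae, r2))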

# feature importance

bestPipeline = cvModel.bestModel

bestModel = bestPipeline.stages[1]

importances = bestModel.featureImportances.toArray()  # convert the Spark vector to a NumPy array

x_values = list(range(len(importances)))

plt.bar(x_values, importances)

plt.xticks(x_values, feature_list, rotation=40)

plt.ylabel('Importance')

plt.xlabel('Feature')

plt.title('Feature Importances')

# best parameters

print('numTrees - ', bestModel.getNumTrees)

print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
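# The tuned pipeline can be persisted and reloaded later; a minimal sketch (the path
# "rf_pipeline_model" is an arbitrary example):

from pyspark.ml import PipelineModel

bestPipeline.write().overwrite().save("rf_pipeline_model")
reloaded_model = PipelineModel.load("rf_pipeline_model")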

