
Welcome to Sparkitecture!

Created by: Colby T. Ford, Ph.D.

PySpark Edition | A work in progress... | Created using GitBook.com

About

Sparkitecture is a collection of “cookbook-style” scripts for simplifying data engineering and machine learning in Apache Spark.

This is an open source project (GPL v3.0) for the Spark community. If you have ideas or contributions you'd like to add, submit a Feature Request or write your code/tutorial/page and create a Pull Request in the GitHub repo.

How to Cite

BibTeX

Text Citation

@misc{sparkitecture,
  author = {Colby T. Ford},
  title = {Sparkitecture - {A} collection of "cookbook-style" scripts for simplifying data engineering and machine learning in {Apache Spark}.},
  month = oct,
  year = 2019,
  doi = {10.5281/zenodo.3468502},
  url = {https://doi.org/10.5281/zenodo.3468502}
}

Colby T. Ford. (2019, October). Sparkitecture - A collection of "cookbook-style" scripts for simplifying data engineering and machine learning in Apache Spark (Version v1.0.0). Zenodo. http://doi.org/10.5281/zenodo.3468502

Azure Storage

Storage is a managed service in Azure that provides highly available, secure, durable, scalable, and redundant storage for your data. Azure Storage includes Blob Storage, Data Lake Storage, and other services.

Databricks-Specific Functionality

Mounting Blob Storage

Once you create your blob storage account in Azure, you will need to grab a couple of bits of information from the Azure Portal before you mount your storage.

  • You can find your Storage Account Name (which will go in <STORAGEACCOUNT> below) and your Key (which will go in <KEYGOESHERE> below) under Access Keys in your Storage Account resource in Azure.

  • Go into your Storage Account resource in Azure and click on Blobs. Here, you will find all of your containers. Pick the one you want to mount and copy its name into <CONTAINERNAME> below.

  • As for the mount point (/mnt/<FOLDERNAME> below), you can name this whatever you'd like, but it will help you in the long run to name it something useful along the lines of storageaccount_container.

Once you have the required bits of information, you can use the following code to mount the storage location inside the Databricks environment:

You can then test to see if you can list the files in your mounted location:

Resources:

  • To learn how to create an Azure Storage service, visit https://docs.microsoft.com/en-us/azure/storage/.

Mounting Data Lake Storage

For finer-grained access controls on your data, you may opt to use Azure Data Lake Storage. In Databricks, you can connect to your data lake in a similar manner to blob storage. Instead of an access key, your user credentials will be passed through, therefore only showing you data that you specifically have access to.

Pass-through Azure Active Directory Credentials

To pass in your Azure Active Directory credentials from Databricks to Azure Data Lake Store, you will need to enable this feature in Databricks under New Cluster > Advanced Options.

Note: If you create a High Concurrency cluster, multiple users can use the same cluster. The Standard cluster mode will only allow a single user's credential at a time.

dbutils.fs.mount(
  source = "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net",
  mount_point = "/mnt/<FOLDERNAME>/",
  extra_configs = {"fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net":"<KEYGOESHERE>"})
display(dbutils.fs.ls("/mnt/<FOLDERNAME>"))
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}
dbutils.fs.mount(
  source = "abfss://<CONTAINERNAME>@<STORAGEACCOUNT>.dfs.core.windows.net/",
  mount_point = "/mnt/<FOLDERNAME>",
  extra_configs = configs)

Azure SQL Data Warehouse / Synapse

Set up Azure SQL DW connection parameters

Define a query

Create a Spark DataFrame using the SQL DW data

dwDatabase = "<DATABASENAME>" ## The Azure SQL Data Warehouse database name
dwServer = "<DWNAME>.database.windows.net" ## The Azure SQL Server
dwUser = "<USERNAME>" ## The dedicated loading user login 
dwPass = dbutils.secrets.get(scope = "<SECRETNAME>", key = "<KEYNAME>") ## The dedicated loading user login password
dwJdbcPort =  "1433" 
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
sqlQuery = """
  SELECT *, 'AzureSqlDw' AS SourceSystem
  FROM dbo.<TABLENAME>
"""
## tempDir must point to a staging location in Blob Storage,
## e.g. "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net/tmpdir"
data = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", sqlDwUrlSmall) \
  .option("tempDir", tempDir) \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", sqlQuery) \
  .load()

data.createOrReplaceTempView("<TEMPVIEWNAME>")
# data.write.saveAsTable("<TABLENAME>")

Reading and Writing Data

Reading in Data

...from Mounted Storage

dataset = spark.read.format('csv') \
                    .options(header='true', inferSchema='true', delimiter= ',') \
                    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')

## or sqlContext.read.format('csv')...
## Formats: json, parquet, jdbc, orc, libsvm, csv, text, avro

...when Schema Inference Fails

from pyspark.sql.types import *

schema = StructType([StructField('ID', IntegerType(), True),
                     StructField('Value', DoubleType(), True),
                     StructField('Category', StringType(), True),
                     StructField('Date', DateType(), True)])

dataset = sqlContext.read.format('csv') \
                    .schema(schema) \
                    .options(header='true', delimiter= ',') \
                    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')

Writing out Data

df.coalesce(1) \
   .write.format("com.databricks.spark.csv") \
   .option("header", "true") \
   .save("file.csv")

Other Resources

Apache Spark Data Sources Documentation: https://spark.apache.org/docs/latest/sql-data-sources.html

About Spark MLlib

MLlib is Apache Spark's scalable machine learning library.

MLlib works with Spark's APIs and interoperates with NumPy in Python and with R libraries. Since Spark excels at iterative computation, MLlib runs fast, offering highly scalable, high-quality algorithms that leverage iteration.

Included Functionality:

ML algorithms include:

  • Classification: logistic regression, naive Bayes,...

  • Regression: generalized linear regression, survival regression,...

  • Decision trees, random forests, and gradient-boosted trees

  • Recommendation: alternating least squares (ALS)

  • Clustering: K-means, Gaussian mixtures (GMMs),...

  • Topic modeling: latent Dirichlet allocation (LDA)

  • Frequent itemsets, association rules, and sequential pattern mining

ML workflow utilities include:

  • Feature transformations: standardization, normalization, hashing,...

  • ML Pipeline construction

  • Model evaluation and hyper-parameter tuning

  • ML persistence: saving and loading models and Pipelines

Other utilities include:

  • Distributed linear algebra: SVD, PCA,...

  • Statistics: summary statistics, hypothesis testing,...

Resources

  • Spark MLlib Website: https://spark.apache.org/mllib/

  • Getting Started Guide: https://spark.apache.org/docs/latest/ml-guide.html

Regression

MLflow

MLflow is an open source library by the Databricks team designed for managing the machine learning lifecycle. It allows for the creation of projects, tracking of metrics, and model versioning.

Install mlflow using pip

pip install mlflow

MLflow can be used in any Spark environment, but the automated tracking and UI of MLflow are Databricks-specific functionality.

Track metrics and parameters

import mlflow

## Log Parameters and Metrics from your normal MLlib run
with mlflow.start_run():
  # Log a parameter (key-value pair)
  mlflow.log_param("alpha", 0.1)

  # Log a metric; metrics can be updated throughout the run
  mlflow.log_metric("AUC", 0.871827)
  mlflow.log_metric("F1", 0.726153)
  mlflow.log_metric("Precision", 0.213873)

MLflow GitHub: https://github.com/mlflow/mlflow/


Decision Tree

Setting Up a Decision Tree Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Decision Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.
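
For example, you can see the averaged metric for each parameter combination and pull out the best model explicitly. A minimal sketch, assuming the dtcvModel object created by the code below:

print(dtcvModel.avgMetrics)                ## One averaged metric per parameter combination in the grid
bestModel = dtcvModel.bestModel            ## The model refit on the full training set with the winning parameters
print(bestModel.explainParam("maxDepth"))  ## Inspect a winning hyperparameter value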

Classification

Description:

Classification algorithms are used to identify into which classes observations of data should fall. This problem can be considered part of pattern recognition: we use training data (historical information) to recognize patterns and predict the category into which new data should be placed.

Common Use Cases:

  • Fraudulent activity detection

  • Loan default prediction

  • Spam vs. ham

  • Customer segmentation

  • Benign vs. malignant tumor classification

  • and many more...

Classification Algorithms included in MLlib:

  • Logistic regression (both binomial and multiclass)

  • Decision trees

  • Random forests

  • Gradient-boosted trees

  • Multilayer perceptron

  • Linear Support Vector Machine

  • One-vs-Rest classifier (a.k.a. One-vs-All)

  • Naïve Bayes

Gradient-Boosted Trees

Setting Up Gradient-Boosted Tree Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Gradient-Boosted Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Linear Regression

Setting Up Linear Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Linear Regression object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Get model information

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Logistic Regression

Setting Up a Logistic Regression Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Logistic Regression object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10])
             .addGrid(dt.maxBins, [10, 20])
             .build())
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)
dtcvModel = dtcv.fit(train)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test)
print('Accuracy:', dtevaluator.evaluate(dtpredictions))
print('AUC:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderPR)
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
gb = GBTClassifier(labelCol="label", featuresCol="features")
gbparamGrid = (ParamGridBuilder()
             .addGrid(gb.maxDepth, [2, 5, 10])
             .addGrid(gb.maxBins, [10, 20, 40])
             .addGrid(gb.maxIter, [5, 10, 20])
             .build())
gbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator = gb,
                      estimatorParamMaps = gbparamGrid,
                      evaluator = gbevaluator,
                      numFolds = 5)
gbcvModel = gbcv.fit(train)
print(gbcvModel)
gbpredictions = gbcvModel.transform(test)
print('Accuracy:', gbevaluator.evaluate(gbpredictions))
print('AUC:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderPR)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
lr = LinearRegression(labelCol="label", featuresCol="features")
lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
             #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
             #  .addGrid(lr.maxIter, [1, 5, 10])
             .build())
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)
lrcvModel = lrcv.fit(train)
print(lrcvModel)
lrcvSummary = lrcvModel.bestModel.summary
print("Coefficient Standard Errors: " + str(lrcvSummary.coefficientStandardErrors))
print("P Values: " + str(lrcvSummary.pValues)) # Last element is the intercept
lrpredictions = lrcvModel.transform(test)
print('RMSE:', lrevaluator.evaluate(lrpredictions))
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
lr = LogisticRegression(labelCol="label", featuresCol="features")
lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
             .build())
lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName = "areaUnderROC")
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)
lrcvModel = lrcv.fit(train)
print(lrcvModel)
lrpredictions = lrcvModel.transform(test)
print('Accuracy:', lrevaluator.evaluate(lrpredictions))
print('AUC:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderPR)

Shaping Data with Pipelines

Load in required libraries

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

Define which columns are numerical versus categorical (and which is the label column)

label = "dependentvar"
categoricalColumns = ["col1",
                     "col2"]

numericalColumns = ["num1",
                    "num2"]

#categoricalColumnsclassVec = ["col1classVec",
#                              "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

Set up stages

stages = []

Index the categorical columns and perform One Hot Encoding

One Hot Encoding will convert a categorical column into multiple columns for each class. (This process is similar to dummy coding.)

for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  ## Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  ## Add stages
  stages += [stringIndexer, encoder]

Index the label column and perform One Hot Encoding

## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]

Note: If you are preparing the data for use in regression algorithms, there's no need to One Hot Encode the label column (since it should be numerical).
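
In that case, a minimal sketch of the alternative is to cast the numeric label column directly instead of indexing it (using the label variable defined above):

## Regression alternative (sketch): keep the numeric label rather than indexing it
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

dataset = dataset.withColumn("label", col(label).cast(DoubleType()))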

Assemble the data together as a vector

This step transforms all the numerical data along with the encoded categorical data into a series of vectors using the VectorAssembler function.

assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
                            outputCol = "features")
stages += [assembler]

Scale features using Normalization

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol = "features",
                        outputCol = "scaledFeatures",
                        withStd = True,
                        withMean = True)
stages += [scaler]

Set up the transformation pipeline using the stages you've created along the way

prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)

Pipeline Saving and Loading

Once your transformation pipeline has been created using your training dataset, it's a good idea to save these transformation steps for future use. For example, we can save the pipeline so that we can apply the same transformations to new data before scoring it through a trained machine learning model. This also helps to cut down on errors when using new data that has unseen classes (in categorical variables) or previously unseen columns.

Save the transformation pipeline

pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))

Load in the transformation pipeline

from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)

Random Forest

Setting Up a Random Forest Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

Initialize Random Forest object

rf = RandomForestClassifier(labelCol="label", featuresCol="features")

Create a parameter grid for tuning the model

rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [2, 5, 10])
               .addGrid(rf.maxBins, [5, 10, 20])
               .addGrid(rf.numTrees, [5, 20, 50])
               .build())

Define how you want the model to be evaluated

rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

Define the type of cross-validation you want to perform

# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

Fit the model to the data

rfcvModel = rfcv.fit(train)
print(rfcvModel)

Score the testing dataset using your fitted model for evaluation purposes

rfpredictions = rfcvModel.transform(test)

Evaluate the model

print('Accuracy:', rfevaluator.evaluate(rfpredictions))
print('AUC:', BinaryClassificationMetrics(rfpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(rfpredictions['label','prediction'].rdd).areaUnderPR)

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Gradient-Boosted Trees

Setting Up Gradient-Boosted Tree Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Gradient-Boosted Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Model Saving and Loading

Model Saving

Save model(s) to mounted storage

Remove a model

Spark MLlib models are actually a series of files in a directory. So, you will need to recursively delete the files in the model's directory, then the directory itself.

Score new data using a trained model

Load in required libraries

Load in the transformation pipeline

Load in the trained model

Remove unnecessary columns from the scored data

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
gb = GBTRegressor(labelCol="label", featuresCol="features")
gbparamGrid = (ParamGridBuilder()
             .addGrid(gb.maxDepth, [2, 5, 10])
             .addGrid(gb.maxBins, [10, 20, 40])
             .addGrid(gb.maxIter, [5, 10, 20])
             .build())
gbevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator = gb,
                      estimatorParamMaps = gbparamGrid,
                      evaluator = gbevaluator,
                      numFolds = 5)
gbcvModel = gbcv.fit(train)
print(gbcvModel)
gbpredictions = gbcvModel.transform(test)
print('RMSE:', gbevaluator.evaluate(gbpredictions))
lrcvModel.save("/mnt/trainedmodels/lr")
rfcvModel.save("/mnt/trainedmodels/rf")
dtcvModel.save("/mnt/trainedmodels/dt")
display(dbutils.fs.ls("/mnt/trainedmodels/"))
dbutils.fs.rm("/mnt/trainedmodels/dt", True)
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, round, udf
from pyspark.sql.types import IntegerType, FloatType
pipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
## Fit the pipeline to new data
transformeddataset = pipeline.transform(dataset)
model = CrossValidatorModel.load("/mnt/trainedmodels/lr/")
## Score the data using the model
scoreddataset = model.bestModel.transform(transformeddataset)
## Function to extract probability from array
getprob = udf(lambda v:float(v[1]),FloatType())

## Select out the necessary columns
output = scoreddataset.select(col("ID"),
                              col("label"),
                              col("rawPrediction"),           
                              getprob(col("probability")).alias("probability"),
                              col("prediction"))

Batch Scoring

This section is designed for use with a data orchestration tool that can call and execute Databricks notebooks. For more information on how to set up Azure Data Factory, see: https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook.

Create date parameter

dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)

Connect to storage

storage_account_name = "mystorage"
storage_account_access_key = ""

file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"

spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

Define input schema

from pyspark.sql.types import *

schema = StructType([
    StructField("ReportingDate", DateType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True)
])

Read in new data

dataset = spark.read\
               .format(file_type)\
               .option("header", "true")\
               .schema(schema)\
               .load(file_location)

## You can avoid defining a schema by having spark infer it from your data
## This doesn't always work and can be slow
#.option("inferSchema", "true")

## Fill in na's, if needed
# dataset = dataset.na.fill(0)
display(dataset)

Load in transformation pipeline and model

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
from pyspark.ml.regression import GeneralizedLinearRegressionModel

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")

Score data using the model

## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)
## Score new data using a trained model
scoreddataset = mymodel.bestModel.transform(mydataset)

output = scoreddataset.select(col("id"),
                              col("ReportingDate"),
                              col("prediction").alias("MyForecast"))
display(output)

Write data back out to storage

fileloc = "/mnt/output" + str(ReportDate) #+ ".csv"
output.write.mode("overwrite").format("com.databricks.spark.csv").option("header","true").csv(fileloc)

Model Evaluation

Evaluate model performance by probability cutoff

Note: Extract probability values using method found here - https://www.sparkitecture.io/machine-learning/model-saving-and-loading#remove-unnecessary-columns-from-the-scored-data.

from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType
from pyspark.mllib.evaluation import BinaryClassificationMetrics

performance_df = spark.createDataFrame([(0,0,0)], ['cutoff', 'AUPR', 'AUC'])

for cutoff in range(5, 95, 5):
  cutoff = (cutoff * 0.01)
  
  print('Testing cutoff = ', str(format(cutoff, '.2f')))

  lrpredictions_prob_temp = lrpredictions.withColumn('prediction_test', when(col('probability') >= cutoff, 1).otherwise(0).cast(DoubleType()))
  aupr_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['label', 'prediction_test'].rdd).areaUnderPR
  auc_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['label', 'prediction_test'].rdd).areaUnderROC
  print('\tAUPR:', aupr_temp,'\tAUC:', auc_temp)
  performance_df_row = spark.createDataFrame([(cutoff,aupr_temp,auc_temp)], ['cutoff', 'AUPR', 'AUC'])
  performance_df = performance_df.union(performance_df_row)

display(performance_df)

Evaluate multiclass classification models

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Evaluate best model
print('Accuracy:', lrevaluator.evaluate(lrpredictions))
lrmetrics = MulticlassMetrics(lrpredictions['label','prediction'].rdd)
print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
print('F1 Score:', lrmetrics.fMeasure(1.0,1.0))

Evaluate binary classification models

from pyspark.mllib.evaluation import BinaryClassificationMetrics

for model in ["lrpredictions", "dtpredictions", "rfpredictions", "nbpredictions", "gbpredictions"]:
    df = globals()[model]
    
    tp = df[(df.label == 1) & (df.prediction == 1)].count()
    tn = df[(df.label == 0) & (df.prediction == 0)].count()
    fp = df[(df.label == 0) & (df.prediction == 1)].count()
    fn = df[(df.label == 1) & (df.prediction == 0)].count()
    a = ((tp + tn)/df.count())
    
    if(tp + fn == 0.0):
        r = 0.0
        p = float(tp) / (tp + fp)
    elif(tp + fp == 0.0):
        r = float(tp) / (tp + fn)
        p = 0.0
    else:
        r = float(tp) / (tp + fn)
        p = float(tp) / (tp + fp)
    
    if(p + r == 0):
        f1 = 0
    else:
        f1 = 2 * ((p * r)/(p + r))
    
    print("Model:", model)
    print("True Positives:", tp)
    print("True Negatives:", tn)
    print("False Positives:", fp)
    print("False Negatives:", fn)
    print("Total:", df.count())
    print("Accuracy:", a)
    print("Recall:", r)
    print("Precision: ", p)
    print("F1 score:", f1)
    print('AUC:', BinaryClassificationMetrics(df['label','prediction'].rdd).areaUnderROC)
print("\n")

Feature Importance

Extract important features using Gini

## Based on: https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
import pandas as pd

def ExtractFeatureImportance(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
  
  
# ExtractFeatureImportance(model.stages[-1].featureImportances, dataset, "features")
dataset_fi = ExtractFeatureImportance(model.bestModel.featureImportances, dataset, "features")
dataset_fi = sqlContext.createDataFrame(dataset_fi)
display(dataset_fi)

Extract important features using p-values

## Based on: https://stackoverflow.com/questions/42935914/how-to-map-features-from-the-output-of-a-vectorassembler-back-to-the-column-name
lrm = model.stages[-1]
## Transform the data:
transformed =  model.transform(df)
#Extract and flatten ML attributes:
from itertools import chain

attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*transformed
        .schema[lrm.summary.featuresCol]
        .metadata["ml_attr"]["attrs"].values())))
# and map to the output:

[(name, lrm.summary.pValues[idx]) for idx, name in attrs]
# [(name, lrm.coefficients[idx]) for idx, name in attrs]

Extract coefficients from a model

import pandas as pd

featurelist = pd.DataFrame(dataset.schema["features"].metadata["ml_attr"]["attrs"]["binary"]+dataset.schema["features"].metadata["ml_attr"]["attrs"]["numeric"]).sort_values("idx")
featurelist["Coefficient"] = pd.DataFrame(model.bestModel.coefficients.toArray())
featurelist = sqlContext.createDataFrame(featurelist)

display(featurelist)

Model Evaluation

Multiclass classification evaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")

for model in ["lrpredictions", "nbpredictions", "rfpredictions"]:
    
    df = globals()[model]
    ########################################
    # Compute raw scores on the test set
    predictionAndLabels = df.select("prediction", "label").rdd

    # Instantiate metrics object
    metrics = MulticlassMetrics(predictionAndLabels)

    # Overall statistics
    precision = metrics.precision()
    recall = metrics.recall()
    f1Score = metrics.fMeasure()
    print("Summary Stats for: ", model)
    #print(metrics.confusionMatrix())
    print("Accuracy = %s" % evaluator.evaluate(df))
    print("Precision = %s" % precision)
    print("Recall = %s" % recall)
    print("F1 Score = %s" % f1Score)

    # Weighted stats
    #print("Weighted recall = %s" % metrics.weightedRecall)
    #print("Weighted precision = %s" % metrics.weightedPrecision)
    #print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    #print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    #print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("\n")

Azure Data Factory

Transformation with Azure Databricks

Using Azure Databricks with Azure Data Factory, notebooks can be run from an end-to-end pipeline that contains the Validation, Copy data, and Notebook activities in Azure Data Factory.

  • Validation ensures that your source dataset is ready for downstream consumption before you trigger the copy and analytics job.

  • Copy data duplicates the source dataset to the sink storage, which is mounted as DBFS in the Azure Databricks notebook. In this way, the dataset can be directly consumed by Spark.

  • Notebook triggers the Databricks notebook that transforms the dataset. It also adds the dataset to a processed folder or Azure SQL Data Warehouse.

Import a notebook for Transformation

To import a Transformation notebook to your Databricks workspace:

  1. Sign in to your Azure Databricks workspace, and then select Import. Note your workspace path; you will need it later.

  2. Select Import from: URL. In the text box, enter https://adflabstaging1.blob.core.windows.net/share/Transformations.html.

  3. Now let's update the Transformation notebook with your storage connection information.

    In the imported notebook, go to command 5 (the code snippet at the end of this section).

    • Replace storageName and accessKey with your own storage connection information.

    • Use the storage account with the sinkdata container.

  4. Generate a Databricks access token for Data Factory to access Databricks.

    1. In your Databricks workspace, select your user profile icon in the upper right.

    2. Select User Settings.

    3. Select Generate New Token under the Access Tokens tab.

    4. Select Generate.

    Save the access token for later use in creating a Databricks linked service. The access token looks something like dapi32db32cbb4w6eee18b7d87e45exxxxxx.

How to use this template

  1. Go to the Transformation with Azure Databricks template and create new linked services for the following connections.

    • Source Blob Connection - to access the source data.

      For this exercise, you can use the public blob storage that contains the source files. Use the following SAS URL to connect to source storage (read-only access):

      https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

    • Destination Blob Connection - to store the copied data.

      In the New linked service window, select your sink storage blob.

    • Azure Databricks - to connect to the Databricks cluster.

      Create a Databricks-linked service by using the access key that you generated previously. You can opt to select an interactive cluster if you have one. This example uses the New job cluster option.

  2. Select Use this template. You'll see a pipeline created.

Pipeline introduction and configuration

In the new pipeline, most settings are configured automatically with default values. Review the configurations of your pipeline and make any necessary changes.

  1. In the Validation activity Availability flag, verify that the source Dataset value is set to SourceAvailabilityDataset that you created earlier.

  2. In the Copy data activity file-to-blob, check the Source and Sink tabs. Change settings if necessary.

    • Source tab

    • Sink tab

  3. In the Notebook activity Transformation, review and update the paths and settings as needed.

    The Databricks linked service should be pre-populated with the value from a previous step.

    To check the Notebook settings:

    1. Select the Settings tab. For Notebook path, verify that the default path is correct. You might need to browse and choose the correct notebook path.

    2. Expand the Base Parameters selector and verify that the parameters match what the notebook expects. These parameters are passed to the Databricks notebook from Data Factory.

  4. Verify that the Pipeline Parameters are set correctly.

  5. Connect to your datasets.

In the datasets below, the file path has been automatically specified in the template. If any changes are required, make sure that you specify the path for both the container and the directory to avoid connection errors.

  • SourceAvailabilityDataset - to check that the source data is available.

  • SourceFilesDataset - to access the source data.

  • DestinationFilesDataset - to copy the data into the sink destination location. Use the following values:

    • Linked service - sinkBlob_LS, created in a previous step.

    • File path - sinkdata/staged_sink.

  1. Select Debug to run the pipeline. You can find the link to Databricks logs for more detailed Spark logs.

    You can also verify the data file by using Azure Storage Explorer.

For correlating with Data Factory pipeline runs, this example appends the pipeline run ID from the data factory to the output folder. This helps keep track of files generated by each run.
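
A minimal sketch of what that looks like on the notebook side (the base parameter name pipelineRunId is an assumption here; use whatever name your Data Factory pipeline actually passes):

## Sketch: read the Data Factory pipeline run ID from a base parameter
## ("pipelineRunId" is an assumed example name)
dbutils.widgets.text("pipelineRunId", "no-run-id")
run_id = dbutils.widgets.get("pipelineRunId")

## Append the run ID to the output folder so each run's files stay separate
output_path = "/mnt/Data Factorydata/output/" + run_id
# output.write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv(output_path)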

Next steps

## Supply storageName and accessKey values  
storageName = ""  
accessKey = ""  

## Attempt to Mount Data Factory Data in Azure Storage
dbutils.fs.mount(
    source = "wasbs://sinkdata\@"+storageName+".blob.core.windows.net/",  
    mount_point = "/mnt/Data Factorydata",  
    extra_configs = {"fs.azure.account.key."+storageName+".blob.core.windows.net": accessKey})  

  • Introduction to Azure Data Factory

  • Transformation with Azure Databricks

  • Run a Databricks notebook with the Databricks Notebook Activity in Azure Data Factory

Text Data Preparation

Tokenization and Vectorization

Load in required libraries

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import col, lower, regexp_replace

Remove usernames, dates, links, etc.


def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, "(https?\://)\S+", "") # Remove links
  c = regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove CR, tab, and LR
  c = regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
  c = regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
  c = regexp_replace(c, "[0-9]", "") # Remove numbers
  c = regexp_replace(c, "\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
  #c = regexp_replace(c, "(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")
  return c

dataset = dataset.withColumn("text", clean_text(col("text")))

RegEx tokenization

regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

Remove stop words

# Add Stop words
add_stopwords = ["http","https","amp","rt","t","c","the","@","/",":"] # standard web stop words

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

Count words

# Bag of Words Count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

Index strings

# String Indexer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "class", outputCol = "label")

Create transformation pipeline

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

Once the transformation pipeline has been fit, you can use normal classification algorithms for classifying the text.
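
For example, here is a minimal sketch of fitting a classifier to the vectorized text (assuming the dataset produced by the pipeline above, with its features and label columns):

## Sketch: train a classifier on the vectorized text (assumes `dataset` from the pipeline above)
from pyspark.ml.classification import LogisticRegression

train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=20)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)

display(predictions.select("label", "prediction", "probability"))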

Extras

Get label numbers for each class

from pyspark.sql import *
from pyspark.sql.functions import col
labelset = dataset.select(col("class"),
                          col("label")).distinct()
display(labelset)

Split text body into sentences

from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import col, split, explode, row_number
# Split text by sentence and convert to array
array_df = data.withColumn("text", split(col("text"), "\.").cast("array<string>"))
  
# Explode array into separate rows in the dataset
split_df = array_df.withColumn("text", explode(col("text")))\
                   .withColumn("part_number", row_number().over(Window.partitionBy("internet_message_id").orderBy("id")))
data = split_df
display(data)

Create `part_number` for the split sentences

from pyspark.sql.window import *
from pyspark.sql.functions import row_number

data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()

Glow

About Glow

Glow is an open-source and independent Spark library that brings even more flexibility and functionality to Azure Databricks. This toolkit is natively built on Apache Spark, enabling the scale of the cloud for genomics workflows.

Glow allows for genomic data to work with Spark SQL. So, you can interact with common genetic data types as easily as you can play with a .csv file.

Learn more about Project Glow at projectglow.io.

Read the full documentation at glow.readthedocs.io.

Features:

  • Genomic datasources: To read datasets in common file formats such as VCF, BGEN, and Plink into Spark DataFrames.

  • Genomic functions: Common operations such as computing quality control statistics, running regression tests, and performing simple transformations are provided as Spark functions that can be called from Python, SQL, Scala, or R.

  • Data preparation building blocks: Glow includes transformations such as variant normalization and lift over to help produce analysis ready datasets.

  • Integration with existing tools: With Spark, you can write user-defined functions (UDFs) in Python, R, SQL, or Scala. Glow also makes it easy to run DataFrames through command line tools.

  • Integration with other data types: Genomic data can generate additional insights when joined with data sets such as electronic health records, real world evidence, and medical images. Since Glow returns native Spark SQL DataFrames, it's simple to join multiple data sets together.

How To Install

If you're using Databricks, make sure you enable the Databricks Runtime for Genomics. Glow is already included and configured in this runtime.

pip Installation

Using pip, install Glow by simply running pip install glow.py and then start the Spark shell with the Glow maven package.

Maven Installation

Install the maven package io.projectglow:glow_2.11:${version} and optionally the Python frontend glow.py. Set the Spark configuration spark.hadoop.io.compression.codecs to io.projectglow.sql.util.BGZFCodec in order to read and write BGZF-compressed files.

Load in Glow

Read in Data

Summary Statistics and Quality Control

Split Multiallelic Variants to Biallelic

Write out Data

./bin/pyspark --packages io.projectglow:glow_2.11:0.5.0
 --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec
import glow
from pyspark.sql.functions import col, expr, when, log10

glow.register(spark)
vcf_path = "/databricks-datasets/genomics/1kg-vcfs/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"

df = spark.read.format("vcf")\
          .option("includeSampleIds", False)\
          .option("flattenInfoFields", False)\
          .load(vcf_path)\
          .withColumn("first_genotype", expr("genotypes[0]"))
          
# bgen_path = "/databricks-datasets/genomics/1kg-bgens/1kg_chr22.bgen"

# df = spark.read.format("bgen") \
#           .load(bgen_path)
df = df.withColumn("hardyweinberg", expr("hardy_weinberg(genotypes)")) \
       .withColumn("summarystats", expr("call_summary_stats(genotypes)")) \
       .withColumn("depthstats", expr("dp_summary_stats(genotypes)")) \
       .withColumn("genotypequalitystats", expr("gq_summary_stats(genotypes)")) \
       .filter(col("qual") >= 98) \
       .filter((col("start") >= 16000000) & (col("end") >= 16050000)) \
       .where((col("alleleFrequencies").getItem(0) >= allele_freq_cutoff) & 
              (col("alleleFrequencies").getItem(0) <= (1.0 - allele_freq_cutoff))) \
       .withColumn("log10pValueHwe", when(col("pValueHwe") == 0, 26).otherwise(-log10(col("pValueHwe"))))
split_df = glow.transform("split_multiallelics", df)
df.coalesce(1) \
  .write \
  .mode("overwrite") \
  .format("vcf") \
  .save("/tmp/vcf_output")

Structured Streaming

Read in Streaming Data

Reading JSON files from storage

from pyspark.sql.types import *

inputPath = "/mnt/data/jsonfiles/"

# Define your schema if it's known (rather than relying on Spark to infer the schema)
jsonSchema = StructType([StructField("time", TimestampType(), True),
                         StructField("id", IntegerType(), True),
                         StructField("value", StringType(), True)])

## maxFilesPerTrigger treats a sequence of files as a stream by picking up one file at a time
streamingInputDF = spark.readStream \
                        .schema(jsonSchema) \
                        .option("maxFilesPerTrigger", 1) \
                        .json(inputPath)
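
The streaming DataFrame is not materialized until a streaming query is started against it. Here is a minimal sketch of writing the stream back out to storage (the output and checkpoint paths are example values):

## Sketch: start the stream by writing it out (paths are example values)
streamingQuery = streamingInputDF.writeStream \
                                 .format("parquet") \
                                 .option("checkpointLocation", "/mnt/data/checkpoints/jsonstream") \
                                 .outputMode("append") \
                                 .start("/mnt/data/output/jsonstream")

# streamingQuery.stop()  ## Stop the query when finished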

References

  • Databricks Structured Streaming: https://docs.databricks.com/spark/latest/structured-streaming/index.html

Random Forest

Setting Up Random Forest Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

Initialize Random Forest object

rf = RandomForestRegressor(labelCol="label", featuresCol="features")

Create a parameter grid for tuning the model

rfparamGrid = (ParamGridBuilder()
             #.addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
               .addGrid(rf.maxDepth, [2, 5, 10])
             #.addGrid(rf.maxBins, [10, 20, 40, 80, 100])
               .addGrid(rf.maxBins, [5, 10, 20])
             #.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
               .addGrid(rf.numTrees, [5, 20, 50])
             .build())

Define how you want the model to be evaluated

rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")

Define the type of cross-validation you want to perform

# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

Fit the model to the data

rfcvModel = rfcv.fit(train)
print(rfcvModel)

Score the testing dataset using your fitted model for evaluation purposes

rfpredictions = rfcvModel.transform(test)

Evaluate the model

print('RMSE:', rfevaluator.evaluate(rfpredictions))

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Naïve Bayes

Setting Up a Naïve Bayes Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

Initialize Naïve Bayes object

nb = NaiveBayes(labelCol="label", featuresCol="features")

Create a parameter grid for tuning the model

nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
               .build())

Define how you want the model to be evaluated

nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

Define the type of cross-validation you want to perform

# Create 5-fold CrossValidator
nbcv = CrossValidator(estimator = nb,
                      estimatorParamMaps = nbparamGrid,
                      evaluator = nbevaluator,
                      numFolds = 5)

Fit the model to the data

nbcvModel = nbcv.fit(train)
print(nbcvModel)

Score the testing dataset using your fitted model for evaluation purposes

nbpredictions = nbcvModel.transform(test)

Evaluate the model

print('Accuracy:', nbevaluator.evaluate(nbpredictions))
print('AUC:', BinaryClassificationMetrics(nbpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(nbpredictions['label','prediction'].rdd).areaUnderPR)

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Decision Tree

Setting Up Decision Tree Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

Initialize Decision Tree object

dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")

Create a parameter grid for tuning the model

dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
             #.addGrid(dt.maxDepth, [2, 5, 10])
             .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
             #.addGrid(dt.maxBins, [10, 20])
             .build())

Define how you want the model to be evaluated

dtevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")

Define the type of cross-validation you want to perform

# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)

Fit the model to the data

dtcvModel = dtcv.fit(train)
print(dtcvModel)

Score the testing dataset using your fitted model for evaluation purposes

dtpredictions = dtcvModel.transform(test)

Evaluate the model

print('RMSE:', dtevaluator.evaluate(dtpredictions))

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

API Serving

Use MMLSpark

Load in required libraries

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round

import sys
import numpy as np
import pandas as pd
import mmlspark
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
import uuid
from mmlspark import request_to_string, string_to_response

Load in transformation pipeline and trained model

## Load in the transformation pipeline
mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")

## Load in trained model
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")

Define username, key, and IP address

username = "admin"
ip = "10.0.0.4" #Internal IP
sas_url = "" # SAS Token for your VM's Private Key in Blob

Define input schema

input_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("x1", IntegerType(), True),
  StructField("x2", DoubleType(), True),
  StructField("x3", StringType(), True),
 ])

Set up streaming DataFrame

serving_inputs = spark.readStream.continuousServer() \
                      .option("numPartitions", 1) \
                      .option("name", "http://10.0.0.4:8898/my_api") \
                      .option("forwarding.enabled", True) \
                      .option("forwarding.username", username) \
                      .option("forwarding.sshHost", ip) \
                      .option("forwarding.keySas", sas_url) \
                      .address("localhost", 8898, "my_api") \
                      .load()\
                      .parseRequest(input_schema)

mydataset = mypipeline.transform(serving_inputs)

serving_outputs = mymodel.bestModel.transform(mydataset) \
  .makeReply("prediction")

# display(serving_inputs)

Set up server

server = serving_outputs.writeStream \
    .continuousServer() \
    .trigger(continuous="1 second") \
    .replyTo("my_api") \
    .queryName("my_query") \
    .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1())) \
    .start()

Test the webservice

import requests
data = u'{"id":0,"x1":1,"x2":2.0,"x3":"3"}'

#r = requests.post(data=data, url="http://localhost:8898/my_api") # Locally
r = requests.post(data=data, url="http://102.208.216.32:8902/my_api") # Via the VM IP

print("Response {}".format(r.text))

If you're running inside Databricks, you may need to run sudo netstat -tulpn to look for the port that was opened by the server.

Resources:

Microsoft MMLSpark on GitHub: https://github.com/Azure/mmlspark

Other Common Tasks

Split Data into Training and Test Datasets

train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)

Rename all columns

column_list = data.columns
prefix = "my_prefix"
new_column_list = [prefix + s for s in column_list]
#new_column_list = [prefix + s if s != "ID" else s for s in column_list] ## Use if you plan on joining on an ID later
 
column_mapping = [[o, n] for o, n in zip(column_list, new_column_list)]

# print(column_mapping)

data = data.select(list(map(lambda old, new: col(old).alias(new),*zip(*column_mapping))))

Convert PySpark DataFrame to NumPy array

## Convert `train` DataFrame to NumPy
pdtrain = train.toPandas()
trainseries = pdtrain['features'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
X_train = np.apply_along_axis(lambda x : x[0], 1, trainseries)
y_train = pdtrain['label'].values.reshape(-1,1).ravel()

## Convert `test` DataFrame to NumPy
pdtest = test.toPandas()
testseries = pdtest['features'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
X_test = np.apply_along_axis(lambda x : x[0], 1, testseries)
y_test = pdtest['label'].values.reshape(-1,1).ravel()

print(y_test)

Call Cognitive Service API using PySpark

Create `chunker` function

The cognitive service APIs can only take a limited number of observations at a time (1,000, to be exact) or a limited amount of data in a single call. So, we can create a chunker function that we will use to split the dataset up into smaller chunks.

## Define Chunking Logic
import pandas as pd
import numpy as np
# Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

Convert Spark DataFrame to Pandas

sentiment_df_pd = sentiment_df.toPandas()

Set up API requirements

# pprint is used to format the JSON response
from pprint import pprint
import json
import requests

subscription_key = '<SUBSCRIPTIONKEY>'
endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
headers = {"Ocp-Apim-Subscription-Key": subscription_key}

Create DataFrame for incoming scored data

from pyspark.sql.types import *

sentiment_schema = StructType([StructField("id", IntegerType(), True),
                               StructField("score", FloatType(), True)])

sentiments_df = spark.createDataFrame([], sentiment_schema)

display(sentiments_df)

Loop through chunks of the data and call the API

for chunk in chunker(sentiment_df_pd, 1000):
  print("Scoring", len(chunk), "rows.")
  sentiment_df_json = json.loads('{"documents":' + chunk.to_json(orient='records') + '}')
  
  response = requests.post(sentiment_url, headers = headers, json = sentiment_df_json)
  sentiments = response.json()
  # pprint(sentiments)
  
  sentiments_pd = pd.read_json(json.dumps(sentiments['documents']))
  sentiments_df_chunk = spark.createDataFrame(sentiments_pd)
  sentiments_df = sentiments_df.unionAll(sentiments_df_chunk)
  
display(sentiments_df)
sentiments_df.count()

Write the results out to mounted storage

sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")

Find All Columns of a Certain Type

import pandas as pd
from pyspark.sql.functions import col
def get_nonstring_cols(df):
    types = spark.createDataFrame(pd.DataFrame({'Column': df.schema.names, 'Type': [str(f.dataType) for f in df.schema.fields]}))
    result = types.filter(col('Type') != 'StringType').select('Column').rdd.flatMap(lambda x: x).collect()
    return result
    
get_nonstring_cols(df)

Change a Column's Type

from pyspark.sql.types import *
from pyspark.sql.functions import col

df = df.withColumn('col1', col('col1').cast(IntegerType()))

Generate StructType Schema Printout (Manual Execution)

## Fill in list with your desired column names
cols = ["col1", "col2", "col3"]
i = 1

for col in cols:
    if i == 1:
        print("schema = StructType([")
        print("\tStructField('" + col +  "', StringType(), True),")
    
    elif i == len(cols):
        print("\tStructField('" + col +  "', StringType(), True)])")
        
    else:
        print("\tStructField('" + col +  "', StringType(), True),")
    
    i += 1
    
## Once the output has printed, copy and paste into a new cell
## and change column types and nullability

Generate StructType Schema from List (Automatic Execution)

"""
Struct Schema Creator for PySpark

[<Column Name>, <Column Type>, <Column Nullable>]

Types:  binary, boolean, byte, date,
        double, integer, long, null,
        short, string, timestamp, unknown
"""
from pyspark.sql.types import *

## Fill in with your desired column names, types, and nullability
cols = [["col1", "string", False],
        ["col2", "date", True],
        ["col3", "integer", True]]

## Loop to build list of StructFields
schema_set = ["schema = StructType(["]

for i, col in enumerate(cols):
    colname = col[0]
    coltype = col[1].title() + "Type()"
    colnull = col[2]
    
    if i == len(cols)-1:
        iter_structfield = "StructField('" + colname +  "', " + coltype + ", " + str(colnull) + ")])"
    else:
        iter_structfield = "StructField('" + colname +  "', " + coltype + ", " + str(colnull) + "),"
    
    schema_set.append(iter_structfield)

## Convert list to single string
schema_string = ''.join(map(str, schema_set))

## This will execute the generated command string
exec(schema_string)

Make a DataFrame of Consecutive Dates

from pyspark.sql.functions import sequence, to_date, explode, col
date_dim = spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) as DATE").withColumn("DATE", explode(col("DATE")))
display(date_dim)

Unpivot a DataFrame Dynamically (Longer)

Pivot a wide dataset into a longer form. (Similar to the pivot_longer() function from the tidyr R package or the wide_to_long() function from pandas.)

## UnpivotDF Function
def UnpivotDF(df, columns, pivotCol, unpivotColName, valueColName):
  columnsValue = list(map(lambda x: str("'") + str(x) + str("',")  + str(x), columns))
  stackCols = ','.join(x for x in columnsValue)

  df_unpvt = df.selectExpr(pivotCol, f"stack({str(len(columns))}, {stackCols}) as ({unpivotColName}, {valueColName})")\
               .select(pivotCol, unpivotColName, valueColName)
  
  return(df_unpvt)
df_unpvt = UnpivotDF(df = df,
                     columns = df.columns[1:], ## The columns to transpose into a single, longer column
                     pivotCol = "ID", ## The column to leave in place (usually an ID)
                     unpivotColName = "Category", ## The name of the new column
                     valueColName = "value") ## The name of the column of values