Azure Storage

Storage is a managed service in Azure that provides highly available, secure, durable, scalable, and redundant storage for your data. Azure Storage includes Blob Storage, Data Lake Storage, and other services.

Databricks-Specific Functionality

Mounting Blob Storage

Once you create your blob storage account in Azure, you will need to grab a couple of pieces of information from the Azure Portal before you mount your storage.

  • You can find your Storage Account Name (which will go in as <STORAGEACCOUNT> below) and your Key (which will go in as <KEYGOESHERE> below) under Access Keys in your Storage Account resource in Azure.

  • Go into your Storage Account resource in Azure and click on Blobs. Here, you will find all of your containers. Pick the one you want to mount and copy its name into <CONTAINERNAME> below.

  • As for the mount point (/mnt/<FOLDERNAME> below), you can name this whatever you'd like, but it will help you in the long run to name it something useful along the lines of storageaccount_container.

Once you have the required bits of information, you can use the following code to mount the storage location inside the Databricks environment.

You can then test whether you can list the files in your mounted location.

Resources:

  • To learn how to create an Azure Storage service, visit https://docs.microsoft.com/en-us/azure/storage/

Mounting Data Lake Storage

For finer-grained access controls on your data, you may opt to use Azure Data Lake Storage. In Databricks, you can connect to your data lake in a similar manner to blob storage. Instead of an access key, your user credentials will be passed through, therefore only showing you data that you specifically have access to.

Pass-through Azure Active Directory Credentials

To pass in your Azure Active Directory credentials from Databricks to Azure Data Lake Store, you will need to enable this feature in Databricks under New Cluster > Advanced Options.

Note: If you create a High Concurrency cluster, multiple users can use the same cluster. The Standard cluster mode will only allow a single user's credential at a time.

Welcome to Sparkitecture!

Created by: Colby T. Ford, Ph.D.

PySpark Edition | A work in progress... | Created using GitBook.com

About

Sparkitecture is a collection of “cookbook-style” scripts for simplifying data engineering and machine learning in Apache Spark.

This is an open source project (GPL v3.0) for the Spark community. If you have ideas or contributions you'd like to add, submit a Feature Request, or write your code/tutorial/page and create a Pull Request in the GitHub repo.

How to Cite

About Spark MLlib

MLlib is Apache Spark's scalable machine learning library.

MLlib works with Spark's APIs and interoperates with NumPy in Python and with R libraries. Since Spark excels at iterative computation, MLlib runs very fast with highly scalable, high-quality algorithms that leverage iteration.

Included Functionality:

Model Evaluation

Multiclass classification evaluator

MLflow

MLflow is an open source library by the Databricks team designed for managing the machine learning lifecycle. It allows for the creation of projects, tracking of metrics, and model versioning.

Install mlflow using pip

MLflow can be used in any Spark environment, but its automated tracking and UI are Databricks-specific functionality.

Structured Streaming

Read in Streaming Data

Reading JSON files from storage

Azure SQL Data Warehouse / Synapse

Set up Azure SQL DW connection parameters

Define a query

Reading and Writing Data

Reading in Data

...from Mounted Storage

ML algorithms include:
  • Classification: logistic regression, naive Bayes,...

  • Regression: generalized linear regression, survival regression,...

  • Decision trees, random forests, and gradient-boosted trees

  • Recommendation: alternating least squares (ALS)

  • Clustering: K-means, Gaussian mixtures (GMMs),...

  • Topic modeling: latent Dirichlet allocation (LDA)

  • Frequent itemsets, association rules, and sequential pattern mining

ML workflow utilities include:

  • Feature transformations: standardization, normalization, hashing,...

  • ML Pipeline construction

  • Model evaluation and hyper-parameter tuning

  • ML persistence: saving and loading models and Pipelines

Other utilities include:

  • Distributed linear algebra: SVD, PCA,...

  • Statistics: summary statistics, hypothesis testing,...

Resources

  • Spark MLlib Website

  • Getting Started Guide

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

for model in ["lrpredictions", "nbpredictions", "rfpredictions"]:
    
    df = globals()[model]
    ########################################
    # Compute raw scores on the test set
    predictionAndLabels = df.select("prediction", "label").rdd

    # Instantiate metrics object
    metrics = MulticlassMetrics(predictionAndLabels)

    # Overall statistics
    precision = metrics.precision()
    recall = metrics.recall()
    f1Score = metrics.fMeasure()
    print("Summary Stats for: ", model)
    #print(metrics.confusionMatrix())
    print("Accuracy = %s" % evaluator.evaluate(df))
    print("Precision = %s" % precision)
    print("Recall = %s" % recall)
    print("F1 Score = %s" % f1Score)

    # Weighted stats
    #print("Weighted recall = %s" % metrics.weightedRecall)
    #print("Weighted precision = %s" % metrics.weightedPrecision)
    #print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    #print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    #print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("\n")
References
  • Databricks Structured Streaming: https://docs.databricks.com/spark/latest/structured-streaming/index.html

from pyspark.sql.types import *

inputPath = "/mnt/data/jsonfiles/"

# Define your schema if it's known (rather than relying on Spark to infer the schema)
jsonSchema = StructType([StructField("time", TimestampType(), True),
                         StructField("id", IntegerType(), True),
                         StructField("value", StringType(), True)])

# Treat a sequence of files as a stream by picking one file at a time (maxFilesPerTrigger)
streamingInputDF = spark.readStream \
                        .schema(jsonSchema) \
                        .option("maxFilesPerTrigger", 1) \
                        .json(inputPath)
Track metrics and parameters

MLflow GitHub: https://github.com/mlflow/mlflow/

pip install mlflow
import mlflow

## Log Parameters and Metrics from your normal MLlib run
with mlflow.start_run():
  # Log a parameter (key-value pair)
  mlflow.log_param("alpha", 0.1)

  # Log a metric; metrics can be updated throughout the run
  mlflow.log_metric("AUC", 0.871827)
  mlflow.log_metric("F1", 0.726153)
  mlflow.log_metric("Precision", 0.213873)
Create a Spark DataFrame using the SQL DW data
dwDatabase = "<DATABASENAME>" ## The Azure SQL Data Warehouse database name
dwServer = "<DWNAME>.database.windows.net" ## The Azure SQL Server
dwUser = "<USERNAME>" ## The dedicated loading user login
dwPass = dbutils.secrets.get(scope = "<SECRETNAME>", key = "<KEYNAME>") ## The dedicated loading user login password
dwJdbcPort = "1433"
tempDir = "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net/tmpdir" ## Temporary staging directory in Blob Storage used by the connector (assumed placeholder; point this at your own storage)
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass
sqlQuery = """
  SELECT *, 'AzureSqlDw' AS SourceSystem
  FROM dbo.<TABLENAME>
"""
...when Schema Inference Fails

Writing out Data

Other Resources

Apache Spark Data Sources Documentation: https://spark.apache.org/docs/latest/sql-data-sources.html

dataset = spark.read.format('csv') \
                    .options(header='true', inferSchema='true', delimiter= ',') \
                    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')

## Alternatively: sqlContext.read.format('csv')...
## Formats: json, parquet, jdbc, orc, libsvm, csv, text, avro
data = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", sqlDwUrlSmall) \
  .option("tempDir", tempDir) \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", sqlQuery) \
  .load()

data.createOrReplaceTempView("<TEMPVIEWNAME>")
#data.write.saveAsTable("<TABLENAME>")
from pyspark.sql.types import *

schema = StructType([StructField('ID', IntegerType(), True),
                     StructField('Value', DoubleType(), True),
                     StructField('Category', StringType(), True),
                     StructField('Date', DateType(), True)])

dataset = sqlContext.read.format('csv') \
                    .schema(schema) \
                    .options(header='true', delimiter= ',') \
                    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')
df.coalesce(1) \
   .write.format("com.databricks.spark.csv") \
   .option("header", "true") \
   .save("file.csv")

BibTeX

Text Citation

@misc{sparkitecture,
  author = {Colby T. Ford},
  title = {Sparkitecture - {A} collection of "cookbook-style" scripts for simplifying data engineering and machine learning in {Apache Spark}.},
  month = oct,
  year = 2019,
  doi = {10.5281/zenodo.3468502},
  url = {https://doi.org/10.5281/zenodo.3468502}
}

Colby T. Ford. (2019, October) Sparkitecture - A collection of "cookbook-style" scripts for simplifying data engineering and machine learning in Apache Spark., (Version v1.0.0). Zenodo. http://doi.org/10.5281/zenodo.3468502


Random Forest

Setting Up Random Forest Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Random Forest object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.
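
For example (a minimal sketch, assuming the rfcvModel object created in the Random Forest regression code on this page), you can inspect the cross-validation results and the chosen model directly:

## Inspect the cross-validated model (names assume the Random Forest regression snippet on this page)
print(rfcvModel.avgMetrics)   # Average evaluator metric (here, RMSE) for each parameter combination in the grid
print(rfcvModel.bestModel)    # The best model, which is what transform() on the CrossValidatorModel uses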

Gradient-Boosted Trees

Setting Up Gradient-Boosted Tree Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Gradient-Boosted Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Naïve Bayes

Setting Up a Naïve Bayes Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Naïve Bayes object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Linear Regression

Setting Up Linear Regression

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Linear Regression object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Get model information

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Random Forest

Setting Up a Random Forest Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Random Forest object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Decision Tree

Setting Up a Decision Tree Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

Load in required libraries

Initialize Decision Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

Feature Importance

Extract important features using Gini

## Based on: https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
import pandas as pd

def ExtractFeatureImportance(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
  
  
# ExtractFeatureImportance(model.stages[-1].featureImportances, dataset, "features")
dataset_fi = ExtractFeatureImportance(model.bestModel.featureImportances, dataset, "features")
dataset_fi = sqlContext.createDataFrame(dataset_fi)
display(dataset_fi)

Extract important features using p-values

Extract coefficients from a model

Classification

Description:

Classification algorithms are used to identify into which classes observations of data should fall. This problem could be considered part of pattern recognition in that we use training data (historical information) to recognize patterns to predict where new data should be categorized.

Common Use Cases:

  • Fraudulent activity detection

  • Loan default prediction

  • Spam vs. ham

  • Customer segmentation

  • Benign vs. malignant tumor classification

  • and many more...

Classification Algorithms included in MLlib:

  • Logistic regression (both binomial and multiclass)

  • Decision trees

  • Random forests

  • Gradient-boosted trees

  • Multilayer perceptron

  • Linear Support Vector Machine

  • One-vs-Rest classifier (a.k.a. One-vs-All)

  • Naïve Bayes

Logistic Regression

Setting Up a Logistic Regression Classifier

Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

dbutils.fs.mount(
  source = "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net",
  mount_point = "/mnt/<FOLDERNAME>/",
  extra_configs = {"fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net":"<KEYGOESHERE>"})
display(dbutils.fs.ls("/mnt/<FOLDERNAME>"))
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}
dbutils.fs.mount(
  source = "abfss://<CONTAINERNAME>@<STORAGEACCOUNT>.dfs.core.windows.net/",
  mount_point = "/mnt/<FOLDERNAME>",
  extra_configs = configs)
## Based on: https://stackoverflow.com/questions/42935914/how-to-map-features-from-the-output-of-a-vectorassembler-back-to-the-column-name
lrm = model.stages[-1]
## Transform the data:
transformed =  model.transform(df)
#Extract and flatten ML attributes:
from itertools import chain

attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*transformed
        .schema[lrm.summary.featuresCol]
        .metadata["ml_attr"]["attrs"].values())))
# and map to the output:

[(name, lrm.summary.pValues[idx]) for idx, name in attrs]
# [(name, lrm.coefficients[idx]) for idx, name in attrs]

    import pandas as pd
    
    featurelist = pd.DataFrame(dataset.schema["features"].metadata["ml_attr"]["attrs"]["binary"]+dataset.schema["features"].metadata["ml_attr"]["attrs"]["numeric"]).sort_values("idx")
    featurelist["Coefficient"] = pd.DataFrame(model.bestModel.coefficients.toArray())
    featurelist = sqlContext.createDataFrame(featurelist)
    
    display(featurelist)
    Load in required libraries

    Initialize Logistic Regression object

    Create a parameter grid for tuning the model

    Define how you want the model to be evaluated

    Define the type of cross-validation you want to perform

    Fit the model to the data

    Score the testing dataset using your fitted model for evaluation purposes

    Evaluate the model

    Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    rf = RandomForestRegressor(labelCol="label", featuresCol="features")
    rfparamGrid = (ParamGridBuilder()
                 #.addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
                   .addGrid(rf.maxDepth, [2, 5, 10])
                 #.addGrid(rf.maxBins, [10, 20, 40, 80, 100])
                   .addGrid(rf.maxBins, [5, 10, 20])
                 #.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
                   .addGrid(rf.numTrees, [5, 20, 50])
                 .build())
    rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    rfcv = CrossValidator(estimator = rf,
                          estimatorParamMaps = rfparamGrid,
                          evaluator = rfevaluator,
                          numFolds = 5)
    rfcvModel = rfcv.fit(train)
    print(rfcvModel)
    rfpredictions = rfcvModel.transform(test)
    print('RMSE:', rfevaluator.evaluate(rfpredictions))
    from pyspark.ml.regression import GBTRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    gb = GBTRegressor(labelCol="label", featuresCol="features")
    gbparamGrid = (ParamGridBuilder()
                 .addGrid(gb.maxDepth, [2, 5, 10])
                 .addGrid(gb.maxBins, [10, 20, 40])
                 .addGrid(gb.maxIter, [5, 10, 20])
                 .build())
    gbevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    gbcv = CrossValidator(estimator = gb,
                          estimatorParamMaps = gbparamGrid,
                          evaluator = gbevaluator,
                          numFolds = 5)
    gbcvModel = gbcv.fit(train)
    print(gbcvModel)
    gbpredictions = gbcvModel.transform(test)
    print('RMSE:', gbevaluator.evaluate(gbpredictions))
    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    nb = NaiveBayes(labelCol="label", featuresCol="features")
    nbparamGrid = (ParamGridBuilder()
                   .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
                   .build())
    nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    nbcv = CrossValidator(estimator = nb,
                          estimatorParamMaps = nbparamGrid,
                          evaluator = nbevaluator,
                          numFolds = 5)
    nbcvModel = nbcv.fit(train)
    print(nbcvModel)
    nbpredictions = nbcvModel.transform(test)
    print('Accuracy:', nbevaluator.evaluate(nbpredictions))
    print('AUC:', BinaryClassificationMetrics(nbpredictions['label','prediction'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(nbpredictions['label','prediction'].rdd).areaUnderPR)
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    lr = LinearRegression(labelCol="label", featuresCol="features")
    lrparamGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
                 #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
                 .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
                 #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                 .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
                 #  .addGrid(lr.maxIter, [1, 5, 10])
                 .build())
    lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    lrcv = CrossValidator(estimator = lr,
                        estimatorParamMaps = lrparamGrid,
                        evaluator = lrevaluator,
                        numFolds = 5)
    lrcvModel = lrcv.fit(train)
    print(lrcvModel)
    lrcvSummary = lrcvModel.bestModel.summary
    print("Coefficient Standard Errors: " + str(lrcvSummary.coefficientStandardErrors))
    print("P Values: " + str(lrcvSummary.pValues)) # Last element is the intercept
    lrpredictions = lrcvModel.transform(test)
    print('RMSE:', lrevaluator.evaluate(lrpredictions))
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")
    rfparamGrid = (ParamGridBuilder()
    
                   .addGrid(rf.maxDepth, [2, 5, 10])
    
                   .addGrid(rf.maxBins, [5, 10, 20])
    
                   .addGrid(rf.numTrees, [5, 20, 50])
                 .build())
    rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    rfcv = CrossValidator(estimator = rf,
                          estimatorParamMaps = rfparamGrid,
                          evaluator = rfevaluator,
                          numFolds = 5)
    rfcvModel = rfcv.fit(train)
    print(rfcvModel)
    rfpredictions = rfcvModel.transform(test)
    print('Accuracy:', rfevaluator.evaluate(rfpredictions))
    print('AUC:', BinaryClassificationMetrics(rfpredictions['label','prediction'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(rfpredictions['label','prediction'].rdd).areaUnderPR)
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
    dtparamGrid = (ParamGridBuilder()
                 .addGrid(dt.maxDepth, [2, 5, 10])
                 .addGrid(dt.maxBins, [10, 20])
                 .build())
    dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    dtcv = CrossValidator(estimator = dt,
                          estimatorParamMaps = dtparamGrid,
                          evaluator = dtevaluator,
                          numFolds = 5)
    dtcvModel = dtcv.fit(train)
    print(dtcvModel)
    dtpredictions = dtcvModel.transform(test)
    print('Accuracy:', dtevaluator.evaluate(dtpredictions))
    print('AUC:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderPR)
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    lr = LogisticRegression(labelCol="label", featuresCol="features")
    lrparamGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
                 .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
                 .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
                 .build())
    lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName = "areaUnderROC")
    # Create 5-fold CrossValidator
    lrcv = CrossValidator(estimator = lr,
                        estimatorParamMaps = lrparamGrid,
                        evaluator = lrevaluator,
                        numFolds = 5)
    lrcvModel = lrcv.fit(train)
    print(lrcvModel)
    lrpredictions = lrcvModel.transform(test)
    print('Accuracy:', lrevaluator.evaluate(lrpredictions))
    print('AUC:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderPR)

    Model Evaluation

    Evaluate model performance by probability cutoff

    Note: Extract probability values using method found here - https://www.sparkitecture.io/machine-learning/model-saving-and-loading#remove-unnecessary-columns-from-the-scored-data.

    from pyspark.sql.functions import col, when
    from pyspark.sql.types import DoubleType
    from pyspark.mllib.evaluation import BinaryClassificationMetrics

    performance_df = spark.createDataFrame([(0,0,0)], ['cutoff', 'AUPR', 'AUC'])
    
    for cutoff in range(5, 95, 5):
      cutoff = (cutoff * 0.01)
      
      print('Testing cutoff = ', str(format(cutoff, '.2f')))
    
      lrpredictions_prob_temp = lrpredictions.withColumn('prediction_test', when(col('probability') >= cutoff, 1).otherwise(0).cast(DoubleType()))
      aupr_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['label', 'prediction_test'].rdd).areaUnderPR
      auc_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['label', 'prediction_test'].rdd).areaUnderROC
      print('\tAUPR:', aupr_temp,'\tAUC:', auc_temp)
      performance_df_row = spark.createDataFrame([(cutoff,aupr_temp,auc_temp)], ['cutoff', 'AUPR', 'AUC'])
      performance_df = performance_df.union(performance_df_row)
    
    display(performance_df)

    Evaluate multiclass classification models

    Evaluate binary classification models

    Text Data Preparation

    Tokenization and Vectorization

    Load in required libraries

    Remove usernames, dates, links, etc.

    RegEx tokenization

    Remove stop words

    Count words

    Index strings

    Create transformation pipeline

    Once the transformation pipeline has been fit, you can use normal classification algorithms for classifying the text.

    Extras

    Get label numbers for each class

    Split text body into sentences

    Create `part_number` for the split sentences

    Gradient-Boosted Trees

    Setting Up Gradient-Boosted Tree Classifier

    Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

    Load in required libraries

    Initialize Gradient-Boosted Tree object

    Create a parameter grid for tuning the model

    Define how you want the model to be evaluated

    Define the type of cross-validation you want to perform

    Fit the model to the data

    Score the testing dataset using your fitted model for evaluation purposes

    Evaluate the model

    Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

    API Serving

    Use MMLSpark

    Load in required libraries

    Glow

    About Glow

    Glow is an open-source and independent Spark library that brings even more flexibility and functionality to Azure Databricks. This toolkit is natively built on Apache Spark, enabling the scale of the cloud for genomics workflows.

    Glow allows for genomic data to work with Spark SQL. So, you can interact with common genetic data types as easily as you can play with a .csv file.

    Shaping Data with Pipelines

    Load in required libraries

    Define which columns are numerical versus categorical (and which is the label column)

    from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics
    
    # Evaluate best model
    print('Accuracy:', lrevaluator.evaluate(lrpredictions))
    lrmetrics = MulticlassMetrics(lrpredictions['label','prediction'].rdd)
    print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
    print('F1 Score:', lrmetrics.fMeasure(1.0,1.0))
    Load in transformation pipeline and trained model

    Define username, key, and IP address

    Define input schema

    Set up streaming DataFrame

    Set up server

    Test the webservice

    If you're running inside Databricks, you may need to run sudo netstat -tulpn to find the port that was opened by the server.

    Resources:

    Microsoft MMLSpark on GitHub: https://github.com/Azure/mmlspark

    Set up stages

    Index the categorical columns and perform One Hot Encoding

    One Hot Encoding will convert a categorical column into multiple columns for each class. (This process is similar to dummy coding.)

    Index the label column and perform One Hot Encoding

    Note: If you are preparing the data for use in regression algorithms, there's no need to One Hot Encode the label column (since it should be numerical).

    Assemble the data together as a vector

    This step transforms all the numerical data along with the encoded categorical data into a series of vectors using the VectorAssembler function.

    Scale features using Normalization

    Set up the transformation pipeline using the stages you've created along the way

    Pipeline Saving and Loading

    Once your transformation pipeline has been created on your training dataset, it's a good idea to save these transformation steps for future use. For example, you can save the pipeline so that new data is transformed in exactly the same way before being scored through a trained machine learning model. This also helps cut down on errors when new data contains previously unseen classes (in categorical variables) or previously unused columns.

    Save the transformation pipeline

    Load in the transformation pipeline

    from pyspark.mllib.evaluation import BinaryClassificationMetrics

    for model in ["lrpredictions", "dtpredictions", "rfpredictions", "nbpredictions", "gbpredictions"]:
        df = globals()[model]
        
        tp = df[(df.label == 1) & (df.prediction == 1)].count()
        tn = df[(df.label == 0) & (df.prediction == 0)].count()
        fp = df[(df.label == 0) & (df.prediction == 1)].count()
        fn = df[(df.label == 1) & (df.prediction == 0)].count()
        a = ((tp + tn)/df.count())
        
        if(tp + fn == 0.0):
            r = 0.0
            p = float(tp) / (tp + fp)
        elif(tp + fp == 0.0):
            r = float(tp) / (tp + fn)
            p = 0.0
        else:
            r = float(tp) / (tp + fn)
            p = float(tp) / (tp + fp)
        
        if(p + r == 0):
            f1 = 0
        else:
            f1 = 2 * ((p * r)/(p + r))
        
        print("Model:", model)
        print("True Positives:", tp)
        print("True Negatives:", tn)
        print("False Positives:", fp)
        print("False Negatives:", fn)
        print("Total:", df.count())
        print("Accuracy:", a)
        print("Recall:", r)
        print("Precision: ", p)
        print("F1 score:", f1)
        print('AUC:', BinaryClassificationMetrics(df['label','prediction'].rdd).areaUnderROC)
    print("\n")
    
    from pyspark.sql.functions import lower, regexp_replace, col

    def clean_text(c):
      c = lower(c)
      c = regexp_replace(c, "(https?\://)\S+", "") # Remove links
      c = regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove newlines, carriage returns, and tabs
      c = regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
      c = regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
      c = regexp_replace(c, "[0-9]", "") # Remove numbers
      c = regexp_replace(c, "\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
      #c = regexp_replace(c, "(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")
      return c
    
    dataset = dataset.withColumn("text", clean_text(col("text")))
    regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
    # Add Stop words
    add_stopwords = ["http","https","amp","rt","t","c","the","@","/",":"] # standard web stop words
    
    stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
    # Bag of Words Count
    countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
    # String Indexer
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
    label_stringIdx = StringIndexer(inputCol = "class", outputCol = "label")
    from pyspark.ml import Pipeline
    
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
    
    # Fit the pipeline to training documents.
    pipelineFit = pipeline.fit(data)
    dataset = pipelineFit.transform(data)
    from pyspark.sql import *
    from pyspark.sql.functions import col
    labelset = dataset.select(col("class"),
                              col("label")).distinct()
    display(labelset)
    from pyspark.sql.types import *
    from pyspark.sql.window import *
    from pyspark.sql.functions import col, split, explode, row_number
    # Split text by sentence and convert to array
    array_df = data.withColumn("text", split(col("text"), "\.").cast("array<string>"))
      
    # Explode array into separate rows in the dataset
    split_df = array_df.withColumn("text", explode(col("text")))\
                       .withColumn("part_number", row_number().over(Window.partitionBy("internet_message_id").orderBy("id")))
    data = split_df
    display(data)
    from pyspark.sql.window import *
    from pyspark.sql.functions import row_number
    
    data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()
    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    gb = GBTClassifier(labelCol="label", featuresCol="features")
    gbparamGrid = (ParamGridBuilder()
                 .addGrid(gb.maxDepth, [2, 5, 10])
                 .addGrid(gb.maxBins, [10, 20, 40])
                 .addGrid(gb.maxIter, [5, 10, 20])
                 .build())
    gbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    gbcv = CrossValidator(estimator = gb,
                          estimatorParamMaps = gbparamGrid,
                          evaluator = gbevaluator,
                          numFolds = 5)
    gbcvModel = gbcv.fit(train)
    print(gbcvModel)
    gbpredictions = gbcvModel.transform(test)
    print('Accuracy:', gbevaluator.evaluate(gbpredictions))
    print('AUC:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderPR)
    from pyspark.ml.tuning import CrossValidatorModel
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import col, round
    
    import sys
    import numpy as np
    import pandas as pd
    import mmlspark
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import *
    import uuid
    from mmlspark import request_to_string, string_to_response
    ## Load in the transformation pipeline
    mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
    
    ## Load in trained model
    mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
    username = "admin"
    ip = "10.0.0.4" #Internal IP
    sas_url = "" # SAS Token for your VM's Private Key in Blob
    input_schema = StructType([
      StructField("id", IntegerType(), True),
      StructField("x1", IntegerType(), True),
      StructField("x2", DoubleType(), True),
      StructField("x3", StringType(), True),
     ])
    serving_inputs = spark.readStream.continuousServer() \
                          .option("numPartitions", 1) \
                          .option("name", "http://10.0.0.4:8898/my_api") \
                          .option("forwarding.enabled", True) \
                          .option("forwarding.username", username) \
                          .option("forwarding.sshHost", ip) \
                          .option("forwarding.keySas", sas_url) \
                          .address("localhost", 8898, "my_api") \
                          .load()\
                          .parseRequest(input_schema)
    
    mydataset = mypipeline.transform(serving_inputs)
    
    serving_outputs = mymodel.bestModel.transform(mydataset) \
      .makeReply("prediction")
    
    # display(serving_inputs)
    server = serving_outputs.writeStream \
        .continuousServer() \
        .trigger(continuous="1 second") \
        .replyTo("my_api") \
        .queryName("my_query") \
        .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1())) \
        .start()
    import requests
    data = u'{"id":0,"x1":1,"x2":2.0,"x3":"3"}'
    
    #r = requests.post(data=data, url="http://localhost:8898/my_api") # Locally
    r = requests.post(data=data, url="http://102.208.216.32:8902/my_api") # Via the VM IP
    
    print("Response {}".format(r.text))
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler
    label = "dependentvar"
    categoricalColumns = ["col1",
                         "col2"]
    
    numericalColumns = ["num1",
                        "num2"]
    
    #categoricalColumnsclassVec = ["col1classVec",
    #                              "col2classVec"]
    categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
    stages = []
    for categoricalColumn in categoricalColumns:
      print(categoricalColumn)
      ## Category Indexing with StringIndexer
      stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
      ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
      encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
      ## Add stages
      stages += [stringIndexer, encoder]
    ## Convert label into label indices using the StringIndexer
    label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
    stages += [label_stringIndexer]
    assemblerInputs = categoricalColumnsclassVec + numericalColumns
    assembler = VectorAssembler(inputCols = assemblerInputs,
                                outputCol = "features")
    stages += [assembler]
    from pyspark.ml.feature import StandardScaler
    
    scaler = StandardScaler(inputCol = "features",
                            outputCol = "scaledFeatures",
                            withStd = True,
                            withMean = True)
    stages += [scaler]
    prepPipeline = Pipeline().setStages(stages)
    pipelineModel = prepPipeline.fit(dataset)
    dataset = pipelineModel.transform(dataset)
    pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
    display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
    from pyspark.ml import PipelineModel
    pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
    dataset = pipelineModel.transform(dataset)
    display(dataset)
    Learn more about Project Glow at projectglow.io.

    Read the full documentation: glow.readthedocs.io

    Features:

    • Genomic datasources: Read datasets in common file formats such as VCF, BGEN, and Plink into Spark DataFrames.

    • Genomic functions: Common operations such as computing quality control statistics, running regression tests, and performing simple transformations are provided as Spark functions that can be called from Python, SQL, Scala, or R.

    • Data preparation building blocks: Glow includes transformations such as variant normalization and lift over to help produce analysis ready datasets.

    • Integration with existing tools: With Spark, you can write user-defined functions (UDFs) in Python, R, SQL, or Scala. Glow also makes it easy to run DataFrames through command line tools.

    • Integration with other data types: Genomic data can generate additional insights when joined with data sets such as electronic health records, real world evidence, and medical images. Since Glow returns native Spark SQL DataFrames, it's simple to join multiple data sets together.

    How To Install

    If you're using Databricks, make sure you enable the Databricks Runtime for Genomics. Glow is already included and configured in this runtime.

    pip Installation

    Using pip, install by simply running pip install glow.py and then start the Spark shell with the Glow maven package.

    Maven Installation

    Install the maven package io.projectglow:glow_2.11:${version} and optionally the Python frontend glow.py. Set the Spark configuration spark.hadoop.io.compression.codecs to io.projectglow.sql.util.BGZFCodec in order to read and write BGZF-compressed files.

    Load in Glow

    Read in Data

    Summary Statistics and Quality Control

    Split Multiallelic Variants to Biallelic

    Write out Data

    Azure Data Factory

    Transformation with Azure Databricks

    Using Azure Databricks with Azure Data Factory, notebooks can be run from an end-to-end pipeline that contains the Validation, Copy data, and Notebook activities in Azure Data Factory.

    • Validation ensures that your source dataset is ready for downstream consumption before you trigger the copy and analytics job.

    ./bin/pyspark --packages io.projectglow:glow_2.11:0.5.0
     --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec
    import glow
    from pyspark.sql.functions import col, expr, when, log10  ## Functions used in the genomics examples below

    glow.register(spark)
    vcf_path = "/databricks-datasets/genomics/1kg-vcfs/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"
    
    df = spark.read.format("vcf")\
              .option("includeSampleIds", False)\
              .option("flattenInfoFields", False)\
              .load(vcf_path)\
              .withColumn("first_genotype", expr("genotypes[0]"))
              
    # bgen_path = "/databricks-datasets/genomics/1kg-bgens/1kg_chr22.bgen"
    
    # df = spark.read.format("bgen") \
    #           .load(bgen_path)
    df = df.withColumn("hardyweinberg", expr("hardy_weinberg(genotypes)")) \
           .withColumn("summarystats", expr("call_summary_stats(genotypes)")) \
           .withColumn("depthstats", expr("dp_summary_stats(genotypes)")) \
           .withColumn("genotypequalitystats", expr("gq_summary_stats(genotypes)")) \
           .filter(col("qual") >= 98) \
           .filter((col("start") >= 16000000) & (col("end") >= 16050000)) \
           .where((col("alleleFrequencies").getItem(0) >= allele_freq_cutoff) & 
                  (col("alleleFrequencies").getItem(0) <= (1.0 - allele_freq_cutoff))) \
           .withColumn("log10pValueHwe", when(col("pValueHwe") == 0, 26).otherwise(-log10(col("pValueHwe"))))
    split_df = glow.transform("split_multiallelics", df)
    df.coalesce(1) \
      .write \
      .mode("overwrite") \
      .format("vcf") \
      .save("/tmp/vcf_output")
  • Copy data duplicates the source dataset to the sink storage, which is mounted as DBFS in the Azure Databricks notebook. In this way, the dataset can be directly consumed by Spark.

  • Notebook triggers the Databricks notebook that transforms the dataset. It also adds the dataset to a processed folder or Azure SQL Data Warehouse.

  • Import a notebook for Transformation

    To import a Transformation notebook to your Databricks workspace:

    1. Sign in to your Azure Databricks workspace, and then select Import. Your workspace path can be different from the one shown, but remember it for later.

    2. Select Import from: URL. In the text box, enter https://adflabstaging1.blob.core.windows.net/share/Transformations.html.

    3. Now let's update the Transformation notebook with your storage connection information.

      In the imported notebook, go to command 5 as shown in the following code snippet.

      • Replace the storageName and accessKey values with your own storage connection information.

      • Use the storage account with the sinkdata container.

    4. Generate a Databricks access token for Data Factory to access Databricks.

      1. In your Databricks workspace, select your user profile icon in the upper right.

      2. Select User Settings.

      3. Select Generate New Token under the Access Tokens tab.

    How to use this template

    1. Go to the Transformation with Azure Databricks template and create new linked services for following connections.

      • Source Blob Connection - to access the source data.

        For this exercise, you can use the public blob storage that contains the source files. Reference the following screenshot for the configuration. Use the following SAS URL to connect to source storage (read-only access):

        https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

      • Destination Blob Connection - to store the copied data.

        In the New linked service window, select your sink storage blob.

      • Azure Databricks - to connect to the Databricks cluster.

        Create a Databricks-linked service by using the access key that you generated previously. You can opt to select an interactive cluster if you have one. This example uses the New job cluster option.

    2. Select Use this template. You'll see a pipeline created.

    Pipeline introduction and configuration

    In the new pipeline, most settings are configured automatically with default values. Review the configurations of your pipeline and make any necessary changes.

    1. In the Validation activity Availability flag, verify that the source Dataset value is set to SourceAvailabilityDataset that you created earlier.

    2. In the Copy data activity file-to-blob, check the Source and Sink tabs. Change settings if necessary.

      • Source tab

      • Sink tab

    3. In the Notebook activity Transformation, review and update the paths and settings as needed.

      Databricks linked service should be pre-populated with the value from a previous step, as shown:

      To check the Notebook settings:

      1. Select the Settings tab. For Notebook path, verify that the default path is correct. You might need to browse and choose the correct notebook path.

    4. Verify that the Pipeline Parameters match what is shown in the following screenshot:

    5. Connect to your datasets.

    In the datasets below, the file path has been automatically specified in the template. If any changes are required, make sure that you specify the path for both the container and the directory to avoid connection errors.

    • SourceAvailabilityDataset - to check that the source data is available.

    • SourceFilesDataset - to access the source data.

    • DestinationFilesDataset - to copy the data into the sink destination location. Use the following values:

      • Linked service - sinkBlob_LS, created in a previous step.

      • File path - sinkdata/staged_sink.

    6. Select Debug to run the pipeline. You can find the link to Databricks logs for more detailed Spark logs.

      You can also verify the data file by using Azure Storage Explorer.

    For correlating with Data Factory pipeline runs, this example appends the pipeline run ID from the data factory to the output folder. This helps keep track of files generated by each run.

    Next steps

    • Introduction to Azure Data Factory

    • Transformation with Azure Databricks

    • Run a Databricks notebook with the Databricks Notebook Activity in Azure Data Factory

    Model Saving and Loading

    Model Saving

    Save model(s) to mounted storage

    Remove a model

    Spark MLlib models are actually a series of files in a directory. So, you will need to recursively delete the files in the model's directory, then the directory itself.

    Score new data using a trained model

    Load in required libraries

    Load in the transformation pipeline

    Load in the trained model

    Remove unnecessary columns from the scored data

    lrcvModel.save("/mnt/trainedmodels/lr")
    rfcvModel.save("/mnt/trainedmodels/rf")
    dtcvModel.save("/mnt/trainedmodels/dt")
    display(dbutils.fs.ls("/mnt/trainedmodels/"))
      4. Select Generate.

      5. Save the access token for later use in creating a Databricks linked service. The access token looks something like dapi32db32cbb4w6eee18b7d87e45exxxxxx.

    Expand the Base Parameters selector and verify that the parameters match what is shown in the following screenshot. These parameters are passed to the Databricks notebook from Data Factory.

    ## Supply storageName and accessKey values  
    storageName = ""  
    accessKey = ""  
    
    ## Attempt to Mount Data Factory Data in Azure Storage
    dbutils.fs.mount(
        source = "wasbs://sinkdata\@"+storageName+".blob.core.windows.net/",  
        mount_point = "/mnt/Data Factorydata",  
        extra_configs = {"fs.azure.account.key."+storageName+".blob.core.windows.net": accessKey})  
    dbutils.fs.rm("/mnt/trainedmodels/dt", True)
    from pyspark.ml.tuning import CrossValidatorModel
    from pyspark.ml import PipelineModel
    from pyspark.sql.functions import col, round, udf
    from pyspark.sql.types import IntegerType, FloatType
    pipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
    ## Fit the pipeline to new data
    transformeddataset = pipeline.transform(dataset)
    model = CrossValidatorModel.load("/mnt/trainedmodels/lr/")
    ## Score the data using the model
    scoreddataset = model.bestModel.transform(transformeddataset)
    ## Function to extract probability from array
    getprob = udf(lambda v:float(v[1]),FloatType())
    
    ## Select out the necessary columns
    output = scoreddataset.select(col("ID"),
                                  col("label"),
                                  col("rawPrediction"),           
                                  getprob(col("probability")).alias("probability"),
                                  col("prediction"))

    Decision Tree

    Setting Up Decision Tree Regression

    Note: Make sure you have your training and test data already vectorized and ready to go before you begin trying to fit the machine learning model to unprepped data.

    Load in required libraries

    Initialize Decision Tree object

    Create a parameter grid for tuning the model

    Define how you want the model to be evaluated

    Define the type of cross-validation you want to perform

    Fit the model to the data

    Score the testing dataset using your fitted model for evaluation purposes

    Evaluate the model

    Note: When you use the CrossValidator function to set up cross-validation of your models, the resulting model object will have all the runs included, but will only use the best model when you interact with the model object using other functions like evaluate or transform.

    Other Common Tasks

    Split Data into Training and Test Datasets

    Rename all columns

    Batch Scoring

    This section is designed for use with a data orchestration tool that can call and execute Databricks notebooks. For more information on how to set up Azure Data Factory, see: https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook

    Create date parameter
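
    As a minimal sketch (assuming your orchestration tool passes a notebook parameter named scoredate; the widget name and default value below are placeholders), you can create the date parameter with a Databricks widget:

    ## Create a date parameter that an orchestration tool (e.g., Azure Data Factory) can pass to the notebook
    dbutils.widgets.text("scoredate", "2019-10-01", "Score Date")  ## Hypothetical widget name and default
    scoredate = dbutils.widgets.get("scoredate")
    print("Scoring data for:", scoredate)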

    Convert PySpark DataFrame to NumPy array

    Call Cognitive Service API using PySpark

    Create `chunker` function

    The cognitive service APIs can only take a limited number of observations at a time (1,000, to be exact) or a limited amount of data in a single call. So, we can create a chunker function that we will use to split the dataset up into smaller chunks.

    Convert Spark DataFrame to Pandas

    Set up API requirements

    Create DataFrame for incoming scored data

    Loop through chunks of the data and call the API

    Write the results out to mounted storage

    Find All Columns of a Certain Type

    Change a Column's Type

    Generate StructType Schema Printout (Manual Execution)

    Generate StructType Schema from List (Automatic Execution)
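
    A minimal sketch of one way to build a StructType automatically from a list of column names (the all-StringType default here is an assumption; adjust the types for your own data):

    from pyspark.sql.types import StructType, StructField, StringType

    cols = ["col1", "col2", "col3"]  ## Fill in list with your desired column names
    schema = StructType([StructField(c, StringType(), True) for c in cols])
    print(schema)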

    Make a DataFrame of Consecutive Dates
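
    One way to do this (a minimal sketch; the start and end dates are placeholders) is to use Spark SQL's sequence() function (Spark 2.4+) and explode the resulting array into rows:

    from pyspark.sql.functions import explode, expr

    ## One row per day between the two dates
    dates = spark.createDataFrame([("2019-01-01", "2019-12-31")], ["start", "end"]) \
                 .select(explode(expr("sequence(to_date(start), to_date(end), interval 1 day)")).alias("date"))

    display(dates)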

    Unpivot a DataFrame Dynamically (Longer)

    Pivot a wide dataset into a longer form. (Similar to the pivot_longer() function from the tidyr R package or the wide_to_long() function from pandas.)
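
    One way to do this dynamically (a minimal sketch; df, the id_cols list, and the output column names are placeholders, and all value columns are assumed to share a common data type, so cast first if needed) is to build a stack() expression from the DataFrame's columns:

    from pyspark.sql.functions import expr

    id_cols = ["ID"]  ## Columns to keep as identifiers
    value_cols = [c for c in df.columns if c not in id_cols]

    ## Build a stack() expression: stack(n, 'col1', col1, 'col2', col2, ...) as (variable, value)
    stack_expr = "stack({0}, {1}) as (variable, value)".format(
        len(value_cols),
        ", ".join("'{0}', `{0}`".format(c) for c in value_cols))

    long_df = df.select(id_cols + [expr(stack_expr)])
    display(long_df)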

    Connect to storage

    Define input schema

    Read in new data

    Load in transformation pipeline and model

    Score data using the model

    Write data back out to storage

    from pyspark.ml.regression import DecisionTreeRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")
    dtparamGrid = (ParamGridBuilder()
                 .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
                 #.addGrid(dt.maxDepth, [2, 5, 10])
                 .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
                 #.addGrid(dt.maxBins, [10, 20])
                 .build())
    dtevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    dtcv = CrossValidator(estimator = dt,
                          estimatorParamMaps = dtparamGrid,
                          evaluator = dtevaluator,
                          numFolds = 5)
    dtcvModel = dtcv.fit(train)
    print(dtcvModel)
    dtpredictions = dtcvModel.transform(test)
    print('RMSE:', dtevaluator.evaluate(dtpredictions))
    train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)
from pyspark.sql.functions import col

column_list = data.columns
    prefix = "my_prefix"
    new_column_list = [prefix + s for s in column_list]
    #new_column_list = [prefix + s if s != "ID" else s for s in column_list] ## Use if you plan on joining on an ID later
     
    column_mapping = [[o, n] for o, n in zip(column_list, new_column_list)]
    
    # print(column_mapping)
    
    data = data.select(list(map(lambda old, new: col(old).alias(new),*zip(*column_mapping))))
import numpy as np

## Convert `train` DataFrame to NumPy
    pdtrain = train.toPandas()
trainseries = pdtrain['features'].apply(lambda x : np.array(x.toArray())).to_numpy().reshape(-1,1)
    X_train = np.apply_along_axis(lambda x : x[0], 1, trainseries)
    y_train = pdtrain['label'].values.reshape(-1,1).ravel()
    
    ## Convert `test` DataFrame to NumPy
    pdtest = test.toPandas()
testseries = pdtest['features'].apply(lambda x : np.array(x.toArray())).to_numpy().reshape(-1,1)
    X_test = np.apply_along_axis(lambda x : x[0], 1, testseries)
    y_test = pdtest['label'].values.reshape(-1,1).ravel()
    
    print(y_test)
    ## Define Chunking Logic
    import pandas as pd
    import numpy as np
    # Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
    def chunker(seq, size):
        return (seq[pos:pos + size] for pos in range(0, len(seq), size))
## Convert the Spark DataFrame of text documents to pandas for chunking
sentiment_df_pd = sentiment_df.toPandas()
    # pprint is used to format the JSON response
    from pprint import pprint
    import json
    import requests
    
    subscription_key = '<SUBSCRIPTIONKEY>'
    endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
    sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
    headers = {"Ocp-Apim-Subscription-Key": subscription_key}
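The loop further down serializes each chunk of sentiment_df_pd with to_json(orient='records') and wraps it in a documents element, so the pandas DataFrame is expected to follow the Text Analytics v2.1 document format (id, language, text). As a purely illustrative sketch, one chunk produces a payload shaped like this:

## Illustrative payload shape for the v2.1 sentiment endpoint (values are made up)
example_payload = {
    "documents": [
        {"id": "1", "language": "en", "text": "The service was great."},
        {"id": "2", "language": "en", "text": "The wait time was far too long."}
    ]
}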
    from pyspark.sql.types import *
    
    sentiment_schema = StructType([StructField("id", IntegerType(), True),
                                   StructField("score", FloatType(), True)])
    
    sentiments_df = spark.createDataFrame([], sentiment_schema)
    
    display(sentiments_df)
    for chunk in chunker(sentiment_df_pd, 1000):
      print("Scoring", len(chunk), "rows.")
      sentiment_df_json = json.loads('{"documents":' + chunk.to_json(orient='records') + '}')
      
      response = requests.post(sentiment_url, headers = headers, json = sentiment_df_json)
      sentiments = response.json()
      # pprint(sentiments)
      
      sentiments_pd = pd.read_json(json.dumps(sentiments['documents']))
      sentiments_df_chunk = spark.createDataFrame(sentiments_pd)
  sentiments_df = sentiments_df.union(sentiments_df_chunk)
      
    display(sentiments_df)
    sentiments_df.count()
    sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")
import pandas as pd
from pyspark.sql.functions import col

def get_nonstring_cols(df):
        types = spark.createDataFrame(pd.DataFrame({'Column': df.schema.names, 'Type': [str(f.dataType) for f in df.schema.fields]}))
        result = types.filter(col('Type') != 'StringType').select('Column').rdd.flatMap(lambda x: x).collect()
        return result
        
    get_nonstring_cols(df)
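One way you might combine get_nonstring_cols with the casting pattern in the next snippet is to convert every non-string column to a string in a single pass. This is just a sketch, assuming df is your Spark DataFrame.

from pyspark.sql.functions import col

## Cast every non-string column to a string
for c in get_nonstring_cols(df):
    df = df.withColumn(c, col(c).cast("string"))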
    from pyspark.sql.types import *
    from pyspark.sql.functions import col
    
    df = df.withColumn('col1', col('col1').cast(IntegerType()))
    ## Fill in list with your desired column names
    cols = ["col1", "col2", "col3"]
    i = 1
    
    for col in cols:
        if i == 1:
            print("schema = StructType([")
            print("\tStructField('" + col +  "', StringType(), True),")
        
        elif i == len(cols):
            print("\tStructField('" + col +  "', StringType(), True)])")
            
        else:
            print("\tStructField('" + col +  "', StringType(), True),")
        
        i += 1
        
    ## Once the output has printed, copy and paste into a new cell
    ## and change column types and nullability
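For the three example columns above, the printed output you would copy into a new cell looks like the following; every field defaults to StringType and nullable until you edit it.

schema = StructType([
	StructField('col1', StringType(), True),
	StructField('col2', StringType(), True),
	StructField('col3', StringType(), True)])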
    """
    Struct Schema Creator for PySpark
    
    [<Column Name>, <Column Type>, <Column Nullable>]
    
    Types:  binary, boolean, byte, date,
            double, integer, long, null,
            short, string, timestamp, unknown
    """
    from pyspark.sql.types import *
    
    ## Fill in with your desired column names, types, and nullability
    cols = [["col1", "string", False],
            ["col2", "date", True],
            ["col3", "integer", True]]
    
    ## Loop to build list of StructFields
    schema_set = ["schema = StructType(["]
    
    for i, col in enumerate(cols):
        colname = col[0]
        coltype = col[1].title() + "Type()"
        colnull = col[2]
        
        if i == len(cols)-1:
            iter_structfield = "StructField('" + colname +  "', " + coltype + ", " + str(colnull) + ")])"
        else:
            iter_structfield = "StructField('" + colname +  "', " + coltype + ", " + str(colnull) + "),"
        
        schema_set.append(iter_structfield)
    
    ## Convert list to single string
    schema_string = ''.join(map(str, schema_set))
    
    ## This will execute the generated command string
    exec(schema_string)
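For the example cols list above, the generated string that exec runs is equivalent to defining the schema directly:

schema = StructType([
    StructField('col1', StringType(), False),
    StructField('col2', DateType(), True),
    StructField('col3', IntegerType(), True)])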
    from pyspark.sql.functions import sequence, to_date, explode, col
    date_dim = spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) as DATE").withColumn("DATE", explode(col("DATE")))
    display(date_dim)
    ## UnpivotDF Function
    def UnpivotDF(df, columns, pivotCol, unpivotColName, valueColName):
      columnsValue = list(map(lambda x: str("'") + str(x) + str("',")  + str(x), columns))
      stackCols = ','.join(x for x in columnsValue)
    
      df_unpvt = df.selectExpr(pivotCol, f"stack({str(len(columns))}, {stackCols}) as ({unpivotColName}, {valueColName})")\
                   .select(pivotCol, unpivotColName, valueColName)
      
      return(df_unpvt)
    df_unpvt = UnpivotDF(df = df,
                         columns = df.columns[1:], ## The columns to transpose into a single, longer column
                         pivotCol = "ID", ## The column to leave in place (usually an ID)
                         unpivotColName = "Category", ## The name of the new column
                         valueColName = "value") ## The name of the column of values
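As a quick illustration of what UnpivotDF produces, here is a toy example with purely made-up data: a wide DataFrame with an ID column and two measure columns unpivots into one row per (ID, Category) pair.

## Toy wide DataFrame (illustrative only)
wide_df = spark.createDataFrame([(1, 10.0, 20.0), (2, 30.0, 40.0)], ["ID", "SalesA", "SalesB"])

long_df = UnpivotDF(df = wide_df,
                     columns = wide_df.columns[1:],
                     pivotCol = "ID",
                     unpivotColName = "Category",
                     valueColName = "value")

display(long_df)
## ID | Category | value
## 1  | SalesA   | 10.0
## 1  | SalesB   | 20.0
## 2  | SalesA   | 30.0
## 2  | SalesB   | 40.0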
## Create a text widget that an orchestration tool (e.g. Azure Data Factory) can populate at run time
dbutils.widgets.text("varReportDate", "19000101")
    ReportDate = dbutils.widgets.get("varReportDate")
    print(ReportDate)
    storage_account_name = "mystorage"
    storage_account_access_key = ""
    
    file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
    file_type = "csv"
    
    spark.conf.set(
      "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
      storage_account_access_key)
    from pyspark.sql.types import *
    
    schema = StructType([
        StructField("ReportingDate", DateType(), True),
        StructField("id", StringType(), True),
        StructField("x1", IntegerType(), True),
        StructField("x2", DoubleType(), True)
    ])
    dataset = spark.read\
                   .format(file_type)\
                   .option("header", "true")\
                   .schema(schema)\
                   .load(file_location)
    
    ## You can avoid defining a schema by having spark infer it from your data
    ## This doesn't always work and can be slow
    #.option("inferSchema", "true")
    
    ## Fill in na's, if needed
    # dataset = dataset.na.fill(0)
    display(dataset)
    from pyspark.ml.tuning import CrossValidatorModel
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import col, round
    from pyspark.ml.regression import GeneralizedLinearRegressionModel
    
    mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
    mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
    ## Transform new data using the pipeline
    mydataset = mypipeline.transform(dataset)
    ## Score new data using a trained model
    scoreddataset = mymodel.bestModel.transform(mydataset)
    
    output = scoreddataset.select(col("id"),
                                  col("ReportingDate"),
                                  col("prediction").alias("MyForecast"))
    display(output)
    fileloc = "/mnt/output" + str(ReportDate) #+ ".csv"
output.write.mode("overwrite").option("header", "true").csv(fileloc)