Created by: Colby T. Ford, Ph.D.
PySpark Edition | A work in progress...
Sparkitecture is a collection of “cookbook-style” scripts for simplifying data engineering and machine learning in Apache Spark.
BibTeX
@misc{sparkitecture,
author = {Colby T. Ford},
title = {Sparkitecture - {A} collection of "cookbook-style" scripts for simplifying data engineering and machine learning in {Apache Spark}.},
month = oct,
year = 2019,
doi = {10.5281/zenodo.3468502},
url = {https://doi.org/10.5281/zenodo.3468502}
}
Text Citation
Colby T. Ford. (2019, October). Sparkitecture - A collection of "cookbook-style" scripts for simplifying data engineering and machine learning in Apache Spark (Version v1.0.0). Zenodo. https://doi.org/10.5281/zenodo.3468502
Azure Storage is a managed service that provides highly available, secure, durable, scalable, and redundant storage for your data. It includes Blob Storage, Azure Data Lake Storage, and other services.
Databricks-Specific Functionality
Once you create your Blob Storage account in Azure, you will need to grab a few bits of information from the Azure Portal before you mount your storage.
You can find your Storage Account Name (which will go in <STORAGEACCOUNT> below) and your Key (which will go in <KEYGOESHERE> below) under Access Keys in your Storage Account resource in Azure.
Go into your Storage Account resource in Azure and click on Blobs. Here, you will find all of your containers. Pick the one you want to mount and copy its name into <CONTAINERNAME> below.
As for the mount point (/mnt/<FOLDERNAME> below), you can name it whatever you'd like, but it will help you in the long run to name it something useful, along the lines of storageaccount_container.
Once you have the required bits of information, you can use the following code to mount the storage location inside the Databricks environment.
You can then test whether you can list the files in your mounted location.
To learn how to create an Azure Storage account, see the Azure Storage documentation.
For finer-grained access controls on your data, you may opt to use Azure Data Lake Storage. In Databricks, you can connect to your data lake in a similar manner to blob storage. Instead of an access key, your user credentials will be passed through, therefore only showing you data that you specifically have access to.
To pass in your Azure Active Directory credentials from Databricks to Azure Data Lake Store, you will need to enable this feature in Databricks under New Cluster > Advanced Options.
Note: If you create a High Concurrency cluster, multiple users can use the same cluster. The Standard cluster mode will only allow a single user's credential at a time.
dbutils.fs.mount(
source = "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net",
mount_point = "/mnt/<FOLDERNAME>/",
extra_configs = {"fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net":"<KEYGOESHERE>"})
display(dbutils.fs.ls("/mnt/<FOLDERNAME>"))
configs = {
"fs.azure.account.auth.type": "CustomAccessToken",
"fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}
dbutils.fs.mount(
source = "abfss://<CONTAINERNAME>@<STORAGEACCOUNT>.dfs.core.windows.net/",
mount_point = "/mnt/<FOLDERNAME>",
extra_configs = configs)
dwDatabase = "<DATABASENAME>" ## The Azure SQL Data Warehouse database name
dwServer = "<DWNAME>.database.windows.net" ## The Azure SQL Server
dwUser = "<USERNAME>" ## The dedicated loading user login
dwPass = dbutils.secrets.get(scope = "<SECRETNAME>", key = "<KEYNAME>") ## The dedicated loading user login password
dwJdbcPort = "1433"
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
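The com.databricks.spark.sqldw connector used below stages data through a temporary Blob Storage location, referenced as tempDir. A minimal sketch of defining it (the container and folder names here are placeholder assumptions, not from the original):
tempDir = "wasbs://<TEMPCONTAINER>@<STORAGEACCOUNT>.blob.core.windows.net/tmpdir" ## Staging location for the SQL DW connector
## The storage key for <STORAGEACCOUNT> must already be available to the cluster, e.g.:
# spark.conf.set("fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net", "<KEYGOESHERE>")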
sqlQuery = """
SELECT *, 'AzureSqlDw' AS SourceSystem
FROM dbo.<TABLENAME>
"""
data = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", sqlDwUrlSmall) \
  .option("tempDir", tempDir) \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", sqlQuery) \
  .load()
## Register the result as a temporary view (or persist it with data.write.saveAsTable("<TABLENAME>"))
data.createOrReplaceTempView("<TEMPVIEWNAME>")
dataset = spark.read.format('csv') \
.options(header='true', inferSchema='true', delimiter= ',') \
.load('/mnt/<FOLDERNAME>/<FILENAME>.csv')
## or sqlContext.read.format('csv')...
## Formats: json, parquet, jdbc, orc, libsvm, csv, text, avro
from pyspark.sql.types import *
schema = StructType([StructField('ID', IntegerType(), True),
StructField('Value', DoubleType(), True),
StructField('Category', StringType(), True),
StructField('Date', DateType(), True)])
dataset = sqlContext.read.format('csv') \
.schema(schema) \
.options(header='true', delimiter= ',') \
.load('/mnt/<FOLDERNAME>/<FILENAME>.csv')
df.coalesce(1) \
.write.format("com.databricks.spark.csv") \
.option("header", "true") \
.save("file.csv")
Apache Spark Data Sources Documentation: https://spark.apache.org/docs/latest/sql-data-sources.html
MLlib is Apache Spark's scalable machine learning library.
MLlib works with Spark's APIs and interoperates with NumPy in Python and with R libraries. Because Spark excels at iterative computation, MLlib runs fast, with high-quality, highly scalable algorithms that leverage iteration.
Classification: logistic regression, naive Bayes,...
Regression: generalized linear regression, survival regression,...
Decision trees, random forests, and gradient-boosted trees
Recommendation: alternating least squares (ALS)
Clustering: K-means, Gaussian mixtures (GMMs),...
Topic modeling: latent Dirichlet allocation (LDA)
Frequent itemsets, association rules, and sequential pattern mining
Feature transformations: standardization, normalization, hashing,...
ML Pipeline construction
Model evaluation and hyper-parameter tuning
ML persistence: saving and loading models and Pipelines
Distributed linear algebra: SVD, PCA,...
Statistics: summary statistics, hypothesis testing,...
MLflow is an open source library by the Databricks team designed for managing the machine learning lifecycle. It allows for the creation of projects, tracking of metrics, and model versioning.
pip install mlflow
MLflow can be used in any Spark environment, but the automated tracking and the MLflow UI are Databricks-specific functionality.
Track metrics and parameters
import mlflow
## Log Parameters and Metrics from your normal MLlib run
with mlflow.start_run():
    # Log a parameter (key-value pair)
    mlflow.log_param("alpha", 0.1)
    # Log a metric; metrics can be updated throughout the run
    mlflow.log_metric("AUC", 0.871827)
    mlflow.log_metric("F1", 0.726153)
    mlflow.log_metric("Precision", 0.213873)
MLflow GitHub: https://github.com/mlflow/mlflow/
Classification algorithms are used to identify into which classes observations of data should fall. This problem could be considered part of pattern recognition in that we use training data (historical information) to recognize patterns to predict where new data should be categorized.
Fraudulent activity detection
Loan default prediction
Spam vs. ham
Customer segmentation
Benign vs. malignant tumor classification
and many more...
Logistic regression (both binomial and multiclass)
Gradient-boosted trees
Multilayer perceptron
Linear Support Vector Machine
One-vs-Rest classifier (a.k.a. One-vs-All)
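Each of these can be trained with the same cross-validation pattern used throughout this section. As a minimal sketch for one algorithm not shown below, a Linear Support Vector Machine (assuming train and test DataFrames with label and features columns, as in the other recipes):
from pyspark.ml.classification import LinearSVC
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

svm = LinearSVC(labelCol="label", featuresCol="features")
svmparamGrid = (ParamGridBuilder()
                .addGrid(svm.regParam, [0.01, 0.1, 0.5])
                .addGrid(svm.maxIter, [10, 50, 100])
                .build())
svmevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
svmcv = CrossValidator(estimator = svm,
                       estimatorParamMaps = svmparamGrid,
                       evaluator = svmevaluator,
                       numFolds = 5)
svmcvModel = svmcv.fit(train)
svmpredictions = svmcvModel.transform(test)
print('AUC:', svmevaluator.evaluate(svmpredictions))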
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dtparamGrid = (ParamGridBuilder()
.addGrid(dt.maxDepth, [2, 5, 10])
.addGrid(dt.maxBins, [10, 20])
.build())
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
estimatorParamMaps = dtparamGrid,
evaluator = dtevaluator,
numFolds = 5)
dtcvModel = dtcv.fit(train)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test)
print('AUC (evaluator):', dtevaluator.evaluate(dtpredictions)) ## BinaryClassificationEvaluator returns areaUnderROC by default
print('AUC:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderPR)
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
gb = GBTClassifier(labelCol="label", featuresCol="features")
gbparamGrid = (ParamGridBuilder()
.addGrid(gb.maxDepth, [2, 5, 10])
.addGrid(gb.maxBins, [10, 20, 40])
.addGrid(gb.maxIter, [5, 10, 20])
.build())
gbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator = gb,
estimatorParamMaps = gbparamGrid,
evaluator = gbevaluator,
numFolds = 5)
gbcvModel = gbcv.fit(train)
print(gbcvModel)
gbpredictions = gbcvModel.transform(test)
print('AUC (evaluator):', gbevaluator.evaluate(gbpredictions)) ## BinaryClassificationEvaluator returns areaUnderROC by default
print('AUC:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(gbpredictions['label','prediction'].rdd).areaUnderPR)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
lr = LinearRegression(labelCol="label", featuresCol="features")
lrparamGrid = (ParamGridBuilder()
.addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
# .addGrid(lr.regParam, [0.01, 0.1, 0.5])
.addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
# .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
.addGrid(lr.maxIter, [1, 5, 10, 20, 50])
# .addGrid(lr.maxIter, [1, 5, 10])
.build())
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
estimatorParamMaps = lrparamGrid,
evaluator = lrevaluator,
numFolds = 5)
lrcvModel = lrcv.fit(train)
print(lrcvModel)
lrcvSummary = lrcvModel.bestModel.summary
print("Coefficient Standard Errors: " + str(lrcvSummary.coefficientStandardErrors))
print("P Values: " + str(lrcvSummary.pValues)) # Last element is the intercept
lrpredictions = lrcvModel.transform(test)
print('RMSE:', lrevaluator.evaluate(lrpredictions))
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
lr = LogisticRegression(labelCol="label", featuresCol="features")
lrparamGrid = (ParamGridBuilder()
.addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
.addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
.addGrid(lr.maxIter, [1, 5, 10, 20, 50])
.build())
lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName = "areaUnderROC")
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
estimatorParamMaps = lrparamGrid,
evaluator = lrevaluator,
numFolds = 5)
lrcvModel = lrcv.fit(train)
print(lrcvModel)
lrpredictions = lrcvModel.transform(test)
print('AUC (evaluator):', lrevaluator.evaluate(lrpredictions)) ## lrevaluator is set to areaUnderROC, not accuracy
print('AUC:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderPR)
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler
label = "dependentvar"
categoricalColumns = ["col1",
"col2"]
numericalColumns = ["num1",
"num2"]
#categoricalColumnsclassVec = ["col1classVec",
# "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
stages = []
One Hot Encoding will convert a categorical column into multiple columns for each class. (This process is similar to dummy coding.)
for categoricalColumn in categoricalColumns:
    print(categoricalColumn)
    ## Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
    ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
    ## Add stages
    stages += [stringIndexer, encoder]
## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]
This step transforms all the numerical data, along with the encoded categorical data, into a series of vectors using the VectorAssembler function.
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = "features")
stages += [assembler]
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol = "features",
outputCol = "scaledFeatures",
withStd = True,
withMean = True)
stages += [scaler]
prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
Once your transformation pipeline has been created on your training dataset, it's a good idea to save these transformation steps for future use. For example, we can save the pipeline so that we can transform new data in exactly the same way before scoring it through a trained machine learning model. This also helps to cut down on errors when new data contains categorical classes or columns that were not present in the training data.
pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfparamGrid = (ParamGridBuilder()
.addGrid(rf.maxDepth, [2, 5, 10])
.addGrid(rf.maxBins, [5, 10, 20])
.addGrid(rf.numTrees, [5, 20, 50])
.build())
rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
estimatorParamMaps = rfparamGrid,
evaluator = rfevaluator,
numFolds = 5)
rfcvModel = rfcv.fit(train)
print(rfcvModel)
rfpredictions = rfcvModel.transform(test)
print('AUC (evaluator):', rfevaluator.evaluate(rfpredictions)) ## BinaryClassificationEvaluator returns areaUnderROC by default
print('AUC:', BinaryClassificationMetrics(rfpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(rfpredictions['label','prediction'].rdd).areaUnderPR)
Spark MLlib models are actually stored as a series of files in a directory. So, you will need to recursively delete the files in the model's directory, then the directory itself.
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
gb = GBTRegressor(labelCol="label", featuresCol="features")
gbparamGrid = (ParamGridBuilder()
.addGrid(gb.maxDepth, [2, 5, 10])
.addGrid(gb.maxBins, [10, 20, 40])
.addGrid(gb.maxIter, [5, 10, 20])
.build())
gbevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator = gb,
estimatorParamMaps = gbparamGrid,
evaluator = gbevaluator,
numFolds = 5)
gbcvModel = gbcv.fit(train)
print(gbcvModel)
gbpredictions = gbcvModel.transform(test)
print('RMSE:', gbevaluator.evaluate(gbpredictions))
lrcvModel.save("/mnt/trainedmodels/lr")
rfcvModel.save("/mnt/trainedmodels/rf")
dtcvModel.save("/mnt/trainedmodels/dt")
display(dbutils.fs.ls("/mnt/trainedmodels/"))
dbutils.fs.rm("/mnt/trainedmodels/dt", True)
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, round, udf
from pyspark.sql.types import IntegerType, FloatType
pipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
## Fit the pipeline to new data
transformeddataset = pipeline.transform(dataset)
model = CrossValidatorModel.load("/mnt/trainedmodels/lr/")
## Score the data using the model
scoreddataset = model.bestModel.transform(transformeddataset)
## Function to extract probability from array
getprob = udf(lambda v:float(v[1]),FloatType())
## Select out the necessary columns
output = scoreddataset.select(col("ID"),
col("label"),
col("rawPrediction"),
getprob(col("probability")).alias("probability"),
col("prediction"))
dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)
storage_account_name = "mystorage"
storage_account_access_key = ""
file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"
spark.conf.set(
"fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
storage_account_access_key)
from pyspark.sql.types import *
schema = StructType([
StructField("ReportingDate", DateType(), True),
StructField("id", StringType(), True),
StructField("x1", IntegerType(), True),
StructField("x2", DoubleType(), True)
])
dataset = spark.read\
.format(file_type)\
.option("header", "true")\
.schema(schema)\
.load(file_location)
## You can avoid defining a schema by having spark infer it from your data
## This doesn't always work and can be slow
#.option("inferSchema", "true")
## Fill in na's, if needed
# dataset = dataset.na.fill(0)
display(dataset)
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
from pyspark.ml.regression import GeneralizedLinearRegressionModel
mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)
## Score new data using a trained model
scoreddataset = mymodel.bestModel.transform(mydataset)
output = scoreddataset.select(col("id"),
col("ReportingDate"),
col("prediction").alias("MyForecast"))
display(output)
fileloc = "/mnt/output" + str(ReportDate) #+ ".csv"
output.write.mode("overwrite").format("com.databricks.spark.csv").option("header","true").csv(fileloc)
Note: Extract probability values using the method found here - https://www.sparkitecture.io/machine-learning/model-saving-and-loading#remove-unnecessary-columns-from-the-scored-data.
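For reference, a minimal sketch of that extraction (assuming lrpredictions is the scored output and that the second element of the probability vector is the positive class). It also imports when and DoubleType, which the loop below uses:
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import FloatType, DoubleType

## Replace the probability vector with the positive-class probability as a float
getprob = udf(lambda v: float(v[1]), FloatType())
lrpredictions = lrpredictions.withColumn("probability", getprob(col("probability")))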
performance_df = spark.createDataFrame([(0.0, 0.0, 0.0)], ['cutoff', 'AUPR', 'AUC']) ## Seed row; use floats so the schema matches the appended rows
for cutoff in range(5, 95, 5):
    cutoff = (cutoff * 0.01)
    print('Testing cutoff = ', str(format(cutoff, '.2f')))
    lrpredictions_prob_temp = lrpredictions.withColumn('prediction_test', when(col('probability') >= cutoff, 1).otherwise(0).cast(DoubleType()))
    aupr_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['label', 'prediction_test'].rdd).areaUnderPR
    auc_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['label', 'prediction_test'].rdd).areaUnderROC
    print('\tAUPR:', aupr_temp, '\tAUC:', auc_temp)
    performance_df_row = spark.createDataFrame([(cutoff, aupr_temp, auc_temp)], ['cutoff', 'AUPR', 'AUC'])
    performance_df = performance_df.union(performance_df_row)
display(performance_df)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
# Evaluate best model
print('Accuracy:', lrevaluator.evaluate(lrpredictions))
lrmetrics = MulticlassMetrics(lrpredictions['label','prediction'].rdd)
print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
print('F1 Score:', lrmetrics.fMeasure(1.0,1.0))
from pyspark.mllib.evaluation import BinaryClassificationMetrics
for model in ["lrpredictions", "dtpredictions", "rfpredictions", "nbpredictions", "gbpredictions"]:
    df = globals()[model]
    tp = df[(df.label == 1) & (df.prediction == 1)].count()
    tn = df[(df.label == 0) & (df.prediction == 0)].count()
    fp = df[(df.label == 0) & (df.prediction == 1)].count()
    fn = df[(df.label == 1) & (df.prediction == 0)].count()
    a = ((tp + tn)/df.count())
    if(tp + fn == 0.0):
        r = 0.0
        p = float(tp) / (tp + fp)
    elif(tp + fp == 0.0):
        r = float(tp) / (tp + fn)
        p = 0.0
    else:
        r = float(tp) / (tp + fn)
        p = float(tp) / (tp + fp)
    if(p + r == 0):
        f1 = 0
    else:
        f1 = 2 * ((p * r)/(p + r))
    print("Model:", model)
    print("True Positives:", tp)
    print("True Negatives:", tn)
    print("False Positives:", fp)
    print("False Negatives:", fn)
    print("Total:", df.count())
    print("Accuracy:", a)
    print("Recall:", r)
    print("Precision: ", p)
    print("F1 score:", f1)
    print('AUC:', BinaryClassificationMetrics(df['label','prediction'].rdd).areaUnderROC)
    print("\n")
## Based on: https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
import pandas as pd
def ExtractFeatureImportance(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
# ExtractFeatureImportance(model.stages[-1].featureImportances, dataset, "features")
dataset_fi = ExtractFeatureImportance(model.bestModel.featureImportances, dataset, "features")
dataset_fi = sqlContext.createDataFrame(dataset_fi)
display(dataset_fi)
## Based on: https://stackoverflow.com/questions/42935914/how-to-map-features-from-the-output-of-a-vectorassembler-back-to-the-column-name
lrm = model.stages[-1]
## Transform the data:
transformed = model.transform(df)
#Extract and flatten ML attributes:
from itertools import chain
attrs = sorted(
(attr["idx"], attr["name"]) for attr in (chain(*transformed
.schema[lrm.summary.featuresCol]
.metadata["ml_attr"]["attrs"].values())))
# and map to the output:
[(name, lrm.summary.pValues[idx]) for idx, name in attrs]
# [(name, lrm.coefficients[idx]) for idx, name in attrs]
import pandas as pd
featurelist = pd.DataFrame(dataset.schema["features"].metadata["ml_attr"]["attrs"]["binary"]+dataset.schema["features"].metadata["ml_attr"]["attrs"]["numeric"]).sort_values("idx")
featurelist["Coefficient"] = pd.DataFrame(model.bestModel.coefficients.toArray())
featurelist = sqlContext.createDataFrame(featurelist)
display(featurelist)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy") ## Default metric is f1; set accuracy to match the label below
for model in ["lrpredictions", "nbpredictions", "rfpredictions"]:
    df = globals()[model]
    ########################################
    # Compute raw scores on the test set
    predictionAndLabels = df.select("prediction", "label").rdd
    # Instantiate metrics object
    metrics = MulticlassMetrics(predictionAndLabels)
    # Overall statistics
    precision = metrics.precision()
    recall = metrics.recall()
    f1Score = metrics.fMeasure()
    print("Summary Stats for: ", model)
    #print(metrics.confusionMatrix())
    print("Accuracy = %s" % evaluator.evaluate(df))
    print("Precision = %s" % precision)
    print("Recall = %s" % recall)
    print("F1 Score = %s" % f1Score)
    # Weighted stats
    #print("Weighted recall = %s" % metrics.weightedRecall)
    #print("Weighted precision = %s" % metrics.weightedPrecision)
    #print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    #print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    #print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("\n")
Using Azure Databricks with Azure Data Factory, notebooks can be run from an end-to-end pipeline that contains the Validation, Copy data, and Notebook activities in Azure Data Factory.
Validation ensures that your source dataset is ready for downstream consumption before you trigger the copy and analytics job.
Copy data duplicates the source dataset to the sink storage, which is mounted as DBFS in the Azure Databricks notebook. In this way, the dataset can be directly consumed by Spark.
Notebook triggers the Databricks notebook that transforms the dataset. It also adds the dataset to a processed folder or Azure SQL Data Warehouse.
To import a Transformation notebook to your Databricks workspace:
Sign in to your Azure Databricks workspace, and then select Import. Your workspace path can be different from the one shown, but remember it for later.
Select Import from: URL. In the text box, enter https://adflabstaging1.blob.core.windows.net/share/Transformations.html.
Now let's update the Transformation notebook with your storage connection information.
In the imported notebook, go to command 5 as shown in the following code snippet.
Replace the placeholder values with your own storage connection information. Use the storage account with the sinkdata container.
Generate a Databricks access token for Data Factory to access Databricks.
In your Databricks workspace, select your user profile icon in the upper right.
Select User Settings.
Select Generate New Token under the Access Tokens tab.
Select Generate.
Save the access token for later use in creating a Databricks linked service. The access token looks something like dapi32db32cbb4w6eee18b7d87e45exxxxxx.
Go to the Transformation with Azure Databricks template and create new linked services for the following connections.
Source Blob Connection - to access the source data.
For this exercise, you can use the public blob storage that contains the source files. Reference the following screenshot for the configuration. Use the following SAS URL to connect to source storage (read-only access):
https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D
Destination Blob Connection - to store the copied data.
In the New linked service window, select your sink storage blob.
Azure Databricks - to connect to the Databricks cluster.
Create a Databricks-linked service by using the access key that you generated previously. You can opt to select an interactive cluster if you have one. This example uses the New job cluster option.
Select Use this template. You'll see a pipeline created.
In the new pipeline, most settings are configured automatically with default values. Review the configurations of your pipeline and make any necessary changes.
In the Validation activity Availability flag, verify that the source Dataset value is set to the SourceAvailabilityDataset that you created earlier.
In the Copy data activity file-to-blob, check the Source and Sink tabs. Change settings if necessary.
Source tab
Sink tab
In the Notebook activity Transformation, review and update the paths and settings as needed.
Databricks linked service should be pre-populated with the value from a previous step, as shown:
To check the Notebook settings:
Select the Settings tab. For Notebook path, verify that the default path is correct. You might need to browse and choose the correct notebook path.
Expand the Base Parameters selector and verify that the parameters match what is shown in the following screenshot. These parameters are passed to the Databricks notebook from Data Factory.
Verify that the Pipeline Parameters match what is shown in the following screenshot:
Connect to your datasets.
SourceAvailabilityDataset - to check that the source data is available.
SourceFilesDataset - to access the source data.
DestinationFilesDataset - to copy the data into the sink destination location. Use the following values:
Linked service - sinkBlob_LS, created in a previous step.
File path - sinkdata/staged_sink.
Select Debug to run the pipeline. You can find the link to Databricks logs for more detailed Spark logs.
You can also verify the data file by using Azure Storage Explorer.
## Supply storageName and accessKey values
storageName = ""
accessKey = ""
## Attempt to Mount Data Factory Data in Azure Storage
dbutils.fs.mount(
source = "wasbs://sinkdata\@"+storageName+".blob.core.windows.net/",
mount_point = "/mnt/Data Factorydata",
extra_configs = {"fs.azure.account.key."+storageName+".blob.core.windows.net": accessKey})
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import col, lower, regexp_replace
def clean_text(c):
    c = lower(c)
    c = regexp_replace(c, "(https?\://)\S+", "") # Remove links
    c = regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove newlines, carriage returns, and tabs
    c = regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
    c = regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
    c = regexp_replace(c, "[0-9]", "") # Remove numbers
    c = regexp_replace(c, "\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
    #c = regexp_replace(c, "(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")
    return c
dataset = dataset.withColumn("text", clean_text(col("text")))
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
# Add Stop words
add_stopwords = ["http","https","amp","rt","t","c","the","@","/",":"] # standard web stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# Bag of Words Count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
# String Indexer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "class", outputCol = "label")
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
from pyspark.sql import *
from pyspark.sql.functions import col
labelset = dataset.select(col("class"),
col("label")).distinct()
display(labelset)
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import col, split, explode, row_number
# Split text by sentence and convert to array
array_df = data.withColumn("text", split(col("text"), "\.").cast("array<string>"))
# Explode array into separate rows in the dataset
split_df = array_df.withColumn("text", explode(col("text")))\
.withColumn("part_number", row_number().over(Window.partitionBy("internet_message_id").orderBy("id")))
data = split_df
display(data)
from pyspark.sql.window import *
from pyspark.sql.functions import row_number
data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()
Glow is an open-source and independent Spark library that brings even more flexibility and functionality to Azure Databricks. This toolkit is natively built on Apache Spark, enabling the scale of the cloud for genomics workflows.
Glow allows for genomic data to work with Spark SQL. So, you can interact with common genetic data types as easily as you can play with a .csv file.
Genomic datasources: To read datasets in common file formats such as VCF, BGEN, and Plink into Spark DataFrames.
Genomic functions: Common operations such as computing quality control statistics, running regression tests, and performing simple transformations are provided as Spark functions that can be called from Python, SQL, Scala, or R.
Data preparation building blocks: Glow includes transformations such as variant normalization and lift over to help produce analysis ready datasets.
Integration with existing tools: With Spark, you can write user-defined functions (UDFs) in Python, R, SQL, or Scala. Glow also makes it easy to run DataFrames through command line tools.
Integration with other data types: Genomic data can generate additional insights when joined with data sets such as electronic health records, real-world evidence, and medical images. Since Glow returns native Spark SQL DataFrames, it's simple to join multiple data sets together.
If you're using Databricks, make sure you enable the Databricks Runtime for Genomics. Glow is already included and configured in this runtime.
Using pip, install by simply running pip install glow.py and then start the Spark shell with the Glow Maven package.
Install the Maven package io.projectglow:glow_2.11:${version} and, optionally, the Python frontend glow.py. Set the Spark configuration spark.hadoop.io.compression.codecs to io.projectglow.sql.util.BGZFCodec in order to read and write BGZF-compressed files.
./bin/pyspark --packages io.projectglow:glow_2.11:0.5.0 \
  --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec
import glow
glow.register(spark)
vcf_path = "/databricks-datasets/genomics/1kg-vcfs/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"
df = spark.read.format("vcf")\
.option("includeSampleIds", False)\
.option("flattenInfoFields", False)\
.load(vcf_path)\
.withColumn("first_genotype", expr("genotypes[0]"))
# bgen_path = "/databricks-datasets/genomics/1kg-bgens/1kg_chr22.bgen"
# df = spark.read.format("bgen") \
# .load(bgen_path)
df = df.withColumn("hardyweinberg", expr("hardy_weinberg(genotypes)")) \
.withColumn("summarystats", expr("call_summary_stats(genotypes)")) \
.withColumn("depthstats", expr("dp_summary_stats(genotypes)")) \
.withColumn("genotypequalitystats", expr("gq_summary_stats(genotypes)")) \
.filter(col("qual") >= 98) \
.filter((col("start") >= 16000000) & (col("end") >= 16050000)) \
.where((col("alleleFrequencies").getItem(0) >= allele_freq_cutoff) &
(col("alleleFrequencies").getItem(0) <= (1.0 - allele_freq_cutoff))) \
.withColumn("log10pValueHwe", when(col("pValueHwe") == 0, 26).otherwise(-log10(col("pValueHwe"))))
split_df = glow.transform("split_multiallelics", df)
df.coalesce(1) \
.write \
.mode("overwrite") \
.format("vcf") \
.save("/tmp/vcf_output")
from pyspark.sql.types import *
inputPath = "/mnt/data/jsonfiles/"
# Define your schema if it's known (rather than relying on Spark to infer the schema)
jsonSchema = StructType([StructField("time", TimestampType(), True),
StructField("id", IntegerType(), True),
StructField("value", StringType(), True)])
streamingInputDF = spark.readStream \
  .schema(jsonSchema) \
  .option("maxFilesPerTrigger", 1) \
  .json(inputPath)
## maxFilesPerTrigger = 1 treats a sequence of files as a stream by picking up one file at a time
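The snippet above only defines the streaming source. A minimal sketch of actually consuming it (the output and checkpoint paths are placeholder assumptions):
streamingQuery = streamingInputDF.writeStream \
  .format("parquet") \
  .option("path", "/mnt/data/streamoutput/") \
  .option("checkpointLocation", "/mnt/data/streamcheckpoints/") \
  .start()

## Stop the stream when finished
# streamingQuery.stop()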
Databricks Structured Streaming: https://docs.databricks.com/spark/latest/structured-streaming/index.html
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rfparamGrid = (ParamGridBuilder()
#.addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
.addGrid(rf.maxDepth, [2, 5, 10])
#.addGrid(rf.maxBins, [10, 20, 40, 80, 100])
.addGrid(rf.maxBins, [5, 10, 20])
#.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
.addGrid(rf.numTrees, [5, 20, 50])
.build())
rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
estimatorParamMaps = rfparamGrid,
evaluator = rfevaluator,
numFolds = 5)
rfcvModel = rfcv.fit(train)
print(rfcvModel)
rfpredictions = rfcvModel.transform(test)
print('RMSE:', rfevaluator.evaluate(rfpredictions))
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
nb = NaiveBayes(labelCol="label", featuresCol="features")
nbparamGrid = (ParamGridBuilder()
.addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
.build())
nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
nbcv = CrossValidator(estimator = nb,
estimatorParamMaps = nbparamGrid,
evaluator = nbevaluator,
numFolds = 5)
nbcvModel = nbcv.fit(train)
print(nbcvModel)
nbpredictions = nbcvModel.transform(test)
print('AUC (evaluator):', nbevaluator.evaluate(nbpredictions)) ## BinaryClassificationEvaluator returns areaUnderROC by default
print('AUC:', BinaryClassificationMetrics(nbpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(nbpredictions['label','prediction'].rdd).areaUnderPR)
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")
dtparamGrid = (ParamGridBuilder()
.addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
#.addGrid(dt.maxDepth, [2, 5, 10])
.addGrid(dt.maxBins, [10, 20, 40, 80, 100])
#.addGrid(dt.maxBins, [10, 20])
.build())
dtevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
estimatorParamMaps = dtparamGrid,
evaluator = dtevaluator,
numFolds = 5)
dtcvModel = dtcv.fit(train)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test)
print('RMSE:', dtevaluator.evaluate(dtpredictions))
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
import sys
import numpy as np
import pandas as pd
import mmlspark
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
import uuid
from mmlspark import request_to_string, string_to_response
## Load in the transformation pipeline
mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
## Load in trained model
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
username = "admin"
ip = "10.0.0.4" #Internal IP
sas_url = "" # SAS Token for your VM's Private Key in Blob
input_schema = StructType([
StructField("id", IntegerType(), True),
StructField("x1", IntegerType(), True),
StructField("x2", DoubleType(), True),
StructField("x3", StringType(), True),
])
serving_inputs = spark.readStream.continuousServer() \
.option("numPartitions", 1) \
.option("name", "http://10.0.0.4:8898/my_api") \
.option("forwarding.enabled", True) \
.option("forwarding.username", username) \
.option("forwarding.sshHost", ip) \
.option("forwarding.keySas", sas_url) \
.address("localhost", 8898, "my_api") \
.load()\
.parseRequest(input_schema)
mydataset = mypipeline.transform(serving_inputs)
serving_outputs = mymodel.bestModel.transform(mydataset) \
.makeReply("prediction")
# display(serving_inputs)
server = serving_outputs.writeStream \
.continuousServer() \
.trigger(continuous="1 second") \
.replyTo("my_api") \
.queryName("my_query") \
.option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1())) \
.start()
import requests
data = u'{"id":0,"x1":1,"x2":2.0,"x3":"3"}'
#r = requests.post(data=data, url="http://localhost:8898/my_api") # Locally
r = requests.post(data=data, url="http://102.208.216.32:8902/my_api") # Via the VM IP
print("Response {}".format(r.text))
If you're running inside Databricks, you may need to run sudo netstat -tulpn to see which ports are open. Use this command to look for the port that was opened by the server.
Microsoft MMLSpark on GitHub: https://github.com/Azure/mmlspark
train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)
column_list = data.columns
prefix = "my_prefix"
new_column_list = [prefix + s for s in column_list]
#new_column_list = [prefix + s if s != "ID" else s for s in column_list] ## Use if you plan on joining on an ID later
column_mapping = [[o, n] for o, n in zip(column_list, new_column_list)]
# print(column_mapping)
data = data.select(list(map(lambda old, new: col(old).alias(new),*zip(*column_mapping))))
import numpy as np
## Convert `train` DataFrame to NumPy
pdtrain = train.toPandas()
trainseries = pdtrain['features'].apply(lambda x : np.array(x.toArray())).to_numpy().reshape(-1,1)
X_train = np.apply_along_axis(lambda x : x[0], 1, trainseries)
y_train = pdtrain['label'].values.reshape(-1,1).ravel()
## Convert `test` DataFrame to NumPy
pdtest = test.toPandas()
testseries = pdtest['features'].apply(lambda x : np.array(x.toArray())).to_numpy().reshape(-1,1)
X_test = np.apply_along_axis(lambda x : x[0], 1, testseries)
y_test = pdtest['label'].values.reshape(-1,1).ravel()
print(y_test)
The cognitive service APIs can only take a limited number of observations at a time (1,000, to be exact) or a limited amount of data in a single call. So, we can create a chunker function that we will use to split the dataset up into smaller chunks.
## Define Chunking Logic
import pandas as pd
import numpy as np
# Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
sentiment_df_pd = sentiment_df.toPandas() ## Convert the text DataFrame to pandas so it can be chunked
# pprint is used to format the JSON response
from pprint import pprint
import json
import requests
subscription_key = '<SUBSCRIPTIONKEY>'
endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
headers = {"Ocp-Apim-Subscription-Key": subscription_key}
from pyspark.sql.types import *
sentiment_schema = StructType([StructField("id", IntegerType(), True),
StructField("score", FloatType(), True)])
sentiments_df = spark.createDataFrame([], sentiment_schema)
display(sentiments_df)
for chunk in chunker(sentiment_df_pd, 1000):
    print("Scoring", len(chunk), "rows.")
    sentiment_df_json = json.loads('{"documents":' + chunk.to_json(orient='records') + '}')
    response = requests.post(sentiment_url, headers = headers, json = sentiment_df_json)
    sentiments = response.json()
    # pprint(sentiments)
    sentiments_pd = pd.read_json(json.dumps(sentiments['documents']))
    sentiments_df_chunk = spark.createDataFrame(sentiments_pd)
    sentiments_df = sentiments_df.unionAll(sentiments_df_chunk)
display(sentiments_df)
sentiments_df.count()
sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")
import pandas as pd
from pyspark.sql.functions import col
def get_nonstring_cols(df):
    types = spark.createDataFrame(pd.DataFrame({'Column': df.schema.names, 'Type': [str(f.dataType) for f in df.schema.fields]}))
    result = types.filter(col('Type') != 'StringType').select('Column').rdd.flatMap(lambda x: x).collect()
    return result
get_nonstring_cols(df)
from pyspark.sql.types import *
from pyspark.sql.functions import col
df = df.withColumn('col1', col('col1').cast(IntegerType()))
## Fill in list with your desired column names
cols = ["col1", "col2", "col3"]
i = 1
for col in cols:
    if i == 1:
        print("schema = StructType([")
        print("\tStructField('" + col + "', StringType(), True),")
    elif i == len(cols):
        print("\tStructField('" + col + "', StringType(), True)])")
    else:
        print("\tStructField('" + col + "', StringType(), True),")
    i += 1
## Once the output has printed, copy and paste into a new cell
## and change column types and nullability
"""
Struct Schema Creator for PySpark
[<Column Name>, <Column Type>, <Column Nullable>]
Types: binary, boolean, byte, date,
double, integer, long, null,
short, string, timestamp, unknown
"""
from pyspark.sql.types import *
## Fill in with your desired column names, types, and nullability
cols = [["col1", "string", False],
["col2", "date", True],
["col3", "integer", True]]
## Loop to build list of StructFields
schema_set = ["schema = StructType(["]
for i, col in enumerate(cols):
    colname = col[0]
    coltype = col[1].title() + "Type()"
    colnull = col[2]
    if i == len(cols)-1:
        iter_structfield = "StructField('" + colname + "', " + coltype + ", " + str(colnull) + ")])"
    else:
        iter_structfield = "StructField('" + colname + "', " + coltype + ", " + str(colnull) + "),"
    schema_set.append(iter_structfield)
## Convert list to single string
schema_string = ''.join(map(str, schema_set))
## This will execute the generated command string
exec(schema_string)
from pyspark.sql.functions import sequence, to_date, explode, col
date_dim = spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) as DATE").withColumn("DATE", explode(col("DATE")))
display(date_dim)
Pivot a wide dataset into a longer form. (Similar to the pivot_longer() function from the tidyr R package or the pandas.wide_to_long function.)
## UnpivotDF Function
def UnpivotDF(df, columns, pivotCol, unpivotColName, valueColName):
    columnsValue = list(map(lambda x: str("'") + str(x) + str("',") + str(x), columns))
    stackCols = ','.join(x for x in columnsValue)
    df_unpvt = df.selectExpr(pivotCol, f"stack({str(len(columns))}, {stackCols}) as ({unpivotColName}, {valueColName})")\
        .select(pivotCol, unpivotColName, valueColName)
    return(df_unpvt)
df_unpvt = UnpivotDF(df = df,
columns = df.columns[1:], ## The columns to transpose into a single, longer column
pivotCol = "ID", ## The column to leave in place (usually an ID)
unpivotColName = "Category", ## The name of the new column
valueColName = "value") ## The name of the column of values