Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Azure Storage is a managed service that provides highly available, secure, durable, scalable, and redundant storage for your data. It includes Blob storage, Data Lake Storage, and other services.
# --- Mount Azure Blob Storage using a storage-account key ---
# The key is read from a secret scope rather than pasted into the notebook.
dbutils.fs.mount(
    source="wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net",
    mount_point="/mnt/<FOLDERNAME>/",
    extra_configs={
        "fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net":
            dbutils.secrets.get(scope="<SECRETNAME>", key="<STORAGEKEYNAME>"),
    },
)
display(dbutils.fs.ls("/mnt/<FOLDERNAME>"))

# --- Alternative: mount ADLS Gen2 with Azure AD credential passthrough ---
# NOTE: this targets the same mount point as the mount above; run only one of
# the two (or unmount first).
configs = {
    "fs.azure.account.auth.type": "CustomAccessToken",
    "fs.azure.account.custom.token.provider.class": spark.conf.get(
        "spark.databricks.passthrough.adls.gen2.tokenProviderClassName"
    ),
}
dbutils.fs.mount(
    source="abfss://<CONTAINERNAME>@<STORAGEACCOUNT>.dfs.core.windows.net/",
    mount_point="/mnt/<FOLDERNAME>",
    extra_configs=configs,
)

# Citation fragment that had been pasted in as code (it is not valid Python):
# doi = {10.5281/zenodo.3468502}

from pyspark.mllib.evaluation import MulticlassMetrics
# --- Summary statistics for several fitted classifiers ---
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassClassificationEvaluator defaults to metricName="f1"; request
# accuracy explicitly so the value matches the "Accuracy" label printed below.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
for model in ["lrpredictions", "nbpredictions", "rfpredictions"]:
    df = globals()[model]
    ########################################
    # Compute raw scores on the test set; MulticlassMetrics expects
    # (prediction, label) pairs.
    predictionAndLabels = df.select("prediction", "label").rdd
    # Instantiate metrics object
    metrics = MulticlassMetrics(predictionAndLabels)
    # Overall statistics. The label-less precision()/recall()/fMeasure()
    # overloads were deprecated in Spark 2.0 and removed in 3.0, so report the
    # weighted (support-averaged) variants.
    precision = metrics.weightedPrecision
    recall = metrics.weightedRecall
    f1Score = metrics.weightedFMeasure()
    print("Summary Stats for: ", model)
    # print(metrics.confusionMatrix())
    print("Accuracy = %s" % evaluator.evaluate(df))
    print("Weighted Precision = %s" % precision)
    print("Weighted Recall = %s" % recall)
    print("Weighted F1 Score = %s" % f1Score)
    # print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    # print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("\n")

# --- Export a DataFrame to CSV ---
# coalesce(1) produces a single part file inside the "file.csv" directory.
# NOTE(review): `df` is whatever DataFrame was bound last (here the final
# predictions frame from the loop above); rebind it to the frame to export.
df.coalesce(1) \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .save("file.csv")

# --- Azure SQL Data Warehouse connection settings ---
dwDatabase = "<DATABASENAME>"  ## The Azure SQL Data Warehouse database name
dwServer = "<DWNAME>.database.windows.net"  ## The Azure SQL Server
dwUser = "<USERNAME>"  ## The dedicated loading user login
dwPass = dbutils.secrets.get(scope="<SECRETNAME>", key="<KEYNAME>")  ## The dedicated loading user login password
dwJdbcPort = "1433"
sqlDwUrlSmall = (
    "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort
    + ";database=" + dwDatabase
    + ";user=" + dwUser
    + ";password=" + dwPass
)

# Query pushed down to the warehouse; tags every row with its source system.
sqlQuery = """
SELECT *, 'AzureSqlDw' AS SourceSystem
FROM dbo.<TABLENAME>
"""

# Install MLflow in its own notebook cell with the %pip magic
# (a bare `pip install` line is not valid Python):
# %pip install mlflow
import mlflow
## Log Parameters and Metrics from your normal MLlib run
with mlflow.start_run():
    # Log a parameter (key-value pair)
    mlflow.log_param("alpha", 0.1)
    # Log a metric; metrics can be updated throughout the run
    mlflow.log_metric("AUC", 0.871827)
    mlflow.log_metric("F1", 0.726153)
    mlflow.log_metric("Precision", 0.213873)

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# --- Naive Bayes with a cross-validated smoothing parameter ---
nb = NaiveBayes(labelCol="label", featuresCol="features")
nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
               .build())
nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
nbcv = CrossValidator(estimator=nb,
                      estimatorParamMaps=nbparamGrid,
                      evaluator=nbevaluator,
                      numFolds=5)
nbcvModel = nbcv.fit(train)
print(nbcvModel)
nbpredictions = nbcvModel.transform(test)

# Evaluate the Naive Bayes predictions.
# BinaryClassificationEvaluator computes areaUnderROC on rawPrediction by
# default; it is not accuracy.
print('AUC (rawPrediction):', nbevaluator.evaluate(nbpredictions))
# BinaryClassificationMetrics expects (score, label) pairs, so prediction goes
# first; build the metrics object once for both areas.
nbmetrics = BinaryClassificationMetrics(nbpredictions.select('prediction', 'label').rdd)
print('AUC:', nbmetrics.areaUnderROC)
print('PR:', nbmetrics.areaUnderPR)

# --- Read a CSV file from mounted storage, inferring the schema ---
dataset = spark.read.format('csv') \
    .options(header='true', inferSchema='true', delimiter=',') \
    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')
## or spark.read.format('csv')...
## Formats: json, parquet, jdbc, orc, libsvm, csv, text, avro
from pyspark.sql.types import *

# --- Read a CSV file with an explicit schema (no inference pass) ---
schema = StructType([StructField('ID', IntegerType(), True),
                     StructField('Value', DoubleType(), True),
                     StructField('Category', StringType(), True),
                     StructField('Date', DateType(), True)])
dataset = spark.read.format('csv') \
    .schema(schema) \
    .options(header='true', delimiter=',') \
    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')

# --- Read from Azure SQL Data Warehouse and expose it as a temp view ---
# NOTE(review): `tempDir` is not defined in this file; presumably it must be
# set to a wasbs:// staging location before this runs.
data = spark.read \
    .format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrlSmall) \
    .option("tempDir", tempDir) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("query", sqlQuery) \
    .load()
# createOrReplaceTempView() returns None, so register the view as a separate
# statement to keep `data` bound to the DataFrame.
data.createOrReplaceTempView("<TEMPVIEWNAME>")
# data.write.saveAsTable("<TABLENAME>")

# --- Persist the trained cross-validated models ---
lrcvModel.save("/mnt/trainedmodels/lr")
rfcvModel.save("/mnt/trainedmodels/rf")
dtcvModel.save("/mnt/trainedmodels/dt")
display(dbutils.fs.ls("/mnt/trainedmodels/"))

# Delete a saved model directory (recursive).
dbutils.fs.rm("/mnt/trainedmodels/dt", True)

# --- Score new data with a saved pipeline and model ---
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
# pyspark's round is not imported: it would shadow the builtin round().
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

pipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
## Apply the already-fitted pipeline to new data
transformeddataset = pipeline.transform(dataset)
model = CrossValidatorModel.load("/mnt/trainedmodels/lr/")
## Score the data using the model
scoreddataset = model.bestModel.transform(transformeddataset)

## Function to extract element 1 (the class-1 probability) from the probability vector
getprob = udf(lambda v: float(v[1]), FloatType())
## Select out the necessary columns
output = scoreddataset.select(col("ID"),
                              col("label"),
                              col("rawPrediction"),
                              getprob(col("probability")).alias("probability"),
                              col("prediction"))

## Based on: https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
import pandas as pd
def ExtractFeatureImportance(featureImp, dataset, featuresCol):
    """Rank the attributes of an assembled feature vector by importance.

    Args:
        featureImp: importances indexable by feature index (for example a
            tree model's ``featureImportances``).
        dataset: DataFrame whose ``featuresCol`` field carries Spark ML
            ``ml_attr`` metadata (as written by e.g. VectorAssembler).
        featuresCol: name of the assembled features column.

    Returns:
        pandas DataFrame with one row per attribute (its metadata such as
        ``idx`` and ``name``) plus a ``score`` column, sorted by descending
        score. Empty (columns idx/name/score) when there is no attribute
        metadata.
    """
    from itertools import chain

    attr_groups = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    # Flatten the per-type groups (numeric, binary, ...) in one pass instead of
    # repeatedly concatenating lists.
    records = list(chain.from_iterable(attr_groups.values()))
    if not records:
        # pd.DataFrame([]) has no 'idx' column and would raise KeyError below.
        return pd.DataFrame(columns=["idx", "name", "score"])
    varlist = pd.DataFrame(records)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending=False)
# Usage with a PipelineModel instead of a CrossValidatorModel:
# ExtractFeatureImportance(model.stages[-1].featureImportances, dataset, "features")
# NOTE(review): requires the best model to expose featureImportances
# (tree-based models); confirm which `model` is bound at this point.
dataset_fi = ExtractFeatureImportance(model.bestModel.featureImportances, dataset, "features")
# Convert the ranked pandas frame back to Spark for display.
dataset_fi = spark.createDataFrame(dataset_fi)
display(dataset_fi)

from pyspark.sql.types import *
inputPath = "/mnt/data/jsonfiles/"
# Define your schema if it's known (rather than relying on Spark to infer the schema)
jsonSchema = StructType([StructField("time", TimestampType(), True),
                         StructField("id", IntegerType(), True),
                         StructField("value", StringType(), True)])
# The chain is parenthesized because a comment may not follow a
# line-continuation backslash.
streamingInputDF = (
    spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

## Based on: https://stackoverflow.com/questions/42935914/how-to-map-features-from-the-output-of-a-vectorassembler-back-to-the-column-name
# NOTE(review): assumes `model` is a PipelineModel whose last stage exposes
# summary.pValues (e.g. LinearRegressionModel); confirm before running.
lrm = model.stages[-1]
## Transform the data:
transformed = model.transform(df)
# Extract and flatten ML attributes:
from itertools import chain
attr_groups = transformed.schema[lrm.summary.featuresCol].metadata["ml_attr"]["attrs"]
attrs = sorted((attr["idx"], attr["name"]) for attr in chain.from_iterable(attr_groups.values()))
# and map to the output; fetch the p-values once rather than once per feature.
pvalues = lrm.summary.pValues
feature_pvalues = [(name, pvalues[idx]) for idx, name in attrs]
print(feature_pvalues)
# feature_coefficients = [(name, lrm.coefficients[idx]) for idx, name in attrs]

import pandas as pd
import pandas as pd

# --- Coefficients of the best model, labelled with feature names ---
attr_groups = dataset.schema["features"].metadata["ml_attr"]["attrs"]
featurelist = (pd.DataFrame([attr for group in attr_groups.values() for attr in group])[["idx", "name"]]
               .sort_values("idx")
               .reset_index(drop=True))
coefficients = model.bestModel.coefficients.toArray()
# Look each coefficient up by the row's feature index. Assigning a separately
# built DataFrame would align on row labels, which sort_values() permutes.
featurelist["Coefficient"] = coefficients[featurelist["idx"].to_numpy()]
featurelist = spark.createDataFrame(featurelist)
display(featurelist)

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# --- Random forest regressor tuned with 5-fold cross-validation on RMSE ---
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rfparamGrid = (ParamGridBuilder()
               # .addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
               .addGrid(rf.maxDepth, [2, 5, 10])
               # .addGrid(rf.maxBins, [10, 20, 40, 80, 100])
               .addGrid(rf.maxBins, [5, 10, 20])
               # .addGrid(rf.numTrees, [5, 20, 50, 100, 500])
               .addGrid(rf.numTrees, [5, 20, 50])
               .build())
rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator=rf,
                      estimatorParamMaps=rfparamGrid,
                      evaluator=rfevaluator,
                      numFolds=5)
rfcvModel = rfcv.fit(train)
print(rfcvModel)
rfpredictions = rfcvModel.transform(test)
print('RMSE:', rfevaluator.evaluate(rfpredictions))

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# --- Decision tree regressor tuned with 5-fold cross-validation on RMSE ---
dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")
dtparamGrid = (ParamGridBuilder()
               .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
               # .addGrid(dt.maxDepth, [2, 5, 10])
               .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
               # .addGrid(dt.maxBins, [10, 20])
               .build())
dtevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator=dt,
                      estimatorParamMaps=dtparamGrid,
                      evaluator=dtevaluator,
                      numFolds=5)
dtcvModel = dtcv.fit(train)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test)
print('RMSE:', dtevaluator.evaluate(dtpredictions))

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# --- Gradient-boosted tree classifier tuned with 5-fold cross-validation ---
gb = GBTClassifier(labelCol="label", featuresCol="features")
gbparamGrid = (ParamGridBuilder()
               .addGrid(gb.maxDepth, [2, 5, 10])
               .addGrid(gb.maxBins, [10, 20, 40])
               .addGrid(gb.maxIter, [5, 10, 20])
               .build())
gbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator=gb,
                      estimatorParamMaps=gbparamGrid,
                      evaluator=gbevaluator,
                      numFolds=5)
gbcvModel = gbcv.fit(train)
print(gbcvModel)
gbpredictions = gbcvModel.transform(test)
# BinaryClassificationEvaluator computes areaUnderROC on rawPrediction by
# default; it is not accuracy.
print('AUC (rawPrediction):', gbevaluator.evaluate(gbpredictions))
# BinaryClassificationMetrics expects (score, label) pairs, so prediction goes
# first; build the metrics object once for both areas.
gbmetrics = BinaryClassificationMetrics(gbpredictions.select('prediction', 'label').rdd)
print('AUC:', gbmetrics.areaUnderROC)
print('PR:', gbmetrics.areaUnderPR)

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# --- Random forest classifier tuned with 5-fold cross-validation ---
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [2, 5, 10])
               .addGrid(rf.maxBins, [5, 10, 20])
               .addGrid(rf.numTrees, [5, 20, 50])
               .build())
rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator=rf,
                      estimatorParamMaps=rfparamGrid,
                      evaluator=rfevaluator,
                      numFolds=5)
rfcvModel = rfcv.fit(train)
print(rfcvModel)
rfpredictions = rfcvModel.transform(test)
# BinaryClassificationEvaluator computes areaUnderROC on rawPrediction by
# default; it is not accuracy.
print('AUC (rawPrediction):', rfevaluator.evaluate(rfpredictions))
# BinaryClassificationMetrics expects (score, label) pairs, so prediction goes
# first; build the metrics object once for both areas.
rfmetrics = BinaryClassificationMetrics(rfpredictions.select('prediction', 'label').rdd)
print('AUC:', rfmetrics.areaUnderROC)
print('PR:', rfmetrics.areaUnderPR)

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# --- Gradient-boosted tree regressor tuned with 5-fold cross-validation on RMSE ---
gb = GBTRegressor(labelCol="label", featuresCol="features")
gbparamGrid = (ParamGridBuilder()
               .addGrid(gb.maxDepth, [2, 5, 10])
               .addGrid(gb.maxBins, [10, 20, 40])
               .addGrid(gb.maxIter, [5, 10, 20])
               .build())
gbevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
gbcv = CrossValidator(estimator=gb,
                      estimatorParamMaps=gbparamGrid,
                      evaluator=gbevaluator,
                      numFolds=5)
gbcvModel = gbcv.fit(train)
print(gbcvModel)
gbpredictions = gbcvModel.transform(test)
print('RMSE:', gbevaluator.evaluate(gbpredictions))

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# --- Decision tree classifier tuned with 5-fold cross-validation ---
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dtparamGrid = (ParamGridBuilder()
               .addGrid(dt.maxDepth, [2, 5, 10])
               .addGrid(dt.maxBins, [10, 20])
               .build())
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator=dt,
                      estimatorParamMaps=dtparamGrid,
                      evaluator=dtevaluator,
                      numFolds=5)
dtcvModel = dtcv.fit(train)
print(dtcvModel)
dtpredictions = dtcvModel.transform(test)
# BinaryClassificationEvaluator computes areaUnderROC on rawPrediction by
# default; it is not accuracy.
print('AUC (rawPrediction):', dtevaluator.evaluate(dtpredictions))
# BinaryClassificationMetrics expects (score, label) pairs, so prediction goes
# first; build the metrics object once for both areas.
dtmetrics = BinaryClassificationMetrics(dtpredictions.select('prediction', 'label').rdd)
print('AUC:', dtmetrics.areaUnderROC)
print('PR:', dtmetrics.areaUnderPR)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# --- Logistic regression tuned with 5-fold cross-validation on ROC AUC ---
lr = LogisticRegression(labelCol="label", featuresCol="features")
lrparamGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
               .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
               .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
               .build())
lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
# Create 5-fold CrossValidator (125 parameter combinations x 5 folds = 625 fits)
lrcv = CrossValidator(estimator=lr,
                      estimatorParamMaps=lrparamGrid,
                      evaluator=lrevaluator,
                      numFolds=5)
lrcvModel = lrcv.fit(train)
print(lrcvModel)
lrpredictions = lrcvModel.transform(test)
# lrevaluator reports areaUnderROC on rawPrediction; it is not accuracy.
print('AUC (rawPrediction):', lrevaluator.evaluate(lrpredictions))
# BinaryClassificationMetrics expects (score, label) pairs, so prediction goes
# first; build the metrics object once for both areas.
lrmetrics = BinaryClassificationMetrics(lrpredictions.select('prediction', 'label').rdd)
print('AUC:', lrmetrics.areaUnderROC)
print('PR:', lrmetrics.areaUnderPR)

# --- Notebook parameter: report date (default "19000101", presumably yyyymmdd) ---
dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)

from pyspark.ml import Pipeline
# OneHotEncoderEstimator only exists in Spark 2.3-2.4 (it became OneHotEncoder
# in 3.0) and is not used here, so it is not imported.
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label = "dependentvar"
categoricalColumns = ["col1",
                      "col2"]
numericalColumns = ["num1",
                    "num2"]
# One-hot output column per categorical column, e.g. "col1classVec".
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

storage_account_name = "mystorage"
# NOTE(review): prefer dbutils.secrets.get(...) over a literal access key.
storage_account_access_key = ""
file_location = ("wasbs://<container>@" + storage_account_name
                 + ".blob.core.windows.net/myfiles/data_" + ReportDate + ".csv")
file_type = "csv"
spark.conf.set(
    "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
    storage_account_access_key)

from pyspark.sql.types import *
# Explicit schema for the daily report file.
schema = StructType([
    StructField("ReportingDate", DateType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True)
])
dataset = spark.read \
    .format(file_type) \
    .option("header", "true") \
    .schema(schema) \
    .load(file_location)
## You can avoid defining a schema by having spark infer it from your data
## This doesn't always work and can be slow
# .option("inferSchema", "true")
## Fill in na's, if needed
# dataset = dataset.na.fill(0)
display(dataset)

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
# pyspark's round is not imported: it would shadow the builtin round().
from pyspark.sql.functions import col
from pyspark.ml.regression import GeneralizedLinearRegressionModel

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)
## Score new data using a trained model
scoreddataset = mymodel.bestModel.transform(mydataset)
output = scoreddataset.select(col("id"),
                              col("ReportingDate"),
                              col("prediction").alias("MyForecast"))
display(output)

# NOTE(review): this yields e.g. "/mnt/output19000101" (no separator);
# confirm whether "/mnt/output/" was intended.
fileloc = "/mnt/output" + str(ReportDate)  # + ".csv"
# Write the forecasts as CSV (a directory of part files). .csv() sets the
# format itself, so the legacy "com.databricks.spark.csv" format is redundant.
output.write.mode("overwrite").option("header", "true").csv(fileloc)

# --- Build the feature-engineering pipeline ---
stages = []
for categoricalColumn in categoricalColumns:
    print(categoricalColumn)
    ## Category Indexing with StringIndexer (rows with invalid values are skipped)
    stringIndexer = StringIndexer(inputCol=categoricalColumn,
                                  outputCol=categoricalColumn + "Index").setHandleInvalid("skip")
    ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalColumn + "Index",
                            outputCol=categoricalColumn + "classVec")
    ## Add stages
    stages += [stringIndexer, encoder]
## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol=label, outputCol="label").setHandleInvalid("skip")
stages += [label_stringIndexer]
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols=assemblerInputs,
                            outputCol="features")
stages += [assembler]

from pyspark.ml.feature import StandardScaler
# withMean=True centres the data, which makes the output vectors dense.
scaler = StandardScaler(inputCol="features",
                        outputCol="scaledFeatures",
                        withStd=True,
                        withMean=True)
stages += [scaler]

prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
# overwrite() lets the cell be re-run; a plain save() fails if the path exists.
pipelineModel.write().overwrite().save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))

# --- Reload the saved pipeline and apply it ---
# NOTE(review): apply this to raw input; `dataset` above is already
# transformed, and re-adding existing output columns will fail.
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import col, lower, regexp_replace


def clean_text(c):
    """Normalise a text column before tokenisation.

    Lower-cases the text and strips links, line breaks/tabs, date/time-like
    patterns such as 01/02/2019, @usernames, digits and common punctuation.

    Args:
        c: Spark Column (or column name) holding the raw text.

    Returns:
        Spark Column expression with the cleaned text.
    """
    c = lower(c)
    c = regexp_replace(c, r"(https?\://)\S+", "")  # Remove links
    # Replace CR, LF and tab with a space; deleting them outright glued the last
    # word of one line onto the first word of the next.
    c = regexp_replace(c, r"[\r\n\t]", " ")
    c = regexp_replace(c, r"(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "")  # Remove dates
    c = regexp_replace(c, r"@([A-Za-z0-9_]+)", "")  # Remove usernames
    c = regexp_replace(c, r"[0-9]", "")  # Remove numbers
    c = regexp_replace(c, r'[:/#.?!&",]', "")  # Remove symbols
    # c = regexp_replace(c, r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")
    return c


dataset = dataset.withColumn("text", clean_text(col("text")))
# Split on non-word characters.
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
# Add Stop words
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the", "@", "/", ":"]  # standard web stop words
# setStopWords() replaces the whole list, so merge the additions with Spark's
# default English stop words to actually *add* them.
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(
    StopWordsRemover.loadDefaultStopWords("english") + add_stopwords)
# Bag of Words Count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
# String Indexer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol="class", outputCol="label")

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
# NOTE(review): this fits on `data`, while clean_text() was applied to
# `dataset`; confirm which DataFrame is intended.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

from pyspark.sql import *
from pyspark.sql.functions import col
# Mapping between the original class values and their indexed labels.
labelset = dataset.select(col("class"),
                          col("label")).distinct()
display(labelset)

from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import col, split, posexplode, row_number
# Split text by sentence ("." separator) into an array.
array_df = data.withColumn("text", split(col("text"), r"\."))
# Explode to one row per sentence. posexplode keeps each sentence's position
# so part_number is deterministic: ordering by "id" alone ties for every
# sentence produced from the same row.
other_cols = [c for c in array_df.columns if c != "text"]
split_df = (array_df
            .select(*other_cols, posexplode(col("text")).alias("sentence_pos", "text"))
            .select(*array_df.columns, "sentence_pos")
            .withColumn("part_number",
                        row_number().over(Window.partitionBy("internet_message_id")
                                          .orderBy("id", "sentence_pos")))
            .drop("sentence_pos"))
data = split_df
display(data)

from pyspark.sql.window import *
from pyspark.sql.functions import row_number
data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
# pyspark's round is not imported: it would shadow the builtin round().
from pyspark.sql.functions import col
import sys
import numpy as np
import pandas as pd
import mmlspark
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
import uuid
from mmlspark import request_to_string, string_to_response

## Load in the transformation pipeline
mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
## Load in trained model
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")

username = "admin"
ip = "10.0.0.4"  # Internal IP
sas_url = ""  # SAS Token for your VM's Private Key in Blob
# Schema of the JSON body each HTTP request is parsed into.
input_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True),
    StructField("x3", StringType(), True),
])

# --- Serve the model as a continuous HTTP endpoint (MMLSpark Spark Serving) ---
serving_inputs = spark.readStream.continuousServer() \
    .option("numPartitions", 1) \
    .option("name", "http://10.0.0.4:8898/my_api") \
    .option("forwarding.enabled", True) \
    .option("forwarding.username", username) \
    .option("forwarding.sshHost", ip) \
    .option("forwarding.keySas", sas_url) \
    .address("localhost", 8898, "my_api") \
    .load() \
    .parseRequest(input_schema)
mydataset = mypipeline.transform(serving_inputs)
serving_outputs = mymodel.bestModel.transform(mydataset) \
    .makeReply("prediction")
# display(serving_inputs)
server = serving_outputs.writeStream \
    .continuousServer() \
    .trigger(continuous="1 second") \
    .replyTo("my_api") \
    .queryName("my_query") \
    .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1())) \
    .start()

import requests
import requests

# Sample request body. It is named `payload` so it does not overwrite the
# `data` DataFrame used below (data.columns) and by later cells.
payload = '{"id":0,"x1":1,"x2":2.0,"x3":"3"}'
# r = requests.post(data=payload, url="http://localhost:8898/my_api", timeout=30)  # Locally
# A timeout keeps the call from hanging indefinitely if the endpoint is down.
r = requests.post(data=payload, url="http://102.208.216.32:8902/my_api", timeout=30)  # Via the VM IP
print("Response {}".format(r.text))

# --- 75/25 train/test split with a fixed seed ---
train, test = dataset.randomSplit([0.75, 0.25], seed=1337)
column_list = data.columns
prefix = "my_prefix"
# Prepend `prefix` to every column name. The comprehension also works for a
# DataFrame with no columns, where map(f, *zip(*[])) raised TypeError.
# To keep a join key unchanged, use instead:
# data = data.select([col(c).alias(c if c == "ID" else prefix + c) for c in data.columns])
data = data.select([col(c).alias(prefix + c) for c in data.columns])

## Convert `train` DataFrame to NumPy
import numpy as np


def _features_and_labels(pdf):
    """Return (X, y) NumPy arrays from a pandas frame with `features`/`label`.

    `features` holds Spark ML vectors; each is densified with toArray() and
    stacked into a 2-D array (Series.as_matrix() was removed in pandas 1.0).
    """
    X = np.stack(pdf['features'].apply(lambda v: v.toArray()).to_list())
    y = pdf['label'].to_numpy()
    return X, y


pdtrain = train.toPandas()
X_train, y_train = _features_and_labels(pdtrain)
## Convert `test` DataFrame to NumPy
pdtest = test.toPandas()
X_test, y_test = _features_and_labels(pdtest)
print(y_test)

## Define Chunking Logic
import pandas as pd
import numpy as np
# Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
def chunker(seq, size):
    """Split *seq* into consecutive slices of at most *size* items.

    Returns a generator of slices; the final slice is shorter when
    ``len(seq)`` is not a multiple of *size*.
    """
    offsets = range(0, len(seq), size)
    return (seq[start:start + size] for start in offsets)

## sentiment_df_pd = sentiment_df.toPandas()
# pprint is used to format the JSON response
from pprint import pprint
import json
import requests
# NOTE(review): prefer dbutils.secrets.get(...) over a literal subscription key.
subscription_key = '<SUBSCRIPTIONKEY>'
endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
headers = {"Ocp-Apim-Subscription-Key": subscription_key}

from pyspark.sql.types import *
sentiment_schema = StructType([StructField("id", IntegerType(), True),
                               StructField("score", FloatType(), True)])
sentiments_df = spark.createDataFrame([], sentiment_schema)
display(sentiments_df)

import io
import json
import pandas as pd
import requests

# Score sentiment_df_pd in batches of 1000 rows. Results are collected in
# pandas and converted to Spark once, instead of growing a union chain per batch.
scored_chunks = []
for chunk in chunker(sentiment_df_pd, 1000):
    print("Scoring", len(chunk), "rows.")
    payload = {"documents": json.loads(chunk.to_json(orient='records'))}
    response = requests.post(sentiment_url, headers=headers, json=payload, timeout=60)
    # Surface HTTP failures directly rather than as a KeyError on 'documents'.
    response.raise_for_status()
    sentiments = response.json()
    # pprint(sentiments)
    if sentiments.get('errors'):
        print("Documents rejected by the service:", sentiments['errors'])
    # Wrap in StringIO: passing literal JSON text to read_json is deprecated.
    scored_chunks.append(pd.read_json(io.StringIO(json.dumps(sentiments['documents']))))
if scored_chunks:
    scored_pd = pd.concat(scored_chunks, ignore_index=True)
    if not scored_pd.empty:
        sentiments_df = sentiments_df.unionByName(spark.createDataFrame(scored_pd))
display(sentiments_df)
print(sentiments_df.count())
sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")
def get_nonstring_cols(df):
    """Return the names of the columns of *df* whose type is not string.

    Args:
        df: Spark DataFrame (only ``df.schema.fields`` is read).

    Returns:
        list[str]: column names in schema order, excluding StringType columns.
    """
    # Compare typeName() rather than str(dataType): newer PySpark (3.4+)
    # renders the type as "StringType()", so comparing with 'StringType'
    # classified every column as non-string. Reading the schema directly also
    # avoids building and collecting a Spark DataFrame.
    return [field.name for field in df.schema.fields
            if field.dataType.typeName() != "string"]
print(get_nonstring_cols(df))

from pyspark.sql.types import *
from pyspark.sql.functions import col
df = df.withColumn('col1', col('col1').cast(IntegerType()))

## Fill in list with your desired column names
cols = ["col1", "col2", "col3"]
# Print a StructType definition (all StringType, nullable) to paste into a new
# cell. The loop variable is not named `col`, so pyspark's col() stays usable,
# and a single-column list now gets its closing "])".
if not cols:
    print("schema = StructType([])")
else:
    print("schema = StructType([")
    for position, col_name in enumerate(cols, start=1):
        closing = ")])" if position == len(cols) else "),"
        print("\tStructField('" + col_name + "', StringType(), True" + closing)
## Once the output has printed, copy and paste into a new cell
## and change column types and nullability

"""
Struct Schema Creator for PySpark
[<Column Name>, <Column Type>, <Column Nullable>]
Types: binary, boolean, byte, date,
       double, float, integer, long, null,
       short, string, timestamp
"""
from pyspark.sql import types as spark_types
from pyspark.sql.types import *
## Fill in with your desired column names, types, and nullability
cols = [["col1", "string", False],
        ["col2", "date", True],
        ["col3", "integer", True]]
# Build the StructType directly instead of generating source text and running
# exec() on it. Type names resolve by attribute lookup ("integer" ->
# IntegerType); an unknown name raises AttributeError. The loop variable is
# not named `col`, so pyspark's col() is not shadowed.
schema = StructType([
    StructField(col_name, getattr(spark_types, col_type.title() + "Type")(), col_nullable)
    for col_name, col_type, col_nullable in cols
])

from pyspark.sql.functions import sequence, to_date, explode, col
# Calendar (date dimension) table: one row per day from 2018-01-01 through
# 2019-12-31, held in a single DATE column.
date_sequence_sql = (
    "SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) as DATE"
)
date_dim = spark.sql(date_sequence_sql)
date_dim = date_dim.withColumn("DATE", explode(col("DATE")))
display(date_dim)

## UnpivotDF Function
def UnpivotDF(df, columns, pivotCol, unpivotColName, valueColName):
    """Unpivot (melt) *columns* of *df* into (name, value) rows.

    Args:
        df: Spark DataFrame to reshape.
        columns: names of the columns to stack; Spark's stack() needs them to
            have compatible types.
        pivotCol: column carried unchanged onto every output row (usually an ID).
        unpivotColName: name of the new column holding the former column names.
        valueColName: name of the new column holding the values.

    Returns:
        DataFrame with columns (pivotCol, unpivotColName, valueColName).
    """
    def _ident(name):
        # Backtick-quote identifiers so names with spaces, dots or other
        # special characters survive selectExpr().
        return "`" + str(name).replace("`", "``") + "`"

    def _literal(name):
        # Quote the column name as a SQL string literal, escaping quotes.
        return "'" + str(name).replace("\\", "\\\\").replace("'", "\\'") + "'"

    stackCols = ", ".join(_literal(c) + ", " + _ident(c) for c in columns)
    df_unpvt = df.selectExpr(
        _ident(pivotCol),
        f"stack({len(columns)}, {stackCols}) as ({_ident(unpivotColName)}, {_ident(valueColName)})",
    ).select(_ident(pivotCol), _ident(unpivotColName), _ident(valueColName))
    return df_unpvt


df_unpvt = UnpivotDF(df=df,
                     columns=df.columns[1:],  ## The columns to transpose into a single, longer column
                     pivotCol="ID",  ## The column to leave in place (usually an ID)
                     unpivotColName="Category",  ## The name of the new column
                     valueColName="value")  ## The name of the column of values
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# --- Linear regression tuned with 5-fold cross-validation on RMSE ---
lr = LinearRegression(labelCol="label", featuresCol="features")
lrparamGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
               # .addGrid(lr.regParam, [0.01, 0.1, 0.5])
               .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
               # .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
               .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
               # .addGrid(lr.maxIter, [1, 5, 10])
               .build())
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator=lr,
                      estimatorParamMaps=lrparamGrid,
                      evaluator=lrevaluator,
                      numFolds=5)
lrcvModel = lrcv.fit(train)
print(lrcvModel)

lrcvSummary = lrcvModel.bestModel.summary
# NOTE(review): standard errors and p-values are only available when the best
# model was fit with the "normal" solver (e.g. elasticNetParam == 0);
# otherwise Spark raises on these attributes.
print("Coefficient Standard Errors: " + str(lrcvSummary.coefficientStandardErrors))
print("P Values: " + str(lrcvSummary.pValues))  # Last element is the intercept

lrpredictions = lrcvModel.transform(test)
print('RMSE:', lrevaluator.evaluate(lrpredictions))

performance_df = spark.createDataFrame([(0, 0, 0)], ['cutoff', 'AUPR', 'AUC'])
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import DoubleType

# Sweep the decision threshold over cutoffs 0.05 .. 0.90 and record the
# AUPR / AUC of the resulting hard predictions.
# NOTE(review): assumes lrpredictions comes from a probabilistic classifier
# (e.g. LogisticRegression) with a `probability` vector column.
# `probability` is a vector, so extract element 1 before comparing it with a float.
positive_prob = udf(lambda v: float(v[1]), DoubleType())
scored = lrpredictions.withColumn('positive_prob', positive_prob(col('probability'))).cache()
performance_rows = []
for cutoff_pct in range(5, 95, 5):
    cutoff = cutoff_pct / 100
    print('Testing cutoff = ', str(format(cutoff, '.2f')))
    lrpredictions_prob_temp = scored.withColumn(
        'prediction_test', when(col('positive_prob') >= cutoff, 1).otherwise(0).cast(DoubleType()))
    # BinaryClassificationMetrics expects (score, label) pairs.
    metrics_temp = BinaryClassificationMetrics(
        lrpredictions_prob_temp.select('prediction_test', 'label').rdd)
    aupr_temp = metrics_temp.areaUnderPR
    auc_temp = metrics_temp.areaUnderROC
    print('\tAUPR:', aupr_temp, '\tAUC:', auc_temp)
    performance_rows.append((cutoff, aupr_temp, auc_temp))
scored.unpersist()
# Build the results table once: no placeholder (0, 0, 0) row, no union chain.
performance_df = spark.createDataFrame(performance_rows, ['cutoff', 'AUPR', 'AUC'])
display(performance_df)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Evaluate best model
# lrevaluator is a BinaryClassificationEvaluator, so this value is ROC AUC
# computed from rawPrediction, not accuracy.
print('AUC (rawPrediction):', lrevaluator.evaluate(lrpredictions))
# MulticlassMetrics expects (prediction, label) pairs; the reversed order
# transposes the confusion matrix.
lrmetrics = MulticlassMetrics(lrpredictions.select('prediction', 'label').rdd)
print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
print('F1 Score:', lrmetrics.fMeasure(1.0, 1.0))

# A container SAS URL had been pasted here as bare code (a syntax error). Its
# signature is redacted; if the token was live, revoke and regenerate it.
# https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=<REDACTED>


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or 0.0 when denominator is 0."""
    return float(numerator) / denominator if denominator else 0.0


# --- Confusion-matrix statistics for every fitted binary classifier ---
for model in ["lrpredictions", "dtpredictions", "rfpredictions", "nbpredictions", "gbpredictions"]:
    df = globals()[model]
    # One aggregation job instead of six separate count() jobs.
    counts = {(row["label"], row["prediction"]): row["count"]
              for row in df.groupBy("label", "prediction").count().collect()}
    tp = counts.get((1.0, 1.0), 0)
    tn = counts.get((0.0, 0.0), 0)
    fp = counts.get((0.0, 1.0), 0)
    fn = counts.get((1.0, 0.0), 0)
    total = sum(counts.values())
    # Every ratio is guarded, so a model with no actual or predicted positives
    # (or an empty frame) no longer raises ZeroDivisionError.
    a = _safe_ratio(tp + tn, total)
    r = _safe_ratio(tp, tp + fn)
    p = _safe_ratio(tp, tp + fp)
    f1 = 2 * _safe_ratio(p * r, p + r)
    print("Model:", model)
    print("True Positives:", tp)
    print("True Negatives:", tn)
    print("False Positives:", fp)
    print("False Negatives:", fn)
    print("Total:", total)
    print("Accuracy:", a)
    print("Recall:", r)
    print("Precision: ", p)
    print("F1 score:", f1)
    # BinaryClassificationMetrics expects (score, label) pairs.
    print('AUC:', BinaryClassificationMetrics(df.select('prediction', 'label').rdd).areaUnderROC)
    print("\n")

## Supply storageName and accessKey values
storageName = ""
# NOTE(review): prefer dbutils.secrets.get(...) over a literal access key.
accessKey = ""
## Attempt to Mount Data Factory Data in Azure Storage
# "sinkdata\@" in a normal string keeps the backslash and yields an invalid
# URI, so the escape is removed.
# NOTE(review): the mount point contains a space ("Data Factorydata"); confirm
# this is intended.
dbutils.fs.mount(
    source="wasbs://sinkdata@" + storageName + ".blob.core.windows.net/",
    mount_point="/mnt/Data Factorydata",
    extra_configs={"fs.azure.account.key." + storageName + ".blob.core.windows.net": accessKey})

# Launch a local PySpark shell with Glow from a terminal (shell, not Python):
#   ./bin/pyspark --packages io.projectglow:glow_2.11:0.5.0 \
#       --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec
import glow
glow.register(spark)
vcf_path = "/databricks-datasets/genomics/1kg-vcfs/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"
from pyspark.sql.functions import col, expr, log10, when

# Load the VCF; keep INFO fields nested and record the first genotype.
df = spark.read.format("vcf") \
    .option("includeSampleIds", False) \
    .option("flattenInfoFields", False) \
    .load(vcf_path) \
    .withColumn("first_genotype", expr("genotypes[0]"))
# bgen_path = "/databricks-datasets/genomics/1kg-bgens/1kg_chr22.bgen"
# df = spark.read.format("bgen") \
#     .load(bgen_path)

# Per-variant QC statistics and filters.
# NOTE(review): allele_freq_cutoff is not defined in this file; set it first.
# NOTE(review): both region bounds use ">="; confirm whether end <= 16050000
# was intended.
# hardy_weinberg() and call_summary_stats() return structs stored under the
# column names below, so their fields are addressed through those structs.
df = df.withColumn("hardyweinberg", expr("hardy_weinberg(genotypes)")) \
    .withColumn("summarystats", expr("call_summary_stats(genotypes)")) \
    .withColumn("depthstats", expr("dp_summary_stats(genotypes)")) \
    .withColumn("genotypequalitystats", expr("gq_summary_stats(genotypes)")) \
    .filter(col("qual") >= 98) \
    .filter((col("start") >= 16000000) & (col("end") >= 16050000)) \
    .where((col("summarystats.alleleFrequencies").getItem(0) >= allele_freq_cutoff) &
           (col("summarystats.alleleFrequencies").getItem(0) <= (1.0 - allele_freq_cutoff))) \
    .withColumn("log10pValueHwe",
                when(col("hardyweinberg.pValueHwe") == 0, 26)
                .otherwise(-log10(col("hardyweinberg.pValueHwe"))))

# Split multiallelic variants into biallelic rows.
split_df = glow.transform("split_multiallelics", df)

# Write the filtered variants back out as a single VCF part file.
df.coalesce(1) \
    .write \
    .mode("overwrite") \
    .format("vcf") \
    .save("/tmp/vcf_output")