train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)
Rename all columns
column_list = data.columnsprefix ="my_prefix"new_column_list = [prefix + s for s in column_list]#new_column_list = [prefix + s if s != "ID" else s for s in column_list] ## Use if you plan on joining on an ID latercolumn_mapping = [[o, n] for o, n inzip(column_list, new_column_list)]# print(column_mapping)data = data.select(list(map(lambdaold, new: col(old).alias(new),*zip(*column_mapping))))
Convert PySpark DataFrame to NumPy array
## Convert `train` DataFrame to NumPy
pdtrain = train.toPandas()
trainseries = pdtrain['features'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
X_train = np.apply_along_axis(lambda x : x[0], 1, trainseries)
y_train = pdtrain['label'].values.reshape(-1,1).ravel()
## Convert `test` DataFrame to NumPy
pdtest = test.toPandas()
testseries = pdtest['features'].apply(lambda x : np.array(x.toArray())).as_matrix().reshape(-1,1)
X_test = np.apply_along_axis(lambda x : x[0], 1, testseries)
y_test = pdtest['label'].values.reshape(-1,1).ravel()
print(y_test)
Call Cognitive Service API using PySpark
Create `chunker` function
The cognitive service APIs can only take a limited number of observations at a time (1,000, to be exact) or a limited amount of data in a single call. So, we can create a chunker function that we will use to split the dataset up into smaller chunks.
## Define Chunking Logicimport pandas as pdimport numpy as np# Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
defchunker(seq,size):return (seq[pos:pos + size]for pos inrange(0, len(seq), size))
Convert Spark DataFrame to Pandas
## sentiment_df_pd = sentiment_df.toPandas()
Set up API requirements
# pprint is used to format the JSON responsefrom pprint import pprintimport jsonimport requestssubscription_key ='<SUBSCRIPTIONKEY>'endpoint ='https://<SERVICENAME>.cognitiveservices.azure.com'sentiment_url = endpoint +"/text/analytics/v2.1/sentiment"headers ={"Ocp-Apim-Subscription-Key": subscription_key}
import pandas as pddefget_nonstring_cols(df): types = spark.createDataFrame(pd.DataFrame({'Column': df.schema.names, 'Type': [str(f.dataType) for f in df.schema.fields]}))
result = types.filter(col('Type') !='StringType').select('Column').rdd.flatMap(lambdax: x).collect()return resultget_nonstring_cols(df)
Change a Column's Type
from pyspark.sql.types import*from pyspark.sql.functions import coldf = df.withColumn('col1', col('col1').cast(IntegerType()))
## Fill in list with your desired column namescols = ["col1","col2","col3"]i =1for col in cols:if i ==1:print("schema = StructType([")print("\tStructField('"+ col +"', StringType(), True),")elif i ==len(cols):print("\tStructField('"+ col +"', StringType(), True)])")else:print("\tStructField('"+ col +"', StringType(), True),") i +=1## Once the output has printed, copy and paste into a new cell## and change column types and nullability
Generate StructType Schema from List (Automatic Execution)
"""Struct Schema Creator for PySpark[<Column Name>, <Column Type>, <Column Nullable>]Types: binary, boolean, byte, date, double, integer, long, null, short, string, timestamp, unknown"""from pyspark.sql.types import*## Fill in with your desired column names, types, and nullabilitycols = [["col1","string",False], ["col2","date",True], ["col3","integer",True]]## Loop to build list of StructFieldsschema_set = ["schema = StructType(["]for i, col inenumerate(cols): colname = col[0] coltype = col[1].title()+"Type()" colnull = col[2]if i ==len(cols)-1: iter_structfield ="StructField('"+ colname +"', "+ coltype +", "+str(colnull)+")])"else: iter_structfield ="StructField('"+ colname +"', "+ coltype +", "+str(colnull)+")," schema_set.append(iter_structfield)## Convert list to single stringschema_string =''.join(map(str, schema_set))## This will execute the generated command stringexec(schema_string)
Make a DataFrame of Consecutive Dates
from pyspark.sql.functions import sequence, to_date, explode, coldate_dim = spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) as DATE").withColumn("DATE", explode(col("DATE")))
display(date_dim)
Unpivot a DataFrame Dynamically (Longer)
Pivot a wide dataset into a longer form. (Similar to the pivot_longer() function from the tidyr R package or the .wide_to_long method from pandas.)
## UnpivotDF FunctiondefUnpivotDF(df,columns,pivotCol,unpivotColName,valueColName): columnsValue =list(map(lambdax: str("'") +str(x) +str("',") +str(x), columns)) stackCols =','.join(x for x in columnsValue) df_unpvt = df.selectExpr(pivotCol, f"stack({str(len(columns))}, {stackCols}) as ({unpivotColName}, {valueColName})")\.select(pivotCol, unpivotColName, valueColName)return(df_unpvt)
df_unpvt =UnpivotDF(df = df, columns = df.columns[1:], ## The columns to transpose into a single, longer column pivotCol ="ID", ## The column to leave in place (usually an ID) unpivotColName ="Category", ## The name of the new column valueColName ="value")## The name of the column of values