diff --git a/docs/8.md b/docs/8.md index 826c8fe96e59cd7c0ea4563659a004f5187c344e..0b067907db536379d63deaef12be6d20145ae1c5 100644 --- a/docs/8.md +++ b/docs/8.md @@ -122,7 +122,7 @@ Gradient descent is a first-order iterative optimization algorithm for finding t * I will only present the code with pipeline style in the following. * For more details about the parameters, please visit [Linear Regression API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.LinearRegression.html) . -1. Set up spark context and SparkSession +建立 spark 上下文和 SparkSession ``` from pyspark.sql import SparkSession @@ -135,7 +135,7 @@ spark = SparkSession \ ``` -1. Load dataset +加载数据集 ``` df = spark.read.format('com.databricks.spark.csv').\ @@ -145,7 +145,7 @@ df = spark.read.format('com.databricks.spark.csv').\ ``` -check the data set +检查数据集 ``` df.show(5,True) @@ -153,7 +153,7 @@ df.printSchema() ``` -Then you will get +之后我们会得到 ``` +-----+-----+---------+-----+ @@ -175,14 +175,14 @@ root ``` -You can also get the Statistical resutls from the data frame (Unfortunately, it only works for numerical). +您还可以从数据帧中获取统计结果(不幸的是,它仅适用于数字)。 ``` df.describe().show() ``` -Then you will get +之后你会得到 ``` +-------+-----------------+------------------+------------------+------------------+ @@ -199,9 +199,9 @@ Then you will get ![https://runawayhorse001.github.io/LearningApacheSpark/_images/ad.png](img/ad37847dfd8d9f3d99f646966f32cf30.jpg) -Sales distribution +销售分布 -1. Convert the data to dense vector (**features** and **label**) +将数据转换为密集向量(**特征**和**标签**) ``` from pyspark.sql import Row @@ -222,77 +222,77 @@ return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','l ``` -Note +> 注意 +> +> 强烈建议您尝试使用我的`get_dummy`函数来处理数据集中的分类数据。 -You are strongly encouraged to try my `get_dummy` function for dealing with the categorical data in comple dataset. +监督学习版本: -Supervised learning version: +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> -> from pyspark.ml import Pipeline -> from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler -> from pyspark.sql.functions import col -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> data = data.withColumn('label',col(labelCol)) -> -> return data.select(indexCol,'features','label') -> -> ``` - -Unsupervised learning version: - -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols): -> ''' -> Get dummy variables and concat with continuous variables for unsupervised learning. 
-> :param df: the dataframe -> :param categoricalCols: the name list of the categorical data -> :param continuousCols: the name list of the numerical data -> :return k: feature matrix -> -> :author: Wenqiang Feng -> :email: von198@gmail.com -> ''' -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> return data.select(indexCol,'features') -> -> ``` + from pyspark.ml import Pipeline + from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler + from pyspark.sql.functions import col + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + data = data.withColumn('label',col(labelCol)) + + return data.select(indexCol,'features','label') + +``` -1. Transform the dataset to DataFrame +无监督学习版本: + +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols): + ''' + Get dummy variables and concat with continuous variables for unsupervised learning. + :param df: the dataframe + :param categoricalCols: the name list of the categorical data + :param continuousCols: the name list of the numerical data + :return k: feature matrix + + :author: Wenqiang Feng + :email: von198@gmail.com + ''' + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + return data.select(indexCol,'features') + +``` + +将数据集转换为数据帧 ``` transformed= transData(df) @@ -314,11 +314,11 @@ only showing top 5 rows ``` -Note - -You will find out that all of the supervised machine learning algorithms in Spark are based on the **features** and **label** (unsupervised machine learning algorithms in Spark are based on the **features**). That is to say, you can play with all of the machine learning algorithms in Spark when you get ready the **features** and **label** in pipeline architecture. +> 注意 +> +> 您会发现 Spark 中所有监督机器学习算法都基于**特征**和**标签**(Spark 中的无监督机器学习算法基于**特征**)。 也就是说,当您在管道架构中准备好**特征**和**标签**时,您可以使用 Spark 中的所有机器学习算法。 -1. 
Deal With Categorical Variables +处理类别变量 ``` from pyspark.ml import Pipeline @@ -337,14 +337,14 @@ data = featureIndexer.transform(transformed) ``` -Now you check your dataset with +现在你可以这样检查数据集 ``` data.show(5,True) ``` -you will get +你会得到 ``` +-----------------+-----+-----------------+ @@ -360,7 +360,7 @@ only showing top 5 rows ``` -1. Split the data into training and test sets (40% held out for testing) +将数据分割为训练和测试集(留出 40% 用于测试) ``` # Split the data into training and test sets (40% held out for testing) @@ -368,7 +368,7 @@ only showing top 5 rows ``` -You can check your train and test data as follows (In my opinion, it is always to good to keep tracking your data during prototype pahse): +您可以按照以下方式检查您的训练和测试数据(在我看来,在原型阶段始终跟踪您的数据总是很好): ``` trainingData.show(5) @@ -376,7 +376,7 @@ testData.show(5) ``` -Then you will get +之后你会得到 ``` +---------------+-----+---------------+ @@ -403,9 +403,9 @@ only showing top 5 rows ``` -1. Fit Ordinary Least Square Regression Model +拟合普通最小二乘回归模型 -For more details about the parameters, please visit [Linear Regression API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.LinearRegression.html) . +参数的更多信息请见[线性回归 API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.LinearRegression.html)。 ``` # Import LinearRegression class @@ -416,7 +416,7 @@ lr = LinearRegression() ``` -1. Pipeline Architecture +流水线架构 ``` # Chain indexer and tree in a Pipeline @@ -426,9 +426,9 @@ model = pipeline.fit(trainingData) ``` -1. Summary of the Model +模型总结 -Spark has a poor summary function for data and model. I wrote a summary function which has similar format as **R** output for the linear regression in PySpark. +Spark 不擅长数据和模型的汇总。 我为 PySpark 中线性回归写了一个汇总函数,其格式与 **R** 的输出类似。 ``` def modelsummary(model): @@ -459,7 +459,7 @@ modelsummary(model.stages[-1]) ``` -You will get the following summary results: +你会得到以下结果: ``` Note: the last rows are the information for Intercept @@ -475,7 +475,7 @@ Note: the last rows are the information for Intercept ``` -1. Make predictions +做出预测 ``` # Make predictions. @@ -503,7 +503,7 @@ only showing top 5 rows ``` -1. Evaluation +评估 ``` from pyspark.ml.evaluation import RegressionEvaluator @@ -517,14 +517,14 @@ print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) ``` -The final Root Mean Squared Error (RMSE) is as follows: +最终的均方根误差(RMSE)如下: ``` Root Mean Squared Error (RMSE) on test data = 1.63114 ``` -You can also check the ![R^2](img/1ac835166928f502b55a31636602602a.jpg) value for the test data: +您还可以检查测试数据的 ![R^2](img/1ac835166928f502b55a31636602602a.jpg) 值: ``` y_true = predictions.select("label").toPandas() @@ -536,29 +536,29 @@ print('r2_score: {0}'.format(r2_score)) ``` -Then you will get +之后你会得到 ``` r2_score: 0.854486655585 ``` -Warning - -You should know most softwares are using different formula to calculate the ![R^2](img/1ac835166928f502b55a31636602602a.jpg) value when no intercept is included in the model. You can get more information from the [disscussion at StackExchange](https://stats.stackexchange.com/questions/26176/removal-of-statistically-significant-intercept-term-increases-r2-in-linear-mo). +> 警告 +> +> 你应该知道,模型不包含截距时,大多数软件使用不同的公式来计算 ![R^2](img/1ac835166928f502b55a31636602602a.jpg) 值。你可以从[ StackExchange 上的讨论](https://stats.stackexchange.com/questions/26176/removal-of-statistically-significant-intercept-term-increases-r2-in-linear-mo)获得更多信息。 -## 8.2\. Generalized linear regression +## 8.2\. 广义线性回归 -### 8.2.1\. Introduction +### 8.2.1\. 介绍 -### 8.2.2\. How to solve it? +### 8.2.2\. 
如何求解 -### 8.2.3\. Demo +### 8.2.3\. 示例 -* The Jupyter notebook can be download from [Generalized Linear Regression](_static/GLM.ipynb). -* For more details about the parameters, please visit [Generalized Linear Regression API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.GeneralizedLinearRegression.html) . +* Jupyter 笔记本可以从[广义线性回归](_static/GLM.ipynb)下载。 +* 参数的更多信息请见[广义线性回归 API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.GeneralizedLinearRegression.html)。 -1. Set up spark context and SparkSession +建立 spark 上下文和 SparkSession ``` from pyspark.sql import SparkSession @@ -571,7 +571,7 @@ spark = SparkSession \ ``` -1. Load dataset +加载数据集 ``` df = spark.read.format('com.databricks.spark.csv').\ @@ -581,7 +581,7 @@ df = spark.read.format('com.databricks.spark.csv').\ ``` -check the data set +查看数据集 ``` df.show(5,True) @@ -589,7 +589,8 @@ df.printSchema() ``` -Then you will get +之后你会得到 + ``` +-----+-----+---------+-----+ @@ -611,14 +612,15 @@ root ``` -You can also get the Statistical resutls from the data frame (Unfortunately, it only works for numerical). +您还可以从数据帧中获取统计结果(不幸的是,它仅适用于数字)。 ``` df.describe().show() ``` -Then you will get +之后你会得到 + ``` +-------+-----------------+------------------+------------------+------------------+ @@ -633,77 +635,77 @@ Then you will get ``` -1. Convert the data to dense vector (**features** and **label**) +将数据转换为密集向量(**特征**和**标签**) -Note +> 注意 -You are strongly encouraged to try my `get_dummy` function for dealing with the categorical data in comple dataset. +强烈建议您尝试使用我的`get_dummy`函数来处理数据集中的分类数据。 -Supervised learning version: +监督学习版本: -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> -> from pyspark.ml import Pipeline -> from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler -> from pyspark.sql.functions import col -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> data = data.withColumn('label',col(labelCol)) -> -> return data.select(indexCol,'features','label') -> -> ``` - -Unsupervised learning version: - -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols): -> ''' -> Get dummy variables and concat with continuous variables for unsupervised learning. 
-> :param df: the dataframe -> :param categoricalCols: the name list of the categorical data -> :param continuousCols: the name list of the numerical data -> :return k: feature matrix -> -> :author: Wenqiang Feng -> :email: von198@gmail.com -> ''' -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> return data.select(indexCol,'features') -> -> ``` +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): + + from pyspark.ml import Pipeline + from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler + from pyspark.sql.functions import col + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + data = data.withColumn('label',col(labelCol)) + + return data.select(indexCol,'features','label') + +``` + +无监督学习版本: + +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols): + ''' + Get dummy variables and concat with continuous variables for unsupervised learning. + :param df: the dataframe + :param categoricalCols: the name list of the categorical data + :param continuousCols: the name list of the numerical data + :return k: feature matrix + + :author: Wenqiang Feng + :email: von198@gmail.com + ''' + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + return data.select(indexCol,'features') + +``` ``` from pyspark.sql import Row @@ -744,11 +746,11 @@ only showing top 5 rows ``` -Note - -You will find out that all of the machine learning algorithms in Spark are based on the **features** and **label**. That is to say, you can play with all of the machine learning algorithms in Spark when you get ready the **features** and **label**. +> 注意 +> +> 您会发现 Spark 中所有监督机器学习算法都基于**特征**和**标签**(Spark 中的无监督机器学习算法基于**特征**)。 也就是说,当您在管道架构中准备好**特征**和**标签**时,您可以使用 Spark 中的所有机器学习算法。 -1. Convert the data to dense vector +将数据转换为密集向量 ``` # convert the data to dense vector @@ -764,7 +766,7 @@ data.show() ``` -1. Deal with the Categorical variables +处理类别变量 ``` from pyspark.ml import Pipeline @@ -800,7 +802,7 @@ only showing top 5 rows ``` -1. 
Split the data into training and test sets (40% held out for testing) +将数据分割为训练和测试集(留出 40% 用于测试) ``` # Split the data into training and test sets (40% held out for testing) @@ -808,7 +810,7 @@ only showing top 5 rows ``` -You can check your train and test data as follows (In my opinion, it is always to good to keep tracking your data during prototype pahse): +您可以按照以下方式检查您的训练和测试数据(在我看来,在原型阶段始终跟踪您的数据总是很好): ``` trainingData.show(5) @@ -816,7 +818,8 @@ testData.show(5) ``` -Then you will get +之后你会得到 + ``` +----------------+-----+----------------+ @@ -843,7 +846,7 @@ only showing top 5 rows ``` -1. Fit Generalized Linear Regression Model +拟合广义线性回归模型 ``` # Import LinearRegression class @@ -855,7 +858,7 @@ glr = GeneralizedLinearRegression(family="gaussian", link="identity",\ ``` -1. Pipeline Architecture +流水线架构 ``` # Chain indexer and tree in a Pipeline @@ -865,9 +868,9 @@ model = pipeline.fit(trainingData) ``` -1. Summary of the Model +模型总结 -Spark has a poor summary function for data and model. I wrote a summary function which has similar format as **R** output for the linear regression in PySpark. +Spark 不擅长数据和模型的汇总。 我为 PySpark 中线性回归写了一个汇总函数,其格式与 **R** 输出类似。 ``` def modelsummary(model): @@ -898,7 +901,7 @@ modelsummary(model.stages[-1]) ``` -You will get the following summary results: +你会得到以下结果: ``` Note: the last rows are the information for Intercept @@ -912,7 +915,7 @@ Note: the last rows are the information for Intercept ``` -1. Make predictions +做出预测 ``` # Make predictions. @@ -940,7 +943,7 @@ only showing top 5 rows ``` -1. Evaluation +评估 ``` from pyspark.ml.evaluation import RegressionEvaluator @@ -955,7 +958,7 @@ print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) ``` -The final Root Mean Squared Error (RMSE) is as follows: +最终的均方根误差(RMSE)如下: ``` Root Mean Squared Error (RMSE) on test data = 1.89857 @@ -972,25 +975,25 @@ print('r2_score: {0}'.format(r2_score)) ``` -Then you will get the ![R^2](img/1ac835166928f502b55a31636602602a.jpg) value: +之后你会得到 ![R^2](img/1ac835166928f502b55a31636602602a.jpg) 值: ``` r2_score: 0.87707391843 ``` -## 8.3\. Decision tree Regression +## 8.3\. 决策树回归 -### 8.3.1\. Introduction +### 8.3.1\. 介绍 -### 8.3.2\. How to solve it? +### 8.3.2\. 如何求解 -### 8.3.3\. Demo +### 8.3.3\. 示例 -* The Jupyter notebook can be download from [Decision Tree Regression](_static/DecisionTreeR.ipynb). -* For more details about the parameters, please visit [Decision Tree Regressor API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.DecisionTreeRegressor.html) . +* Jupyter 笔记本可以从[决策树回归](_static/DecisionTreeR.ipynb)下载。 +* 参数的更多信息请见[决策树回归 API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.DecisionTreeRegressor.html)。 -1. Set up spark context and SparkSession +建立 spark 上下文和 SparkSession ``` from pyspark.sql import SparkSession @@ -1003,7 +1006,7 @@ spark = SparkSession \ ``` -1. Load dataset +加载数据集 ``` df = spark.read.format('com.databricks.spark.csv').\ @@ -1013,7 +1016,7 @@ df = spark.read.format('com.databricks.spark.csv').\ ``` -check the data set +检查数据集 ``` df.show(5,True) @@ -1021,7 +1024,8 @@ df.printSchema() ``` -Then you will get +之后你会得到 + ``` +-----+-----+---------+-----+ @@ -1043,14 +1047,15 @@ root ``` -You can also get the Statistical resutls from the data frame (Unfortunately, it only works for numerical). 
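+在拟合回归模型之前,也可以先大致查看各数值特征与标签之间的线性相关程度。下面只是一个简单的示意,假设数据帧的最后一列是标签、其余列都是数值特征(与后文 transData 中 r[:-1] / r[-1] 的约定一致):
+
+```
+# 示意:计算每个特征列与标签列之间的皮尔逊相关系数
+# 假设最后一列是标签,其余列是数值特征
+label_col = df.columns[-1]
+for c in df.columns[:-1]:
+    print(c, df.stat.corr(c, label_col))
+```
+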
+您还可以从数据帧中获取统计结果(不幸的是,它仅适用于数字)。 ``` df.describe().show() ``` -Then you will get +之后你会得到 + ``` +-------+-----------------+------------------+------------------+------------------+ @@ -1065,77 +1070,77 @@ Then you will get ``` -1. Convert the data to dense vector (**features** and **label**) +将数据转换为密集向量(**特征**和**标签**) -Note +> 注意 +> +> 强烈建议您尝试使用我的`get_dummy`函数来处理数据集中的分类数据。 -You are strongly encouraged to try my `get_dummy` function for dealing with the categorical data in comple dataset. +监督学习版本: -Supervised learning version: +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> -> from pyspark.ml import Pipeline -> from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler -> from pyspark.sql.functions import col -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> data = data.withColumn('label',col(labelCol)) -> -> return data.select(indexCol,'features','label') -> -> ``` - -Unsupervised learning version: - -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols): -> ''' -> Get dummy variables and concat with continuous variables for unsupervised learning. -> :param df: the dataframe -> :param categoricalCols: the name list of the categorical data -> :param continuousCols: the name list of the numerical data -> :return k: feature matrix -> -> :author: Wenqiang Feng -> :email: von198@gmail.com -> ''' -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> return data.select(indexCol,'features') -> -> ``` + from pyspark.ml import Pipeline + from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler + from pyspark.sql.functions import col + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + data = data.withColumn('label',col(labelCol)) + + return data.select(indexCol,'features','label') + +``` + +无监督学习版本: + +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols): + ''' + Get dummy variables and concat with continuous variables for 
unsupervised learning. + :param df: the dataframe + :param categoricalCols: the name list of the categorical data + :param continuousCols: the name list of the numerical data + :return k: feature matrix + + :author: Wenqiang Feng + :email: von198@gmail.com + ''' + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + return data.select(indexCol,'features') + +``` ``` from pyspark.sql import Row @@ -1176,11 +1181,11 @@ only showing top 5 rows ``` -Note - -You will find out that all of the machine learning algorithms in Spark are based on the **features** and **label**. That is to say, you can play with all of the machine learning algorithms in Spark when you get ready the **features** and **label**. +> 注意 +> +> 您会发现 Spark 中所有监督机器学习算法都基于**特征**和**标签**(Spark 中的无监督机器学习算法基于**特征**)。 也就是说,当您在管道架构中准备好**特征**和**标签**时,您可以使用 Spark 中的所有机器学习算法。 -1. Convert the data to dense vector +将数据转换为密集向量 ``` # convert the data to dense vector @@ -1193,7 +1198,7 @@ transformed.show(5) ``` -1. Deal with the Categorical variables +处理类别变量 ``` from pyspark.ml import Pipeline @@ -1229,7 +1234,7 @@ only showing top 5 rows ``` -1. Split the data into training and test sets (40% held out for testing) +将数据分割为训练和测试集(留出 40% 用于测试) ``` # Split the data into training and test sets (40% held out for testing) @@ -1237,7 +1242,7 @@ only showing top 5 rows ``` -You can check your train and test data as follows (In my opinion, it is always to good to keep tracking your data during prototype pahse): +您可以按照以下方式检查您的训练和测试数据(在我看来,在原型阶段始终跟踪您的数据总是很好): ``` trainingData.show(5) @@ -1245,7 +1250,8 @@ testData.show(5) ``` -Then you will get +之后你会得到 + ``` +---------------+-----+---------------+ @@ -1272,7 +1278,7 @@ only showing top 5 rows ``` -1. Fit Decision Tree Regression Model +拟合决策树回归模型 ``` from pyspark.ml.regression import DecisionTreeRegressor @@ -1282,7 +1288,7 @@ dt = DecisionTreeRegressor(featuresCol="indexedFeatures") ``` -1. Pipeline Architecture +流水线架构 ``` # Chain indexer and tree in a Pipeline @@ -1292,7 +1298,7 @@ model = pipeline.fit(trainingData) ``` -1. Make predictions +做出预测 ``` # Make predictions. @@ -1320,7 +1326,7 @@ only showing top 5 rows ``` -1. Evaluation +评估 ``` from pyspark.ml.evaluation import RegressionEvaluator @@ -1335,7 +1341,7 @@ print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) ``` -The final Root Mean Squared Error (RMSE) is as follows: +最终的均方根误差(RMSE)如下: ``` Root Mean Squared Error (RMSE) on test data = 1.50999 @@ -1352,39 +1358,39 @@ print('r2_score: {0}'.format(r2_score)) ``` -Then you will get the ![R^2](img/1ac835166928f502b55a31636602602a.jpg) value: +之后你会得到 ![R^2](img/1ac835166928f502b55a31636602602a.jpg) 值: ``` r2_score: 0.911024318967 ``` -You may also check the importance of the features: +你可以检查特征上的重要性 ``` model.stages[1].featureImportances ``` -The you will get the weight for each features +您将获得每个特征的权重 ``` SparseVector(3, {0: 0.6811, 1: 0.3187, 2: 0.0002}) ``` -## 8.4\. Random Forest Regression +## 8.4\. 随机森林回归 -### 8.4.1\. Introduction +### 8.4.1\. 简介 -### 8.4.2\. How to solve it? +### 8.4.2\. 
如何求解 -### 8.4.3\. Demo +### 8.4.3\. 示例 -* The Jupyter notebook can be download from [Random Forest Regression](_static/RandomForestR.ipynb). -* For more details about the parameters, please visit [Random Forest Regressor API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.RandomForestRegressor.html) . +* Jupyter 笔记本可以从[随机森林回归](_static/RandomForestR.ipynb)下载。 +* 参数的更多信息请见[随机森林回归 API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.RandomForestRegressor.html)。 -1. Set up spark context and SparkSession +建立 spark 上下文和 SparkSession ``` from pyspark.sql import SparkSession @@ -1397,7 +1403,7 @@ spark = SparkSession \ ``` -1. Load dataset +加载数据集 ``` df = spark.read.format('com.databricks.spark.csv').\ @@ -1445,77 +1451,77 @@ df.describe().show() ``` -1. Convert the data to dense vector (**features** and **label**) +将数据转换为密集向量(**特征**和**标签**) -Note +> 注意 +> +> 强烈建议您尝试使用我的`get_dummy`函数来处理数据集中的分类数据。 -You are strongly encouraged to try my `get_dummy` function for dealing with the categorical data in comple dataset. +监督学习版本: -Supervised learning version: +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> -> from pyspark.ml import Pipeline -> from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler -> from pyspark.sql.functions import col -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> data = data.withColumn('label',col(labelCol)) -> -> return data.select(indexCol,'features','label') -> -> ``` - -Unsupervised learning version: - -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols): -> ''' -> Get dummy variables and concat with continuous variables for unsupervised learning. 
-> :param df: the dataframe -> :param categoricalCols: the name list of the categorical data -> :param continuousCols: the name list of the numerical data -> :return k: feature matrix -> -> :author: Wenqiang Feng -> :email: von198@gmail.com -> ''' -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> return data.select(indexCol,'features') -> -> ``` + from pyspark.ml import Pipeline + from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler + from pyspark.sql.functions import col + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + data = data.withColumn('label',col(labelCol)) + + return data.select(indexCol,'features','label') + +``` + +无监督学习版本: + +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols): + ''' + Get dummy variables and concat with continuous variables for unsupervised learning. + :param df: the dataframe + :param categoricalCols: the name list of the categorical data + :param continuousCols: the name list of the numerical data + :return k: feature matrix + + :author: Wenqiang Feng + :email: von198@gmail.com + ''' + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + return data.select(indexCol,'features') + +``` ``` from pyspark.sql import Row @@ -1532,7 +1538,7 @@ def transData(data): ``` -1. Convert the data to dense vector +将数据转换为密集向量 ``` transformed= transData(df) @@ -1554,7 +1560,7 @@ only showing top 5 rows ``` -1. Deal with the Categorical variables +处理类别变量 ``` from pyspark.ml import Pipeline @@ -1585,7 +1591,7 @@ only showing top 5 rows ``` -1. Split the data into training and test sets (40% held out for testing) +将数据分割为训练和测试集(留出 40% 用于测试) ``` # Split the data into training and test sets (40% held out for testing) @@ -1621,7 +1627,7 @@ only showing top 5 rows ``` -1. 
Fit RandomForest Regression Model +拟合随机森林回归模型 ``` # Import LinearRegression class @@ -1632,11 +1638,11 @@ rf = RandomForestRegressor() # featuresCol="indexedFeatures",numTrees=2, maxDept ``` -Note - -If you decide to use the `indexedFeatures` features, you need to add the parameter `featuresCol="indexedFeatures"`. +> 注意 +> +> 如果你决定使用`indexedFeatures`特征,你需要添加参数`featuresCol="indexedFeatures"`。 -1. Pipeline Architecture +流水线架构 ``` # Chain indexer and tree in a Pipeline @@ -1645,7 +1651,7 @@ model = pipeline.fit(trainingData) ``` -1. Make predictions +做出预测 ``` predictions = model.transform(testData) @@ -1669,7 +1675,7 @@ only showing top 5 rows ``` -1. Evaluation +评估 ``` # Select (prediction, true label) and compute test error @@ -1697,7 +1703,7 @@ r2_score: 0.831 ``` -1. Feature importances +特征重要性 ``` model.stages[-1].featureImportances @@ -1738,18 +1744,18 @@ model.stages[-1].trees ``` -## 8.5\. Gradient-boosted tree regression +## 8.5\. 梯度提升树回归 -### 8.5.1\. Introduction +### 8.5.1\. 简介 -### 8.5.2\. How to solve it? +### 8.5.2\. 如何求解 -### 8.5.3\. Demo +### 8.5.3\. 示例 -* The Jupyter notebook can be download from [Gradient-boosted tree regression](_static/GLM.ipynb). -* For more details about the parameters, please visit [Gradient boosted tree API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.GBTRegressor.html) . +* Jupyter 笔记本可以从[梯度提升树回归](_static/GLM.ipynb)。 +* 参数的更多信息请见[梯度提升树回归 API](http://takwatanabe.me/pyspark/generated/generated/ml.regression.GBTRegressor.html)。 -1. Set up spark context and SparkSession +建立 spark 上下文和 SparkSession ``` from pyspark.sql import SparkSession @@ -1762,7 +1768,7 @@ spark = SparkSession \ ``` -1. Load dataset +加载数据集 ``` df = spark.read.format('com.databricks.spark.csv').\ @@ -1810,77 +1816,77 @@ df.describe().show() ``` -1. Convert the data to dense vector (**features** and **label**) +将数据转换为密集向量(**特征**和**标签**) -Note +> 注意 +> +> 强烈建议您尝试使用我的`get_dummy`函数来处理数据集中的分类数据。 -You are strongly encouraged to try my `get_dummy` function for dealing with the categorical data in comple dataset. +监督学习版本: -Supervised learning version: +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol): -> -> from pyspark.ml import Pipeline -> from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler -> from pyspark.sql.functions import col -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> data = data.withColumn('label',col(labelCol)) -> -> return data.select(indexCol,'features','label') -> -> ``` - -Unsupervised learning version: - -> ``` -> def get_dummy(df,indexCol,categoricalCols,continuousCols): -> ''' -> Get dummy variables and concat with continuous variables for unsupervised learning. 
-> :param df: the dataframe -> :param categoricalCols: the name list of the categorical data -> :param continuousCols: the name list of the numerical data -> :return k: feature matrix -> -> :author: Wenqiang Feng -> :email: von198@gmail.com -> ''' -> -> indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) -> for c in categoricalCols ] -> -> # default setting: dropLast=True -> encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), -> outputCol="{0}_encoded".format(indexer.getOutputCol())) -> for indexer in indexers ] -> -> assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] -> + continuousCols, outputCol="features") -> -> pipeline = Pipeline(stages=indexers + encoders + [assembler]) -> -> model=pipeline.fit(df) -> data = model.transform(df) -> -> return data.select(indexCol,'features') -> -> ``` + from pyspark.ml import Pipeline + from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler + from pyspark.sql.functions import col + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + data = data.withColumn('label',col(labelCol)) + + return data.select(indexCol,'features','label') + +``` + +无监督学习版本: + +``` +def get_dummy(df,indexCol,categoricalCols,continuousCols): + ''' + Get dummy variables and concat with continuous variables for unsupervised learning. + :param df: the dataframe + :param categoricalCols: the name list of the categorical data + :param continuousCols: the name list of the numerical data + :return k: feature matrix + + :author: Wenqiang Feng + :email: von198@gmail.com + ''' + + indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) + for c in categoricalCols ] + + # default setting: dropLast=True + encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), + outputCol="{0}_encoded".format(indexer.getOutputCol())) + for indexer in indexers ] + + assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + + continuousCols, outputCol="features") + + pipeline = Pipeline(stages=indexers + encoders + [assembler]) + + model=pipeline.fit(df) + data = model.transform(df) + + return data.select(indexCol,'features') + +``` ``` from pyspark.sql import Row @@ -1897,7 +1903,7 @@ def transData(data): ``` -1. Convert the data to dense vector +将数据转换为密集向量 ``` transformed= transData(df) @@ -1919,7 +1925,7 @@ only showing top 5 rows ``` -1. Deal with the Categorical variables +处理类别变量 ``` from pyspark.ml import Pipeline @@ -1950,7 +1956,7 @@ only showing top 5 rows ``` -1. Split the data into training and test sets (40% held out for testing) +将数据分割为训练和测试集(留出 40% 用于测试) ``` # Split the data into training and test sets (40% held out for testing) @@ -1986,7 +1992,7 @@ only showing top 5 rows ``` -1. Fit RandomForest Regression Model +拟合梯度提升树模型 ``` # Import LinearRegression class @@ -1997,11 +2003,11 @@ rf = GBTRegressor() #numTrees=2, maxDepth=2, seed=42 ``` -Note - -If you decide to use the `indexedFeatures` features, you need to add the parameter `featuresCol="indexedFeatures"`. 
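+如果需要,也可以在构造 GBTRegressor 时显式设置常用超参数。下面只是一个示意,各参数取值为假设值,并非推荐配置:
+
+```
+from pyspark.ml.regression import GBTRegressor
+
+# 示意:显式设置常用超参数(取值仅为假设)
+rf = GBTRegressor(maxIter=50,    # boosting 迭代次数
+                  maxDepth=3,    # 每棵树的最大深度
+                  stepSize=0.1)  # 学习率(步长)
+```
+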
+> 注意 +> +> 如果你决定使用`indexedFeatures`特征,你需要添加参数`featuresCol="indexedFeatures"`。 -1. Pipeline Architecture +流水线架构 ``` # Chain indexer and tree in a Pipeline @@ -2010,7 +2016,7 @@ model = pipeline.fit(trainingData) ``` -1. Make predictions +做出预测 ``` predictions = model.transform(testData) @@ -2034,7 +2040,7 @@ only showing top 5 rows ``` -1. Evaluation +评估 ``` # Select (prediction, true label) and compute test error @@ -2062,7 +2068,7 @@ r2_score: 0.932 ``` -1. Feature importances +特征重要性 ``` model.stages[-1].featureImportances
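+# featureImportances 返回一个向量(通常是 SparseVector),
+# 每个特征对应一个归一化后的权重,形式与前面决策树一节的输出相同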