📜  Python MLlib

📅  最后修改于: 2020-10-28 01:23:21             🧑  作者: Mango

PySpark MLlib

机器学习是一种数据分析技术,将数据与统计工具结合在一起以预测输出。各种公司行业都使用此预测来做出有利的决策。

PySpark提供了一个与机器学习一起使用的API,称为mllib。 PySpark的mllib支持各种机器学习算法,例如分类,回归聚类,协作过滤和降维以及基础优化原语。下面给出了各种机器学习概念:

  • 分类

pyspark.mllib库支持多种分类方法,例如二进制分类,多类分类和回归分析。该对象可能属于不同的类。分类的目的是根据信息区分数据。随机森林,朴素贝叶斯,决策树是分类中最有用的算法。

  • 聚类

群集是无监督的机器学习问题。当您不知道如何对数据进行分类时,可以使用它。我们要求算法找到模式并相应地对数据进行分类。流行的聚类算法是K-means聚类,高斯混合模型,分层聚类。

  • fpm

fpm表示频繁模式匹配,该模式匹配用于挖掘各种项目,项目集,子序列或其他子结构。它主要用于大规模数据集。

  • linalg

mllib.linalg实用程序用于线性代数。

  • recommendation

它用于定义相关数据以提出建议。它能够预测未来的偏好并推荐顶级商品。例如,在线娱乐平台Netflix拥有大量电影,有时人们在选择喜欢的物品时会遇到困难。在该领域中,推荐起着重要的作用。

  • mllib回归

回归用于查找变量之间的关系和依存关系。它找到数据的每个特征之间的相关性,并预测未来的价值。

mllib软件包支持许多其他算法,类和函数。在这里,我们将了解pyspak.mllib的基本概念。

MLlib功能

PySpark mllib对于迭代算法很有用。功能如下:

  • 提取:它从“行”数据中提取要素。
  • 转换:用于缩放,转换或修改要素。
  • 选择:从更大的功能集中选择有用的子集。
  • 局部敏感哈希:它将特征转换的方面与其他算法结合在一起。

让我们看一下PySpark MLlib的基本库。

MLlib线性回归

线性回归用于查找变量之间的关系和依存关系。考虑以下代码:

frompyspark.sql import SparkSession
spark = SparkSession.builder.appName('Customer').getOrCreate()
frompyspark.ml.regression import LinearRegression
dataset = spark.read.csv(r'C:\Users\DEVANSH SHARMA\Ecommerce-Customers.csv')
dataset.show(10)

输出:

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|                 _c0|                 _c1|             _c2|               _c3|               _c4|               _c5|                 _c6|                _c7|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|     SaddleBrown| 34.30555662975554|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|mstephens@davidso...|14023 Rodriguez P...|MediumAquaMarine| 33.33067252364639|12.795188551078114| 37.53665330059473|   4.446308318351434|  599.4060920457634|
|alvareznancy@luca...|645 Martha Park A...|     FloralWhite|33.871037879341976|12.026925339755056| 34.47687762925054|   5.493507201364199|   637.102447915074|
|katherine20@yahoo...|68388 Reyes Light...|   DarkSlateBlue| 32.02159550138701|11.366348309710526| 36.68377615286961|   4.685017246570912|  521.5721747578274|
|  awatkins@yahoo.com|Unit 6538 Box 898...|            Aqua|32.739142938380326| 12.35195897300293| 37.37335885854755|  4.4342734348999375|  549.9041461052942|
|vchurch@walter-ma...|860 Lee KeyWest D...|          Salmon| 33.98777289568564|13.386235275676436|37.534497341555735|  3.2734335777477144|  570.2004089636196|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
only showing top 10 rows

在下面的代码中,我们将导入VectorAssembler库以创建一个新的独立列功能:

frompyspark.ml.linalg import Vectors
frompyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols = ["Avg Session Length","Time on App","Time on Website"],outputCol = "Independent Features")
output = featureassembler.transform(dataset)
output.show()

输出:

+------------------+
Independent Feature
+------------------+
|34.49726772511229 |
|31.92627202636016 |
|33.000914755642675|
|34.30555662975554 |
|33.33067252364639 |
|33.871037879341976|
|32.02159550138701 |
|32.739142938380326|
|33.98777289568564 |
+------------------+
z = featureassembler.transform(dataset)
finlized_data = z.select("Indepenent feature", "Yearly Amount Spent",)
z.show()

输出:

+--------------------++-------------------+
|Independent Feature | Yearly Amount Spent|
+--------------------++-------------------+
|34.49726772511229   | 587.9510539684005  |
|31.92627202636016   | 392.2049334443264  |
|33.000914755642675  | 487.5475048674720  |
|34.30555662975554   | 581.8523440352177  |
|33.33067252364639   | 599.4060920457634  |
|33.871037879341976  | 637.102447915074   |
|32.02159550138701   | 521.5721747578274  |
|32.739142938380326  | 549.9041461052942  |
|33.98777289568564   | 570.2004089636196  |
+--------------------++-------------------+

PySpark提供LinearRegression()函数来查找任何给定数据集的预测。语法如下:

regressor = LinearRegression(featureCol = 'column_name1', labelCol = 'column_name2 ')

MLlib K-均值聚类

K-均值聚类算法是最流行和最常用的算法之一。它用于将数据点聚类为预定义数量的聚类。以下示例显示了MLlib K-Means群集库的使用:

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Loads data.
dataset = spark.read.format("libsvm").load(r"C:\Users\DEVANSH SHARMA\Iris.csv")
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

PySpark MLlib的参数

PySpark MLlib的几个重要参数如下:

  • Ratings

它是评级的RDD或(用户ID,产品ID,评级)元组。

  • Rank

它表示计算出的特征矩阵的等级(特征数量)。

  • Iterations

它代表ALS的迭代次数。 (默认值:5)

  • Lambda

它是正则化参数。 (预设:0.01)

  • Blocks

它用于并行化某些块的计算。

协同过滤(mllib.recommendation)

协作过滤是推荐系统中通常使用的一种技术。该技术专注于填充用户项的缺失条目。关联矩阵spark.ml当前支持基于模型的协作过滤。在协作过滤中,用户和产品由一小组隐藏的因素来描述,这些因素可用于预测缺失的条目。

正则化参数的缩放

将正则化参数regParam缩放以解决最小二乘问题。当在更新用户因子时由用户生成评分数,或在更新产品因数中获得产品的评分数时,会发生最小二乘问题。

冷启动策略

ALS模型(替代最小二乘模型)用于进行预测,同时会产生常见的预测问题。当测试数据集中出现用户或项目时,在训练模型期间可能不存在的问题。它可能在以下两种情况下发生:

  • 在预测中,不对没有评级历史记录的用户和项目训练模型(称为冷启动策略)。
  • 在交叉验证期间,将数据分为训练集和评估集。遇到用户和评估集中不在训练集中的项目很普遍。
#importing the libraries
frompyspark.ml.evaluation import RegressionEvaluator
frompyspark.ml.recommendation import ALS
frompyspark.sql import Row
no_of_lines = spark.read.text(r"C:\Users\DEVANSH SHARMA\MovieLens.csv").rdd
no_of_parts = no_of_lines.map(lambda row: row.value.split("::"))
ratingsRDD = no_of_lines.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=long(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Develop the recommendation model using ALS on the training data
# Note we set cold start strategy to make sure that we don't get NaN evaluation metrics.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
    coldStartStrategy="drop")
model = als.fit(training)

# Calculate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Evaluate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Evaluate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Evaluate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Evalute top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)