OpenDocCN / RecommenderSystems
Commit 8740e668
Authored May 31, 2017 by 片刻小哥哥
Commit message: test
Parent: e179ba95

12 changed files with 1,008,305 additions and 2 deletions (+1008305, -2)
Changed files:

- README.md (+20, -2)
- bin/start.sh (+10, -0)
- input/hot_movies.csv (+166, -0)
- input/user_movies.csv (+1007397, -0)
- pom.xml (+127, -0)
- src/main/scala/apache/wiki/App.scala (+27, -0)
- src/main/scala/apache/wiki/OfflineRecommender.scala (+344, -0)
- src/main/scala/apache/wiki/OnlineRecommender.scala (+18, -0)
- src/main/scala/apache/wiki/WordCount.scala (+39, -0)
- src/test/scala/samples/junit.scala (+17, -0)
- src/test/scala/samples/scalatest.scala (+109, -0)
- src/test/scala/samples/specs.scala (+31, -0)
README.md

# RecommenderSystems

Recommender system

```scala
name := "NetflixRecommender"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.0"
libraryDependencies += "org.mongodb" %% "casbah" % "3.0.0"
libraryDependencies += "org.jblas" % "jblas" % "1.2.4"

mergeStrategy in assembly <<= (mergeStrategy in assembly) { mergeStrategy =>
  {
    case entry => {
      val strategy = mergeStrategy(entry)
      if (strategy == MergeStrategy.deduplicate) MergeStrategy.first
      else strategy
    }
  }
}
```
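The custom merge strategy in the README only changes sbt-assembly's behavior for entries that would otherwise fail deduplication: any entry whose default strategy is `deduplicate` is remapped to `first`; every other entry keeps its default. A minimal Python sketch of that dispatch logic (the entry names and the `default_strategy` rules here are made-up stand-ins, not sbt-assembly's real defaults):

```python
# Sketch of the merge-strategy override above: entries whose default
# strategy is "deduplicate" fall back to "first"; everything else keeps
# its default. The defaults below are hypothetical, for illustration only.

def default_strategy(entry):
    # Hypothetical defaults; real sbt-assembly derives these per path.
    if entry.startswith("META-INF/"):
        return "discard"
    return "deduplicate"

def chosen_strategy(entry):
    strategy = default_strategy(entry)
    return "first" if strategy == "deduplicate" else strategy

print(chosen_strategy("reference.conf"))        # deduplicate -> first
print(chosen_strategy("META-INF/MANIFEST.MF"))  # keeps its default
```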
bin/start.sh (new file, mode 100644)

```bash
#!/bin/bash
mvn clean
mvn compile
mvn package
echo 'package success'

# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.App /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
# /opt/spark-2.0.0-bin-hadoop2.6/bin/spark-submit --class apache.wiki.WordCount /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
/opt/spark/bin/spark-submit --class apache.wiki.OfflineRecommender /opt/git/RecommendedSystem/target/RecommendedSystem-1.0-SNAPSHOT.jar
echo 'spark-submit success'
```

(The original shebang read `#!bin/bash`, which is not a valid interpreter path; corrected to `#!/bin/bash`.)
input/hot_movies.csv (new file, mode 100755) — format: movie ID, rating, movie title
20645098,8.2,小王子
26259677,8.3,垫底辣妹
11808948,7.2,海绵宝宝
26253733,6.4,突然变异
25856265,6.7,烈日迷踪
26274810,6.6,侦探:为了原点
25889465,6.3,抢劫
1972724,7.3,斯坦福监狱实验
6845667,8.0,秘密特工
1866473,7.8,蚁人
25859495,8.2,思悼
25823132,8.1,暗杀
10533913,8.8,头脑特工队
25766754,8.2,年轻气盛
26393561,8.8,小萝莉的猴神大叔
26326395,5.1,真实魔鬼游戏
25955491,8.6,罪恶之家
25774051,6.8,寄生兽:完结篇
24325923,8.0,我和厄尔以及将死的女孩
2303845,7.2,刺客聂隐娘
24719063,7.9,烈日灼心
25911595,5.3,第三种爱情
25933898,6.4,爱恋
25763555,6.1,美式极端
25761178,8.3,百元之恋
25727048,7.8,福尔摩斯先生
25855951,8.3,贝利叶一家
26303865,7.7,维多利亚
26304268,6.7,致命礼物
25728010,7.5,老手
21937450,8.2,国际市场
25838463,6.2,像素大战
25821461,7.7,旅程终点
21350962,5.8,代号47
3445457,7.8,无境之兽
10773239,8.1,小男孩
24397586,8.5,小羊肖恩
26275494,7.3,橘色
26297388,7.5,这时对那时错
25955372,6.4,1980年代的爱情
25823840,6.4,奸臣
11624706,7.3,小黄人大眼萌
10741643,8.3,我的个神啊
25907004,4.6,坏姐姐之拆婚联盟
26235839,7.5,内在美
25774050,7.4,寄生兽
23769147,7.7,爱情限时恋未尽
26270517,7.8,愚人节
25958787,7.8,深夜食堂 电影版
26289144,7.6,滚蛋吧!肿瘤君
25752261,7.6,女间谍
25881628,6.7,幸存的女孩
25853129,6.3,瑞奇和闪电
25746375,7.3,我是路人甲
25753326,7.1,巴霍巴利王(上)
4075568,7.1,假期历险记
6039412,6.3,时光尽头的恋人
25870236,7.8,可爱的你
24751764,6.1,三城记
24405378,8.5,王牌特工:特工学院
3592854,8.5,疯狂的麦克斯4:狂暴之路
25830802,6.5,对风说爱你
24879839,5.4,道士下山
25774126,7.6,爷们些
26304167,8.1,出租车
25718082,7.0,念念
23761370,8.4,速度与激情7
10727641,7.8,碟中谍5:神秘国度
25745752,5.5,左耳
11540651,7.2,许三观
11776289,6.2,华丽上班族
4160540,7.5,机械姬
25956520,6.6,太平轮(下)·彼岸
26021055,4.1,栀子花开
25962735,4.4,既然青春留不住
26252157,7.5,龙三和他的七人党
25723907,7.0,捉妖记
11520649,8.2,麦克法兰
19957083,6.5,泰迪熊2
26252196,7.4,卫生间的圣母像
26147706,7.9,花与爱丽丝杀人事件
2973079,8.2,霍比特人3:五军之战
26366634,5.3,嘘!禁止想象!
3338862,6.9,终结者:创世纪
25895276,6.4,煎饼侠
3432861,6.5,黑色弥撒
6873042,6.2,明日世界
26384515,7.6,这里的黎明静悄悄
26279166,4.6,鸭王
4014396,4.1,神奇四侠2015
25823833,6.0,天将雄师
19897541,9.0,机动战士高达 THE ORIGIN I 青瞳的卡斯巴尔
24325815,6.4,非我
21345845,7.6,涉足荒野
25821585,6.5,生活残骸
24847343,4.6,小时代4:灵魂尽头
25858759,5.0,有一个地方只有我们知道
26582787,4.7,斗地主
25858785,5.8,澳门风云2
21349734,7.0,博物馆奇妙夜3
23788440,7.4,杀破狼2
25887846,6.7,传奇
25794212,5.8,分歧者2:绝地反击
6126442,6.2,一步之遥
5446197,7.2,铁拳
25862355,7.0,二十
25945356,4.3,新步步惊心
25786077,7.1,末日崩塌
10741834,7.1,复仇者联盟2:奥创纪元
25922902,7.5,唇上之歌
10827341,7.3,疯狂外星人
25881780,5.8,命中注定
10604554,6.8,躲藏
10792633,7.8,金衣女人
25856480,5.7,巴黎假期
26219652,5.7,少年班
10440138,7.8,侏罗纪世界
26328118,4.5,咒怨:完结篇
2325873,5.0,第七子:降魔之战
25944282,6.2,纸镇
25746414,5.3,暴走神探
25986688,4.5,流浪者年代记
25767747,7.4,故事的故事
21442760,7.2,最长的旅程
25872931,6.0,万物生长
26263443,5.7,恋爱中的城市
3078390,5.7,太平轮(上)
6875263,6.8,灰姑娘
24716045,6.8,远离尘嚣
6866928,5.1,进击的巨人真人版:前篇
26276359,7.1,酷毙了
25898213,7.1,军犬麦克斯
26356488,7.9,1944
26285777,5.2,有客到
24307637,6.3,江南1970
6846893,7.2,超能查派
25853727,7.3,破风
24753810,7.0,战狼
3608742,7.6,冲出康普顿
26599083,5.0,妈妈的朋友
25843352,7.2,如此美好
25908042,4.6,横冲直撞好莱坞
25912924,6.3,暗杀教室
25907088,5.0,魔镜
25809260,6.5,工作女郎
5154799,5.4,木星上行
25805054,6.4,十万个冷笑话
22522269,6.7,战斧骨
24872023,5.3,贵族大盗
24743709,4.1,北京纽约
25717176,6.2,新宿天鹅
24751757,4.8,微爱之渐入佳境
26265099,6.8,白河夜船
25835293,6.4,失孤
25868191,5.1,极道大战争
25779218,5.4,匆匆那年
25861695,7.1,海月姬
25731554,7.4,西部慢调
3006769,6.9,大眼睛
10440076,5.6,最后的女巫猎人
22556810,4.5,猛龙特囧
7003416,4.4,冲上云霄
25919385,7.8,长寿商会
11541282,5.8,魔力麦克2
10793610,6.4,法老与众神
25778488,4.1,宅女侦探桂香
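The `buildMovies` helper in `OfflineRecommender.scala` turns lines like the ones above into a movie-ID → title map, skipping any line with an empty ID field; because `collectAsMap` keeps the last value seen for a key, a duplicate ID would overwrite an earlier title. A minimal Python sketch of the same parse, using a plain dict instead of a Spark RDD (the broken sample row is made up to show the skip):

```python
# Sketch of buildMovies from OfflineRecommender.scala, without Spark:
# each line is "movieID,rating,title"; lines with an empty ID are skipped,
# and a duplicate ID overwrites the earlier title (collectAsMap semantics).

def build_movies(lines):
    movies = {}
    for line in lines:
        tokens = line.split(",")
        if tokens[0]:
            movies[int(tokens[0])] = tokens[2]
    return movies

hot = [
    "20645098,8.2,小王子",
    "26259677,8.3,垫底辣妹",
    ",0.0,broken line",  # skipped: empty ID field
]
print(build_movies(hot)[20645098])  # 小王子
```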
input/user_movies.csv (new file, mode 100755) — format: user name, movie ID, rating. This diff is collapsed (1,007,397 lines).
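`buildRatings` in `OfflineRecommender.scala` parses each `user,movieID,rating` line of this file and replaces a rating of -1 with 3 before handing the records to ALS (presumably a neutral default for a missing rating; the code itself only states the -1 → 3 rule). A Python sketch of that normalization, with made-up sample rows:

```python
# Sketch of buildRatings from OfflineRecommender.scala, without Spark:
# each line is "user,movieID,rating"; a rating of -1 is replaced with 3
# (presumably a neutral default for a missing rating).
from collections import namedtuple

MovieRating = namedtuple("MovieRating", ["user_id", "movie_id", "rating"])

def build_ratings(lines):
    ratings = []
    for line in lines:
        user, movie, count = [t.strip() for t in line.split(",")]
        rating = 3 if int(count) == -1 else int(count)
        ratings.append(MovieRating(user, int(movie), rating))
    return ratings

sample = ["alice,20645098,5", "bob,26259677,-1"]  # made-up rows
print(build_ratings(sample)[1].rating)  # 3
```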
pom.xml (new file, mode 100644)

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>apache.wiki</groupId>
  <artifactId>RecommendedSystem</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2015</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.7</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.0.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scalap</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>2.2.4</version>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.compat.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <!--<arg>-make:transitive</arg>-->
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```
src/main/scala/apache/wiki/App.scala (new file, mode 100644)

```scala
package apache.wiki

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author ${user.name}
 */
object App {

  // def foo(x : Array[String]) = x.foldLeft("")((a,b) => a + b)
  // def main(args : Array[String]) {
  //   println( "Hello World!" )
  //   println("concat arguments = " + foo(args))
  // }

  def main(args: Array[String]) {
    // Initialize the SparkContext; configuration is supplied via SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    println("this system exit ok!!!")
    // Only one SparkContext may be active per JVM. Call stop() on the active
    // SparkContext before creating a new one.
    sc.stop()
  }
}
```
src/main/scala/apache/wiki/OfflineRecommender.scala (new file, mode 100644)

```scala
package apache.wiki

import scala.collection.Map

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD

/**
 * @author ${user.name}
 * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
 *
 * See LICENSE file for further information.
 *
 * References
 * GitHub:
 * Recommender system: http://www.kuqin.com/shuoit/20151202/349305.html
 * ALS notes: http://www.csdn.net/article/2015-05-07/2824641
 */

// UserMovies record: user name, movie ID, rating
case class MovieRating(userID: String, movieID: Int, rating: Double) extends scala.Serializable

object OfflineRecommender {

  // Produce recommendations
  def recommend(sc: SparkContext, rawUserMoviesData: RDD[String],
                rawHotMoviesData: RDD[String], base: String): Unit = {
    // Extract the movie ID -> movie name mapping and broadcast it
    val moviesAndName = buildMovies(rawHotMoviesData)
    val bMoviesAndName = sc.broadcast(moviesAndName)

    // Parse the raw text data into typed records
    val data = buildRatings(rawUserMoviesData)

    // user name -> index ID (Long)
    val userIdToInt: RDD[(String, Long)] =
      data.map(_.userID).distinct().zipWithUniqueId()

    // index ID -> user name
    val reverseUserIDMapping: RDD[(Long, String)] =
      userIdToInt map { case (l, r) => (r, l) }
    val bReverseUserIDMap = sc.broadcast(reverseUserIDMapping.collectAsMap())

    // user name -> index ID (Int)
    val userIDMap: Map[String, Int] =
      userIdToInt.collectAsMap().map { case (n, l) => (n, l.toInt) }
    val bUserIDMap = sc.broadcast(userIDMap)

    // Convert the records to RDD[Rating]
    val ratings: RDD[Rating] = data.map { r =>
      Rating(bUserIDMap.value.get(r.userID).get, r.movieID, r.rating)
    }.cache()

    // Build the model with collaborative filtering
    /*
     * ALS is short for alternating least squares.
     * It is commonly used in matrix-factorization recommenders: the user-item rating matrix
     * is factored into two matrices, one holding each user's preference for the latent
     * features, the other holding the latent features of each item.
     * The factorization fills in the missing ratings, so we can recommend items to a user
     * based on those predicted ratings.
     * Because most ratings are missing, classical SVD (singular value decomposition)
     * handles this data poorly, while ALS copes with the sparsity well.
     *
     * Training: call ALS.train() for explicit feedback, ALS.trainImplicit() for implicit
     * feedback.
     * With explicit ratings, each user's rating of a product is a score (e.g. 1 to 5 stars).
     * With implicit feedback, each rating is the confidence that the user will interact with
     * the product (e.g. confidence grows with the number of visits to a page), and the
     * predictions are confidences as well.
     *
     * Parameters:
     * ratings:   RDD[Rating]
     * rank       number of latent features (hidden factors) in the model
     * iterations number of iterations; 10-20 recommended
     * lambda     regularization parameter; 0.01 recommended
     * alpha      constant used to compute confidence in ALS; default 1.0 (implicit feedback)
     */
    //val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)
    val model = ALS.train(ratings, 50, 10, 0.0001)

    // unpersist drops the cached data from memory and disk
    ratings.unpersist()

    model.save(sc, base + "output/model")
    //val sameModel = MatrixFactorizationModel.load(sc, base + "model")

    // Recommend 5 movies per user; format: (user, ratings)
    val allRecommendations = model.recommendProductsForUsers(5) map {
      case (userID, recommendations) => {
        var recommendationStr = ""
        for (r <- recommendations) {
          recommendationStr += r.product + ":" + bMoviesAndName.value.getOrElse(r.product, "") + ","
        }
        if (recommendationStr.endsWith(","))
          recommendationStr = recommendationStr.substring(0, recommendationStr.length - 1)
        (bReverseUserIDMap.value.get(userID).get, recommendationStr)
      }
    }

    // allRecommendations.saveAsTextFile(base + "output/result.csv")
    allRecommendations.coalesce(1, true).sortByKey().saveAsTextFile(base + "output/result.csv")

    // Custom helper that unpersists the RDDs inside the model
    unpersist(model)
  }

  def main(args: Array[String]): Unit = {
    // Initialize the SparkContext
    val conf = new SparkConf().setMaster("local").setAppName("OfflineRecommender")
    val sc = new SparkContext(conf)
    val base = if (args.length > 0) args(0) else "file:/opt/git/RecommendedSystem/"

    // Load the data as RDDs
    // UserMovies format: user name, movie ID, rating
    // HotMovies format:  movie ID, rating, movie name
    val rawUserMoviesData = sc.textFile(base + "input/user_movies.csv")
    val rawHotMoviesData = sc.textFile(base + "input/hot_movies.csv")

    // Sanity-check a sample of the data
    preparation(rawUserMoviesData, rawHotMoviesData)
    println("准备完数据")

    // Spot-check the recommendation results
    model(sc, rawUserMoviesData, rawHotMoviesData)

    // Evaluate the predicted ratings as a whole
    evaluate(sc, rawUserMoviesData, rawHotMoviesData)

    // recommend(sc, rawUserMoviesData, rawHotMoviesData, base)
    // Only one SparkContext may be active per JVM. Call stop() on the active
    // SparkContext before creating a new one.
    // sc.stop()
  }

  // Inspect the data
  def preparation(rawUserMoviesData: RDD[String], rawHotMoviesData: RDD[String]) = {
    // (user name, unique index ID) -> summary statistics of the index IDs:
    // max, min, mean, variance, stdev, element count
    // zipWithUniqueId: see http://lxw1234.com/archives/2015/07/352.htm
    val userIDStats = rawUserMoviesData.map(_.split(",")(0).trim)
      .distinct()
      .zipWithUniqueId()
      .map(_._2.toDouble)
      .stats()
    // Summary statistics of the set of movie IDs
    val itemIDStats = rawUserMoviesData.map(_.split(",")(1).trim.toDouble)
      .distinct()
      .stats()
    println(userIDStats)
    println(itemIDStats)

    val moviesAndName = buildMovies(rawHotMoviesData)
    // Take the first entry; head() = first(), head(n) = take(n)
    val (movieID, movieName) = moviesAndName.head
    println(movieID + " -> " + movieName)
  }

  // Build the movie ID -> movie name map
  // collectAsMap: for a duplicate ID, a later value overwrites an earlier one
  def buildMovies(rawHotMoviesData: RDD[String]): Map[Int, String] =
    rawHotMoviesData.flatMap { line =>
      val tokens = line.split(",")
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(2)))
      }
    }.collectAsMap()

  // Split each rating line and convert the types; format: user name, movie ID, rating
  def buildRatings(rawUserMoviesData: RDD[String]): RDD[MovieRating] = {
    rawUserMoviesData.map { line =>
      val Array(userID, moviesID, countStr) = line.split(",").map(_.trim)
      var count = countStr.toInt
      count = if (count == -1) 3 else count
      MovieRating(userID, moviesID.toInt, count)
    }
  }

  // Custom helper that unpersists the RDDs inside the model
  def unpersist(model: MatrixFactorizationModel): Unit = {
    // At the moment, it's necessary to manually unpersist the RDDs inside the model
    // when done with it in order to make sure they are promptly uncached
    model.userFeatures.unpersist()
    model.productFeatures.unpersist()
  }

  // http://stackoverflow.com/questions/27772769/spark-how-to-use-mllib-recommendation-if-the-user-ids-are-string-instead-of-co
  def model(sc: SparkContext, rawUserMoviesData: RDD[String],
            rawHotMoviesData: RDD[String]): Unit = {
    val moviesAndName = buildMovies(rawHotMoviesData)
    val bMoviesAndName = sc.broadcast(moviesAndName)
    val data = buildRatings(rawUserMoviesData)
    val userIdToInt: RDD[(String, Long)] =
      data.map(_.userID).distinct().zipWithUniqueId()
    val reverseUserIDMapping: RDD[(Long, String)] =
      userIdToInt map { case (l, r) => (r, l) }
    val userIDMap: Map[String, Int] =
      userIdToInt.collectAsMap().map { case (n, l) => (n, l.toInt) }
    val bUserIDMap = sc.broadcast(userIDMap)
    val ratings: RDD[Rating] = data.map { r =>
      Rating(bUserIDMap.value.get(r.userID).get, r.movieID, r.rating)
    }.cache()

    // Build the model with collaborative filtering
    //val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)
    val model = ALS.train(ratings, 50, 10, 0.0001)
    ratings.unpersist()

    println("打印第一个userFeature")
    println(model.userFeatures.mapValues(_.mkString(", ")).first())
    for (userID <- Array(100, 1001, 10001, 100001, 110000)) {
      checkRecommenderResult(userID, rawUserMoviesData, bMoviesAndName, reverseUserIDMapping, model)
    }
    unpersist(model)
  }

  // Inspect the recommendations made for one user
  def checkRecommenderResult(userID: Int, rawUserMoviesData: RDD[String],
                             bMoviesAndName: Broadcast[Map[Int, String]],
                             reverseUserIDMapping: RDD[(Long, String)],
                             model: MatrixFactorizationModel): Unit = {
    val userName = reverseUserIDMapping.lookup(userID).head
    val recommendations = model.recommendProducts(userID, 5)
    // IDs of the movies recommended to this user
    val recommendedMovieIDs = recommendations.map(_.product).toSet
    // IDs of the movies the user has watched
    val rawMoviesForUser = rawUserMoviesData.map(_.split(","))
      .filter { case Array(user, _, _) => user.trim == userName }
    val existingUserMovieIDs = rawMoviesForUser.map { case Array(_, movieID, _) => movieID.toInt }
      .collect().toSet

    println("用户" + userName + "点播过的电影名")
    // Titles of the watched movies
    bMoviesAndName.value.filter { case (id, name) =>
      existingUserMovieIDs.contains(id)
    }.values.foreach(println)

    println("推荐给用户" + userName + "的电影名")
    // Titles of the recommended movies
    bMoviesAndName.value.filter { case (id, name) =>
      recommendedMovieIDs.contains(id)
    }.values.foreach(println)
  }

  // Evaluate the model
  def evaluate(sc: SparkContext, rawUserMoviesData: RDD[String],
               rawHotMoviesData: RDD[String]): Unit = {
    val moviesAndName = buildMovies(rawHotMoviesData)
    // Parse the raw user-movie data
    val data = buildRatings(rawUserMoviesData)
    // Build (user name, index ID) pairs and the user name -> index ID map
    val userIdToInt: RDD[(String, Long)] =
      data.map(_.userID).distinct().zipWithUniqueId()
    val userIDMap: Map[String, Int] =
      userIdToInt.collectAsMap().map { case (n, l) => (n, l.toInt) }
    val bUserIDMap = sc.broadcast(userIDMap)
    val ratings: RDD[Rating] = data.map { r =>
      Rating(bUserIDMap.value.get(r.userID).get, r.movieID, r.rating)
    }.cache()

    val numIterations = 10

    // ALS.train: explicit feedback
    for (rank <- Array(10, 50); lambda <- Array(1.0, 0.01, 0.0001)) {
      // The ratings could be split into training and test sets, e.g. 80/20:
      // val splits = ratings.randomSplit(Array(0.8, 0.2), seed = 111l)
      // val training = splits(0).repartition(numPartitions)
      // val test = splits(1).repartition(numPartitions)
      // Here the full rating set serves as both training set and test set
      val model = ALS.train(ratings, rank, numIterations, lambda)

      // Evaluate the model on rating data
      // Compare the predictions against the data; note that the training set doubles as
      // the test set. Get the (user, product) pairs from the training set:
      val usersMovies = ratings.map { case Rating(user, movie, rate) => (user, movie) }
      println("实际评估的数量" + usersMovies.count)

      // The test-set record count equals the total rating count; verify it:
      val predictions = model.predict(usersMovies).map {
        case Rating(user, movie, rate) => ((user, movie), rate)
      }
      println("预测评估的数量" + predictions.count)

      // Join the actual ratings with the predictions, giving each user's actual and
      // predicted rating for every movie:
      val ratesAndPreds = ratings.map {
        case Rating(user, movie, rate) => ((user, movie), rate)
      }.join(predictions)

      /* Mean squared error:
       * We cannot judge the model by gut feeling alone, even if the results look
       * reasonable; we need a quantitative metric.
       *
       * We measure model quality with the mean squared error (MSE).
       * In statistics, the MSE is the expected value of the squared difference between
       * an estimate and the true value of a parameter.
       * It is a convenient measure of "average error": the smaller the MSE, the more
       * accurately the model describes the data.
       *
       * We can tune rank, numIterations, lambda and alpha to drive the MSE down.
       * For instance, more iterations and a smaller lambda tend to lower the MSE and
       * improve the recommendations.
       */
      val MSE = ratesAndPreds.map { case ((user, movie), (r1, r2)) =>
        val err = (r1 - r2)
        err * err
      }.mean()
      println(s"(rank:$rank, lambda: $lambda, Explicit ) Mean Squared Error = " + MSE)
    }

    // ALS.trainImplicit: implicit feedback
    for (rank <- Array(10, 50); lambda <- Array(1.0, 0.01, 0.0001); alpha <- Array(1.0, 40.0)) {
      // (trainImplicit: implicit feedback / train: explicit feedback)
      // Implicit feedback: e.g. browsing, clicks, purchases, likes, shares
      //
      // lambda: regularization parameter
      // alpha: constant used to compute confidence in ALS; default 1.0
      val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

      // Evaluate the model on rating data
      val usersMovies = ratings.map { case Rating(user, movie, rate) => (user, movie) }
      val predictions = model.predict(usersMovies).map {
        case Rating(user, movie, rate) => ((user, movie), rate)
      }
      val ratesAndPreds = ratings.map {
        case Rating(user, movie, rate) => ((user, movie), rate)
      }.join(predictions)
      val MSE = ratesAndPreds.map { case ((user, movie), (r1, r2)) =>
        val err = (r1 - r2)
        err * err
      }.mean()
      println(s"(rank:$rank, lambda: $lambda,alpha:$alpha ,implicit ) Mean Squared Error = " + MSE)
    }
  }
}
```
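The comments in `OfflineRecommender.scala` describe what `ALS.train` does: alternately re-solve the user factors and item factors so that their product approximates the rating matrix, then score the fit with the mean squared error as in `evaluate()`. A toy rank-1 sketch of that alternation in pure Python (no Spark, fully observed matrix, made-up numbers) shows both halves: each alternating step is a one-dimensional ridge-regularized least-squares solve, and the MSE shrinks as the factors converge.

```python
# Toy rank-1 alternating least squares: fit ratings R ~ u * v^T by
# alternately solving for u with v fixed and for v with u fixed, each a
# closed-form 1-D least-squares update with L2 regularization lam.

def als_rank1(R, lam=0.0001, iterations=10):
    n_users, n_items = len(R), len(R[0])
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iterations):
        # Fix v, solve each u[i]: minimizes sum_j (R[i][j] - u[i]*v[j])^2 + lam*u[i]^2
        for i in range(n_users):
            u[i] = sum(R[i][j] * v[j] for j in range(n_items)) / (sum(x * x for x in v) + lam)
        # Fix u, solve each v[j] symmetrically
        for j in range(n_items):
            v[j] = sum(R[i][j] * u[i] for i in range(n_users)) / (sum(x * x for x in u) + lam)
    return u, v

def mse(R, u, v):
    # Mean squared error of the reconstruction, as in evaluate()
    errs = [(R[i][j] - u[i] * v[j]) ** 2 for i in range(len(R)) for j in range(len(R[0]))]
    return sum(errs) / len(errs)

# A rating matrix that is exactly rank 1 (row i is a multiple of [1, 2, 3]),
# so rank-1 ALS can drive the MSE to essentially zero.
R = [[1.0, 2.0, 3.0],
     [2.0, 4.0, 6.0],
     [3.0, 6.0, 9.0]]
u, v = als_rank1(R)
print(mse(R, u, v))  # ~ 0
```

Real rating matrices are mostly missing, which is exactly why ALS (which can sum only over observed entries) is preferred over a plain SVD here; this sketch uses a fully observed matrix only to keep the updates short.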
src/main/scala/apache/wiki/OnlineRecommender.scala (new file, mode 100644)

```scala
package apache.wiki

import scala.collection.Map

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
 * @author ${user.name}
 * Copyright 2015 Sanford Ryza, Uri Laserson, Sean Owen and Joshua Wills
 *
 * See LICENSE file for further information.
 *
 * References
 * Recommender system: http://www.kuqin.com/shuoit/20151202/349305.html
 * ALS notes: http://www.csdn.net/article/2015-05-07/2824641
 */
```
src/main/scala/apache/wiki/WordCount.scala (new file, mode 100644)

```scala
package apache.wiki

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author ${user.name}
 */
object WordCount {
  def main(args: Array[String]) {
    // Initialize the SparkContext; configuration is supplied via SparkConf
    val conf = new SparkConf().setMaster("local").setAppName("My app")
    //.set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    // // Validate the input arguments
    // if (args.length < 1) {
    //   println("USAGE:")
    //   println("spark-submit ... xxx.jar Date_String [Iteration]")
    //   println("spark-submit ... xxx.jar 20160424 10")
    //   sys.exit()
    // }

    val lines = sc.textFile("file:/opt/git/RecommendedSystem/README.md")
    lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .map(x => (x._2, x._1))
      .sortByKey(false)
      .map(x => (x._2, x._1))
      .saveAsTextFile("file:/opt/git/RecommendedSystem/output/result.log")

    // println("this system exit ok!!!")
    // Only one SparkContext may be active per JVM. Call stop() on the active
    // SparkContext before creating a new one.
    sc.stop()
  }
}
```
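The pipeline in `WordCount.scala` splits each line into words, counts every word, and sorts by descending count by swapping the (word, count) pair around `sortByKey` and swapping it back. The same flatMap → map → reduceByKey → sort shape can be sketched without Spark in a few lines of Python (the input lines are made up):

```python
# Sketch of the WordCount pipeline without Spark:
# flatMap -> map -> reduceByKey -> swap/sort descending/swap back.
from collections import Counter

lines = ["to be or not to be", "to be to"]  # made-up input

words = [w for line in lines for w in line.split(" ")]  # flatMap(_.split(" "))
counts = Counter(words)                                  # map((_, 1)).reduceByKey(_ + _)
# map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1))
ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

print(ranked[0])  # ('to', 4)
```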
src/test/scala/samples/junit.scala (new file, mode 100644)

```scala
package samples

import org.junit._
import Assert._

@Test
class AppTest {

  @Test
  def testOK() = assertTrue(true)

  // @Test
  // def testKO() = assertTrue(false)
}
```
src/test/scala/samples/scalatest.scala (new file, mode 100644)

```scala
/*
 * Copyright 2001-2009 Artima, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package samples

/*
ScalaTest facilitates different styles of testing by providing traits you can mix
together to get the behavior and syntax you prefer. A few examples are
included here. For more information, visit:

http://www.scalatest.org/

One way to use ScalaTest is to help make JUnit or TestNG tests more
clear and concise. Here's an example:
*/
import scala.collection.mutable.Stack
import org.scalatest.Assertions
import org.junit.Test

class StackSuite extends Assertions {

  @Test def stackShouldPopValuesIinLastInFirstOutOrder() {
    val stack = new Stack[Int]
    stack.push(1)
    stack.push(2)
    assert(stack.pop() === 2)
    assert(stack.pop() === 1)
  }

  @Test def stackShouldThrowNoSuchElementExceptionIfAnEmptyStackIsPopped() {
    val emptyStack = new Stack[String]
    intercept[NoSuchElementException] {
      emptyStack.pop()
    }
  }
}

/*
Here's an example of a FunSuite with ShouldMatchers mixed in:
*/
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class ListSuite extends FunSuite with ShouldMatchers {

  test("An empty list should be empty") {
    List() should be ('empty)
    Nil should be ('empty)
  }

  test("A non-empty list should not be empty") {
    List(1, 2, 3) should not be ('empty)
    List("fee", "fie", "foe", "fum") should not be ('empty)
  }

  test("A list's length should equal the number of elements it contains") {
    List() should have length (0)
    List(1, 2) should have length (2)
    List("fee", "fie", "foe", "fum") should have length (4)
  }
}

/*
ScalaTest also supports the behavior-driven development style, in which you
combine tests with text that specifies the behavior being tested. Here's
an example whose text output when run looks like:

A Map
- should only contain keys and values that were added to it
- should report its size as the number of key/value pairs it contains
*/
import org.scalatest.FunSpec
import scala.collection.mutable.Stack

class ExampleSpec extends FunSpec {

  describe("A Stack") {

    it("should pop values in last-in-first-out order") {
      val stack = new Stack[Int]
      stack.push(1)
      stack.push(2)
      assert(stack.pop() === 2)
      assert(stack.pop() === 1)
    }

    it("should throw NoSuchElementException if an empty stack is popped") {
      val emptyStack = new Stack[Int]
      intercept[NoSuchElementException] {
        emptyStack.pop()
      }
    }
  }
}
```
src/test/scala/samples/specs.scala (new file, mode 100644)

```scala
package samples

import org.junit.runner.RunWith
import org.specs2.mutable._
import org.specs2.runner._

/**
 * Sample specification.
 *
 * This specification can be executed with: scala -cp <your classpath=""> ${package}.SpecsTest
 * Or using maven: mvn test
 *
 * For more information on how to write or run specifications, please visit:
 * http://etorreborre.github.com/specs2/guide/org.specs2.guide.Runners.html
 *
 */
@RunWith(classOf[JUnitRunner])
class MySpecTest extends Specification {

  "The 'Hello world' string" should {
    "contain 11 characters" in {
      "Hello world" must have size (11)
    }
    "start with 'Hello'" in {
      "Hello world" must startWith("Hello")
    }
    "end with 'world'" in {
      "Hello world" must endWith("world")
    }
  }
}
```