Spark踩坑3-共享Scala集合类

最近为了更好的了解Spark的细节,改用Spark的亲儿子Scala进行开发,得益于框架API的统一,Spark方面没什么需要重新学习的,开发完就想着按照以前的经验优化一下。
记得官方Tuning Spark中,第一件要干的事就是调整序列化类库,Spark默认使用的Java serialization的序列化方式。官方建议使用Kryo类库进行序列化可以获得更好的压缩率和传输效率。
切换的方式很简单,只要在配置时指定序列化方式为:org.apache.spark.serializer.KryoSerializer即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
object Recommender {
private val log = Logger(Recommender.getClass)

// load config
private val config = ConfigFactory.load("config.properties")

// spark config
private val appName = config.getString("appName")
private val mode = config.getString("mode")


def main(args: Array[String]): Unit = {
log.info("init {},mode:{}", appName, mode)

// init spark
val sc = new SparkConf()
.setAppName(appName)
.setMaster(mode)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")


val session = SparkSession.builder().config(sc).getOrCreate()
try {
initMat(session)

var headMap = new mutable.LinkedHashMap[String, Int]()
headMap.put("ID", 0)
headMap.put("UID", 1)

val headBroadcast = ss.sparkContext.broadcast[mutable.LinkedHashMap[String, Int]](headMap)

// 略...


} finally {
session.close()
}
}
}

但是当我切换序列化方式后,我发现我要广播的一个LinkedHashMap每次获取都是空的。
于是猜测Kryo框架是不是没办法序列化Scala的集合类型,因为大部分的序列化框架对集合类都有或多或少的一些限制,比如Protostuff等。
于是写个测试代码测试一下:

1
2
3
4
5
6
7
8
9
10
def main(args: Array[String]): Unit = {
var headMap = new mutable.LinkedHashMap[String, Int]()
headMap.put("a", 1)
headMap.put("b", 2)
headMap.put("c", 3)
//
val kryo = new Kryo()
val output = new Output(new FileOutputStream("e:\\ser.bin"))
kryo.writeObject(output, headMap)
}

最终写到磁盘ser.bin的才1个字节,都不用反序列化,肯定不对。
查了一些官方文档,有个相对简单点的解决办法:

1
kryo.register(classOf[mutable.HashMap[String, (String, String)]], new JavaSerializer)

将原生类做为自定义的类注册下,并指定序列化方式仍然使用jdk的。。。
虽然问题能解决但这其实已经背离本意了,好在一点是mutable.HashMap使用jdk序列化,其他的类使用kryo,相比较以前全部用jdk效率确实高点。