Spark踩坑2—Lambda表达式序列化异常

这几天用Spark写一些正式的功能又踩到坑了,当submit程序到yarn以client模式运行时出现:

1
cannot assign instance of java.lang.invoke.SerializedLambda to field

异常显示序列化lambda异常。。。。
诡吊的一点在于cluster模式没有问题。
关于client模式和cluster模式网上有详细的解释,简单来说:

  • cluster模式下ResourceManger会在集群中的某台NodeManger服务器上启动一个ApplicationMaster,也就是Driver的容器,然后将ApplicationMaster分配给这台NodeManger。
  • client模式下ResourceManger会在本地启动ApplicationMaster并将其分配给本地的NodeManger。

由于用的JDK8所有程序中确实有大量的Lambda,一直觉得Lambda是匿名内部类的语法糖而已,运行时该擦除擦除,看到lang包下有个专门的SerializedLambda就知道不仅仅是个语法糖而已。
根据stackoverflow上的一解答来看:

Implementors of serializable lambdas, such as compilers or language runtime libraries, are expected to ensure that instances deserialize properly. One means to do so is to ensure that the writeReplace method returns an instance of SerializedLambda, rather than allowing default serialization to proceed.
当编译器或一个运行时的类库序列化一个lambad时,先要确保实例正确的反序列化,其中一个方法是确保writeReplace方法返回的是一个SerializedLambda实例,而不是用默认的序列化去处理。

SerializedLambda has a readResolve method that looks for a (possibly private) static method called $deserializeLambda$(SerializedLambda) in the capturing class, invokes that with itself as the first argument, and returns the result. Lambda classes implementing $deserializeLambda$ are responsible for validating that the properties of the SerializedLambda are consistent with a lambda actually captured by that class.
当反序列化时候SerializedLambda有一个readResolve方法,它会在capturing class(个人理解为调用lambda的那个类)查找一个可能私有的静态方法$deserializeLambda$,并将自己作为参数进行调用,实现$deserializeLambda$ 方法负责验证SerializedLambda的属性是否与调用的lambda一致。

所以说如果没有定义$deserializeLambda$这个方法,反序列化仍然会调用这个方法进行验证。
如果要解决这个问题有两种方法。

  • 1.不写lambda,换成匿名内部类。
  • 2.将包含lambda的类单独打包,使用Spark的setJar方法提交这个jar。代码:
    1
    2
    3
    SparkConf sconf = new SparkConf()
    .setJars(new String[]{"/root/lambda.jar"})
    .setMaster("yarn");

但仍然搞不懂的是为何cluster模式下没这问题,猜想是因为client模式下代码不进行完整的分发,所以包含lambda的代码无法在其他服务器上正确的序列化和反序列化,而cluster模式下都是整个打包分发到集群中其他的NodeManager中执行,所以没问题?
因为setJar方法的注释说明:传入的jar包会被分发到集群中的其他work中。