Nacos临时节点和永久节点的本质意义

Nacos 1.0.0 版本在客户端侧增加了一个ephemeral配置,默认值是True,代表当前实例是临时实例还是永久实例。
官方对此的描述是:

1
2
3
4
如果是临时实例,则不会在Nacos服务端持久化存储,需要通过上报心跳的方式进行保活,如果一段时间内没有上报心跳,则会被Nacos服务端摘除。
在被摘除后如果又开始上报心跳,则会重新将这个实例注册。
持久化实例则会持久化到Nacos服务端,此时即使注册实例的客户端进程不在,这个实例也不会从服务端删除,只会将健康状态设为不健康。
同一个服务下可以同时有临时实例和持久化实例,这意味着当这服务的所有实例进程不在时,会有部分实例从服务上摘除,剩下的实例则会保留在服务下

那具体又是什么意义呢,正常一个实例挂了就应该从注册中心上拿掉,所以是临时实例就行了。
其实不然,因为现实项目中,一个项目有个十几、二十几服务很正常,假设其中挂了一个,挂的这一个在注册中心控制台上就会消失,一般运维或者开发很难确定哪个服务没了,只能顺着调用链路排查。
如果ephemeral改为false,那如果服务挂了,不会消失,控制台上会显示不健康,一眼就能定位到有问题的服务。
所以生产环境建议修改ephemeral的配置为false。

奇葩的异常支付场景

现代的订单支付、秒杀等业务,下单库存锁定后都有个倒计时功能,超过指定的阈值会释放掉订单,防止有人恶意锁单。
一般而言我的方案是这么做:
1.下单后对订单生成一个stub放在redis
2.用ttl进行倒计时
3.进页面后轮询key是否存在,不存在订单取消,存在则显示倒计时倒计时

现在有种场景:
用户在未超时的情况下点击付款后打开了第三方的app,比如支付宝进行支付,但是支付宝APP停留时间长,此时后台的ttl已经超时,被删除,但由于第三方支付已经打开,接口层面付款仍然是可以成功的。
所以这种情况算是下单成功还是失败?
再或者由于现在的支付回调大部分都是异步的,假设异步通知时候ttl过期,算成功还是失败?
更麻烦的是,假设算支付成功,成功后由于ttl过期,导致库存解锁,商品又被其他的订单锁定。。

目前想到的解决办法:
1.比较好的第三方支付渠道、支付宝、微信在API上都会提供一个最晚支付时间,如果超过这个时间未支付就不能支付了,比如订单超时是30分钟,那把这个时间减少个10秒传给支付宝,支付宝那边会保证这种场景的失败。
2.同步查询支付结果或者收到支付成功回调的话,判断一下订单的支付状态,如果已经超时,直接退款,但这种用户体验非常差,用户的账单明细会有两条记录,一条支付成功,一条退款成功。
3.看看第三方是否提供关闭订单的接口,ttl超时时先调用第三方的关闭接口,如果调用失败代表用户支付成功,续一下ttl,不要做后续的释放库存操作,如果调用关闭成功在做后续的释放库存业务,由于也是调用第三方和调用网络RPC,其实也没法保证严格的幂等。
4.前端拉起支付APP前调一下后台,后台看一下剩余时间,如果很快超时就续一小段时间,等待支付完毕。但有个风险,如果被人发现了这种机制,会一直反复唤起支付APP,长时间锁单。

没想到特别的好办法,由于支付宝和微信提供了最晚支付时间,所以目前是1和2结合的方案,但银联那边没提供,所以银联用的3。

MacVim使用SpaceVim后中文乱码

最近手贱把MacVim和SpaceVim删了,准备用最新的版本,结果安装完后,打开直接各种问号,中文也是问号方块。

网上各种搜教程,搜到几乎都是让安装一款Hack字体,然后在iTerm里修改下Non-ASCII Font,然后就解决了,照着做一遍发现没啥用,我iTerm不装这个字体,选个其他支持中文的字体也不乱码,我主要是MacVim乱码,各司其职,终端就不要承受文本编辑的工作了。
最终想到,现在的MacVim已经被SpaceVim各种插件托管,还是要设置SpaceVim才能解决,于是找到SpaceVim的配置文件

1
~/.SpaceVim/init.toml

找到option增加:

1
guifont= "Hack NF:h11"

就解决了。

但奇怪的事,设置其他中文字体,比如微软雅黑、宋体等,字体确实变了,但中文仍然是方块乱码,十分不解!

相对安全的网络传输方案

最近做一个C端的项目,需要设计一个“相对”安全的传输方案,用于确保数据在传输过程中的安全。
首先提到安全传输第一个想到的自然是走HTTPS,复习一下HTTPS为什么安全:

https
简单说下,https讲解网上一大堆:
1.客户端和服务器建立连接
2.服务器发送公钥给客户端
3.客户端用公钥加密数据后发给服务器
4.服务器用私钥进行数据解密
5.其中为了防止公钥被劫持,又引入了CA
6.CA是一家被信任的第三个机构,负责发布数字证书来保证公钥不被篡改
7.客户端发请求时,服务器把CA给的数字证书给客户端
8.客户端通过CA提供的公钥获取数字签名,来确保证书安全

如果将数据传输分为三大部分:服务器、传输中、落到客户端。那么HTTPS其实只是保护传输中的数据安全而已。
也就是在传输过程中第三方抓的包是密文的。
那么在服务器和客户端抓的包自然是明文,这就是为什么有些新手测试会说为什么能抓到明文的包,因为他们的测试方法都是在客户端抓包,数据已经落到客户端解密,自然能抓到。
服务器抓包不讨论,如果有在服务器抓包的能力那么跟任何的“传输”方案没关系,找运维比较靠谱。
客户端抓包很常见,尤其是现在的Android开放程度比较高,或者APP使用PC的代理,然后PC抓包。

所以需要设计一个针对客户端抓包方案:

步骤5和7当时画的不准确,以下面文字描述为准

总体思路:
1.每个APP或者终端,发送请求到负责授权的服务,授权服务根据终端唯一特性,比如IMEI、MAC等特征信息为该终端生成RSA密钥对,然后把公钥返回。
2.APP可以将公钥存在内存里
3.由于RSA的非对称加解密特性导致在大数据量的情况下加解密都特别的慢,所以这边要引入对称加密AES,简单来说RSA是非对称加密,也就是公钥加密,私钥解密。而对称的AES可以简单理解基于一个密码的加解密,我只要有这个密码就可以加解密,并且速度非常快
4.所以APP在传输数据的时候生成一个十六位的密码,使用这个密码和AES对传输的数据进行加密,由于加密后是二进制,所以需要将加密后的值base64 encode一下,然后将该值作为body提交,密码使用公钥进行加密,一样base64后放在Header里,然后进行请求
5.网关发现请求包含加密数据,将请求转发到授权服务进行脱敏
6.授权服务从Header里获取密码,base64 decode后,使用该APP对应RSA对应的密钥进行解密,获取AES密码明文,然后使用密码对Body的数据进行解密,解密后将明文返回给网关
7.网关将对应的Body使用明文进行替换,打到下游业务服务,下游业务服务进行业务处理
9.业务服务进行业务处理后,将响应返回
10.网关将响应和明文密码转发给授权服务,授权服务直接用明文密码对响应进行AES加密,然后返回
11.APP拿到响应,使用刚才的AES密码对响应进行解密,然后进行页面渲染
12.整体流程完毕

这个流程可以防止在客户端进行抓包,因为传输到落地在协议层面数据都是加密的,只有在客户端内存里才会进行解密渲染。

但缺点也十分明显,完全依赖同步阻塞的串行模型,现代的网关比如Spring Gateway支持WebFlux 异步,导致请求响应授权服务、下游业务服务的线程不一样。比如网关将请求打到下游业务服务,线程就切换处理其他请求去了,这时业务服务的响应可能被另外的网关线程处理,而解密的明文密码还在上一个线程里,如何在交给授权服务进行加密?所以针对这种情况要额外设计对请求/响应的打点标记,额外增加复杂度

在Linux下正确的使用Java锁

最近同事在linux上部署一个开源项目,为了防止这个项目生成的文件被外部其他程序修改/删除掉,他们稍微修改了下源代码在文件生成后给文件加锁,在他们开发机(Windows)上测试后没问题,就部署到服务器(CentOS)上了,结果不行,锁不住,生成的文件可以被第三方程序修改删除掉。
于是叫我帮忙调一下,因为我开发机用Linux Mint。

看现象十之八九就是 建议锁(ADVISORY)的问题,所以也懒的看。

猜测他们源码大体是这么写的:

1
2
3
4
5
6
7
8
9
10
FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
try(FileLock lock = channel.tryLock()) {
if (lock == null) {
log.debug("lock on file:{}", file.getPath());
// do something
// 花了很多时间
} else {

}
}

首先是结论:
代码没问题,不需改动所以不需要调什么。

Linux文件锁的类型有两种:

  • 1.mandatory 强制锁,加上后第三方程序对加过锁的文件修改会报错:文件被占用、文件无法被修改等。这个是排它的,一般有这种需求的都想要这种锁。
  • 2.advisory 建议锁/劝告锁,加上后第三方程序(gedite、vim/vi等、touch、rm等)可以直接修改删除。这个是 建议 的,也就是说它确实提供了加锁和检测是否有锁的手段,但假设你的第三方程序根本不检测文件有没有锁就直接修改/删除了,它也不 排斥,所以只对那些修改前try一下的守规矩程序有效。

再看一下JDK里FileLock的注释:

1
2
3
4
5
6
7
8
9
10
11
12
Whether or not a lock actually prevents another program from accessing
the content of the locked region is system-dependent and therefore
unspecified. The native file-locking facilities of some systems are merely
advisory, meaning that programs must cooperatively observe a known
locking protocol in order to guarantee data integrity. On other systems
native file locks are mandatory, meaning that if one program locks a
region of a file then other programs are actually prevented from accessing
that region in a way that would violate the lock. On yet other systems,
whether native file locks are advisory or mandatory is configurable on a
per-file basis. To ensure consistent and correct behavior across platforms,
it is strongly recommended that the locks provided by this API be used as if
they were advisory locks.

大意为是否能阻止另外的程序访问依赖于操作系统,有些操作系统下是advisory,有些则是mandatory。

一般来说绝大部分发行版(Ubuntu、CentOS、Debian)等默认获取的都是advisory锁,可以使用命令:

1
cat /proc/locks

看下当前系统所有已有的锁的类型。

如果想在Linux下mandatory锁,需要在操作系统挂载文件系统时激活才可以。

1
mount -o mand /dev/sdb7 /mnt

使用Ansible2.0部署JDK8

最近需要在多台新服务器部署Java环境,想到要重复同样的事这么多次就头疼,后来突然想到运维神器Ansible可以自动化部署,并且仅依赖ssh,只要在一台服务器上安装Ansible就可以,所以就想用Ansible来解决,因为毕竟偏运维所以也不想深究什么原理架构之类的,还是以解决实际问题为主,写死路径,丑陋什么的都无所谓。

所以没跟着官网文档做,直接在网上找了一篇入门教程就干了起来。
根据网上的教程添加个hosts文件用于管理服务器ip与提取公共变量。

hosts
1
2
3
4
5
6
[local]
192.168.100.129
[dw]
192.168.100.130
192.168.100.131
192.168.100.132

然后开始写yml格式playbook。

main.yml
1
2
3
4
5
6
7
8
9
10
11
12
- name: mkdir
shell: mkdir -p /root/java/
- name: copy jdk to remote host
copy: src=jdk-8u161-linux-x64.tar.gz dest=/root/java/
- name: unzip jdk
shell: tar -zxf /root/java/jdk-8u161-linux-x64.tar.gz -C /root/java/
- name: set jdk_env copy use template
template: src=java_home.sh.j2 dest=/root/java/set_jdk.sh
- name: execute script to set jdkenv
shell: sh /root/java/set_jdk.sh
- name: source bash_profile
shell: source /root/.bash_profile

  • 在root新建文件夹。
  • 拷贝jdk到新建的文件夹。
  • 解压jdk。
  • 拷贝设置环境变量的脚本到新建的文件夹下。
  • 执行设置环境变量的脚本。
  • 使刚才设置的环境变量生效。

看起来没什么问题,于是直接执行。

1
ansible-playbook playbook/roles/java/tasks/main.yml

直接报错:

1
2
3
4
5
6
7
The error appears to have been in '/etc/ansible/playbook/roles/java/tasks/main.yml': line 1, column 1, but may
be elsewhere in the file depending on the exact syntax problem.

The offending line appears to be:

- name: mkdir
^ here

看错误是解析yml失败了,猜测可能是空格或tab的原因导致,查看了下yml官方要求是用一个空格,但脚本里也都是空格没有tab。

没办法最后只能翻官方文档,看了官方的例子发现playbook的脚本格式与网上的教程都不一样,所以当时想会不会是因为用的Ansible2.0版本格式规则全变了,又不向下兼容导致。
所以写了个新建文件的小例子测试下。果然,2.0的格式与低版本的不一样,并且不向下兼容老本的playbook。

完整正确的Ansible2.0部署JDK的例子是:

  • 1.约定优于配置的情况下,官方建议的playbook、文件等存放的目录结构应该是:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /etc/ansible/
    ├── hosts
    └── playbook
    └── roles
    └── java
    └── tasks
    ├── files
    │   └── jdk-8u161-linux-x64.tar.gz
    ├── main.yml
    └── templates
    └── java_home.sh.j2
  • 2.playbook文件格式:

    main.yml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    - hosts: dw
    tasks:
    - name: mkdir
    shell: mkdir -p /root/java/
    - name: copy jdk to remote host
    copy: src=jdk-8u161-linux-x64.tar.gz dest=/root/java/
    - name: unzip jdk
    shell: tar -zxf /root/java/jdk-8u161-linux-x64.tar.gz -C /root/java/
    - name: set jdk_env copy use template
    template: src=java_home.sh.j2 dest=/root/java/set_jdk.sh
    - name: execute script to set jdkenv
    shell: sh /root/java/set_jdk.sh
    - name: source bash_profile
    shell: source /root/.bash_profile

增加了hosts,用于指定部署的服务器组,并且原本独立的命令移动到tasks下。

  • 3.设置环境变量的脚本:

    java_home.sh.j2
    1
    2
    3
    4
    5
    #!/bin/bash
    echo 'export JAVA_HOME=/root/java/jdk1.8.0_161' >> /root/.bash_profile
    echo 'export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH' >> /root/.bash_profile
    echo 'export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar' >> /root/.bash_profile
    source ~/.bash_profile
  • 最后执行命令:

    1
    ansible-playbook playbook/roles/java/tasks/main.yml

控制台输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

PLAY [dw] ***********************************************************************************************************************************************************************************************************

TASK [Gathering Facts] **********************************************************************************************************************************************************************************************
ok: [192.168.100.131]
ok: [192.168.100.132]
ok: [192.168.100.130]

TASK [mkdir] ********************************************************************************************************************************************************************************************************
[WARNING]: Consider using the file module with state=directory rather than running mkdir. If you need to use command because file is insufficient you can add warn=False to this command task or set
command_warnings=False in ansible.cfg to get rid of this message.

changed: [192.168.100.130]
changed: [192.168.100.132]
changed: [192.168.100.131]

TASK [copy jdk to remote host] **************************************************************************************************************************************************************************************
changed: [192.168.100.132]
changed: [192.168.100.130]
changed: [192.168.100.131]

TASK [unzip jdk] ****************************************************************************************************************************************************************************************************
[WARNING]: Consider using the unarchive module rather than running tar. If you need to use command because unarchive is insufficient you can add warn=False to this command task or set command_warnings=False in
ansible.cfg to get rid of this message.

changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]

TASK [set jdk_env copy use template] ********************************************************************************************************************************************************************************
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]

TASK [execute script to set jdkenv] *********************************************************************************************************************************************************************************
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]

TASK [source bash_profile] ******************************************************************************************************************************************************************************************
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]

PLAY RECAP **********************************************************************************************************************************************************************************************************
192.168.100.130 : ok=7 changed=6 unreachable=0 failed=0
192.168.100.131 : ok=7 changed=6 unreachable=0 failed=0
192.168.100.132 : ok=7 changed=6 unreachable=0 failed=0

执行成功,部署全部正确完成。

Spark踩坑3-共享Scala集合类

最近为了更好的了解Spark的细节,改用Spark的亲儿子Scala进行开发,得益于框架API的统一,Spark方面没什么需要重新学习的,开发完就想着按照以前的经验优化一下。
记得官方Tuning Spark中,第一件要干的事就是调整序列化类库,Spark默认使用的Java serialization的序列化方式。官方建议使用Kryo类库进行序列化可以获得更好的压缩率和传输效率。
切换的方式很简单,只要在配置时指定序列化方式为:org.apache.spark.serializer.KryoSerializer即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
object Recommender {
private val log = Logger(Recommender.getClass)

// load config
private val config = ConfigFactory.load("config.properties")

// spark config
private val appName = config.getString("appName")
private val mode = config.getString("mode")


def main(args: Array[String]): Unit = {
log.info("init {},mode:{}", appName, mode)

// init spark
val sc = new SparkConf()
.setAppName(appName)
.setMaster(mode)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")


val session = SparkSession.builder().config(sc).getOrCreate()
try {
initMat(session)

var headMap = new mutable.LinkedHashMap[String, Int]()
headMap.put("ID", 0)
headMap.put("UID", 1)

val headBroadcast = ss.sparkContext.broadcast[mutable.LinkedHashMap[String, Int]](headMap)

// 略...


} finally {
session.close()
}
}
}

但是当我切换序列化方式后,我发现我要广播的一个LinkedHashMap每次获取都是空的。
于是猜测Kryo框架是不是没办法序列化Scala的集合类型,因为大部分的序列化框架对集合类都有或多或少的一些限制,比如Protostuff等。
于是写个测试代码测试一下:

1
2
3
4
5
6
7
8
9
10
def main(args: Array[String]): Unit = {
var headMap = new mutable.LinkedHashMap[String, Int]()
headMap.put("a", 1)
headMap.put("b", 2)
headMap.put("c", 3)
//
val kryo = new Kryo()
val output = new Output(new FileOutputStream("e:\\ser.bin"))
kryo.writeObject(output, headMap)
}

最终写到磁盘ser.bin的才1个字节,都不用反序列化,肯定不对。
查了一些官方文档,有个相对简单点的解决办法:

1
kryo.register(classOf[mutable.HashMap[String, (String, String)]], new JavaSerializer)

将原生类做为自定义的类注册下,并指定序列化方式仍然使用jdk的。。。
虽然问题能解决但这其实已经背离本意了,好在一点是mutable.HashMap使用jdk序列化,其他的类使用kryo,相比较以前全部用jdk效率确实高点。

Spark踩坑2—Lambda表达式序列化异常

这几天用Spark写一些正式的功能又踩到坑了,当submit程序到yarn以client模式运行时出现:

1
cannot assign instance of java.lang.invoke.SerializedLambda to field

异常显示序列化lambda异常。。。。
诡吊的一点在于cluster模式没有问题。
关于client模式和cluster模式网上有详细的解释,简单来说:

  • cluster模式下ResourceManger会在集群中的某台NodeManger服务器上启动一个ApplicationMaster,也就是Driver的容器,然后将ApplicationMaster分配给这台NodeManger。
  • client模式下ResourceManger会在本地启动ApplicationMaster并将其分配给本地的NodeManger。

由于用的JDK8所有程序中确实有大量的Lambda,一直觉得Lambda是匿名内部类的语法糖而已,运行时该擦除擦除,看到lang包下有个专门的SerializedLambda就知道不仅仅是个语法糖而已。
根据stackoverflow上的一解答来看:

Implementors of serializable lambdas, such as compilers or language runtime libraries, are expected to ensure that instances deserialize properly. One means to do so is to ensure that the writeReplace method returns an instance of SerializedLambda, rather than allowing default serialization to proceed.
当编译器或一个运行时的类库序列化一个lambad时,先要确保实例正确的反序列化,其中一个方法是确保writeReplace方法返回的是一个SerializedLambda实例,而不是用默认的序列化去处理。

SerializedLambda has a readResolve method that looks for a (possibly private) static method called $deserializeLambda$(SerializedLambda) in the capturing class, invokes that with itself as the first argument, and returns the result. Lambda classes implementing $deserializeLambda$ are responsible for validating that the properties of the SerializedLambda are consistent with a lambda actually captured by that class.
当反序列化时候SerializedLambda有一个readResolve方法,它会在capturing class(个人理解为调用lambda的那个类)查找一个可能私有的静态方法$deserializeLambda$,并将自己作为参数进行调用,实现$deserializeLambda$ 方法负责验证SerializedLambda的属性是否与调用的lambda一致。

所以说如果没有定义$deserializeLambda$这个方法,反序列化仍然会调用这个方法进行验证。
如果要解决这个问题有两种方法。

  • 1.不写lambda,换成匿名内部类。
  • 2.将包含lambda的类单独打包,使用Spark的setJar方法提交这个jar。代码:
    1
    2
    3
    SparkConf sconf = new SparkConf()
    .setJars(new String[]{"/root/lambda.jar"})
    .setMaster("yarn");

但仍然搞不懂的是为何cluster模式下没这问题,猜想是因为client模式下代码不进行完整的分发,所以包含lambda的代码无法在其他服务器上正确的序列化和反序列化,而cluster模式下都是整个打包分发到集群中其他的NodeManager中执行,所以没问题?
因为setJar方法的注释说明:传入的jar包会被分发到集群中的其他work中。

Spark踩坑1—无法初始化Main

上周用Spark开发了一个小程序,打包好submit到yarn后slave却一直报错:

1
java.lang.NoClassDefFoundError: Could not initialize class Main

代码:

Main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author liuxinsi
* @mail akalxs@gmail.com
*/
public class Main {

private static final JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName(“Loader”).setMaster(“yarn”)
);

private static final SQLContext sqlContext = new SQLContext(sc);

public static void main(String[]() args) {
DataFrame df = sqlContext.read().json("data.json");
// 一些基本操作,略...
}
}

但开发时在local环境下则一点问题也没有。
一开始怀疑yarn环境里有重复的fatjar导致两个包的Main冲突,后来检查了一遍没问题,yarn本身也是隔离的。
google了一圈基本上都是说序列化或环境的问题。
仔细想了一下当submit到yarn的流程是把Spark的环境和提交的jar打了一个zip包放到HDFS上进行分发,而Spark程序运行时仅在数据混洗shuffle或其他传输情况才会进行序列化,而且无论是jdk的序列化还是kryo的序列化,如果对象不能序列化也是报序列化的异常,不可能连入口类都没初始化就跑到序列化的步骤去了。

环境问题的话也在加了-verbose情况下仔细检查了加载的jar包和顺序,也没发现什么问题。
最后实在没辙只能不停翻官方提供的例子看看有没有什么头绪,找了半天最后突然发现唯一一点不同就是官方的小demo里的所有SparkContext的初始都是放在函数里,也就是都是局部变量,而我一直都是写成全局变量。所以猜想会不会是这个原因导致的,后来把代码改成:

Main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author liuxinsi
* @mail akalxs@gmail.com
*/
public class Main {

public static void main(String[]() args) {
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName(“Loader”).setMaster(“yarn”)
);

SQLContext sqlContext = new SQLContext(sc);

DataFrame df = sqlContext.read().json("data.json");
// 一些基本操作,略...
}
}

打包后提交果然解决,虽然Spark的亲儿子是Scala这种函数式编程语言,但怎么也想不明白一个全局变量为什么会没办法初始化。
还有坑人的一点是本地local[*]模式下也测不出这种问题。

Gson反序列化包含泛型的对象

最近在重构一个遗留项目,项目中关于Json序列化/反序列化操作几乎都是显示的手动序列化,代码散落在各个方法中,丝毫没有利用框架的自动序列化机制。
重构的第一步是去除冗余和提炼共通方法。所以将序列化与反序列提取出来。
但提取反序列化带泛型的对象出了问题。
由于泛型在编译时会被擦除的特性,一般情况下要反序列化包含泛型的对象时需要用到Gson的TypeToken用于返回泛型类。举例:

Response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 响应对象。<br/>
* {@link #errMesg}和{@link #errDetail}仅在状态{@link #status}为{@link Status#ERROR}时出现。
*
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Data
public class Response<T> {
/**
* 业务数据
*/
private T data;
/**
* 响应状态
*/
private Status status;
/**
* 错误消息
*/
private String errMesg;
/**
* 错误堆栈
*/
private String errDetail;

public enum Status {
SUCCESS, ERROR;
}
}
ValidateTokenResponse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 对应{@link com.lxs.mms.auth.resource.AuthenticationResource#validateToken(ValidateTokenRequest)} 响应。
*
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Data
@ApiModel(value = "验证Token响应")
public class ValidateTokenResponse {
@ApiModelProperty(value = "Token是否合法,true合法,false非法。")
private Boolean legally;
@ApiModelProperty(value = "Token中的Audience属性")
private String audience;
@ApiModelProperty(value = "描述")
private String desc;

}

包装类Response用于抽象共通的响应,其中data是各个业务的对象,所以是个泛型。
ValidateTokenResponse,具体的业务响应。封装时候将对象Set到Response后进行序列化。

反序列化时由于泛型被擦,如果直接用fromJson方法则data的类型不会是期望的业务类,而是List<LinkedHashMap>。
所以Gson提供了TypeToken用于处理这个问题:

1
2
3
4
Gson gson = new Gson();
Type jsonType = new TypeToken<Response<ValidateTokenResponse>>() {
}.getType();
Response<ValidateTokenResponse> r=gson.fromJson(json, jsonType);

由于TypeToken的构造是protected的,所以需要new一个匿名子类(anonymous subclass),在构造的时候TypeToken会根据你显示传入的泛型获取泛型类并作为ParameterizedType返回。这样在fromJson的时候就可以知道具体的泛型类,然后反序列化时就可以得到正确的data类型。

所以在提取这种代码时候进行的封装则按常理应该是:

1
2
3
4
5
6
public static <T> Response<T> unwrap(String json) {
Gson gson = new Gson();
Type jsonType = new TypeToken<Response<T>>() {
}.getType();
return gson.fromJson(json, jsonType);
}

看起来毫无破绽,编译运行都没有问题,但就是data属性还是List
因为上面的T是一个类型参数(java.lang.reflect.TypeVariable)并不是具体的业务类。所以TypeToken没法取得期待的泛型类。
根据TypeToken的源码来看它在获取泛型类的时候实际上返回了一个ParameterizedType用于代表泛型类。
所以按照思路只要实现这个接口然后告诉它正确的泛型类就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static <T> Response<T> unwrap(String json, Class<?> clazz) {
return gson.fromJson(json, new ParameterizedType() {
/**
* 原始类型实际的泛型类,与外部传入<code>clazz</code>显示指明。
*/
@Override
public Type[] getActualTypeArguments() {
return new Class[]{clazz};
}

/**
* 原始类型。
*/
@Override
public Type getRawType() {
return Response.class;
}

/**
* 如果是内部类需要指明所属的对象,如果不是返回null。
*/
@Override
public Type getOwnerType() {
return null;
}
});

}

调用时:

1
Response<ValidateTokenResponse> r = ResponseUnwrap.unwrap(json,ValidateTokenResponse.class);

就可以正确的处理反序列化。
其实还可以封装的更灵活点,比如处理多个泛型需要修改传参和getActualTypeArguments方法的返回,如果是传集合类型还要单独处理下,不应该写死RawType。