如果直接在Flink的执行程序中引入Spring容器的相关实例如dao、service会因为Spring容器中有一些实例没有实现序列化接口而导致Flink在加载配置过程中抛出无法序列化异常。
Flink Kafka Consumer集成Spring boot
思路是将Spring容器的实例通过Spring容器来获取,而不是采取自动注入的方式。
ApplicationContextUtil
ApplicationContextUtil工具类通过手动获取bean的方式来获取Spring容器中的实例。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
private static ApplicationContext applicationContext;
// 获取上下文
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
// 设置上下文
public void setApplicationContext(ApplicationContext applicationContext) {
ApplicationContextUtil.applicationContext = applicationContext;
}
// 通过名字获取上下文中的bean
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
// 通过类型获取上下文中的bean
public static Object getBean(Class<?> requiredType) {
return applicationContext.getBean(requiredType);
}
}
加载Spring容器
在sink的子类的open方法中加载Spring的容器,初始化相关实例。1
2
3
4
5
public void open(Configuration parameters) throws Exception {
String[] args = {};
CciEigenvalueExtractionApplication.main(args);
}
获取Spring容器中的实例
在sink的子类的open方法中加载Spring的容器,初始化相关实例。1
2// 在需要获取Spring容器实例的地方通过Spring容器工具类获取对应的实例
ApplicationContextUtil.getBean("bean name");