Flink Kafka Consumer 集成Spring boot

如果直接在Flink的执行程序中引入Spring容器的相关实例如dao、service会因为Spring容器中有一些实例没有实现序列化接口而导致Flink在加载配置过程中抛出无法序列化异常。

思路是将Spring容器的实例通过Spring容器来获取,而不是采取自动注入的方式。

ApplicationContextUtil

ApplicationContextUtil工具类通过手动获取bean的方式来获取Spring容器中的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
private static ApplicationContext applicationContext;

// 获取上下文
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

// 设置上下文
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
ApplicationContextUtil.applicationContext = applicationContext;
}

// 通过名字获取上下文中的bean
public static Object getBean(String name) {
return applicationContext.getBean(name);
}

// 通过类型获取上下文中的bean
public static Object getBean(Class<?> requiredType) {
return applicationContext.getBean(requiredType);
}
}

加载Spring容器

在sink的子类的open方法中加载Spring的容器,初始化相关实例。

1
2
3
4
5
@Override
public void open(Configuration parameters) throws Exception {
String[] args = {};
CciEigenvalueExtractionApplication.main(args);
}

获取Spring容器中的实例

在sink的子类的open方法中加载Spring的容器,初始化相关实例。

1
2
// 在需要获取Spring容器实例的地方通过Spring容器工具类获取对应的实例
ApplicationContextUtil.getBean("bean name");