背景

  • Jdk1.8推出了函数式接口,函数式接口最大的特点是可以通过Lambda实例化它们,借助Lambda表达式可以将一个函数方法作为一个方法参数进行传递,所以我们可以将一个或多个逻辑放到一个集合里,然后通过调用调用接口暴露的方法来触发对应的逻辑

饭前小吃

  • 函数式接口(Functional Interface

    • 一个接口有且只有一个未实现的方法
    • 引入了一个新的注解:@FunctionalInterface,表明这个接口是一个函数式接口,编译器会检查是否只有一个未实现的方法,否则报错
  • 常用函数式接口

    • Function<T, R>: 接受一个参数、返回R apply(T t)
    • Consumer :有输入无输出的void accept(T t)方法
    • Supplier:有输出无输入的T get()方法
    • Predicate:返回boolean值的 boolean test(T t) 方法
    • BiFunctionR apply(T t, U u)2个入参,最后也跟Function一样返回一个结果
    • 其它...:使用基本数据类型(Int,Long,Double,Boolean)情况做了扩展:DoubleConsumer 只消费double数据

代码实现

  • 下面代码是利用函数式接口实现事件的发布订阅,从而达到业务解耦、关注点分离的目的
  • 使用场景:

    • 比如用户下完单后有一系列操作发优惠券,累加积分,常规操作是不是一个接一个方法的逻辑,这样的话我们是不是可以抓住两个地方:
      • 1、用户下完单后有个通知操作,达到一个触发的目的
      • 2、下完单后做什么,我们需要在一个地方将这些逻辑存起来,利用函数式接口就可以将此逻辑存储起来
  • 带着上面的思路,可以设计一个Map<Class<?>, LinkedList<Consumer<Object>>> eventConsumers集合,key是用于区分是那个事件类型,LinkedList用于存放具体的业务逻辑,我们先把要做的逻辑先put进去,然后再依次执行accept方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package com.songsy.springboot.test.service.util;

import com.songsy.springboot.test.service.SpringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* 自定义事件发布订阅回调工具类(业务解藕、关注点分离,避免互相依赖)
* <pre>
* 支持两种模式
* 1.无返回值:订阅事件消费(register)+ 发布事件消息(publishEvent/publishEventAsync)
* 2.有返回值:监听回调通知处理(listenCallback)+通知回调(notifyCallback),通过notifyMessageType+MessageChannel 即可标识唯一的一组通知回调与监听回调处理
* <pre>
*/
public final class EventPublishSubscribeUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishSubscribeUtils.class);

/**
* 事件消费者
*/
private static final Map<Class<?>, LinkedList<Consumer<Object>>> eventConsumers = new ConcurrentHashMap<>();

/**
* 有返回值的事件消费者
*/
private static final Map<Class<?>, ConcurrentHashMap<MessageChannel, Function<Object, Object>>> callbackFunction = new ConcurrentHashMap<>();

private EventPublishSubscribeUtils() {
}


/**
* 注册事件回调消费者
* 用法:EventSubscribeConsumeUtils.register(this::xxxx方法) 或lambda表达式
* 注意:若回调方法添加了事务注解,则应指派其代理对象的方法来完成回调,如:
* EventSubscribeConsumeUtils.register((xxxService)SpringUtils.getBean(this.class)::xxxx方法)
*
* @param eventConsumer
*/
public static void register(Class<?> eventMessageType, Consumer<Object> eventConsumer) {
if (eventConsumer == null) {
return;
}

LinkedList<Consumer<Object>> eventConsumerItems = null;
if (!eventConsumers.containsKey(eventMessageType)) {
eventConsumers.putIfAbsent(eventMessageType, new LinkedList<>());
}
eventConsumerItems = eventConsumers.get(eventMessageType);

eventConsumerItems.add(eventConsumer);
}

/**
* 取消订阅回调
*
* @param eventMessageType
* @param eventConsumer
*/
public static void unRegister(Class<?> eventMessageType, Consumer<Object> eventConsumer) {
if (!eventConsumers.containsKey(eventMessageType)) {
return;
}

LinkedList<Consumer<Object>> eventConsumerItems = eventConsumers.get(eventMessageType);
int eventConsumerIndex = eventConsumerItems.indexOf(eventConsumer);
if (eventConsumerIndex == -1) {
return;
}
eventConsumerItems.remove(eventConsumerIndex);
}


/**
* 发布事件,同步触发执行回调事件消费者方法(存在阻塞等待),即事件消息生产者
* 用法:在需要触发事件消息回调时调用,如:publishEvent(eventMessage);
*
* @param eventMessage
*/
public static <T> void publishEvent(T eventMessage) {
Class<?> eventMessageType = eventMessage.getClass();

if (!eventConsumers.containsKey(eventMessageType)) {
return;
}

for (Consumer<Object> eventConsumer : eventConsumers.get(eventMessageType)) {
try {
eventConsumer.accept(eventMessage);
} catch (Exception ex) {
// 各个逻辑相互独立
LOGGER.error("eventConsumer.accept error:{},eventMessageType:{},eventMessage:{}",
ex, eventMessageType, eventMessage);
}
}
}


/**
* 发布事件,异步触发执行回调事件消费者方法(异步非阻塞),即事件消息生产者
* 用法:在需要触发事件消息回调时调用,如:publishEventAsync(eventMessage);
*
* @param eventMessage
*/
public static <T> void publishEventAsync(final T eventMessage) {
try {
Executor asyncTaskExecutor = (Executor) SpringUtils.getBean("asyncTaskExecutor");
asyncTaskExecutor.execute(() -> {
publishEvent(eventMessage);
});
} catch (Exception e) {
e.printStackTrace();
}
}


/**
* 监听回调处理(需要有返回值),即有返回值的回调消费者
*
* @param notifyMessageType
* @param messageChannel
* @param callbackFunc
*/
public static void listenCallback(Class<?> notifyMessageType, MessageChannel messageChannel, Function<Object, Object> callbackFunc) {
if (!callbackFunction.containsKey(notifyMessageType)) {
callbackFunction.putIfAbsent(notifyMessageType, new ConcurrentHashMap<>());
}

Map<MessageChannel, Function<Object, Object>> functionMap = callbackFunction.get(notifyMessageType);
if (!functionMap.containsKey(messageChannel)) {
functionMap.putIfAbsent(messageChannel, callbackFunc);
} else {
LOGGER.error("该通知消息类型:{}+消息通道:{},已被订阅监听,重复订阅监听无效!", notifyMessageType.getSimpleName(), messageChannel.getDescription());
}

}

/**
* 通知回调(同步等待获取监听回调的处理结果),即生产者
*
* @param notifyMessage
* @param messageChannel
* @param <R>
* @return
*/
@SuppressWarnings("unchecked")
public static <R> R notifyCallback(Object notifyMessage, MessageChannel messageChannel) {
Class<?> notifyMessageType = notifyMessage.getClass();

Map<MessageChannel, Function<Object, Object>> functionMap = callbackFunction.getOrDefault(notifyMessageType, null);
if (functionMap != null) {
Function<Object, Object> callbackFunction = functionMap.getOrDefault(messageChannel, null);
if (callbackFunction != null) {
LOGGER.info("通知回调消息已发布,正在执行回调处理:{},messageChannel:[{}]", notifyMessage, messageChannel.getDescription());
Object result = callbackFunction.apply(notifyMessage);
try {
return (R) result;
} catch (ClassCastException castEx) {
throw new ClassCastException(String.format("监听回调处理后返回值实际类型与发布通知回调待接收的值预期类型不一致,导致类型转换失败:%s," +
"请确保notifyCallback与listenCallback针对通知消息类型:%s+消息通道:%s返回值类型必需一致。",
castEx.getMessage(), notifyMessageType.getSimpleName(), messageChannel.getDescription()));
}

}
}
return null;
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 自定义事件发布订阅回调工具类测试类
*
*/
public final class EventPublishSubscribeUtilsTest {

private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishSubscribeUtilsTest.class);

/**
* 无返回值:订阅事件消费(register)
*/
@Test
public void test1() {

/**
* 注册事件回调消费者
*/
// 下单完成了,给用户发优惠券
EventPublishSubscribeUtils.register(OrderEvent.class, (obj) -> {
LOGGER.info("给用户发优惠券");
});
// 下单完成了,给用户累加积分
EventPublishSubscribeUtils.register(OrderEvent.class, (obj) -> {
LOGGER.info("给用户累加积分");
});

// 如果在Spring容器中可以使用 @PostConstruct 注册
// @PostConstruct
// public void listerEvent(){
// EventPublishSubscribeUtils.register(OrderEvent.class, (obj) -> {
// // 下单完成了,给用户累加积分
// });
// }

/**
* 发布事件
*/
EventPublishSubscribeUtils.publishEvent(new OrderEvent(1L, "下单已完成"));
}

@Test
public void test2 () {
// 注册事件回调消费者
EventPublishSubscribeUtils.listenCallback(OrderEvent.class,MessageChannel.ORDER, o -> {
OrderEvent msg=(OrderEvent)o;

// 下单处理

return 1;
});


// 回调处理
int result = EventPublishSubscribeUtils.notifyCallback(new OrderEvent(), MessageChannel.ORDER);
if (result > 0) {
// 下单成功
}
}

}

总结

  • 通过上面的测试类我们可以发现下单的后续逻辑单独提取处理了且相互独立,下单主流程只要发布一个事件publishEvent即可完成对应的后续业务操作
  • 备注:
    • 上面的event的异常都被吃掉了,需注意在方法体内自己做补偿或者操作日志追踪
    • MessageChannel可以自定义消息通道,但只能定义一对一的消费模式,因为这种模式下它是有带返回值的

参考