Skip to content

Commit

Permalink
✨ 完善 IMqttMessageInterceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Feb 21, 2024
1 parent 97548a9 commit 5a959e0
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
/**
* mqtt 消息拦截器集合
*
* @since 1.3.9
* @author L.cm
*/
public class MqttMessageInterceptors {
Expand All @@ -45,6 +44,20 @@ void add(IMqttMessageInterceptor interceptor) {
this.interceptors.add(interceptor);
}

/**
* 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected
*
* @param context ChannelContext
* @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败
* @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接
* @throws Exception Exception
*/
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception {
for (IMqttMessageInterceptor interceptor : interceptors) {
interceptor.onAfterConnected(context, isConnected, isReconnect);
}
}

/**
* 接收到TCP层传过来的数据后
*
Expand All @@ -64,8 +77,9 @@ public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) thro
* @param context ChannelContext
* @param message MqttMessage
* @param packetSize packetSize
* @throws Exception Exception
*/
public void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) {
public void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) throws Exception {
for (IMqttMessageInterceptor interceptor : interceptors) {
interceptor.onAfterDecoded(context, message, packetSize);
}
Expand All @@ -85,4 +99,18 @@ public void onAfterHandled(ChannelContext context, MqttMessage message, long cos
}
}

/**
* 处理一个消息包后
*
* @param context ChannelContext
* @param message MqttMessage
* @param isSentSuccess 是否发送成功
* @throws Exception Exception
*/
public void onAfterSent(ChannelContext context, MqttMessage message, boolean isSentSuccess) throws Exception {
for (IMqttMessageInterceptor interceptor : interceptors) {
interceptor.onAfterSent(context, message, isSentSuccess);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public boolean onHeartbeatTimeout(ChannelContext context, long interval, int hea
return false;
}

@Override
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception {
messageInterceptors.onAfterConnected(context, isConnected, isReconnect);
}

@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
// 标记认证为 false
Expand Down Expand Up @@ -139,11 +144,13 @@ private void notify(ChannelContext context, String clientId, String username, St
}

@Override
public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) {
public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) throws Exception {
// 1. http 请求处理
boolean isHttpRequest = context.get(MqttConst.IS_HTTP) != null;
if (isHttpRequest) {
MqttHttpHelper.close(context, packet);
} else if (packet instanceof MqttMessage) {
messageInterceptors.onAfterSent(context, (MqttMessage) packet, isSentSuccess);
}
}

Expand All @@ -153,7 +160,7 @@ public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) thro
}

@Override
public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) {
public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) throws Exception {
if (packet instanceof MqttMessage) {
messageInterceptors.onAfterDecoded(context, (MqttMessage) packet, packetSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,22 @@
/**
* mqtt 消息拦截器
*
* @since 1.3.9
* @author L.cm
*/
public interface IMqttMessageInterceptor {

/**
* 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected
*
* @param context ChannelContext
* @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败
* @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接
* @throws Exception Exception
*/
default void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception {

}

/**
* 接收到TCP层传过来的数据后
*
Expand All @@ -44,8 +55,9 @@ default void onAfterReceivedBytes(ChannelContext context, int receivedBytes) thr
* @param context ChannelContext
* @param message MqttMessage
* @param packetSize packetSize
* @throws Exception Exception
*/
default void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) {
default void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) throws Exception {

}

Expand All @@ -61,4 +73,15 @@ default void onAfterHandled(ChannelContext context, MqttMessage message, long co

}

/**
* 处理一个消息包后
*
* @param context ChannelContext
* @param message MqttMessage
* @param isSentSuccess 是否发送成功
* @throws Exception Exception
*/
default void onAfterSent(ChannelContext context, MqttMessage message, boolean isSentSuccess) throws Exception {

}
}

0 comments on commit 5a959e0

Please sign in to comment.