Skip to content

Commit

Permalink
add error handler for azure message listener (#27225)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiliuTo committed Feb 24, 2022
1 parent f3c295e commit 8779723
Show file tree
Hide file tree
Showing 30 changed files with 335 additions and 30 deletions.
12 changes: 12 additions & 0 deletions eng/code-quality-reports/src/main/resources/revapi/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@
"new": "(class|interface) org.springframework..*",
"justification": "We allow this in springframework"
},
{
"code": "java.method.numberOfParametersChanged",
"old": "method com.azure.spring.messaging.container.ListenerContainerFactory<? extends com.azure.spring.messaging.container.MessageListenerContainer> com.azure.spring.messaging.config.AzureMessagingConfiguration::azureListenerContainerFactory(com.azure.spring.integration.core.api.SubscribeByGroupOperation)",
"new": "method com.azure.spring.messaging.container.ListenerContainerFactory<? extends com.azure.spring.messaging.container.MessageListenerContainer> com.azure.spring.messaging.config.AzureMessagingConfiguration::azureListenerContainerFactory(com.azure.spring.integration.core.api.SubscribeByGroupOperation, org.springframework.beans.factory.ObjectProvider<org.springframework.util.ErrorHandler>)",
"justification": "In order to support configuration of error handler for message listener container"
},
{
"regex": true,
"code": "java.class.nonPublicPartOfAPI",
Expand Down Expand Up @@ -178,6 +184,12 @@
"new": "(interface|enum) io\\.cloudevents.*",
"justification": "Azure Event Grid cloud native cloud event is allowed to use CloudEvents types in public APIs as it implements interfaces defined by CloudEvents"
},
{
"regex": true,
"code": "java\\.class\\.externalClassExposedInAPI",
"new": "(interface|class) org\\.springframework.util\\.ErrorHandler",
"justification": "Azure Spring Cloud Messaging need the Spring's public interface for error handler registration, it is a common class for users to handle runtime errors."
},
{
"regex": true,
"code": "java\\.annotation\\.added",
Expand Down
1 change: 1 addition & 0 deletions sdk/spring/azure-spring-cloud-messaging/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 2.14.0-beta.1 (Unreleased)

### Features Added
- Support error handler for `@AzureMessageListener`.

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import com.azure.spring.messaging.container.DefaultAzureListenerContainerFactory;
import com.azure.spring.messaging.container.ListenerContainerFactory;
import com.azure.spring.messaging.container.MessageListenerContainer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;

/**
* @author Warren Zhu
Expand All @@ -20,12 +22,16 @@ public class AzureMessagingConfiguration {
/**
* Bean for the {@link ListenerContainerFactory}.
* @param subscribeByGroupOperation the {@link SubscribeByGroupOperation}.
* @param errorHandler the {@link ErrorHandler}
* @return the {@link ListenerContainerFactory} bean.
*/
@ConditionalOnMissingBean
@Bean(name = AzureListenerAnnotationBeanPostProcessor.DEFAULT_AZURE_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
public ListenerContainerFactory<? extends MessageListenerContainer> azureListenerContainerFactory(
SubscribeByGroupOperation subscribeByGroupOperation) {
return new DefaultAzureListenerContainerFactory(subscribeByGroupOperation);
SubscribeByGroupOperation subscribeByGroupOperation, ObjectProvider<ErrorHandler> errorHandler) {
DefaultAzureListenerContainerFactory azureListenerContainerFactory =
new DefaultAzureListenerContainerFactory(subscribeByGroupOperation);
azureListenerContainerFactory.setErrorHandler(errorHandler.getIfUnique());
return azureListenerContainerFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.azure.spring.messaging.endpoint.AbstractAzureListenerEndpoint;
import com.azure.spring.messaging.endpoint.AzureListenerEndpoint;
import com.azure.spring.integration.core.api.SubscribeByGroupOperation;
import org.springframework.lang.Nullable;
import org.springframework.util.ErrorHandler;

/**
* Base {@link ListenerContainerFactory} for Spring's base container implementation.
Expand All @@ -19,13 +21,20 @@ abstract class AbstractAzureListenerContainerFactory<C extends AbstractListenerC

private final SubscribeByGroupOperation subscribeOperation;

@Nullable
private ErrorHandler errorHandler;

protected AbstractAzureListenerContainerFactory(SubscribeByGroupOperation subscribeOperation) {
this.subscribeOperation = subscribeOperation;
}

@Override
public C createListenerContainer(AzureListenerEndpoint endpoint) {
C instance = createContainerInstance();
if (this.errorHandler != null) {
instance.setErrorHandler(this.errorHandler);
}

initializeContainer(instance);
endpoint.setupListenerContainer(instance);
return instance;
Expand All @@ -51,4 +60,7 @@ public SubscribeByGroupOperation getSubscribeOperation() {
return subscribeOperation;
}

public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.Nullable;
import org.springframework.util.ErrorHandler;

abstract class AbstractListenerContainer implements BeanNameAware, DisposableBean, MessageListenerContainer {
private static final Logger LOG = LoggerFactory.getLogger(AbstractListenerContainer.class);
private final Object lifecycleMonitor = new Object();
private String destination;
private String group;
private AzureMessageHandler messageHandler;
@Nullable
private ErrorHandler errorHandler;
private boolean autoStartup = true;
private int phase = 0;

Expand Down Expand Up @@ -122,4 +126,23 @@ public void setMessageHandler(AzureMessageHandler messageHandler) {
public boolean isAutoStartup() {
return autoStartup;
}

/**
* Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown
* while processing a Message.
*/
@Nullable
public ErrorHandler getErrorHandler() {
return errorHandler;
}

/**
* Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown
* while processing a Message.
* <p>By default, there will be <b>no</b> ErrorHandler so that error-level
* logging is the only result.
*/
public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.spring.messaging.container;

import com.azure.spring.integration.core.api.SubscribeByGroupOperation;
import org.springframework.util.ErrorHandler;

/**
* @author Warren Zhu
Expand All @@ -18,8 +19,14 @@ class DefaultMessageListenerContainer extends AbstractListenerContainer {
@Override
protected void doStart() {
synchronized (this.getLifecycleMonitor()) {
subscribeOperation.subscribe(getDestination(), getGroup(), getMessageHandler()::handleMessage,
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
subscribeOperation.subscribe(getDestination(), getGroup(), getMessageHandler()::handleMessage,
errorHandler::handleError, getMessageHandler().getMessagePayloadType());
} else {
subscribeOperation.subscribe(getDestination(), getGroup(), getMessageHandler()::handleMessage,
getMessageHandler().getMessagePayloadType());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.springframework.test.util.AssertionErrors.assertNotNull;


/**
* @author Warren Zhu
*/
Expand All @@ -32,6 +32,9 @@ public abstract class AbstractAzureMessagingAnnotationDrivenTests {
@Test
public abstract void fullConfigurableConfiguration();

@Test
public abstract void errorHandlerConfiguration();

@Test
public abstract void customConfiguration();

Expand Down Expand Up @@ -76,6 +79,19 @@ public void testFullConfiguration(ApplicationContext context) {
assertEquals("1-10", endpoint.getConcurrency());
}

/**
* Test for {@link ErrorHandler} discovery. In this case, an error handler bean is provided. This shows that the default factory is only retrieved if it needs to be.
*/
public void testErrorHandlerConfiguration(ApplicationContext context) {
AzureListenerContainerTestFactory errorHandlerListenerContainerFactory =
context.getBean("errorHandlerListenerContainerFactory", AzureListenerContainerTestFactory.class);
assertEquals(1, errorHandlerListenerContainerFactory.getListenerContainers().size());
MessageListenerTestContainer container =
errorHandlerListenerContainerFactory.getListenerContainers().get(0);
assertEquals("errorHandlerTest", container.getEndpoint().getId());
assertNotNull("Error handler should be registered", container.getErrorHandler());
}

/**
* Test for {@link CustomBean} and an manually endpoint registered with "myCustomEndpointId". The custom endpoint
* does not provide any factory so it's registered with the default one
Expand Down Expand Up @@ -177,6 +193,14 @@ public void customHandle(String msg) {
}
}

@Component
static class ErrorHandlerBean {

@AzureMessageListener(id = "errorHandlerTest", containerFactory = "errorHandlerListenerContainerFactory", destination = "myQueue")
public void customHandle(String msg) {
}
}

static class DefaultBean {

@AzureMessageListener(destination = "myQueue")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

/**
* @author Warren Zhu
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.azure.spring.messaging.container.ListenerContainerFactory;
import com.azure.spring.messaging.endpoint.AzureListenerEndpoint;
import org.springframework.lang.Nullable;
import org.springframework.util.ErrorHandler;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -18,11 +20,17 @@ public class AzureListenerContainerTestFactory implements ListenerContainerFacto

private final Map<String, MessageListenerTestContainer> listenerContainers = new LinkedHashMap<>();
private boolean autoStartup = true;
@Nullable
private ErrorHandler errorHandler;

public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

public List<MessageListenerTestContainer> getListenerContainers() {
return new ArrayList<>(this.listenerContainers.values());
}
Expand All @@ -35,6 +43,9 @@ public MessageListenerTestContainer getListenerContainer(String id) {
public MessageListenerTestContainer createListenerContainer(AzureListenerEndpoint endpoint) {
MessageListenerTestContainer container = new MessageListenerTestContainer(endpoint);
container.setAutoStartup(this.autoStartup);
if (errorHandler != null) {
container.setErrorHandler(errorHandler);
}
this.listenerContainers.put(endpoint.getId(), container);
endpoint.setupListenerContainer(container);
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import com.azure.spring.messaging.listener.DefaultAzureMessageHandler;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -57,6 +59,14 @@ public void fullConfigurableConfiguration() {

}

@Test
@Override
public void errorHandlerConfiguration() {
ConfigurableApplicationContext context =
new AnnotationConfigApplicationContext(EnableAzureMessagingErrorHandlerConfig.class, ErrorHandlerBean.class);
testErrorHandlerConfiguration(context);
}

@Override
@Test
public void customConfiguration() {
Expand Down Expand Up @@ -316,6 +326,29 @@ SubscribeByGroupOperation subscribeByGroupOperation() {
}
}

@Configuration
@EnableAzureMessaging
static class EnableAzureMessagingErrorHandlerConfig {

@Bean
public AzureListenerContainerTestFactory errorHandlerListenerContainerFactory(ObjectProvider<ErrorHandler> errorHandler) {
AzureListenerContainerTestFactory azureListenerContainerTestFactory =
new AzureListenerContainerTestFactory();
azureListenerContainerTestFactory.setErrorHandler(errorHandler.getIfUnique());
return azureListenerContainerTestFactory;
}

@Bean
SubscribeByGroupOperation subscribeByGroupOperation() {
return mock(SubscribeByGroupOperation.class);
}

@Bean
public ErrorHandler customErrorHandler() {
return t -> t.getMessage();
}
}

@Component
@Lazy
static class LazyBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.azure.spring.messaging.container.MessageListenerContainer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.ErrorHandler;

/**
* @author Warren Zhu
Expand All @@ -26,6 +28,9 @@ public class MessageListenerTestContainer implements MessageListenerContainer, I

private boolean destroyInvoked;

@Nullable
private ErrorHandler errorHandler;

MessageListenerTestContainer(AzureListenerEndpoint endpoint) {
this.endpoint = endpoint;
}
Expand Down Expand Up @@ -130,4 +135,13 @@ public void setDestination(String destination) {
public void setGroup(String group) {

}

@Nullable
public ErrorHandler getErrorHandler() {
return errorHandler;
}

public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## 2.14.0-beta.1 (Unreleased)

### Features Added

- Support error handler for `@AzureMessageListener`.
### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## 2.14.0-beta.1 (Unreleased)

### Features Added

- Support error handler for `@AzureMessageListener`.
### Breaking Changes

### Bugs Fixed
Expand Down
Loading

0 comments on commit 8779723

Please sign in to comment.