티스토리 뷰

스프링을 사용하다보면 @Transactional과 함께 자주 사용하는 애노테이션중에 하나가 바로 @Async 일 것이다. 스프링은 애플리케이션에서 메소드의 비동기 수행이 필요한 경우, @Async 애노테이션을 통해 애플리케이션 코어 로직을 수정하지 않고도 AOP를 통해 메소드를 비동기 처리로 전환할 수 있다.

 

오늘은 이 스프링 비동기 실행 및 이를 위해 필요한 스레드풀 관련해서 포스팅하려 한다.

아래 Spring 코드는 5.x 기반
주의. 스프링 프레임워크 코드 및 이미지가 많아 스크롤 압박은 있을 수 있으나 내용은 실제로 그렇게 길지 않습니다.

Spring Async 설정 및 메소드 비동기 수행

스프링은 일련의 작업을 수행하는 메소드를 간단하게 비동기로 처리할 수 있게끔 프레임워크단에서 AOP로 비동기 처리 기능을 구현하여 제공하는데, 이 비동기 처리를 하기 위해서 스프링은 스레드풀을 이용해 별도 스레드를 생성 및 활용한다.
스프링은 이 스레드풀을 Java의 TaskExecutor 인터페이스를 기반으로 하여 여러 구현체를 구현하고 있다. java 5부터 제공되는 TaskExecutor는 애플리케이션 개발자가 비동기 작업을 함에 있어 스레드 관리에 신경쓰지 않아도 된다.

 

비동기 수행 AOP가 적용되는 과정을 코드와 함께 살펴보자.

기본적으로 해당 기능을 사용하려면 @EnableAsync을 통해 Async 기능을 활성화시키고, 적용을 원하는 클래스의 메소드에 @Async 애노테이션을 적용해야한다.

코드는 스프링 프록시를 기반으로 추적해보겠다.
(그리고 당연한 말이지만, Async AOP를 사용할 때, 해당 애플리케이션이 스프링AOP를 활용한다면 프록시 객체 생성을 위해 @Async를 원하는 클래스는 상속가능하고 해당 메소드는 public 이여야 한다.)

 

@EnableAsync의 내부를 보면 AsyncConfigurationSelector 클래스를 Import하고 있고, adviceMode에 따라 아래와 같이 메서드의 비동기 수행을 적용시켜줄 Configuration 클래스를 import하게 된다.
아래 ProxyAsyncConfiguration 클래스를 보면 AsyncAnnotationBeanPostProcessor스프링 빈을 등록하고 있다(BeanPostProcssor의 일종). 이 BeanPostProcessor가 스프링 애플리케이션이 시작되고 애플리케이션 레벨의 스프링 빈이 생성 및 후처리를 하는 과정에서 해당 빈의 public 메서드에 @Async가 존재하는 빈을 찾아 AOP Proxy로 wrapping 및 비동기 수행 AOP를 위한 AsyncAnnotationAdvisor를 추가하게 된다.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
}

// -------------------------------------------------------------------

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

}

// -------------------------------------------------------------------

/**
 * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 * to enable proxy-based asynchronous method execution.
 */
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata wa고 not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

// -------------------------------------------------------------------

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
    @Override

    public void setBeanFactory(BeanFactory beanFactory) {
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    // ..
        this.advisor = advisor;
    }

  // ...

}

 

위 configuration을 통해 우리가 정의하고 @Async를 추가한 스프링 빈에 비동기 수행이 가능해지게 된다.
좀 더 상세히는 해당 스프링 빈 클래스를 상속받은 CglibAopProxy 객체에 실제 메소드 호출 전 후에 수행될 수 있는 부가기능 관련 정보를 저장하는데, DynamicAdvisedInterceptor라는 AOP 전용 콜백에 AsyncAnnotationAdvisor가 적재된다. 이 advisor는 메소드 수행을 intercept해서 Executor를 통해서 비동기 처리할 수 있게 만드는 advice인 AnnotationAsyncExecutionInterceptor가 세팅된다.

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

    private Advice advice;

    private Pointcut pointcut;

    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        this.advice = buildAdvice(executor, exceptionHandler);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }


    protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }
}

메소드 call intercept 및 비동기 처리 AOP 기능 수행은 AsyncExecutionInterceptor에서 진행된다.
큰 흐름은 TaskExecutor를 찾고, 람다를 통해 실제 메소드 콜을 callable 객체로 만들고, 해당 작업을 submit하는 순이다.
executor 결정시 @Async 설정시 지정한 TaskExecutor 빈이나 이나 지정하지 않았을 경우 TaskExecutor 빈 중 우선 생성된 빈을 BeanFactory로 부터 가져와 사용하고, 만약 애플리케이션에서 Task Executor 빈을 아예 정의하지 않았다면 default로 SimpleAsyncTaskExecutor(pooling을 전혀 하지않고 계속 스레드를 생성/해제하는)를 사용한다고 알고 있었는데, boot에서 테스트를 하다보니 설정이 되지않은 ThreadPoolTaskExecutor를 사용하고 있다. (이부분은 좀 블랙박스)

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    public Object invoke(final MethodInvocation invocation) throws Throwable {
    // ...

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    // ...

        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

}

// -------------------------------------------------------------------

public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {

    @Nullable
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            executor.submit(task);
            return null;
        }
    }
}

doSubmit내에서 submit을 하게되면 이제는 각 TaskExecutor 구현체의 영역이고 각 구현 방식에 따라 concurrency 제어를 하게 된다.

 


ThreadPoolTaskExecutor

Async 처리시 TaskExecutor의 구현체로 ThreadPoolTaskExecutor를 사용하면 스레드 풀링을 통해서 concurrency와 함께 리소스 효율을 볼 수 있다. ThreadPoolTaskExecutor는 내부적으로 java의 ThreadPoolExecutor를 wrapping하여 호출하고 있으므로 실제 어떻게 pooling이 되는지는 ThreadPoolExecutor 동작이라고 보면 될 것이다.

 

❗ThreadPoolTaskExecutor의 동작과 관련해서 기억해둬야할 속성은 아래와 같다.

  • corePoolSize : 풀이 항상 유지하고 있는 스레드의 최수 갯수. 리소스 해제X (단, allowCoreThreadTimeOut 속성이 활성화되어 있다면 해당 스레드들도 해제됨. 하지만 default=false)
  • maxPoolSize : 풀이 생성할 수 있는 스레드의 최대 갯수.
  • queueCapacity : task가 제출되고 스레드에 의해 수행되기 전 까지 대기하는 큐의 최대 용량.
  • keepAliveSeconds : corePoolSize를 초과하는 idle 스레드들이 해제되는 timeout

❗ThreadPoolTaskExecutor의 동작을 요약하자면 아래와 같다.

corePoolSize만큼 요청이 들어오면 그만큼 메소드 콜시 해당 task를 담당할 thread가 생성되어 처리를 시작하고,
corePoolSize 이상 요청이 들어오면 queue에 적재(현재 idle한 thread가 존재하지 않고, 신규 생성도 불가).
메소드 콜이 쌓여 queue 사이즈를 초과하게 되면, 그때부터의 maxPoolSize까지 추가로 thread를 생성해 queue에 적재되지 않은 초과된 task를 할당받아 수행함.
queue가 해소되고 idle 상태가 된 maxPoolSize - corePoolSize 만큼의 thread는 keepAliveSeconds가 지나면 리소스 해제가 된다.

⚠️ 스프링에서 Async 처리를 위해 TaskExecutor 빈 정의시 흔히들 실수 하는 것이 maxPoolSize만 지정하는 것이다.

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
  ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  threadPoolTaskExecutor.setMaxPoolSize(100);
  return threadPoolTaskExecutor;
}

이렇게 생성해버리면 maxPoolSize를 제외한 나머지 속성들은 default로 남아있게 되는데,
corePoolSize = 1, maxPoolSize = 100, queueCapacity = (2^31) - 1로 설정이 되고, queue에 대기중인 task가 약 21억을 초과하지 않으면 corePoolSize인 1개의 스레드만으로 비동기 처리를 수행하게 된다.

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    private int corePoolSize = 1;
    private int maxPoolSize = Integer.MAX_VALUE;
    private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
}

사실 좀 헷갈릴만 한 것 같다. 스레드는 아니지만 pooling 개념에서 애플리케이션 개발자에게 익숙한 db connection pool의 동작 같은 경우는 initial/idle/max 커넥션의 수를 지정을 해두고, 커넥션이 필요한 경우 max까지 우선적으로 생성을 해 처리한다.(물론 여기엔 큐가 없긴 하다)
일단 우리가 지정한 최대 thread 갯수 만큼 동시적으로 처리를 가능하게 하고, 그 후 부터 밀리는 요청들을 queue에 쌓으면서 하나하나 뽑아 처리를 하는게 더 리소스 처리 관점에서 낫지 않을까하는 생각인데, 여러 글을 확인해보면 많은 사람들도 그렇게 생각하는 듯하다.

 

이상 Async와 ThreadPoolTaskExecutor에 대해 간단히 정리를 마치겠다.

참고

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함