Spring ก็เร็วได้! มาเรียก Method ใน Spring แบบ Parallel ด้วย @Async กันเถอะ
สมมติว่าเรามีโจทย์ที่ต้องยิง HTTP Request เพื่อดึง resource มาจาก Web service อื่นหลายๆ ที่ ถ้าเราไม่คิดอะไรเราก็อาจจะเขียนโค้ดให้ ยิงทีละตัวแบบ Synchronous ไปได้ แต่ถ้าแต่ละ Request ไม่ได้ขึ้นต่อกัน เราจะมีตัวเลือกมากขึ้นผ่านการยิงแบบ Asynchronous ครับ ซึ่งผมเพิ่งรู้ว่า Java มันทำได้
Example
สมมติว่าเรามี Service ที่ไว้ยิง HTTP Request ไปที่ API อื่นแบบง่ายๆ หน้าตาประมาณนี้
@Service
public class ActivityService {
RestTemplate restTemplate = new RestTemplate();
public ResponseSecond getEndpointOne() {
String url = "http://localhost:8000/1";
ResponseSecond response = restTemplate.getForObject(url, ResponseSecond.class);
return response;
}
}
และมี Controller ที่เรียกใช้ Service นี้หน้าตาประมาณนี้
@RestController
public class ActivityController {
@Autowired
ActivityService activityService;
@GetMapping(path = "/")
public ResponseEntity<String> getActivity() throws Throwable {
try {
ResponseSecond one = activityService.getEndpointOne();
ResponseSecond two = activityService.getEndpointOne();
ResponseSecond three = activityService.getEndpointOne();
String response = "";
if (one != null && two != null && three != null) {
response = one.getMessage() + " " + two.getMessage() + " " + three.getMessage();
}
return ResponseEntity.ok(response);
} catch (Exception e) {
throw e.getCause();
}
}
ก็ดูทำงานได้ปกติใช่มั้ยครับ แต่สิ่งที่เกิดคือ มันจะทำงานแบบ Synchronous ก็คือ getEndpointOne แต่ละตัวต้องถูกทำให้เสร็จก่อนที่จะเรียกตัวถัดไป ซึ่งสมมติว่าแต่ละ Endpoint ต้องทำงาน 1 วิ นั่นคือ 3 วิเลยกว่า controller นี้จะวิ่งจนครบ loop
ทำให้มัน Async เถอะ
อย่างแรกที่เราต้องแก้คือ Service ครับ โดยเราจะใช้ความสามารถของ Annotation ชื่อ @Async มาทำให้ Method ของเรานั้นสามารถที่จะเรียกแบบ Asynchronous ได้ แต่มีข้อแม้สองข้อครับคือ
- Method ที่เราจะทำให้เป็น Async นั้นต้องเป็น public method
- Method ที่เรียกใช้งาน Async Method นั้นต้องอยู่คนละ Class กันครับ เพราะ Async ทำ self-invocation ไม่ได้
พอรู้แบบนี้แล้ว ขั้นแรกคือเราต้องแปลง method ที่เราจะทำให้มันเป็น async ให้หน้าตาประมาณนี้ครับ
@Async
public CompletableFuture<ResponseSecond> getEndpointOne() {
String url = "http://localhost:8000/1";
ResponseSecond response = restTemplate.getForObject(url, ResponseSecond.class);
return CompletableFuture.completedFuture(response);
}
จะเห็นว่ามีสิ่งที่เพิ่มขึ้นมา 3 อย่างคือ
- @Async ที่เป็นการระบุว่า Method นี้สามารถเรียกใช้แบบ Asynchronous ได้
- Return type signature จะถูกครอบด้วย
CompletableFuture<T>
ครับ - Return value จะต้องเพิ่ม
CompletableFuture.completedFuture()
ครอบไปครับ
ในส่วนของ method ที่เรียกใช้งาน Async method นั้นจะเปลี่ยนไปหน้าตาประมาณนี้ครับ
public ResponseEntity<String> getAsync() throws Throwable {
try {
CompletableFuture<ResponseSecond> one = activityService.getEndpointOne();
CompletableFuture<ResponseSecond> two = activityService.getEndpointOne();
CompletableFuture<ResponseSecond> three = activityService.getEndpointOne();
CompletableFuture.allOf(one, two, three);
String response = "";
if (one != null && two != null && three != null) {
response = one.get().getMessage() + " " + two.get().getMessage() + " " + three.get().getMessage();
}
return ResponseEntity.ok(response);
} catch (Exception e) {
throw e.getCause();
}
}
จะเห็นว่าในส่วนที่เรียก method จะเปลี่ยน return type เป็น CompletableFuture<T>
แทนครับ และจะมีโค้ดเพิ่มขึ้นมาคือ CompletableFuture.allOf(<async method>)
ซึ่งจะทำหน้าที่เรียกและรอ return future value ให้เราจนกว่า async method ที่เรียกข้างในทำงานเสร็จทั้งหมดครับ ซึ่งหลังจากนั้นเราสามารถใช้ method get() เรียก CompletableFuture เพื่อดึงค่าออกมาจาก async method ได้เลย
แต่ถ้าเราเรียกตอนนี้จะเห็นว่ามันยังเรียกแต่ละ method แบบ sychronous กันอยู่ทำให้ใช้เวลารวมทั้งหมดอยู่ 3 วิ (ถ้าแต่ละ method ใช้ 1 วิ) ซึ่งเกิดจากการที่ Executor ของเรายังเป็น applicationTaskExecutor ซึ่งรันอยู่บน single thread อยู่ครับ โดยเราสามารถเปลี่ยนไปใช้ Executor แบบ Async โดยการเพิ่ม Async Config ตามตัวอย่างข้างล่างได้ครับ
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
return executor;
}
}
ซึ่งพอถึงจุดนี้ใน Service ของเราก็จะทำงานผ่าน asyncExecutor แล้วครับ แต่ถ้าเราสั่งให้ controller เราทำงานจะเห็นว่ามันยังทำงานอยู่ที่ 3 วิ อยู่ ทำไมมันยังไม่เร็วขึ้นนะ?
corePoolSize vs maxPoolSize
สาเหตุที่มันยังทำงานแบบ synchronous อยู่ทั้งๆ ที่เราใช้ asyncExecutor เป็นเพราะว่ามันยังใช้ Thread เดียวอยู่ครับ ซึ่งถ้าเราไปดู Implementation ของ ThreadPoolTaskExecutor จะเห็นว่าเซต corePoolSize อยู่ที่ 1 เลยทำให้มันเป็น single thread แต่เราสามารถเพิ่มจำนวน thread ที่ stand by อยู่ได้ผ่าน method: setCorePoolSize ได้ โดยในที่นี้จะเซตเป็น 3 เท่าจำนวน tasks ที่เราจะแยก Thread ออกสูงสุดครับ
COPY
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
return executor;
}
}
อีกส่วนคือ maxPoolSize ซึ่งเป็นตัวกำหนดว่า Java จะแตก Thread ได้มากที่สุดกี่ตัวครับ ซึ่ง default จะเป็นเท่ากับจำนวน Integer เลย ซึ่งเราควรจะกำหนดไว้เพื่อกันไม่ให้ Java แตก Thread ออกมามากเกินไปครับ ในที่นี้จะกำหนดเป็น 5
ซึ่งพอถึงจุดนี้ตัว Controller นี้จะทำงานอยู่ที่ประมาณ 1 วิ + นิดหน่อยครับ แล้วถ้าเราดูที่ service ปลายทางได้เราจะเห็นว่ามันยิง HTTP Request ออกมาแทบจะเวลาเดียวกันแล้วครับ
Reference
- เริ่มที่บล็อกนี้ได้ จะพาทัวร์ CompletableFuture API แบบง่ายๆ แต่ถ้าพูดถึง Thread Pool ไว้สั้นมาก จนอาจจะเผลอเลื่อนเลยไป https://medium.com/sipios/how-to-make-parallel-calls-in-java-springboot-application-and-how-to-test-them-dcc27318a0cf
- บล็อกนี้พาทัวร์ CompletableFutureAPI แบบ Advance กว่าลิ้งค์ข้างบน https://www.baeldung.com/java-completablefuture
- บล็อกนี้จะคล้ายๆ ลิ้งค์แรก แต่มีสิ่งที่เลอค่าตรง Limitation ของ @Async ที่สองบล็อกก่อนหน้าไม่ได้พูดถึง https://www.baeldung.com/spring-async
- บล็อกนี้พาไปทัวร์ Future API (เข้าใจว่าโบราณกว่า CompletableFuture) แต่จุดที่น่าสนใจคือพาเรามารู้จัก Thread Pools นี่แหละ https://www.baeldung.com/java-future
- บล็อกนี้อธิบายว่าเราควรเลือกใช้ corePoolSize กับ maxPoolSize ยังไง https://www.baeldung.com/java-threadpooltaskexecutor-core-vs-max-poolsize