Structured Concurrency is More Than ShutdownOnFailure

  • July 11, 2024
Table of Contents

  • 1️⃣ Throttling
  • 2️⃣ Circuit breaker
  • 3️⃣ Default value on failure
  • 4️⃣ Critical tasks
  • 5️⃣ List conversion
  • 6️⃣ More ideas
  • Conclusion

Structured concurrency has been available as a preview feature since Java 21. It is a way to manage sub-tasks that run in parallel within a given scope.

If you've ever seen a presentation about structured concurrency, you've probably seen the ShutdownOnSuccess or ShutdownOnFailure classes in use.
These classes shut down the scope, cancelling the sub-tasks still running in it, as soon as one sub-task succeeds (ShutdownOnSuccess) or fails (ShutdownOnFailure).

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Each fork starts a sub-task in its own virtual thread
    var student = scope.fork(() -> getStudent(studentID));
    var grades = scope.fork(() -> getGrades(studentID));

    scope.join();           // wait for both sub-tasks (or the first failure)
    scope.throwIfFailed();  // rethrow the failure, if any

    return student.get().getName() + " " + grades.get().getAverage();
} catch (Exception ex) {
    return ex.getMessage();
}

In the Virtually project, I've extended the task scope to offer new possibilities.
Virtually is an open-source project, released under the Apache License 2.0, that helps projects migrate to virtual-thread-friendly code.

1️⃣ Throttling

Virtual threads are great: you can create and run millions of them. With a task scope, it's easy to make parallel calls to another web service or a database, but can that system handle the load?
You probably don't want to DDoS your own system by sending thousands of calls at the same time.
In EnhancedTaskScope, you can use setMaxConcurrentTasks to specify the maximum number of sub-tasks that may execute at the same time within this StructuredTaskScope. Once the maximum is reached, new tasks wait until another sub-task finishes.
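
To make the throttling idea concrete, here is a minimal sketch of one way to cap concurrency: each task is wrapped in a Callable that takes a Semaphore permit with a blocking acquire() before it runs. This is not Virtually's implementation. ThrottleSketch, throttled and peakConcurrency are hypothetical names, and a plain ExecutorService stands in for the task scope so that the example compiles without preview flags.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not part of Virtually or the JDK.
final class ThrottleSketch {

    // Wraps a task so it holds one permit for as long as it runs.
    static <T> Callable<T> throttled(Semaphore permits, Callable<T> task) {
        return () -> {
            permits.acquire();        // blocks inside the worker thread until a slot is free
            try {
                return task.call();
            } finally {
                permits.release();    // always hand the slot back, even on failure
            }
        };
    }

    // Runs taskCount simulated calls and returns the highest number seen running at once.
    static int peakConcurrency(int taskCount, int maxConcurrentTasks) {
        Semaphore permits = new Semaphore(maxConcurrentTasks);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Callable<Boolean> simulatedCall = () -> {
            peak.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(5);      // stand-in for a web service or database call
            } finally {
                running.decrementAndGet();
            }
            return true;
        };
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(executor.submit(throttled(permits, simulatedCall)));
            }
            for (Future<Boolean> future : futures) {
                future.get();         // wait for every task, similar to scope.join()
            }
            return peak.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + peakConcurrency(50, 4));
    }
}
```

Acquiring the permit inside the wrapped Callable, rather than in the thread that submits the task, means the submitting thread never blocks and a task can never start without a permit.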

2️⃣ Circuit breaker

If many sub-tasks suddenly fail within the scope at the same time, there is probably a problem somewhere, and you may want to stop executing sub-tasks within that scope, as they are very likely to fail too.
EnhancedTaskScope has a setMaxConsecutiveFails method. When the number of consecutive failures reaches the maximum, the whole task scope stops.
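
The counting logic behind such a circuit breaker can be sketched in a few lines. The class below is an illustration under assumptions, not Virtually's code: ConsecutiveFailureBreaker, runIfClosed and executedBeforeOpen are hypothetical names, and the tasks run sequentially to keep the demo deterministic. In a real scope the counter would be updated as concurrent sub-tasks complete.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not part of Virtually or the JDK.
final class ConsecutiveFailureBreaker {

    private final int maxConsecutiveFails;
    private final AtomicInteger consecutiveFails = new AtomicInteger();

    ConsecutiveFailureBreaker(int maxConsecutiveFails) {
        this.maxConsecutiveFails = maxConsecutiveFails;
    }

    // True once the failure streak has reached the limit.
    boolean isOpen() {
        return consecutiveFails.get() >= maxConsecutiveFails;
    }

    // Runs the task unless the breaker is open; returns false when the task was skipped.
    boolean runIfClosed(Callable<?> task) {
        if (isOpen()) {
            return false;
        }
        try {
            task.call();
            consecutiveFails.set(0);             // any success resets the streak
        } catch (Exception e) {
            consecutiveFails.incrementAndGet();  // one more failure in a row
        }
        return true;
    }

    // Submits `attempts` always-failing tasks and counts how many actually ran.
    static int executedBeforeOpen(int maxConsecutiveFails, int attempts) {
        ConsecutiveFailureBreaker breaker = new ConsecutiveFailureBreaker(maxConsecutiveFails);
        int executed = 0;
        for (int i = 0; i < attempts; i++) {
            if (breaker.runIfClosed(() -> { throw new IllegalStateException("service down"); })) {
                executed++;
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        // With a limit of 3, only the first 3 of 10 failing tasks are executed.
        System.out.println("Tasks executed: " + executedBeforeOpen(3, 10));
    }
}
```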

3️⃣ Default value on failure

You may not want a task to fail, and prefer to provide a default value if the submitted task fails.
EnhancedTaskScope has a forkWithDefault(Callable task, U defaultValue) method. The sub-task never fails: if the Callable throws, the default value is returned instead.
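
One plausible way to get this behaviour is to wrap the Callable before forking it. The sketch below assumes that approach and makes no claim about how forkWithDefault is actually written. DefaultOnFailureSketch, withDefault and priceOrDefault are hypothetical names, and the price lookup is made up for the demo.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch, not part of Virtually or the JDK.
final class DefaultOnFailureSketch {

    // Wraps a task so that a failure yields defaultValue instead of an exception.
    static <T> Callable<T> withDefault(Callable<? extends T> task, T defaultValue) {
        return () -> {
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e;                 // cancellation should still propagate
            } catch (Exception e) {
                return defaultValue;     // any other failure falls back to the default
            }
        };
    }

    // Made-up price lookup that fails on demand, used only for this demo.
    static double priceOrDefault(boolean serviceDown) {
        Callable<Double> lookup = () -> {
            if (serviceDown) {
                throw new IllegalStateException("price service unavailable");
            }
            return 9.99;
        };
        try {
            return withDefault(lookup, -1.0).call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(priceOrDefault(false) + " / " + priceOrDefault(true));
    }
}
```

Letting InterruptedException through is a deliberate choice in this sketch: a sub-task that was cancelled by its scope should stop rather than report a default value.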

4️⃣ Critical tasks

In the provided ShutdownOnFailure scope, all tasks are critical: the scope fails as soon as one task fails.
Among the submitted sub-tasks, though, some may be more critical than others. EnhancedTaskScope provides a forkCritical method that fails the scope if the submitted task fails.
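
The difference between a critical and a non-critical sub-task can be illustrated with two futures. This is only a sketch of the idea, assuming that a non-critical failure is tolerated and replaced by a placeholder. CriticalTaskSketch, join and scopeSurvives are hypothetical names, and a plain ExecutorService stands in for the scope.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not part of Virtually or the JDK.
final class CriticalTaskSketch {

    // Returns the combined result, or throws if the critical task failed.
    static String join(Callable<String> critical, Callable<String> optional) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Future<String> criticalFuture = executor.submit(critical);
            Future<String> optionalFuture = executor.submit(optional);
            String criticalResult;
            try {
                criticalResult = criticalFuture.get();
            } catch (ExecutionException e) {
                // A critical failure fails the whole "scope".
                throw new IllegalStateException("critical task failed", e.getCause());
            }
            String optionalResult;
            try {
                optionalResult = optionalFuture.get();
            } catch (ExecutionException e) {
                optionalResult = "n/a";      // a non-critical failure is tolerated
            }
            return criticalResult + " " + optionalResult;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();          // interrupt whatever is still running
        }
    }

    // Demo helper: true if the scope completed, false if it failed.
    static boolean scopeSurvives(boolean criticalFails, boolean optionalFails) {
        Callable<String> student = () -> {
            if (criticalFails) {
                throw new IllegalStateException("student lookup failed");
            }
            return "Alice";
        };
        Callable<String> grades = () -> {
            if (optionalFails) {
                throw new IllegalStateException("grades lookup failed");
            }
            return "15.5";
        };
        try {
            join(student, grades);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Optional task fails: scope survives = " + scopeSurvives(false, true));
        System.out.println("Critical task fails: scope survives = " + scopeSurvives(true, false));
    }
}
```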

5️⃣ List conversion

Quite often, parallel calls within a task scope are made to convert a list. For example, you have a list of student IDs and would like to get each student's information.
ListTaskScope has been added to Virtually for this: you provide a mapper, and it converts a list to another list using structured concurrency.
As ListTaskScope extends EnhancedTaskScope, you also benefit from the previous features.
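
As a rough picture of what converting a list in parallel involves, the sketch below applies a mapper to every element on a thread pool and collects the results in input order. It is not ListTaskScope: ParallelListConversionSketch and convert are hypothetical names, and ExecutorService.invokeAll replaces the scope so that the code runs without preview flags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch, not part of Virtually or the JDK.
final class ParallelListConversionSketch {

    // Applies mapper to every element in parallel; results keep the input order.
    static <T, R> List<R> convert(List<T> input, Function<T, R> mapper, int maxThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T item : input) {
                tasks.add(() -> mapper.apply(item));
            }
            List<R> results = new ArrayList<>();
            // invokeAll waits for all tasks and returns futures in the order of the task list
            for (Future<R> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("conversion failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> students = convert(Arrays.asList(1, 2, 3), id -> "student-" + id, 2);
        System.out.println(students); // prints [student-1, student-2, student-3]
    }
}
```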

6️⃣ More ideas

EnhancedTaskScope and ListTaskScope can be extended to provide more features.
For example:

  • You may want to apply throttling or a circuit breaker based on a key, like the web service name, so that the limit is shared across multiple task scopes.
  • You may want to do logging or monitoring inside your task scope.
  • If you're converting a list, you may want to do caching in your task scope in case the input list contains duplicate values.
  • You may want to execute a lambda to compute the default value.
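
To illustrate one of these ideas, here is a minimal sketch of caching during a list conversion, assuming a ConcurrentHashMap keyed by the input value is acceptable. CachedConversionSketch, convert and lookupsFor are hypothetical names, and nothing here comes from Virtually.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch, not part of Virtually or the JDK.
final class CachedConversionSketch {

    // Converts the list in parallel, calling mapper at most once per distinct input value.
    // Assumes non-null inputs and non-null mapper results (ConcurrentHashMap rejects null keys).
    static <T, R> List<R> convert(List<T> input, Function<T, R> mapper) {
        Map<T, R> cache = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T item : input) {
                // computeIfAbsent is atomic per key, so duplicates reuse the first result
                tasks.add(() -> cache.computeIfAbsent(item, mapper));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Demo helper: counts how many real lookups a list of IDs triggers.
    static int lookupsFor(List<Integer> ids) {
        AtomicInteger lookups = new AtomicInteger();
        convert(ids, id -> {
            lookups.incrementAndGet();   // stand-in for an expensive remote call
            return "student-" + id;
        });
        return lookups.get();
    }

    public static void main(String[] args) {
        // Five IDs, three distinct values, so only three lookups are made.
        System.out.println("Lookups: " + lookupsFor(Arrays.asList(1, 2, 2, 3, 1)));
    }
}
```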

Conclusion

StructuredTaskScope is a good class to extend when you want to execute multiple tasks in virtual threads and need features beyond the built-in scopes.

// A small demo to finish:
void listTaskScope() {
    List<Product> products = ShopFactory.createManyProducts(15_000);
    CallableFunction<Product, Double> productToPrice = (Product p) -> priceService.retreivePrice(p.id());
    try (ListTaskScope<Product, Double> scope = new ListTaskScope<>(productToPrice)) {
        scope.setMaxConsecutiveFails(50);
        scope.setMaxConcurrentTasks(1_000);
        for (Product product : products) {
            scope.convert(product);
        }
        Map<Product, Double> productWithPrices = scope.getResultsAsMap();
        List<Double> prices = scope.getResultsAsList();
        System.out.println("Size: " + productWithPrices.size() + " & " + prices.size());
    }
}



Comments (2)


Java Weekly, Issue 551 | Baeldung

6 months ago

[…] >> Structured Concurrency is More Than ShutdownOnFailure [foojay.io] […]


Foo

6 months ago

fork in EnhancedTaskScope does not guarantee rate-limit: tryAcquire is non-blocking and the task will start immediately if it returns false. Semaphore acquire should be called within a wrapped Callable.
