Friends of OpenJDK Today

Structured Concurrency is More Than ShutdownOnFailure

July 11, 2024

ShutdownOnSuccess and ShutdownOnFailure are often the task scopes shown as example when referring to Java 21 structured concurrency. Let's see other possibilities that can be done with this API.

Author(s)

  • Avatar photo
    Anthony Goubard

    Java developer since 1995, I've seen, in so many years, many technologies: from J2ME to the cloud, from sockets to GraphQL. Working now as freelance for clients or for my ... Learn more

Since Java 21, structured concurrency has been added as a preview feature. Structured concurrency is a way to manage sub-tasks that are run in parallel within a given scope.

If you've ever seen a presentation about structured concurrency, you've probably seen the use of the ShutdownOnSuccess or ShutdownOnFailure classes.
These classes will stop the scope and the still running sub-tasks within that scope when one of the sub-task succeeds or fails.

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
     var student = scope.fork(() -> getStudent(studentID)); 
     var grades = scope.fork(() -> getGrades(studentID));
            
    scope.join();          
    scope.throwIfFailed();

    return student.get().getName() + " " + grades.get().getAverage();
} catch (Exception ex) {
    return ex.getMessage();
}

In project Virtually, I've extended the task scope to offer new possibilities.
Virtually is an open-source project under the Apache License 2.0 to help projects to migrate to virtual threads friendly code.

1️⃣ Throttling

Virtual threads are great, you can create and run million of threads. With task scope, it's easy to make parallel calls to another web service or database but can it handle the load?
You may not want to DDOS your system and send thousands of calls at the same time.
In EnhancedTaskScope, you can use the setMaxConcurrentTasks to specify the maximum of execution at the same time within this StructuredTaskScope. If the maximum is reached, tasks will wait until another sub-task finishes.

2️⃣ Circuit breaker

If suddenly many sub-tasks fail within the scope at the same time, it is more likely a problem somewhere and you may want to stop executing sub-tasks within that scope as there are very likely to fail too.
EnhancedTaskScope has a method setMaxConsecutiveFails. When consecutive failures have reached the maximum the whole task scope will stop.

3️⃣ Default value on failure

You may not want a task to fail and provide a default value if the submitted task fails.
EnhancedTaskScope has a forkWithDefault(Callable task, U defaultValue). This will never cause the sub-task to fail by providing a default return value if the Callable fails.

4️⃣ Critical tasks

In the provided ShutdownOnFailure scope, all tasks are critical and will fail the scope if one task fails.
Among the sub-tasks submitted, some may be more critical than others. EnhancedTaskScope provides a forkCritical method that will fail the scope if the submitted task fails.

5️⃣ List conversion

Quite often parallel calls within a task scope are done for converting a list. For example, you have a list of student IDs and would like to get the students information.
ListTaskScope has been added to Virtually where you can provide a mapper to convert a list to another list using structured concurrency.
As ListTaskScope extends EnhancedTaskScope, you also benefits of the previous features.

6️⃣ More ideas

EnhancedTaskScope and ListTaskScope can be extended to provide more features.
For example:

  • You may want to do throttling or circuit breaker based on a key, like the web service name. So this would work from multiple task scopes.
  • You may want do logging or monitoring inside your task scope.
  • If you're converting a list, you may want to do caching in your task scope in case the input list contains same values.
  • You may want to execute a lambda to get the default value

Conclusion

StructuredTaskScope is a good class to extend to provide extended features when you want to execute multiple tasks in virtual threads.

// A small demo to finish:
void listTaskScope() {
    List<Product> products = ShopFactory.createManyProducts(15_000);
    CallableFunction<Product, Double> productToPrice = (Product p) -> priceService.retreivePrice(p.id());
    try (ListTaskScope<Product, Double> scope = new ListTaskScope(productToPrice)) {
        scope.setMaxConsecutiveFails(50);
        scope.setMaxConcurrentTasks(1_000);
        for (Product product : products) {
            scope.convert(product);
        }
        Map<Product, Double> productWithPrices = scope.getResultsAsMap();
        List<Double> prices = scope.getResultsAsList();
        System.out.println("Size: " + productWithPrices.size() + " & " + prices.size());
    }
}


Topics:

Related Articles

View All

Author(s)

  • Avatar photo
    Anthony Goubard

    Java developer since 1995, I've seen, in so many years, many technologies: from J2ME to the cloud, from sockets to GraphQL. Working now as freelance for clients or for my ... Learn more

Comments (2)

Your email address will not be published. Required fields are marked *

Highlight your code snippets using [code lang="language name"] shortcode. Just insert your code between opening and closing tag: [code lang="java"] code [/code]. Or specify another language.

Save my name, email, and website in this browser for the next time I comment.

Java Weekly, Issue 551 | Baeldung

[…] >> Structured Concurrency is More Than ShutdownOnFailure [foojay.io] […]

Foo

fork in EnhancedTaskScope does not guarantee rate-limit: tryAcquire is non-blocking and the task will start immediately if it returns false.

Semaphore acquire should be called within a wrapped Callable.

Subscribe to foojay updates:

https://foojay.io/feed/
Copied to the clipboard