Do you want your ad here?

Contact us to get your ad seen by thousands of users every day!

[email protected]

How to Diagnose and Mitigate Pinning in Java’s Virtual Thread Execution

  • October 10, 2023
  • 7036 Unique Views
  • 4 min read
Table of Contents
What is Pinning?Monitoring Pinning

In our last article, we highlighted the impressive performance gains achieved through the use of virtual threads. However, upon diving deeper into the code, we discovered an issue caused by the jsoup library: a phenomenon known as pinning. But before we delve into solutions, let's take a moment to understand what pinning actually is.

What is Pinning?

In the context of virtual threads, pinning refers to the condition where a virtual thread is "stuck" to its carrier thread (the platform thread on which it runs).

When a virtual thread is pinned, it cannot be unmounted from its carrier, effectively monopolizing that carrier thread for the duration of the pinning.

When Does Pinning Occur?

Pinning occurs in two main scenarios:

  1. Synchronized Blocks or Methods: When a virtual thread enters a synchronized block or method, it becomes pinned to its carrier thread. This means that during the execution of that block or method, the carrier thread cannot be reused for other tasks.
  2. Native Methods or Foreign Functions: When a virtual thread executes a native method or a foreign function, it also becomes pinned.

Why is Pinning a Limitation?

The essence of virtual threads is their ability to be unmounted from carrier threads when they perform blocking operations, thereby freeing up the carrier threads for other tasks. Pinning negates this advantage in the following ways:

  1. Reduced Throughput: Because a pinned virtual thread occupies its carrier thread, other virtual threads have to wait for free carrier threads, reducing the system's overall throughput.
  2. Resource Inefficiency: Carrier threads are a finite resource tied to system capabilities. Having them blocked due to pinned virtual threads is an inefficient use of these resources.
  3. Scalability Concerns: If a significant portion of your virtual threads becomes pinned due to frequent use of synchronized blocks or native methods, you might run into scalability issues.

Mitigating Pinning

To alleviate the effects of pinning, consider the following strategies:

  1. Use ReentrantLocks: Instead of synchronized blocks or methods, use ReentrantLock from java.util.concurrent.locks as it allows the virtual thread to be unmounted when blocked.
  2. Code Review: Regularly review your code to identify and minimize the use of synchronized methods or blocks and native methods in the context of virtual threads.

Monitoring Pinning

So, you may be wondering, how do you diagnose this pinning issue in your own code? One way to get to the bottom of this problem is by utilizing specific JVM flags.

To identify pinning, you can use the JVM flag -Djdk.tracePinnedThreads=full when executing your program.

Let's go ahead and run the previous program with this flag enabled.

Upon doing so, you'll observe certain outputs that shed light on the issue at hand.

 Task :Crawler.main()
Thread[#421,ForkJoinPool-1-worker-51,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:185)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:631)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2648)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
    java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:408)
    java.base/sun.nio.ch.Poller.pollIndirect(Poller.java:137)
    java.base/sun.nio.ch.Poller.poll(Poller.java:102)
    java.base/sun.nio.ch.Poller.poll(Poller.java:87)
    java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:175)
    java.base/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:280)
    java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:304)
    java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
    java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
    java.base/java.net.Socket$SocketInputStream.read(Socket.java:1099)
    java.base/java.net.Socket$SocketInputStream.read(Socket.java:1093)
    java.base/sun.security.ssl.SSLSocketInputRecord.deplete(SSLSocketInputRecord.java:509)
    java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.readLockedDeplete(SSLSocketImpl.java:1216)
    java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.deplete(SSLSocketImpl.java:1191)
    java.base/sun.security.ssl.SSLSocketImpl.bruteForceCloseInput(SSLSocketImpl.java:808)
    java.base/sun.security.ssl.SSLSocketImpl.duplexCloseOutput(SSLSocketImpl.java:664)
    java.base/sun.security.ssl.SSLSocketImpl.close(SSLSocketImpl.java:584)
    java.base/sun.net.www.http.HttpClient.closeServer(HttpClient.java:1139)
    java.base/sun.net.www.protocol.https.HttpsClient.closeServer(HttpsClient.java:442)
    java.base/sun.net.www.http.KeepAliveCache.put(KeepAliveCache.java:196)
    java.base/sun.net.www.protocol.https.HttpsClient.putInKeepAliveCache(HttpsClient.java:679)
    java.base/sun.net.www.http.HttpClient.finished(HttpClient.java:450)
    java.base/sun.net.www.http.KeepAliveStream.close(KeepAliveStream.java:100)
    java.base/sun.net.www.MeteredStream.justRead(MeteredStream.java:85)
    java.base/sun.net.www.MeteredStream.read(MeteredStream.java:132)
    java.base/java.io.FilterInputStream.read(FilterInputStream.java:119)

If you scrutinize the stack trace, one line will likely grab your attention:

java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:185)

This line indicates that our virtual thread is, in fact, getting pinned to carrier threads. This isn't ideal for our application's performance. Upon closer inspection, you'll find that this pinning issue arises due to the use of the jsoup library in our code. The culprit could be a `synchronized` block or perhaps some sort of native function call within the library.

To remedy this, consider refactoring your application to work without the jsoup library. By avoiding the use of `synchronized` blocks or native code, we're likely to see a significant performance improvement.

package ca.bazlur.virtualthreads;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Crawler2 implements Runnable {
    private static final Pattern LINK_PATTERN = Pattern.compile("href\\s*=\\s*\"([^\"#]*?)\"", Pattern.CASE_INSENSITIVE);
    private static final ReentrantLock QUEUE_LOCK = new ReentrantLock();
    private static final Condition QUEUE_NOT_EMPTY = QUEUE_LOCK.newCondition();
    private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder().build();
    private static final int MAX_PAGES_TO_SEARCH = 100;
    private final AtomicInteger pageCount;
    private final ConcurrentMap<String, Boolean> visitedPages;
    private final String url;
    private final Queue<String> pageQueue;

    public Crawler2(AtomicInteger pageCount, ConcurrentMap<String, Boolean> visitedPages, String url, Queue<String> pageQueue) {
        this.pageCount = pageCount;
        this.visitedPages = visitedPages;
        this.url = url;
        this.pageQueue = pageQueue;
    }

    @Override
    public void run() {
        if (!visitedPages.containsKey(url) && pageCount.get() < MAX_PAGES_TO_SEARCH) {
            try {
                List<String> links = extractLinks(url);
                List<String> newUrls = links.stream()
                        .filter(nextUrl -> nextUrl.startsWith("http") && !visitedPages.containsKey(nextUrl))
                        .toList();

                visitedPages.put(url, true);
                pageCount.incrementAndGet();

                if (!newUrls.isEmpty()) {
                    pageQueue.addAll(newUrls);
                    signalQueueNotEmpty();
                }

            } catch (IOException | InterruptedException e) {
                System.err.printf("Error occurred while accessing URL '%s', the error: %s\n", url, e.getMessage());
                e.printStackTrace();
            }
        }
    }

    public static List<String> extractLinks(String url) throws IOException, InterruptedException {
        String pageContent = getPageContent(url);
        var links = new HashSet<String>();
        Matcher matcher = LINK_PATTERN.matcher(pageContent);

        while (matcher.find()) {
            links.add(matcher.group(1));
        }

        return new ArrayList<>(links);
    }

    private static String getPageContent(String url) throws IOException, InterruptedException {
        System.out.println("Visiting: " + url);

        HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
        var response = HTTP_CLIENT.send(request, HttpResponse.BodyHandlers.ofString());

        return response.body();
    }

    private static void signalQueueNotEmpty() {
        QUEUE_LOCK.lock();
        try {
            QUEUE_NOT_EMPTY.signalAll();
        } finally {
            QUEUE_LOCK.unlock();
        }
    }


    //Classical Threads
    //Pages crawled: 731
    //Execution time: 9766ms
    //Throughput: 74.85152570141307 pages/sec

    //Virtual threads
    //Pages crawled: 710
    //Execution time: 3119ms
    //Throughput: 227.63706316126962 pages/sec

    public static void main(String[] args) {
        final ConcurrentMap<String, Boolean> visitedPages = new ConcurrentHashMap<>();
        final Queue<String> pageQueue = new LinkedBlockingDeque<>();
        pageQueue.add("https://en.wikipedia.org/wiki/Main_Page");

        long startTime = System.currentTimeMillis();
        AtomicInteger pageCount = new AtomicInteger();

        //try (var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            while (pageCount.get() <= MAX_PAGES_TO_SEARCH) {
                QUEUE_LOCK.lock();
                try {
                    while (pageQueue.isEmpty()) {
                        QUEUE_NOT_EMPTY.await();
                    }
                    String polledUrl = pageQueue.poll();
                    if (polledUrl != null && !visitedPages.containsKey(polledUrl)) {
                        executor.submit(new Crawler2(pageCount, visitedPages, polledUrl, pageQueue));
                    }
                } finally {
                    QUEUE_LOCK.unlock();
                }
            }

            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Main thread interrupted!");
        }

        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;

        System.out.println("\nPages crawled: " + visitedPages.size());
        System.out.println("Execution time: " + totalTime + "ms");

        double pagesPerSecond = (double) visitedPages.size() / (totalTime / 1000.0);
        System.out.println("Throughput: " + pagesPerSecond + " pages/sec");
    }
}

To fix the pinning issue, we switched from using the jsoup library to using HttpClient. HttpClient is built into Java and is available from version 11 onwards. The best part? It doesn't have the pinning problem we saw with jsoup.

By making this change, we avoid the issues that were slowing down our program, likely making it run faster.

But be careful; this new approach isn't perfect. You might run into an error that says:

Caused by: java.io.IOException: too many concurrent streams

I'll talk more about what this error means in my next article.

Do you want your ad here?

Contact us to get your ad seen by thousands of users every day!

[email protected]

Comments (0)

Highlight your code snippets using [code lang="language name"] shortcode. Just insert your code between opening and closing tag: [code lang="java"] code [/code]. Or specify another language.

No comments yet. Be the first.

Subscribe to foojay updates:

https://foojay.io/feed/
Copied to the clipboard