敲代码的长腿毛欧巴的博客-Lumos 敲代码的长腿毛欧巴Logo

Java异步编程的新星 —— loom项目

异步 并发编程 Java Loom

Java异步编程的新星 —— Loom项目

Loom项目开始于2017年末,在18年底的时候已经达到可初步演示的原型阶段,Loom的开发是为了降低高吞吐量并发程序的编写难度。

当今机器CPU内核和线程数的现状

长久以来,计算机只有一个CPU核心。在这种情况下,还是需要同时运行多个程序。操作系统为了实现并发执行,它通过一些算法来调度CPU,如先来先服务、短作业优先、优先权调度和时间片轮转等。在这个过程中,如果该程序没有在特定的时间范围内完成,它将存储其状态以供以后使用。然后,它运行下一个要运行的程序。好处是,所有这些都可以通过操作系统线程的概念很好地从开发人员中抽象出来,由操作系统进行线程的繁重维护,包括存储执行的状态。但是,线程有两个缺点:

  •  线程具备多种状态(新建、就绪、运行、等待、挂起和完成),并且这些状态拥有转换关系,如下图。
  •  线程有很大的机器资源开销。 图1 线程的6种状态

在当今,现代的操作系统允许拥有M(M可以上万)个线程。现代机器CPU是多核心的并且支持超线程,提供N个核,支持超线程的还会提供N个逻辑线程,N和M差几个数量级。拥有多个CPU内核(无论是物理内核还是逻辑内核)并不会从根本上改变底层机制:M >> N,并且操作系统负责从线程到内核的映射。所以即使计算机在发展进步还是离不开对线程的调度,频繁的上下文切换依旧需要很大的机器资源开销。

当前的并发框架已经很优秀了为什么还要设计Loom?

在Java中线程就是一切,Java被用于编写世界上最大的可伸缩性的程序。可伸缩是Java的一个特性,在JDK 8中引入了lambda管道用来描述,设置为parallel调用多核心进行处理一系列的操作。但是存在一些相对独立并且需要处理的任务,更普遍更难处理,我们称之为并发性。在Web服务器中,大部分请求都独立于其他请求。对于每种请求,我们都需要进行一些解析,查询数据库或向服务发出请求,然后等待结果,进行更多处理并发送响应。这个过程不仅在完成某项工作时不与其他同时发出的HTTP请求配合使用,而且在大多数情况下它根本不关心其他请求在做什么,但它仍与它们竞争处理和I / O资源。这种情况像是是出租车,每辆出租车都有自己的路线和目的地,它行驶并停下来。在同一道路系统上行驶的其他出租车的存在并不会使任何一辆出租车尽快到达目的地(如果有的话,可能会使它减速),但是如果在任何时候只有一辆出租车可以在城市道路上行驶,则不会不只是慢行系统,这将是一个功能失调的人。可以在不使市区拥堵的情况下共享道路的出租车越多,系统越好。从一开始,Java就支持这种工作。Servlet使我们可以编写在屏幕上看起来很简单的代码。这是一个简单的序列-解析,数据库查询,处理,响应-不必担心服务器现在仅处理一个请求还是处理其他一千个请求。 问题在于,线程无法匹配应用程序的自然并发单元(会话,HTTP请求或单个数据库操作)的规模,也无法与现代硬件相匹配的并发规模支持。服务器可以处理一百万个以上的并发连接,但是操作系统无法有效处理数千个活动(非空闲)线程。随着Servlet容器上工作负载的增加和互动中更多的请求,根据利特尔定律[1],操作系统可支持的线程数量相对较少,因而无法扩展其处理的能力,处理请求的平均时间与我们可以同时处理的请求数量成正比。因此,如果我们用线程表示并发的域单元,那么线程早于硬件成为了我们的可伸缩性的瓶颈。Servlet读起来不错,但是伸缩性很差。这不是线程概念的基本限制,而是JDK包装操作系统的线程造成的,完成依赖操作系统的线程。操作系统的线程占用很多的空间,创建它们需要分配资源,并且对其进行调度(即为其分配硬件资源)处理的不够好。他们不是出租车,而是火车。这造成什么之间的线程是一个大的错配意味着做的事情-和他们有效地-计算资源的抽象调度作为一个简单的结构可以做到。几个数量级的不匹配会产生很大的影响。

Loom做的改变

程序员被迫在直接将单元域并发建模为线程与浪费其硬件可以支持的大量吞吐量之间进行选择,或者选择使用其他方式在非常细粒度的级别上实现并发,但放弃了Java平台的优势。无论是硬件还是开发和维护工作,这两种选择都会产生可观的财务成本。Loom旨在消除高效运行并发程序与高效编写,维护和观察程序之间令人沮丧的折衷。它依赖于平台的优势而不是与之抗衡,还取决于异步编程的有效组件的优势。它使您可以使用熟悉的API,以熟悉的风格编写程序,并与平台及其工具(以及与硬件)保持一致,以达到写时和运行时成本之间的平衡,我们希望这将广泛地吸引人。这样做无需更改语言,并且只需对核心库API进行少量更改即可。一个简单的同步Web服务器将能够处理更多请求,而无需更多硬件。Loom做了以下事情:

  1. 忘记线程数

虚拟化线程单元,隐藏对计算机资源的管理。Loom的基础是虚拟线程,由Java进行运行和管理,他和现有的线程不同,不是对操作系统线程的包装,而是由JDK实现的。操作系统线程非常重,因为它们必须支持所有语言和所有工作负载。线程需要具有暂停和恢复计算执行的能力。这需要保留其状态,该状态包括包含当前指令索引的指令指针或程序计数器,以及存储在堆栈中的所有本地计算数据。Loom增加了控制执行,挂起和恢复执行的功能,方法是将其状态而不是作为OS资源而是作为VM已知的Java对象,并在Java运行时的直接控制下进行状态化。Java对象可以安全高效地对各种状态机和数据结构进行建模,因此也非常适合于模型执行。Java运行时知道Java代码是如何利用堆栈的,因此它可以更紧凑地表示执行状态。对执行的直接控制还使我们可以选择更适合我们的工作量的调度程序(普通的Java调度程序)。实际上,我们可以使用可插入的自定义调度程序。因此,Java运行时对Java代码的深入了解使我们能够减少线程的成本。

  1. 从线程迁移到虚拟线程

Thread这个类从JDK 1.0 就已经有了,JDK团队发现Thread一些API很常用,尤其是Thread.currentThread()和ThreadLocal。但是其中很多API 很少用甚至很少有人知道。从Java 5开始,鼓励程序员通过ExecutorServices间接创建和启动线程,使大家更加不会关注Thread中的一些API,这样降级了ThreadAPI的一些乱用。Loom 早期设计思想与一个开源Java协程技术非常相似。这个技术就是 Quasar Fiber 。而现Project Loom 的主要设计开发人员 Ron Pressler 就是来自 Quasar Fiber。公司人员对Loom的开发持两个态度,一个是继续在原来的API上进行维护,另一个是重新设计一个API。JDK 将引入一个新类:java.lang.Fiber。此类与 java.lang.Thread 一起,都成为了 java.lang.Strand 的子类。即线程变成了一个虚拟的概念,有两种实现方法:Fiber 所表示的轻量线程和 Thread 所表示的传统的重量级线程。

##虚拟线程的两种

Quasar Fiber

  使用Quasar Fiber的话需要导入quasar-core核心包,Quasar实现的coroutine[2]的方式与Golang很像,只不过一个是框架级别实现,一个是语言的内置机制。 Quasar里的Fiber其实是一个continuation,他是可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。Quasar其实是通过修改bytecode来达到这个目的,所以运行Quasar程序的时候,你需要先通过java-agent在运行时修改你的代码,也可以在编译的时候进行织入。 golang的内置了自己的调度器,Quasar则默认使用ForkJoinPool(JDK 8及以上),具有work-stealing[3]功能的线程池来当调度器。Work-stealing非常重要,因为你不清楚哪个Fiber会先执行完,而work-stealing可以动态的从其他的等等队列偷一个context过来,这样可以最大化使用CPU资源。 Quasar会通过java-agent在运行时扫描哪些方法是可以中断的,同时会在方法被调用前和调度后的方法内插入一些continuation逻辑。

new Fiber<>(() -> System.out.println("task-1"))
        .setName("Fiber-task-")
        .start();
Virtual Thread

  默认情况下,虚拟线程是由全局调度程序调度的,其工作线程的数量与CPU内核的数量相同(或由显式设置-Djdk.defaultScheduler.parallelism=N)。然后,将许多虚拟线程调度到少数平台线程上。这称为M:N调度(将M个用户模式线程调度到N个内核线程上,其中M >> N)。 在我们创建虚拟线程时:Thread.builder().virtual()中的virtual默认设置的调度器为窃取调度。 因为窃取调度程序非常适合事务处理和消息传递中涉及的线程,这些线程通常在短时间突发中处理并经常阻塞。 进入到virtual可以看到,如果传入的scheduler为null会赋值默认的调度器。虚拟线程是抢占式的,不是协作式的-它们没有显式的await在调度(任务切换)点进行操作。而是在阻塞I / O或同步时抢占它们。 如果平台线程占用CPU的时间超过某些分配的时间片,则有时会被内核强行抢占。 当活动线程的数量不超过内核的数量,并且只有很少的线程需要大量处理时,分时可以很好地用作调度策略。如果某个线程占用CPU的时间过长,则可以抢先使其他线程响应,然后再次为另一个时间片安排时间。当我们有数百万个线程时,该策略的效率就会降低:如果其中许多线程非常耗CPU, 以至于它们需要分时共享,则我们的数量不足,因此调度策略无法挽救我们。在其他所有情况下,要么窃取工作的调度程序会自动解决零星的CPU占用问题,要么我们可以将有问题的线程作为平台线程运行,并依赖于内核调度程序。因此,JDK中的调度程序目前都没有使用基于时间片的虚拟线程抢占。

/**
 *  virtual的源码
 */
public Builder virtual(Executor scheduler) {
            if (scheduler == null)
                scheduler = VirtualThread.defaultScheduler();
            this.group = null;
            this.scheduler = scheduler;
            this.virtual = true;
            return this;
        }

    /**
     * 创建默认的调度器
     * Creates the default scheduler as ForkJoinPool.
     */
    private static Executor createDefaultScheduler() {
        ForkJoinWorkerThreadFactory factory = pool -> {
            PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
            return AccessController.doPrivileged(pa);
        };
        PrivilegedAction<Executor> pa = () -> {
            int parallelism, maxPoolSize;
            String propValue = System.getProperty("jdk.defaultScheduler.parallelism");
            if (propValue != null) {
                parallelism = Integer.parseInt(propValue);
            } else {
                parallelism = Runtime.getRuntime().availableProcessors();
            }
            propValue = System.getProperty("jdk.defaultScheduler.maxPoolSize");
            if (propValue != null) {
                maxPoolSize = Integer.max(parallelism, Integer.parseInt(propValue));
            } else {
                maxPoolSize = Integer.max(parallelism << 1, 128);
            }
            Thread.UncaughtExceptionHandler handler = (t, e) -> { };
            boolean asyncMode = true; // FIFO
            return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                         0, maxPoolSize, 1, pool -> true, 30, SECONDS);
        };
        return AccessController.doPrivileged(pa);
    }

1.创建一个虚拟线程,不用显示的写start,startVirtualThread已经实现

    public static void startVirtualThread() {
        try {
            Thread.startVirtualThread(() -> System.out.println("hello JDK 16 Loom!"))
            .join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
public static Thread startVirtualThread(Runnable task) {
        Objects.requireNonNull(task);
        var thread = new VirtualThread(null, null, VIRTUAL, task);
        thread.start();
        return thread;
    }

2.使用builder API 创建一个线程

    public static void createVirtualThreadByBuilder() {
        Thread.builder().virtual().task(() -> System.out.println("Thread.builder build")).build().start();
    }

3.使用builder API 创建一个自定义名字的线程

public static void createVirtualThreadByBuilderWithName() {
        Thread.builder()
                .virtual()
                .name("Thread-builder")
                .task(() -> System.out.println("Thread.builder start, thread name: " +   
				Thread.currentThread().getName()))
                .start();
    }

4.使用线程工厂创建一个自定义名字的虚拟线程

 public static void createVirtualThreadByFactoryWithName() {
        ThreadFactory factory = Thread.builder().virtual().name("worker-", 0).factory();
        factory.newThread(() ->
                System.out.println("ThreadFactory thread: " + Thread.currentThread().getName())).start();
    }

5.使用Executor 创建一个虚拟线程

public static void createVirtualThreadWithExecutor() {
        try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {
            executor.execute(() -> System.out.println("newVirtualThreadExecutor 1"));
            executor.execute(() -> System.out.println("newVirtualThreadExecutor 2"));
        }
    }

6.使用executor 创建虚拟线程,并选择一个作为返回结果,其余虚拟线程会被中断

public static void createVirtualThreadWithExecutorDependency() {
        try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {
            Callable<String> task1 = () -> "foo";
            Callable<String> task2 = () -> "bar";
            Callable<String> task3 = () -> "baz";
            String result = executor.invokeAny(List.of(task1, task2, task3));
            System.out.println("invokeAny result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

7.使用submit 提交三个任务,然后延迟填充组合数据

public static void createVirtualThreadWithExecutorWithComplete() {
        try (ExecutorService executor = Executors.newVirtualThreadExecutor()) {
            Callable<String> task1 = () -> "completed foo";
            Callable<String> task2 = () -> "completed bar";
            Callable<String> task3 = () -> "completed baz";
            List<CompletableFuture<String>> cfs = executor.submitTasks(List.of(task1, task2, task3));
            CompletableFuture.completed(cfs)
                    .map(CompletableFuture::join)
                    .forEach(System.out::println);
        }
    }

8.ExecutorService,该服务在截止日期之前在其自己的虚拟线程中运行每个任务。每个任务有自己的deadline时间如果最后期限在执行程序终止之前到期,则执行此代码的线程将被中断,并且所有正在运行的任务都将被取消。 (这将导致虚拟线程被中断)

public static void createVirtualThreadWithExecutorWithDeadLine() {
        Instant deadline = Instant.now().plusSeconds(1);
        try (ExecutorService executor = Executors.newVirtualThreadExecutor().withDeadline(deadline)) {
            executor.submit(() -> {
                try {
                    Thread.sleep(500);
                    System.out.println("withDeadline 500");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            executor.submit(() -> {
                try {
                    Thread.sleep(800);
                    System.out.println("withDeadline 800");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            executor.submit(() -> {
                try {
                    Thread.sleep(1200);
                    System.out.println("withDeadline 1200");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

9.执行以上方法

public static void main(String[] args){
        startVirtualThread();
        createVirtualThreadByBuilder();
        createVirtualThreadByBuilderWithName();
        createVirtualThreadByFactoryWithName();
        createVirtualThreadWithExecutor();
        createVirtualThreadWithExecutorDependency();
        createVirtualThreadWithExecutorWithComplete();
        createVirtualThreadWithExecutorWithDeadLine();
    }

10.运行效果

hello JDK 16 Loom!
Thread.builder build
newVirtualThreadExecutor 1
newVirtualThreadExecutor 2
ThreadFactory thread: worker-0
Thread.builder start, thread name: Thread-builder
invokeAny result: foo
completed foo
completed bar
completed baz
withDeadline 500
withDeadline 800

##总结 Java Loom通过虚拟化线程的方式追赶上了Go内置的goroutines和通道的并发现管理。Loom解决了Java中线程开销过大,线程和 操作系统线程的严重耦合。Loom 虚拟线程,表现出相同的内存一致性,具有单个工作平台线程的调度程序将使所有内存操作完全排序,而不需要使用锁, 并且将允许使用例如HashMap代替ConcurrentHashMap。通过自行管理虚拟线程解决线程数M >> CPU核心数N的问题,将虚拟结构映射到具体的物理内存中(物理内存,OS线程),Java Loom的虚拟化线程的实现将是Java 的一场革命,期待包含Loom的正式版JDK发布。 ##说明

使用上述的JDK 虚拟线程API需要安装JDK 16预览版,且idea需要升级到2020.3才能正常编译运行。虚拟线程很有可能在JDK 17 LTS 版本上登场。 Quasar Fiber 需要设置Agent代理或者设置编译插件,否则无法运行。

##名词解释 [1] 利特尔定律:在一个稳定的系统(L)中,长期的平均顾客人数,等于长期的有效抵达率(λ),乘以顾客在这个系统中平均的等待时间(W);或者,我们可以用一个代数式来表达:L = λW

[2] Coroutine:协程又称微线程,计算机程序的一类组件,推广了协作式多任务的子程序,允许执行被挂起与被恢复。相对子例程而言,协程更为一般和灵活,但在实践中使用没有子例程那样广泛。协程更适合于用来实现彼此熟悉的程序组件,如协作式多任务、异常处理、事件循环、迭代器、无限列表和管道。

[3] work-stealing:工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。 一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并未每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如线程1负责处理1队列里的任务,2线程负责2队列的。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。 而在这时它们可能会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务线程永远从双端队列的尾部拿任务执行。

##Loom Project 预览体验链接和Git仓库

Loom 预览版链接

comsat仓库

Loom 仓库

##参考文献

Oracle Project Loom

Quasar Fiber

2020-12-02 14:37:59 星期三