Java 实现协程的方法

协程(Coroutine)这个词其实有很多叫法,比如有的人喜欢称为纤程(Fiber),或者绿色线程(GreenThread)。其实究其本质,对于协程最直观的解释是线程的线程。虽然读上去有点拗口,但本质上就是这样。

协程的核心在于调度那块由他来负责解决,遇到阻塞操作,立刻放弃掉,并且记录当前栈上的数据,阻塞完后立刻再找一个线程恢复栈并把阻塞的结果放到这个线程上去跑,这样看上去好像跟写同步代码没有任何差别,这整个流程可以称为coroutine,而跑在由coroutine负责调度的线程称为Fiber。

java协程的实现

早期,在JVM上实现协程一般会使用kilim,不过这个工具已经很久不更新了,现在常用的工具是Quasar,而本文章会全部基于Quasar来介绍。

下面尝试通过Quasar来实现类似于go语言的coroutine以及channel。

为了能有明确的对比,这里先用go语言实现一个对于10以内自然数分别求平方的例子。

func counter(out chan<- int) {
 for x := 0; x < 10; x++ {
  out <- x
 }
 close(out)
}

func squarer(out chan<- int, in <-chan int) {
 for v := range in {
  out <- v * v
 }
 close(out)
}

func printer(in <-chan int) {
 for v := range in {
  fmt.Println(v)
 }
}

func main() {
 //定义两个int类型的channel
 naturals := make(chan int)
 squares := make(chan int)

 //产生两个Fiber,用go关键字
 go counter(naturals)
 go squarer(squares, naturals)
 //获取计算结果
 printer(squares)
}

上面这个例子,通过channel两解耦两边的数据共享。对于这个channel,大家可以理解为Java里的SynchronousQueue。下面我直接上Quasar版JAVA代码的,几乎可以原封不动的复制go语言的代码。

public class Example {

 private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
  Integer v;
  while ((v = in.receive()) != null) {
   System.out.println(v);
  }
 }

 public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
  //定义两个Channel
  Channel<Integer> naturals = Channels.newChannel(-1);
  Channel<Integer> squares = Channels.newChannel(-1);

  //运行两个Fiber实现.
  new Fiber(() -> {
   for (int i = 0; i < 10; i++)
    naturals.send(i);
   naturals.close();
  }).start();

  new Fiber(() -> {
   Integer v;
   while ((v = naturals.receive()) != null)
    squares.send(v * v);
   squares.close();
  }).start();

  printer(squares);
 }
}

两者对比,看上去Java似好像更复杂些,没办法这就是Java的风格,而且这还是通过第三方的库来实现的。

说到这里各位肯定对Fiber很好奇了。也许你会表示怀疑Fiber是不是如上面所描述的那样,下面我们尝试用Quasar建立一百万个Fiber,看看内存占用多少,我先尝试了创建百万个Thread。

for (int i = 0; i < 1_000_000; i++) {
 new Thread(() -> {
  try {
   Thread.sleep(10000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }).start();
}

很不幸,直接报Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,这是情理之中的。下面是通过Quasar建立百万个Fiber。

public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
 int FiberNumber = 1_000_000;
 CountDownLatch latch = new CountDownLatch(1);
 AtomicInteger counter = new AtomicInteger(0);

 for (int i = 0; i < FiberNumber; i++) {
  new Fiber(() -> {
   counter.incrementAndGet();
   if (counter.get() == FiberNumber) {
    System.out.println("done");
   }
   Strand.sleep(1000000);
  }).start();
 }
 latch.await();
}

我这里加了latch,阻止程序跑完就关闭,Strand.sleep其实跟Thread.sleep一样,只是这里针对的是Fiber。

最终控制台是可以输出done的,说明程序已经创建了百万个Fiber,设置Sleep是为了让Fiber一直运行,从而方便计算内存占用。官方宣称一个空闲的Fiber大约占用400Byte,那这里应该是占用400MB堆内存,但是这里通过jmap -heap pid显示大约占用了1000MB,也就是说一个Fiber占用1KB。

Quasar是怎么实现Fiber的

其实Quasar实现的coroutine的方式与Go语言很像,只不过前者是使用框架来实现,而go语言则是语言内置的功能。

不过如果你熟悉了Go语言的调度机制的话,那么对于Quasar的调度机制就会好理解很多了,因为两者有很多相似之处。

Quasar里的Fiber其实是一个continuation,他可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。

Java 实现协程的方法

扫一扫手机访问