2016-04-12 8 views
5

に私はcoroutines in Pythonについては、このページを読んでいますと、このWikipedia page.は私がlibraries in Javaいくつかの実装コルーチンがあることを見ました。コルーチンは、Java

私の質問は、Javaデザイナーがこれまでコルーチンを実装しないことを決めた理由はありますか?将来のバージョンのJavaに組み込む計画はありますか?

ありがとうございました。

+0

あなたはJavaで 'Thread'sを行うことができませんでしたコルーチンで何ができます? –

+0

すでにコルーチンを実装するライブラリがある場合は、どのような言語設計者は何をする必要がありますか? –

+0

私は最初のリンクから理解したように、threads' '未満のリソースを消費します。問題は、それが標準のJava言語に組み込まれていない理由です。 「言語設計者はこのまたはそれを行うなかった理由」...実際にここに場所を持っていないような – joel314

答えて

6

実際、コルーチンのコンセプトは、のJavaスレッドシステムの設計であるです。 wait/notify機構は、その後多くの構造スレッドセーフではなく、アルゴリズムを作るために特に行われているのでnotify

yieldに相当する共ルーチンの単純な形態です。これは、スレッドセーフでなければならないスレッド間の通信に使用されるデータ構造が同期/収束する必要があるコードではないことを認識しています。

+0

「実現」と呼ぶものは本当に決定です。この問題には多くのアプローチがありますが、唯一可能なものとしてJavaを解決するべきではありません。 – back2dos

+0

@ back2dos - 私の答えに追加してください。私は代替案に興味があります。 – OldCurmudgeon

3

質問の"の計画がある..."一部には、答えは次のとおりです。この段階で

ない

JEPリスト(http://openjdk.java.net/jeps/0)しませんコルーチンについて言及する。このリストには、Java 8で追加された機能、Java 9に追加された機能、または将来のリリースで提案された機能が含まれています。

興味深いことに、2013年3月(https://bugs.openjdk.java.net/browse/JDK-8029988)に提出したRFEがありました。 RFEは1票しか得ておらず、9ヵ月後にJEPを提出するよう提案された。誰も私が思っている以上のことを考えていない。今、あなたはこのよう (例えばフィボナッチ数)に

スレッドバージョンをニシキヘビコルーチンを使用することができます

import java.lang.ref.WeakReference; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.atomic.AtomicReference; 

class CorRunRAII { 
    private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>(); 

    public CorRunRAII add(CorRun resource) { 
     if (resource == null) { 
      return this; 
     } 
     resources.add(new WeakReference<>(resource)); 

     return this; 
    } 

    public CorRunRAII addAll(List<? extends CorRun> arrayList) { 
     if (arrayList == null) { 
      return this; 
     } 
     for (CorRun corRun : arrayList) { 
      add(corRun); 
     } 

     return this; 
    } 

    @Override 
    protected void finalize() throws Throwable { 
     super.finalize(); 

     for (WeakReference<? extends CorRun> corRunWeakReference : resources) { 
      CorRun corRun = corRunWeakReference.get(); 
      if (corRun != null) { 
       corRun.stop(); 
      } 
     } 
    } 
} 

class CorRunYieldReturn<ReceiveType, YieldReturnType> { 
    public final AtomicReference<ReceiveType> receiveValue; 
    public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue; 

    CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { 
     this.receiveValue = receiveValue; 
     this.yieldReturnValue = yieldReturnValue; 
    } 
} 

interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> { 
    boolean start(); 
    void stop(); 
    void stop(final Throwable throwable); 
    boolean isStarted(); 
    boolean isEnded(); 
    Throwable getError(); 

    ReceiveType getReceiveValue(); 
    void setResultForOuter(YieldReturnType resultForOuter); 
    YieldReturnType getResultForOuter(); 

    YieldReturnType receive(ReceiveType value); 
    ReceiveType yield(); 
    ReceiveType yield(YieldReturnType value); 
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another); 
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value); 
} 

abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { 

    private ReceiveType receiveValue; 
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>(); 

    // Outside 

    private AtomicBoolean isStarted = new AtomicBoolean(false); 
    private AtomicBoolean isEnded = new AtomicBoolean(false); 
    private Throwable error; 

    private YieldReturnType resultForOuter; 

    @Override 
    public boolean start() { 

     boolean isStarted = this.isStarted.getAndSet(true); 
     if ((! isStarted) 
       && (! isEnded())) { 
      receive(null); 
     } 

     return isStarted; 
    } 

    @Override 
    public void stop() { 
     stop(null); 
    } 

    @Override 
    public void stop(Throwable throwable) { 
     isEnded.set(true); 
     if (throwable != null) { 
      error = throwable; 
     } 

     for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { 
      CorRun child = weakReference.get(); 
      if (child != null) { 
       child.stop(); 
      } 
     } 
    } 

    @Override 
    public boolean isStarted() { 
     return isStarted.get(); 
    } 

    @Override 
    public boolean isEnded() { 
     return isEnded.get(); 
    } 

    @Override 
    public Throwable getError() { 
     return error; 
    } 

    @Override 
    public ReceiveType getReceiveValue() { 
     return receiveValue; 
    } 

    @Override 
    public void setResultForOuter(YieldReturnType resultForOuter) { 
     this.resultForOuter = resultForOuter; 
    } 

    @Override 
    public YieldReturnType getResultForOuter() { 
     return resultForOuter; 
    } 

    @Override 
    public synchronized YieldReturnType receive(ReceiveType value) { 
     receiveValue = value; 

     run(); 

     return getResultForOuter(); 
    } 

    @Override 
    public ReceiveType yield() { 
     return yield(null); 
    } 

    @Override 
    public ReceiveType yield(YieldReturnType value) { 
     resultForOuter = value; 
     return receiveValue; 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) { 
     return yieldFrom(another, null); 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) { 
     if (another == null || another.isEnded()) { 
      throw new RuntimeException("Call null or isEnded coroutine"); 
     } 

     potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); 

     synchronized (another) { 
      boolean isStarted = another.start(); 
      boolean isJustStarting = ! isStarted; 
      if (isJustStarting && another instanceof CorRunSync) { 
       return another.getResultForOuter(); 
      } 

      return another.receive(value); 
     } 
    } 

    @Override 
    public void run() { 
     try { 
      this.call(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 

      stop(e); 
      return; 
     } 
    } 
} 

abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { 

    private final ExecutorService childExecutorService = newExecutorService(); 
    private ExecutorService executingOnExecutorService; 

    private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>()); 

    private final CorRun<ReceiveType, YieldReturnType> self; 
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList; 
    private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn; 

    private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue; 

    // Outside 

    private AtomicBoolean isStarted = new AtomicBoolean(false); 
    private AtomicBoolean isEnded = new AtomicBoolean(false); 
    private Future<YieldReturnType> future; 
    private Throwable error; 

    private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>(); 

    CorRunThread() { 
     executingOnExecutorService = childExecutorService; 

     receiveQueue = new LinkedBlockingDeque<>(); 
     potentialChildrenCoroutineList = new ArrayList<>(); 

     self = this; 
    } 

    @Override 
    public void run() { 
     try { 
      self.call(); 
     } 
     catch (Exception e) { 
      stop(e); 
      return; 
     } 

     stop(); 
    } 

    @Override 
    public abstract YieldReturnType call(); 

    @Override 
    public boolean start() { 
     return start(childExecutorService); 
    } 

    protected boolean start(ExecutorService executorService) { 
     boolean isStarted = this.isStarted.getAndSet(true); 
     if (!isStarted) { 
      executingOnExecutorService = executorService; 
      future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self); 
     } 
     return isStarted; 
    } 

    @Override 
    public void stop() { 
     stop(null); 
    } 

    @Override 
    public void stop(final Throwable throwable) { 
     if (throwable != null) { 
      error = throwable; 
     } 
     isEnded.set(true); 

     returnYieldValue(null); 
     // Do this for making sure the coroutine has checked isEnd() after getting a dummy value 
     receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN); 

     for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { 
      CorRun child = weakReference.get(); 
      if (child != null) { 
       if (child instanceof CorRunThread) { 
        ((CorRunThread)child).tryStop(childExecutorService); 
       } 
      } 
     } 

     childExecutorService.shutdownNow(); 
    } 

    protected void tryStop(ExecutorService executorService) { 
     if (this.executingOnExecutorService == executorService) { 
      stop(); 
     } 
    } 

    @Override 
    public boolean isEnded() { 
     return isEnded.get() || (
       future != null && (future.isCancelled() || future.isDone()) 
       ); 
    } 

    @Override 
    public boolean isStarted() { 
     return isStarted.get(); 
    } 

    public Future<YieldReturnType> getFuture() { 
     return future; 
    } 

    @Override 
    public Throwable getError() { 
     return error; 
    } 

    @Override 
    public void setResultForOuter(YieldReturnType resultForOuter) { 
     this.resultForOuter.set(resultForOuter); 
    } 

    @Override 
    public YieldReturnType getResultForOuter() { 
     return this.resultForOuter.get(); 
    } 

    @Override 
    public YieldReturnType receive(ReceiveType value) { 

     LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>(); 

     offerReceiveValue(value, yieldReturnValue); 

     try { 
      AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take(); 
      return takeValue == null ? null : takeValue.get(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     return null; 
    } 

    @Override 
    public ReceiveType yield() { 
     return yield(null); 
    } 

    @Override 
    public ReceiveType yield(final YieldReturnType value) { 
     returnYieldValue(value); 

     return getReceiveValue(); 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) { 
     return yieldFrom(another, null); 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) { 
     if (another == null || another.isEnded()) { 
      throw new RuntimeException("Call null or isEnded coroutine"); 
     } 

     boolean isStarted = false; 
     potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); 

     synchronized (another) { 
      if (another instanceof CorRunThread) { 
       isStarted = ((CorRunThread)another).start(childExecutorService); 
      } 
      else { 
       isStarted = another.start(); 
      } 

      boolean isJustStarting = ! isStarted; 
      if (isJustStarting && another instanceof CorRunSync) { 
       return another.getResultForOuter(); 
      } 

      TargetYieldReturnType send = another.receive(value); 
      return send; 
     } 
    } 

    @Override 
    public ReceiveType getReceiveValue() { 

     setLastCorRunYieldReturn(takeLastCorRunYieldReturn()); 

     return lastCorRunYieldReturn.receiveValue.get(); 
    } 

    protected void returnYieldValue(final YieldReturnType value) { 
     CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn; 
     if (corRunYieldReturn != null) { 
      corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value)); 
     } 
    } 

    protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { 
     receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue)); 
    } 

    protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() { 
     try { 
      return receiveQueue.take(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     return null; 
    } 

    protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) { 
     this.lastCorRunYieldReturn = lastCorRunYieldReturn; 
    } 

    protected ExecutorService newExecutorService() { 
     return Executors.newCachedThreadPool(getThreadFactory()); 
    } 

    protected ThreadFactory getThreadFactory() { 
     return new ThreadFactory() { 
      @Override 
      public Thread newThread(final Runnable runnable) { 
       Thread thread = new Thread(runnable); 
       thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
        @Override 
        public void uncaughtException(Thread thread, Throwable throwable) { 
         throwable.printStackTrace(); 
         if (runnable instanceof CorRun) { 
          CorRun self = (CorRun) runnable; 
          self.stop(throwable); 
          thread.interrupt(); 
         } 
        } 
       }); 
       return thread; 
      } 
     }; 
    } 
} 

+1

"私はこれ以上のアイデアは得られませんでした。 - ちょっと興味があるんだけど;何を言っているの? Javaコミュニティーの誰もコルーチンを望んでいないのですか?オラクルはそれらを実装することを熱望していないのですか?何か他のもの – Abdul

+1

Java世界のどのような身長の誰もコルーチンが必要であるとは思っていないということです。なぜなら、もし誰かが彼らが必要と思っていたら、誰かがJEPを始めたはずだからです。しかし、明らかにそれは「証明」にはならない。 –

0

別の選択肢がありますがJava6の+

はこちら神託コルーチンの実装です:

class Fib extends CorRunThread<Integer, Integer> { 

    @Override 
    public Integer call() { 
     Integer times = getReceiveValue(); 
     do { 
      int a = 1, b = 1; 
      for (int i = 0; times != null && i < times; i++) { 
       int temp = a + b; 
       a = b; 
       b = temp; 
      } 
      // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller 
      times = yield(a); 
     } while (! isEnded()); 

     setResultForOuter(Integer.MAX_VALUE); 
     return getResultForOuter(); 
    } 
} 

class MainRun extends CorRunThread<String, String> { 

    @Override 
    public String call() { 

     // The fib coroutine would be recycled by its parent 
     // (no requirement to call its start() and stop() manually) 
     // Otherwise, if you want to share its instance and start/stop it manually, 
     // please start it before being called by yieldFrom() and stop it in the end. 
     Fib fib = new Fib(); 
     String result = ""; 
     Integer current; 
     int times = 10; 
     for (int i = 0; i < times; i++) { 

      // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current` 
      current = yieldFrom(fib, i); 

      if (fib.getError() != null) { 
       throw new RuntimeException(fib.getError()); 
      } 

      if (current == null) { 
       continue; 
      } 

      if (i > 0) { 
       result += ","; 
      } 
      result += current; 

     } 

     setResultForOuter(result); 

     return result; 
    } 
} 

同期(非スレッド)バージョン:

class Fib extends CorRunSync<Integer, Integer> { 

    @Override 
    public Integer call() { 
     Integer times = getReceiveValue(); 

     int a = 1, b = 1; 
     for (int i = 0; times != null && i < times; i++) { 
      int temp = a + b; 
      a = b; 
      b = temp; 
     } 
     yield(a); 

     return getResultForOuter(); 
    } 
} 

class MainRun extends CorRunSync<String, String> { 

    @Override 
    public String call() { 

     CorRun<Integer, Integer> fib = null; 
     try { 
      fib = new Fib(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     String result = ""; 
     Integer current; 
     int times = 10; 
     for (int i = 0; i < times; i++) { 

      current = yieldFrom(fib, i); 

      if (fib.getError() != null) { 
       throw new RuntimeException(fib.getError()); 
      } 

      if (current == null) { 
       continue; 
      } 

      if (i > 0) { 
       result += ","; 
      } 
      result += current; 
     } 

     stop(); 
     setResultForOuter(result); 

     if (Utils.isEmpty(result)) { 
      throw new RuntimeException("Error"); 
     } 

     return result; 
    } 
} 

が実行される(どちらのバージョンが動作します):

// Run the entry coroutine 
MainRun mainRun = new MainRun(); 
mainRun.start(); 

// Wait for mainRun ending for 5 seconds 
long startTimestamp = System.currentTimeMillis(); 
while(!mainRun.isEnded()) { 
    if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) { 
     throw new RuntimeException("Wait too much time"); 
    } 
} 
// The result should be "1,1,2,3,5,8,13,21,34,55" 
System.out.println(mainRun.getResultForOuter()); 
関連する問題