Angelika Langer - Training & Consulting
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | Twitter | Lanyrd | Linkedin
 
HOME 

  OVERVIEW

  BY TOPIC
    JAVA
    C++

  BY COLUMN
    EFFECTIVE JAVA
    EFFECTIVE STDLIB

  BY MAGAZINE
    JAVA MAGAZIN
    JAVA SPEKTRUM
    JAVA WORLD
    JAVA SOLUTIONS
    JAVA PRO
    C++ REPORT
    CUJ
    OTHER
 

GENERICS 
LAMBDAS 
IOSTREAMS 
ABOUT 
CONTACT 
Effective Java

Effective Java
Überblick über Java 9
Teil 6: Concurrency Updates
 

Java Magazin, Januar 2017
Klaus Kreft & Angelika Langer

Dies ist die Überarbeitung eines Manuskripts für einen Artikel, der im Rahmen einer Kolumne mit dem Titel "Effective Java" im Java Magazin erschienen ist.  Die übrigen Artikel dieser Serie sind ebenfalls verfügbar ( click here ).

 

Concurrency Updates

Es gibt einige Ergänzungen in den Abstraktionen der  java.util.concurrent.* -Packages (siehe / CONC /).

Ergänzungen zu Atomics

Die Atomic-Klassen im Package  java.util.concurrent.atomic haben neue Methoden bekommen, mit denen Zugriffsmodi wie bei den in Java 9 neuen Variable Handles möglich sind.  Beispiele sind Methoden wie  get/setPlain() getAcquire() setRelease() get/setOpaque() weakCompareAndSetVolatile() usw. in den Klassen  AtomicBoolean AtomicLong AtomicReference etc.

Spin-Wait-Hints

Die Thread-Klasse hat eine neue statische Methode  onSpinWait() , mit der der aktuelle Thread zu erkennen gibt, dass er in eine Spin-Wait-Loop (auch Busy-Wait genannt) geht (siehe / SPIN /).  Auf manchen Plattformen kann dieser Hinweis zur Laufzeit genutzt werden, um die Performance zu verbessern.  Das ist beispielsweise auf Intel-Prozessoren so, denn im x86-Instruction-Set gibt es extra eine PAUSE-Anweisung für solche Situationen.  Es handelt sich aber bei der Methode  onSpinWait() lediglich um einen Hinweis, der auch ignoriert werden kann, wenn die Plattform keine entsprechenden Optimierungsmöglichkeiten bietet.

Ergänzungen zum CompletableFuture

Die Klasse  CompletableFuture , die mit Java 8 zum JDK hinzugekommen ist, hat einige neue Methoden bekommen.
 
 

Bislang gab es keine Möglichkeit, über das  CompletableFuture eine Task mit Timeout zu starten.  Man kann sie starten und Funktionalität für die asynchrone Resultatverarbeitung anhängen. Falls aber die Task zu lange braucht, bis sie das Ergebnis liefert, konnte man bislang nicht ohne Weiteres nach einer bestimmten Zeit abbrechen und ohne das Resultat weitermachen.  Dafür gibt es jetzt mit Java 9 die Methoden  orTimeout() und  completeOnTimeout() .  Hier ein Beispiel:
 
 

CompletableFuture

  .supplyAsync(()->blockingReadPage(url))

  . orTimeout (1,TimeUnit.SECONDS)

  .whenComplete((r,e)->{

     if (r!=null) System.out.println(r);

     if (e!=null && e instanceof TimeoutException)

                  System.out.println("time limit exceeded");

);
 
 

Die Methode  or Timeout() bewirkt, dass das  CompletableFuture nach Ablauf der Timeout-Zeit mit einer  TimeoutException zurückkommt.  Bei der Methode  completeOnTimeout() kommt keine  TimeoutException , sondern man kann einen Ersatzwert angeben, der dann im Falle des Timeouts anstelle des eigentlichen Resultats verwendet wird. 
 
 

Der Timeout bezieht sich übrigens nicht auf die Ablaufzeit der Task, sondern die Zeit läuft bereits, sobald die Methode  supplyAsync() die Task an der unterliegenden Executor übergeben hat.  Wenn die Task dort längere Zeit in der Task-Queue steht, dann ist die Zeit schon abgelaufen, ehe die Task überhaupt gestartet wurde.
 
 

Wenn man beispielsweise eine ganze Reihe von Tasks unmittelbar nacheinander losschickt, dann werden nur die ersten erfolgreich enden und alle späteren scheitern mit  TimeoutException .  So wie hier:
 
 

for (String symbol : stockSymbols) {

  CompletableFuture

  .supplyAsync(()->getStockInfo(symbol))

  .orTimeout(1,TimeUnit.SECONDS)

  .whenComplete((r,e)->{

     if (r!=null) System.out.println(r);

     if (e!=null && e instanceof TimeoutException) 

                  System.out.println("time limit exceeded");

   );

}
 
 

Der Output sieht dann beispielsweise so aus:

GOOG: "Alphabet Inc.",835.24,"+12.03 - +1.46%"

INTC: "Intel Corporation",35.93,"-0.27 - -0.75%"

MXIM: "Maxim Integrated Products, Inc.",44.64,"+0.34 - +0.77%"

time limit exceeded

time limit exceeded

time limit exceeded

time limit exceeded
 
 

Eine andere neue Method im CompletableFuture ist die Methode  completeAsync() .  Sie ist eine Vereinfachung für diejenigen, die  CompletableFuture s als Returnwert an ihrer Schnittstelle zurückgeben wollen.  Bislang musste man ein leeres "incomplete" Future zurückgeben und selber dafür sorgen, dass das Resultat asynchron produziert und anschließend über die Methode  complete() in dem leeren Future abgelegt wird.  Mit  completeAsync() kann man die asynchrone Resultatproduktion nun mit Hilfe des  CompletableFuture machen.
 
 

Beispiel:
 
 

CompletableFuture<String> getWebpage(URL url) {

  CompletableFuture<String> future = new CompletableFuture<>();

  Supplier<String> task = () -> blockingReadPage(url);

  Executor pool =  ... grab a thread pool ...

  return future. completeAsync (task,pool);

}
 
 

Es gibt noch ein paar andere Kleinigkeiten.  Beipielsweise kann man sich mit  delayedExecutor(long,TimeUnit) einen Thread-Pool holen, den man beim  supplyAsync() runAsync() , etc. als Executor mitgeben kann.  Er startet die Tasks erst nach dem angegebenen "delay".  Dahinter steht ein  ScheduledThreadPoolExecutor mit einem Daemon-Thread, der die Tasks mit der gewünschten Verzögerung an den eigentlichen Executor (z.B. an den ForkJoin-Common-Pool) übergibt.

Reactive Programming with java.util.concurrent.Flow

Im  java.util.concurrent Package gibt es eine neue Klasse  Flow .  Sie implementieren ein API, das von der Reactive Stream Initiative (siehe / REAC /) als Standard für die asynchrone Verarbeitung von Datenströmen definiert wurde.  Damit soll folgendes Problem gelöst werden: 

 
 

Es geht um Multithread-Anwendungen, in denen ein Strom von Daten (der Stream ) asynchron von einer Quelle (oft Source oder Producer genannt) zu einem Ziel ( Sink oder Consumer genannt) transferiert wird. Das ist oft als Push-Strategie implementiert, wo die Quelle die Initiative übernimmt und ihre Daten an das Ziel sendet.  Dabei kann es vorkommen, dass die Quelle die Daten schneller sendet, als das Ziel sie verarbeiten kann.  Diese Situation wird als Overflow bezeichnet. Dann müssen die Daten gepuffert werden und dieser Puffer kann zum Engpass werden, denn wenn er voll ist, muss die Quelle anhalten und warten, bis das Ziel bzw. der Puffer wieder Daten aufnehmen kann.  Wenn es zu dieser Overflow-Situation kommt, dann ist natürlich der Effekt der Parallelverarbeitung weg, weil nun die Quell-Threads auf die Ziel-Threads warten müssen.
 
 

Im Manifest der Reactive Stream Initiative wird eine Lösung dafür vorgeschlagen, bei der die Datenflussmenge kontrolliert wird, damit solche blockierenden Overflow-Situationen gar nicht erst entstehen.  Die Idee ist, dass das Ziel einen Gegendruck (die sogenannte Backpressure ) aufbaut, indem das Ziel der Quelle sagt, wie viele Daten es aufnehmen kann, statt einfach nur alles zu akzeptieren, was von der Quelle kommt.  Die Implementierung soll so sein, dass die Quelle nun in Kenntnis der Kapazität des Ziels dafür sorgt, dass es nicht zur Blockade kommt.  Das wird als Non-Blocking Backpressure bezeichnet.
 
 

Für die Implementierung der Reactive Stream Ideen gibt es mit Java 9 die Klasse  Flow im  java.util.concurrent Package.  Sie implementiert das Publish-Subscriber-Pattern.  Publish-Subscriber ist ein Messing-Pattern: der Publisher stellt mehreren Subscriber n Daten zur Verfügung.  Die Subscriber spezifizieren in Form einer Subscription , welche und wie viele Daten sie haben wollen.  Die Subscription dient der Entkopplung von Publisher und Subscriber. 
 
 

Um nun das Ziel der Reactive Stream Initiative, die Non-Blocking Backpressure, zu erreichen, ist eine Pull-Strategie implementiert, bei der der Subscriber die Initiative übernimmt und von sich aus nur eine begrenzte Menge von Daten anfordert.
 
 

Für das Publish-Subscriber-Pattern gibt es vier neue Interfaces  Flow.Publisher Flow.Subscriber Flow.Processor und  Flow.Subscription und eine neue Klasse  SubmissionPublisher , die das  Flow.Publisher Interface implementiert.
 
 

 
 

If you are interested to hear more about this and related topics you might want to check out the following seminars:
Seminars
Java Module System - Names, Unnamed, Automatic Modules, Module Descriptors, Tool Chain
1 day seminar ( open enrollment and on-site)
Java 8 & 9 - Lambdas & Stream, New Concurrency Utilities, Date/Time API, Module System
4 day seminar ( open enrollment and on-site)
 

 

  © Copyright 1995-2018 by Angelika Langer.  All Rights Reserved.    URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/91.Java9.What-is-new-in-Java-9/90.java-9.1.overview.ready_8.html  last update: 26 Oct 2018