Locality of Java 8 Streams in Real-Time Big Data Applications

On the Locality of Java 8 Streams in Real-Time Big Data Applications
Yu Chan
Ian Gray
Andy Wellings
Neil Audsley
Real-Time Systems Group, Computer Science
University of York, UK
Outline
Context of the work
Focus of the current paper
Previous work on Stored Collections
Java 8: Streams and Pipelines and their relationship to the Fork and Join framework
Explore the impact of ccNUMA and locality on the Java 8 model
Conclusions
 
The Java 8 implementation of streams and pipelines is very complex
 
Context I
 
The JUNIPER EU project is currently investigating
how the Java 8 platform augmented by the RTSJ
can be used for real-time Big Data applications
 
Context II
 
JUNIPER is interested in Big Data applications both on clusters of servers and on supercomputers
Here we are concerned with the cluster environment
JUNIPER wants to use Java 8 streams to provide the underlying programming model for the individual programs executing on the server computers
 
Context III
 
The Java support is targeted at the server computers contained within the clusters
It is not an alternative to, for example, the Hadoop framework, whose main concern is the distribution of the data
Current work is considering how to extend the Java stream support to a distributed environment
 
Context IV
 
A JUNIPER application consists of a set of Java 8 programs (augmented with the RTSJ) that are mapped to a distributed computing cluster, such as an internet-based cloud service
Performance is critical for Big Data applications
We need to understand the impact of using Java streams and pipelines
aicas is currently updating Jamaica for Java 8 and to support locality
 
Focus of the Paper
 
To evaluate the JVM server-level support
Java is architecture-neutral: the programming model essentially assumes SMP support
But servers nowadays tend to have a ccNUMA architecture
The JVM has the responsibility of optimizing performance
But we are also interested in the potential to have FPGA accelerators
Previous Work I
 
Java's built-in stream sources have a number of
drawbacks for use in Big Data processing
1. the in-memory sources (e.g. arrays and collections) store all their data in heap memory
   this implies populating the collection before any operations can be performed, resulting in a potentially long delay while it takes place
   heap memory is small compared to disk space, so for Big Data computations, there may not be enough heap memory to load the entire dataset from disk
2. the file-based sources (e.g. BufferedReader.lines) produce sequential streams, making parallel execution of the pipeline impossible
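The difference between the two kinds of built-in source can be checked directly. The following is a minimal sketch, not taken from the paper: the class name SourceKinds and the three-line input are made up for illustration, and it only reports what the JDK's default implementations return from isParallel().

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.stream.Stream;

// Illustrative only: SourceKinds and its sample data are assumptions, not from the slides.
class SourceKinds {
    // Reports whether the stream returned by BufferedReader.lines() is parallel by default.
    static boolean linesStreamIsParallel() {
        BufferedReader reader = new BufferedReader(new StringReader("1\n2\n3\n"));
        try (Stream<String> lines = reader.lines()) {
            return lines.isParallel();
        }
    }

    // Reports whether parallelStream() on an in-memory list yields a parallel stream.
    static boolean listParallelStreamIsParallel() {
        return Arrays.asList(1, 2, 3).parallelStream().isParallel();
    }

    public static void main(String[] args) {
        System.out.println("lines():          parallel = " + linesStreamIsParallel());
        System.out.println("parallelStream(): parallel = " + listParallelStreamIsParallel());
    }
}
```

The in-memory list has to be fully populated before parallelStream() can be called, which is the first drawback listed above.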
 
Previous Work II
 
To overcome these limitations, we have introduced the idea of a Stored Collection
It reads its data from a file on demand, thus eliminating the initial population step
It generates a parallel stream to take advantage of multi-core hardware
Stored Collection programs are up to 1.44 times faster, and their heap usage is 2.35%-84.1% of that of in-memory collection programs
Streams and Pipelines
List<Integer> transactionsIds =
    transactions.stream()
        .filter(t -> t.getType() == Transaction.GROCERY)
        .sorted(comparing(Transaction::getValue).reversed())
        .map(Transaction::getId)
        .collect(toList());
 
Lazy evaluation: the data is pulled through the stream, not pushed
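Lazy evaluation can be observed with a small trace. This is a sketch under stated assumptions: the class LazyDemo and its four-element input are hypothetical, and the pipeline is sequential so that the recorded order is deterministic. The log shows that no filter or map lambda runs until the terminal collect() pulls elements through.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative only: LazyDemo is a made-up name, not part of the original slides.
class LazyDemo {
    // Records the order in which the pipeline stages touch elements.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = Arrays.asList(1, 2, 3, 4).stream()
            .filter(n -> { log.add("filter " + n); return n % 2 == 0; })
            .map(n -> { log.add("map " + n); return n * 10; });
        // Building the pipeline has not executed any lambda yet.
        log.add("pipeline built");
        // The terminal operation pulls each element through filter and map in turn.
        List<Integer> result = pipeline.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The first log entry is "pipeline built" and the last is "result [20, 40]"; the filter and map entries are interleaved element by element in between.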
 
Streams and Pipelines
 
class InputData {
  private long sensorReading;
  // ...
  public long getSensorReading() {
    return sensorReading;
  }
}

class OutputData {
  private byte[] hashedSensorReading;
  // ...
  public void setHashedSensorReading(byte[] hash) {
    hashedSensorReading = hash;
  }
}
 
Streams and Pipelines
 
class ProcessData {
  public void run() {
    Collection<InputData> inputs = ...;
    inputs.parallelStream().map(data -> {…}).
       forEach(outData -> { ... });
  }
}
 
[Figure: stream pipeline diagram with the labels Input Stream, Operation (three times), Terminal Operation and Output Stream]
 
Streams and Pipelines
 
class ProcessData {
  public void run() {
    Collection<InputData> inputs = ...;
    inputs.parallelStream().map(data -> {
      long value = data.getSensorReading();
      byte[] hash = new byte[32];
      SHA256 sha256 = new SHA256();
      for (int shift = 0; shift < 64; shift += 8)
        sha256.hash((byte) (value >> shift));
      sha256.digest(hash);
      OutputData out = new OutputData();
      out.setHashedSensorReading(hash);
      // ...
      return out;
    }).forEach(outData -> { ... });
  }
}
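The SHA256 class used on this slide is not part of the JDK, and the input collection is elided. A runnable approximation, assuming java.security.MessageDigest as a stand-in hash and LongStream.rangeClosed as a stand-in source of consecutive long integers, might look as follows. The class name HashPipeline and its methods are hypothetical.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Illustrative only: MessageDigest replaces the slide's SHA256 class (an assumption).
class HashPipeline {
    // Feeds the eight bytes of value to SHA-256, least significant byte first,
    // mirroring the shift loop on the slide.
    static byte[] sha256Of(long value) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (int shift = 0; shift < 64; shift += 8) {
                md.update((byte) (value >> shift));
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hashes the consecutive long integers 1..count in a parallel pipeline.
    static List<byte[]> hashRange(long count) {
        return LongStream.rangeClosed(1, count)
            .parallel()
            .mapToObj(HashPipeline::sha256Of)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> hashes = hashRange(1000);
        System.out.println(hashes.size() + " hashes of " + hashes.get(0).length + " bytes each");
    }
}
```

Unlike the slide's version, this sketch allocates a new digest and result array per element, so it does not reproduce any measures taken to avoid garbage collection.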
 
Streams and Fork-Join Framework
 
Each parallel stream source can provide a spliterator which partitions the stream
Internally in the Java 8 stream support, the spliterator is called to generate sub streams
Each sub stream is then processed by a task submitted to the default fork and join pool
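The partitioning step can be tried by hand on an array-backed source. This is only a sketch of a single split: the class name SplitDemo is made up, and the real stream implementation decides for itself how often to split and how tasks are scheduled.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;

// Illustrative only: SplitDemo is a hypothetical name, not from the slides.
class SplitDemo {
    // Splits an array-backed spliterator once and returns the sizes of both parts.
    static long[] splitSizes(int n) {
        long[] data = new long[n];
        Spliterator.OfLong rest = Arrays.spliterator(data);
        // trySplit() hands off a prefix of the elements, or returns null if it cannot split.
        Spliterator.OfLong prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        long[] sizes = splitSizes(16);
        System.out.println("prefix = " + sizes[0] + ", rest = " + sizes[1]);
        // Parallel streams run on the common fork/join pool unless started from another pool.
        System.out.println("common pool parallelism = "
            + ForkJoinPool.commonPool().getParallelism());
    }
}
```

Each part refers to a range of the same backing array, so where that array's memory physically resides is outside the spliterator's control.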
 
Incore Stream Sources and Locality
 
Here the memory used to hold the partitioned stream source spans two ccNUMA nodes
Hence threads executing the tasks may be accessing remote memory
In our experimental set-up, remote access is 18% slower than local access
Setting thread affinities does not necessarily help
 
Experimental Setup
 
2 GHz AMD Opteron 8350 running Ubuntu 13.04
16 cores, 4 cores per NUMA node
2 MB L2 cache: 512 KB per node
2 MB of L3 shared cache
16 GB of main memory: 4 GB per node
Swap disabled
Java SE 8u5
14 GB initial and maximum heap memory
GC avoided by reusing objects
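The slides do not say how objects were reused. One possible approach, shown here purely as an assumption, is to keep one digest and one output buffer per worker and overwrite them on every call. The class ReusingHasher is hypothetical and uses java.security.MessageDigest; on HotSpot the fixed heap size would typically be set with -Xms14g -Xmx14g.

```java
import java.security.DigestException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative only: ReusingHasher is a guess at what "reusing objects" could look like.
class ReusingHasher {
    private final MessageDigest md;
    private final byte[] hash = new byte[32];   // reused output buffer

    ReusingHasher() {
        try {
            md = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hashes value into the reused buffer; the next call overwrites the returned array.
    byte[] hash(long value) {
        try {
            for (int shift = 0; shift < 64; shift += 8) {
                md.update((byte) (value >> shift));
            }
            // Writes the digest into the existing buffer and resets md for the next value.
            md.digest(hash, 0, hash.length);
            return hash;
        } catch (DigestException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ReusingHasher hasher = new ReusingHasher();
        byte[] first = hasher.hash(1L);
        byte[] second = hasher.hash(2L);
        System.out.println("same buffer reused: " + (first == second));
    }
}
```

An instance is not thread-safe, so a parallel pipeline would need one per worker thread. Whether the security provider allocates internally is not controlled by this code.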
 
Experiment
 
Measure the main processing time of computing the SHA-256 cryptographic hash function on consecutive long integers starting from 1
Without thread affinity
Binding one thread to one core
Binding not more than 4 threads to each NUMA node
Use array-backed stream and stored collection-backed stream
For the stored collection: the data is created when needed rather than reading from disk
Performance of Array-backed Streams
200 runs; the graph shows cumulative histograms
[Figure: panels for 2^26 and 2^28 long integers]
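A cumulative histogram of run times gives, for each time t, the fraction of runs that finished within t. A minimal sketch of that calculation follows; the class name CumulativeHistogram and the four sample times are made up for illustration and are not measurements from the paper.

```java
import java.util.Arrays;

// Illustrative only: the sample times below are invented, not experimental results.
class CumulativeHistogram {
    // Returns the fraction of runs whose time is at most the given threshold.
    static double fractionAtOrBelow(double[] runTimes, double threshold) {
        long count = Arrays.stream(runTimes).filter(t -> t <= threshold).count();
        return (double) count / runTimes.length;
    }

    public static void main(String[] args) {
        double[] runTimes = { 1.0, 2.0, 2.0, 4.0 };   // hypothetical run times in seconds
        for (double t : new double[] { 0.5, 2.0, 4.0 }) {
            System.out.println("t <= " + t + " s: " + fractionAtOrBelow(runTimes, t));
        }
    }
}
```

A curve that rises steeply indicates run times with little variation, while a long tail indicates occasional slow runs.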
 
Performance of Stored Collection-backed Streams
[Figure: panels for 2^26 and 2^28 long integers]
 
Experiment
 
Measure the execution time of computing the SHA-256 cryptographic hash function on consecutive long integers starting from 1
Without thread affinity
Binding one thread to one core
Binding not more than 4 threads to each NUMA node
Use array-backed stream and stored collection-backed stream
This stream source is on disk: hence more similar to a big data application
Array-based versus Stored Collections
[Figure: panels "Array" and "Stored Collection", 2^28 long integers]
 
Conclusions
 
The goal of this work has been (in the context of Java 8 streams and pipelines) to understand what impact a ccNUMA architecture will have on the ability of a JVM to optimize performance without programmer help
If we just use thread affinity, we may undermine any attempt made by the JVM to optimize
Stored collections and a partitioned heap (or a physical scoped memory area) should allow the programmer more control and enforce locality of access

The study explores the impact of ccNUMA and locality on Java 8 Streams, focusing on their complexity and implementation in real-time Big Data applications. It discusses the Java support for server computers in clusters, the use of streams for programming models, and considerations for extending stream support to a distributed environment. The paper evaluates JVM server-level support, SMP assumptions, ccNUMA architecture, and the potential for FPGA accelerators in optimizing performance for Big Data applications.

  • Java 8 Streams
  • Real-Time
  • Big Data Applications
  • ccNUMA
  • Locality


