
Java Streams – A Simple MapReduce Example

In this tutorial, you convert a List of Strings to Integers and sum them using the MapReduce programming paradigm. Every class that implements the java.util.Collection interface has a stream method, which returns a Stream whose source is the collection.

Streams offer a much more convenient way to work with collections using functional programming. Rather than beginning by describing functional programming, I begin by showing you why you might consider incorporating it into your everyday coding, using a simple example built on the MapReduce programming paradigm. Let's jump in with the example, and then return to the theory after completing it.

A good overview of Streams that I recommend watching before completing this tutorial is the YouTube video Java Streams Filter, Map, Reduce by Joe James.

  1. Open Eclipse and create a new Java project. Name the project functional.
  2. Create a top-level package by right-clicking on the src folder, selecting New, and then Package from the menu.
  3. Name the package com.functional and note the created package structure.
  4. Create a new class named MapReduceExample in the com.functional package. Do not forget to add a main method to the class.
  5. Create a static method named oldWay that takes a List of Strings and returns a List of Integers. Be certain to import java.util.List.
public static List<Integer> oldWay(List<String> stringValues){
}
  1. Create a List variable named convertedList and initialize it as an ArrayList. Import java.util.ArrayList.
List<Integer> convertedList = new ArrayList<>();
  1. Create a for loop that iterates over the stringValues List, converts each element, adds the converted element to the convertedList variable and then returns the converted list.
public static List<Integer> oldWay(List<String> stringValues){
     List<Integer> convertedList = new ArrayList<>();
     for(String theString:stringValues) {
          Integer val = Integer.parseInt(theString);
          convertedList.add(val);
     }
     return convertedList;
}
  1. Create another static method named sumOldWay that takes a List of Integers, sums its elements, and returns the result.
public static Integer sumOldWay(List<Integer> values) {
     Integer retVal = 0;  // autoboxing; the Integer(int) constructor is deprecated
     for(Integer val:values) {
          retVal+=val;
     }
     return retVal;
}
  1. In the main method:
    1. create a list of Strings using the Arrays.asList method,
    2. assign the list to a variable named inValues,
    3. convert them to an Integer List using the oldWay static method,
    4. sum the Integer List,
    5. and print the value.
public static void main(String[] args) {
     List<String> inValues = Arrays.asList("1","2","3","4","5","6","7");
     List<Integer> outValues = MapReduceExample.oldWay(inValues);
     Integer finalValue = MapReduceExample.sumOldWay(outValues);
     System.out.println(finalValue);
}
  1. Run the program and 28 is printed to the console.  The following is the complete program.
package com.functional;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapReduceExample {
     public static List<Integer> oldWay(List<String> stringValues){
          List<Integer> convertedList = new ArrayList<>();
          for(String theString:stringValues) {
               Integer val = Integer.parseInt(theString);
               convertedList.add(val);
          }
          return convertedList;
     }

     public static Integer sumOldWay(List<Integer> values) {
          Integer retVal = 0;  // autoboxing; the Integer(int) constructor is deprecated
          for(Integer val:values) {
               retVal+=val;
          }
          return retVal;
     }

     public static void main(String[] args) {
          List<String> inValues = Arrays.asList("1","2","3","4","5","6","7");
          List<Integer> outValues = MapReduceExample.oldWay(inValues);
          Integer finalValue = MapReduceExample.sumOldWay(outValues);
          System.out.println(finalValue);
     }
}
  1. Now let’s rewrite this program using the MapReduce programming paradigm. Specifically, we use the Java Stream interface. The complete code follows.
package com.functional;

import java.util.Arrays;
import java.util.List;

public class MapReduceExample {
  public static void main(String[] args) {
    List<String> inValues = Arrays.asList("1","2","3","4","5","6","7");
    Integer finalValue2 = inValues.stream().mapToInt(num->Integer.parseInt(num)).sum();
    System.out.println(finalValue2);
  }
}
  1. Run the program and 28 is printed to the console.

The mapToInt method takes a lambda expression as the function it applies to each list element, and it returns an IntStream. The IntStream's sum method is a reducer, as it reduces the elements to a single int value.
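The sum method is only one of the reducers an IntStream offers. As a minimal sketch (the class name IntStreamReducers and the helper stats are hypothetical), summaryStatistics computes the count, sum, minimum, maximum, and average of the same input in a single pass.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;

public class IntStreamReducers {

    // Maps each String to an int, then reduces the IntStream to summary statistics.
    static IntSummaryStatistics stats(List<String> inValues) {
        return inValues.stream()
                .mapToInt(Integer::parseInt)   // Stream<String> -> IntStream
                .summaryStatistics();          // one pass: count, sum, min, max, average
    }

    public static void main(String[] args) {
        List<String> inValues = Arrays.asList("1", "2", "3", "4", "5", "6", "7");
        IntSummaryStatistics s = stats(inValues);
        System.out.println(s.getSum());      // 28
        System.out.println(s.getMax());      // 7
        System.out.println(s.getAverage());  // 4.0
    }
}
```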

  1. Replace the List creation with the Stream.of method (import java.util.stream.Stream).
  2. Rewrite the pipeline using the Stream's map and reduce methods.
public static void main(String[] args) {
  Stream<String> myStream =  Stream.of("1","2","3","4","5","6","7");
  Integer finalValue = myStream.map(num->Integer.parseInt(num)).reduce(0, 
       Integer::sum);
  System.out.println(finalValue);
}
  1. Run the program and 28 is printed to the console.

The map method takes a lambda expression that is applied to each element of the stream. The result is a Stream of Integers. The reduce method then combines the stream's elements into a single value, here an Integer containing the sum. Its first argument, 0, is the identity (the starting value), and its second argument, Integer::sum, is called an accumulator because it accumulates the values. Note that :: denotes a method reference, telling the compiler to use Integer's static sum method as the function.
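A method reference and the equivalent lambda expression are interchangeable wherever the same functional interface is expected. The following sketch (class and field names are hypothetical) stores both forms in java.util.function variables and shows that either pair produces the same total.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Stream;

public class MethodRefSketch {

    // Each pair is equivalent: a method reference and the matching lambda expression.
    static final Function<String, Integer> PARSE_REF = Integer::parseInt;
    static final Function<String, Integer> PARSE_LAMBDA = num -> Integer.parseInt(num);
    static final BinaryOperator<Integer> SUM_REF = Integer::sum;
    static final BinaryOperator<Integer> SUM_LAMBDA = (x, y) -> x + y;

    // Maps with the given mapper, then reduces with 0 as the identity.
    static int total(Function<String, Integer> mapper, BinaryOperator<Integer> accumulator) {
        return Stream.of("1", "2", "3", "4", "5", "6", "7")
                .map(mapper)
                .reduce(0, accumulator);
    }

    public static void main(String[] args) {
        System.out.println(total(PARSE_REF, SUM_REF));        // 28
        System.out.println(total(PARSE_LAMBDA, SUM_LAMBDA));  // 28
    }
}
```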

  1. Rewrite the function, but instead of passing the Integer::sum method reference to reduce, provide an equivalent lambda expression.
  2. Change the reduce call as follows.
public static void main(String[] args) {
  Stream<String> myStream =  Stream.of("1","2","3","4","5","6","7");
  Integer finalValue = myStream.map(num->Integer.parseInt(num)).reduce(0, 
        (x,y) -> x+y);
  System.out.println(finalValue);
}
  1. Run the program and 28 is printed to the console.

Note that in the above code we used Stream.of to create the stream directly, rather than first creating a List and then calling its stream method. Remember, a Stream is not a data structure: it does not store elements, and it does not modify its source. It carries the elements of its source (here, the values passed to Stream.of) through the pipeline. We could also have used the Stream.builder method to create the stream.
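Stream.builder is useful when the elements are produced one at a time, for example inside a loop. A minimal sketch, assuming the same seven values (the class name BuilderSketch is hypothetical): add the elements to a Stream.Builder, call build to obtain the Stream, and reduce it as before.

```java
import java.util.stream.Stream;

public class BuilderSketch {

    static int sumWithBuilder() {
        // Stream.builder() accepts elements one at a time; build() returns the Stream.
        Stream.Builder<String> builder = Stream.builder();
        for (int i = 1; i <= 7; i++) {
            builder.add(String.valueOf(i));  // add returns the builder, so calls can also be chained
        }
        Stream<String> myStream = builder.build();
        return myStream.map(Integer::parseInt).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumWithBuilder());  // 28
    }
}
```

Once build is called, the builder no longer accepts elements, so it is meant to be filled completely before the stream is created.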

Mapping

The mapToInt and map Stream methods are mapping operations. The map method applies the supplied function to each of a stream's elements, producing a new stream whose elements may be of a different type. For instance,

myStream.map(num->Integer.parseInt(num))

converts the stream, myStream, which contains Strings, into a stream containing Integers. It does this using the mapper, a function (here a lambda expression) applied to each of the stream's elements. A mapper should be stateless: its result should depend only on the element it is given.

num->Integer.parseInt(num)

The mapToInt method returns an IntStream. Other mapping methods include mapToLong, mapToDouble, flatMap, flatMapToInt, flatMapToLong, and flatMapToDouble. The flatMap family is covered in another post and is not discussed here.
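The choice between mapToInt and mapToLong matters when totals can exceed the range of an int. In this sketch (class and method names are hypothetical), both inputs fit in an int, but their sum does not: IntStream's sum silently overflows, while LongStream's sum returns the correct value.

```java
import java.util.stream.Stream;

public class MapToLongSketch {

    // int arithmetic: the sum wraps around on overflow.
    static int sumAsInt(String... values) {
        return Stream.of(values).mapToInt(Integer::parseInt).sum();
    }

    // long arithmetic: large enough for this total.
    static long sumAsLong(String... values) {
        return Stream.of(values).mapToLong(Long::parseLong).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumAsInt("2000000000", "2000000000"));   // -294967296
        System.out.println(sumAsLong("2000000000", "2000000000"));  // 4000000000
    }
}
```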

Lambda Expressions

A lambda expression is an anonymous function: it is not declared as a method of any class, it can be passed to methods as if it were an object, and it can be executed on demand. In Java, a lambda expression always implements a functional interface, that is, an interface with a single abstract method. A lambda expression's syntax is as follows,

parameters -> body

The parameter list can contain zero or more parameters, and -> is the lambda operator. Lambda expressions are covered in a later tutorial. However, note here that the following two expressions are lambda expressions.

num->Integer.parseInt(num)   // parse num as an int and return the result.
(x,y) -> x+y    // take x and y and return their sum.

The first expression parses the integer value of the supplied element. The second expression takes two values and sums them. Note that reduce applies it repeatedly, once per element, rather than recursively. The first parameter, x, is the running sum, starting at the identity value 0; the second parameter, y, is the next element of the stream. So with each iteration x accumulates the total while y takes the value of the current element.
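To see how x and y change, the accumulator can record its arguments. This sketch (the class ReduceTrace and its steps list are hypothetical, and the side effect is for illustration only) runs reduce on a sequential stream of four values; each step receives the running total as x and the current element as y.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class ReduceTrace {

    static final List<String> steps = new ArrayList<>();

    static int tracedSum() {
        steps.clear();
        return Stream.of(1, 2, 3, 4)
                .reduce(0, (x, y) -> {
                    steps.add("x=" + x + ", y=" + y);  // record running total and current element
                    return x + y;                      // becomes x for the next step
                });
    }

    public static void main(String[] args) {
        int total = tracedSum();
        steps.forEach(System.out::println);
        // x=0, y=1
        // x=1, y=2
        // x=3, y=3
        // x=6, y=4
        System.out.println(total);  // 10
    }
}
```

On a sequential stream this is equivalent to a loop that starts with total = 0 and executes total = total + element for each element.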

Filters

Filters are a convenient way to remove unwanted values. The Stream interface declares a filter method that applies a predicate to each element and returns a Stream containing only the elements that match the predicate. A predicate is a function (in Java, a java.util.function.Predicate) that returns true or false.
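A predicate can also be stored in a variable and reused. In this minimal sketch (class, field, and method names are hypothetical), IS_INTEGER accepts optionally signed integer strings; its test method evaluates a single value, and negate produces the opposite predicate.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PredicateSketch {

    // A Predicate<String> maps a String to true or false.
    static final Predicate<String> IS_INTEGER = s -> s.matches("-?\\d+");

    static List<String> numeric(Stream<String> in) {
        return in.filter(IS_INTEGER).collect(Collectors.toList());
    }

    static List<String> nonNumeric(Stream<String> in) {
        return in.filter(IS_INTEGER.negate()).collect(Collectors.toList());  // negate() flips the test
    }

    public static void main(String[] args) {
        System.out.println(IS_INTEGER.test("42"));                     // true
        System.out.println(IS_INTEGER.test("Ralph"));                  // false
        System.out.println(numeric(Stream.of("1", "Ralph", "3")));     // [1, 3]
        System.out.println(nonNumeric(Stream.of("1", "Ralph", "3")));  // [Ralph]
    }
}
```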

  1. Add a new element with the value "Ralph" to the stream.
    public static void main(String[] args) {
      Stream<String> myStream = Stream.of("1","2","3","4","5","6","7","Ralph");
      Integer finalValue = myStream.map(num->Integer.parseInt(num))
         .reduce(0, (x,y) -> x+y);
      System.out.println(finalValue);
    }
  2. Run the program and note the exception. This is obviously because “Ralph” cannot be parsed into an integer.
Exception in thread "main" java.lang.NumberFormatException: For input string: "Ralph"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at com.functional.MapReduceExample.lambda$0(MapReduceExample.java:33)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
at com.functional.MapReduceExample.main(MapReduceExample.java:33)
  1. Add the filter method to myStream before the map method to remove any strings that are not integers from the resulting Stream.
public static void main(String[] args) {
  Stream<String> myStream = Stream.of("1","2","3","4","5","6","7","Ralph");
  // keep only integer strings; a decimal such as "2.5" would still break parseInt
  Integer finalValue = myStream.filter(x->x.matches("-?\\d+"))
        .map(num->Integer.parseInt(num)).reduce(0, (x,y) -> x+y);
  System.out.println(finalValue);
}
  1. Run the program and 28 is printed to the console.

PipeLining

A Stream does not modify its source, and an intermediate operation never changes an existing Stream; instead, the result of each processing step is a new Stream with the transformation applied. This makes it convenient to chain transformations into a "pipeline."

Each intermediate operation applied to a Stream returns a new Stream, which allows chaining the operations together into a series of processing steps. There are two types of operations when processing a Stream: intermediate and terminal. An intermediate operation returns another Stream. A terminal operation returns a final value (or, like forEach, performs an action on each element), terminating the pipeline.
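Two consequences of this design are easy to check: intermediate operations are lazy, running only when a terminal operation pulls elements through the pipeline, and a Stream can be consumed only once. The sketch below (class and method names are hypothetical) counts mapper calls before and after the terminal reduce, and shows that a second terminal operation on the same Stream throws an IllegalStateException.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class PipelineSketch {

    // Returns {mapper calls before the terminal op, mapper calls after it, sum}.
    static int[] lazyCounts() {
        AtomicInteger mapperCalls = new AtomicInteger();
        Stream<Integer> pipeline = Stream.of("1", "2", "3")
                .map(s -> { mapperCalls.incrementAndGet(); return Integer.parseInt(s); });
        int before = mapperCalls.get();              // nothing has run yet
        int sum = pipeline.reduce(0, Integer::sum);  // terminal op drives the pipeline
        int after = mapperCalls.get();
        return new int[] {before, after, sum};
    }

    static boolean reuseFails() {
        Stream<String> s = Stream.of("a", "b");
        s.count();            // first terminal op consumes the stream
        try {
            s.count();        // second terminal op on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;      // the stream was already operated upon
        }
    }

    public static void main(String[] args) {
        int[] c = lazyCounts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]);  // 0 3 6
        System.out.println(reuseFails());                    // true
    }
}
```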

  1. Modify the program by adding another map transformation.
public static void main(String[] args) {
  Stream<String> myStream = Stream.of("1","2","3","4","5","6","7","Ralph");
  // keep only integer strings, parse them, double them, then sum
  Integer finalValue = myStream.filter(x->x.matches("-?\\d+"))
    .map(num->Integer.parseInt(num)).map(x->x*2).reduce(0, (x,y) -> x+y);
  System.out.println(finalValue);
}
  1. Run the program and 56 is printed to the console.

You can chain together as many intermediate methods as you need to form a processing pipeline. Note that intermediate operations that reduce a stream's size should be executed before operations applied to each element. For instance, it makes little sense to perform the following,

myStream.map(x->x*2).filter(x->x%4==0)

as you would multiply every number in a stream by 2 only to discard roughly half of the results. Filtering first, as in myStream.filter(x->x%2==0).map(x->x*2), produces the same values while multiplying only the elements that survive the filter.
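The cost difference can be measured by counting how often the per-element function runs. This sketch (class and method names are hypothetical) keeps the doubled values that came from even inputs in 1 through 10, once by doubling first and once by filtering first; both orders return the same list, but the second performs half as many multiplications.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FilterOrderSketch {

    static final AtomicInteger doublings = new AtomicInteger();

    static int twice(int x) {
        doublings.incrementAndGet();  // count every multiplication
        return x * 2;
    }

    // Map first: doubles all ten numbers, then keeps those that came from even inputs.
    static List<Integer> mapThenFilter() {
        return IntStream.rangeClosed(1, 10).map(FilterOrderSketch::twice)
                .filter(x -> x % 4 == 0).boxed().collect(Collectors.toList());
    }

    // Filter first: discards odd inputs, then doubles only the five that remain.
    static List<Integer> filterThenMap() {
        return IntStream.rangeClosed(1, 10).filter(x -> x % 2 == 0)
                .map(FilterOrderSketch::twice).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        doublings.set(0);
        System.out.println(mapThenFilter() + " after " + doublings.get() + " doublings");
        // [4, 8, 12, 16, 20] after 10 doublings
        doublings.set(0);
        System.out.println(filterThenMap() + " after " + doublings.get() + " doublings");
        // [4, 8, 12, 16, 20] after 5 doublings
    }
}
```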

Collectors

Sometimes you do not wish to reduce a stream to a single value. Instead, you might wish to transform a collection into another collection, performing processing steps along the way. An easy way to collect a stream into a collection is through Collectors. Let's consider a typical data processing task developers face daily.

  1. Create a new class named Widget and provide an id and a color property of the enum type Color.
  2. Create an enumeration for Color.
    package com.functional;
    enum Color {red,blue,green,yellow,orange};
    public class Widget {
      private int id;
      private Color color;
    
      public int getId() { return this.id;}
      public Color getColor() {return this.color;}
    }
  3. Create a constructor that takes an int and Color as parameters.
package com.functional;

enum Color {red,blue,green,yellow,orange};

public class Widget {
  private int id;
  private Color color;

  public int getId() { return this.id;}
  public Color getColor() {return this.color;}

  public Widget(int id, Color color) {
    this.id = id;
    this.color = color;
  }
}

Suspend disbelief and assume the Widget class represents a business entity in your software. In a typical program, a lot of code is dedicated to storing multiple instances of an object in a collection, iterating over the collection's elements, transforming them, and aggregating the results into another collection.

  1. Add a static method to Widget named getRedIds that returns a list of ids for red widgets (import java.util.List and java.util.ArrayList). The code should look familiar; certainly, you have written code like this countless times.
public static List<Integer> getRedIds(List<Widget> widgets){
  List<Integer> ids = new ArrayList<>();
  for(Widget aWidget:widgets) {
    if(aWidget.color == Color.red) {
      ids.add(aWidget.id);
    }
  }
  return ids;
}
  1. Create a main method with five Widget instances added to an ArrayList. Pass the list to the getRedIds method, and print the results.
public static void main(String[] args) {
  List<Widget> widgets = new ArrayList<>();
  widgets.add(new Widget(1, Color.red));
  widgets.add(new Widget(2, Color.blue));
  widgets.add(new Widget(3, Color.green));
  widgets.add(new Widget(4, Color.red));
  widgets.add(new Widget(5, Color.red));
  List<Integer> ids = Widget.getRedIds(widgets);
  System.out.println(ids);
}
  1. Run the program and [1, 4, 5] is printed to the console.

The above is typical boilerplate code, familiar to most developers. Again, suspend disbelief and focus on the processing and not the reality of the business object. But, armed with our newly acquired functional programming knowledge, we can discard the getRedIds method and replace it with a single statement.

  1. Add the following statements to the end of the main method, and import java.util.stream.Collectors.
ids = widgets.stream().filter(x->x.getColor()==Color.red).map(x->x.getId())
     .collect(Collectors.toList());
System.out.println(ids);
  1. Run the program and the following two lines are printed to the console.
[1, 4, 5]
[1, 4, 5]

The complete class follows.

package com.functional;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

enum Color {red,blue,green,yellow,orange};

public class Widget {

  private int id;
  private Color color;

  public Widget(int id, Color color)
  {
    this.id = id;
    this.color = color;
  }

  public int getId() { return this.id;}
  public Color getColor() {return this.color;}

  public static List<Integer> getRedIds(List<Widget> widgets){
    List<Integer> ids = new ArrayList<>();
    for(Widget aWidget:widgets){
      if(aWidget.color == Color.red) {
        ids.add(aWidget.id);
      }
    }
    return ids;
  }

  public static void main(String[] args) {
    List<Widget> widgets = new ArrayList<>();
    widgets.add(new Widget(1, Color.red));
    widgets.add(new Widget(2, Color.blue));
    widgets.add(new Widget(3, Color.green));
    widgets.add(new Widget(4, Color.red));
    widgets.add(new Widget(5, Color.red));
    List<Integer> ids = Widget.getRedIds(widgets);
    System.out.println(ids);

    ids = widgets.stream().filter(x->x.getColor()==Color.red).map(x->x.getId())
            .collect(Collectors.toList());
    System.out.println(ids);
  }
}

The terminal method is the stream's collect method. We pass collect the collector returned by Collectors.toList(), which gathers the stream's elements into a new List.
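Collectors offers more than toList. As a sketch of one further collector, Collectors.groupingBy can build a map from each color to the ids of the widgets with that color. The nested Widget class below is a minimal, hypothetical stand-in for the tutorial's Widget so that the example compiles on its own; passing TreeMap::new as the map factory sorts the keys by the enum's declaration order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingSketch {

    enum Color { red, blue, green, yellow, orange }

    // Minimal stand-in for the tutorial's Widget class.
    static class Widget {
        private final int id;
        private final Color color;
        Widget(int id, Color color) { this.id = id; this.color = color; }
        int getId() { return id; }
        Color getColor() { return color; }
    }

    static Map<Color, List<Integer>> idsByColor(List<Widget> widgets) {
        return widgets.stream()
                .collect(Collectors.groupingBy(
                        Widget::getColor,                                           // map key
                        TreeMap::new,                                               // keys in enum order
                        Collectors.mapping(Widget::getId, Collectors.toList())));  // values: list of ids
    }

    public static void main(String[] args) {
        List<Widget> widgets = Arrays.asList(
                new Widget(1, Color.red), new Widget(2, Color.blue), new Widget(3, Color.green),
                new Widget(4, Color.red), new Widget(5, Color.red));
        System.out.println(idsByColor(widgets));  // {red=[1, 4, 5], blue=[2], green=[3]}
    }
}
```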

forEach and Consumer

The forEach method is a useful terminal operation that you can use to apply a lambda function to all elements in a stream.

  1. Create a new class named ForEachExample, be certain to add a main method.
  2. Add a method named addTen to the class that takes an Integer, adds ten to it, and returns the result.
package com.functional;

import java.util.Arrays;
import java.util.List;

class ForEachExample {
  public Integer addTen(Integer val) {
    val+=10; 
    return val;
  }
  public static void main(String[] args) {
  }
}
  1. In main, create a ForEachExample instance, and a list of Integers.
  2. Stream the list and create a forEach statement and supply it with a lambda expression that calls the addTen method and then prints the results.
  3. Stream the list again and print each element, to show that the integers in values were not modified.
package com.functional;

import java.util.Arrays;
import java.util.List;

class ForEachExample {

  public Integer addTen(Integer val) {
    val+=10;
    return val;
  }

  public static void main(String[] args) {
    ForEachExample example = new ForEachExample();
    List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
    values.stream().forEach((x)->{System.out.println(example.addTen(x));});
    values.stream().forEach(System.out::println);
  }
}
  1. Run the program and the following is printed to the console.
11
12
13
14
15
1
2
3
4
5

The code,

(x)->{System.out.println(example.addTen(x));}

is a lambda expression. The actual argument for forEach is a Consumer. A Consumer (java.util.function.Consumer) is a functional interface whose single method, accept, takes one input and returns no value, so the lambda expression is applied to each element only for its effect, here printing.

  1. Modify the main method by removing the lambda expression from forEach and creating a new Consumer instance (import java.util.function.Consumer).
  2. Supply forEach with the Consumer instance.
  3. Run the program; the results are the same as before.
public static void main(String[] args) {
  ForEachExample example = new ForEachExample();
  List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
  Consumer<Integer> action = x->{System.out.println(example.addTen(x));};
  values.stream().forEach(action);
  values.stream().forEach(System.out::println);
}

In practice, you rarely need to create a Consumer explicitly and then pass it to forEach. You could, however, if you had a complex lambda expression, although in that situation I would probably create a separate method instead.
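Both options can be combined. In this sketch (class, field, and method names are hypothetical), the logic lives in a separate static method that is turned into a Consumer with a method reference, and Consumer's andThen method composes it with a second Consumer so that forEach runs both, in order, for each element. Results are recorded in a list so they can be checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerSketch {

    static final List<String> log = new ArrayList<>();

    // A named method that forEach can use through a method reference.
    static void logPlusTen(Integer x) {
        log.add("plus ten: " + (x + 10));
    }

    static List<String> run(List<Integer> values) {
        log.clear();
        Consumer<Integer> plusTen = ConsumerSketch::logPlusTen;
        Consumer<Integer> original = x -> log.add("original: " + x);
        // andThen composes two Consumers: for each element, run plusTen, then original.
        values.stream().forEach(plusTen.andThen(original));
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run(Arrays.asList(1, 2)).forEach(System.out::println);
        // plus ten: 11
        // original: 1
        // plus ten: 12
        // original: 2
    }
}
```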

Conclusion

In this tutorial, we explored how Streams simplify working with Collections. Be aware that lambdas, Streams, and functional programming are a rich and complex topic. However, like Java generics, integrating these concepts into your everyday coding does not require deep mastery of the topic. As this tutorial demonstrates, integrating streams into your everyday coding allows you to write more concise code that is easier to read and test. The stream's MapReduce programming paradigm literally allows you to replace entire methods of boilerplate code with a single statement, and that statement can contain as many intermediate transformations as necessary.