Spark permet seulement le tri par les clés et non par les valeurs. Le tri par les valeurs peut être réalisé en mappant une nouvelle paire avec la clé en tant que valeur et la valeur en tant que clé.
long start = System.nanoTime(); JavaPairRDD<String, Stats0> extracted = dataSet1.mapToPair(s -> new Tuple2<>(getIp(s), new Stats0(1))); JavaPairRDD<String, Stats0> baseKeyPair = extracted.reduceByKey(Stats0::merge); // Map for sorting JavaPairRDD<Integer, Tuple2<String, Stats0>> sortingRDD = baseKeyPair .mapToPair(t ->new Tuple2<>(t._2().getCount(), t)); // Sort by keys sortingRDD = sortingRDD.sortByKey(false); // Collect to display the output List<Tuple2<Integer, Tuple2<String, Stats0>>> output = sortingRDD.collect(); end = System.nanoTime(); for (Tuple2<Integer, Tuple2<String, Stats0>> t : output) { System.out.println(t._2()._1 + "\t" + t._1()); } System.out.println("Processed in : " + (int) (end - start)/1000000 + " ms");
Cet exemple Java utilise la classe Stats0, c'est-à-dire un wrapper autour d'un entier. Cela peut être personnalisé pour utiliser un comparateur générique, et ainsi utiliser n'importe quel type d'objet tant qu'il est sérialisable.
public static class Stats0 implements Serializable { private final int count; public Stats0(int count) { this.count = count; } public Stats0 merge(Stats0 other) { return new Stats0(count + other.count); } public int getCount() { return count; } public String toString() { return String.format("n=%s", count); } }