Spark só permite classificar por teclas e não por valores. A classificação por valores pode ser conseguida mapeando para um novo par com a chave como valor e o valor como a chave. Em seguida, ordenando por chave e renderizando a saída.
long start = System.nanoTime();
JavaPairRDD<String, Stats0> extracted = dataSet1.mapToPair(s -> new Tuple2<>(getIp(s), new Stats0(1)));
JavaPairRDD<String, Stats0> baseKeyPair = extracted.reduceByKey(Stats0::merge);
// Map for sorting
JavaPairRDD<Integer, Tuple2<String, Stats0>> sortingRDD = baseKeyPair
.mapToPair(t ->new Tuple2<>(t._2().getCount(), t));
// Sort by keys
sortingRDD = sortingRDD.sortByKey(false);
// Collect to display the output
List<Tuple2<Integer, Tuple2<String, Stats0>>> output = sortingRDD.collect();
end = System.nanoTime();
for (Tuple2<Integer, Tuple2<String, Stats0>> t : output) {
System.out.println(t._2()._1 + "\t" + t._1());
}
System.out.println("Processed in : " + (int) (end - start)/1000000 + " ms");
Este ejemplo de Java usa la clase Stats0, que es un contenedor alrededor de un entero. Esto se puede personalizar para usar un comparador genérico, y de esta manera usar cualquier tipo de Objeto, siempre que sea serializable.
public static class Stats0 implements Serializable {
private final int count;
public Stats0(int count) {
this.count = count;
}
public Stats0 merge(Stats0 other) {
return new Stats0(count + other.count);
}
public int getCount() {
return count;
}
public String toString() {
return String.format("n=%s", count);
}
}