JavaでSparkを使って要素を値でソートする方法

Sparkでは、値ではなくキーによるソートのみが可能です。 値による並べ替えは、値としてのキーとキーとしての値を使用して新しいペアにマッピングすることで実現できます。 次に、キーでソートして出力をレンダリングします。

例:

long start = System.nanoTime();
JavaPairRDD<String, Stats0> extracted = dataSet1.mapToPair(s -> new Tuple2<>(getIp(s), new Stats0(1)));
JavaPairRDD<String, Stats0> baseKeyPair = extracted.reduceByKey(Stats0::merge);

// Map for sorting
JavaPairRDD<Integer, Tuple2<String, Stats0>> sortingRDD = baseKeyPair
		.mapToPair(t ->new Tuple2<>(t._2().getCount(), t));

// Sort by keys
sortingRDD = sortingRDD.sortByKey(false);

// Collect to display the output
List<Tuple2<Integer, Tuple2<String, Stats0>>> output = sortingRDD.collect();

end = System.nanoTime();
for (Tuple2<Integer, Tuple2<String, Stats0>> t : output) {
	System.out.println(t._2()._1 + "\t" + t._1());
}

System.out.println("Processed in : " + (int) (end - start)/1000000 + " ms");

このJavaの例では、クラスStats0を使用しています。つまり、整数を包むラッパーです。 これは、汎用コンパレータを使用するようにカスタマイズすることができ、この方法では、直列化可能である限り、任意のObject型を使用します。


public static class Stats0 implements Serializable {

	private final int count;

	public Stats0(int count) {
		this.count = count;
	}

	public Stats0 merge(Stats0 other) {
		return new Stats0(count + other.count);
	}

	public int getCount() {
		return count;
	}

	public String toString() {
		return String.format("n=%s", count);
	}
}

参考文献:

sparkjava