While working with Apache Spark, I had trouble serializing a FastUtil wrapper with the Kryo serializer. I spent a day on it and found a solution.
Here is some example code that uses a wrapper around Int2LongOpenHashMap with a public long counter attribute (and a public inputName string):
    WrappedInt2LongOpenHashMap p = new WrappedInt2LongOpenHashMap();
    p.addTo(220, 20);
    p.addTo(30, 5);
    p.addTo(30, 15);
    p.addTo(220, 5);
    p.counter = 10;
    p.inputName = "prova";
    System.out.println(p.counter);
    System.out.println("---------");
The default Java serializer works fine: it serializes both the map entries and the counter attribute.
Here is how to serialize with the default Java API:
    FileOutputStream fos = new FileOutputStream("/tmp/prova.bin");
    ObjectOutputStream output = new ObjectOutputStream(fos);
    output.writeObject(p);
    output.close();
    fos.close();

    WrappedInt2LongOpenHashMap pp = null;
    FileInputStream fip = new FileInputStream("/tmp/prova.bin");
    ObjectInputStream in = new ObjectInputStream(fip);
    pp = (WrappedInt2LongOpenHashMap) in.readObject();
    in.close();
    fip.close();

    pp.entrySet().forEach(System.out::println);
    System.out.println("c:" + pp.counter);
This is the output:
    10
    ---------
    30=>20
    220=>25
    c:10
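You can try the same Java-serialization round trip without fastutil or a temporary file. The minimal sketch below makes these assumptions:

* It uses a hypothetical stand-in, `WrappedMap`, which extends `java.util.HashMap<Integer, Long>` instead of `Int2LongOpenHashMap`.
* It emulates fastutil's `addTo` with `Map.merge`.
* It serializes into a byte array instead of `/tmp/prova.bin`.

`HashMap`'s own `writeObject` handles the entries, and default serialization handles the subclass fields. So `counter` and `inputName` should both survive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class JavaSerializationDemo {

    // Hypothetical stand-in for WrappedInt2LongOpenHashMap: a JDK HashMap instead of
    // fastutil's Int2LongOpenHashMap, so the sketch needs no extra libraries.
    static class WrappedMap extends HashMap<Integer, Long> {
        private static final long serialVersionUID = 1L;
        public long counter;
        public String inputName;

        // Emulates fastutil's addTo: adds incr to the current value (0 if the key is absent).
        void addTo(int key, long incr) {
            merge(key, incr, Long::sum);
        }
    }

    // Writes the object with ObjectOutputStream into memory and reads it back.
    static WrappedMap roundTrip(WrappedMap original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WrappedMap) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        WrappedMap p = new WrappedMap();
        p.addTo(220, 20);
        p.addTo(30, 5);
        p.addTo(30, 15);
        p.addTo(220, 5);
        p.counter = 10;
        p.inputName = "prova";

        WrappedMap pp = roundTrip(p);
        pp.forEach((k, v) -> System.out.println(k + "=>" + v));
        System.out.println("c:" + pp.counter);
    }
}
```

The two `addTo` calls per key explain the values in the output above: 30 ends at 5 + 15 = 20, and 220 at 20 + 5 = 25.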
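If I use the Kryo serializer, I have a problem with the inheritance: the counter attribute of the subclass is lost. This is most likely because Kryo's default serializer for java.util.Map implementations (which Int2LongOpenHashMap is) is its MapSerializer. MapSerializer writes only the map entries and ignores any extra fields the subclass adds.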
This is the code I used:
    Kryo kSer = new Kryo();
    kSer.register(WrappedInt2LongOpenHashMap.class);

    FileOutputStream fos = new FileOutputStream("/tmp/prova.bin");
    Output output = new Output(fos);
    kSer.writeObject(output, p);
    output.close();
    fos.close();

    WrappedInt2LongOpenHashMap pp = null;
    FileInputStream fip = new FileInputStream("/tmp/prova.bin");
    Input in = new Input(fip);
    pp = kSer.readObject(in, WrappedInt2LongOpenHashMap.class);
    in.close();
    fip.close();

    pp.entrySet().forEach(System.out::println);
    System.out.println("c:" + pp.counter);
This is the output:
    10
    ---------
    220=>25
    30=>20
    c:0   <-- wtf?!
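The following sketch imitates, in plain JDK code, what a map-only serializer does. It writes the entry count and the key/value pairs, then rebuilds a fresh instance by re-inserting them.

`WrappedMap`, `writeEntriesOnly` and `readEntriesOnly` are hypothetical names. This is a simplified illustration of the behavior, not Kryo's actual MapSerializer code. The counter comes back as 0 because nothing ever writes it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class MapOnlySerializerDemo {

    // Hypothetical HashMap-based stand-in for WrappedInt2LongOpenHashMap.
    static class WrappedMap extends HashMap<Integer, Long> {
        private static final long serialVersionUID = 1L;
        public long counter;
    }

    // Simplified imitation of a map-only serializer: entry count plus key/value pairs, nothing else.
    static byte[] writeEntriesOnly(Map<Integer, Long> map) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(map.size());
                for (Map.Entry<Integer, Long> e : map.entrySet()) {
                    out.writeInt(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a fresh instance and re-inserts the entries; counter keeps its default value, 0.
    static WrappedMap readEntriesOnly(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            WrappedMap result = new WrappedMap();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                int key = in.readInt();
                long value = in.readLong();
                result.put(key, value);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WrappedMap p = new WrappedMap();
        p.put(220, 25L);
        p.put(30, 20L);
        p.counter = 10;

        WrappedMap pp = readEntriesOnly(writeEntriesOnly(p));
        System.out.println(pp + " c:" + pp.counter); // the entries survive, counter does not
    }
}
```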
Reading the Kryo documentation, I saw that you can write a custom serializer. I also found a FastUtilSerializer class in Apache Giraph that, with a few edits, serializes the wrapper correctly.
First, I added these two methods, writeObject and readObject, to the wrapper:
    private void writeObject(ObjectOutputStream stream) throws IOException {
        stream.defaultWriteObject();
        stream.writeLong(this.counter);    // write the extra field explicitly
    }

    private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
        stream.defaultReadObject();
        this.counter = stream.readLong();  // read it back in the same order it was written
    }
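A quick way to check the two hooks is to run them under plain Java serialization. This sketch makes two assumptions:

* `WrappedMap` is a hypothetical HashMap-based stand-in for the wrapper.
* `counter` is declared `transient` (not in the original). As a result, `defaultWriteObject()` skips it, and the value can only survive through the explicit `writeLong()`/`readLong()` pair.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class SerializationHooksDemo {

    // Hypothetical HashMap-based stand-in for WrappedInt2LongOpenHashMap.
    static class WrappedMap extends HashMap<Integer, Long> {
        private static final long serialVersionUID = 1L;

        // transient is an assumption for this sketch: defaultWriteObject() skips the field,
        // so the value can only travel through the explicit writeLong()/readLong() below.
        public transient long counter;

        private void writeObject(ObjectOutputStream stream) throws IOException {
            stream.defaultWriteObject();      // non-transient fields declared in this class (none here)
            stream.writeLong(this.counter);   // extra state, written by hand
        }

        private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
            stream.defaultReadObject();
            this.counter = stream.readLong(); // read in the same order it was written
        }
    }

    // Serializes into memory and deserializes again; the JVM calls the private hooks itself.
    static WrappedMap roundTrip(WrappedMap original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WrappedMap) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        WrappedMap p = new WrappedMap();
        p.put(30, 20L);
        p.put(220, 25L);
        p.counter = 10;

        WrappedMap pp = roundTrip(p);
        System.out.println(pp + " c:" + pp.counter);
    }
}
```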
Then I edited the FastUtilSerializer constructor, adding two fields, writeMethodParent and readMethodParent. They give access to the writeObject and readObject methods of the parent class (Int2LongOpenHashMap):
    // the wrapper's own hooks, plus (added) the parent class's hooks via getSuperclass()
    writeMethod = type.getDeclaredMethod("writeObject", ObjectOutputStream.class);
    writeMethod.setAccessible(true);
    writeMethodParent = type.getSuperclass().getDeclaredMethod("writeObject", ObjectOutputStream.class);
    writeMethodParent.setAccessible(true);

    readMethod = type.getDeclaredMethod("readObject", ObjectInputStream.class);
    readMethod.setAccessible(true);
    readMethodParent = type.getSuperclass().getDeclaredMethod("readObject", ObjectInputStream.class);
    readMethodParent.setAccessible(true);
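Then I changed the write method like this, so the parent's state is written before the wrapper's:

    writeMethodParent.invoke(object, outputWrapper); // parent (Int2LongOpenHashMap) state first
    writeMethod.invoke(object, outputWrapper);       // then the wrapper's counter

and the read method like this, in the same order:

    readMethodParent.invoke(result, inputWrapper);
    readMethod.invoke(result, inputWrapper);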
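The parent-then-child reflective calls can be tried in isolation with a toy class hierarchy. `Parent`, `Child` and `hook(...)` are hypothetical names: `Parent` plays the role of Int2LongOpenHashMap and `Child` the role of the wrapper.

The toy hooks deliberately avoid `defaultWriteObject()`. When it is called outside a real `ObjectOutputStream.writeObject` call, it throws `NotActiveException`. That is presumably why FastUtilSerializer passes its own `outputWrapper`/`inputWrapper` streams rather than plain ones.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

public class ReflectiveHooksDemo {

    // Hypothetical parent, playing the role of Int2LongOpenHashMap.
    static class Parent implements Serializable {
        private static final long serialVersionUID = 1L;
        int size;

        private void writeObject(ObjectOutputStream s) throws IOException {
            s.writeInt(size);
        }

        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            size = s.readInt();
        }
    }

    // Hypothetical child, playing the role of the wrapper with its extra counter.
    static class Child extends Parent {
        private static final long serialVersionUID = 1L;
        long counter;

        private void writeObject(ObjectOutputStream s) throws IOException {
            s.writeLong(counter);
        }

        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            counter = s.readLong();
        }
    }

    // getDeclaredMethod only sees methods declared on that exact class, so the parent's
    // private hook must be looked up on getSuperclass(); setAccessible makes it callable.
    static Method hook(Class<?> type, String name, Class<?> streamType) throws NoSuchMethodException {
        Method m = type.getDeclaredMethod(name, streamType);
        m.setAccessible(true);
        return m;
    }

    static Child roundTrip(Child original) {
        try {
            Class<?> type = Child.class;
            Method writeMethod = hook(type, "writeObject", ObjectOutputStream.class);
            Method writeMethodParent = hook(type.getSuperclass(), "writeObject", ObjectOutputStream.class);
            Method readMethod = hook(type, "readObject", ObjectInputStream.class);
            Method readMethodParent = hook(type.getSuperclass(), "readObject", ObjectInputStream.class);

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                writeMethodParent.invoke(original, out); // parent state first
                writeMethod.invoke(original, out);       // then the subclass state
            }

            Child result = new Child();
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                readMethodParent.invoke(result, in);     // same order as on the write side
                readMethod.invoke(result, in);
            }
            return result;
        } catch (ReflectiveOperationException | IOException e) {
            throw new IllegalStateException("reflective round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Child c = new Child();
        c.size = 2;
        c.counter = 10;
        Child r = roundTrip(c);
        System.out.println("size:" + r.size + " c:" + r.counter);
    }
}
```

The read side has to invoke the hooks in exactly the same order as the write side. The stream carries the values back to back, with no field names to match them up.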
Finally, I called kryo.register, passing the serializer:

    kSer.register(WrappedInt2LongOpenHashMap.class,
            new FastUtilSerializer<WrappedInt2LongOpenHashMap>(kSer, WrappedInt2LongOpenHashMap.class));
And this is the output, which is now correct:
    10
    ---------
    30=>20
    220=>25
    c:10
For Spark, you just set both the Kryo serializer and config("spark.kryo.registrator", "utility.MyRegistrator") on the SparkSession builder:

    SparkSession spark = SparkSession
            .builder()
            .master("local")
            .appName("Test")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryo.registrator", "utility.MyRegistrator")
            .getOrCreate();
where MyRegistrator is a class implementing the KryoRegistrator interface:

    package utility; // must match the "utility.MyRegistrator" name passed to spark.kryo.registrator

    import org.apache.hadoop.io.NullWritable;
    import org.apache.spark.serializer.KryoRegistrator;
    import com.esotericsoftware.kryo.Kryo;
    import kcs.WrappedInt2LongOpenHashMap;

    // FastUtilSerializer is assumed to live in the same package (utility)
    public class MyRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            kryo.register(WrappedInt2LongOpenHashMap.class,
                    new FastUtilSerializer<WrappedInt2LongOpenHashMap>(kryo, WrappedInt2LongOpenHashMap.class));
            kryo.register(NullWritable.class);
        }
    }
That’s all!