Kryo and FastUtil wrapper

While I was working with Apache Spark, I ran into a problem serializing a FastUtil wrapper with the Kryo serializer. I spent a day on it and found a solution.
The wrapper in question, WrappedInt2LongOpenHashMap, extends Int2LongOpenHashMap and adds a public long counter attribute (plus a String inputName).
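
A minimal sketch of the wrapper (my reconstruction, the real class may carry more logic) looks like this:

import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;

// The fastutil map plus two extra public fields we want to keep across serialization.
public class WrappedInt2LongOpenHashMap extends Int2LongOpenHashMap {
	public long counter;
	public String inputName;
}

Here is some example code that uses it: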

WrappedInt2LongOpenHashMap p = new WrappedInt2LongOpenHashMap();
p.addTo(220, 20);
p.addTo(30, 5);
p.addTo(30, 15);
p.addTo(220, 5);
p.counter = 10;
p.inputName="prova";

System.out.println(p.counter);
System.out.println("---------");

The default Java serializer works fine: it serializes both the map contents and the counter attribute (Int2LongOpenHashMap implements Serializable, so the wrapper does too).
How to serialize with the default Java API:

FileOutputStream fos = new FileOutputStream("/tmp/prova.bin");
ObjectOutputStream output = new ObjectOutputStream(fos);
output.writeObject(p);
output.close();
fos.close();

WrappedInt2LongOpenHashMap pp = null;
FileInputStream fip = new FileInputStream("/tmp/prova.bin");
ObjectInputStream in = new ObjectInputStream(fip);
pp = (WrappedInt2LongOpenHashMap) in.readObject();
in.close();
fip.close();

pp.entrySet().forEach(System.out::println);
System.out.println("c:"+pp.counter);

This is the output:

10
-----
30=>20
220=>25
c:10

If I use the Kryo serializer, though, I have a problem with the attribute added by the wrapper.
This is the code I used:

Kryo kSer = new Kryo();
kSer.register(WrappedInt2LongOpenHashMap.class);

FileOutputStream fos = new FileOutputStream("/tmp/prova.bin");
Output output = new Output(fos);
kSer.writeObject(output, p);
output.close();
fos.close();

WrappedInt2LongOpenHashMap pp = null;
FileInputStream fip = new FileInputStream("/tmp/prova.bin");
Input in = new Input(fip);
pp = (WrappedInt2LongOpenHashMap) kSer.readObject(in, WrappedInt2LongOpenHashMap.class);
in.close();
fip.close();

This is the output:

10
-----
220=>25
30=>20
c:0 <-- wtf?!
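
My best guess at why (an assumption on my part, I did not dig through the Kryo sources): Int2LongOpenHashMap implements java.util.Map, so a plain kSer.register(WrappedInt2LongOpenHashMap.class) resolves to Kryo's generic map serializer, which writes only the entries and knows nothing about the extra fields declared by the wrapper. You can check which serializer Kryo actually picked with a one-liner:

// Shows the serializer Kryo resolved for the wrapper class.
System.out.println(kSer.getSerializer(WrappedInt2LongOpenHashMap.class).getClass());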

Reading the Kryo documentation, I saw that an ad hoc serializer can be written, and I found a FastUtilSerializer in Apache Giraph that, with a few small edits, serializes everything correctly.
First, I added these two methods to the wrapper, writeObject and readObject:

// Write the map via the default mechanism, then append the extra field.
private void writeObject(ObjectOutputStream stream) throws IOException {
	stream.defaultWriteObject();
	stream.writeLong(this.counter);
}

// Read the map via the default mechanism, then read the extra field back.
private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
	stream.defaultReadObject();
	this.counter = stream.readLong();
}

Then I edited the FastUtilSerializer constructor, adding two fields (writeMethodParent and readMethodParent) that give access to the writeObject and readObject methods declared by the parent class, fastutil's Int2LongOpenHashMap:

writeMethod = type.getDeclaredMethod("writeObject", ObjectOutputStream.class);
writeMethod.setAccessible(true);

writeMethodParent = type.getSuperclass().getDeclaredMethod("writeObject", ObjectOutputStream.class);
writeMethodParent.setAccessible(true);

readMethod = type.getDeclaredMethod("readObject", ObjectInputStream.class);
readMethod.setAccessible(true);

readMethodParent = type.getSuperclass().getDeclaredMethod("readObject", ObjectInputStream.class);
readMethodParent.setAccessible(true);
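
For reference, the four handles used above are plain java.lang.reflect.Method fields of the serializer (the two *Parent ones being the new additions):

// Reflection handles for the private writeObject/readObject methods:
// the plain ones target the wrapper itself, the *Parent ones target
// the fastutil superclass (these two are the new fields).
private Method writeMethod;
private Method writeMethodParent;
private Method readMethod;
private Method readMethodParent;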

Then I changed the write method like this:

// serialize the fastutil parent state first, then the wrapper's own fields
writeMethodParent.invoke(object, outputWrapper);
writeMethod.invoke(object, outputWrapper);

and the read method like this:

// read back in the same order: parent state first, then the wrapper's fields
readMethodParent.invoke(result, inputWrapper);
readMethod.invoke(result, inputWrapper);

Finally, I invoked kSer.register, passing the custom serializer:

kSer.register(WrappedInt2LongOpenHashMap.class, new FastUtilSerializer<WrappedInt2LongOpenHashMap>(kSer, WrappedInt2LongOpenHashMap.class));

And this is my output, the right one this time:

10
-----
30=>20
220=>25
c:10

For Spark, you just add .config("spark.kryo.registrator", "utility.MyRegistrator"), together with the Kryo serializer setting, to the SparkSession builder:

SparkSession spark = SparkSession
		.builder()
		.master("local")
		.appName("Test")
		.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
		.config("spark.kryo.registrator", "utility.MyRegistrator")
		.getOrCreate();

Where MyRegistrator is a class that implements the KryoRegistrator interface:

import org.apache.hadoop.io.NullWritable;
import org.apache.spark.serializer.KryoRegistrator;
import com.esotericsoftware.kryo.Kryo;
import kcs.WrappedInt2LongOpenHashMap;

public class MyRegistrator implements KryoRegistrator {

	@Override
	public void registerClasses(Kryo kryo) {
		kryo.register(WrappedInt2LongOpenHashMap.class, new FastUtilSerializer<WrappedInt2LongOpenHashMap>(kryo, WrappedInt2LongOpenHashMap.class));
		kryo.register(NullWritable.class);
	}
}
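
One optional extra (my own habit, not something this setup requires): you can also set spark.kryo.registrationRequired, so that Spark fails fast if some class ends up being serialized by Kryo without having been registered:

// Add to the SparkSession builder above to fail fast on unregistered classes.
.config("spark.kryo.registrationRequired", "true")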

That’s all!