Friday, February 18, 2011

Getting started with Avro

Apache Avro is a language-neutral data serialization system. Its data format can be processed by many languages (currently C, C++, Python, Java, Ruby and PHP).

Avro provides:
  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Remote procedure call (RPC).

Data Schema
Avro relies on schemas that specify which fields and types an object is made of. Because the schema is always available when data is read, each datum is written with no per-value overhead. When Avro data is stored in a file, its schema is stored with it, so the file can be processed later by any program.
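To make the "no per-value overhead" point concrete, here is a minimal pure-Python sketch of how a record with one string and one int is laid out in Avro's binary encoding: the values are written back to back in schema order, with no field names or type tags. The helper names (encode_long, encode_string, encode_person) are made up for this illustration and are not part of the Avro library, and the two-field record is a cut-down, hypothetical version of a real schema.

```python
def encode_long(n):
    """Encode an int/long as a zig-zag, variable-length integer."""
    z = (n << 1) ^ (n >> 63)           # zig-zag: small magnitudes get small codes
    out = bytearray()
    while z & ~0x7F:                   # more than 7 bits still to write
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)                      # last byte, continuation bit clear
    return bytes(out)


def encode_string(s):
    """Encode a string as its UTF-8 byte length (a long) followed by the bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_person(name, age):
    """Write the fields in schema order: "name" (string), then "age" (int)."""
    return encode_string(name) + encode_long(age)


if __name__ == "__main__":
    payload = encode_person("Al", 30)
    print(payload.hex(), len(payload), "bytes")
```

With this layout the record ("Al", 30) takes four bytes: one for the string length, two for the characters and one for the age.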

Avro has the usual primitive types (int, float, string, ...) and complex types (enum, record, array, ...). You can combine these types to build your own complex types, as in the example below:
{
  "type": "record", 
  "name": "Person", 
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "age", "type": "int"},
      {"name": "emails", "type": {"type": "array", "items": "string"}},

      {"name": "father", "type": "Person"},
      {"name": "mother", "type": "Person"}
  ]
}
You can create schemas in code using the Schema class methods, or simply parse a JSON file using the Schema.parse() method.
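Since a schema is plain JSON, it can also be inspected with any JSON parser before it is handed to Avro. The sketch below uses only Python's json module (not the Avro Schema class) on a shortened, hypothetical variant of the Person schema; describe_fields is a helper name invented for this example.

```python
import json

# Shortened Person schema, used only for this illustration.
SCHEMA_JSON = """
{
  "type": "record",
  "name": "Person",
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "age", "type": "int"},
      {"name": "emails", "type": {"type": "array", "items": "string"}}
  ]
}
"""


def describe_fields(schema_json):
    """Return a list of (field name, readable type) pairs for a record schema."""
    schema = json.loads(schema_json)
    result = []
    for field in schema["fields"]:
        ftype = field["type"]
        if isinstance(ftype, dict):
            # Complex type given inline, e.g. an array and its item type.
            ftype = "%s of %s" % (ftype["type"], ftype.get("items"))
        result.append((field["name"], ftype))
    return result


if __name__ == "__main__":
    for name, ftype in describe_fields(SCHEMA_JSON):
        print(name, "->", ftype)
```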

Reading & Writing
Once you've written the schema, you can start serializing your objects, generating the right data structure for your types.

An example of serialization for the schema written above can be something like:
public GenericData.Record serialize() {
  GenericData.Record record = new GenericData.Record(schema);

  record.put("name", this.name);
  record.put("age", this.age);

  // Build the Avro array that holds the e-mail addresses.
  int nemails = this.mails.length;
  GenericData.Array emails = new GenericData.Array(nemails, emails_schema);
  for (int i = 0; i < nemails; ++i)
     emails.add(this.mails[i]);
  record.put("emails", emails);

  record.put("father", this.father.serialize());
  record.put("mother", this.mother.serialize());

  return record;
}
The same code written in python looks like this:
def serialize(self):
  return { 'name': self.name, 
           'age': self.age, 
           'emails': self.mails, 
           'father': self.father.serialize(),
           'mother': self.mother.serialize()
         }
Now that you have an Avro object that reflects the schema, you just have to write it. To do that, you use a DatumWriter, which uses an Encoder to write the datum to an OutputStream.
...
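// Avro 1.5 and later create encoders through EncoderFactory.
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
GenericDatumWriter datumWriter = new GenericDatumWriter(schema);
datumWriter.write(person.serialize(), encoder);
encoder.flush();  // the binary encoder buffers its output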
...
As you can imagine, reading data is quite similar. Pick up a DatumReader and a Decoder that can read from an InputStream and start reading your Avro Objects.
...
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); 
GenericDatumReader datumReader = new GenericDatumReader(schema);

GenericData.Record record = new GenericData.Record(schema);
while (...) {
  datumReader.read(record, decoder);
  // record.get("name")
  // record.get("...");
}
...
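To show what a decoder does with the raw bytes, here is a minimal pure-Python sketch that reads back a record made of one string and one int, assuming Avro's binary layout (zig-zag variable-length integers, strings prefixed by their byte length). The function names and the two-field record are hypothetical and only serve this example; a real program would use the DatumReader and Decoder classes described above.

```python
import io


def decode_long(stream):
    """Read a zig-zag, variable-length integer from a binary stream."""
    z = 0
    shift = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("unexpected end of data")
        b = byte[0]
        z |= (b & 0x7F) << shift       # add the next 7 bits
        if not (b & 0x80):             # continuation bit clear: last byte
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)         # undo the zig-zag mapping


def decode_string(stream):
    """Read a byte length (a long) and then that many UTF-8 bytes."""
    length = decode_long(stream)
    return stream.read(length).decode("utf-8")


def decode_person(stream):
    """Read the fields in schema order: "name" (string), then "age" (int)."""
    return {"name": decode_string(stream), "age": decode_long(stream)}


if __name__ == "__main__":
    print(decode_person(io.BytesIO(b"\x04Al\x3c")))
```

The reader needs the schema to know that the first value is a string and the second an int, because nothing in the byte stream says so.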

Object Container Files
Avro includes an object container file format. A file has a schema, and all objects stored in the file must be written according to that schema.

Since Avro is designed to be used with Hadoop and MapReduce, the file format is similar to Hadoop's SequenceFile. Objects are grouped in blocks, and each block may be compressed. A sync marker is added between blocks to allow efficient splitting of files.
void testWrite (File file, Schema schema) throws IOException {
   GenericDatumWriter datum = new GenericDatumWriter(schema);
   DataFileWriter writer = new DataFileWriter(datum);

   writer.setMeta("Meta-Key0", "Meta-Value0");
   writer.setMeta("Meta-Key1", "Meta-Value1");

   writer.create(schema, file);
   for (Person p : people)
      writer.append(p.serialize());

   writer.close();
}
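The sync markers are what make a container file splittable: a reader that is handed an arbitrary byte range can scan forward to the next marker and start reading whole blocks from there. The following pure-Python sketch shows only that idea. It is not the real Avro file layout (real blocks also carry an object count and a byte size, and the file starts with a header), and write_blocks and read_split are names invented for this example.

```python
import os

SYNC_SIZE = 16  # length of the marker used in this sketch


def write_blocks(blocks, sync):
    """Lay out the data as: sync, block, sync, block, ..., sync."""
    out = bytearray(sync)
    for block in blocks:
        out += block + sync
    return bytes(out)


def read_split(data, sync, start, end):
    """Return the blocks whose leading sync marker begins inside [start, end)."""
    blocks = []
    sync_pos = data.find(sync, start)          # skip ahead to the next marker
    while sync_pos != -1 and sync_pos < end:
        block_start = sync_pos + len(sync)
        next_sync = data.find(sync, block_start)
        if next_sync == -1:                    # trailing marker, no more blocks
            break
        blocks.append(data[block_start:next_sync])
        sync_pos = next_sync
    return blocks


if __name__ == "__main__":
    sync = os.urandom(SYNC_SIZE)               # random marker chosen per file
    data = write_blocks([b"block-1", b"block-2", b"block-3"], sync)
    middle = len(data) // 2
    print(read_split(data, sync, 0, middle))
    print(read_split(data, sync, middle, len(data)))
```

Because each block belongs to the split that contains its leading marker, two workers reading adjacent byte ranges see every block exactly once, wherever the boundary falls.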
Since the file contains the schema, you don't need to specify a schema for the reader. You can retrieve the schema used by calling the getSchema() method of the reader.
void testRead (File file) throws IOException {
   GenericDatumReader datum = new GenericDatumReader();
   DataFileReader reader = new DataFileReader(file, datum);

   GenericData.Record record = new GenericData.Record(reader.getSchema());
   while (reader.hasNext()) {
     reader.next(record);
     System.out.println("Name " + record.get("name") + 
                        " Age " + record.get("age"));
   }

   reader.close();
}
For a more "advanced" reading operation (see the File Evolution example), you can specify the expected file schema.

Data Evolution
The first problem you'll encounter when working with a custom binary format, or even an XML/JSON-based one, is dealing with data evolution.

During your application's development you will surely have to add, remove or rename some fields in the various data structures. To solve the compatibility problem you have to introduce a "versioning step" that transforms your "Version X" document into the "Version (X + 1)" format.

Avro solves this problem by applying some Schema Resolution rules. In brief...
  • If a field is added, old documents don't contain it, and new readers use the default value specified in the schema.
  • If a field is removed, new readers don't read it, and no one cares whether it's present or not.
  • If a field is renamed, the new schema must contain the field's old name in its aliases list.

Note that these rules cover only one direction: new readers processing data written with an older schema.
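The three rules can be sketched in a few lines of plain Python. This is only an illustration of the idea on already-decoded records (dictionaries), not Avro's actual resolution code; resolve and the field names used here are hypothetical.

```python
def resolve(old_record, reader_fields):
    """Project a record written with an old schema onto the reader's fields."""
    resolved = {}
    for field in reader_fields:
        # A field matches by its current name or by one of its aliases.
        candidates = [field["name"]] + field.get("aliases", [])
        for candidate in candidates:
            if candidate in old_record:
                resolved[field["name"]] = old_record[candidate]
                break
        else:
            # Added field: the old record has no value, so use the default.
            if "default" not in field:
                raise ValueError("no value and no default for %r" % field["name"])
            resolved[field["name"]] = field["default"]
    # Fields present only in the old record (removed fields) are ignored.
    return resolved


if __name__ == "__main__":
    reader_fields = [
        {"name": "full_name", "type": "string", "aliases": ["name"]},  # renamed
        {"name": "country", "type": "string", "default": "unknown"},   # added
    ]
    old_record = {"name": "Al", "age": 30}  # "age" was removed in the new schema
    print(resolve(old_record, reader_fields))
```

Here the renamed field is found through its alias, the added field falls back to its default, and the removed field is silently dropped.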

Inter-Process Calls
Avro makes RPC really easy: with a few lines of code you can write a full client/server.

First of all, you need to write your protocol schema, specifying each message with its request and response objects.
{
  "namespace": "test.proto",
  "protocol": "Test",

  "messages": {
    "xyz": {
        "request": [{"name": "key", "type": "string"}],
        "response": ["bytes", "null"]
    }
  }
}
A simple Java server can be written in this way. You define the respond method, which handles each message and returns the related response object.
static class Responder extends GenericResponder {
  public Responder (Protocol protocol) {
    super(protocol);
  }

  public Object respond (Protocol.Message message, Object request) {
    String msgName = message.getName();

    // Compare string contents with equals(); == only compares references.
    if ("xyz".equals(msgName)) {
      // Make a response for 'xyz' get data from request 
      // GenericData.Record record = (GenericData.Record)request;
      // e.g. record.get("key")
      return(response_obj);
    }

    throw new AvroRuntimeException("unexpected message: " + msgName);
  }
}

public static void main (String[] args) 
  throws InterruptedException, IOException
{
  Protocol protocol = Protocol.parse(new File("proto-schema.avpr"));
  HttpServer server = new HttpServer(new Responder(protocol), 8080);
  server.start();
  server.join();
}
The client connects to the server and sends the message in the format specified by the schema.
from avro import ipc, protocol

def main():
  # Read the protocol definition and parse it.
  with open('proto-schema.avpr') as f:
    proto = protocol.parse(f.read())

  client = ipc.HTTPTransceiver('localhost', 8080)
  requestor = ipc.Requestor(proto, client)
  result = requestor.request('xyz', {'key': 'Test Key'})
  print(result)

  client.close()

I've made a couple of examples (written in C, Java and Python) that show how to use Avro Serialization and Avro IPC. Sources are available on my GitHub at avro-examples.
