Friday, February 18, 2011

Getting started with Avro

Apache Avro is a language-neutral data serialization system. Its data format can be processed by many languages (currently C, C++, Python, Java, Ruby and PHP).

Avro provides:
  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Remote procedure call (RPC).

Data Schema
Avro relies on schemas, which specify the fields and types an object is made of. This way, each datum is written with no per-value overhead. When Avro data is stored in a file, its schema is stored with it, so that the file can be processed later by any program.

Avro has traditional primitive types like int, float, string, ... and complex types like enum, record, array, ... You can combine these types to create your own complex types, as in the example below:
{
  "type": "record", 
  "name": "Person", 
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "age", "type": "int"},
      {"name": "emails", "type": {"type": "array", "values": "string"}},

      {"name": "father", "type": "Person"},
      {"name": "mother", "type": "Person"},
  ]
}
You can create schemas in code using the Schema class methods, or simply parse a JSON definition using the Schema.parse() method.
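For example, assuming the record above is saved in a file named person.avsc (the file name is just for illustration), parsing it looks something like this:
...
// Parse the JSON schema definition stored in the .avsc file (throws IOException on failure)
Schema schema = Schema.parse(new File("person.avsc"));

System.out.println(schema.getName());       // "Person"
System.out.println(schema.toString(true));  // pretty-printed JSON schema
...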

Reading & Writing
Once you've written the schema, you can start serializing your objects, generating the right data structure for your types.

An example of serialization for the schema above can look something like this:
public GenericData.Record serialize() {
  GenericData.Record record = new GenericData.Record(schema);

  record.put("name", this.name);
  record.put("age", this.age);

  // emails_schema is the Schema of the "emails" array field
  int nemails = this.mails.length;
  GenericData.Array emails = new GenericData.Array(nemails, emails_schema);
  for (int i = 0; i < nemails; ++i)
     emails.add(this.mails[i]);
  record.put("emails", emails);

  record.put("father", this.father.serialize());
  record.put("mother", this.mother.serialize());
  return record;
}
The same code written in Python looks like this:
def serialize(self):
  return { 'name': self.name,
           'age': self.age,
           'emails': self.mails,
           'father': self.father.serialize(),
           'mother': self.mother.serialize(),
         }
Now that you have an Avro object that reflects the schema, you just have to write it. To do that, you use a DatumWriter, which relies on an Encoder to write the datum to an OutputStream.
...
Encoder encoder = new BinaryEncoder(outputStream);
GenericDatumWriter datumWriter = new GenericDatumWriter(schema);
datumWriter.write(person.serialize(), encoder);
...
As you can imagine, reading data is quite similar. Pick a DatumReader and a Decoder that can read from an InputStream, and start reading your Avro objects.
...
Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(inputStream, null);
GenericDatumReader datumReader = new GenericDatumReader(schema);

GenericData.Record record = new GenericData.Record(schema);
while (...) {
  datumReader.read(record, decoder);
  // record.get("name")
  // record.get("...");
}
...

Object Container Files
Avro includes an object container file format. A file has a schema, and all objects stored in the file must be written according to that schema.

Since Avro is designed to be used with Hadoop and Map-Reduce, the file format is similar to Hadoop's SequenceFile. Objects are grouped in blocks, and each block may be compressed. A sync-marker is added between blocks to allow efficient splitting of files.
void testWrite (File file, Schema schema) throws IOException {
   GenericDatumWriter datum = new GenericDatumWriter(schema);
   DataFileWriter writer = new DataFileWriter(datum);

   writer.setMeta("Meta-Key0", "Meta-Value0");
   writer.setMeta("Meta-Key1", "Meta-Value1");

   writer.create(schema, file);
   for (Person p : people)
      writer.append(p.serialize());

   writer.close();
}
Since the file contains the schema, you don't need to specify a schema for the reader. You can extract the schema used by calling the reader's getSchema() method.
void testRead (File file) throws IOException {
   GenericDatumReader datum = new GenericDatumReader();
   DataFileReader reader = new DataFileReader(file, datum);

   GenericData.Record record = new GenericData.Record(reader.getSchema());
   while (reader.hasNext()) {
     reader.next(record);
     System.out.println("Name " + record.get("name") + 
                        " Age " + record.get("age"));
   }

   reader.close();
}
For a more "advanced" reading operation, (see the File Evolution example), you can specify the expected file schema.

Data Evolution
The first problem that you'll encounter when working with a custom binary format, or even an XML/JSON-based one, is dealing with data evolution.

During your application's development you will surely have to add, remove or rename some fields in your data structures. To solve the compatibility problem, you would normally have to introduce a "versioning step" that transforms a "Version X" document into the "Version (X + 1)" format.

Avro solves this problem by applying a set of Schema Resolution rules. In brief:
  • If a field is added, old documents don't contain it and new readers use the default value specified in the schema.
  • If a field is removed, new readers simply don't read it, and nobody cares whether it's present or not.
  • If a field is renamed, the new schema must contain the old name of the field in its alias list.

Note that only forward compatibility is covered.
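As an illustration (the renamed and added fields here are hypothetical), a "Version 2" of the Person schema could rename name to full_name through an alias and add a country field with a default value:
{
  "type": "record",
  "name": "Person",
  "fields": [
      {"name": "full_name", "type": "string", "aliases": ["name"]},
      {"name": "age", "type": "int"},
      {"name": "emails", "type": {"type": "array", "items": "string"}},
      {"name": "country", "type": "string", "default": "unknown"}
  ]
}
Readers using this schema can still process old files: name is matched through the alias, country falls back to its default, and the dropped father/mother fields are simply skipped.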

Inter-Process Calls
Avro makes RPC really easy: with a few lines of code you can write a full client/server.

First of all, you need to write your protocol schema, specifying each message with its request and response objects.
{
  "namespace": "test.proto",
  "protocol": "Test",

  "messages": {
    "xyz": {
        "request": [{"name": "key", "type": "string"}],
        "response": ["bytes", "null"]
    }
  }
}
A simple Java server can be written this way: you define the respond() method, handle each message, and for each message return the related response object.
static class Responder extends GenericResponder {
  public Responder (Protocol protocol) {
    super(protocol);
  }

  public Object respond (Protocol.Message message, Object request) {
    String msgName = message.getName();

    if (msgName.equals("xyz")) {
      // Make a response for 'xyz', get data from the request
      // GenericData.Record record = (GenericData.Record)request;
      // e.g. record.get("key")
      return(response_obj);
    }

    throw new AvroRuntimeException("unexpected message: " + msgName);
  }
}

public static void main (String[] args) 
  throws InterruptedException, IOException
{
  Protocol protocol = Protocol.parse(new File("proto-schema.avpr"));
  HttpServer server = new HttpServer(new Responder(protocol), 8080);
  server.start();
  server.join();
}
The client connects to the server and sends the message in the specified schema format.
def main():
  proto = protocol.parse(file('proto-schema.avpr').read())

  client = ipc.HTTPTransceiver('localhost', 8080)
  requestor = ipc.Requestor(proto, client)
  result = requestor.request('xyz', {'key': 'Test Key'})
  print result

  client.close()

I've made a couple of examples (written in C, Java and Python) that show how to use Avro Serialization and Avro IPC. Sources are available on my github at avro-examples.

Saturday, February 12, 2011

Linux cgroups: Memory Threshold Notifier

Through the cgroups notification API you can be notified when the status of a cgroup changes. The memory cgroup implements memory thresholds using this API: it allows you to register multiple memory and memsw thresholds and to receive notifications when a threshold is crossed.

This can be very useful if you want to maintain a cache but don't want to exceed a certain amount of memory.

To register a threshold, an application needs to:
  • create an eventfd using eventfd(2);
  • open memory.usage_in_bytes or memory.memsw.usage_in_bytes;
  • write a string like "<event_fd> <fd of memory.usage_in_bytes> <threshold>" to cgroup.event_control.
...
// Open cgroup file (e.g. "memory.oom_control" or "memory.usage_in_bytes")
snprintf(path, PATH_MAX, "%s/%s", cgroup, file);
cgroup_ctrl->fd = open(path, O_RDONLY);
cgroup_ctrl->efd = eventfd(0, 0);

// Prepare ctrl_string e.g.
// <event_fd> <fd of memory.oom_control>
// <event_fd> <fd of memory.usage_in_bytes> <threshold>
snprintf(ctrl_string, CTRL_STRING_MAX,
         "%d %d", cgroup_ctrl->efd, cgroup_ctrl->fd);

// Write ctrl_string to cgroup event_control
snprintf(path, PATH_MAX, "%s/cgroup.event_control", cgroup);
fd = open(path, O_WRONLY);
write(fd, ctrl_string, strlen(ctrl_string));
close(fd);
...
Now you can add the eventfd() descriptor to your epoll()/select() event loop and wait for notifications. Here you can handle your cache release.
...
nfds = epoll_wait(epfd, events, NEVENTS, TIMEOUT);
for (i = 0; i < nfds; ++i) {
    if (events[i].data.fd == cgroup_ctrl->efd) {
        /* Memory Threshold Notification */
        read(cgroup_ctrl->efd, &result, sizeof(uint64_t));

        /* free some memory */
    }
}
...
The full demo source code is available on my github at cgroup-mem-threshold demo.

Wednesday, February 9, 2011

HBase I/O: HFile

In the beginning, HBase used the MapFile class to store data persistently on disk, and then (from version 0.20) a new file format was introduced. HFile is a MapFile-like implementation with HBase-related features.

HFile doesn't know anything about the key and value struct/type (row key, qualifier, family, timestamp, …). As in Hadoop's block-compressed SequenceFile, keys and values are grouped in blocks, and blocks contain records. Each record holds two int values containing the key length and the value length, followed by the key and value byte arrays.

HFile.Writer has only a couple of append() overloads, one taking a KeyValue and the other taking byte arrays. As with SequenceFile, each key added must be greater than the previous one; if this condition is not satisfied, an IOException is raised.

By default, every 64KB of (key + value) data records are squeezed together into a block, and the block is written to the HFile OutputStream with the specified compression, if any. The compression algorithm and the block size are both constructor arguments.

One thing that SequenceFile is not good at is adding metadata: metadata can be added to a SequenceFile only through the constructor, so you need to prepare all your metadata before creating the Writer.

HFile adds two "metadata" types: one called Meta-Block and the other called FileInfo. Both metadata types are kept in memory until close() is called.

Meta-Blocks are designed to keep large amounts of data and their keys are Strings, while FileInfo is a simple Map preferred for small pieces of information, with keys and values both byte arrays. The region server's StoreFile uses Meta-Blocks to store a BloomFilter, and FileInfo for Max SequenceId, Major compaction key and Timerange info.
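To make the Writer API more concrete, here is a rough sketch modeled on the 0.20/0.90-era API (the path, keys and values are just for illustration, and the exact constructor signatures and compression names may differ slightly between HBase versions):
void testHFileWrite (FileSystem fs, Path path) throws IOException {
   HFile.Writer writer = new HFile.Writer(fs, path,
                             64 * 1024,   // block size: a block is flushed every ~64KB of key/value data
                             "none",      // compression algorithm name (e.g. "none", "gz", "lzo")
                             null);       // key comparator (null: default raw byte-order comparison)

   // Keys must be appended in increasing order.
   writer.append(Bytes.toBytes("key-0"), Bytes.toBytes("value-0"));
   writer.append(Bytes.toBytes("key-1"), Bytes.toBytes("value-1"));

   // FileInfo metadata can be added at any time before close().
   writer.appendFileInfo(Bytes.toBytes("demo-key"), Bytes.toBytes("demo-value"));

   writer.close();
}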

On close(), Meta-Blocks and FileInfo are written to the OutputStream. To speed up lookups, an index is written for Data-Blocks and Meta-Blocks; those indices contain n records (where n is the number of blocks) with block information (block offset, size and first key).
At the end a Fixed File Trailer is written; this block contains offsets and counts for all the HFile indices, the HFile version, the compression codec and a few other pieces of information.

Once the file is written, the next step is reading it. You start by loading the FileInfo: the loadFileInfo() method of HFile.Reader loads the trailer block and all the indices into memory, which makes it easy to query keys. Through the HFileScanner you can then seek to a specified key and iterate over the records.
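A corresponding read sketch (again modeled on the 0.20/0.90-era API; the block-cache argument and the no-argument getScanner() are assumptions, newer versions take extra flags) could be:
void testHFileRead (FileSystem fs, Path path) throws IOException {
   HFile.Reader reader = new HFile.Reader(fs, path, null, false);  // no block cache, not in-memory
   reader.loadFileInfo();             // loads the trailer, file-info and indices

   HFileScanner scanner = reader.getScanner();
   if (scanner.seekTo()) {            // position on the first key, false if the file is empty
     do {
       ByteBuffer key = scanner.getKey();
       ByteBuffer value = scanner.getValue();
       // process key/value (or use scanner.getKeyValue())
     } while (scanner.next());
   }

   reader.close();
}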
The picture above describes the internal format of HFile...