SureshJoshi.com ▼

Streaming Protocol Buffers with Javascript


2016-06-07

How do you comfort a JavaScript bug? You console it…

Okay, now that I’ve broken the ice, time to get to the bad news… I’m going to be showing a little bit of Javascript code today…

It won’t be good, it won’t be pretty, but it’ll be mostly functional - so, you know… Javascript.

Specifically, I needed to make a proof-of-concept set of functions for my length-delimited, streaming protocol buffer implementation that I developed for Android and iOS.

To The … Code…?

The system runs on a NodeJS server, so I wrote a quick implementation that should get the job done. It’s not optimized, but it’s entirely a proof-of-concept intended to be improved upon.

Dependencies

The only dependency (other than NodeJS itself) is the `protocol-buffers` JavaScript Protocol Buffer library (the module `require`d in the sample code below).

There is another, more popular library out there (this one), but I avoid it because I really dislike the API. Not a seamless experience.

Writing to a Stream

I created two simple functions which would write a single message to a stream, or write multiple messages to a stream.

/**
 * Writes one length-delimited message to a writable stream.
 *
 * The frame is a 4-byte little-endian signed 32-bit length prefix followed by
 * the message bytes. The two writes are wrapped in cork()/uncork().
 *
 * @param {stream.Writable} outputStream - Destination; must provide cork(), write() and uncork().
 * @param {Buffer} message - Encoded message bytes; `message.length` is used as the payload size.
 * @returns {number} Size of the frame handed to the stream (payload size + 4 prefix bytes).
 */
// TODO: Optimize this function - pretty ugly...
var writeDelimitedMessageTo = function(outputStream, message) {
    const size = message.length;
    // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
    const header = Buffer.alloc(4);
    header.writeInt32LE(size, 0);
    outputStream.cork();
    outputStream.write(header);
    outputStream.write(message);
    outputStream.uncork();
    return size + 4;
};

/**
 * Writes every entry of `messages` to the stream as consecutive
 * length-delimited frames, in array order.
 *
 * @param {stream.Writable} outputStream - Destination, passed through to writeDelimitedMessageTo.
 * @param {Buffer[]} messages - Encoded messages to write.
 * @returns {number} Sum of the sizes returned by writeDelimitedMessageTo for each message.
 */
var writeDelimitedMessagesTo = function(outputStream, messages) {
    let totalBytes = 0;
    for (const encoded of messages) {
        const frameBytes = writeDelimitedMessageTo(outputStream, encoded);
        totalBytes = totalBytes + frameBytes;
    }
    return totalBytes;
}

Reading from a Stream or Buffer

Correspondingly, I wrote the following to either read from a full buffer of data, or to read from an input stream (kudos to a colleague for figuring out an annoying implementation issue with the stream version).

/**
 * Decodes every length-delimited message contained in `buffer`.
 *
 * Each frame is a 4-byte little-endian signed 32-bit length followed by that
 * many payload bytes; each payload is decoded with protobufBuilder.Sample.decode.
 *
 * @param {Buffer} buffer - Concatenated frames.
 * @returns {Array} Decoded messages in the order they appear in the buffer.
 * @throws {RangeError} If a length prefix is negative, or a prefix or payload
 *     extends past the end of the buffer.
 */
var readDelimitedMessagesFromBuffer = function(buffer) {
    const HEADER_SIZE = 4;
    const bufferLength = buffer.length;
    const messages = [];
    let offset = 0;

    while (offset < bufferLength) {
        if (offset + HEADER_SIZE > bufferLength) {
            throw new RangeError('Truncated length prefix at offset ' + offset);
        }
        // The prefix must be read at the current offset: readInt32LE() without
        // an offset reads position 0, i.e. the first frame's length every time.
        const size = buffer.readInt32LE(offset);
        const start = offset + HEADER_SIZE;
        const end = start + size;
        if (size < 0 || end > bufferLength) {
            throw new RangeError('Invalid message length ' + size + ' at offset ' + offset);
        }

        messages.push(protobufBuilder.Sample.decode(buffer.slice(start, end)));
        offset = end;
    }

    return messages;
};

/**
 * Reads length-delimited messages from a readable stream and hands the full
 * list to `callback` once the stream ends.
 *
 * Each frame is a 4-byte little-endian signed 32-bit length followed by that
 * many payload bytes; each payload is decoded with protobufBuilder.Sample.decode.
 * No 'error' listener is installed here; stream errors are left to the caller.
 *
 * @param {stream.Readable} inputStream - Source of the framed bytes.
 * @param {function(Array): *} callback - Called once, on 'end', with the decoded messages.
 * @throws {RangeError} From the 'readable' handler if a length prefix is
 *     negative or the stream ends in the middle of a frame.
 */
var readDelimitedMessagesFromStream = function(inputStream, callback) {
    const HEADER_SIZE = 4;
    const messages = [];
    // Length of a frame whose prefix has been consumed but whose payload has
    // not arrived yet; kept across 'readable' events so the prefix is not lost.
    let pendingSize = null;

    inputStream.on('readable', function () {
        for (;;) {
            if (pendingSize === null) {
                const header = inputStream.read(HEADER_SIZE);
                if (header === null) {
                    return;
                }
                // At end-of-stream read() returns whatever is left, which may be short.
                if (header.length < HEADER_SIZE) {
                    throw new RangeError('Truncated length prefix at end of stream');
                }
                pendingSize = header.readInt32LE(0);
                if (pendingSize < 0) {
                    throw new RangeError('Invalid message length ' + pendingSize);
                }
            }

            let bytes;
            if (pendingSize === 0) {
                // read(0) never returns data, so an empty payload is built directly.
                bytes = Buffer.alloc(0);
            } else {
                bytes = inputStream.read(pendingSize);
                if (bytes === null) {
                    // Payload not buffered yet; resume on the next 'readable'.
                    return;
                }
                if (bytes.length < pendingSize) {
                    throw new RangeError('Truncated message payload at end of stream');
                }
            }

            pendingSize = null;
            messages.push(protobufBuilder.Sample.decode(bytes));
        }
    });

    inputStream.on('end', () => {
      return callback(messages);
    });
};

Some More Sample Code!

And to put it all together, here is a sample file with the functions plus a small test, followed by the sample protocol buffer definition (`Sample.proto`) that it loads.

"use strict";

// File-system module: reads the .proto schema and creates the file streams used below.
const fs = require('fs');
// Protocol buffer library; called with the schema contents to build the message codecs.
const protobuf = require('protocol-buffers')

// Built from Sample.proto; its Sample.encode / Sample.decode are used throughout this file.
const protobufBuilder = protobuf(fs.readFileSync('Sample.proto'))

/**
 * Writes one length-delimited message to a writable stream.
 *
 * The frame is a 4-byte little-endian signed 32-bit length prefix followed by
 * the message bytes. The two writes are wrapped in cork()/uncork().
 *
 * @param {stream.Writable} outputStream - Destination; must provide cork(), write() and uncork().
 * @param {Buffer} message - Encoded message bytes; `message.length` is used as the payload size.
 * @returns {number} Size of the frame handed to the stream (payload size + 4 prefix bytes).
 */
// TODO: Optimize this function - pretty ugly...
var writeDelimitedMessageTo = function(outputStream, message) {
    const size = message.length;
    // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
    const header = Buffer.alloc(4);
    header.writeInt32LE(size, 0);
    outputStream.cork();
    outputStream.write(header);
    outputStream.write(message);
    outputStream.uncork();
    return size + 4;
};

/**
 * Writes every entry of `messages` to the stream as consecutive
 * length-delimited frames, in array order.
 *
 * @param {stream.Writable} outputStream - Destination, passed through to writeDelimitedMessageTo.
 * @param {Buffer[]} messages - Encoded messages to write.
 * @returns {number} Sum of the sizes returned by writeDelimitedMessageTo for each message.
 */
var writeDelimitedMessagesTo = function(outputStream, messages) {
    let totalBytes = 0;
    for (const encoded of messages) {
        const frameBytes = writeDelimitedMessageTo(outputStream, encoded);
        totalBytes = totalBytes + frameBytes;
    }
    return totalBytes;
}

/**
 * Decodes every length-delimited message contained in `buffer`.
 *
 * Each frame is a 4-byte little-endian signed 32-bit length followed by that
 * many payload bytes; each payload is decoded with protobufBuilder.Sample.decode.
 *
 * @param {Buffer} buffer - Concatenated frames.
 * @returns {Array} Decoded messages in the order they appear in the buffer.
 * @throws {RangeError} If a length prefix is negative, or a prefix or payload
 *     extends past the end of the buffer.
 */
var readDelimitedMessagesFromBuffer = function(buffer) {
    const HEADER_SIZE = 4;
    const bufferLength = buffer.length;
    const messages = [];
    let offset = 0;

    while (offset < bufferLength) {
        if (offset + HEADER_SIZE > bufferLength) {
            throw new RangeError('Truncated length prefix at offset ' + offset);
        }
        // The prefix must be read at the current offset: readInt32LE() without
        // an offset reads position 0, i.e. the first frame's length every time.
        const size = buffer.readInt32LE(offset);
        const start = offset + HEADER_SIZE;
        const end = start + size;
        if (size < 0 || end > bufferLength) {
            throw new RangeError('Invalid message length ' + size + ' at offset ' + offset);
        }

        messages.push(protobufBuilder.Sample.decode(buffer.slice(start, end)));
        offset = end;
    }

    return messages;
};

/**
 * Reads length-delimited messages from a readable stream and hands the full
 * list to `callback` once the stream ends.
 *
 * Each frame is a 4-byte little-endian signed 32-bit length followed by that
 * many payload bytes; each payload is decoded with protobufBuilder.Sample.decode.
 * No 'error' listener is installed here; stream errors are left to the caller.
 *
 * @param {stream.Readable} inputStream - Source of the framed bytes.
 * @param {function(Array): *} callback - Called once, on 'end', with the decoded messages.
 * @throws {RangeError} From the 'readable' handler if a length prefix is
 *     negative or the stream ends in the middle of a frame.
 */
var readDelimitedMessagesFromStream = function(inputStream, callback) {
    const HEADER_SIZE = 4;
    const messages = [];
    // Length of a frame whose prefix has been consumed but whose payload has
    // not arrived yet; kept across 'readable' events so the prefix is not lost.
    let pendingSize = null;

    inputStream.on('readable', function () {
        for (;;) {
            if (pendingSize === null) {
                const header = inputStream.read(HEADER_SIZE);
                if (header === null) {
                    return;
                }
                // At end-of-stream read() returns whatever is left, which may be short.
                if (header.length < HEADER_SIZE) {
                    throw new RangeError('Truncated length prefix at end of stream');
                }
                pendingSize = header.readInt32LE(0);
                if (pendingSize < 0) {
                    throw new RangeError('Invalid message length ' + pendingSize);
                }
            }

            let bytes;
            if (pendingSize === 0) {
                // read(0) never returns data, so an empty payload is built directly.
                bytes = Buffer.alloc(0);
            } else {
                bytes = inputStream.read(pendingSize);
                if (bytes === null) {
                    // Payload not buffered yet; resume on the next 'readable'.
                    return;
                }
                if (bytes.length < pendingSize) {
                    throw new RangeError('Truncated message payload at end of stream');
                }
            }

            pendingSize = null;
            messages.push(protobufBuilder.Sample.decode(bytes));
        }
    });

    inputStream.on('end', () => {
      return callback(messages);
    });
};


// Small end-to-end test: encode a Sample, write four framed copies to a file,
// then read the file back and print the decoded messages.
var sample = protobufBuilder.Sample.encode({
  x: 42,
  y: 84
});

const fileLocation = './tmp';
var samples = [sample, sample, sample];

var writeStream = fs.createWriteStream(fileLocation);

// Write array of messages
var size = writeDelimitedMessagesTo(writeStream, samples);
console.log("Size of Write Multiple: " + size);

// Write single message
size = writeDelimitedMessageTo(writeStream, sample);
console.log("Size of Single Write: " + size);

// end() only requests the flush; the callback runs on 'finish', once all data
// has been handed off. Reading before that point races the pending writes and
// can see an empty or partial file.
writeStream.end(function() {
    // // Synchronous Buffer read implementation - Commented out for now
    // var buffer = fs.readFileSync(fileLocation);
    // var messages = readDelimitedMessagesFromBuffer(buffer);
    // console.log("Messages from Synchronous Buffer read implementation");
    // console.log(messages);

    // Asynchronous Stream read implementation
    var inputStream = fs.createReadStream(fileLocation);
    readDelimitedMessagesFromStream(inputStream, function(messages){
        console.log("Messages from Asynchronous Stream read implementation");
        console.log(messages);
    });
});
// Sample.proto - the schema the script above loads with fs.readFileSync('Sample.proto').
syntax = "proto2";

package sureshjoshi.pb;

// Java package name applied when Java code is generated from this schema.
option java_package = "com.sureshjoshi.android.pb";

// Message the script encodes and decodes as protobufBuilder.Sample
// (the test writes it with x: 42, y: 84).
message Sample {
    optional uint32 x = 1;
    optional uint32 y = 2;
}

Alrighty, now that that’s done, time to wash that feeling off with some cool, refreshing C++