Development Streaming Protocol Buffers with Javascript

How do you comfort a JavaScript bug? You console it…

Okay, now that I’ve broken the ice, time to get to the bad news… I’m going to be showing a little bit of Javascript code today…

It won’t be good, it won’t be pretty, but it’ll be mostly functional - so, you know… Javascript.

Specifically, I needed to make a proof-of-concept set of functions for my length-delimited, streaming protocol buffer implementation that I developed for Android and iOS .

To The … Code…?

The system runs on a NodeJS server, so I wrote a quick implementation that should get the job done. It’s not optimized, but it’s entirely a proof-of-concept intended to be improved upon.

Dependencies

The only dependency (other than NodeJS itself) is on this Javascript Protocol Buffer library.

There is another, more popular library out there ( this one ), but I avoid it because I really dislike the API. Not a seamless experience.

Writing to a Stream

I created two simple functions which would write a single message to a stream, or write multiple messages to a stream.

 // TODO: Optimize this function - pretty ugly...
var writeDelimitedMessageTo = function (outputStream, message) {
    const size = message.length;
    var buffer = new Buffer(4);
    buffer.writeInt32LE(size);
    outputStream.cork();
    outputStream.write(buffer);
    outputStream.write(message);
    outputStream.uncork();
    return size + 4;
};

var writeDelimitedMessagesTo = function (outputStream, messages) {
    const numberOfMessages = messages.length;
    var writtenSize = 0;
    for (var i = 0; i < numberOfMessages; ++i) {
        writtenSize += writeDelimitedMessageTo(outputStream, messages[i]);
    }
    return writtenSize;
};
 

Reading from a Stream or Buffer

Correspondingly, I wrote the following to either read from a full buffer of data, or to read from an input stream (kudos to a colleague for figuring out an annoying implementation issue with the stream version).

 var readDelimitedMessagesFromBuffer = function (buffer) {
    var bufferLength = buffer.length;
    var start = 0;
    var end = 0;
    var messages = [];

    while (end < bufferLength) {
        var size = buffer.readInt32LE();
        start += 4;
        end = start + size;
        var bytes = buffer.slice(start, end);
        var sample = protobufBuilder.Sample.decode(bytes);
        start += size;

        messages.push(sample);
    }

    return messages;
};

var readDelimitedMessagesFromStream = function (inputStream, callback) {
    var messages = [];

    inputStream.on("readable", function () {
        var sizeb;
        while (null !== (sizeb = inputStream.read(4))) {
            var size = sizeb.readInt32LE();
            var bytes = inputStream.read(size);
            var sample = protobufBuilder.Sample.decode(bytes);
            messages.push(sample);
        }
    });

    inputStream.on("end", () => {
        return callback(messages);
    });
};
 

Some More Sample Code!

And to put it all together, here is a sample file with the functions plus a small test (and also a sample protocol buffer).

 "use strict";

const fs = require("fs");
const protobuf = require("protocol-buffers");

const protobufBuilder = protobuf(fs.readFileSync("Sample.proto"));

// TODO: Optimize this function - pretty ugly...
var writeDelimitedMessageTo = function (outputStream, message) {
    const size = message.length;
    var buffer = new Buffer(4);
    buffer.writeInt32LE(size);
    outputStream.cork();
    outputStream.write(buffer);
    outputStream.write(message);
    outputStream.uncork();
    return size + 4;
};

var writeDelimitedMessagesTo = function (outputStream, messages) {
    const numberOfMessages = messages.length;
    var writtenSize = 0;
    for (var i = 0; i < numberOfMessages; ++i) {
        writtenSize += writeDelimitedMessageTo(outputStream, messages[i]);
    }
    return writtenSize;
};

var readDelimitedMessagesFromBuffer = function (buffer) {
    var bufferLength = buffer.length;
    var start = 0;
    var end = 0;
    var messages = [];

    while (end < bufferLength) {
        var size = buffer.readInt32LE();
        start += 4;
        end = start + size;
        var bytes = buffer.slice(start, end);
        var sample = protobufBuilder.Sample.decode(bytes);
        start += size;

        messages.push(sample);
    }

    return messages;
};

var readDelimitedMessagesFromStream = function (inputStream, callback) {
    var messages = [];

    inputStream.on("readable", function () {
        var sizeb;
        while (null !== (sizeb = inputStream.read(4))) {
            var size = sizeb.readInt32LE();
            var bytes = inputStream.read(size);
            var sample = protobufBuilder.Sample.decode(bytes);
            messages.push(sample);
        }
    });

    inputStream.on("end", () => {
        return callback(messages);
    });
};

var sample = protobufBuilder.Sample.encode({
    x: 42,
    y: 84,
});

const fileLocation = "./tmp";
var samples = [sample, sample, sample];

var writeStream = fs.createWriteStream(fileLocation);

// Write array of messages
var size = writeDelimitedMessagesTo(writeStream, samples);
console.log("Size of Write Multiple: " + size);

// Write single message
size = writeDelimitedMessageTo(writeStream, sample);
console.log("Size of Single Write: " + size);

writeStream.end();

// // Synchronous Buffer read implementation - Commented out for now
// var buffer = fs.readFileSync(fileLocation);
// var messages = readDelimitedMessagesFromBuffer(buffer);
// console.log("Messages from Synchronous Buffer read implementation");
// console.log(messages);

// Asynchronous Stream read implementation
var inputStream = fs.createReadStream(fileLocation);
readDelimitedMessagesFromStream(inputStream, function (message) {
    console.log("Messages from Asynchronous Stream read implementation");
    console.log(message);
});
 
 syntax = "proto2";

package sureshjoshi.pb;

option java_package = "com.sureshjoshi.android.pb";

message Sample {
    optional uint32 x = 1;
    optional uint32 y = 2;
}
 

Alrighty, now that that’s done, time to wash that feeling off with some cool, refreshing C++