Sunday, March 25, 2012

Cleaning up the BufferedReader Mess in a Proxy Server

A couple of weeks ago, some friends from the university knocked on my door. They had been given an assignment to implement an HTTP proxy server. I did my best and told them the basics: first read the HTTP headers line by line, and then read the rest of the stream as bytes. After that, the mechanics are easy:
  1. Pass the request headers from the browser to the server, which is identified by the Host header in the browser request,
  2. Pass the response sent by the server back to the browser.
Easy, right? Just before the homework was due, there was another knock on my door. Surprise! Surprise! They couldn't properly read the server data after reading the headers. It sounded like a trivial problem at first, but as the hours passed while I was trying to fix the code, it turned out to be anything but. Since you are here for the code, let me first show you the working draft.
import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class Main {
    static final int PROXY_PORT = 8080;
    static final String PROXY_HOST = "localhost";
    static final String NEWLINE = "\r\n";

    public static void main(String[] args) throws Exception {
        ServerSocket proxySocket = new ServerSocket(
                PROXY_PORT, 32, InetAddress.getByName(PROXY_HOST));

        while (true) {
            // Accept the incoming connection.
            Socket clientSocket = proxySocket.accept();
            BufferedInputStream clientInputStream = new BufferedInputStream(
                    new DataInputStream(clientSocket.getInputStream()));
            OutputStream clientOutputStream = new DataOutputStream(
                    clientSocket.getOutputStream());

            // Read client headers.
            List<String> clientHeaders = readHeaders(clientInputStream);
            display("Client Headers", clientHeaders);

            // Locate the web server.
            String hostHeader = getHeader(clientHeaders, "Host");
            display("HostHeader", hostHeader);
            String[] hostHeaders = hostHeader.split(":");
            String hostName = hostHeaders[0];
            display("HostName", hostName);
            int hostPort = hostHeaders.length > 1
                    ? Integer.parseInt(hostHeaders[1]) : 80;
            display("HostPort", hostPort);

            // Connect to the web server.
            Socket serverSocket = new Socket(hostName, hostPort);
            BufferedInputStream serverInputStream = new BufferedInputStream(
                    new DataInputStream(serverSocket.getInputStream()));
            OutputStream serverOutputStream = new DataOutputStream(
                    serverSocket.getOutputStream());

            // Pass the client request to the web server.
            writeHeaders(serverOutputStream, clientHeaders);
            serverSocket.shutdownOutput();
            display("Sent server headers.");

            // Read web server response headers.
            List<String> serverHeaders = readHeaders(serverInputStream);
            display("ServerHeaders", serverHeaders);

            // Read web server response data.
            byte[] serverData = readData(serverInputStream);
            display("ServerDataLength", serverData.length);

            // Try to sign the response data.
            byte[] signedData = sign(serverHeaders, serverData);

            // Pass the web server response to the client.
            writeHeaders(clientOutputStream, serverHeaders);
            clientOutputStream.write(signedData);

            display("---------------------");
            display("");

            serverSocket.close();
            clientSocket.close();
        }
    }

    static byte[] readData(InputStream stream) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BufferedInputStream bufferedStream = new BufferedInputStream(stream);
        byte[] buf = new byte[8192];
        int len;
        while ((len = bufferedStream.read(buf)) > 0)
            baos.write(buf, 0, len);
        return baos.toByteArray();
    }

    static void writeHeaders(OutputStream stream, List<String> headers)
            throws IOException {
        StringBuilder builder = new StringBuilder();
        for (String header : headers) {
            builder.append(header);
            builder.append(NEWLINE);
        }
        builder.append(NEWLINE);
        stream.write(builder.toString().getBytes());
    }

    static List<String> readHeaders(BufferedInputStream stream)
            throws Exception {
        List<String> lines = new ArrayList<String>();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(stream));
        String line;
        long nRead = NEWLINE.length();  // For the last empty line.
        stream.mark(Integer.MAX_VALUE);
        while ((line = reader.readLine()) != null && !(line.isEmpty())) {
            nRead += line.getBytes().length + NEWLINE.length();
            if (!line.startsWith("Accept-Encoding"))    // Avoid compressed pages.
                lines.add(line);
        }
        stream.reset();
        long nSkipped = stream.skip(nRead);
        assert (nSkipped == nRead);
        return lines;
    }

    static String getHeader(List<String> headers, String name) {
        for (String line : headers)
            if (line.startsWith(name))
                return line.split(": ")[1];
        return null;
    }

    static byte[] sign(List<String> headers, byte[] data) {
        String header = getHeader(headers, "Content-Type");
        if (header == null || !header.startsWith("text/html"))
            return data;
        String content = new String(data);
        Pattern pattern = Pattern.compile("^(.*<title>)(.*</title>.*)$",
                Pattern.CASE_INSENSITIVE | Pattern.MULTILINE);
        return pattern.matcher(content).replaceFirst("$1[VY] $2").getBytes();
    }

    static void display(String title, List<String> lines) {
        System.out.println("### <" + title + "> ###");
        for (String line : lines)
            System.out.println("'" + line + "'");
        System.out.println("### </" + title + "> ###");
    }

    static void display(String title, Object obj) {
        System.out.println("### " + title + ": '" + obj + "'");
    }

    static void display(Object obj) {
        System.out.println("### " + obj);
    }

    static void display() {
        System.out.println();
    }
}
The truth is, it took me almost four hours to find and squash the bug. serverSocket.shutdownOutput() was the low-hanging fruit; it solved the problem of the web server waiting before it started sending data. But take a closer look at the readHeaders() method. You see the mess? In particular, the stream arithmetic using the mark(), reset() and skip() calls. The problem is that, in order to call readLine() on an InputStream, you first need to wrap it in a BufferedReader. But once you wrap it and call readLine(), the BufferedReader fills its own internal buffer by reading well past the end of the current line, so the underlying stream ends up much further ahead than you would expect. Hence, after you finish reading the headers and continue with reading the response data in bytes, read() tells you that there is nothing left to read, because those bytes are already sitting in the BufferedReader's buffer. To avoid such nasty bugs, after reading from an InputStream through some sort of buffered reader, do not forget to reset the stream to the position where you expect it to be.
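An alternative way to sidestep the problem is to never hand the stream to a BufferedReader in the first place. The following is a minimal sketch of that idea, not the approach used in the draft above: the class name HeaderReaderSketch and its helper methods are hypothetical, and it assumes header lines are terminated by LF with an optional preceding CR. Because it pulls bytes one at a time from the same stream that later serves the body, nothing gets read ahead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads header lines without a BufferedReader.
final class HeaderReaderSketch {
    // Reads bytes up to and including the next LF and returns the line
    // without its terminator. Returns null if the stream is already at EOF.
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        boolean sawAnyByte = false;
        int b;
        while ((b = in.read()) != -1) {
            sawAnyByte = true;
            if (b == '\n')
                break;
            line.write(b);
        }
        if (!sawAnyByte)
            return null;
        byte[] bytes = line.toByteArray();
        int len = bytes.length;
        if (len > 0 && bytes[len - 1] == '\r')  // Drop the CR of a CRLF pair.
            len--;
        return new String(bytes, 0, len, StandardCharsets.ISO_8859_1);
    }

    // Collects lines until the first empty line (end of headers) or EOF.
    static List<String> readHeaders(InputStream in) throws IOException {
        List<String> headers = new ArrayList<String>();
        String line;
        while ((line = readLine(in)) != null && !line.isEmpty())
            headers.add(line);
        return headers;
    }

    public static void main(String[] args) throws IOException {
        byte[] response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nhello"
                .getBytes(StandardCharsets.ISO_8859_1);
        InputStream in = new ByteArrayInputStream(response);
        System.out.println(readHeaders(in));

        // The stream is now positioned exactly at the first body byte.
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1)
            body.write(b);
        System.out.println(new String(body.toByteArray(), StandardCharsets.ISO_8859_1));
    }
}
```

Reading single bytes from a raw socket stream is slow, so in a proxy like the one above the stream passed in would still be the BufferedInputStream. The point is that the same buffered stream serves both the headers and the body, so there is no second buffer to get out of sync with.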

Monday, March 19, 2012

Shared Transport in JGroups

A transport in a JGroups application is a heavyweight concept, and the majority of JGroups entities (remote procedure call, replicated hash map, etc.) generally require their own dedicated channels for communication. However, by sharing a transport between multiple channels, it is possible to make better use of the available resources. (Remember that each transport occupies a thread pool and a socket.) For this purpose, one just needs to set the singleton_name attribute for the transport.


Unfortunately, it is not possible to alter the singleton_name attribute of a transport using something like:
String singletonName = "yabba";
JChannel channel = new JChannel();
channel.getProtocolStack().getTransport()
        .setValue("singleton_name", singletonName);
At the moment, you have to alter the XML specification of the channel you are using instead. (See Bela Ban's reply to the "Setting Up a Shared Transport" post on the JGroups mailing list.)
To make this convenient, I put together a ChannelFactory as follows.
final class ChannelFactory {
    private static ChannelFactory instance = null;
    private static final String singletonName = "shared";
    // Typed list, so that clear() can iterate over JChannel instances.
    private final List<JChannel> channels;

    private ChannelFactory() {
        channels = new ArrayList<JChannel>();
    }

    synchronized JChannel create() throws Exception {
        JChannel channel = new JChannel(
                this.getClass().getClassLoader()
                        .getResource("META-INF/channel.xml"));
        channels.add(channel);
        return channel;
    }

    synchronized void clear() {
        for (JChannel channel : channels) {
            channel.disconnect();
            channel.close();
        }
        channels.clear();
    }

    static synchronized ChannelFactory getInstance() {
        if (instance == null)
            instance = new ChannelFactory();
        return instance;
    }
}
Later, I copied (say) udp.xml from the JGroups sources to META-INF/channel.xml, and added the singleton_name="shared" attribute to the transport protocol specification in that file.

Now, my individual classes call ChannelFactory.getInstance().create() to get a channel dedicated to them. And when the program shuts down, I just call ChannelFactory.getInstance().clear() to tidy things up.
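The "remember everything you created, then close it all at shutdown" idea can be sketched independently of JGroups. The snippet below is only an illustration under stated assumptions: TrackingFactorySketch is a hypothetical name, any AutoCloseable stands in for a channel, and registering a JVM shutdown hook is my assumption about how one might trigger the cleanup, not something the factory above does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the factory above: tracks created resources
// so that all of them can be closed with a single call.
final class TrackingFactorySketch<T extends AutoCloseable> {
    private final Supplier<T> creator;
    private final List<T> created = new ArrayList<T>();

    TrackingFactorySketch(Supplier<T> creator) {
        this.creator = creator;
    }

    synchronized T create() {
        T resource = creator.get();
        created.add(resource);
        return resource;
    }

    // Closes every tracked resource. A failure while closing one resource
    // is reported and does not prevent the remaining ones from being closed.
    synchronized void clear() {
        for (T resource : created) {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("close failed: " + e);
            }
        }
        created.clear();
    }

    // Assumption: cleanup is tied to JVM shutdown via a shutdown hook.
    void clearOnShutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                clear();
            }
        }));
    }
}
```

Compared to a plain loop, catching the exception per resource means that one misbehaving resource cannot leave the others open.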

Sunday, March 11, 2012

Chat Application with Play Framework and KnockoutJS

For a geography-based social web application, I chose to work with a Single Page Application template. For this purpose, I needed a JavaScript framework to do the necessary plumbing for synchronizing server-side and client-side model objects. It took me nearly 3-4 weeks to finish the implementation. I was very happy with its final state and was excited to use it in the rest of the project. The next morning, I met Backbone.js through a suggestion from a friend of mine. Damn! My implementation was so close to Backbone.js that almost every function name was identical. Anyway, I didn't know whether I should cry or smile. Following that path, it didn't take long for me to discover KnockoutJS. I found it astonishingly beautiful, dived straight into the depths of the tutorial, and spent half of that day reading and evaluating the exercises. Since the best way to learn something is to use it, in this blog post I will try to walk you through a chat application using Play Framework and KnockoutJS.

In order to save some effort, I'll use the Facebook Login mechanics described in my Facebook Login and Secure Module Integration post. Since I put together the necessary pieces into a GitHub project (play-facebook), I will bootstrap directly from there.
$ git clone git://github.com/vy/play-facebook.git play-chat
$ cd play-chat
$ play deps
$ emacs conf/application.conf            # Edit application.name
$ emacs app/controllers/Security.java    # Edit FBOAuth
$ play run
So far, so good. Now, we need a Room model to store the posted messages.
public class Room {
    private static Room instance = null;
    public static final int EVENT_STREAM_SIZE = 100;
    private final ArchivedEventStream<Event> eventStream =
            new ArchivedEventStream<Event>(EVENT_STREAM_SIZE);

    public void publish(Event event) {
        eventStream.publish(event);
    }

    public Promise<List<IndexedEvent<Event>>> nextMessages(long lastReceived) {
        return eventStream.nextEvents(lastReceived);
    }

    public static abstract class Event {
        public final String date;
        public final String user;
        public final Type type;

        public final static DateFormat dateFormat =
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        public enum Type {
            JOIN,
            LEAVE,
            MESSAGE
        }

        public Event(User user, Type type) {
            this.date = dateFormat.format(new Date());
            this.user = user.name;
            this.type = type;
        }
    }

    public static class JoinEvent extends Event {
        public JoinEvent(User user) {
            super(user, Type.JOIN);
        }
    }

    public static class LeaveEvent extends Event {
        public LeaveEvent(User user) {
            super(user, Type.LEAVE);
        }
    }

    public static class MessageEvent extends Event {
        public final String text;

        public MessageEvent(User user, String text) {
            super(user, Type.MESSAGE);
            this.text = text;
        }
    }

    // Synchronized so that concurrent requests cannot create two rooms.
    public static synchronized Room getInstance() {
        if (instance == null)
            instance = new Room();
        return instance;
    }
}
Below is the list of major components provided by the Room model.
  • eventStream, which is of type ArchivedEventStream<Event>, is the stream of messages that are published in the room. We bound the size of the stream window with EVENT_STREAM_SIZE.
  • nextMessages returns the list of messages that have been published after the message with the passed index number, lastReceived. The crux here is that we return a Promise, which is a bit of magic provided by ArchivedEventStream. Hence, the web server can process these requests asynchronously without tying up a thread for each waiting client. This results in a huge increase in the number of simultaneous requests that we can handle.
  • Event is the base type that is fed to eventStream. We represent each user action (join, leave, message) by a corresponding subclass of Event, that is, JoinEvent, LeaveEvent, and MessageEvent. As a side note, these classes will be serialized to their JSON counterparts when they are transferred to the client side. Hence, try to keep them as compact as possible.
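To make the idea behind nextMessages more concrete, here is a minimal sketch in plain Java. It is not Play's ArchivedEventStream: the names ArchivedStreamSketch and Indexed are hypothetical, and CompletableFuture is used as an assumed stand-in for Play's Promise. It shows a bounded archive of indexed events where a caller either gets the events it missed right away, or gets a pending future that is completed by the next publish.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of a bounded, indexed event archive.
final class ArchivedStreamSketch<T> {
    // An event paired with a monotonically increasing id.
    static final class Indexed<E> {
        final long id;
        final E data;

        Indexed(long id, E data) {
            this.id = id;
            this.data = data;
        }
    }

    private final int capacity;
    private final Deque<Indexed<T>> archive = new ArrayDeque<Indexed<T>>();
    private final List<CompletableFuture<List<Indexed<T>>>> waiters =
            new ArrayList<CompletableFuture<List<Indexed<T>>>>();
    private long nextId = 1;

    ArchivedStreamSketch(int capacity) {
        if (capacity < 1)
            throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Archives the event (evicting the oldest one if the archive is full)
    // and completes every future that was waiting for a new event.
    synchronized void publish(T event) {
        Indexed<T> indexed = new Indexed<T>(nextId++, event);
        if (archive.size() == capacity)
            archive.removeFirst();
        archive.addLast(indexed);
        List<Indexed<T>> batch = Collections.singletonList(indexed);
        for (CompletableFuture<List<Indexed<T>>> waiter : waiters)
            waiter.complete(batch);
        waiters.clear();
    }

    // Returns the archived events with an id greater than lastReceived.
    // If there are none yet, returns a future that completes on publish.
    synchronized CompletableFuture<List<Indexed<T>>> nextEvents(long lastReceived) {
        List<Indexed<T>> missed = new ArrayList<Indexed<T>>();
        for (Indexed<T> indexed : archive)
            if (indexed.id > lastReceived)
                missed.add(indexed);
        if (!missed.isEmpty())
            return CompletableFuture.completedFuture(missed);
        CompletableFuture<List<Indexed<T>>> pending =
                new CompletableFuture<List<Indexed<T>>>();
        waiters.add(pending);
        return pending;
    }
}
```

The caller never blocks inside nextEvents. A client that is up to date simply holds an incomplete future, which costs a list entry rather than a parked thread, and that is where the gain in simultaneous requests comes from.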
Now that we have a Room, we can implement our Chat controller.
@With(Secure.class)
public class Chat extends Controller {
    public static void index() {
        renderArgs.put("username", Security.getSessionUser().name);
        render();
    }

    public static void join() {
        Room.getInstance().publish(
                new Room.JoinEvent(Security.getSessionUser()));
    }

    public static void leave() {
        Room.getInstance().publish(
                new Room.LeaveEvent(Security.getSessionUser()));
    }

    public static void say(String text) {
        Room.getInstance().publish(
                new Room.MessageEvent(Security.getSessionUser(), text));
    }

    public static void waitMessages(long lastReceived) {
        renderJSON(
                // Here we use continuation to suspend the execution until a
                // new message arrives.
                await(Room.getInstance().nextMessages(lastReceived)),
                new TypeToken<List<IndexedEvent<Room.Event>>>() {}.getType());
    }
}
See the @With(Secure.class) annotation? Yes, only signed-in Facebook users are allowed. As you can see, the index(), join(), leave(), and say() methods are self-explanatory. The significant bit here is the waitMessages method. Each client makes long-polling XHR requests to the server and waits for incoming messages, and waitMessages renders the pending messages from the Room event stream as JSON. Since Room.nextMessages() returns a Promise, waitMessages works in an asynchronous (non-blocking) manner.

Let's modify app/views/Application/index.html as follows to provide a link to the chat room for signed-in users.
#{extends 'main.html' /}
#{set title:'Home' /}

#{if user}
<h1>Welcome, ${user.name}! #{if user.isAdmin}(admin)#{/if}</h1>
<a href="@{Secure.logout()}">Logout</a>
<a href="@{Chat.index()}">Chat Room</a>
#{/if}

#{else}
#{fbLogin text:'Log In', perms:'publish_stream' /}
#{/else}
It is time for the client-side implementation of the chat room. I'll first present app/views/Chat/index.html as is, and then explain the bits in it. Here we go.
#{extends 'main.html' /}
#{set title:'Chat Room' /}
#{set 'moreScripts'}
    #{script 'jquery.scrollTo.js' /}
    #{script 'knockout.js' /}
#{/set}
#{set 'moreStyles'}
    #{stylesheet 'chat.css' /}
#{/set}

<div>
    <button data-bind="enable: !isJoined(), click: join">Join</button>
    <button data-bind="enable: isJoined, click: leave">Leave</button>
</div>

<div id="messages" data-bind="foreach: messages">
    <div>
        <span class="date" data-bind="text: date"></span>
        <span class="message" data-bind="if: type == 'MESSAGE'">
            <span class="user"
                  data-bind="text: user,
                             css: { you: user == '${username}' }"></span>&gt;
            <span class="text" data-bind="text: text"></span>
        </span>
        <span class="join" data-bind="if: type == 'JOIN'">
            <span class="user" data-bind="text: user"></span> joins the room.
        </span>
        <span class="leave" data-bind="if: type == 'LEAVE'">
            <span class="user" data-bind="text: user"></span> leaves the room.
        </span>
    </div>
</div>

<form data-bind="submit: say">
    <label for="inputText">Type your text here:</label>
    <input id="inputText" type="text"
           data-bind="enable: isJoined, value: messageText" />
    <input type="submit" value="Send" data-bind="enable: isJoined" />
</form>

<script type="text/javascript">
    $(document).ready(function() {
        var XHR = {
            join: #{jsAction @join() /},
            leave: #{jsAction @leave() /},
            say: #{jsAction @say() /},
            waitMessages: #{jsAction @waitMessages(':lastReceived') /}
        };

        var RoomModel = function() {
            var self = this;
            var lastReceived = 0;

            self.isJoined = ko.observable(false);
            self.messages = ko.observableArray([]);
            self.messageText = ko.observable("");

            self.join = function() {
                $.post(XHR.join(), {}, function() {
                    self.isJoined(true);
                    getMessages();
                });
            };

            self.leave = function() {
                $.post(XHR.leave(), {}, function() {
                    self.isJoined(false);
                });
            };

            self.say = function() {
                $.post(XHR.say(), {text: self.messageText()});
                self.messageText("");
            };

            var getMessages = function() {
                if (self.isJoined())
                    $.getJSON(XHR.waitMessages({lastReceived: lastReceived}), {},
                            function(events) {
                                $(events).each(function() {
                                    self.messages.push(this.data);
                                    lastReceived = this.id;
                                });
                                $("#messages").scrollTo("max");
                                getMessages();
                            });
            };

            $(window).unload(function() {
                if (self.isJoined())
                    $.post(XHR.leave());
            });
        };

        var roomModel = new RoomModel();
        ko.applyBindings(roomModel);
    });
</script>
Before diving into the sources, you will need to get the jquery.scrollTo.js and knockout.js files.
$ cd public/javascripts
$ wget http://flesler-plugins.googlecode.com/files/jquery.scrollTo-1.4.2-min.js -O jquery.scrollTo.js
$ wget http://github.com/downloads/SteveSanderson/knockout/knockout-2.0.0.js -O knockout.js
And a simple CSS (public/stylesheets/chat.css) for the chat room.
#messages {
    height: 200px;
    overflow: auto;
    font-family: Arial, Verdana, sans-serif;
    font-size: 10pt;
    padding-bottom: 8px;
    padding-top: 8px;
}

#messages .user { font-style: oblique; }
#messages .message .user { font-style: normal; }
#messages .you { font-weight: bold; }
#messages .join { color: green; }
#messages .leave { color: red; }
#messages .date { font-family: monospace; padding-right: 6px; }
Let me introduce you to KnockoutJS with a very simple snippet from the above mess.
  1. There is a JavaScript object called RoomModel, and it has a boolean field called isJoined.
  2. There is a submit button to send messages to the room, and we want the button to be enabled and disabled according to the RoomModel.isJoined flag.
  3. We instantiate RoomModel and hand it to KnockoutJS as the view model.
    var roomModel = new RoomModel();
    ko.applyBindings(roomModel);
  4. We make isJoined an observable.
    self.isJoined = ko.observable(false);
  5. We add the data-bind="enable: isJoined" attribute to the button.
After these three easy settings, the send button is automagically bound to the isJoined flag. Whenever the isJoined variable changes, the change is propagated to all dependent elements.
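If the propagation feels like magic, the underlying idea is the plain old observer pattern. Here is a minimal sketch of it in Java (the language of the server side of this post). It is not how KnockoutJS is implemented: ObservableSketch and its bind method are hypothetical names, and skipping notifications when the value has not changed is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical illustration of an observable value with subscribers.
final class ObservableSketch<T> {
    private T value;
    private final List<Consumer<T>> subscribers = new ArrayList<Consumer<T>>();

    ObservableSketch(T initial) {
        this.value = initial;
    }

    T get() {
        return value;
    }

    // Stores the new value and notifies subscribers, but only on a real change.
    void set(T newValue) {
        if (Objects.equals(value, newValue))
            return;
        value = newValue;
        for (Consumer<T> subscriber : subscribers)
            subscriber.accept(newValue);
    }

    // Registers a subscriber and immediately syncs it with the current value,
    // much like a bound element picks up the initial state of the model.
    void bind(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        subscriber.accept(value);
    }

    public static void main(String[] args) {
        ObservableSketch<Boolean> isJoined = new ObservableSketch<Boolean>(false);
        boolean[] sendButtonEnabled = {true};  // Stand-in for the button state.
        isJoined.bind(enabled -> sendButtonEnabled[0] = enabled);
        System.out.println(sendButtonEnabled[0]);  // prints "false"
        isJoined.set(true);
        System.out.println(sendButtonEnabled[0]);  // prints "true"
    }
}
```

Here the array element plays the role of the button's enabled state, and the lambda passed to bind plays the role of the enable binding.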

Another good thing about KnockoutJS observables is that the change propagation channel is bidirectional. That is, when an observable variable changes, the change gets propagated to the bound elements. Moreover, a change on a bound element is reflected back to the variable as well. Let's see this in action.
  1. There is an observable text field called messageText.
  2. The message input box is bound to this observable through the data-bind="enable: isJoined, value: messageText" attribute.
Here, the messageText variable and the value of the input box are bidirectionally synchronized by KnockoutJS observables. Another cool KnockoutJS feature is its foreach looping functionality over observable arrays. We see it in action while printing out the incoming messages. That is,
  1. Messages are stored in an observable array via
    self.messages = ko.observableArray([]);
  2. We loop over the messages observable via
    <div ... data-bind="foreach: messages">
    ...
    </div>
  3. Inside the loop body, the fields of the current array element are accessed directly. (Remember the list of JSON-serialized Room.Event subclasses?)
There are other cool KnockoutJS tricks in the above chat room sources (e.g., if: type == 'MESSAGE', css: { you: user == '${username}' }, etc.), but explaining all of them is beyond the scope of this blog post. I strongly advise you to take a look at the KnockoutJS tutorials and documentation.

Near the end, I would also like to draw your attention to the XHR singleton in the code.
var XHR = {
    join: #{jsAction @join() /},
    leave: #{jsAction @leave() /},
    say: #{jsAction @say() /},
    waitMessages: #{jsAction @waitMessages(':lastReceived') /}
};
Thank God, Play handles the server-side and client-side connectivity of the XHR functions for me. (Look ma, I can even pass parameters!)

Ok, enough talking. Let's do it!
And here I join the room.

To sum up, I would definitely suggest that everyone check out KnockoutJS. (Did I say that KnockoutJS provides one of the best tutorial experiences I have ever seen?) It was a really exciting experience to work with KnockoutJS and, to be honest, I found it so impressive and expressive that I would never again code a single dynamic web page without it.

Anyway, let's finish this post here. Thanks for reading this far; I hope this post helps you as well.

Note: Sources are available through the play-chat GitHub project.