Reactive Web

with Rails, React, SSE and RethinkDB

May 27, 2016
In the modern world of multi-layer web applications, passing changes through the layers of the system can become tedious. For example, when an event happens in a background job on the server side of a Rails web app, the developer needs to define callbacks in multiple layers to pass the change through the system to the web UI. Usually this also means creating an additional data flow channel using a pub/sub service like Pusher.

Pushing changes between the app layers takes a fair amount of a web developer's time, which is why modern web frameworks try to solve this problem. For example, in MeteorJS, after fetching a collection from the server and storing it in a JS variable, the developer can expect this variable to reflect subsequent server-side changes automatically, without dealing with any pub/sub services explicitly. This is an example of the Reactive Programming paradigm in action.
Reactive Programming is building a program around asynchronous data streams. A data stream can be treated as a "variable changing in time".
In MeteorJS, the power of reactive programming comes at the price of using websockets under the hood. Meteor's "reactive queries" feature is based on the Distributed Data Protocol (DDP), which is like REST for websockets, so technically there's still a separate non-HTTP data channel that needs special care (for example, separate security treatment).
Meteor's DDP implementation... intelligently polls your database to pick up changes and push them down to the client.
The HTML5 specification includes a technology called Server-Sent Events (SSE). Unlike DDP, it works on top of HTTP and technically is just a form of HTTP streaming. In this article we will port Meteor's "reactive query" feature to a standard Rails app using React, SSE and RethinkDB (and no websockets).
SSE works with most browsers, but not IE and Opera yet. See browser compatibility here. The same concepts could be applied using long polling, e.g. with the Pollymer library.
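To make "just a form of HTTP streaming" concrete, here is a minimal sketch of what a single SSE message looks like on the wire. The `sse_frame` helper and its `retry_ms` parameter are hypothetical names used only for illustration (in the app itself Rails builds these frames for us), and the payload fields are borrowed from the CoffeeCup model introduced below.

```ruby
require 'json'

# Builds one Server-Sent Events frame (hypothetical helper, for illustration).
# Each field is a "name: value" line; a blank line terminates the frame.
def sse_frame(payload, event: nil, retry_ms: nil)
  lines = []
  lines << "event: #{event}" if event        # event name the client listens for
  lines << "retry: #{retry_ms}" if retry_ms  # reconnection delay in milliseconds
  # JSON.generate never emits raw newlines, so a single data: line is enough.
  lines << "data: #{JSON.generate(payload)}"
  lines.join("\n") + "\n\n"
end

puts sse_frame({ 'id' => '1', 'status' => 'ready' }, event: 'row', retry_ms: 300)
```

The response is an ordinary HTTP response with the `text/event-stream` content type that simply never finishes: the server keeps appending frames like this one for as long as the connection stays open.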
Setting the stage
Let's dive in and build a Coffee-Shop self-service UI. The only page of this app contains a "Make Coffee" button and a list of recently ordered coffee cups. The goal is to show their status changing in real time, while the actual coffee-making process happens in a background job on the server side.

There are three data flows on the page (see diagram below):

  1. rendering initial state when the page loads;
  2. updating the state as a result of clicking "Make Coffee" button;
  3. updating the state as a result of a background operation.
Later we will apply the Reactive Programming paradigm to combine these three flows into a single event stream. When this stream is first opened, the initial state of the cups collection will go through it. After that the stream will remain open so that any subsequent changes to the cups collection on the server side can be delivered to the web UI.
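The single-stream idea can be sketched in plain Ruby, with no database involved. The sketch assumes that every event carries the old and the new state of a record under `old_val` and `new_val` keys, which is the shape of the RethinkDB changefeed used later in this article; `apply_change` and the sample ids are hypothetical.

```ruby
# Applies one changefeed-shaped event to a collection keyed by record id.
# Returns a new hash and leaves the input untouched.
def apply_change(cups, change)
  new_val = change['new_val']
  if new_val
    cups.merge(new_val['id'] => new_val)                  # added or updated
  else
    cups.reject { |id, _| id == change['old_val']['id'] } # deleted
  end
end

stream = [
  # The initial state arrives first, as "changes" that have no old value...
  { 'old_val' => nil, 'new_val' => { 'id' => 'a', 'status' => 'ready' } },
  { 'old_val' => nil, 'new_val' => { 'id' => 'b', 'status' => 'pending' } },
  # ...followed by live changes on the very same stream.
  { 'old_val' => { 'id' => 'b', 'status' => 'pending' },
    'new_val' => { 'id' => 'b', 'status' => 'grinding_beans' } },
  { 'old_val' => { 'id' => 'a', 'status' => 'ready' }, 'new_val' => nil }
]

cups = stream.reduce({}) { |state, change| apply_change(state, change) }
p cups.values
```

Because the initial rows and the later updates go through the same fold, the UI needs only one code path for all three data flows.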

Let's begin with a traditional Rails app. The only difference is that instead of ActiveRecord we will install RethinkDB and use the NoBrainer ORM. Here's our CoffeeCup model:
# app/models/coffee_cup.rb
class CoffeeCup  
  include NoBrainer::Document  
  include NoBrainer::Document::Timestamps  

  KINDS_OF_COFFEE = %w(latte espresso cappuccino)
  STATUSES = %w(pending boiling_water grinding_beans 
                combining_together adding_milk ready)


  field :user_id, type: String, required: true
  field :kind, type: String, required: true, in: KINDS_OF_COFFEE
  field :status, type: String, required: true, in: STATUSES, default: 'pending'
  field :percent, type: Integer, required: true, default: 0
end
The list of recent orders will be rendered by CoffeeCupsController#index, and ordering a cup of coffee is handled in CoffeeCupsController#create:
# app/controllers/coffee_cups_controller.rb
class CoffeeCupsController < ApplicationController
  respond_to :html, only: [:index]
  respond_to :json, only: [:create]

  def index
    cups = CoffeeCup.where(user_id: current_user_id)
    respond_with cups
  end

  def create
    cup = CoffeeCup.create! coffee_cup_params.merge(user_id: current_user_id)
    MakeCoffeeJob.new(cup.id).enqueue
    redirect_to coffee_cups_path
  end

  private

  def coffee_cup_params
    params.require(:coffee_cup).permit(:kind)
  end
end
Making good coffee takes some time. We'll simulate this with a background job:
# app/jobs/make_coffee_job.rb
class MakeCoffeeJob < ActiveJob::Base
  def perform(coffee_cup_id)
    coffee_cup = CoffeeCup.find(coffee_cup_id)

    # ... here we are making coffee and reporting our progress periodically ...
    # coffee_cup.update(status: 'grinding_beans', percent: 30)

    coffee_cup.update(status: 'ready', percent: 100)
  end
end
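The elided progress reporting could look something like the sketch below. It is only an illustration under stated assumptions: the `PROGRESS` table, `report_progress` and `FakeCup` are hypothetical names, and apart from `grinding_beans` at 30 and `ready` at 100 the percentages are made up. `FakeCup` stands in for the model so the snippet runs without a database.

```ruby
# Hypothetical progress table. Only grinding_beans => 30 and ready => 100
# come from the job above; the other percentages are invented.
PROGRESS = {
  'boiling_water'      => 10,
  'grinding_beans'     => 30,
  'combining_together' => 60,
  'adding_milk'        => 80,
  'ready'              => 100
}.freeze

# Walks a cup through every step. `cup` is anything that responds to
# update(status:, percent:), such as the CoffeeCup model.
def report_progress(cup, pause: 0)
  PROGRESS.each do |status, percent|
    sleep(pause) # stands in for the real work of each step
    cup.update(status: status, percent: percent)
  end
end

# In-memory stand-in for CoffeeCup that records every update it receives.
FakeCup = Struct.new(:status, :percent) do
  def history
    @history ||= []
  end

  def update(status:, percent:)
    self.status = status
    self.percent = percent
    history << [status, percent]
    true
  end
end

cup = FakeCup.new('pending', 0)
report_progress(cup)
puts "#{cup.status} (#{cup.percent}%) after #{cup.history.size} updates"
```

Every `update` call is an ordinary write to the database; the job itself knows nothing about the web UI.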
Then we'll render the ordering form and the list of ordered cups on the server-side in app/views/coffee_cups/index.html.slim, and everything will work fine... but the user needs to reload the page to track her order's status. Let's go and add some "reactiveness" to this app.
Adding Reactiveness to the Client-Side
Let's replace the server-side rendering of the ordered cups list with client-side rendering in React. First of all, let's add these lines to the Gemfile to get React and RxJS installed with ES6 syntax support:
# Gemfile
gem 'react-rails'
gem 'rxjs-rails'
gem 'sprockets-es6'
Now we can render React components in our views using the react_component helper method. We'll pass it a url prop in the second argument to tell the component where the cups collection lives. Notice the :sse format option. We'll cover that a bit later in Adding Reactiveness to the Server-Side.
-# app/views/coffee_cups/index.html.slim
h2 Your recent orders listed below
= react_component('CupsList', url: coffee_cups_path(format: :sse))
Then we need to define the CupsList React component, which renders a list of CoffeeCup components. Each time we call setState and pass it a new list of cups, the UI is refreshed to reflect the changes.
// app/assets/javascripts/components/coffee_cup.es6.jsx
class CupsList extends React.Component {
  constructor(props) {
    super(props);
    this.state = {cups: []};
  }

  componentDidMount() {
    // Open the SSE connection once the component is mounted in the browser.
    this.aggregator = new AggregatedEventSource(this.props.url);

    this.aggregator.subscribe(newCups => {
      this.setState({cups: newCups});
    });
  }

  render() {
    const {cups} = this.state;
    return (<div>
              {cups.map((cup) => {
                return (<CoffeeCup cup={cup} key={cup.id}/>);
              })}
            </div>);
  }
}
// app/assets/javascripts/components/coffee_cup.es6.jsx
class CoffeeCup extends React.Component {
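  render() {
    const {cup} = this.props;
    // Minimal placeholder markup; the real cup HTML goes here.
    return (
      <div className="coffee-cup">
        {cup.kind}: {cup.status} ({cup.percent}%)
      </div>
    );
  }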
}
That's all pretty standard. The secret sauce is the AggregatedEventSource class, which establishes an SSE connection, listens to the changefeed, aggregates it and exposes it as an RxJS subject. When the SSE connection is opened, we expect the initial list of cups first, and then incremental diffs. In our example we expect RethinkDB's raw changefeed, which carries the old and the new state of a record under the old_val and new_val keys of the SSE payload. We call the onNext method after each change to notify all the subscribers of AggregatedEventSource.
class AggregatedEventSource extends Rx.Subject {
  constructor(url) {
    super();

    this.source = new EventSource(url);
    this.state = {};

    this.source.addEventListener('row', (event) => {
      const payload = JSON.parse(event.data);
      if (payload.new_val) { // added or updated
        this.state[payload.new_val.id] = payload.new_val;
      } else { // deleted
        delete this.state[payload.old_val.id];
      }

      this.onNext(this.collection());
    });
  }

  collection() {
    return Object.keys(this.state).map(key => this.state[key]);
  }
}
Adding Reactiveness to the Server-Side
On the server side, we need to propagate the changefeed from RethinkDB to the connected clients using Server-Sent Events (SSE). The server will respond to SSE requests asynchronously, so we need to remove the Rack::Lock middleware (even in development) and add the thin_async gem. Removing Rack::Lock in our case has nothing to do with multi-threading, because we are not going to run Thin in its experimental threaded mode.
# config/application.rb   
config.middleware.delete "Rack::Lock"

# Gemfile
gem 'thin_async'
SSE requests are routed within the Rails app just like any other HTTP request. They carry cookies, so the current_user method works for them too. This means we need no changes to our controller code other than enabling the :sse response format and creating a responder for it:
# config/initializers/mime_types.rb
Mime::Type.register 'text/event-stream', :sse

# lib/responders/nobrainer_sse_responder.rb
require 'thin/async'
module Responders::NobrainerSseResponder
  def to_sse
    output_stream = ActionController::Live::SSE.new(response, retry: 300, event: 'row')
    # ... subsequent calls to output_stream.write(...) from within rethink_query.em_run
    head -1 # acts like throw :async
  end
end

# app/controllers/coffee_cups_controller.rb
class CoffeeCupsController < ApplicationController
  responders :nobrainer_sse
  respond_to :html, :sse, only: [:index]
  # ...
To handle lots of concurrent clients on a single-threaded server we will use EventMachine. RethinkDB's Ruby API provides the em_run method, which accepts two arguments: a RethinkDB connection and a DbHandler that defines callbacks for various situations. In our DbHandler class we define the on_initial_val and on_change callbacks, which are called when the initial dataset or subsequent changes are received from a RethinkDB connection. In our example we write RethinkDB records into the SSE output_stream as is, but in a more complex scenario we could deserialize them into Rails models or apply some transformations before sending them to the client side.
# lib/responders/nobrainer_sse_responder.rb
require 'thin/async'

module Responders::NobrainerSseResponder
  class DbHandler < RethinkDB::Handler
    delegate :write, to: '@writer'

    def initialize(writer)
      @writer = writer
    end

    # find all callbacks at https://rethinkdb.com/api/ruby/em_run/

    def on_initial_val(val)
      write(old_val: nil, new_val: val)
    end

    def on_change(old, new)
      write(old_val: old, new_val: new)
    end
  end

  def to_sse
    output_stream = ActionController::Live::SSE.new(response, retry: 300, event: 'row')
    db_handler = DbHandler.new(output_stream)
    rethink_query.em_run(NoBrainer.connection.raw, db_handler)
    head -1 # acts like throw :async
  end

  private

  def rethink_query
    resource.to_rql.changes(include_initial: true)
  end
end
Our responder is mostly finished. We just need to close rethink_handle (the query handle returned by em_run) properly when the client disconnects, add some error handling and send PING messages periodically so that the SSE connection doesn't time out on a platform like Heroku. The finished source code of the responder is available on GitHub.
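The PING idea can be sketched in isolation. This is not the code of the finished responder, just an illustration under stated assumptions: the `Heartbeat` class, its 20-second default interval and its `tick` method are hypothetical. It relies on the fact that an SSE line starting with a colon is a comment, which EventSource clients silently ignore, so it keeps the connection busy without triggering any event handler.

```ruby
require 'stringio'

# Writes an SSE comment line whenever at least `interval` seconds
# have passed since the previous ping (hypothetical helper).
class Heartbeat
  PING = ": ping\n\n".freeze

  def initialize(stream, interval: 20)
    @stream = stream       # anything that responds to #write
    @interval = interval   # seconds between pings
    @last_ping_at = nil
  end

  # Meant to be called from a periodic timer. `now` is injectable
  # so the timing logic can be exercised without sleeping.
  def tick(now = Time.now)
    return false if @last_ping_at && (now - @last_ping_at) < @interval

    @stream.write(PING)
    @last_ping_at = now
    true
  end
end

io = StringIO.new
heartbeat = Heartbeat.new(io, interval: 20)
start = Time.at(0)
heartbeat.tick(start)       # writes the first ping
heartbeat.tick(start + 5)   # too early, skipped
heartbeat.tick(start + 20)  # interval elapsed, writes again
print io.string
```

In the responder, `tick` would be driven by a periodic EventMachine timer, and that timer would be cancelled together with the RethinkDB handle when the client disconnects.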
Connecting pieces together
The single point where a React component gets linked to the back-end data endpoint is the call to the react_component helper in the Rails view:
= form_for CoffeeCup.new, remote: true do |f|
  = f.select :kind, CoffeeCup::KINDS_OF_COFFEE
  = f.submit 

h2 Your recent orders listed below
= react_component('CupsList', url: coffee_cups_path(format: :sse))
Conclusion
It's easy to write a back end in Rails. It's easy to write a highly interactive front end in React. We can take the best of both worlds by connecting them with SSE. Let's analyse the key attributes of this architecture:

  1. We don't introduce additional data channels. Unlike websockets, SSE runs over plain HTTP, and SSE requests are routed along with all other requests in the app.
  2. We rely on Rails stack to route and process the request. We use RESTful routes to define SSE resources, cookies-based sessions to identify current_user, HTTP status codes to report errors.
  3. The initial dataset and the subsequent feed of changes are delivered in a single request, which eliminates any chance of a race condition. E.g. if we're building a chat application, we'll never miss a message and never deliver a message twice.
  4. We rely on RethinkDB to detect changes in the data. This efficiently decouples the background worker (which makes changes to the data) from the web worker (which renders the SSE stream). In our example we could even implement the background coffee maker job in a different programming language.
Credits
The article was completely re-worked after I read On my radar: RethinkDB + React.js + Rails by Robert Pankowecki. He also proposed "sending the stream of changes with EventMachine and/or Thin instead of Puma", which is a core idea in my article.
Further Reading