Streaming Hapi Responses with MongoDB

Let’s say you are implementing a REST endpoint that lists a collection. Normally, you’d paginate the endpoint with a maximum page size of, say, 25 items to avoid memory issues. You’d return something like the example below, and if the client wants more than one page, it has to make more than one request:

{
  "data": [
    { "id": 1, "title": "Item 1" },
    { "id": 2, "title": "Item 2" }
  ],
  "pages": {
    "current": 2,
    "prev": 1,
    "hasPrev": true,
    "next": 3,
    "hasNext": true,
    "total": 5
  },
  "items": {
    "perPage": 2,
    "begin": 3,
    "end": 4,
    "total": 9
  }
}

This can be a real pain for clients to deal with, especially if the client knows it wants everything. With NodeJS streams, you may be able to greatly increase the maximum page size or eliminate it completely, because streams don’t require you to buffer the entire response before sending it. You can send the response piece by piece, greatly reducing the memory demands on the server.

If you are using MongoDB, you can get a streaming response from the database like this:

const mongoStream = db.collection('items').find({}).stream();

This will be an objectMode stream, with each object being one document from your MongoDB collection. HapiJS also supports streaming responses like this:

// Within your handler
return reply(responseStream);

HapiJS response streams should either be in binary mode or be objectMode streams that emit strings. You can’t just connect your MongoDB stream directly to HapiJS, because you need to construct a valid JSON document that the client can parse. To do that, you can pipe your MongoDB stream through a Transform stream that takes in MongoDB documents and emits, piece by piece, a JSON object that looks like the response above. Here’s an example of a stream that does that:

// PaginationStream.js
const Transform = require('stream').Transform;
const assert = require('assert');

class PaginationStream extends Transform {
  /**
   * Create a PaginationStream
   * @param page {Number} which page of data will be streamed through
   *   (starting with 1)
   * @param perPage {Number} how many objects are returned per page
   *   (>= 0; if 0, then return all objects)
   * @param total {Number} the total number of results (>= 0)
   */
  constructor(page, perPage, total) {
    assert(page >= 1, 'page should be >= 1');
    assert(perPage >= 0, 'perPage should be >= 0');
    assert(total >= 0, 'total should be >= 0');
    super({ objectMode: true });
    this.page = page;
    this.perPage = perPage;
    this.total = total;
    this.count = 0;
    this.perPageReached = false;
  }

  _transform(data, encoding, callback) {
    if (this.perPageReached) {
      return callback(
        new Error('pagination page limit already reached')
      );
    }

    if (this.count === 0) {
      this.push('{\n  "data":[');
    }

    this.push(JSON.stringify(data, null, 2));
    this.count++;

    // When we reach the limit or the total number of objects, emit an
    // end of array marker and the pagination object
    if (this._isEndOfPage()) {
      this.perPageReached = true;
      const pagination = PaginationStream._paginationObject(
        this.page,
        this.total,
        this.perPage
      );
      const paginationJson = JSON
        .stringify(pagination, null, 2)
        .replace(/^{/, '');
      this.push('], ');
      this.push(paginationJson);
    } else {
      this.push(',');
    }

    return callback();
  }

  _isEndOfPage() {
    if (this.perPage > 0) {
      // Has a per-page limit if perPage > 0
      return this.count === Math.min(this.total, this.perPage);
    } else {
      // No per-page limit if perPage === 0
      return this.count === this.total;
    }
  }

  /**
   * Returns a pagination object
   * @param page {Number} current page number
   * @param total {Number} total number of objects
   * @param perPage {Number} number of objects per page
   * @private
   */
  static _paginationObject(page, total, perPage) {
    const countNum = perPage === 0 ? total : perPage,
      begin = (page - 1) * countNum + 1,
      end = (page * countNum) > total ? total : (page * countNum);

    return {
      pages: {
        current: page,
        prev: page - 1,
        hasPrev: page > 1,
        next: page + 1,
        hasNext: total > end,
        total: Math.ceil(total / countNum)
      },
      items: {
        perPage,
        begin,
        end,
        total
      }
    };
  }
}

module.exports = PaginationStream;

You can use these streams together like this:

// .count() is asynchronous in the MongoDB driver, so get the total first
db.collection('items').find({}).count((err, total) => {
  const mongoStream = db.collection('items').find({}).stream();
  const paginationStream = new PaginationStream(1, 0, total);

  return reply(mongoStream.pipe(paginationStream));
});

This is pretty good, but if the response is large, we really need to gzip it. Luckily, Node’s zlib module provides a gzip stream that we can use.

const createGzip = require('zlib').createGzip;

db.collection('items').find({}).count((err, total) => {
  const mongoStream = db.collection('items').find({}).stream();
  const paginationStream = new PaginationStream(1, 0, total);
  const gzipStream = createGzip();

  const stream = mongoStream.pipe(paginationStream).pipe(gzipStream);

  return reply(stream).header('content-encoding', 'gzip');
});

There are a few edge cases that we need to deal with, though:

  1. If your response is taking a really long time, the user could close the browser tab, which would close the socket. If the socket is closed, we need to stop the stream, both to save resources and because the gzip stream will emit an error if we try to send data to it after the socket closes.
  2. There could be some kind of MongoDB error mid-stream (unlikely, but I’ll show you how to handle it anyway).

To handle these edge cases, we’ll attach error handlers to the streams and log the errors. Whenever an error handler is attached to a stream, Node will assume the application is dealing with the error. If there is no error handler on a stream, any error in the stream causes the process to quit like an unhandled exception.

Here’s the code with error handlers added. There may be a more concise way of trapping all of the errors, but this is what works for me:

db.collection('items').find({}).count((err, total) => {
  const mongoStream = db.collection('items').find({}).stream();
  mongoStream.on('error', err => {
    request.log(['warn'], { stream: 'mongo', err });
  });

  const paginationStream = new PaginationStream(1, 0, total);
  paginationStream.on('error', err => {
    request.log(['warn'], { stream: 'pagination', err });
    mongoStream.close();
  });

  const gzipStream = createGzip();
  gzipStream.on('error', err => {
    request.log(['warn'], { stream: 'gzip', err });
  });

  // Handle the browser cancelling the request
  request.raw.req.once('close', () => {
    request.log(
      ['debug'],
      { msg: 'stream closed due to client cancellation' }
    );
    return mongoStream.close();
  });

  const stream = mongoStream.pipe(paginationStream).pipe(gzipStream);

  return reply(stream).header('content-encoding', 'gzip');
});

There you go! Streaming HTTP responses from MongoDB using Hapi. A few other things to keep in mind:

  • You need to change the MongoDB .find() and .count() calls to implement your pagination logic as well. Use .sort(), .skip() and .limit() so the database only returns one page of data (see the sketch after this list). If you return more than a page of data, PaginationStream will emit an error event.
  • When using streaming responses, the headers are sent as soon as the first bit of data comes down the stream. There is no way to change the headers after data has been sent, so you cannot change the HTTP status code of the response if there is an error mid-stream. I might do another post on some strategies to deal with that issue later.
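For example, here is a rough sketch of a paginated version of the handler. The page and perPage variables are placeholders for values you would read from the request (they are not defined in the code above), and createGzip is the same zlib stream as before:

// `page` and `perPage` are assumed to come from the request, e.g. query-string
// parameters -- they are placeholders, not variables defined earlier.
db.collection('items').find({}).count((err, total) => {
  const mongoStream = db.collection('items')
    .find({})
    .sort({ _id: 1 })              // a stable sort keeps pages consistent
    .skip((page - 1) * perPage)    // skip the documents on earlier pages
    .limit(perPage)                // return at most one page of documents
    .stream();

  const paginationStream = new PaginationStream(page, perPage, total);
  const gzipStream = createGzip();

  const stream = mongoStream.pipe(paginationStream).pipe(gzipStream);

  return reply(stream).header('content-encoding', 'gzip');
});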

Connecting to Heroku Postgres in Python

This is for Python 3, using psycopg2:

import psycopg2
import subprocess

proc = subprocess.Popen('heroku config:get DATABASE_URL -a my-heroku-app', stdout=subprocess.PIPE, shell=True)
db_url = proc.stdout.read().decode('utf-8').strip() + '?sslmode=require'

conn = psycopg2.connect(db_url)

Make sure you’ve installed Postgres locally with SSL support. Here’s how I did it:

brew install postgres --with-openssl
pip3 install psycopg2

For some reason, Postgres.app did not work for me. Instead, I needed to use the Homebrew version.

Using Pandas to make tables in Jupyter

If you are using Jupyter, you can use Pandas to make nice tables from SQL queries:

from sqlalchemy import create_engine
import pandas as pd
import subprocess

proc = subprocess.Popen('heroku config:get DATABASE_URL -a my-heroku-app', stdout=subprocess.PIPE, shell=True)
db_url = proc.stdout.read().decode('utf-8').strip() + '?sslmode=require'

engine = create_engine(db_url)
pd.read_sql_query('select id, email from users', con=engine)

That will render the query results as a nicely formatted table right in the notebook.

Memory Issues with NodeJS Streams? Remove 0.8.x Streams from Your Pipe

I was working on an ETL process with NodeJS that ran on Heroku, which limits you to 500MB of memory on a standard dyno. The ETL was quickly running out of memory, even though each document that it processed was less than 1MB.

This ETL was made up of a Readable stream from a MongoDB Cursor, then two Transform streams, one using event-stream and one implemented as a subclass of Stream.Transform. The last stream was a Writable stream that validated and wrote back documents to MongoDB.

Clearly the readable stream was reading documents out of Mongo way too quickly for the transform streams to do their work and the writable stream to write the documents back to MongoDB. But wasn’t NodeJS supposed to manage this automatically, assuming the highWaterMark was set to something reasonable?

It turns out that, yes, NodeJS does manage this correctly, as long as every stream in your pipe is a 0.10.x stream or greater. NodeJS implemented the highWaterMark in Node 0.10.x.

The culprit in my case was event-stream, which is an old library that only makes 0.8.x streams.
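To illustrate, here is a minimal sketch of swapping an event-stream map step for a streams2/3 Transform. This is not the exact code from my ETL; transformDoc stands in for whatever per-document work the step does:

const Transform = require('stream').Transform;

// Old style (event-stream), which produces a 0.8.x stream with no
// highWaterMark and therefore no backpressure:
//
//   const es = require('event-stream');
//   const mapStream = es.map((doc, callback) => callback(null, transformDoc(doc)));

// Streams2/3 replacement: an objectMode Transform respects highWaterMark, so
// the MongoDB cursor gets paused whenever the downstream stages fall behind.
const mapStream = new Transform({
  objectMode: true,
  highWaterMark: 16, // buffer at most 16 documents before applying backpressure
  transform(doc, encoding, callback) {
    callback(null, transformDoc(doc)); // `transformDoc` is a placeholder
  }
});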

Moral of the story: if you are having memory issues with streams that seem like backpressure should be solving, make sure all of the streams in your pipe are 0.10.x streams or greater!

Learning React as an AngularJS Developer

React isn’t a framework

You use Angular, so you probably like frameworks. React and Redux are not frameworks in the same sense that Angular is. You can use React Redux Starter Kit and/or Redux CLI to get a React/Redux app set up with some sensible defaults, though.

Replacements for other parts of Angular

  • $http – axios (see the brief sketch after this list)
  • Factories and services – ES6 modules with Webpack
  • Mocking modules (or in Angular, services or factories) in your tests – babel-plugin-rewire
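For example, a typical $http call maps almost one-to-one onto axios; the /api/items URL below is just a placeholder:

// Angular:
//   $http.get('/api/items').then(function (res) { vm.items = res.data; });
//
// axios equivalent:
import axios from 'axios';

axios.get('/api/items')
  .then(res => console.log(res.data)) // like $http, the parsed body is on res.data
  .catch(err => console.error(err));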

Understand the component lifecycle methods

The best resource I’ve found is Understanding the React Component Lifecycle.

Get a good authentication library

I recommend redux-auth-wrapper.

A Better $patch Method for Angular's ngResource

Angular’s ngResource makes a pretty decent starting point for developing a front-end model layer, but it tends to be a little simplistic as your backend API gets more sophisticated.

One issue I ran into recently is that the $patch method sends the entire object in the request body. This is usually not what you want to do, because the PATCH method is designed for a partial resource modification, typically using something like JSON Merge Patch or JSON Patch as the request body.

Here’s how to add a $patchFields method to your resource that creates a JSON Merge Patch for a set of fields, and then sends it to the backend.

This is how you’d use it:

var product = new Product({
   id: 231,
   name: 'iPad',
   price: 429.99,
   size: {
      height: 9.4,
      width: 6.2,
      depth: 0.2
   }
});

product.price = 400;
product.size.height = 10;

var promise = product.$patchFields(['price', 'size.height']);

/*
 * Sends a request like this:
 * PATCH http://api.example.com/products/231
 * {
 *    "price": 400,
 *    "size": {
 *       "height": 10
 *    }
 * }
 *
 */

And here’s the code:

angular.module('myApp').factory('Product', function($resource) {

   var Product = $resource(
      'http://api.example.com/products/:id',
      { id: '@id' },
      // ngResource has no PATCH action by default, so define one here
      { patch: { method: 'PATCH' } }
   );

   angular.extend(Product.prototype, {
      '$patchFields': function(fields, success, error) {
         var self = this;
         var patch = self.generatePatch(fields);
         var result = Product.patch.call(
            this,
            { id: self.id },
            patch,
            success,
            error
         );
         return result.$promise || result;
      },
      generatePatch: function(fields) {
         var self = this;
         if(!angular.isArray(fields)) {
            fields = [ fields ];
         }

         return fields.reduce(function(result, field) {
            setFieldByPath(result, field, self.getFieldByPath(field));
            return result;
         }, {});
      },
      getFieldByPath: function(path) {
         // Strip angular's $-prefixed internals before walking the path
         var ret = angular.fromJson(angular.toJson(this));
         var paths = path.split('.');
         for(var i = 0; i < paths.length; ++i) {
            if(angular.isUndefined(ret[paths[i]])) {
               return undefined;
            } else {
               ret = ret[paths[i]];
            }
         }
         return ret;
      }
   });

   function setFieldByPath(obj, path, value) {
      var paths = path.split('.');
      var setOn = obj;

      for(var i = 0; i < paths.length - 1; i++) {
         // Use a different name so we don't shadow the `path` argument used
         // in the error messages below
         var segment = paths[i];
         if(!angular.isUndefined(setOn[segment])) {
            if(
               angular.isObject(setOn[segment]) &&
               !angular.isArray(setOn[segment])
            ) {
               setOn = setOn[segment];
            } else {
               throw new Error(
                  'Path ' +
                  path +
                  ' has an item that is not an object'
               );
            }
         } else {
            setOn[segment] = {};
            setOn = setOn[segment];
         }
      }

      if(!angular.isFunction(setOn[paths[paths.length - 1]])) {
         setOn[paths[paths.length - 1]] = value;
      } else {
         throw new Error(
            'Cannot set value at ' +
            path +
            ' since it would overwrite a function'
         );
      }
   }

   return Product;

});