pirxpilot.me

Generators and Readable Streams in JS

JS generators are one of those features of the language that you don't think you'll ever need. And then you do need it, and there is almost nothing you can replace it with.

Among many things Furkot does is generating GPX files from your trip plan. GPX was once thought to be a de facto standard for exchanging itinerary data, but the standard itself is under-specified and unwieldy. Over the years it grew a lot of traditions and proprietary extensions - some of which of supported by Furkot. GPX is of course XML and we normally generate it by traversing a graph of stops and routes and emitting a tree of XML nodes. That sounds simple enough if the number of stops is low and entire XML document (or at least a large portion of it) fits into available memory.

Problems start when we are generating XML faster than the network can send it to the browser. In such cases Node tries to allocate buffer to temporarily hold our data and - if we are not careful - it may run out of memory. The easiest hack is to slow the whole thing down. A strategically placed call to setTimeout() would let the network layer to catch up with our overly efficient XML generator.

There are two issues with that:

  • it's not very efficient - we never know exactly how fast is the network and, to be on the safe side, we would have to wait longer than necessary,
  • it's not very pretty - our relatively simple synchronous graph traversing code quickly deteriorates into a mess of callbacks.

Furkot's GPX export is a typical producer consumer problem. In a preemptive multi-threaded environment we would keep our producer relatively simple and use synchronization primitives when accessing common buffer. JavaScript VM is a different beast though. We have to explicitly relinquish control to let consumer catch up. The upside is that whenever you are in charge you know for a fact no-one else is messing with your data structures. The drawback is having to deal with the callbacks or promises and all the fun they bring to the party.

That said, Node has streams to address this class of problems. As long as we properly implement a ReadableStream and pipe it to the network response, the infamous back pressure should take care of our little buffer exhaustion issue.

ReadableStream looks trivial to implement. All we have to do is to write a single method read. Inside the read method we need to spit out chunks of generated data calling push() for as long as said push returns something truthy. Once we fill up the receiving stream buffers (in our case: once we saturate the network) node will make push return false at which point we need to stop pushing data and wait for another opportunity - when read gets called again.

This is great if we could easily rewrite our algorithm in such passive way. In some cases it's trivial - if we are producing a flat data format, such as CSV, all we need to do is to track the line number. You can see how it's implemented in furkot-driving-log

// outline of ReadableStream.read implementation
while (currentLine < maxLine) {
  if (!this.push(getLine(currentLine++))) {
    // downstream tells us to stop pushing for a while
    break;
  }
}
if (currentLine >= maxLine) {
  this.push(null);
}

Things get a bit more complicated if we happen to produce a hierarchical data structure and our algorithm consist of several nested methods that call each other. In this case tracking the internal state and thus allowing for continuing from an arbitrary place can be daunting and it certainly involves rewriting our algorithm. If only Javascript had some way of interrupting execution in an arbitrary place and then - at a later moment - continuing from where it was interrupted.

As it turns out JS generators support exactly that. We can yield a value (as opposed as returning a value) and when our generator gets called again the thread of execution will pick up just below the yield statement. We can nest generators - you do that by yielding with a star to a sub generator. It's not exactly as easy as calling a function but close enough.

Knowing that, we can rewrite our algorithm from an active form of a function that pushes chunks of XML to a passive form of a generator that only provides new data when we asked for it. The rewrite does not change the structure of the code. It mostly requires adding a few yield statements to mark the places where we are ready to generate data and more than a few stars to mark all the functions that yield data as generators. Looping over generator is a bit more elaborate than calling forEach - but it's easy to replace all the loops with our custom made each generator.

You can see the top level generator function below. It delegates to $metadata, $waypoints and other sub generators to output the entire XML. For the full implementation check the furkot-gpx project.

function *generate() {
  out.header();
  out.start('gpx', getSchema({
    xmlns: 'http://www.topografix.com/GPX/1/1',
    creator: 'http://furkot.com/',
    version: '1.1',
    'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance'
  }, ctx.garmin));

  yield* gs.read();

  yield* $metadata(ctx.metadata);    // note yield* - that's how you call sub generator
  yield* $waypoints(ctx.waypoints);
  yield* $routes(ctx.routes);
  yield* $tracks(ctx.tracks);

  out.end();

  yield* gs.read();
}

Having a generator is not the same as having a ReadableStream though. But it turns out that it's trivial to convert a generator into a readable stream. You can do that by calling generator.next and stream.push in a loop - the only important part is to ensure that we don't oversaturate the pipe. The beauty is ReadableStream.read implementation does not need to know anything about specifics of the generator being used.

// outline of generic ReadableStream implementation
do {
  var v = generator.next(); // grab the next { done, value } object
  if (v.done) {
    // nothing more to send
    this.push(null);
    break;
  }
  // keep pushing while push() returns truthy
} while (this.push(v.value));

There are a few modules that do exactly that (see: stream-generators). So whenever we need to send GPX to the network response we can do something like this:

var streamify = require('stream-generators');
streamify(generate).pipe(response);

If you want to see more details check the [github commit] converting traditional GPX converter to a generator. The interesting part is that the structure of the code mostly stays the same. New parts can be easily spotted: function * to mark generator functions and yield statements for calling sub generators.