Dispatch van Utrecht 1: Settling In

Earlier this year, my girlfriend, Phoenix, was offered a tenure-track teaching position at HKU, a respected university in the Netherlands. It was too good an offer for her to pass up, despite having a pretty well established life in NYC.

My job at CCNMTL involves many aspects, but the bulk of it is programming and sysadmin work that I can ultimately do anywhere I have access to a computer and a good internet connection. I've been living in NYC since 1999 and as much as I love it, I was ready for a bit of a change. So I worked out a telecommuting arrangement, packed up my life, and headed across the Atlantic with her.

The last few months have been a whirlwind of paperwork, packing, and travelling. The process of moving internationally, I wouldn't wish on my worst enemy. It's been expensive, stressful, and exhausting. Frankly, we're both impressed that our relationship has remained intact through it (just don't put us in a rental car with me driving and Phoenix navigating, and everything will be fine). I may write more later about the trials and tribulations of international Apostille, international health insurance plans, bank transfers, Dutch immigration bureaucracy, and moving a cat to Europe, but take my word for it that all of those things should be avoided if you can manage it.

At the beginning of August, we finished clearing out of New York and landed in Schiphol airport with a couple suitcases and a cat. All of our belongings had been either given to friends, sent to family for storage, or packed into a shipping container that was going to be taking the slow boat across the Atlantic (it's still not here yet; hopefully early September). We had an air mattress and an apartment in Utrecht that we'd signed a lease on and transferred a significant amount of money over for sight unseen.

That moment pretty much compressed the stress and uncertainty of the whole process into one laser-focused point. We'd spent months dealing with setback after setback on every front of the move. We'd both lived in New York long enough to see just about every kind of scam and shadiness from landlords and rental agencies. Yet here we were, in a new country with barely more than the clothes on our backs, our finances nearly depleted and difficult to access internationally, trusting that we'd have somewhere to live, based on some email conversations and an online listing with a couple small pictures.

Setting foot in our new apartment for the first time, I think we each would've cried if we weren't so exhausted and shocked.

Coming from cramped, dingy, loud NYC apartments, we felt like we'd just won the lottery. The pictures had not even done the place justice. Everything was perfect. It was huge. Two floors, so we could live on the bottom floor and keep offices upstairs. High ceilings, big windows, stained glass. Everything newly renovated, clean, and high quality. A small, closed-in back yard with a shed (containing an electric lawn mower that looks like it's made by Fisher-Price, but gets the job done), a balcony. All located in a little neighborhood that was both quiet (I think entire days go by without a car driving down our street) and only a couple minutes' bike ride from the city center and train station (which is then a 30-minute ride to Amsterdam Centraal). The landlord had even set up internet for us before we got there and left us fresh flowers on the counter, and wine, beer, and coffee in the fridge.

We spent the first few days taking care of basic necessities. We bought bicycles (cycling is the primary means of transportation here), explored our neighborhood and the downtown, located grocery stores and cafes, and made the pilgrimage to Ikea. Largely, for the first week before the new furniture we bought was delivered, we'd sort of wander around the empty apartment in a daze, not quite believing we had this much space.

I'll close this entry with a shot of the cat and me sitting in our new living room (the smallest room in the apartment) watching a dove that stopped by our back yard for a visit:

the new space

TAGS: utrecht

Apple's SSL/TLS bug and programming language design

A big story in the tech world recently has been Apple's bug deep in the SSL/TLS stack. There's a good writeup of the code on ImperialViolet that tracks down the exact point in the code. It boils down to these couple lines of C code:

The crux of the problem is that the indentation of the second goto fail; makes it appear to be part of the same clause as the first one. The double gotos look weird but harmless at a glance. But a closer look reveals that there are no braces after the if statement. The indentation of the second goto is a lie, so it actually parses like so:

The second goto fail; isn't in a conditional at all and so a goto will always be executed. Clearly not what the original programmer(s) intended.

Apple has the resources to hire top-notch programmers and focus intently on quality in their work. This, being security-related code, was probably subject to more scrutiny and code review than most other code. Still, this simple but brutal mistake slipped through and a lot of systems are vulnerable as a result. The point here isn't that the programmers responsible for this code were bad; it's that even the very best of us are human and will still miss things.

These kinds of mistakes fascinate me though and I like to think about how they can be prevented.

Designers of some more "modern" languages have solved this particular problem. Python, which catches its share of flak for having significant whitespace, eliminates this type of problem. In fact, preventing this kind of disconnect between the intent of the code and its appearance on screen is a large part of why Python has significant whitespace. Imagine if that block of code had been written in Python (of course Python doesn't have gotos, etc, etc, but you catch my drift):
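A runnable sketch of the two shapes, under loud assumptions: the `VerificationFailed` exception is a hypothetical stand-in for `goto fail`, `err` stands in for the result of the hash-update call, and `variant_a` and `variant_b` are illustrative names for the forms labelled A and B. None of these names come from Apple's code.

```python
class VerificationFailed(Exception):
    """Hypothetical stand-in for the C code's `goto fail`."""


def variant_a(err):
    # "A": both statements sit under the `if`, so both are conditional.
    if err != 0:
        raise VerificationFailed(err)
        raise VerificationFailed(err)  # unreachable, but visibly inside the if
    return "verified"


def variant_b(err):
    # "B": the second statement is dedented, so it visibly runs every time.
    if err != 0:
        raise VerificationFailed(err)
    raise VerificationFailed(err)
```

With `err == 0`, `variant_a` returns normally while `variant_b` always fails, and the indentation on screen tells you which is which.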

"A" and "B" there are the two possible ways it could be written. Both are unambiguous; they each work exactly like they look like they should work. It's much harder for Python code to develop those subtle traps. This isn't to say Python doesn't have its own share of blind spots. Arguably, Python's dynamic typing opens up a whole world of potential problems, but at the very least, it avoids this one.

Go, another language I spend a lot of time with, is heavily C-influenced but also manages to sidestep this particular trap. In Go, it's simply required to use braces after an if statement (or for the body of a for loop). So a Go translation of the code would have to look like one of the following:

Again, there's really no way to write the code in a deceptive manner. You could indent it however you want, but the required closing brace would stick out and make a problem obvious long before it went out to half the computers and mobile devices on the planet.

Go even goes a step further by including go fmt, a code formatting tool that normalizes indentation (and does many other things) and the Go community has embraced it whole-heartedly.

A code formatting tool run over the original C code would've made the problem immediately obvious, but there doesn't seem to be a strong push in that community for promoting that kind of tool.

I try to always remember that programming complex systems and maintaining them across different environments and through changing requirements is one of the most difficult intellectual tasks that we can take on. Doing it without making mistakes is nearly impossible for mere humans. We should lean on the tools available to us for avoiding traps and automating quality wherever and whenever we can. Language features, compiler warnings, code formatters, linters, static analysis, refactoring tools, unit tests, and continuous integration are all there to help us. There may be no silver bullet, but we've got entire armories full of good old fashioned lead bullets that we might as well put to use.

Erlang in Erlang

The FAQ for the Erlang Programming language says:

Erlang is named after the mathematician Agner Erlang. Among other things, he worked on queueing theory. The original implementors also tend to avoid denying that Erlang sounds a little like "ERicsson LANGuage".

My guess though is that most programmers writing Erlang code don't really know much about who Agner Erlang was or why his work inspired programmers at Ericsson nearly a century later to name a programming language after him. I wasn't there, so obviously I can't speak to exactly what they were thinking when naming the programming language, but I think I can give a bit of background on Agner Erlang's contributions and the rest should be fairly obvious. In the process of explaining a bit about his work, I simply can't resist implementing some of his equations in Erlang.

Agner Erlang was a mathematician who worked for a telephone company. At the time, a very practical problem telephone companies faced was deciding how many circuits had to be deployed to support a given number of customers. Don't build enough and you get busy signals and angry customers. Build too many and you've wasted money on hardware that sits idle.

This is a pretty fundamental problem of capacity planning and has clear analogies in nearly every industry. Agner Erlang laid many of the fundamentals for modeling demand in terms of probabilities and developed basic equations for calculating quality of service given a known demand and capacity.

Erlang's first equation simply defines a unit of "offered traffic" in terms of an arrival rate and average holding time. This unit was named after him.

The equation's pretty rough. Brace yourself:

$$ E = \lambda h $$

λ is the call arrival rate and h is the call holding time. They need to be expressed in compatible units. So if you have ten calls per minute coming in (λ) and each call lasts an average of 1 minute (h), you have 10 Erlangs of traffic that your system will need to handle. This is pretty fundamental stuff that would later develop into Little's Law, which I think of as the \(F = ma\) of Queuing Theory.

Implementing this equation in Erlang is trivial:

I call it "Erlang-A" even though I've never seen anyone else call it that. His two more well-known, substantial equations are called "Erlang-B" and "Erlang-C" respectively, so you see where I'm coming from.
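As a quick cross-check of the arithmetic outside Erlang, here is a minimal Python sketch of the same definition. The function name `offered_traffic` is a choice of this sketch, not a standard name.

```python
def offered_traffic(arrival_rate, holding_time):
    """Offered traffic in Erlangs: E = lambda * h.

    Both arguments must use compatible time units (calls per minute
    with minutes per call, calls per hour with hours per call, ...).
    """
    return arrival_rate * holding_time


# Ten calls per minute, each lasting one minute on average.
print(offered_traffic(10, 1))

# The same load stated as 600 calls per hour with a 1/60 hour holding time.
print(offered_traffic(600, 1 / 60))
```

Both calls describe the same 10 Erlangs of load; only the units differ.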

Let's look at those two now, starting with Erlang-B.

Erlang-B tells you, given a traffic level (expressed in Erlangs) and a given number of circuits, the probability that any incoming call gets a busy signal because all of the circuits are in use.

This one turns out to be non-trivial. \(E\) is the traffic, \(m\) is the number of circuits:

$$ P_b = B(E,m) = \frac{\frac{E^m}{m!}} { \sum_{i=0}^m \frac{E^i}{i!}} $$

I won't go into any more detail on that equation or how it was derived. If you've taken a course or two on Probabilities and Statistics or Stochastic Processes, it probably doesn't look too strange. Just be thankful that Agner Erlang derived it for us so we don't have to.

It's perhaps instructive to plug in a few numbers and get an intuitive sense of it. That will be easier if we code it up.

That equation looks like a beast to program though as it's written. Luckily, it can be expressed a little differently:

$$ B(E,0) = 1 $$

$$ B(E,m) = \frac{E B(E,m - 1)}{E B(E,m - 1) + m} $$

That actually looks really straightforward to implement in a programming language with recursion and pattern matching. Like Erlang...

I just introduced an intermediate 'N' to avoid the double recursion.

Now we can run a few numbers through it and see if it makes sense.

The base case expresses an idea that should be intuitively obvious. If you don't have any circuits at all, it doesn't matter how much or how little traffic you are expecting, you're going to drop any and all calls coming in. \(P_b = 1\).

Similarly, though not quite as obvious from looking at the formula, if you have no expected traffic, any capacity at all will be able to handle that load. \(B(0, m) = 0\) no matter what your nonzero \(m\) is.

The next more complicated case is if you have 1 circuit available. A load of 1 Erlang makes for a 50% chance of dropping any given call. If your load rises to 10 Erlangs, that probability goes up to about 91%.

More circuits means more capacity and smaller chances of dropping calls. With 1E of traffic and 10 circuits, \(B(1,10) \approx 0.0000001\) (about \(10^{-7}\)). I.e., a very, very low probability of dropping calls.
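For anyone without an Erlang shell handy, the recurrence can be sketched in Python to check the numbers above. This is not the Erlang version: the name `erlang_b` and the loop (used here in place of recursion) are choices of this sketch.

```python
def erlang_b(traffic, circuits):
    """Blocking probability B(E, m), built up from the base case B(E, 0) = 1."""
    b = 1.0
    for m in range(1, circuits + 1):
        n = traffic * b      # the shared term E * B(E, m - 1)
        b = n / (n + m)
    return b


print(erlang_b(5, 0))    # no circuits: every call is dropped
print(erlang_b(0, 3))    # no traffic: nothing is dropped
print(erlang_b(1, 1))    # one circuit, 1 Erlang
print(erlang_b(10, 1))   # one circuit, 10 Erlangs (10/11)
print(erlang_b(1, 10))   # ten circuits, 1 Erlang (on the order of 1e-7)
```

Building the value upward from \(m = 0\) computes each \(B(E, m - 1)\) exactly once, which is the same saving the intermediate variable buys in the recursive form.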

I encourage you to spend some time trying different values and getting a sense of what that landscape looks like. Also think about how that equation applies to systems other than telephone circuits like inventory management, air traffic, or staffing. Clearly this is wide-ranging, important stuff.

Erlang-C builds on this work and extends it with the idea of a wait queue. With Erlang-B, the idea was that if all circuits were in use, an incoming call "failed" in some way and was dropped. Erlang-C instead allows incoming calls to be put on hold until a circuit becomes available and tells you the probability of that happening, again given the expected traffic and number of circuits available:

$$ P_W = {{\frac{E^m}{m!} \frac{m}{m - E}} \over \sum_{i=0}^{m-1} \frac{E^i}{i!} + \frac{E^m}{m!} \frac{m}{m - E}} $$

Another rough looking one to implement. Once again though, there's an alternate expression, this time in terms of Erlang-B:

$$ C(E,0) = 1 $$

$$ C(E,m) = \frac{m B(E,m)}{m - E(1 - B(E,m))} $$

Which again, is straightforward to implement in Erlang:

Again, you should try out some values and get a feel for it. An important anomaly that you'll discover right away is that you can get results greater than 1 when the expected traffic is higher than the number of circuits. This is a little strange and of course in the real world you can never have a probability higher than 1. In this case, it simply represents the fact that essentially, as \({t \to +\infty}\), the wait queue will be growing infinitely long.
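A Python sketch of the same expression makes that anomaly easy to poke at. As before, the names `erlang_b` and `erlang_c` are choices of this sketch, and the Erlang-B helper is repeated so the block runs on its own.

```python
def erlang_b(traffic, circuits):
    """Blocking probability B(E, m) via the recurrence from B(E, 0) = 1."""
    b = 1.0
    for m in range(1, circuits + 1):
        n = traffic * b
        b = n / (n + m)
    return b


def erlang_c(traffic, circuits):
    """C(E, m) expressed through Erlang-B, with C(E, 0) = 1."""
    if circuits == 0:
        return 1.0
    b = erlang_b(traffic, circuits)
    return circuits * b / (circuits - traffic * (1.0 - b))


print(erlang_c(1, 2))   # traffic below capacity: about 1/3
print(erlang_c(1, 1))   # traffic equal to capacity: about 1
print(erlang_c(2, 1))   # traffic above capacity: about 2, no longer a probability
```

The three calls walk across the boundary: below capacity the result behaves like a probability, at \(E = m\) it reaches 1, and beyond that it keeps growing.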

I'll stop here, but it's important to note that these equations are just the tip of the iceberg. From them you can predict average queue wait times, throughput levels, etc. Given how many different types of systems can be represented in these ways and how applicable they are to real-world problems of capacity planning and optimization, I think it's now clear why Agner Erlang deserves (and receives) respect in the field.

I guess I should also point out the obvious, that you should do your own research and not trust any of my math or code here. I only dabble in Queuing Theory and Erlang programming so I've probably gotten some details wrong. The Erlang code here isn't meant for production use. In particular, I'm suspicious that it has some issues with Numerical Stability, but a proper implementation would be harder to understand. Ultimately, I just wanted an excuse to write about a topic that I'm fascinated by and make a very long, drawn out "Yo Dawg..." joke.

yo dawg

A Gentle Introduction to Heka, Part 2: Replacing Statsd

This is a continuation of A Gentle Introduction To Heka, where I showed the simplest config possible to get Heka to do anything at all.

Now let's up the ante a little bit and get Heka to actually do something useful.

If you're collecting metrics with graphite, there's a good chance that you are running statsd or one of its clones as well. Statsd has a simple, but important job of collecting simple counter and timer data from applications, rolling it up every 10 seconds or minute or whatever, and submitting it in an aggregated form to Graphite. This makes it easier to instrument your applications by sprinkling very simple "increment this counter" calls throughout your code wherever you think you might want to track something. The calls to statsd send UDP packets which don't require a response, so your application doesn't have to block during the request, and if statsd is down, it doesn't take your application down with it. On the other end, Graphite gets metrics in a nicer form that's easier to turn into a useful graph, and because statsd is only periodically flushing metrics to it, it evens out the load on Graphite. Your application might get a huge traffic spike, generating lots of events in a short time, but statsd takes the brunt of it and Graphite still just sees one submission every cycle, just with larger numbers on the counters.
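To make the rollup concrete, here is a toy Python sketch of the idea, not statsd's or Heka's actual code. It assumes the plain-text counter format `name:value|c` used by statsd clients and the `path value timestamp` line format of Carbon's plaintext protocol; the class name, the metric name `app.logins`, and the timestamp are hypothetical, and the UDP transport is left out.

```python
from collections import defaultdict


def counter_packet(name, amount=1):
    # What a client would send over UDP for a counter: "<name>:<amount>|c"
    return f"{name}:{amount}|c".encode("ascii")


class CounterAggregator:
    """Absorbs many increments, emits one Graphite line per metric per flush."""

    def __init__(self):
        self.counters = defaultdict(int)

    def receive(self, packet):
        name, rest = packet.decode("ascii").split(":", 1)
        value, kind = rest.split("|", 1)
        if kind == "c":  # this sketch handles plain counters only
            self.counters[name] += int(value)

    def flush(self, timestamp):
        lines = [f"{name} {total} {timestamp}"
                 for name, total in sorted(self.counters.items())]
        self.counters.clear()
        return lines


agg = CounterAggregator()
for _ in range(1000):  # a traffic spike between two flushes
    agg.receive(counter_packet("app.logins"))
print(agg.flush(timestamp=1700000000))
```

A thousand events go in, and Graphite would still see a single line for that cycle, just with a bigger number on it.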

Statsd, useful as it is, is very simple. It turns out that Heka can be made to replace its basic functionality just by wiring together a couple of the components that it comes with. Here's a config:

Save that into statsd.conf, change the output section to point it at your Graphite/Carbon server and run Heka with it: hekad -config=statsd.conf

Assuming you have the python statsd client library installed, you should be able to do something like:

$ python
>>> import statsd
>>> c = statsd.StatsClient('localhost', 8125)
>>> c.incr('stats.heka.test')

Wait ten seconds or so for it to flush it out, and in your Graphite interface, you should now see that a 'stats.heka.test' metric has been created. Increment the counter a few times and it should do what you expect. Timers should also work:

>>> c.timing('stats.heka.timed', 320)

Heka's carbon output is still pretty rudimentary (currently not supporting prefixes other than 'stats', eg), but I expect that it will only improve.

Once again, the config is pretty simple but has a couple little things that took some figuring out. It strings together three different components: the StatsInput, which listens on a specified UDP port for messages in the format that statsd takes; a StatAccumInput, which does the aggregation and rollup of those messages; and the CarbonOutput, which collects the rolled up stats and sends them to your Carbon server in the format that it expects.

StatsInput and StatAccumInput have default output/input message types that happen to match up, so they work together automatically. Getting the output of StatAccumInput to CarbonOutput took a little more work, telling CarbonOutput to watch for messages with the heka.statsmetric type, and telling StatAccumInput to put its data into the payload. I'm still not perfectly clear on why that had to be done, but I'm hoping it will make more sense as I get more familiar with wiring up more complicated setups.

The nice thing about how these pieces are broken up is that you can see how you could extend it to a much more powerful system. Imagine you had a few different kinds of stats that you wanted to collect that needed to be rolled up with different periods and maybe sent to different Carbon servers. With statsd, this could quickly lead to a proliferation of multiple statsd servers running. With Heka, you could just add additional CarbonOutputs and StatAccumInputs. The StatAccumInputs could do pattern matching to handle different metrics and route them to the appropriate outputs.

This is just the beginning of how Heka's pluggable pipeline architecture starts to pay off.

I'm going to take a little break and spend more time exploring Heka, but perhaps next time I'll show how Heka can also replace at least some of Logstash's functionality.

TAGS: sysadmin heka

A Gentle Introduction to Heka

Sysadmins know that monitoring, metrics, logging, and alerting are deceptively complicated problems. And complicated problems have developed complicated tools.

I've used a lot of them. Individually, they all suck in different ways. At least in the open source world, no single tool will handle everything. The combination of multiple tools needed to cover all the territory sucks even more. Multiple services to run on every machine, multiple config formats, multiple open ports and data directories that have to be kept track of. An exponential explosion of things that can break or be misconfigured.

Things are looking up though. Over the last few years, a few key pieces have been falling into place. Graphite has emerged as a pretty good backbone to hang a lot of the other pieces off. If you can figure out how to turn whatever you're interested in into a single number, you can feed it into Graphite. Poll Graphite for that metric and you can send an alert if it passes a threshold. There are any number of tools out there to take your graphite metrics and turn them into beautiful dashboards that you can put on a big screen monitor in your office so everyone can see the health of your whole system at a glance. It's not perfect, but it handles a lot of use-cases and makes life simpler.

That's only one piece of the puzzle though. Funneling everything down to a single numerical metric works for quite a few things, but there are still plenty of cases where you want actual log entries, preferably even parsed out into fields and tagged with timestamps. If you can feed it into elasticsearch, Kibana will give you a pretty reasonable interface for looking through it. Even for the data making its way to Graphite, you still have to collect it from a variety of sources and grind it up and turn it into the right format.

What's left is basically a plumbing problem. Collecting data from various logs and applications in different formats and routing it around, parsing some, filtering out noise at the source, aggregating and rolling up some, turning some of it into counters and rate variables and getting the right bits out to the right endpoints.

There are a few options for this now. Riemann and Logstash will both handle this kind of plumbing. They have very different approaches, but can both fill that role.

Recently, Mozilla announced Heka, which is another player in that space. Heka is clearly heavily inspired by Logstash, but has its own flavor. It's a very young project, but so far I'm really liking what I'm seeing. It's written in Go and compiles to a small, fast binary that's painfully easy to deploy and is very light on resource usage. I tend to run small virtual servers without a lot of memory to spare, so I've been paying close attention.

Where Heka gets difficult is in configuring it. This is a problem that it shares with the other similar "plumbing" tools. The problem is that since they mostly just connect other components together, when looked at in isolation, it's hard to know exactly what you should do with it at first. You can easily download the right Heka binary for your system and run it. And you'll see... nothing. In order to see it do anything, you'll need to at the very least figure out how to get some kind of data into it and tell it how to output data to some other system that you can watch. To do that, you'll have to read quite a bit of the documentation. I've found Heka's current documentation particularly rough for this initial "but how do I actually use it?" use case. The example config file that you first encounter does a good job of giving you a sense of what Heka is capable of, but isn't likely to work for your setup and you'll have to read a lot more to make sense of what it's doing.

So I figured it would be helpful to come up with a couple very simple config files that might be more helpful for someone just starting out with Heka.

Of course, first, you'll want to download the appropriate Heka binary for your system. Personally, for testing purposes, I just grabbed the right tarball and unpacked it to get the hekad binary. You can run it very straightforwardly with:

$ hekad -config=/path/to/config.toml

It will either complain about errors in your config file or it will run in the foreground.

Now let's make some config files. The format is TOML which is super easy to get right, so at least there's that.

First up is pretty much a "hello world" equivalent config. We just want Heka to watch one log file and, when a line is appended to it, to print that line out to another file. This is the simplest Heka config I've been able to make that actually does something. Here it is:

Save that to echo.toml and point hekad at it.

Then open two more terminals. In one, append a line or two to /tmp/input:

$ echo one >> /tmp/input
$ echo two >> /tmp/input
$ echo three >> /tmp/input

In the other terminal, run a tail -f /tmp/output. You should see something like:




If you append more text to the input, you should see it show up in the output. There's an extra newline on each of them, but at least we're seeing something. We've successfully recreated a unix pipe.

There are a few things to note in the config. Obviously, there's a "LogfileInput" section that points it at a log file to watch and a "FileOutput" section that tells it to write output to a particular file. The "seekjournal" line tells Heka where it can store its own data for basic internal bookkeeping. That's only necessary because I'm running it as my regular user and Heka's default, /var/run/hekad/seekjournals/, isn't writeable by my user, so I just point it off at a temp file for now. Then, in the output section, this line:

message_matcher = "Type =~ /.*/"

is telling Heka to apply this to any and all messages that it encounters. Remember that Heka's job is to handle lots of different inputs, apply various filters to them, and route them to the right outputs. So everything that isn't an input needs to be specific about which messages it would like to handle. Our simple output section just matches anything whatsoever. As you get more complicated Heka configs, this output section is a handy one to be able to paste in when you're debugging so you can see everything that Heka is doing.
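The routing idea can be sketched in a few lines of Python. This is a conceptual toy, not Heka's implementation or its matcher grammar: the `Output` class, the `route` function, and the message types `logfile` and `stat` are all hypothetical, and a plain regular expression on the message's `Type` field stands in for a real matcher expression.

```python
import re


class Output:
    """Hypothetical output: collects messages whose Type its pattern accepts."""

    def __init__(self, name, type_pattern):
        self.name = name
        self.type_pattern = re.compile(type_pattern)
        self.received = []

    def matches(self, message):
        return self.type_pattern.search(message["Type"]) is not None


def route(message, outputs):
    # Deliver the message to every output whose matcher accepts it.
    delivered = []
    for out in outputs:
        if out.matches(message):
            out.received.append(message)
            delivered.append(out.name)
    return delivered


debug = Output("debug", r".*")           # matches anything, like the config above
stats_only = Output("stats", r"^stat$")  # a pickier output

print(route({"Type": "logfile", "Payload": "one"}, [debug, stats_only]))
print(route({"Type": "stat", "Payload": "hits:1"}, [debug, stats_only]))
```

The catch-all output sees both messages while the pickier one sees only the second, which is why a match-everything section is handy to paste in while debugging.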

In the next post, I'll show you a slightly more complicated config that will let Heka run as a replacement for statsd.

TAGS: sysadmin heka

A Quick Meditation on Types

This is valid Java:

But this is not valid Python:

It's pretty easy to pick them apart and understand exactly why each is what it is, but I think it's worth meditating on why it is somewhat surprising to most programmers.

Ricon East 2013 Talk Summaries

This week I attended Ricon East, a conference organized by Basho, makers of the Riak distributed database. Riak was featured prominently at the conference but it was intended more as a conference on distributed systems in general spanning academia and industry.

For once, I brought a laptop and actually took full notes of every talk I went to (there were two tracks going on, so I could unfortunately only be at half of them). Last weekend, I even wrote myself a little toy wiki in Go (with Riak as the backend) to take notes in and really get in the spirit.

I'm a fan of Richard Feynman's basic technique for learning, which can be summarized as "write down a summary of the topic as if you were explaining it to a new student". It's simple, but forces you to identify the areas that you don't really understand as well as you think. Then you can go off and learn those areas.

Since distributed systems are an area that I'm deeply interested in and trying to learn as much as I can about, I decided to use this as an excuse to apply the Feynman technique and write up summaries of all the talks I saw. The following summaries are explanations largely from memory, informed by my notes, of my understanding of the content of each talk. I'm sure I got some stuff wrong. A lot of the content was in areas that I'm not experienced with and only loosely understand. I was typing notes as fast as I could while trying to listen at the same time, and I probably misheard some things and mistyped others. If you were at the talks (or just know the subjects well) and have corrections, please let me know. As I find links to speakers' slide decks, I'll try to add those.

First though, a few words about the conference overall. This was one of the best run conferences I've been to in a long time. Everything mostly ran on time, there were only a few minor technical problems (though the projection screens in Stage 2 were pretty rough looking), the venue and catering were excellent, it was full but not overcrowded, and the talks were consistently well-executed with a good range of accessibility and depth. If my "summaries" seem long, it's because all of the talks were really that information dense. I'm leaving out a lot of detail.

Automatically Scalable Computation

by Dr. Margo Seltzer, Harvard SEAS

video, slides

Margo Seltzer and her grad students at Harvard have been working on a new technique for automatically parallelizing sequential code at the VM/runtime level.

The idea begins with looking at a running machine in terms of the full set of registers and memory as a single (very large) bit vector. If you know the full state of the machine at a given time, it should be completely deterministic (we are excluding all external IO from this model). An instruction executing is just a function on the input state that produces another state as the output and computation is just this process repeated. You can view computation as a trajectory through this large-dimensional state space. A naive approach to thinking about making that computation parallel is to pick N points on the trajectory and have each processor start at those points (this makes more sense with a diagram, sorry). Of course, that presupposes that you know what the final computation trajectory would look like, which is clearly not realistic (and if you did know it, you might as well jump straight to the end) as it would require having pre-computed every state transition in the entire state-space ahead of time (and having it all stored where the outputs can be retrieved).

Seltzer's approach is a variation on this naive version. The proposal is to say: instead of being able to have each processor pick up at exactly the points on the trajectory that we know the program will hit, what if we can come up with reasonable guesses that put us in close to the same area? If that's possible, then parallel processes could work to refine from there and speed up the computation. Indeed, it turns out that real world programs tend to follow predictable enough trajectories through the state space that this might be reasonable.

The overall system involves a few components. There's a VM, which takes the input state vector, knows how to find the instruction pointer in that vector, will look up the current instruction, run it, and generate an output state vector. Then there are components that look at pieces of input vectors and use different strategies to try to predict pieces of output vectors. E.g., some operate at a single bit level and make predictions like: if a bit was 1 last time, it will be 1 next time. Others work at a 32-bit word level and do, e.g., linear regression.

Aggregators combine those individual predictors and make larger predictions by running a given input vector through N instructions and adjusting the weights of each of the predictors with each round. Basically, very simple machine learning. There is a trajectory cache which maps input vectors to known output vectors.

A key insight here is that instead of caching the entire state vector each time, the system pays attention to which bits in the inputs are read and written to during the course of execution. So while a given state vector may be huge (since it encompasses all of the program's memory), in the course of running 1000 instructions from a given start state, only a small subset of that memory will be read or written and the vast majority of the memory and registers will remain whatever they were before. So the cache takes advantage of this and stores the masks along with the values of the bit vectors in those masks. On a lookup, it can see if the current state matches any cached mask+value pairs. If it does, it can change only the bits that the cache entry says get changed, leaving the rest of the vector intact. What that is analogous to is taking a linear transformation of the cached trajectory fragment and applying it to the input vector.
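My understanding of that cache lookup can be sketched in Python, treating the machine state as one big integer used as a bit vector. This is an illustration of the idea as described above, not the research group's implementation; the `CacheEntry` fields and the `lookup` function are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class CacheEntry:
    read_mask: int    # bits the cached run read
    read_value: int   # values those bits held when the run started
    write_mask: int   # bits the cached run wrote
    write_value: int  # values those bits hold when the run ends


def lookup(state, cache):
    """Return the fast-forwarded state on a cache hit, or None on a miss."""
    for entry in cache:
        if (state & entry.read_mask) == entry.read_value:
            # Overwrite only the written bits; everything else carries over.
            return (state & ~entry.write_mask) | entry.write_value
    return None


# One cached fragment: it read the low four bits (as 0b1010)
# and wrote 0b0011 into the next four bits.
cache = [CacheEntry(read_mask=0x0F, read_value=0x0A,
                    write_mask=0xF0, write_value=0x30)]

print(hex(lookup(0xABCA, cache)))  # hit: only bits 4-7 change
print(lookup(0xABCB, cache))       # miss: the bits that were read differ
```

The entry stores eight bits' worth of masks and values no matter how large the full state is, which is the space saving the masks are there for.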

Implementation of their system is still immature, but Seltzer shows that they are able to get pretty good speedups on real physics problems written in C and compiled with normal GCC. It's nowhere near linear speedup (with the number of additional CPUs) and is not as good as a programmer could do manually (all their examples are for embarrassingly parallel problems). A 10x speedup from 128 cores sounds bad but she points out that at some point in the future your laptop will have that many cores and this kind of automatic parallelizing will be the only way that you'll be able to make use of most of that. She's also happy to note that there are still a large number of "stupid" things that the implementation does, so there's plenty of room for it to improve.

ZooKeeper for the Skeptical Architect

by Camille Fournier, VP of Technical Architecture, Rent the Runway

video slides

Camille presented ZooKeeper from the perspective of an architect who is a ZooKeeper committer, has done large deployments of it at her previous employer (Goldman Sachs), left to start her own company, and that company doesn't use ZooKeeper. In other words, taking a very balanced engineering view of what ZooKeeper is appropriate for and where you might not want to use it.

ZooKeeper is a distributed consensus and distributed locking system that came out of Hadoop and is now safely part of the Apache project. In the last few years, it seems to have worked its way into many places: Neo4j, Apache Kafka, Storm, Netflix, and OpenStack all use it, and it is ubiquitous in cloud computing, SaaS, PaaS, and at any organization that deploys very large distributed systems. This makes the systems that don’t use ZooKeeper notable: Riak, Cassandra, MongoDB, etc.

A few reasons that ZooKeeper isn't used there: 1) from a CAP theorem perspective, ZooKeeper favors consistency over availability. It requires a quorum of nodes and will not run in a reduced capacity. That makes it an inappropriate match with systems like those mentioned above that favor availability via eventual consistency. 2) ZooKeeper has a very high operational complexity. It requires a hard minimum of three nodes, takes a fair amount of resources to run (Java...), and the ZooKeeper project strongly advises against colocating ZooKeeper nodes on servers that are running other services, or running them on virtual servers. Ie, you really should have at least three dedicated bare metal servers just for ZooKeeper to use it properly. If you are running a Hadoop cluster or a big SaaS operation, that’s probably not a problem. But if you're a vendor like Basho trying to get people to use your product, adding a ZooKeeper dependency makes it a hard sell. She has also experienced first-hand the problem that an out of control client that doesn't understand ZooKeeper can write too much data to it, filling memory (ZooKeeper needs to keep all its data resident in memory), and taking down the entire ZooKeeper cluster. Facebook and Google (Chubby is their equivalent to ZooKeeper) both deal with that problem by enforcing strict code review policies on any code that goes near the distributed locking. But at some level, you need to be able to trust any developers that are accessing it.
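
The quorum requirement is easy to put numbers on. A small sketch, assuming a simple majority quorum (the function names are mine):

```python
def quorum_size(ensemble: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    return ensemble // 2 + 1


def tolerated_failures(ensemble: int) -> int:
    """How many nodes can be lost while a majority is still reachable."""
    return ensemble - quorum_size(ensemble)


# ensemble size -> (quorum size, failures tolerated)
table = {n: (quorum_size(n), tolerated_failures(n)) for n in (1, 2, 3, 4, 5)}
```

With one or two nodes, no failure can be tolerated at all, which is why three is the practical floor. Going from three to four nodes raises the quorum without buying any extra fault tolerance.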

Why would you use ZooKeeper though? Why is it so popular? She divides the functionality into two main categories: distributed lock management, and service management/discovery. Distributed locking lets you do things like failover of nodes in a large system safely. If you have traditional database servers with master/slave replicas and a master goes offline, you need to promote one of the slaves to be the new master. If your system can’t agree on which slave to promote and two try to claim the crown, it’s a disaster. Likewise, if a slave gets promoted and then the original master comes back online because it was just a network partition and it doesn't realize that it’s been replaced, it will be a disaster. ZooKeeper can be used to absolutely guarantee that only one of the database servers holds the "master" title at a given time. Service management/discovery would be, eg, the rest of your system being able to be notified that there is a new master that they should direct their writes to. Ie, having that be able to happen automatically based on events in the system rather than someone manually changing a config file setting and doing rolling restarts on the cluster to get that change reflected. Once a distributed system scales past a certain point, doing that kind of thing automatically rather than manually becomes crucial.
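
Here is a toy, single-process sketch of the "only one master at a time" idea. It is loosely modeled on the lowest-sequence-number election pattern commonly used with ZooKeeper, but `ToyCoordinator` is a made-up in-memory stand-in, not the ZooKeeper API, and it has none of the real system's fault tolerance.

```python
import itertools


class ToyCoordinator:
    """In-memory stand-in for a coordination service (NOT the ZooKeeper API)."""

    def __init__(self):
        self._seq = itertools.count()
        self._candidates = {}  # ticket number -> node name

    def join(self, name):
        """Register a candidate and hand back its unique, increasing ticket."""
        ticket = next(self._seq)
        self._candidates[ticket] = name
        return ticket

    def leave(self, ticket):
        """Called when a node's session ends (crash, partition, shutdown)."""
        self._candidates.pop(ticket, None)

    def master(self):
        """One answer for everyone: the holder of the lowest live ticket."""
        if not self._candidates:
            return None
        return self._candidates[min(self._candidates)]


coord = ToyCoordinator()
t_a = coord.join("db-a")
t_b = coord.join("db-b")
t_c = coord.join("db-c")
first = coord.master()      # "db-a"
coord.leave(t_a)            # db-a drops off the network
second = coord.master()     # "db-b" is promoted
t_a2 = coord.join("db-a")   # db-a returns, but with a new, higher ticket
third = coord.master()      # still "db-b"
```

The returning node gets a fresh ticket, so it cannot reclaim the title just because it used to hold it. Clients asking `master()` is the service discovery half of the picture.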

Distributed locking is typically needed if you have components that are not designed with a distributed, eventual consistency or immutable data kind of mindset from the beginning (eg, scaling out traditional databases rather than something like Riak). Service management can often be achieved to acceptable levels with other techniques like load balancing proxies or DNS (see a blog post from Spotify on how they are using DNS instead of ZooKeeper), or by storing service information in a shared database. Those all have their own scaling issues and potential problems, but are often simpler than running ZooKeeper. Ultimately, she argues that you should run ZooKeeper if you have one of the above use cases as a requirement, you have the operational resources to justify it, and you have rapid growth and need the dynamic resource allocation.

Scaling Happiness Horizontally

by Mark Wunsch, Gilt Group

video slides

Mark's talk was focused on Conway's Law, roughly "organizations which design systems are constrained to produce designs which are copies of the communication structures of these organizations" and its converse, that the software architecture at a development shop influences the communication structures of the organization in turn. Gilt's development team wanted to optimize for developer happiness and used this approach as their fundamental philosophy.

When most companies start out, they have one monolithic architecture, platform, framework, etc. That gets them quite a ways and then they begin to feel pain as they outgrow it. Eg, Twitter's beginnings on RoR, moving to JVM, Scala, etc. That pain manifests in development but also in a monolithic organizational structure. Eg, only one engineer might understand a crucial part of the system and they become a bottleneck for the rest of the organization. It’s a classic concurrency problem and managers become thread schedulers.

They view developer happiness as a dining philosophers problem. At any given time, a developer can either code or think. A developer who is only able to code and has no time to think burns out and is unhappy. A developer who only thinks and cannot code (due to organizational bottlenecks like above) gets frustrated and is unhappy. These are deadlock and livelock problems at an organizational level. Gilt sought to eliminate them the way a distributed systems programmer would. Find the mutex and eliminate it. Make it a proper lock-free distributed system from the ground up.

What they ended up doing was breaking up into small autonomous teams and changing their architecture to "LOSA", "Lots Of Small Applications", basically SOA for every component in use. Teams then have "initiatives", "ingredients", and "KPIs". Initiatives are projects that they expect to work on for a while. Key Performance Indicators, or KPIs are metrics used to track the progress of those initiatives. They must be agreed on between the team and the management and simple to calculate. "Mission statements are an antipattern; they’re just a less useful version of a metric". Teams are otherwise autonomous. They self-select and pick initiatives to work on. Teams are organizationally flat and made up of "ingredients" like designer, architect, developer, ops, etc. "Job titles are an antipattern; they let people say 'that's not my job'".

There is a spectrum of communication styles from "hive minds" with tightly coupled, synchronous, informal communication to "pen pals" with loosely coupled, asynchronous, formal communication. A team should be a hive mind and teams should interact with other teams as pen pals. They apply CAP theorem to their organizational design and choose A over C. The result is that there is sometimes inconsistency between teams. Eg, it’s OK for their microsites to each have slightly different UI elements temporarily when one team advances a component faster than another team can keep up. Eventual consistency will bring things in line in a while and in the meantime, it’s better than deadlocking until the teams can achieve consistency. Other organizations like Apple, or small startups, probably would value consistency over availability.

Gilt's preferred communication tool is code reviews. They use Gerrit. A code review gives them a permalink associated with every change along with a history of the communication and decision making process associated with that change.

Firefighting Riak At Scale

by Michajlo Matijkiw, Sr. Software Engineer at Comcast Interactive Media


This talk was basically "we did a very large Riak deployment when Riak was very young, found a bunch of problems, solved them, and kept everything running".

Comcast wanted to use Riak to expose a simple, easy to use key/value store to their developers. They weren't necessarily interested in massive scale or performance. They built a small object wrapper around the Riak client that hid complexity like vector clocks, added monitoring and logging, and made it very simple to store objects at keys. It was an enterprise system, so they did enterprise capacity planning and load testing. Everything worked well until they started getting a lot of HTTP 412 errors. Those occur when you do a sequence of GETs and conditional PUTs based on eTags and a pre-condition fails. What they discovered was that overall latency in the cluster had gone way up. Not enough to trip their alarms, but enough that it was causing other problems. The analogy is "if you leave your apartment door unlocked while you run out quickly to switch your laundry, it’s probably ok, but if you leave it unlocked for a week while you go on vacation, you’ll come home to find everything gone". High latency was leaving a large window for other inconsistencies to show up. Machines were getting so overloaded that they were overheating the machines next to them causing cascading failures that way.
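
The GET-then-conditional-PUT race behind those 412s can be sketched in a few lines. `ToyStore` below is a hypothetical in-memory stand-in, not Riak's HTTP interface or Comcast's wrapper; it only mimics the ETag precondition check.

```python
import hashlib


class ToyStore:
    """In-memory stand-in for an HTTP key/value store with ETags."""

    def __init__(self):
        self._data = {}

    @staticmethod
    def _etag(value: bytes) -> str:
        return hashlib.sha1(value).hexdigest()

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        value = self._data[key]
        return value, self._etag(value)

    def put_if_match(self, key, value, etag):
        """Return 204 on success, 412 if the value changed since the GET."""
        if self._etag(self._data[key]) != etag:
            return 412
        self._data[key] = value
        return 204


store = ToyStore()
store.put("profile", b"v1")
_, tag_a = store.get("profile")   # client A reads
_, tag_b = store.get("profile")   # client B reads the same version
status_b = store.put_if_match("profile", b"v2-from-b", tag_b)  # 204
status_a = store.put_if_match("profile", b"v2-from-a", tag_a)  # 412
```

The longer the gap between a client's GET and its PUT, the more likely someone else slips in between, which is how rising latency turned into a flood of precondition failures.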

When hardware failed, they figured out that they could abandon a bad node, then reintroduce it to the cluster as a new node and that worked as a quick fix, so they did that a lot. That became their ops solution to all problems. They started getting some failures that were not hardware failures, but corruption to the bitcask data stores. They decided to use their usual fix even though Basho kept telling them not to do that. When they rejoined the bad nodes, they found that not all the data was getting read-repaired. Their N-values weren't set correctly and the old nodes that they had been replacing hadn't been replicating data. It had been fine up until the point where they reached a threshold and all the replicas of some pieces were gone. They ended up having to do surgery on the bitcask files, which are append-only log structures. So they went in and replayed them up until the point of the corruption, getting back all the data from before that point, and then forcing the cluster to read-repair everything after that. Riak has since built in detection and correction for that situation.
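
The "replay up to the corruption" trick works for any append-only log with per-record checksums. A minimal sketch, using a record layout I invented for illustration (CRC32, length, payload); this is not Bitcask's actual on-disk format.

```python
import struct
import zlib

HEADER = struct.Struct(">II")  # CRC32 of payload, payload length


def append(log: bytearray, payload: bytes) -> None:
    """Add one checksummed record to the end of the log."""
    log.extend(HEADER.pack(zlib.crc32(payload), len(payload)) + payload)


def replay(log: bytes) -> list:
    """Return every record before the first truncated or corrupt one."""
    records, offset = [], 0
    while offset + HEADER.size <= len(log):
        crc, length = HEADER.unpack_from(log, offset)
        start = offset + HEADER.size
        payload = bytes(log[start:start + length])
        if len(payload) < length or zlib.crc32(payload) != crc:
            break  # corruption: stop here and leave the rest to read-repair
        records.append(payload)
        offset = start + length
    return records


log = bytearray()
for item in (b"k1=a", b"k2=b", b"k3=c"):
    append(log, item)

# Flip one byte inside the second record's payload.
second_payload_start = 2 * HEADER.size + len(b"k1=a")
log[second_payload_start] ^= 0xFF

recovered = replay(log)  # only the first record survives
```

Everything after the bad record is discarded even if it is intact, since a damaged length field would make the later offsets untrustworthy.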

Retrospective lessons: better monitoring of utilization and latency were needed. They needed to spend more time with clients showing them proper ways to use (and ways not to use) the system, and they needed to introduce proper choke points so they could throttle traffic to limit damage.

Bloom: Big Systems From Small Programs

by Neil Conway, Berkeley PhD candidate.

video, slides

This talk was a continuation of Neil's BashoTalk on CALM computing and his professor's talk from the first Ricon, which was a big hit. This one reviewed the basic ideas and introduced the Bloom language that he’s been working on which implements the ideas.

Distributed computing is the new normal (between mobile devices and cloud computing). You are all building distributed systems whether you want to admit it or not. But our tools are still built for traditional problems. The kinds of optimizations in distributed systems are new (global coordination, locality, etc) and compilers don't know about them. They can't detect deadlocks, replica divergence, or race conditions. We have no tools for debugging an entire system. We know how to build distributed systems, but we don’t know how to make developers productive and efficient at writing them. We can do better, but we need new approaches.

The two main approaches to date are: enforce global order at all nodes with distributed locks and consensus, which means we only have to consider one possible event ordering but pay the price in availability and latency; or ensure correct behavior for any event ordering with "weak consistency", which makes race conditions and deadlocks hard to reason about and is a major burden on developers.

His approach involves Bounded Join Semilattices, which are just a mathematical way of talking about objects that only grow over time. The core is a monotonic merge() function. The classic example is Sets with union as the merge() function. Eg, if you start with {a}, {b}, {c}, you can merge() those into {a,b}, {a,c}, {b,c}, and merge() again into {a,b,c}. The order that any of those merges happened in doesn't matter; you will end up at the same result eventually. Ie, merge() is commutative, associative, and idempotent. You can also think about Integers with max(), Booleans with OR(), etc. These lattices form the basis of "convergent replicated data types" like sets, counters, graphs, and trees. If you can express your computation in terms of CRDTs, you can build systems that are easy to reason about mathematically and do not depend on the ordering of events. Neil calls this "disorderly data" and "disorderly computation".
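
The order-independence claim is easy to check by brute force. This is plain Python rather than Bloom, and the helper names are mine; it just folds the replicas together in every possible order and compares the results.

```python
from functools import reduce
from itertools import permutations


def merge_set(a: frozenset, b: frozenset) -> frozenset:
    return a | b          # set union


def merge_max(a: int, b: int) -> int:
    return max(a, b)      # a value that only grows


def merge_or(a: bool, b: bool) -> bool:
    return a or b         # a flag that, once set, stays set


def all_orders_agree(merge, replicas) -> bool:
    """Fold the replicas together in every possible order and compare."""
    results = {reduce(merge, order) for order in permutations(replicas)}
    return len(results) == 1


sets = [frozenset("a"), frozenset("b"), frozenset("c")]
merged = reduce(merge_set, sets)                   # frozenset({'a', 'b', 'c'})

sets_ok = all_orders_agree(merge_set, sets)                  # True
max_ok = all_orders_agree(merge_max, [3, 7, 5])              # True
or_ok = all_orders_agree(merge_or, [False, True, False])     # True

# Subtraction is not a lattice merge, so the order of arrival matters.
sub_ok = all_orders_agree(lambda a, b: a - b, [5, 3, 1])     # False
```

Merging a value with itself also changes nothing (`merge_set(merged, merged) == merged`), so duplicated or re-delivered messages are harmless.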

CRDTs and asynchronous messaging form a core model that eliminates inconsistency. However, not everything can be expressed in terms of monotone functions. So they've been working on a language, Bloom, that has those primitives and has syntax to explicitly mark non-monotone parts, allowing the compiler to reason about the overall program and point out to the user which parts are safe and which are not (and would require some kind of synchronization).

One of the key techniques is to use immutable data wherever possible. Neil showed the example of a shopping cart model. There would be multiple SessionApps involved and a Client that adds and removes a number of items, possibly hitting a different SessionApp each time, eventually checking out. Instead of having a shared mutable state representing the cart, each SessionApp keeps a log of add and remove operations; adding entries to those logs and merging logs are monotonic. They can be completely inconsistent during most of the process, only synchronizing at the final checkout point, which is non-monotonic, but it's intuitive that that is the point where synchronization needs to happen.
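
A rough sketch of that cart in plain Python (not Bloom). The log entry shape, the operation ids, and the function names are my own assumptions; the point is only that merging logs is a set union and the one order-sensitive step is checkout.

```python
from collections import Counter

# Each log entry is (operation id, action, item). The operation id makes
# entries unique, so merging by set union never double-counts an operation.


def merge_logs(*logs: frozenset) -> frozenset:
    """Monotonic merge: the union of everything any replica has seen."""
    merged = frozenset()
    for log in logs:
        merged |= log
    return merged


def checkout(log: frozenset) -> dict:
    """Non-monotonic step: collapse the log into the final cart contents."""
    counts = Counter()
    for _op_id, action, item in log:
        counts[item] += 1 if action == "add" else -1
    return {item: n for item, n in counts.items() if n > 0}


# Two SessionApps that each saw a different subset of the client's requests.
session_app_1 = frozenset({(1, "add", "book"), (3, "remove", "pen")})
session_app_2 = frozenset({(1, "add", "book"), (2, "add", "pen"),
                           (4, "add", "mug")})

cart = checkout(merge_logs(session_app_1, session_app_2))
# cart == {"book": 1, "mug": 1}
```

Until checkout, neither replica needs to know what the other has seen. Calling `checkout` on a single replica's log early would give a wrong answer, which is why that is the point that needs coordination.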

Bloom is implemented as a Ruby DSL (but they really don’t care about Ruby) with async message passing actors added and rule triple syntax. Each triple is a left-hand side, an operation, and a right-hand side. The engine looks at the RHS values, and if they are true, sets the LHS values to true, using the operator to track how the two are related in terms of time and location. Eg, same location, same timestamp means that it is a traditional computation. Same location, next timestamp means that it’s some kind of persistence operation like storing data or deleting something. Different location, non-deterministic timestamp implies that the rule application represents communication between parts of the system. The result is concise high level programs where state update, asynchrony, and non-monotonicity are explicit in the syntax.

Just Open A Socket -- Connecting Applications To Distributed Systems

by Sean Cribbs, Software Engineer at Basho

video, slides

Sean Cribbs has written many of the client libraries for Riak and presented on techniques and lessons learned writing client libraries for distributed systems.

Riak supports streaming operations, which are very good for performance. The client can ask for a set of results and get individual notifications for each result as the cluster sends it back. This lets it act on each ASAP instead of having to wait for the full set, which is a huge win. You would normally do that by setting up an iterator that runs over the results. The Ruby client does that with code blocks and uses libcurl for the HTTP communication. Each block is a closure that will execute on each response that comes in. That worked well until he started getting lots of errors. Eventually tracked it down to connection caching in libcurl to avoid setup/teardown overhead. That turned out to be implicit global state and made the blocks non-reentrant. The fix was to use multiple connections (which you ought to be doing when building a distributed system anyway) and pooling them.

His second example involved a customer complaining that Riak 1.2 was performing 20% slower than the previous release on their queries. Basho's engineers were pretty sure that all the changes they made would only improve performance and couldn't duplicate the problem. The customer was using the Python client and ProtocolBuffers instead of HTTP and making MapReduce requests over secondary indices. They were eventually able to duplicate the problem in test clusters when they used the same clients to access it. Did a DTrace and optimized a bunch of things but still didn't understand why it was slowing down so much. Eventually noticed that it was making a LOT of calls to gen_tcp:send() with very small payloads, just a couple bytes each, but each call was taking ~30ms. Essentially, since it was doing MapReduce over index data rather than the main data, the queries and responses were both very small and they were running into Nagle's algorithm. Nagle's algorithm is a low level optimization in the TCP/IP stack that tries to maximize network throughput by bunching up traffic to/from the same hosts into large enough packets to be efficient. In a nutshell, it holds back small writes while earlier data is still waiting to be acknowledged, and sends them once enough has accumulated to fill a packet or the acknowledgment arrives. For most network traffic, that makes much more efficient use of the network and saves a lot of switching overhead that would be incurred by sending out every small packet as soon as it can. But in this case, they were hitting a pathological case and it was killing their performance. Their solution was to buffer the requests themselves so they could control the latency instead.
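
Buffering on the application side might look something like this. It is a hypothetical sketch in Python rather than the Erlang code from the talk; `CoalescingSender` and its `max_bytes` threshold are names and numbers I picked for illustration.

```python
class CoalescingSender:
    """Collects small messages and hands them to `send` in one call."""

    def __init__(self, send, max_bytes=1024):
        self._send = send            # eg, a connected socket's sendall
        self._max_bytes = max_bytes
        self._pending = bytearray()

    def write(self, payload: bytes) -> None:
        self._pending += payload
        if len(self._pending) >= self._max_bytes:
            self.flush()

    def flush(self) -> None:
        """Push out whatever is buffered; call this at the end of a request."""
        if self._pending:
            self._send(bytes(self._pending))
            self._pending.clear()


sent = []                                   # stand-in for the network
sender = CoalescingSender(sent.append, max_bytes=16)
for _ in range(5):
    sender.write(b"abcd")                   # five 4-byte messages
sender.flush()                              # push the remainder
# sent == [b"abcdabcdabcdabcd", b"abcd"]: two sends instead of five
```

The application decides when data goes out, instead of leaving it to the TCP stack. Another common option, though not the one described in the talk, is to turn Nagle's algorithm off for the connection with the `TCP_NODELAY` socket option.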

Distributed systems fail in very interesting ways. Clients often have a hard time telling all the different types of failure apart. Sean recommends dividing it into two main classes: system related or network errors such as network failures, connection refused, etc. and unexpected results such as quorum failures, key not found, bad request, server side error. The first class can usually be dealt with by doing a retry, preferably with an exponential backoff. The second type can be handled with a retry only if the request is idempotent, but you need to be very careful to make sure that requests are really idempotent. A third class of failures are timeouts and those are the trickiest kind to deal with. He doesn't have a good solution for what to do then. Often a retry, but not always.
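
A retry with exponential backoff is only a few lines. In this sketch the choice of `ConnectionError` as the retryable class, the default delays, and the random jitter are all my assumptions rather than anything from the talk.

```python
import random
import time


def retry(operation, attempts=5, base_delay=0.1, max_delay=5.0,
          retry_on=(ConnectionError,), sleep=time.sleep):
    """Call `operation` until it succeeds or `attempts` are used up."""
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise                      # out of attempts: surface the error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter keeps a crowd of clients from retrying in lockstep.
            sleep(delay * random.uniform(0.5, 1.0))


calls = {"n": 0}


def flaky():
    """Fails twice with a connection error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionRefusedError("node unavailable")
    return "ok"


delays = []                                # record sleeps instead of waiting
result = retry(flaky, sleep=delays.append)
```

Anything not listed in `retry_on` propagates immediately, so errors that are only safe to retry for idempotent requests have to be opted in deliberately by the caller.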

Overall, he suggests opening multiple connections, keeping them isolated, connecting to as many peers as you can (rather than relying on intermediate load balancers), knowing the structure of your entire network stack well, and keeping in mind that clients are not passive observers of the system, but active participants whose presence changes the system.

Nobody ever got fired for picking Java: evaluating emerging programming languages for business-critical systems

by Alex Payne, Co-Founder at Breather, wrote the O'Reilly Scala book

video, slides

Alex Payne wrote the O'Reilly Scala book, but considers himself a true polyglot programmer and also organizes the Emerging Languages Conference every year. That conference has seen 48 new languages presented since 2010. So the problem is that there is so much to build, so many languages, and no good way to choose what language is appropriate for a given task.

There are existing "crappy" solutions to the problem. You can make a totally arbitrary selection. You can make it a popularity contest (slightly better than the first option). Or you can do "Design By Hacker News" and pick the language and technologies that seem to have the most buzz online.

He presents, instead, an evidence based approach to language selection. The first problem with that approach is that you need to avoid subjective criteria. He includes in that category: readability, terseness, productivity, agility, and mindshare. He does argue though that there are a few subjective criteria which have some value when evaluating emerging languages. Developer appeal: some programmers are just excited by novelty. Strategic novelty: you get to invent your own design patterns for the language. This promotes sort of a zen beginner mind mentality, which can be good for problem-solving. Homesteading: you may get to write the basic, fundamental libraries. This is fairly easy since you have examples from other languages to copy and improve upon. Malleable roadmap: emerging languages are often still in search of their ideal problem domain and if you're an early adopter, you can help move it in a direction that's useful to you. Finally, accessible community: you will probably be able to gain easy access to the small number of people involved in writing and driving the library.

He then sets out the actual objective criteria that can be used for language choice: performance (mostly), library breadth, available primitives (eg, Actors and CSP in languages like Erlang and Go), stability of syntax, security history, and development cycle (how often are new versions of the language released and how is backwards compatibility, etc handled). With those criteria, he recommends taking the very boring MBA like approach of setting up a weighted matrix and analyzing each option with it. Programmers are disappointed when he gives them that advice, but he doesn't know of a better approach.
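
For what it's worth, the weighted matrix is about ten lines of code. The weights and the 1-5 scores below are entirely made up for illustration, as are the two placeholder languages; only the criteria names come from the talk.

```python
# Hypothetical weights (summing to 1.0) for the objective criteria.
CRITERIA_WEIGHTS = {
    "performance": 0.30,
    "library breadth": 0.25,
    "primitives": 0.15,
    "syntax stability": 0.10,
    "security history": 0.10,
    "development cycle": 0.10,
}

# Hypothetical 1-5 scores for two placeholder candidates.
SCORES = {
    "language A": {"performance": 4, "library breadth": 2, "primitives": 5,
                   "syntax stability": 2, "security history": 3,
                   "development cycle": 3},
    "language B": {"performance": 3, "library breadth": 5, "primitives": 2,
                   "syntax stability": 5, "security history": 4,
                   "development cycle": 4},
}


def weighted_score(scores: dict, weights: dict) -> float:
    """Sum of score times weight over every criterion."""
    return sum(weights[c] * scores[c] for c in weights)


totals = {lang: weighted_score(s, CRITERIA_WEIGHTS)
          for lang, s in SCORES.items()}
ranking = sorted(totals, key=totals.get, reverse=True)
# language A totals 3.25 and language B totals 3.75, so B ranks first
```

The arithmetic is the boring part. The useful work is arguing about the weights before anyone has seen the totals.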

Once a language is chosen, he does suggest three strategies for succeeding with an emerging language: First, acknowledge the risk, and hedge against it by also working in a more traditional, "safe" language. Eg, start up a new project in the new language, but keep your old code going alongside it. Next, involve yourself in the language community. You can help shape it, you will be on top of potentially dangerous language changes, and you will have more ready access to the language experts to call upon if you have trouble. Finally, play to your pick's strengths and use another language where it's weak. He points out that if you look at any large development organization like Google, Facebook, Apple, or even Microsoft, they are all polyglot environments. He doesn't know of any even moderately large organization that has managed to truly standardize on only one language for development.

High Availability with Riak and PostgreSQL

by Shawn Gravelle and Sam Townsend, developers from State Farm

Two developers from State Farm talking about rolling out a nationwide platform to allow all their customers to interact with all their services through all channels (mobile, web, phone, and at terminals in their offices). Their priorities are availability, scalability, performance, supportability, and cost in that order. Their applications layer requires three nines of uptime (a little under nine hours of downtime per year allowed) and the infrastructure layer itself should perform at four nines (52 minutes a year of downtime).
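
Those downtime budgets fall straight out of the arithmetic. A quick sketch, assuming a 365-day year:

```python
MINUTES_PER_YEAR = 365 * 24 * 60   # 525,600, ignoring leap years


def downtime_minutes(nines: int) -> float:
    """Allowed downtime per year for an availability of N nines."""
    unavailability = 10 ** -nines  # eg, 3 nines = 99.9% up = 0.1% down
    return MINUTES_PER_YEAR * unavailability


three_nines = downtime_minutes(3)  # about 525.6 minutes, or 8.76 hours
four_nines = downtime_minutes(4)   # about 52.56 minutes
```

Each extra nine cuts the budget by a factor of ten, so five nines would leave barely five minutes a year.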

At a high level, they had a greenfield project. Planning three sites active, each would get 100% replicas of the data. All infrastructure would be virtualized, use automatic provisioning, and would be oriented around services. Application and service development is Spring and Java, using PostgreSQL for relational data and Riak for non-relational, all running on top of RHEL.

For the Riak part, they had a seven person team and needed to hook it into their existing monitoring, security, and testing setups. They needed to get replication latency between data centers low and consistent. Each site would have a customer facing cluster, the business area would get their own full silo starting at a five node cluster and growing as needed, an internal-facing cluster with read-only replication and lower availability requirements that would be used for running batch analysis jobs that they didn't want impacting the customer facing clusters, and a backup cluster that would get a full replica set, then be shut down and the filesystem snapshotted once per day for off-site backups. They wrapped Basho's client jar in their own, to which they added monitoring, security, and some data type restrictions; that wrapper was then the single interface that application developers had for accessing the cluster.

They had very strict security requirements which they addressed by running every node behind Apache so they could re-use their existing security infrastructure. Their custom jars also had strict logging requirements and ensured that all operations were logged with all the relevant client info. The downside was that they only had per-node security granularity. So all of their internal applications could potentially access the data stored by other applications.

Their main challenges deploying Riak were that they did not have much experience using non-relational data storage and had to do a lot of internal education to teach application developers how to make best use of it, when to use relational datastores instead, etc. Eg, developers would contact them asking them to create buckets for them because they were used to the RDBMS model where they would define a DDL and hand it off to DBAs to create the table structure for them before they could use it. They had to explain to them that with Riak, you just start saving data in the structure you want and it is created on the fly. Developers also had a lot of doubts about performance characteristics which they addressed by doing heavy load testing and benchmarking ahead of time so they could preempt most of those. They maintain three full copies of the ENTIRE cluster setup that code has to pass through on a deployment pipeline: testing, load testing, and pre-production.

They used both PostgreSQL and Riak alongside each other to play to each database's strengths. Their background is heavily relational and PostgreSQL is very smart and has a lot of functionality that they find extremely useful like spatial data awareness. Riak lines up perfectly with their five priorities but tries to be "dumb" about the data that's stored in it (in a good way). PostgreSQL has issues with size, both in terms of scaling the entire database up and also with individual row sizes (if you go over about 8KB, it works but can negatively affect performance) and a lot of their data objects are large forms and documents that are acceptable to be opaque to the database but can grow quite large. PostgreSQL doesn't scale writes out as far as they'd like with a standard master-standby replication scheme, making it a single point of failure. Auto sharding tools exist for PostgreSQL but they aren't as useful for them since they are acting as a service provider and don't necessarily know much about the schema or access patterns of the application data. Riak is transactional internally but does not participate with the applications' transactions the way a RDBMS does. They developed a "pseudo-transactional" approach where they start a transaction in Postgres, write data to Riak, wait for it to succeed/fail and then commit or rollback the Postgres transaction. That works though it's not ideal in all situations.
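
The "pseudo-transactional" pattern can be sketched with stand-ins. Here `sqlite3` plays the part of PostgreSQL and the hypothetical `ToyKV` class plays the part of Riak; the table and function names are mine, and their real implementation is Java, not Python.

```python
import sqlite3


class ToyKV:
    """Stand-in for the key/value store; `fail` simulates a failed write."""

    def __init__(self):
        self.data = {}
        self.fail = False

    def put(self, key, value):
        if self.fail:
            raise IOError("write did not succeed")
        self.data[key] = value


def save_document(db, kv, doc_id, owner, body) -> bool:
    """Commit the relational row only if the key/value write landed."""
    try:
        db.execute("INSERT INTO documents (id, owner) VALUES (?, ?)",
                   (doc_id, owner))      # opens the relational transaction
        kv.put(doc_id, body)             # non-transactional write, mid-flight
        db.commit()
        return True
    except Exception:
        db.rollback()
        return False


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE documents (id TEXT PRIMARY KEY, owner TEXT)")
db.commit()
kv = ToyKV()

ok = save_document(db, kv, "doc-1", "alice", b"large form")        # True
kv.fail = True
failed = save_document(db, kv, "doc-2", "bob", b"another form")    # False
rows = [r[0] for r in db.execute("SELECT id FROM documents ORDER BY id")]
# rows == ["doc-1"]: the second row was rolled back
```

One gap this leaves open: if the key/value write succeeds but the final commit fails, the stored object is orphaned. That may be part of why they describe the approach as not ideal in all situations.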

They highly recommend logging and automating everything. If you have 52 minutes a year of downtime allowed, you can't waste any of that doing things manually or risk human error during those short windows. However, be careful to not automate decision making. Eg, automate failover, but make recovery after the failover be manually initiated. That helps prevent cascading failures where there are bugs in your automated processes. They found that planned outages were harder to deal with than the unplanned ones. Postgres upgrades w/ replication involve locking both sites temporarily and dealing with all the traffic backing up. On unplanned, it crashes so there's no traffic. Plus, on unplanned outages, people expect some data loss and rollback; on planned ones, they expect none.

How Do You Eat An Elephant?

by Theo Schlossnagle and Robert Treat, at OmniTI

video slides

Two consultants at OmniTI, who focus on web scalability and actually implement systems for their clients instead of just advising. Existing monitoring tools were not sufficient for the deployments they were doing so they decided to build their own to handle the large scale that they work at. In particular, they deal with multiple datacenters which don't necessarily have connections to each other so most fully centralized monitoring systems like Nagios don’t work. RRD backed monitoring like Munin (Graphite and its Whisper data store didn't exist at the time) typically rolls old data out after a year but their clients are largely interested in looking at multi-year trends ("how have our traffic patterns changed over the last four years?") so that wasn't a good solution.

They picked Postgres to back their system because they had a Postgres contributor on staff and had experience scaling Postgres out to databases in the hundreds of terabyte range. They built it and it worked well, but it was hard to install, with quite a few components that had to be installed and configured correctly on each instance. Plus, once it was installed, their clients didn't necessarily understand how to make the best use of it: "when you download the tool, you don't get the philosophy." So they decided that the best solution was to "shove it down their throats as an SaaS".

They rebuilt it as a SaaS, still on Postgres and it was wildly successful. Which means it was a nightmare. Since anyone could hook up new systems to be monitored without going through them, growth was unpredictable and faster than they expected. They discovered that if you have monitoring as a service, it’s far more sensitive to downtime than just about any other system you could imagine. When all your clients are using it to track their uptime, if you go down, all of their system graphs have a gap where you were down, so everyone notices. The CEO comes in on Monday morning, looks at the traffic graphs for the weekend, sees the gap and panics. So the tolerance for downtime was now much lower than before.

They planned on storing data rolled up every five minutes for each metric, averaging about 50 metrics per server, and storing that data for up to ten years (their definition of "forever" for internet companies). That comes out to 50 million rows of data per server. They went through quite a few steps to get this manageable, and developed some clever tricks with Postgres array data types and schemas to let them mix real-time metric collection with batch-mode rollups of the data. Storage growth and performance were basically handled.
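
The 50 million figure checks out as a rough number. A back-of-the-envelope sketch, assuming one row per metric per five-minute rollup and 365-day years:

```python
ROLLUPS_PER_DAY = 24 * 60 // 5     # one every five minutes -> 288
METRICS_PER_SERVER = 50
YEARS_RETAINED = 10

rows_per_metric_per_year = ROLLUPS_PER_DAY * 365          # 105,120
rows_per_server = (rows_per_metric_per_year
                   * YEARS_RETAINED * METRICS_PER_SERVER)  # 52,560,000
```

So a little over 52 million rows per monitored server, before you multiply by the number of servers across all their clients.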

The major issue became fault tolerance. PostgreSQL is rock solid, but the hardware it runs on would still fail sometimes and they wanted to be able to do updates. In a cluster as large as they were running, these failures happen regularly. PostgreSQL can do reasonably good fault tolerance with a replication setup but it fundamentally expects you to be able to pause traffic while it does some of the more sensitive failover operations (because PG's focus is on guaranteeing ACID and consistency rather than availability). With their incredibly low tolerance for downtime, this was becoming seriously difficult.

They decided that long term they needed to switch to a column store of some type that was simpler to deal with from a fault tolerance perspective. They ruled out commercial offerings like Oracle RAC and Teradata (would not fit with their pricing model and require dedicated DBAs), Hadoop (their data is heavily structured), Cassandra (column-store, which is good, but apparently Cassandra insists on being able to delete old data when it feels like it? I’m not too familiar with Cassandra, so I’m not sure what they were referring to), and took a close look at Riak. Riak was a very close fit for what they needed but at the time it didn't have a column-store backend (LevelDB wasn't implemented yet) and it had more overhead than they needed (all their data operations were append-only, so Riak's entire model for eventual consistency would be wasted on them). They eventually decided to just design their own simple column-store database for their service.

They built "Snowth" in eight programmer weeks over three months (and put four programmer-months more into it over the next three years), and managed to achieve a 93% reduction in storage usage and a 50-60% reduction in IOPS, with no single point of failure. They ran the two systems in parallel for 14 months, slowly shifting more and more of the traffic to Snowth via feature flags as they gained confidence in it, eventually shutting off PostgreSQL.

Realtime Systems for Social Data Analysis

by Hilary Mason, chief data scientist at Bitly


Most of us are familiar with Bitly and use it regularly. The basic business model is that one person takes a long link, puts it into Bitly to get a shorter version, then shares that link with their friends on various social networks. Bitly sees what links people are putting in to share, and then they can follow that link as it is shared and other people click on it. From this they can do basic demographics and learn a lot about what people share, how links propagate through social networks, and they can sell that insight to advertisers and marketers.

A typical link's traffic spikes quickly soon after being posted and then trails off over time. This pattern has caused bitly to be very focused on real-time or near real-time analysis and they've been doing more experiments with real-time visualization and summarizing of the data that comes through their system.

Hilary Mason's job at Bitly is to guide the analysis and guide their strategies for taking a stream of raw data and telling stories with it. The talk was peppered with frequent fun tidbits of things that she's learned about people's link sharing and consuming habits. Eg, the internet is more than half robots (shouldn't surprise anyone), pictures of dogs were shared more than pictures of cats (the internet, apparently, is not made of cats), people share links that make their identity look good (news, technology, science) but they consume links about gossip, sports, and sex (every employee that she’s seen start there eventually goes through a phase of getting really depressed about humanity based on how much traffic articles about the Kardashians get).

On a technical level, she went through a few different real-time components of their stack. First is a simple classifier to determine the language of a link. Basic "is this English or Japanese?" kinds of questions. Originally, they used Google’s service for this heavily until Google shut it off, but it was always less than perfect anyway. The standard machine learning approach to this problem is supervised learning. Get a large corpus of text that’s been marked out by language (these are readily available), train your classifier on it, and get a set of weights for a Bayesian network that can classify new content. This works well for standard text, but fails on short bits like tweets, social network status updates, and image sharing. They realized that they have access to more than just the content though and started feeding the browser request headers and server response headers in as well, paying close attention to character set encodings, etc and using those to adjust the weights. They did something similar for detecting robots vs humans but instead of using the content, they found that the best indicator was actually the spike and trail pattern of the traffic over time. Any link that has a different pattern tends to be something other than a human sharing links with other humans (eg, people wrap JavaScript embed links in Bitly or the links to avatar images and those get automatically included all over the place).

The other side of their experimentation has been around the visualization and real-time mining of different kinds of trends. Eg, supporting real-time search, which means "show me links mentioning term X that people are sharing right now", automatically identifying bursts from the traffic patterns (eg, on the day of the talk, the algorithms could easily tell that Angelina Jolie was in the news), and how to group articles together into related "stories". They built real-time search with the Zoie Solr plugin running alongside a Redis cluster that held scoring info. When a search comes in, they search Solr to find links that match the term, then query Redis for scoring info to sort the results. Links that hadn't been shared in the last 24 hours automatically fall out of those databases, so they stay a pretty steady size. Bitly was originally written in PHP but that didn't last very long. Now, typically as new features are developed, she describes them as going through one phase of badly written Python, then a phase of much better Python, then, when they are proven, well-understood and obviously need to be faster, they get rewritten in C or Go.
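To make the search flow concrete, here is a toy sketch of the two-step lookup (text match first, then sort by score) with the 24-hour expiry. Everything in it is my own stand-in: plain dicts play the roles of Solr and Redis, and the class name, method names, and "score = click count" rule are hypothetical, not anything Bitly described.

```python
import time

WINDOW_SECONDS = 24 * 60 * 60  # links not shared within this window fall out


class RealtimeSearch:
    """In-memory stand-in: `index` plays Solr, `scores` plays Redis."""

    def __init__(self):
        self.index = {}        # term -> set of links (text index stand-in)
        self.scores = {}       # link -> click count (scoring store stand-in)
        self.last_shared = {}  # link -> timestamp of the most recent share

    def share(self, link, text, now=None):
        now = time.time() if now is None else now
        for term in text.lower().split():
            self.index.setdefault(term, set()).add(link)
        self.scores.setdefault(link, 0)
        self.last_shared[link] = now

    def click(self, link):
        if link in self.scores:
            self.scores[link] += 1

    def expire(self, now=None):
        """Drop links that have not been shared within the window."""
        now = time.time() if now is None else now
        stale = [link for link, t in self.last_shared.items()
                 if now - t > WINDOW_SECONDS]
        for link in stale:
            del self.last_shared[link]
            del self.scores[link]
            for links in self.index.values():
                links.discard(link)

    def search(self, term, now=None):
        self.expire(now)
        matches = self.index.get(term.lower(), set())  # step 1: text match
        # Step 2: sort by score, highest first (link breaks ties).
        return sorted(matches, key=lambda link: (-self.scores[link], link))


search = RealtimeSearch()
search.share("http://example.com/a", "Jolie in the news", now=0)
search.share("http://example.com/b", "new Jolie film", now=1000)
search.click("http://example.com/b")
print(search.search("jolie", now=2000))
print(search.search("jolie", now=WINDOW_SECONDS + 500))
```

The second search runs after the first link has aged out of the window, so only the more recently shared link comes back.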

Using Datomic with Riak

by Rich Hickey, creator of Clojure + Datomic


Datomic is the distributed database developed by Rich Hickey, the creator of the Clojure programming language. Datomic is not open source (boo!), but integrates tightly with a number of open source applications like Riak.

Datomic is a fairly radical new approach to databases and at some level aims to operate in a space that isn’t really exactly what other databases do. Hence, he views Datomic as complementing RDBMSs and distributed databases like Riak rather than competing with them. That idea is reinforced by Datomic's implementation which doesn't have its own storage model and instead uses Riak or any number of other databases as the underlying storage, treating them as opaque datastores in the same way that a regular database treats the filesystem as opaque block storage.

Datomic aims to be a sound model of information with time and brings declarative programming to applications, reducing complexity. In Rich's view, where you typically see an impedance mismatch between databases and applications, the application is wrong. Declarative systems like SQL are right and the application is wrong. Datomic wants you to be able to treat the entire database as a single value that you can calculate on.

Datomic is built around a database of atomic facts (called "datoms"). You can assert and retract datoms. Datomic keeps all of those assertions and retractions and gives you a consistent view of the world according to those datoms. In this way, it is very analogous to an RDF triple store kind of view of the world, but hopefully not as painful. Datomic guarantees transactional writes (so it chooses C over A in CAP and you must accept the availability constraints that that leaves you with).
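The assert/retract model is easier to see in a few lines of code. This is only a toy sketch of the idea, an append-only log of facts that can be replayed up to any point in time. It is not Datomic's API; the class, method, and field names are all made up for illustration.

```python
from collections import namedtuple

# One fact: entity, attribute, value, transaction number, and whether it was
# asserted (True) or retracted (False). Field names are illustrative only.
Datom = namedtuple("Datom", "entity attribute value tx added")


class FactLog:
    def __init__(self):
        self._log = []  # append-only: nothing is ever updated or deleted
        self._tx = 0

    def _append(self, entity, attribute, value, added):
        self._tx += 1
        self._log.append(Datom(entity, attribute, value, self._tx, added))
        return self._tx

    def assert_fact(self, entity, attribute, value):
        return self._append(entity, attribute, value, True)

    def retract_fact(self, entity, attribute, value):
        return self._append(entity, attribute, value, False)

    def as_of(self, tx=None):
        """Replay the log and return the facts that hold as of `tx`."""
        facts = set()
        for datom in self._log:
            if tx is not None and datom.tx > tx:
                break
            fact = (datom.entity, datom.attribute, datom.value)
            if datom.added:
                facts.add(fact)
            else:
                facts.discard(fact)
        return facts


log = FactLog()
t1 = log.assert_fact("alice", "email", "alice@example.com")
t2 = log.retract_fact("alice", "email", "alice@example.com")
t3 = log.assert_fact("alice", "email", "alice@example.org")
print(log.as_of(t1))  # the world as it looked after the first transaction
print(log.as_of())    # the world as it looks now
```

Because the log only ever grows, a view as of some past transaction never changes, which is what lets you treat the database at a point in time as a plain value.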

He defines some terms: “value”: something immutable, magnitude, quantity, number, or immutable composite thereof, “identity”: a putative entity we associate with a series of causally related values (states) over time, “state”: value of an identity at a moment in time, “time”: something that is relative. A is either before or after B, and “epochal time model”: identity (succession of states). Apply functional transformation from value to value to move along time. All data in Datomic is immutable and it is built around Okasaki’s Purely Functional Data Structures, particularly trees of the immutable data.

The core components are a Transactor, which does transaction coordination and accepts writes, which it applies serially in an append-only structure like LevelDB or BigTable; an App Server, which runs on each node and does querying and indexing; and the top level nodes of the tree structure, which are stored in a distributed, consistent store, usually Zookeeper (particularly in deployments using Riak for the data storage). App Servers all have access to the same underlying (pluggable) storage, so distributed storage like Riak is ideal. Essentially, the “values” go into Riak and the “identities” go into Zookeeper. Those top level nodes are very few and very small but consistency on them is crucial to holding the whole thing together, hence Zookeeper. Since data is immutable, the app servers can always do consistent reads against the distributed storage and never have to worry about coordination. Only the writes have to be synchronized through the Transactor + Zookeeper. Further, because the data is immutable, they can use Riak in “R=1” mode, meaning that Riak, on a read, doesn’t need to verify that the data exists on more than one node (how it would normally detect conflicts) and that is the most efficient way possible to query Riak, avoiding all the overhead of vector clocks and siblings. Datomic automatically generates a few spanning indexes for the data and allows you to specify additional ones; those end up stored in the same structure as the data, with a key or two at the top in Zookeeper and most of the index as immutable data in Riak.

Lessons Learned and Questions Raised from Building Distributed Systems

by Andy Gross, chief architect at Basho


This talk was the closing Keynote and is very hard to summarize; I highly recommend just looking up video of the talk and watching it. Definitely worth it.

Andy Gross is the chief architect at Basho and one of the current big names in distributed systems. He is a self-taught programmer (starting with a VIC20), college dropout, has written databases, filesystems, CDNs, and according to legend, wrote the first version of Riak on a plane to Boston to interview at Basho. He considered the prototype a failure but they hired him anyway. His career advice is "BS or charm your way into jobs you aren’t qualified for, quit before it gets boring, and repeat".

Herb Sutter published a paper in 2005 called "The Free Lunch is Over" that pointed out the fundamental shift towards concurrency and distributed systems. Moore's law had stopped giving us faster and faster single-core processors and the clear direction was going to be more and more cores running in parallel. So we could no longer rely on software getting faster just by virtue of the underlying hardware clock speed going up. Instead it had to be able to take advantage of more and more parallel cores and that required a fundamental change in how software was designed (unless Dr. Seltzer has her way). The software industry was not ready for what was going to happen to them and probably still isn't.

For part of the industry though, it has meant a renaissance in distributed systems research. We are all writing distributed systems now whether we want to acknowledge it or not. Distributed systems are one of those areas where the key concepts keep recurring and being rediscovered. In the late 70’s Leslie Lamport dug up an (already old) Dijkstra paper on self-stabilizing systems and hailed it as Dijkstra's most important work. Programmers now are just rediscovering Lamport's work and finding it relevant and cutting edge when applied to modern technology. Andy put a hack in his browser that made it so if he tried to go to Reddit or Hacker News, it redirects him to a random article of Lamport’s.

He stressed the importance of abstractions as how we move computer science and engineering forward and wants to see more common abstractions for distributed systems.

He asks, “Where’s my libPaxos?” If Linux can have literally hundreds of toy filesystems implemented, why can’t we just import a standard library for distributed consensus? “Where’s my libARIES?” (for write-ahead logging). Why do we have to keep reimplementing these fundamental abstractions that recur over and over in distributed systems? Languages that support immutable data structures and have good concurrency primitives (Actors in Erlang/Scala/Rust, CSP in Go, STM in Clojure/Haskell) are a step forward but not nearly enough. These things should all be reusable primitives that we just have at our disposal so we can move on to the next layer of the problem. He’s worked on solving this by writing Riak Core, which is a reusable implementation of distributed consensus, gossip protocols, consistent hashing, etc. Riak itself is just a set of plugins on top of Riak Core, RiakCS is a different set of plugins on the same core, and Riak Core has a couple large deployments at other organizations (Yahoo!, OpenMX, and Stackmob) but otherwise hasn’t seen much adoption.

The other major deficiency is in testing tools for distributed systems. Unit tests are woefully insufficient. Distributed systems fail in more creative and interesting ways than other software and traditional tools for testing, debugging, and monitoring aren't good enough. Many of the algorithms used in distributed systems can be proven correct but still prove to be buggy when implemented (insert relevant Knuth quote). Something in between formal verification and unit tests is needed. A good start in that direction is a tool called QuickCheck which generates, like a fuzzer, test cases for your code from properties that you specify, then it runs millions of them covering far more of each function's input domain than a few discrete values would, and automatically reduces failures into minimal test cases. Run it overnight and you’ll find heisenbugs that normal unit tests wouldn't have found. Unfortunately "Quick"check is very complicated, requiring a lot of training to use, takes a long time to set up for each codebase, and the value of the tests that it creates decays quickly, so it takes a massive investment in time to keep the QuickCheck config up to date and useful. There’s also a tool called “PULSE” which works with Erlang to spoof the scheduler and run your code under pathological scheduling conditions, making sure that you get hit with every weird edge case around out-of-order message delivery and overloaded or deadlocked components.
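For anyone who hasn't seen this style of testing, here is a tiny hand-rolled sketch of the general idea: generate lots of random inputs, check a property, and shrink any failure down to a minimal case. This is not QuickCheck's API; every name below is made up, and the "bug" is planted deliberately so there is something to find.

```python
import random


def random_int_list(rng, max_len=20, lo=-100, hi=100):
    """Generate a random list of integers (the 'generator')."""
    return [rng.randint(lo, hi) for _ in range(rng.randint(0, max_len))]


def shrink_candidates(xs):
    """Yield simpler variants: drop one element, or halve one toward zero."""
    for i in range(len(xs)):
        yield xs[:i] + xs[i + 1:]
    for i, x in enumerate(xs):
        if x != 0:
            yield xs[:i] + [int(x / 2)] + xs[i + 1:]


def shrink(prop, failing):
    """Keep taking the first simpler variant that still fails the property."""
    improved = True
    while improved:
        improved = False
        for candidate in shrink_candidates(failing):
            if not prop(candidate):
                failing = candidate
                improved = True
                break
    return failing


def find_counterexample(prop, runs=1000, seed=0):
    """Return a shrunken failing input, or None if every run passes."""
    rng = random.Random(seed)
    for _ in range(runs):
        xs = random_int_list(rng)
        if not prop(xs):
            return shrink(prop, xs)
    return None


def buggy_max(xs):
    """Deliberately wrong: starts from 0, so all-negative lists break it."""
    best = 0
    for x in xs:
        if x > best:
            best = x
    return best


def prop_max_is_member(xs):
    """Property: the maximum of a non-empty list is one of its elements."""
    return not xs or buggy_max(xs) in xs


print(find_counterexample(prop_max_is_member))
```

Whatever random list trips the bug first, the shrinker whittles it down to `[-1]`, which is about as small as a failing case for this property can get. The real tools do far more, but that generate, check, shrink loop is the core of it.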

He finished with a list of the remaining "Hard Problems" he sees in distributed systems:

My History with News Feeds

Back when RSS first came out and a couple major sites started supporting it, I cobbled together an "aggregator" that was just a Perl script that pulled down the dozen or so feeds that I knew of (Slashdot, memepool, and a couple others), did a tiny bit of munging to get them merged into one XML file that sat in my web root, then used an XSL transformation (via Apache Cocoon) to turn that into a single HTML page that had all the links from all those sites in one place.

I thought it was the coolest thing ever.

It was broken about 70% of the time because one feed or another in the group would have a stray ampersand or angle bracket in a headline making the entire thing invalid and breaking the transformation.

More feeds kept coming out and it got too cumbersome to do things that way, so I eventually built a database backed reader with a mod_perl frontend that had an interface a little more like the Google Reader style with a stream of news and a notion of read vs unread entries. 70% of the feeds were still invalid at any given time and couldn't be parsed, but at least they didn't take down my whole aggregator. I think I spent a lot of time sending email to sites letting them know that their feeds were invalid.

Then Mark Pilgrim released his Universal Feed Parser Python script along with his whole Postel's Law rant. I was way more into Perl at that point, but I knew a little Python and the promise of being able to parse even invalid feeds was quite enticing, so I rewrote the import side of my aggregator as a Python script using feedparser that then did a POST to the mod_perl app with the data. It worked great. Probably only 10% of the feeds were broken at any given time now.

The hourly cronjob that ran on my desktop pulling down and parsing all the feeds was a beast though. I would pretty much have to stop working for five minutes every hour while my machine chugged away. I put up with that for probably a year or two. It sucked, but I was hooked on having fresh news feeds all pulled into one place where I could quickly see only the new items. It was like having an IV drip of information heroin straight into my veins.

Then Bloglines came out. It had a snappier feeling interface than mine, seemed to do an even better job handling broken feeds, and most importantly, didn't drag my workstation down every hour. So I switched there without hesitation and was happy for years.

Reader came out and I stuck with Bloglines for quite a while but the Plumber kept showing up more and more frequently (Bloglines users know what I'm talking about), and there was an inevitable feeling that Google was just going to crush them sooner or later, so I eventually switched away. Most of the Reader interface I liked much less than Bloglines, but it was fast, and the keyboard commands for navigation were really nice. I've been there ever since.

I looked at my feed count the other week when they announced it was shutting down and it was well over 600. I occasionally try out standalone reader clients and mobile clients, but none of them work for my intense feed reading workflow.

A typical session for me is that I would open up Reader in my browser in the morning. There will be a few hundred unread entries. 95% of them I know will be complete noise, so the goal is to get through them all and find the entries worth actually reading in as little time as possible. In Reader, hitting g, then a, puts you in a stream of all the entries from all the categories all merged together. Then j over and over again to advance, marking each entry as read along the way.

Each entry that comes up has about half a second for me to decide if it's noise or something that might potentially be interesting. Noise just gets a j and I'm on to the next one. Potentially interesting, I open it in a background tab and continue going through my feeds. I can make it through hundreds of entries in just a few minutes that way. Once I've made it through all my feeds and have no more unread items, I have about a dozen to two dozen browser tabs going in the background. I work through each of those with browser keyboard commands (Ctrl-PgDn and Ctrl-w mostly). One pass through to sift out ones that on second look really aren't worth spending time on. Then a pass where I actually read any of them that look digestible in a couple minutes. Then a final pass where long articles I want to read later on my tablet get sent to Pocket, and what's left is generally programming-centric stuff that I want to actually do something with immediately. Eg, a new Python module that looks useful that I will dig through the code and changelog or even install and write a small script to play with.

The whole process takes about an hour, and the end result is that I've got a half dozen articles sent to my tablet for subway reading, I've read about a dozen short to medium length posts, I've probably played with a bit of code, and I've also seen a few hundred headlines go past so I have a sense of what's going on even in areas that I didn't bother investigating deeply.

I have a smartphone and an Android tablet like a good little techie, but I've had zero interest in ever reading news feeds on them, even using the Google Reader Android app. Without my full browser tab setup, I could never figure out a way to get through my feeds at even a tenth the speed of my regular workflow. I'm much happier using the devices for reading the longer articles that I'd previously saved off.

When Google changed the behavior of the "share" button on Reader last year to integrate it with Google+ and everyone got upset, I didn't really share in the outrage. But then, I also never understood why an aggregator would have a "share" button in the first place. I certainly didn't use it.

The other week, Google announced they were shutting down Reader and I said, "well, guess I'm going to have to write my own again". So the next weekend, that's what I did. I've been on a bit of a kick for the last year, freeing myself from walled garden services. I shut down my Facebook account, put up my own photo gallery site and let my Flickr account lapse. I just no longer want to be dependent on services that are out of my control.

At this point, my aggregator is unpolished, but it fully supports my workflow for reading feeds. Actually, I already prefer it to Reader since everything non-essential to my workflow is stripped out. It's running at and is open to anyone to sign up, but I can make no guarantees about longevity. Honestly, at this point, you should only think about using it if you want to help me debug it.

If you've got any programming chops, I recommend writing your own. It's a fun problem that will expose you to more of the development landscape than a typical web app (eg, a task queue to take the heavy lifting out of the request/response cycle is pretty much a necessity) and you'll be able to reduce your own level of dependency on services controlled by other people with priorities that don't match your own.

New Year Python Meme 2012

Following Tarek's lead:

1. What is the coolest Python application, framework or library you have discovered in 2012?

I'm really tempted to go along with Tarek and pick PyZMQ, since 0MQ has been a pretty major mind expander for me this year, but I honestly haven't used anything from it besides the basic 0MQ bindings. Instead, I'll pick IPython Notebook, which I've really only just begun playing with (after the PyData NYC conference), but is looking pretty exciting. I'm seriously impressed at how much potential is packed into something so simple. Salt really needs mentioning too. Very simple and powerful.

Yeah, I really suck at picking only one thing to highlight.

2. What new programming technique did you learn in 2012?

I guess the biggest newish "technique" (rather than a language, library, or framework) for me in Python-land would be Behavior Driven Development. BDD with Lettuce was a big factor in the development of a major project at work. I'm hoping to streamline our Django/Lettuce/Selenium setup to make it easier for more of our projects to move in that direction.

3. What is the name of the open source project you contributed the most in 2012? What did you do?

Well, my biggest project lately, that I'm very proud of is a distributed image and thumbnail server called Reticulum that is written in Go, not Python. I did develop a prototype in Python/Django first before I settled on Go for the main implementation.

4. What was the Python blog or website you read the most in 2012?

I really have no idea. I have hundreds of blogs and aggregators in my RSS reader so it all just comes at me in one big firehose of information. By the time I've read and digested it, I've lost track of where it came in from.

5. What are the three top things you want to learn in 2013?

6. What are the top software, app or lib you wish someone would write in 2013?

I've got a big project at work starting up that is going to involve writing a ton of Netlogo. Don't get me wrong, Netlogo is a neat little platform and is very good for what it does and what we're using it for. But I would kill for a platform with the same capabilities for agent-based modelling that let me code in Python instead. Actually, years ago, I built a simulation framework in Python for our engineering department that came pretty close to being what I am imagining, but funding on that dried up, our priorities shifted, and it never quite made it out into the world. Maybe I'll revive it in a modernized form. Now I'm thinking about something architected like IPython Notebook (or built off it) with a Python engine and a web/websockets frontend that can make use of nice D3.js and SVG visualization.

Stupid Cellular Automata Tricks, Part 1

A few years back, I got really into Cellular Automata. I had a big project at work that involved implementing a bunch of simulations via CAs and generally found it to be an interesting topic.

I had the good fortune to spend some time on that project with Don Hopkins, who showed me a bunch of tricks and pointed me in some very fruitful directions (including recommending the book Cellular Automata Machines by Toffoli and Margolus as a primer).

The result is that I've got a bunch of tricks in and around Cellular Automata in my head. I don't think I invented any of these. I don't think Don even did (maybe he was just being modest). But they are things that I've never seen written down and probably should be. It recently occurred to me that I have a blog so I should, you know, do that.

I've got enough that I plan to do it as a small series of blog posts (if I can keep up the writing momentum over the next week or two).

Starting with a trick that's pretty simple but is easy to overlook.

It deals with standard two-dimensional CAs (think Game of Life) and is really just a clever way to avoid a common naïve implementation problem.

To review, there is a rectangular, tiled Grid made up of Cells, each of which has a particular State (such as "on" or "off" or a numerical value). At each generation, the new state of each cell is determined by applying a fixed set of Rules or a mathematical function using the states of the cells in its Neighborhood.

A typical neighborhood for CA is the Von Neumann Neighborhood which includes the cell itself (which we'll call "center" or just C) and the four cells that share sides with it, which we'll call "north", "south", "east", and "west" (or N, S, E, and W). There is also a Moore Neighborhood which expands the Von Neumann Neighborhood with Northwest, Southwest, Northeast, and Southeast.

The obvious representation for a CA grid in most programming languages will be a two-dimensional array of whatever type the cell's state needs to be stored as.

The naïve programmer then writes something like the following (pseudocode):

/* NOTE: square grid in all examples to simplify */
for j from 0 to SIZE - 1:
    for i from 0 to SIZE - 1:
        c = grid[i][j]
        /* get the neighborhood for the cell */
        n = grid[i - 1][j]
        s = grid[i + 1][j]
        e = grid[i][j + 1]
        w = grid[i][j - 1]
        /* and calculate the next state... */

And they run into a problem right away. For cells on the edges of the grid, one or more of the neighbors in their neighborhood doesn't exist. They probably get some kind of runtime error with an array out of bounds exception.

The standard solution here is not hard to imagine. Typically, you decide if you want the edges of the grid to "wrap" around like a torus or just have some default value. Then insert some bounds checking in the code to handle it:

for j from 0 to SIZE - 1:
    for i from 0 to SIZE - 1:
        c = grid[i][j]
        /* get the neighborhood for the cell, wrapping at the edges */
        if i != 0:
            n = grid[i - 1][j]
        else:
            n = grid[SIZE - 1][j]
        if i != SIZE - 1:
            s = grid[i + 1][j]
        else:
            s = grid[0][j]
        if j != SIZE - 1:
            e = grid[i][j + 1]
        else:
            e = grid[i][0]
        if j != 0:
            w = grid[i][j - 1]
        else:
            w = grid[i][SIZE - 1]
        /* and calculate the next state... */

Not pretty, but it works and lets the programmer get on to exploring the fun world of Cellular Automata.

Now, the problem is that there is a lot of conditional logic happening in an inner loop. Each of those tests gets carried out on every cell in the grid even though only a small percentage of them is really necessary (on a sufficiently large grid there will be way more inner cells than cells on the edges).

So here's the first trick.

What it boils down to is putting a "gutter" around the grid to ensure that every cell becomes one of the inner cells and doesn't require any testing.

That translates to expanding the two-dimensional array holding the grid so it has an extra row of cells around the entire border. Then, before running each generation, prepopulating those gutters with the wraparound data (or just defaults). The code then loops over only the inside part of the expanded grid and can always safely assume that every cell has a full set of neighbors.

The pseudocode becomes something like:

grid[SIZE + 2][SIZE + 2]
/* real cells live at indices 1..SIZE; 0 and SIZE + 1 are the gutters */
/* wrap gutters: left and right columns first... */
for i from 1 to SIZE:
    grid[i][0] = grid[i][SIZE]
    grid[i][SIZE + 1] = grid[i][1]
/* ...then top and bottom rows, which also fills in the corners */
for i from 0 to SIZE + 1:
    grid[0][i] = grid[SIZE][i]
    grid[SIZE + 1][i] = grid[1][i]

for j from 1 to SIZE:
    for i from 1 to SIZE:
        c = grid[i][j]
        /* get the neighborhood for the cell */
        n = grid[i - 1][j]
        s = grid[i + 1][j]
        e = grid[i][j + 1]
        w = grid[i][j - 1]
        /* and calculate the next state... */
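For anyone who wants something runnable, here is a minimal Python sketch of the gutter approach next to a reference version that wraps with modulo arithmetic on every lookup, so the two can be checked against each other. The parity rule, the helper names, and the choice to write each generation into a fresh grid are my own assumptions for the example; none of them are essential to the trick.

```python
import random

SIZE = 8


def rule(c, n, s, e, w):
    """Parity rule: a cell is on when an odd number of its neighborhood is."""
    return (c + n + s + e + w) % 2


def step_modulo(grid):
    """Reference version: wrap the edges with modulo on every lookup."""
    size = len(grid)
    new = [[0] * size for _ in range(size)]
    for i in range(size):
        for j in range(size):
            new[i][j] = rule(grid[i][j],
                             grid[(i - 1) % size][j], grid[(i + 1) % size][j],
                             grid[i][(j + 1) % size], grid[i][(j - 1) % size])
    return new


def pad(grid):
    """Embed a SIZE x SIZE grid in a (SIZE + 2) x (SIZE + 2) one."""
    size = len(grid)
    padded = [[0] * (size + 2) for _ in range(size + 2)]
    for i in range(size):
        padded[i + 1][1:size + 1] = grid[i]
    return padded


def unpad(padded):
    """Strip the gutters back off."""
    return [row[1:-1] for row in padded[1:-1]]


def step_gutter(padded):
    """Gutter version: real cells live at 1..size, no tests in the loop."""
    size = len(padded) - 2
    # Refresh the gutters: left and right columns first...
    for i in range(1, size + 1):
        padded[i][0] = padded[i][size]
        padded[i][size + 1] = padded[i][1]
    # ...then whole top and bottom rows, which also fills in the corners.
    padded[0] = padded[size][:]
    padded[size + 1] = padded[1][:]
    new = [row[:] for row in padded]
    for i in range(1, size + 1):
        for j in range(1, size + 1):
            new[i][j] = rule(padded[i][j],
                             padded[i - 1][j], padded[i + 1][j],
                             padded[i][j + 1], padded[i][j - 1])
    return new


rng = random.Random(1)
grid = [[rng.randint(0, 1) for _ in range(SIZE)] for _ in range(SIZE)]
padded = pad(grid)
for _ in range(5):
    grid = step_modulo(grid)
    padded = step_gutter(padded)
print(unpad(padded) == grid)
```

The inner loop in `step_gutter` has no conditionals and no modulo; all of the edge handling has moved into the handful of copies at the top.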

This isn't the most mind-blowing thing ever, but I've seen quite a few programmers starting out with CAs miss it.

I also like it as an example of a general heuristic for optimizing code: make one pass over the data up front to clean or prepare it, so that the expensive boundary checking can be dropped from the inner loop entirely.

Let's take a moment to think about the complexity analysis though. In a strict, theoretical sense, none of this really matters. If you're dealing with an N by N grid, both pieces of code are O(N^2) because real computer scientists don't care about constants. If anything, from a theoretical perspective, the second approach is worse if you are interested in storage costs.

But in the real world, I can pretty much guarantee that the second approach will perform noticeably better. Sometimes constants matter.
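To put rough numbers on those constants, here is a quick count of the extra work each approach does per generation on an N by N grid. It counts operations rather than measuring time, and it assumes four edge comparisons per cell for the bounds-checking version and four border strips of at most N + 2 cells for the gutter refresh.

```python
def boundary_tests_per_generation(n):
    """Bounds-checking version: four edge comparisons for every cell."""
    return 4 * n * n


def gutter_copies_per_generation(n):
    """Gutter version: four border strips of at most n + 2 cells each."""
    return 4 * (n + 2)


for n in (10, 100, 1000):
    tests = boundary_tests_per_generation(n)
    copies = gutter_copies_per_generation(n)
    print(f"N={n}: {tests:,} edge tests vs {copies:,} gutter copies")
```

At N = 1000 that is four million comparisons against about four thousand copies. Both versions are still O(N^2) overall because of the main loop, but the overhead that the gutter removes grows with the area while the overhead it adds only grows with the perimeter.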

The next part of this series, when I get around to writing it, will be a bit more implementation specific (Python, in my case) but will reap some additional rewards from the addition of gutters to the grid representation.