
How Waze tames production chaos using Spinnaker managed pipeline templates and infrastructure as code


“At an abstract level, a deployment pipeline is an automated manifestation of your process for getting software from version control into the hands of your users.” ― Jez Humble, Continuous Delivery: Reliable Software Releases through Build, Test, and Deployment Automation 

At Waze, we’ve been using Spinnaker for simultaneous deployments to multiple clouds since it was open sourced by Netflix in 2015.

However, implementing deployment pipelines can raise issues when your organization has more than 100 microservices (and growing), multiplied by the number of environments/regions/cloud providers.

  • Managing hundreds of clusters and their deployment pipelines can quickly become a major maintenance burden. Once you have a few hundred deployment pipelines or more, keeping them up to date, or making a big change to all of them, is far from trivial. 
  • There’s no code reuse among pipelines, even though many of them are probably identical. For example, bake stages, deploy stages and resize/clone/destroy stages tend to be identical except for some parameters. 
  • Providing a paved road for deployments is easier said than done. Even though each development team should decide on their deployment strategy, it’s important to have a paved road — a way that works for most teams, which saves them time and effort. Start with that, and change as needed. However, there’s no easy way to create such a paved road and maintain it over time across all relevant pipelines in the system. 
Thankfully, Netflix — with contributions from the open-source community, including Google — added pipeline template support to Spinnaker, which solves these pain points and paves the way for multi-cloud infrastructure as code. With some much-appreciated help from the Google Spinnaker team, Waze is now using managed pipeline templates in production, and we plan to use them across our entire production system.

Unlike cloud vendor-specific solutions, Spinnaker managed pipeline templates work on top of all supported providers (currently Azure, GCP, AWS, Kubernetes, OpenStack and App Engine, with more on the way).

This means that for the first time, we can start realizing the dream of infrastructure as code across multiple clouds, whether you’re deploying to multiple public clouds, to a local Kubernetes on prem or to a mixed environment. We can see a future where automation, or some form of AI, decides to change instance types and other factors to reduce cost, improve utilization or mitigate real-time production incidents.

Runnable pipelines are composed of a pipeline template combined with pipeline configurations (using variables). Multiple configurations can use the same pipeline template as their base. In addition, pipeline templates support inheritance.
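As a rough conceptual sketch (this is illustrative Python, not Spinnaker's actual template schema or resolver), the template/configuration split can be pictured as variable substitution over a shared stage definition:

```python
# Conceptual sketch of pipeline-template resolution (not Spinnaker's actual
# implementation): a runnable pipeline is a template whose variable
# placeholders are filled in by a per-pipeline configuration.
import re

TEMPLATE = {  # hypothetical paved-road template shared by many teams
    "stages": [
        {"type": "bake", "baseOs": "{{ baseOs }}"},
        {"type": "deploy", "cluster": "{{ app }}-{{ env }}", "capacity": "{{ capacity }}"},
    ]
}

def resolve(template, variables):
    """Return a runnable pipeline: every '{{ var }}' replaced by its value."""
    def sub(value):
        if isinstance(value, str):
            return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                          lambda m: str(variables[m.group(1)]), value)
        if isinstance(value, list):
            return [sub(v) for v in value]
        if isinstance(value, dict):
            return {k: sub(v) for k, v in value.items()}
        return value
    return sub(template)

# Two configurations sharing one template -> two runnable pipelines.
staging = resolve(TEMPLATE, {"baseOs": "trusty", "app": "geo", "env": "staging", "capacity": 2})
prod = resolve(TEMPLATE, {"baseOs": "trusty", "app": "geo", "env": "prod", "capacity": 20})
```

Because many configurations point at one template, a fix to the template propagates to every runnable pipeline derived from it.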

The benefits of pipeline templates 

Multi-cloud/provider continuous delivery and infrastructure as code (without vendor lock-in) 

Saving pipeline templates and configs in version control, subjecting them to code review and leaving an audit trail of why each infrastructure change was made are all extremely important for keeping your production system clean. These practices allow each production change to be understood, tracked in case there are any problems, and reproducible.

Being able to reproduce infrastructure enhances existing standards like reproducible builds, provides stability across the system and makes debugging, when required, easier. For example, changing an instance type, or adding a load balancer, is no longer just a “Clone” operation in Spinnaker; it’s now a code commit, a code review and a Jenkins job picking up the pipeline-template config change and publishing it to Spinnaker. After that, you just run the resulting pipeline(s) to apply the infrastructure change. You can easily track the infrastructure’s change history.

All of this is done in a way that supports multiple cloud providers. This may even be relevant for companies using Kubernetes on a single cloud provider, because they're required to interact with two sets of APIs and two control planes to manage their infrastructure. This is also why Kubernetes is treated as a separate cloud provider in Spinnaker. Spinnaker neatly abstracts all of that, while other solutions could lead to vendor lock-in.

Code reuse for pipelines 

In a continuous-delivery environment, many pipelines can contain identical components, except for parameters. Managed pipeline templates provide a perfect way to reduce code duplication and centralize pipeline management, affecting hundreds or thousands of pipelines downstream. This method reduces mistakes, saves time and allows the same template-based configuration to provision multiple identical environments, such as staging, QA and production.

Automated checks and canary analysis on each infrastructure change 

You’re probably familiar with using deployment pipelines to manage application and configuration changes, but now you can use them for infrastructure changes, too — changed instance types, firewall rules, load balancers and so on. Having an official deployment pipeline for each application means you can now run automated testing and canary analysis on even these changes, making infrastructure changes much safer.

Defining a clear paved road for deployments, with override capabilities for each application 

We've found that most deployment pipelines follow a very simple pattern. However, most teams customize these pipelines based on their requirements. Here are just a few examples:

  • Resize previous group to zero instead of destroying. 
  • Numerous parameters for the deploy and wait stages. 
  • Two canary analysis stages (first with 1% of traffic, the next with 50% of traffic). 
  • Various parameters passed on to the canary analysis. 
Pipeline templates allow these teams to start off with a paved road, then mix and match stages, specify parameters, and replace or inject stages as needed, while still using the maintained paved road template. If a common stage in the base template is updated, the pipelines inherit the updates automatically. This reduces maintenance considerably for hundreds of pipelines.

Conditional stages use variables to control which stages are enabled. By utilizing conditional stages, you can use a single template for more than one use case. Here’s a video demo showing how it looks in practice.

Injecting stages allows any pipeline child template or configuration to add stages to the pipeline stage graph. For example, if a team uses a basic Bake -> Deploy -> Disable -> Wait -> Destroy pipeline template, they can easily inject a Manual Judgment stage to require a human decision before the previous group is disabled and destroyed. Here’s how this looks in practice.
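Stage injection can be sketched as a list operation (illustrative only; Spinnaker's real template schema expresses this declaratively, not in Python):

```python
# Sketch of stage injection: a child configuration inserts a stage into the
# parent template's stage graph without forking the template itself.

parent = ["bake", "deploy", "disable", "wait", "destroy"]  # paved-road template

def inject_before(stages, anchor, new_stage):
    """Return a new stage list with new_stage inserted just before anchor."""
    i = stages.index(anchor)
    return stages[:i] + [new_stage] + stages[i:]

# A team requires a human decision before the previous group is disabled.
pipeline = inject_before(parent, "disable", "manualJudgment")
```

The parent template is left untouched, so other teams inheriting it are unaffected.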

Automatic deployment pipeline creation for new services 

When a new service comes online — whether in the development, staging or production environments — pipeline templates can be used to create automatic deployment pipelines as a starting point. This reduces the effort of each team as they get a tested and fully working deployment pipeline out of the box (which can later on be customized if needed). Traffic guards and canary analysis give us the confidence to do this completely automatically.

Auto-generate pipelines to perform a rolling OS upgrade across all applications 

Applying OS security updates safely across an entire large-scale production system can be quite a challenge. Doing so while keeping the fleet immutable is even harder. Spinnaker pipeline templates combined with canary analysis give organizations a framework to automate this task.

We use a weekly OS upgrade pipeline, which spawns the official deployment pipelines of all production applications in a rolling manner, from least critical to most critical, spreading the upgrades into small batches cascaded across the entire work week. Each iteration of the pipeline upgrades more applications than it did the day before. We use the official deployment pipeline for each application, sending a runtime parameter that says “Don’t change the binary or configuration — just rebake the base operating system with the latest security updates,” and we get a new immutable image, unchanged from the previous one except for those security updates. All this while still going through the usual canary analysis safeguards, load balancer health checks and traffic guards.

Pipeline templates can take this pattern one step further. Any new application can be automatically added to this main OS upgrade pipeline, ensuring the entire production fleet is always up to date for OS security updates.


Spinnaker pipeline templates solve a major issue for organizations running a lot of deployment pipelines. Plus, being able to control infrastructure as code for multiple providers, having it version-controlled and living alongside the application and configuration, removes a major constraint and could be a big step forward in taming operational chaos.

Getting-started references 

To get started with pipeline templates:

  • Set up Spinnaker
  • Read the spec, review the pipeline templates getting started guide
  • Check out the converter from existing pipelines into templates, a good place to start. 
  • Try roer, the thin Spinnaker CLI used to validate and publish templates and configurations to Spinnaker. (Note: better tooling is on the way.) Remember: roer pipeline-template plan is your friend, and orca.log is a great place to look when debugging after publishing a template. 
  • Watch the full demo
  • Try out some example templates
  • Join the Spinnaker slack community and ask questions on the #declarative-pipelines channel.

Spinnaker Orchestration


Author: Rob Fletcher

When the Spinnaker project first started more than two years ago, we implemented Orca — Spinnaker’s orchestration engine µservice — using Spring Batch. It wasn’t an entirely unreasonable fit at the time. It gave us atomic, compartmentalized units of work (tasks in a Spinnaker pipeline), retry semantics, the ability to define branching and joining workflows, listeners that could get notified of progress and many other things we needed. However, in the long run, that choice — and our implementation on top of it — imposed a number of constraints.

For a long time some of the operational constraints of Orca have been a source of frustration and not something we were proud of or keen to advertise.


The most obvious constraint was that Orca was a stateful service—it pinned running pipelines to a single instance. Halting that instance, whether due to a failure or a normal red-black deploy, would unceremoniously stop the pipeline in its tracks with no easy way to continue it.

In addition, Orca locked a thread for the entire duration of the pipeline, which, although typically minutes, was not infrequently hours or days. It did this even when the pipeline was doing nothing more than polling every so often for a change, waiting for a predefined duration or awaiting manual judgment before continuing.

When deploying a new version of Orca we’d have to allow work to drain from the old server groups. Although we automated this process (monitoring instances until they were idle before shutting them down), it wasn’t uncommon for a handful of instances to hang around for days, each one draining one or two long-running canary pipelines.

Because of the way we mapped pipelines to Spring Batch jobs, we had to plan the entire execution in advance, which is very limiting. We were forced to jump through all kinds of hoops to build functionality like rolling push deployments on top of such a static workflow model. It was also very hard to later implement the ability for users to restart pipelines after a stage failed, or to automatically restart pipelines dropped in the case of instance failure, because the mapping of pipeline to Spring Batch job initially wasn’t idempotent.

As an aside, I should point out that most of the constraints we struggled with are not inherent limitations of Spring Batch. It’s very good at what it does. But it’s really not intended as a general-purpose workflow engine and certainly not designed with distributed processing in mind.

Sabrina’s Christmas Wish

Despite these issues, things hung together well enough. We were aware of the limitations and chafed against them, but they never bit us so badly that we prioritized tackling them over the new features we were working on. As the engineer who implemented most of Orca in the first place, though, I was desperate to fix what I saw as my own mess.

I finally got that chance when the volume of internal use at Netflix hit a point where we decided it was time to tackle our resiliency and reliability concerns. The fact that Orca ticks over while running between 2,000 and 5,000 pipelines per day (peaking at over 400,000 individual task executions some days) is not too shabby. However, with Spinnaker serving as the control plane for almost the entire Netflix cloud, and with the project growing in use in the wider community, we felt it was time to harden it and make sure resiliency was something we had real confidence in.

To that end, we recently rolled out a significant change we dubbed “Nü Orca”. I’d like to take some time to introduce the changes and what makes them such an improvement.

Brand New Couch

Instead of using Spring Batch to run pipelines, we decided to implement our own solution using a simple command queue. The queue is shared across all the instances in an Orca cluster. The queue API has push operations for immediate and delayed delivery and a pop operation with acknowledgment semantics. Any message that goes unacknowledged for more than a minute gets re-delivered. That way, if we lose an Orca instance that’s in the process of handling a message, that message is simply re-delivered and will get picked up by another instance.
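The queue semantics described here can be sketched in a few dozen lines (an in-memory toy for illustration; the real Orca queue is Redis- or SQS-backed, and names like `ACK_TIMEOUT` are assumptions, not Orca's actual identifiers):

```python
# Minimal in-memory sketch of the command queue: push for immediate or
# delayed delivery, pop with acknowledgment, and re-delivery of anything
# unacknowledged for more than ACK_TIMEOUT seconds (a presumed-dead worker).
import heapq
import itertools
import time

ACK_TIMEOUT = 60.0

class Queue:
    def __init__(self, clock=time.time):
        self.clock = clock
        self._heap = []                   # (deliver_at, seq, message)
        self._seq = itertools.count()     # tie-breaker so messages never compare
        self._unacked = {}                # message id -> (popped_at, message)

    def push(self, message, delay=0.0):
        heapq.heappush(self._heap, (self.clock() + delay, next(self._seq), message))

    def pop(self):
        """Return a due message, or None. The caller must ack() it."""
        self._redeliver_expired()
        if self._heap and self._heap[0][0] <= self.clock():
            _, _, message = heapq.heappop(self._heap)
            self._unacked[message["id"]] = (self.clock(), message)
            return message
        return None

    def ack(self, message):
        self._unacked.pop(message["id"], None)

    def _redeliver_expired(self):
        for mid, (popped_at, message) in list(self._unacked.items()):
            if self.clock() - popped_at > ACK_TIMEOUT:  # holder presumed dead
                del self._unacked[mid]
                self.push(message)
```

Any instance in the cluster can pop the re-delivered message, which is what makes instance loss survivable.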

Messages on the queue are simple commands such as “start execution”, “start stage”, “run task”, “complete stage”, “pause execution”, etc. Most represent desired state changes and decision points in the execution while “run task” represents the atomic units of work that pipelines break down into. The intention is that messages should be processed quickly — in the order of seconds at most when running tasks that talk to other services such as CloudDriver.

We use a worker class — QueueProcessor — to pop messages from the queue. It is invoked by Spring’s scheduler with a 10ms delay between polls. The worker’s only job is to hand each message off to the appropriate MessageHandler. Once a handler has processed a message without any uncaught exceptions, the worker acknowledges the message. The call to the handler and the acknowledgment of the message happen asynchronously using a dedicated thread pool so they do not delay the queue polling cycle.

Message handlers can add further commands to the queue. For example:

  • StartStageHandler identifies the sequence of sub-stages and tasks and then sends StartStage or StartTask commands to set them running.
  • StartTaskHandler records that a task is running then queues a RunTask command.
  • RunTaskHandler executes a task once, then either re-queues the same RunTask command with a delay if the task is not complete (for example, a task polling until a server group reaches a desired number of healthy instances) or queues a CompleteTask message if the execution can move on.
  • CompleteStageHandler figures out what downstream stages are able to run next and queues a StartStage message for each or a CompleteExecution message if everything is finished.

…and so on.
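A toy dispatch loop in the spirit of these handlers (the handler names follow the text, but the logic is heavily simplified and assumed, not Orca's actual classes):

```python
# Each handler processes one small command and may queue follow-up commands,
# so an execution advances one step at a time with no long-lived thread.

def start_stage(msg, queue):
    # identify the stage's tasks and set the first one running
    queue.append({"type": "RunTask", "stage": msg["stage"], "task": 1})

def run_task(msg, queue):
    # pretend the task completes immediately and the stage is done
    queue.append({"type": "CompleteStage", "stage": msg["stage"]})

def complete_stage(msg, queue):
    # would queue StartStage for downstream stages, or CompleteExecution
    pass

HANDLERS = {"StartStage": start_stage,
            "RunTask": run_task,
            "CompleteStage": complete_stage}

def drain(queue):
    """Process messages until the queue is empty; return the processing log."""
    log = []
    while queue:
        msg = queue.pop(0)   # a real worker would ack after handling succeeds
        log.append(msg["type"])
        HANDLERS[msg["type"]](msg, queue)
    return log

trace = drain([{"type": "StartStage", "stage": "deploy"}])
```

Because every step is a short-lived message, a waiting or polling pipeline holds no thread between steps.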

This design allows work to naturally spread across the Orca cluster. There’s no requirement for a queued message to be processed by any particular Orca instance. Because un-acknowledged messages are re-queued, we can tolerate instance failure and aggressively deploy new versions of the service without having to drain work from older server groups. We can even turn Chaos Monkey loose on Orca!

Message handlers can also emit events using Spring’s application event bus that keep other listeners informed of the progress of a pipeline. We use this for sending email / Slack notifications and triggering downstream pipelines — things Orca was doing already. We’ve also added a log of the activity on a particular pipeline since trying to track distributed work using server logs will be next to impossible. Processing of these pub/sub events does currently happen in-process (although on a different thread).

We have in-memory, Redis and SQS queue implementations working. Internally at Netflix we are using the Redis implementation, but the SQS one is a useful proof of concept and helps us ensure the queue API is not tied to any particular underlying implementation. We’re likely to look at using dyno-queues in the long term.

Why Redis? Partially because we’re using it already and don’t want to burden Spinnaker adopters with further infrastructure requirements. Mainly because Redis’ speed, simple transactions and flexible data structures give us the foundation to build a straightforward queue implementation. Queue data is ephemeral — there should be nothing left once a pipeline completes — so we don’t have concerns about long-term storage.

Fundamentally the queue and handler model is extremely simple. It maps better to how we want pipelines to run and it gives us flexibility rather than forcing us to implement cumbersome workarounds.

Yes And

Ad-hoc restarts are already proving significantly easier and more reliable. Pipelines can be restarted from any stage, successful or not, and simultaneous restarts of multiple branches are no problem.

We have also started to implement some operational capabilities such as rate limiting and traffic shaping. By simply proxying the queue implementation, we can implement mechanisms to back off excessive traffic from individual applications, prioritize in-flight work or urgent actions like rollbacks of edge services, or pre-emptively auto-scale the service to guarantee capacity for upcoming work.

Let’s Find Out

If you want to try out the queue based execution engine in your own pipelines, you can do so by setting queue.redis.enabled = true in orca-local.yml.

Then, to run a particular pipeline with the queue, set "executionEngine": "v3" in your pipeline config JSON.

To run ad-hoc tasks (e.g. the actions available under the “server group actions” menu), you can set the configuration flag orchestration.executionEngine to v3 in orca-local.yml.

Within Netflix, we are migrating pipelines across to the new workflow engine gradually. Right now, Nü Orca exists alongside the old Spring Batch implementation. Backward compatibility was a priority as we knew we didn’t want a “big bang” cut-over. None of the existing Stage or Task implementations have changed at all. We can configure pipelines individually to run on either the old or new “execution engine”.

As we gain confidence in the new engine and iron out the inevitable bugs, we’ll get more and more high-risk pipelines migrated. At some point soon I’ll be able to embark on the really fun part — deleting a ton of old code, kludgy workarounds and cruft.

After the Party

Once we’re no longer running anything on the old execution engine, there are a number of avenues that open up.

  • More efficient handling of long-running tasks.
  • Instead of looping, the rolling push deployment strategy can “lay track in front of itself” by adding tasks to the running pipeline.
  • Cancellation routines that run to back out changes made by certain stages if a pipeline is canceled or fails can be integrated into the execution model and surfaced in the Spinnaker UI.
  • The implementation of execution windows (where stages are prevented from running outside of certain times) can be simplified and made more flexible.
  • Determination of targets for stages that affect server groups can be done ad-hoc, giving us greater flexibility and reducing the risk of concurrent mutations to the cloud causing problems.
  • Using a state convergence model to keep an application’s cloud footprint in line with a desired state rather than simply running finite duration pipelines.

I’m sure there will be more possibilities.

I’m relieved to have had the opportunity to make Orca a better fit for Netflix’s distributed and fault-tolerant style of application. I’m excited about where we can go from here.

Spinnaker Orchestration was originally published in the Netflix TechBlog on Medium.


Field notes - ElasticSearch at petabyte scale on AWS


I manage a somewhat sizable fleet of ElasticSearch clusters. How large? Well, "large" is relative these days. Strictly in ElasticSearch data nodes, it's currently operating at the order of:

  • several petabytes of provisioned data-node storage
  • thousands of Xeon E5 v3 cores
  • 10s of terabytes of memory
  • indexing many billions of events a day (24/7/365)

And growing. Individual clusters tend to range anywhere from 48TB to over a petabyte. When I said "petabyte scale", that includes individual clusters.


It's not that large in terms of modern systems from a resource perspective, but it seems fairly large in the ElasticSearch world. A little over a year of managing these systems (with ES versions 0.90.x - 2.2.x) has yielded countless designs, operations strategies and other discoveries that I feel are worth sharing.

Every topic discussed in this post could be exploded into an individual blog post, or translate to days or weeks of testing for the reader. Diving into every area in great detail would make this post 100 pages, so my goal is to hit the high-level areas I find important when operating ElasticSearch at some level of scale.

I'm going to assume you've heard twelve times over that ElasticSearch is basically an abstraction layer of distributed and replicated database mechanics on top of Apache Lucene with a (great) search API strapped on. That each shard is a Lucene instance. What replicas are. Whether or not to have a quorum of dedicated masters. Etc. I'm more interested in sharing general thought patterns about how we operate, which hopefully can be translated into decisions particular to your own situation.

So anyway, let's do it!

Capacity Planning

and general cluster design

I primarily spec around two drivers: I have an indexing-driven workload and specific requirements around data retention (in days). Conversations around shard counts tend to devolve into talk of black magic or art, but it's actually a bit more straightforward than that (...or is it?).

Basically, consider a shard a unit of performance. It has a particular performance profile over time (e.g. under constant indexing, resource demand increases in relation to the segment sizes during merging, until leveling out at the max segment size). Then there are expected costs based on activity (e.g. larger merges more heavily stress the garbage collector, querying larger data sets occupies the query thread pool for more time, added query complexity and regex burn more CPU, etc.). Finally, every operation lives within explicit mechanical quantities. Merging, search, indexing, etc. - all require threads. These threads are specifically defined (and mostly configurable) in count. We know how many threads per shard or per node we can consume, and each activity type has its own particular resource-demand characteristics (CPU, disk, memory, or combinations). This means everything is measurable.

The crux of ElasticSearch, or any complex system for that matter, is that capacity planning feels like black magic or an art because most things go unmeasured or uncontrolled.

This is our theme: measuring and controlling. Here's how I approach this with our clusters.

(Roughly) Measuring

Our standard data node is the d2.2xlarge instance. It has 8 cores of Xeon E5-2676v3, 61GB of memory, and 6x 2TB direct-attach "probably good SATA or nearline-SAS" drives. It's configured with a 6-way LVM stripe for 11TB usable storage (after fs and other overhead) that can sustain a solid, consistent ~900MB/s sequential read/write*.

*(AWS doesn't over-provision storage on d2s; the full hypervisor has 24 spindles, all 24 of which you're allocated at the largest 8xl instance. With each size down, you're just halving access to the resources, so you get 6 disks / 8 cores / 61GB with the 2xl. In testing, the hypervisor HBA also supports simultaneous saturation of all disks - hence the consistent performance you'd see.)

Each data node is basically a "shard container" for our "units of performance". Take your standard build, whatever it is, stick a single-shard index on it and light it up. You'll see an ebb and flow consistency to the resource utilization. For instance, if we saturate it with indexing and wait until the max merged segment sizes are hit, we'll see a cycle of CPU, storage IO, CPU, storage IO. That's compression/decompression inside segment merging (CPU) and flushing the new, larger segments out to disk (big, sequential IO). When the merges hit max load, you'll see a specific number of cores saturated on merge threads (the size of the merge thread pool; if not explicitly defined, it's calc'd based on CPU cores. Check _nodes/_local/hot_threads - you'll see a bunch of [[index-name][shard-num]: Lucene Merge Thread #xxx threads.). And when the merge flushes trigger, you'll see the disks saturated for a somewhat consistent period of seconds. Merge, flush, merge, flush. That's indexing.

Indexing is much more predictable than query. You'll find some docs/sec. rate (your doc size and structure really matter, too) that you can safely index into a single shard until the indexing/bulk threads queued hover near their limit (if you had to monitor one thing, stats from _cat/thread_pool would probably be it) or your bulk indexing time is simply too high (which I care more about). In our case, we'll say that's roughly 2,500 docs/sec. per shard. At this rate, we will saturate 3 cores on merging (let's say we locked our merge thread pool at 3 per shard for predictability / scaling purposes, which I'll mention shortly) and occasionally peg the storage on new segment flushes for about 3-5s. While capacity planning revolves around your own requirements, I have an indexing-priority workload; this leaves 5 open cores and a storage system that's not overloaded. I can slap another shard on this node and squeeze out 5K docs/sec. per node. That's roughly our per-node indexing capacity.

Single shard at peak utilization, locked at 3 merge threads (newline-separated hot_threads output).

So, these numbers may actually be made up, but the concept is real. Follow this ramble for a second. Rather than getting lost right off the bat thinking about arbitrary cluster sizes or trial-and-error shard counts, you establish performance baselines per shard as it runs on your standard build. You then look at your desired workload and determine the number of shards necessary to fulfill that workload. Finally, you think about what workload density you want to run per box (controlled via the count of shards at peak utilization per box). How much headroom do we want to leave on our nodes for other activity — query, rebalances, fault tolerance, replication? We let calculated shard counts and headroom requirements dictate node counts.

Want to run 40K docs/sec. and don't care too much about query?

40,000 / 2,500 = 16 shards per index / 2 shards-per-node = 8 nodes

Want replication? Roughly double it, go with 16 nodes (although the scaling isn't exactly linear - there's additional memory and networking overhead).
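That arithmetic, including the rough doubling for replication, can be captured in a small helper (the 2,500 docs/sec. per-shard rate and 2-shards-per-node density are the example figures from above, not universal constants):

```python
# Estimate shard and node counts from a measured per-shard indexing baseline.
# Caveat from the text: scaling isn't exactly linear in practice (replication
# adds memory and networking overhead), so treat this as a starting point
# to be scaled-to-fit against realized performance.
import math

def cluster_size(target_docs_per_sec, per_shard_rate, shards_per_node, replicas=0):
    primaries = math.ceil(target_docs_per_sec / per_shard_rate)
    total_shards = primaries * (1 + replicas)
    nodes = math.ceil(total_shards / shards_per_node)
    return primaries, nodes

shards, nodes = cluster_size(40_000, 2_500, 2)                  # 16 shards, 8 nodes
shards_r, nodes_r = cluster_size(40_000, 2_500, 2, replicas=1)  # doubled: 16 nodes
```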

Seriously, this mostly works. I roughly scale by speculation using this sort of calculation, then later scale-to-fit according to the realized performance characteristics (query for instance varies greatly across clusters). It's been successful from 500 doc/sec. clusters to well over 150K.

For instance, if we saturated our 2-shards-per-node example at 5K docs/sec., we'd eventually leave 6 cores pegged solid when the max-segment-size merges are at play (plus bulk thread overhead). Are 2 free cores enough for your query load? Maybe. Definitely not if you're expecting to regularly perform complex regex queries against 50GB of index data.

Higher workload density, less headroom.

The same workload at lower density, more headroom.

One major caveat is that the observed indexing resource demand depends on many factors. To see the described behavior, you would likely require specific tuning that deviates from the ES defaults. As mentioned, my workload is indexing priority. For performance and scaling purposes, it's preferable to get events to disk and let merging run as fast as possible. My resource demand patterns behave this way, intentionally, largely due to configuring index.translog.durability to 'async' and to 'none', in addition to favoring fewer/larger bulk requests and having large event sizes. If you're using defaults, have small events and issue many requests with great concurrency, you'll likely burn CPU on bulk threads and experience a very different scaling pattern.

The goal of capacity planning and coming up with a cluster spec is understanding the general mechanics of ElasticSearch and how they relate to and compete for resources. At least from my perspective, it's essentially a game of establishing per-shard performance baselines and building a cluster large enough to house the required number of "scaling units", while leaving enough headroom for other activity. This is simply an idea, though; to be effective, it requires real-world testing and data on your part.

Unfortunately this has all been the easy part, and we got to enjoy it by isolating one single function of ElasticSearch (indexing). Sizing becomes more difficult when you introduce other user and cluster activities such as queries, shard rebalances, and long GC pauses. The recently mentioned "headroom for other activity" is that difficult part, and the best approach to sanity is controlling inputs; the second half of our theme and next topic.


Controlling

ElasticSearch stability is all about controlling inputs. While in the measuring section I talked about capacity planning in order to handle known input volumes such as indexing, it's desirable to manage all resource-demanding activity beyond indexing. Otherwise figuring out a spec at all sort of doesn't matter, unless you just round way up (and own flip-flops made out of hundred dollar bills).

I consider there to be 3 primary categories of control to think about:

  • indexing
  • query
  • general cluster activity (shard replication, rebalance, etc.)

I'll just hit each of these in order.

For indexing: we use a homebrew indexing service. I consider it absolutely critical that indexers have configurable or self-tuning rate limiters (be it at the indexer or an upstream throttling service), as well as controls on batch indexing parameters (such as batch size / timeout flush triggers). You've probably read about using bulk indexing - it's all true, do that. One important piece I'd like to add that is probably less often considered is the total number of outstanding indexing requests your indexers can hold against a cluster. If you reference the capacity planning section, your nodes each have an active and queued bulk thread limit; your total number of outstanding bulk index requests shouldn't exceed this.

If we spec a cluster to 60K docs/sec., we lock our indexers at 60K and queue upstream if the total ingest exceeds the ES capacity specification (and, of course, fire off an espresso and scale the cluster). Indexing control is incredibly valuable; I tend to see it skipped quite often and addressed instead by slapping on more resources.
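A sketch of what those indexer-side controls can look like (a hypothetical governor, not our actual homebrew service): a token bucket locked at the cluster's specced rate, plus a cap on in-flight bulk requests so the total outstanding never exceeds the data nodes' active + queued bulk thread capacity:

```python
# Hypothetical indexer governor: rate-limit docs/sec and bound the number of
# outstanding bulk requests; anything rejected here queues upstream instead.
import time

class IndexerGovernor:
    def __init__(self, docs_per_sec, max_outstanding, clock=time.monotonic):
        self.rate = docs_per_sec
        self.max_outstanding = max_outstanding   # <= cluster bulk active+queue
        self.clock = clock
        self.tokens = float(docs_per_sec)        # start with one second of burst
        self.last = clock()
        self.outstanding = 0

    def try_send(self, batch_size):
        """True if a bulk request of batch_size docs may be sent now."""
        now = self.clock()
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.outstanding >= self.max_outstanding or self.tokens < batch_size:
            return False                         # queue upstream instead
        self.tokens -= batch_size
        self.outstanding += 1
        return True

    def on_response(self):
        self.outstanding -= 1
```

The same knobs (rate, batch size, in-flight cap) are the ones worth making configurable or self-tuning in a real indexing service.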

Performance note from the previous section mentioning the cost of bulk threads: while playing with your bulk request rate vs size, I would balance the performance you're squeezing from your indexers against the bulk thread utilization on the ES data nodes. For instance if you max everything out, see how far you can scale back your request rates at the indexers without impacting total performance. You'll likely open up some CPU capacity on the ES cluster.

For query: basically the same story. We have a homebrew query service that treats queries just like indexing. Indexing and query are both resource-demanding workloads, right? We only allow so many concurrent queries, with a TTL'd query queue. One caveat is that until ElasticSearch gains better controls over running queries, the actual impact of a single query is an almost non-deterministic workload if you're exposing it to external users. Unless you syntactically prevent it, there's currently nothing stopping a user from submitting a single query that will chew up 40 minutes on 1,100 cores followed by cataclysmic garbage collections. Query rate and concurrency, on the other hand, are at least approachable control points.
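A minimal sketch of that control point, assuming a simple semaphore-based gate (the class and its parameters are hypothetical, not the actual query service): at most N queries run concurrently, and waiters are shed once they've queued past a TTL rather than piling up forever.

```python
import threading

class QueryGate:
    """Sketch of a query throttle: at most `max_concurrent` running
    queries; waiters expire after `ttl` seconds. Illustrative only."""

    def __init__(self, max_concurrent=16, ttl=5.0):
        self.sem = threading.Semaphore(max_concurrent)
        self.ttl = ttl

    def run(self, query_fn):
        # Wait up to ttl for a slot; reject rather than queue unboundedly.
        if not self.sem.acquire(timeout=self.ttl):
            raise TimeoutError("query queue TTL exceeded; shed load upstream")
        try:
            return query_fn()
        finally:
            self.sem.release()
```

Rejecting at the gate keeps a burst of expensive queries from stacking up behind one pathological one and starving the cluster.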

For cluster activity: I think this one is pretty neat. In the early days of ElasticSearch in production (the version 0.90 era), there'd be fantastic outages triggered by, say, an initial imbalance of primary shards over the available data nodes. One data node would hit the high watermark and kick off a shard rebalance on an older index, which of course I totally left unthrottled. The combined double indexing load (the cluster wasn't specced for 2 primaries on a single node in this scenario) and shard migration would cause full GC pauses that exceeded the cluster timeout, ejecting the node from the cluster.

I have a thousand stories like that and equally as many one-liners to fix it. I mean, if this doesn't tickle your neckbeard:

$ curl -s localhost:9200/_cat/shards | awk '{if($3~/p/) p[$(NF)]++}; {if($3~/r/) r[$(NF)]++} END {for(x in p) if(p[x] >1) print x,p[x]," primaries"} END {for(x in r) if(r[x] >1) print x,r[x]," replicas"}'
xxx-search05 3  primaries  
xxx-search06 2  primaries  
xxx-search07 6  primaries  
xxx-search08 3  primaries  
xxx-search05 3  replicas  
xxx-search06 2  replicas  
xxx-search07 3  replicas  
xxx-search08 4  replicas  

The real fix was to simply control this activity. A lot of it came down to changes in settings (e.g. the primary shard isolation weight [now deprecated I believe]). The rest was solved by quickly whipping together a settings scheduler service I've named sleepwalk. It allows us to apply transient cluster settings on a time schedule. For instance, "allow 0 rebalances and more aggressively limit transfer rates between 9AM and 9PM, otherwise, go nuts".
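The schedule-driven settings idea can be sketched roughly like this. The setting names below are real ES cluster settings, but the schedule structure, values, and lookup helper are illustrative, not sleepwalk itself:

```python
from datetime import time as dtime

# Sketch of a sleepwalk-style schedule: transient cluster settings to apply
# by wall-clock window (PUT to /_cluster/settings). Values are examples.
SCHEDULE = [
    # (start, end, transient settings)
    (dtime(9, 0), dtime(21, 0), {
        "cluster.routing.rebalance.enable": "none",    # no rebalances at peak
        "indices.recovery.max_bytes_per_sec": "20mb",  # keep transfers gentle
    }),
    (dtime(21, 0), dtime(9, 0), {
        "cluster.routing.rebalance.enable": "all",     # go nuts overnight
        "cluster.routing.allocation.cluster_concurrent_rebalance": 5,
        "indices.recovery.max_bytes_per_sec": "500mb",
    }),
]

def settings_for(now):
    """Pick the transient settings block for the current wall-clock time."""
    for start, end, settings in SCHEDULE:
        if start <= end:
            if start <= now < end:
                return settings
        elif now >= start or now < end:  # window wraps past midnight
            return settings
    return {}
```

A cron-style loop would call `settings_for` periodically and PUT the result as transient settings, which is exactly the "allow 0 rebalances between 9AM and 9PM, otherwise go nuts" behavior described above.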

Since most workloads fluctuate with daytime highs to nighttime lows, it was easy to establish an index lifecycle. I lock down general cluster activity during peak indexing, optimize and replicate (more on that later) the index using other tooling, then open the cluster up for whatever it needs to do:

[Chart not reproduced: cluster indexing rate over a day, with the sleepwalk window annotated.]

Here, we allow 5 concurrent shard rebalances at fairly unrestricted transfer rates. Then, right around the 'sleepwalk end' annotation, our tasks finish and the indexing rate starts to kick back up.

Naturally, if you set out to control cluster activity as a load-inducing input, you'll find it's equally important to monitor things like shard movements and recoveries, just as you would GCs or storage utilization. Definitely monitor that stuff. Tracking shard movements/states is tremendously useful.

For a quick-n-easy hack, it works incredibly well and completes our 3rd control point. Having all of these measures implemented (in addition to a lot of time testing and tuning between instance types, storage and cluster topologies, JVM foolery), we've brought tremendous performance and stability improvements to our fleet.

Other considerations

You probably still have some thoughts in mind. "So should I use SSDs?", "Do I need 128GB of memory for the page cache?".

I'm not a fan of prescribing generalized performance recommendations without context. Why? Let me recite a personal systems quote:

Saying "an HDD is too slow" is like saying "steel is too heavy, it can't float", concluding you shouldn't build a ship from it.

Context and specifications matter. Think about requirements in terms of "search x must return in 100ms" or "max size segment flushes should never take more than 5 seconds". Start designing your system while considering how each component helps you tick off those reqs.

In the case of SSDs of course, no matter what, the latency (and very likely sequential IO) will be significantly faster than HDDs. But does it matter for you? Take my workload for instance, it's indexing driven and retention is everything. 200TB of ElasticSearch data is borderline pocket change. We have relatively large documents at ~2KB+ and absolutely burn cores on indexing (lz4 decompression/compression cycles or DEFLATE in ES 2.x). If I saturate a standard box on indexing where every core is pegged solid, the disks are mostly idle, saturated with a big sequential write, idle, saturated, idle, etc. I could swap the storage for SSDs and probably chop that flush time from 5s to 2s. But do I care? Not when data retention is my secondary capacity spec and I just ramped storage costs up by some multiple, yet acquired no meaningful gain according to the expected user experience standards.

What about query, you ask? I would think about your worst and average case latency tolerances (hey, what are SLOs anyway?). The app that my ES clusters back is asked the type of questions that you might ask a Hadoop based app, so 5s query responses are relatively "fast" (having a single cluster with an aggregate sequential read throughput of >70GB/s is sort of cool & helpful). Most really long query responses are almost always complex regex that's CPU rather than storage bound. That said, I tried instances from i2.2xlarge up to i2.8xlarge. Basically, it generally sucked and definitely wasn't worth it for us; the balance of storage to CPU perf was totally off. The indexing performance was worse than c3 instances with EBS optimized channels and GP2 storage, simply because of the doc size and amount of lz4 happening. But the i2 may be the best instance for you.

But don't get me wrong about hardware. Personally, my house is full of SSD everything because flash is awesome. The fact is that usually someone is paying you to design systems rather than to be awesome. My advice is to stick to thinking about systems design rather than subscribing to generalizations.

Ultimately, if I did have to generalize: you'll probably need more CPU resources than most people would initially guess. On AWS, c4 and d2 instances are your friend. If you have low data volumes, lots of cash, or are query latency sensitive: yeah, slap on the fastest storage you can afford.

An additional factor in your general cluster design that should dictate data node spec is minimum transfer times across the cluster. A number every system builder should remember is that with perfect saturation, you're going to move a theoretical max of no more than 439 GB/hr for each 1 Gb/s of network throughput. If you thought stockpiling 20TB per node with 1 Gb links was a good idea because it covers your indexing/query bandwidth and saves rack space, you're asking for it. If you experience a node fault and need to evacuate the entire data set from it, measuring recovery in days should sound frightening.
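The arithmetic is worth internalizing: 1 Gb/s is 125 MB/s, or roughly 450 decimal GB (~419 GiB) per hour at perfect saturation, the same ballpark as the rule of thumb above. A quick back-of-the-envelope helper (hypothetical, just for illustration):

```python
def evacuation_hours(data_tb, link_gbps, efficiency=1.0):
    """Hours to move data_tb terabytes over a link_gbps gigabit/s link
    at the given saturation efficiency (1.0 = perfect)."""
    bytes_per_sec = link_gbps * 1e9 / 8 * efficiency
    return data_tb * 1e12 / bytes_per_sec / 3600

# Evacuating 20 TB off a node with a single 1 Gb/s link, even perfectly
# saturated, takes on the order of 44 hours -- nearly two days per fault.
```

Which is exactly why 20TB per node behind 1 Gb links is asking for it: recovery time is bounded by the slowest link in the evacuation path, not by how much storage you managed to rack.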

In our larger cluster designs, there's definitely a factor of weighing the storage density and total volumes against attributes like cluster aggregate storage throughput and bisectional network bandwidth. I think about questions like "What timelines should I expect if I have to re-replicate these 20 indices that hold 300TB of data?".

Strange Reliability Ideas

and further performance thoughts

Reliability in many ways is similar to capacity planning, and they definitely relate. I'll talk about two subclasses of reliability - stability and durability.

In terms of stability, this was mostly addressed in the previous section (size right and control inputs). I'd say the largest stability factor outside of that is query complexity and volume. I feel like I see two categories of query workloads:

  • many and small
  • few and large

The many and small being those apps that serve up "____ thousand queries per second" but the query complexity and time ranges (or referenced data set sizes) are relatively small. The few and large is what I deal with: "let's take these 25 billion docs and make a 7 layer regex dip out of it". The greatest impact from really complex queries targeting large datasets is sustained CPU usage and the follow up massive GCs and long pauses. If you can't syntactically prevent it (let's say those wild queries are actually required), the best bet is to probably go with the query throttling method I previously mentioned and aggressively restrict the active query concurrency.

If you truly want to scale to higher volumes of complex queries, or really high rates of small queries, there's a variety of tiered-node cluster architectures that are often implemented. For instance, if you had a relatively low indexing volume and a huge query volume: build a pool of indexing nodes (defined via tags) and a separate pool of query nodes, and crank up the replica count. At this point, you could view the replica nodes as a single chunk of capacity (cores, heap memory) set aside for query. Need more query capacity? Grow the pool from 240 cores / 960GB heap to 360 cores / 1.44TB heap and spin up the replica count.

In regards to durability, ElasticSearch's most obvious control functionality is shard replication. Excluding the galactic topic of byzantine style failures, replication strategy has long reaching performance and total capacity considerations. Out of the box, I'm going to guess most people are cutting indexes with the default 1 replica per primary setting. The obvious cost to this is doubling the storage consumption. The less obvious is that indexing into a replicated index has (obviously) worse performance than writing into a non-replicated index. What's less obvious until you measure it is that the performance consistency with replication has always been erratic, at least in my experience. Indexing latency and resource usage fluctuates over a greater range than it does writing into a non-replicated index. Is it important? Maybe, maybe not.

It did give me some ideas, though. It sounds crazy but you should ask yourself: "Do I actually need the active index to be replicated?". If it's not a hard yes, having no replicas is actually pretty incredible. Cluster restart times are really fast (this has diminished in newer versions of ElasticSearch due to improved recovery mechanics) and you can seriously squeeze awesome indexing performance per node. And the more predictable performance consistency of a non-replicated index means that I can run data nodes closer to the edge of my headroom / spare capacity line without randomly flipping too far into it and starving out other workload demands. Sounds dangerous, right? Of course it is. Basically, have a good replay mechanism and checkpoint often by rolling the active index and replicating the one you just phased out. Lose a node? Cut another index to pick up new writes, allocate the missing shard in the impacted index and replay your data. Older replicated indices will be recovering on their own in the background.
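A sketch of that roll-and-checkpoint cycle, under loud assumptions: the index naming, the `es` client object, and its methods below are all hypothetical placeholders, not a real client API.

```python
# Sketch of checkpointing an unreplicated active index by rolling it.
# The `es` object and its methods are illustrative placeholders.

def roll_active_index(es, series, seq):
    """Cut a fresh unreplicated index for new writes, then replicate the
    one just phased out as its durability checkpoint."""
    old = f"{series}-{seq:06d}"
    new = f"{series}-{seq + 1:06d}"
    # 1. New active index: 0 replicas for maximum indexing throughput.
    es.create_index(new, settings={"index.number_of_replicas": 0})
    es.update_alias(write_alias=f"{series}-write", target=new)
    # 2. Checkpoint the outgoing index by replicating it in the background.
    es.update_settings(old, {"index.number_of_replicas": 1})
    return old, new
```

The replay mechanism is what makes this safe: anything written to the unreplicated active index since the last roll can be reconstructed, so only the current (short) window is ever exposed to a single-copy loss.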

Similarly, you should ask yourself: "Does my data lose value as it ages?". If so, don't carry replicas for the entire lifecycle of the index. Off-cluster storage (such as S3 or the massive warm storage in your datacenter) very likely has a lower $/GB cost. If you hold on to 90 days worth of indices but 95% of your queries hit the most recent 7 days, consider sending snapshots off-cluster and flipping older indices to no replication. If it fits in your SLO, it's an easy shot at slashing the footprint of a huge cluster.
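That age-based policy is simple enough to sketch as a nightly job's decision table. The thresholds, action names, and function below are hypothetical, just to make the lifecycle concrete:

```python
# Sketch of an age-based retention pass: keep replicas only in the hot
# window, snapshot + de-replicate warm indices, delete past retention.
# Thresholds and action names are illustrative.

def retention_actions(index_ages_days, hot_days=7, keep_days=90):
    """Map each index (by age in days) to the action a nightly job takes."""
    actions = {}
    for name, age in index_ages_days.items():
        if age > keep_days:
            actions[name] = "delete"                  # past retention entirely
        elif age > hot_days:
            actions[name] = "snapshot+drop_replicas"  # off-cluster copy, 0 replicas on-cluster
        else:
            actions[name] = "keep_replicated"         # hot window serving most queries
    return actions
```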

Lastly, both of these topics talk a lot about playing with shard counts. I should caveat that shards are not free (even when they carry no data) in that every shard occupies some space in the cluster state. All this metadata is heartbeated to every node in the cluster, and I've definitely experienced some strange reliability problems in high shard / high node count clusters. Always aim for the fewest shards you can get away with, all things considered.

Upgrades

and other live cluster mutations

Distributed database upgrades, everyone's favorite. I think one of the first questions I always get about our fleet (strangely) is "How long do your rolling restarts take? Ours take days!". Oh man. I'm way too sassy for processes that are unreasonably slow.

Option 1 is obvious. If you're super SLA relaxed and can schedule an outage window, do a full stop > upgrade > start, and it's good to go in like 3 minutes (I made it sound simple, but there actually are some pre-flight sanity checks and other operations stuff that happens).

Option 2, if you live in reality and zero downtime is your goal, I actually dislike the rolling upgrade/restart method. So much data shuffling, coordination and time. Option 2 is to:

  1. Splice new nodes into your cluster (with either rebalances set to 0 or some allocation exclusion on them so rebalances don't kick off willy nilly).


  2. Cut a new index that's allocated strictly to the new nodes. Start writing inbound data to this index.

  3. Then, set allocation exclusions on the older indices and throttle the rebalances (using the cluster.routing.allocation.node_concurrent_recoveries and indices.recovery.max_bytes_per_sec settings).


  4. When the original nodes are clear of data, remove them.


The new nodes could be simply new configurations, new version of ElasticSearch (but don't try this across major versions ;)), or even part of a recovery operation (we don't try to fix nodes; at the first sight of a fault, we simply stick a new one in the cluster and set an exclusion on the faulted node).

Once you have this operationally wired, it's pretty incredible to watch. Chuck in the new nodes and simply spin (or schedule with sleepwalk) the node_concurrent_recoveries and recovery.max_bytes_per_sec knobs, wait until the deprecated nodes are clear, then terminate.
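As a sketch, those drain knobs can be wrapped into a single transient-settings payload. The setting names below are real cluster settings; the helper function and the example values are illustrative:

```python
# Sketch of the splice-in drain as a transient settings payload
# (PUT /_cluster/settings). Helper and values are illustrative.

def drain_payload(old_nodes, max_bytes_per_sec="200mb", concurrent_recoveries=4):
    """Transient settings that exclude the old nodes and throttle the drain."""
    return {
        "transient": {
            # Evacuate shards off the deprecated nodes...
            "cluster.routing.allocation.exclude._name": ",".join(old_nodes),
            # ...but at a controlled pace.
            "cluster.routing.allocation.node_concurrent_recoveries": concurrent_recoveries,
            "indices.recovery.max_bytes_per_sec": max_bytes_per_sec,
        }
    }
```

Scheduling the throttle values (gentle by day, aggressive by night) is exactly the sleepwalk trick again, applied to a migration instead of routine rebalancing.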

We have a ton of ElasticSearch tricks, but I have to say this is probably the most powerful relative to how simple it is. From my perspective, it was the best way to slide around petabytes of ES multiple times with zero downtime.

Final Words

Hopefully this post struck an interesting balance between helpful systems ideology and ranting. It's a little glimpse into one of the many incredibly cool projects that I work on and I thought I would keep it high level for two reasons.

The first is that I rarely, for some reason, feel like any specific ElasticSearch topic is the one I'd do a deep-dive post on, so I never end up writing on the topic at all. The second is that I've been through several large projects spanning over two years that a lot of other people have been a part of. We sort of have a cool story, and thanks and credit are owed to those involved:

Chris for playing manager yet still having worked through every whiteboard session to date while continuing to pull 1AM barrel-roll cluster fixes. Nathan for stepping in and making it a two man team (yes, count 'em) managing a petabyte scale ElasticSearch fleet basically as a side-project. Brad for operational/financial navigation and doing an incredible job visualizing and storytelling our work internally. Shaun for helping with lots of the automation tooling not shared in this post. Justin, Eric, and Bill for building our ElasticSearch index/search clients and staying up late working through hard problems.

And of course, Antoine Girbal, Pius Fung and the rest of the ElasticSearch team for being walking copies of the ElasticSearch code. Their lightning-fast responses to the most obscure ElasticSearch technical questions are unparalleled. Plus you guys have a fun office ;).


Young Han Solo movie announced


Disney is proceeding at full steam in delivering more Star Wars to your eyeballs. Today they announced that Christopher Miller and Phil Lord, the duo behind The Lego Movie, will direct a movie about a young Han Solo.

The screenplay is written by Lawrence Kasdan and Jon Kasdan. The story focuses on how young Han Solo became the smuggler, thief, and scoundrel whom Luke Skywalker and Obi-Wan Kenobi first encountered in the cantina at Mos Eisley.

Release date is May 2018. As Princess Leia once said, "Disney, I hope you know what you're doing."

Tags: Christopher Miller, movies, Phil Lord, Star Wars, The Lego Movie

A Well Known But Forgotten Trick: Object Pooling


This is a guest repost by Alex Petrov. Find the original article here.

Most problem are quite straightforward to solve: when something is slow, you can either optimize it or parallelize it. When you hit a throughput barrier, you partition a workload to more workers. Although when you face problems that involve Garbage Collection pauses or simply hit the limit of the virtual machine you're working with, it gets much harder to fix them.

When you're working on top of a VM, you may face things that are simply out of your control, namely time drifts and latency. Thankfully, there are enough battle-tested solutions that require only a bit of understanding of how the JVM works.

If you can serve 10K requests per second within certain performance parameters (memory and CPU), it doesn't automatically mean that you'll be able to linearly scale up to 20K. If you're allocating too many objects on the heap, or wasting CPU cycles on something that can be avoided, you'll eventually hit the wall.

The simplest (yet underrated) way of saving on memory allocations is object pooling. Even though the concept sounds similar to pooling connections and socket descriptors, there's a slight difference.

When we're talking about socket descriptors, we have a limited, rather small (tens, hundreds, at most thousands) number of descriptors to go through. These resources are pooled because of their high initialization cost (establishing a connection, performing a handshake over the network, memory-mapping a file, or whatever else). In this article we'll talk about pooling larger numbers of short-lived objects which are not so expensive to initialize, to save allocation and deallocation costs and avoid memory fragmentation.
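The core of the pattern fits in a few lines. A minimal sketch (illustrative only, not from the original article, and in Python rather than the JVM context the article discusses): a factory creates objects on a pool miss, a reset callback wipes them before reuse, and a size cap keeps the pool from growing without bound.

```python
import threading

class ObjectPool:
    """Minimal object pool sketch: reuse short-lived objects to avoid
    allocation/GC churn. Illustrative, not from the original article."""

    def __init__(self, factory, reset, max_size=1024):
        self._factory = factory  # creates a fresh object on a pool miss
        self._reset = reset      # wipes an object before it is reused
        self._pool = []
        self._max = max_size
        self._lock = threading.Lock()

    def acquire(self):
        with self._lock:
            if self._pool:
                return self._pool.pop()
        return self._factory()

    def release(self, obj):
        self._reset(obj)
        with self._lock:
            if len(self._pool) < self._max:  # don't grow without bound
                self._pool.append(obj)
```

The reset step is what distinguishes this from a connection pool: the object carries no identity worth preserving, so reuse is purely about skipping the allocate/collect cycle.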

