February 26, 2020

Scaling ETL with Scala

Dave Handy, Protenus

When I joined Protenus in 2015, the first version of our ETL “pipeline” was a set of HiveQL scripts executed manually one after another. The company, still a start-up focused on proving out the analytics and UX, had adopted Spark, Hive, and MongoDB as core technologies. With our Series A funding round completed, my first task was to take these scripts and build out an ETL application. The first attempt naturally adopted Spark and Hive as primary technologies and added state management. This version got us through our next few clients.

Months later, when we realized another change was needed, we were fully invested in the framework we had built. Among the issues that arose (and there were several), our clients were not yet interested in our SaaS offering and were opting for on-site installations. We were a small startup, and maintaining multiple on-site installations with a big data stack was proving untenable for us and for our customers’ IT shops. Was there a way we could change the conversation?

Our CTO, Chris Jeschke, proposed a third option: on-site ETL and UI with cloud analytics on anonymized data. Only anonymized data necessary for our product would upload to our cloud, and the on-site system requirements would be drastically reduced. For this to work, our ETL package needed to be simple enough for our customers to install and operate themselves. Complicated on-site installations of HDFS, Spark, and Hive were a liability.

Performance

We decided to stick with Scala and add Akka Streams. Even though Protenus doesn’t need to support streaming data, Akka Streams gave us the tools to manage CPU and RAM efficiently. In our old Spark model, each ETL step transformed a partition of data from one Hive table into another table structure, and ultimately into a MongoDB collection; only one step ran at a time. In the new architecture, each ETL step would be an Akka Streams “Flow”: all the steps would run in parallel to keep memory usage down, and output directly to MongoDB. To scale further, multiple instances process different incoming files in parallel, coordinated by a simple database record-locking technique.
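That record-locking technique can be sketched in plain Scala. In this illustrative sketch (not our production code), a ConcurrentHashMap stands in for the MongoDB collection, and putIfAbsent plays the role of an insert against a unique index: exactly one ETL instance wins the claim for a given file.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for a MongoDB collection with a unique index on the file name:
// putIfAbsent succeeds for exactly one caller, just like a unique insert.
object FileLocks {
  private val locks = new ConcurrentHashMap[String, String]()

  // Try to claim a file for this ETL instance; returns true if we won the race.
  def claim(fileName: String, instanceId: String): Boolean =
    locks.putIfAbsent(fileName, instanceId) == null

  // Release the claim once the file has been processed.
  def release(fileName: String): Unit =
    locks.remove(fileName)
}
```

Each instance scans for incoming files, calls claim before processing one, and simply skips files another instance already owns.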

Multiple EHR Systems

On top of the three different deployment models, we needed to scale across different EHR systems. To ensure as much reuse as possible, we adopted a plugin architecture. Domain models and type aliases for common “Flow” types are defined in a core package. Each plugin class is discovered via Java’s ServiceLoader and creates a scaldi Module; all the Module instances are merged together to form a single scaldi Injector. This context is then used to discover all of the individual pieces of the Akka Streams processing graph and connect them.
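Here’s a rough sketch of that discovery-and-merge step. Since scaldi isn’t shown here, a plain Map stands in for a Module’s bindings; the trait name EtlPlugin and the binding keys are illustrative, not our actual API.

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

// Each plugin contributes a "module" of bindings. A Map stands in for a
// scaldi Module here; the shape of the idea is the same: merge per-plugin
// bindings into one injector-like context.
trait EtlPlugin {
  def module: Map[String, Any]
}

object PluginRegistry {
  // Discover plugin implementations registered on the classpath under
  // META-INF/services (the standard ServiceLoader mechanism).
  def discovered: Seq[EtlPlugin] =
    ServiceLoader.load(classOf[EtlPlugin]).iterator().asScala.toSeq

  // Merge all plugin modules into a single binding context.
  def merged(plugins: Seq[EtlPlugin]): Map[String, Any] =
    plugins.foldLeft(Map.empty[String, Any])(_ ++ _.module)
}
```

The merged context is what the application then walks to assemble the processing graph.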

Our ETL code is written in pure Scala, with simple APIs for each supported file type (CSV, XML, JSON, and Avro). This dramatically improves readability and testability, allowing the team to focus on the transformation logic rather than the framework.
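As a hypothetical illustration of what “simple API” means in practice, a CSV step can be little more than a function from a delimited line to a domain record. The Visit record and field layout below are invented for the example:

```scala
// A domain record, defined purely for this sketch.
final case class Visit(patientId: String, department: String)

object CsvApi {
  // Parse one delimited line into a Visit, using the plugin's configured
  // delimiter (tab for one EHR system, pipe for another).
  def parseVisit(line: String, delimiter: Char): Visit = {
    val fields = line.split(delimiter)
    Visit(patientId = fields(0), department = fields(1))
  }
}
```

Because a step is just a pure function, it can be unit tested with a one-line fixture instead of a framework harness.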

Nested Contexts

The scaldi TypesafeConfigInjector provides a clean way to access configuration values. However, we needed to configure multiple instances of the same class within different contexts. Here’s an example of the basic config structure we wanted to support.

epic {
  csv.delimiter = "\t"
}
epic2 {
  csv.charset = "ISO-8859-1"
}
meditech {
  csv.delimiter = "|"
}

Our first attempt to load this type of config involved adding “prefix” arguments to classes that loaded configuration values, which quickly became complex and error prone.

https://gist.github.com/dave-handy/45d9fc4d7059ed5a60da06ab73dd8e92

Fortunately, we were able to layer some logic on top of scaldi’s Module class to incorporate this prefixing technique, so that we could remove the prefix arguments.

https://gist.github.com/dave-handy/ab188221e1b52383f54b60fba9727b16
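The underlying idea can be sketched without scaldi or Typesafe Config: treat configuration as a flat, dotted key space and apply the module’s prefix behind the scenes, so binding code asks only for the un-prefixed key. All names in this sketch are illustrative.

```scala
object PrefixedConfig {
  // Flat key space, as Typesafe Config ultimately resolves to dotted paths.
  val config: Map[String, String] = Map(
    "epic.csv.delimiter"     -> "\t",
    "epic2.csv.charset"      -> "ISO-8859-1",
    "meditech.csv.delimiter" -> "|"
  )

  // Context-aware lookup: a class asks for "csv.delimiter" and the module's
  // prefix ("epic", "meditech", ...) is applied for it, so the prefix
  // argument disappears from the classes themselves.
  def lookup(prefix: String)(key: String): Option[String] =
    config.get(s"$prefix.$key")
}
```

Each plugin’s module partially applies its own prefix, and everything downstream stays prefix-free.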

This technique works well for configuration because all config values have String identifiers. But what about other types of bindings? Ideally, we want to instantiate a single instance of CSVParserSettings within each context, and then call inject[CSVParserSettings] to get the correct instance. To support this, we introduced a new class, NestedModule, which simply checks the internal list of bindings, and then checks the outer context’s bindings. When used together, these classes fully encapsulate the DI context.
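Here is a minimal sketch of that lookup rule, with a plain Map of bindings keyed by Class standing in for scaldi’s richer identifiers (the real NestedModule delegates to scaldi’s injector machinery):

```scala
import scala.reflect.ClassTag

// Settings class named in the text; the single field is illustrative.
final case class CSVParserSettings(delimiter: Char)

// Resolve a binding from the module's own bindings first, then fall back
// to the outer context's bindings.
final class NestedModule(
    own: Map[Class[_], Any],
    outer: Map[Class[_], Any]
) {
  def inject[T](implicit ct: ClassTag[T]): Option[T] =
    own.get(ct.runtimeClass)
      .orElse(outer.get(ct.runtimeClass))
      .map(_.asInstanceOf[T])
}
```

A module that binds its own CSVParserSettings shadows the outer one; a module that binds nothing inherits the outer instance, which is exactly the per-context behavior we wanted from inject[CSVParserSettings].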

Here’s an example of what our plugin classes look like with these concepts.

https://gist.github.com/dave-handy/f717307713d33d2283f8d7c0caef7707

In the real world, we have many more parsers for each module, and many other contextual bindings specific to each plugin. If you’ve seen this concept implemented in other DI frameworks, I’d love to hear about it.

Final Thoughts

While our transition out of Spark was incredibly beneficial, we never ended up deploying any clients in the hybrid architecture. After achieving some key security certifications, customers began to buy our SaaS product. Since then we’ve been able to convert all of our original on-site deployments to our cloud. Nevertheless, the constraints of that proposed architecture helped us focus on drastically simplifying our entire ETL pipeline.

If you’d like to hear more about engineering at Protenus, please check out my coworker’s articles on Scaling Infrastructure for Growth and Engineering Culture. You can also connect with me on LinkedIn and Twitter.