Smart pipelining — reactive approach to computation scheduling

Many tech-eons ago (several human years now), my colleagues and I started developing an in-house big data platform. The goal was to ignite the company’s digital transformation and enable people to harvest the power of data. It was a greenfield project. There was nothing except the task, the company’s AWS account and our imagination. Now, in hindsight, I can say that that was all we needed.

Being a team of experienced people, we knew that what we started with was not going to be what we would end up with.

One can be an amazing visionary but can foresee business requirements only to a certain extent.

There are always surprises along the way. And those are usually unpleasant ones.

So we took the time and let the product evolve – shape itself, if you will.

Phase one — linear computations

At first, both business requirements and the initial set of computations were simple. There were roughly a dozen daily ETL processes. Each of those computations could live on its own, taking in input and producing output without colliding with any other.

As everything was running smoothly, we chose the AWS Data Pipeline service as the backbone of our platform. It was easy to set up and a convenient place to start.

The AWS Data Pipeline service provides a number of “jobs”, such as shell activity (running anything that can be run in a shell), SQL activity, Redshift activity (for copying data to/from the AWS Redshift warehouse) and others.

The service also lets you schedule the running times of pipelines, so we could easily configure a job to run at 2AM every day, for example. At Casumo, we call these pipelines scheduled or hardcoded.

It felt like we had made a good choice: we had job orchestration and scheduling at the same time.

Idempotence was something we insisted on from the very beginning.

Having a set of batch computations, we had to be sure that the results would not change with every rerun. Doubling, tripling or quadrupling the number of output rows when rerunning a job was not acceptable in any of our cases.
To that end, our Hive tables are partitioned and partitions are overwritten on each computational run. The same applies to our Redshift tables: we rely on RedshiftCopyActivity’s insertMode property and its OVERWRITE_EXISTING value.
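
To make the difference concrete, here is a minimal in-memory sketch of append versus overwrite semantics. It is an illustration only, not our Hive or Redshift code: the table is a plain dictionary, and the function names, the partition key and the rows are all made up for the example.

```python
from collections import defaultdict


def append_run(table, partition, rows):
    """Non-idempotent write: every rerun adds the same rows again."""
    table[partition].extend(rows)


def overwrite_run(table, partition, rows):
    """Idempotent write: a rerun replaces the whole partition."""
    table[partition] = list(rows)


# Hypothetical output of one daily job.
rows = [("user-1", 10), ("user-2", 25)]

appended = defaultdict(list)
overwritten = defaultdict(list)
for _ in range(3):  # the same job, run three times for the same day
    append_run(appended, "2019-01-01", rows)
    overwrite_run(overwritten, "2019-01-01", rows)

print(len(appended["2019-01-01"]))     # 6: output tripled by the reruns
print(len(overwritten["2019-01-01"]))  # 2: same result after every run
```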

At the end of phase one, we ended up with a set of linear, independent computations, all idempotent and all scheduled at corresponding moments of the day.

Phase two — dependencies entangled

By then, things had started to get more complex. Our once simple computations grew and started developing dependencies on each other. What was once a set of independent jobs became a dependency chain, then a dependency tree, then a dependency thing.

Computation A had to be run before computations B and C, but C also needed computation D to finish. Computation B maybe needed D, E and F. And on it went.
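
A dependency structure like this one can be written down as a graph. The sketch below uses Python's standard graphlib module (Python 3.9+) to derive one valid run order for the example above. It only illustrates the shape of the problem and is not how our platform represented dependencies.

```python
from graphlib import TopologicalSorter

# Each key is a computation; the value is the set of computations
# that must finish before it can run (taken from the example above).
dependencies = {
    "B": {"A", "D", "E", "F"},
    "C": {"A", "D"},
}

# static_order() yields every computation after all of its dependencies.
# Several orders are valid, so the exact sequence may vary.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```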

Since all jobs were scheduled and we knew their running times, we could compute their end times and use those to schedule the jobs that should come after.

The level of wrongness of this approach is astonishing.

Imagine job B depending on job A — it uses output of A as its input.

Job A starts at 1am and takes about 1 hour to finish.

In that case, job B could be scheduled for 2am and everything should be fine.

But what if, for some reason, job A has much more data to process than usual? Let’s say there was an aggressive marketing campaign the day before that drove more traffic, or simply that, over time, the business is doing well and the data size is increasing.

Job B would start before job A had produced complete results, and B would work on incomplete input.
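
The arithmetic behind this failure fits in a few lines. In the sketch below the date and the 90-minute duration for the busy day are made-up values, and input_is_complete is a hypothetical helper written for this example.

```python
from datetime import datetime, timedelta


def input_is_complete(a_start, a_duration, b_start):
    """Job B sees complete input only if job A has finished by the time B starts."""
    return a_start + a_duration <= b_start


a_start = datetime(2019, 1, 1, 1, 0)  # job A starts at 1am
b_start = datetime(2019, 1, 1, 2, 0)  # job B is hardcoded to 2am

# Usual day: A takes about one hour.
normal_day = input_is_complete(a_start, timedelta(hours=1), b_start)
# Busy day (assumed): A needs 90 minutes, but B still starts at 2am.
busy_day = input_is_complete(a_start, timedelta(hours=1, minutes=30), b_start)

print(normal_day, busy_day)  # True False
```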

One could tweak the scheduled times for each computation from time to time and fix the issue, but this is not how it should be done. This is not flexible and doesn’t scale well as the number of computations increases.

At that point, we knew we hadn’t implemented the optimal solution, but we still moved on with a defensive mechanism: each computation had to sleep for some time before actually starting, giving its dependencies a chance to finish if they were still running.

At the end of phase two, we had a complex set of entangled jobs, building a branching chain of dependencies. Computations were able to wait for their dependencies to finish, but not intelligently: if something took an extra long time to finish, the whole structure would produce wrong results.

Phase three — race with time

After applying the fix and postponing the same problem until an unknown moment in the future, we knew that we had to tackle this challenge once and for all. In addition to the problems described in phase two, it was time to integrate a consumer of the results into the whole computational chain.

The company already had a BI solution in place and we wanted it to import the data our batch jobs produced.

Because of the way the BI tool works, we had to do that before a certain moment during the night. If we failed to do so, the BI solution would import incomplete results. People looking at BI graphs and widgets would see no difference: they would be looking at incomplete data without knowing it was incorrect. Imagine the confusion of seeing acquisition numbers go down in just a day, while you know there were no actions that would cause that behavior. Some seriously wrong decisions could be made based on incomplete and incorrect data. Nobody wants that to happen.

We realized we were fighting a losing battle with Time.
Along with the problems of dependencies and job scheduling, Time had us surrounded from both sides. Computations had to wait for the day to end, in order to have complete data, and had to finish processing before a certain point at night, to have the results ready for the BI tool.

This is when we realized we had to put a twist on our approach: we couldn’t beat Time with time. But we could beat it if we didn’t care about it.

We went for something we now call “a reactive approach”. When we really want to brag, we call it smart scheduling (although it is a natural thing to develop, nothing really super-smart).

How does it work? Easy to explain in several steps…

  • Each computation “knows” which computations it depends on
  • Each computation fires a notification when it has finished
  • Each computation waits for notifications from all of its dependencies before starting
  • Importing data into BI is done in the same manner, only when everything before it has finished
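
The steps above can be sketched as a tiny in-memory scheduler. This is a simplified illustration under stated assumptions, not our actual implementation: the ReactiveScheduler class, its method names and the job names are invented for the example, "starting" a job just records its name, and notifications are plain method calls rather than messages from a real notification service.

```python
class ReactiveScheduler:
    """Starts each job once all of its dependencies have reported completion."""

    def __init__(self, dependencies):
        # dependencies: job name -> set of job names it waits for
        self.dependencies = {job: set(deps) for job, deps in dependencies.items()}
        self.finished = set()
        self.started = []  # start order, kept for inspection

    def start_ready_jobs(self):
        """Start every job that has not started yet and whose dependencies are done."""
        for job, deps in self.dependencies.items():
            if job not in self.started and deps <= self.finished:
                self.started.append(job)  # a real system would launch the job here

    def notify_finished(self, job):
        """Handle a job's 'finished' notification and start whatever it unblocks."""
        self.finished.add(job)
        self.start_ready_jobs()


scheduler = ReactiveScheduler({
    "A": set(),
    "D": set(),
    "B": {"A", "D"},
    "C": {"A", "D"},
    "bi_import": {"B", "C"},  # the BI import is just another dependent job
})

scheduler.start_ready_jobs()    # A and D have no dependencies, so they start
scheduler.notify_finished("A")  # nothing new: B and C still wait for D
scheduler.notify_finished("D")  # B and C start
scheduler.notify_finished("B")
scheduler.notify_finished("C")  # bi_import starts last

print(scheduler.started)  # ['A', 'D', 'B', 'C', 'bi_import']
```

Note that no clock appears anywhere in the sketch: a job that takes three hours instead of one simply delays its notification, and everything downstream waits for it.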

Wrap up

There are some very important details of this approach that I haven’t mentioned so far.

For example, one might ask why we didn’t go for one of the existing scheduling and orchestration tools.

They weren’t mature when we started the whole story.

And we wanted to own the tool.

And we didn’t want to have any infrastructure for it.

That’s right, you read it correctly. Our scheduler implementation doesn’t require any infrastructure.

By choosing AWS services cleverly, we managed to keep its cost down to $0 per month.

More on that in some future post soon. I hope I got you intrigued.