I was recently working on a larger ETL process that began with receiving various data files via SFTP, delivered on varying schedules. The requirement was that as each file was received we had to generate a unique event in a database, then execute a sequence of commands to archive the file out of the delivery directory and offline to a central, immutable, annotated file repository.
This new functionality had to integrate with an existing legacy SFTP server, and would likely have other uses beyond this initial use case.
Looking around for a simple solution based on a scripting language, I could not find anything that would work or that was extensible enough for the need, so I ended up writing io-event-reactor.
The basic concept is this: you have a monitor that listens for IO events on particular filesystem paths. As these IO events occur, they are passed on to one or more evaluators, which decide whether or not each IoEvent should be reacted to by one or more configured reactors. This entire sequence is encapsulated in an IoReactor instance that manages the flow between the three components.
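To illustrate the monitor → evaluator → reactor flow, here is a minimal sketch in Python. It is not io-event-reactor's actual API: the `IoEvent` fields, the event type names, and the idea that each evaluator is paired with the names of the reactors it gates are all assumptions made for illustration. A loop replaying two events stands in for a real filesystem monitor.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass(frozen=True)
class IoEvent:
    event_type: str  # hypothetical values such as "add", "change", "unlink"
    full_path: str


Evaluator = Callable[[IoEvent], bool]
Reactor = Callable[[IoEvent], None]


class IoReactor:
    """Hypothetical sketch: routes monitor events through evaluators to reactors."""

    def __init__(self, reactors: Dict[str, Reactor],
                 evaluators: List[Tuple[Evaluator, List[str]]]):
        self.reactors = reactors      # reactor name -> reactor callable
        self.evaluators = evaluators  # (evaluator, names of the reactors it gates)

    def on_io_event(self, event: IoEvent) -> List[str]:
        """Called by the monitor for every event; returns the reactors that fired."""
        fired: List[str] = []
        for evaluate, reactor_names in self.evaluators:
            if not evaluate(event):
                continue
            for name in reactor_names:
                if name not in fired:  # react at most once per event
                    self.reactors[name](event)
                    fired.append(name)
        return fired


# Usage: a stand-in "monitor" that simply replays two events.
log: List[str] = []
reactor = IoReactor(
    reactors={"record": lambda e: log.append("record " + e.full_path)},
    evaluators=[(lambda e: e.full_path.endswith(".csv"), ["record"])],
)
for ev in [IoEvent("add", "/sftp/in/a.csv"), IoEvent("add", "/sftp/in/a.tmp")]:
    reactor.on_io_event(ev)
print(log)  # ['record /sftp/in/a.csv']
```

Keeping the IoReactor as the only component that knows about both evaluators and reactors means either side can be swapped without touching the other.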
With this module, you construct and configure a single IoReactorService, which can contain and manage as many IoReactor instances as you wish, giving you a lot of flexibility in how you react to filesystem events.
When you configure the IoReactorService and its IoReactor instances, you specify which plugins you would like to use to fulfill the monitor and reactor roles. For evaluators you simply provide one or more functions that decide whether or not an IoEvent should be passed on to one or more reactors.
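Because evaluators are just functions, they compose naturally. The Python sketch below shows small evaluator factories and a service that holds several independently configured reactors. All names here (`extension_in`, `event_type_in`, `all_of`, `ReactorConfig`, `IoReactorService.register`/`dispatch`) are hypothetical; a path prefix stands in for the monitor plugin and a plain callable stands in for a reactor plugin.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass(frozen=True)
class IoEvent:
    event_type: str  # hypothetical: "add", "change", "unlink"
    full_path: str


Evaluator = Callable[[IoEvent], bool]
Reactor = Callable[[IoEvent], None]


def extension_in(*exts: str) -> Evaluator:
    """Evaluator factory: accept files whose name ends with one of exts (case-insensitive)."""
    return lambda e: e.full_path.lower().endswith(exts)


def event_type_in(*types: str) -> Evaluator:
    """Evaluator factory: accept only the given event types."""
    return lambda e: e.event_type in types


def all_of(*evaluators: Evaluator) -> Evaluator:
    """Combine evaluators; the event passes only if every one accepts it."""
    return lambda e: all(ev(e) for ev in evaluators)


@dataclass
class ReactorConfig:
    watch_prefix: str     # stands in for the monitor plugin's watched path
    evaluator: Evaluator
    reactor: Reactor      # stands in for a reactor plugin


@dataclass
class IoReactorService:
    """Hypothetical container managing any number of independent reactors."""
    reactors: Dict[str, ReactorConfig] = field(default_factory=dict)

    def register(self, name: str, config: ReactorConfig) -> None:
        self.reactors[name] = config

    def dispatch(self, event: IoEvent) -> List[str]:
        """Deliver an event to every reactor whose path and evaluator match."""
        handled = []
        for name, cfg in self.reactors.items():
            if event.full_path.startswith(cfg.watch_prefix) and cfg.evaluator(event):
                cfg.reactor(event)
                handled.append(name)
        return handled


seen: List[str] = []
service = IoReactorService()
service.register("csv-archiver", ReactorConfig(
    "/sftp/partner-a/",
    all_of(event_type_in("add"), extension_in(".csv", ".txt")),
    lambda e: seen.append(e.full_path)))
service.register("audit", ReactorConfig("/sftp/", lambda e: True, lambda e: None))

print(service.dispatch(IoEvent("add", "/sftp/partner-a/orders.CSV")))
# ['csv-archiver', 'audit']
print(service.dispatch(IoEvent("unlink", "/sftp/partner-a/orders.csv")))
# ['audit']
```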
For reactor plugins, I developed two based on my initial needs.
- One for inserting records into a MySQL database via node-mysql, available at: https://github.com/bitsofinfo/io-event-reactor-plugin-mysql
- The other for executing shell commands based on stateful-process-command-proxy, available at: https://github.com/bitsofinfo/io-event-reactor-plugin-shell-exec
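To give a feel for what these two reactor roles do, here is a rough Python analogue. It does not reflect either plugin's actual API: `sqlite3` stands in for MySQL/node-mysql, `subprocess` stands in for stateful-process-command-proxy, and the `io_event` table layout and class names are hypothetical.

```python
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class IoEvent:
    event_type: str
    full_path: str


class DatabaseReactor:
    """Records each IoEvent as a row; sqlite3 stands in for MySQL here."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        conn.execute("CREATE TABLE IF NOT EXISTS io_event ("
                     " id INTEGER PRIMARY KEY AUTOINCREMENT,"
                     " event_type TEXT NOT NULL,"
                     " full_path TEXT NOT NULL)")

    def react(self, event: IoEvent) -> None:
        # Parameterized insert: never build SQL from file names directly.
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT INTO io_event (event_type, full_path) VALUES (?, ?)",
                (event.event_type, event.full_path))


class CommandReactor:
    """Runs a command per event, passing the path as an argument (no shell)."""

    def __init__(self, argv_prefix: List[str]):
        self.argv_prefix = list(argv_prefix)
        self.results: List[str] = []

    def react(self, event: IoEvent) -> None:
        proc = subprocess.run(self.argv_prefix + [event.full_path],
                              capture_output=True, text=True, check=True)
        self.results.append(proc.stdout.strip())


conn = sqlite3.connect(":memory:")
db = DatabaseReactor(conn)
# A portable "archive" command: the current Python interpreter echoing the path.
cmd = CommandReactor([sys.executable, "-c",
                      "import sys; print('archived', sys.argv[1])"])
event = IoEvent("add", "/sftp/in/data.csv")
db.react(event)
cmd.react(event)
print(conn.execute("SELECT event_type, full_path FROM io_event").fetchall())
# [('add', '/sftp/in/data.csv')]
print(cmd.results)  # ['archived /sftp/in/data.csv']
```

Passing the file path as a separate argument rather than interpolating it into a shell string avoids surprises with unusual file names arriving over SFTP.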
For a real-world example of the kind of application you could build on top of this, check out io-overwatch (albeit a simple utility) at: https://github.com/bitsofinfo/io-overwatch
Every once in a while during the life cycle of a piece of software, an opportunity comes along to improve it in a major way... that is, if it’s lucky enough to still be in production.
One particular system I’ve been involved with is responsible for processing a lot of data and keeping that data in sync across many systems. For the purposes of this little case study I’ve dumbed down the overall use case, concept, architecture and implementation details to one simple idea: we need to synchronize data.
Something in the environment (e.g. a user or another process) requests an operation that generates a change against a “DataEntry”. This DataEntry is manipulated in the primary database, and the change then needs to be synchronized to numerous other systems. The changes can be described as “create DataEntry item number XYZ”, “mutate DataEntry XYZ in fashion Z” or simply “delete DataEntry item XYZ”.
Each target system where a DataEntry is to be synchronized is called a DataStore, and each involves its own complicated process of transforming our representation of a DataEntry into the target DataStore’s representation. The means of doing so can vary wildly, e.g. web-service calls, RDBMS DML, NoSQL operations and so on. On top of that, as with any integration, each of these DataStore sync calls may be fast, very slow, not work at all, or experience random transient failures.
For most of its life the system functioned as follows: each mutated DataEntry was placed in a queue and then processed by a consumer node’s DataSyncProvider, whose responsibility was to interrogate a DataStoreLocator to determine every DataStore the DataEntry had to be processed in, and then make sure that happened. It worked similarly to the diagrams below (highly simplified!); note the bottleneck.
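The Version 1 flow can be sketched roughly as follows. This is a hypothetical Python reconstruction, not the system’s code: `DataEntry`, `TransientError`, the `locate()` signature and the retry count are assumptions. The point it illustrates is that each DataStore, including its retries, must finish before the next one starts, all on the consumer thread.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class DataEntry:
    entry_id: str
    operation: str  # hypothetical: "create" | "mutate" | "delete"


class TransientError(Exception):
    """A retryable failure reported by a DataStore (hypothetical)."""


DataStore = Callable[[DataEntry], None]


class DataStoreLocator:
    """Hypothetical: returns every DataStore a given entry must be synced to."""

    def __init__(self, stores: Dict[str, DataStore]):
        self.stores = stores

    def locate(self, entry: DataEntry) -> Dict[str, DataStore]:
        return dict(self.stores)


class SequentialDataSyncProvider:
    """Version 1 style: one DataStore at a time, retries included, on the consumer thread."""

    def __init__(self, locator: DataStoreLocator, max_attempts: int = 3):
        self.locator = locator
        self.max_attempts = max_attempts

    def sync(self, entry: DataEntry) -> Dict[str, str]:
        results: Dict[str, str] = {}
        for name, store in self.locator.locate(entry).items():
            for _ in range(self.max_attempts):
                try:
                    store(entry)  # blocks until this DataStore finishes
                    results[name] = "ok"
                    break
                except TransientError:
                    continue      # retry; every remaining store keeps waiting
            else:
                results[name] = "failed"  # all attempts exhausted
        return results


calls = {"n": 0}


def flaky(entry: DataEntry) -> None:
    calls["n"] += 1
    if calls["n"] == 1:
        raise TransientError("timeout")


def down(entry: DataEntry) -> None:
    raise TransientError("unreachable")


provider = SequentialDataSyncProvider(
    DataStoreLocator({"crm": lambda e: None, "search": flaky, "legacy": down}))
result = provider.sync(DataEntry("XYZ", "create"))
print(result)  # {'crm': 'ok', 'search': 'ok', 'legacy': 'failed'}
```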
Version 1 issues
Version 1 functioned fine for most of its life; however, its biggest issues were simply inefficiency and slowness in synchronizing any given DataEntry across all of the DataStores it applied to. More often than not, a single DataEntry mutation resulted in dozens of target DataStores it needed to be synchronized against. Because each DataStore was processed sequentially, with its retries and the wait for its overall result completing before moving on to the next one, there was a sizable delay before the mutation materialized in all target DataStores (not to mention poor core utilization across the cluster). What did this mean? An opportunity for improvement.
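A back-of-the-envelope model makes the bottleneck concrete (my simplification, not a measurement from the system). Let $n$ be the number of DataStores a DataEntry targets, $a_i$ the number of attempts DataStore $i$ needs, and $t_{i,k}$ the duration of attempt $k$ against it, including any wait before a retry. Sequential processing pays for every attempt against every store, while fully parallel processing with enough workers is bounded by the slowest single store:

```latex
T_{\text{seq}} = \sum_{i=1}^{n} \sum_{k=1}^{a_i} t_{i,k}
\qquad
T_{\text{par}} \approx \max_{1 \le i \le n} \sum_{k=1}^{a_i} t_{i,k}
```

With hypothetical numbers, 20 DataStores that each take about 0.5 s give roughly 10 s sequentially but only about 0.5 s in parallel, before coordination overhead.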
The obvious choice was to move to asynchronous, parallel DataStore execution, decoupled from the main DataEntry mutation consumer thread(s), and there are many ways you could go about that. Fortunately, the overall modeling of the synchronization engine allowed considerable flexibility in swapping out the implementation with a little refactoring. The key points were introducing the concept of a DataEntry logic execution engine, aptly named LogicExecutionEngine, and adding a new implementation of our DataStoreLocator that decoupled any given DataStore’s location from its actual residency within the local JVM.
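The refactoring boils down to two interfaces that the consumer depends on. Below is a hypothetical Python rendering of that idea (the original system runs on the JVM): `DataStoreRef` is an opaque, location-transparent handle, and `LogicExecutionEngine` decides how the work runs. Method names and signatures are assumptions for illustration only.

```python
from abc import ABC, abstractmethod
from typing import List


class DataStoreRef(ABC):
    """Location-transparent handle: callers cannot tell local from remote."""

    @abstractmethod
    def sync(self, entry_id: str) -> str: ...


class LocalDataStoreRef(DataStoreRef):
    """One possible implementation; a remote or actor-backed one could replace it."""

    def __init__(self, name: str):
        self.name = name

    def sync(self, entry_id: str) -> str:
        return f"{self.name}:{entry_id}:ok"


class DataStoreLocator(ABC):
    @abstractmethod
    def locate(self, entry_id: str) -> List[DataStoreRef]: ...


class LogicExecutionEngine(ABC):
    """Decides how an entry's DataStore work runs (sequential, threads, actors...)."""

    @abstractmethod
    def execute(self, entry_id: str, stores: List[DataStoreRef]) -> List[str]: ...


class StaticLocator(DataStoreLocator):
    def __init__(self, refs: List[DataStoreRef]):
        self.refs = list(refs)

    def locate(self, entry_id: str) -> List[DataStoreRef]:
        return list(self.refs)


class SequentialEngine(LogicExecutionEngine):
    def execute(self, entry_id: str, stores: List[DataStoreRef]) -> List[str]:
        return [store.sync(entry_id) for store in stores]


def process(entry_id: str, locator: DataStoreLocator,
            engine: LogicExecutionEngine) -> List[str]:
    """The consumer depends only on the two interfaces, so either can be swapped."""
    return engine.execute(entry_id, locator.locate(entry_id))


refs = [LocalDataStoreRef("crm"), LocalDataStoreRef("search")]
print(process("XYZ", StaticLocator(refs), SequentialEngine()))
# ['crm:XYZ:ok', 'search:XYZ:ok']
```

Swapping `SequentialEngine` for a parallel or actor-backed engine, or `LocalDataStoreRef` for a remote handle, requires no change to `process`.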
Great. Now that the modeling was adjusted, what about the implementation? For one, there was no interest in writing a multi-threaded execution engine, even though with the modeling in place any implementation could have been developed and plugged in. That said, after looking around for a good framework that provided location transparency, parallel execution management, clustering and good resiliency, we decided that Akka, and a move to an Actor model for the new engine, would be a good fit.
As shown above, the DataStores are now implemented as ActorRef versions, which are passed to the LogicExecutionEngine, whose new Actor-based implementation injects them into yet another Actor for the DataEntry logic processing and awaits a Future<Result>. This model reduced overall time to completion by roughly 80%, as everything now executed in parallel.
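The fan-out and await pattern can be approximated without Akka. In the Python sketch below, a `ThreadPoolExecutor` stands in for the actors: `submit()` plays the role of sending a message and the returned `Future` plays the role of `Future<Result>`. This is not Akka’s API; the engine class, its timeout handling and the result strings are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Callable, Dict

DataStore = Callable[[str], str]


class ParallelLogicExecutionEngine:
    """Fans one DataEntry out to every DataStore concurrently (thread-pool stand-in)."""

    def __init__(self, max_workers: int = 16):
        self.pool = ThreadPoolExecutor(max_workers=max_workers)

    def execute(self, entry_id: str, stores: Dict[str, DataStore],
                timeout: float = 5.0) -> Dict[str, str]:
        futures = {name: self.pool.submit(store, entry_id)
                   for name, store in stores.items()}
        _, not_done = wait(futures.values(), timeout=timeout)
        results: Dict[str, str] = {}
        for name, fut in futures.items():
            if fut in not_done:
                # The worker keeps running; we simply stop waiting for it.
                results[name] = "timeout"
            elif fut.exception() is not None:
                results[name] = f"error: {fut.exception()}"
            else:
                results[name] = fut.result()
        return results

    def shutdown(self) -> None:
        self.pool.shutdown(wait=True)


def slow_store(name: str) -> DataStore:
    def sync(entry_id: str) -> str:
        time.sleep(0.1)  # simulated remote call
        return f"{name} synced {entry_id}"
    return sync


engine = ParallelLogicExecutionEngine()
stores = {n: slow_store(n) for n in ["crm", "search", "billing", "archive", "audit"]}
start = time.perf_counter()
results = engine.execute("XYZ", stores)
elapsed = time.perf_counter() - start
engine.shutdown()
print(results["crm"])  # crm synced XYZ
# elapsed should be close to a single 0.1 s call, not the 0.5 s a sequential loop needs
print(f"{elapsed:.2f}s")
```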
Another benefit was additional resiliency and load distribution thanks to the location transparency of the DataStore itself. Using Akka’s Routers, in this case the ClusterRouterGroup Actor, we were able to further redistribute the processing of any given DataStore’s workload across the cluster and react appropriately as nodes came online and went offline. See the exploded view below.
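Conceptually, a cluster-aware group router keeps a set of routees per node and spreads messages across whichever nodes are currently up. The Python sketch below imitates that with a round-robin strategy; it is a hypothetical stand-in, not Akka’s ClusterRouterGroup, and the `member_up`/`member_removed` method names only loosely echo cluster membership events.

```python
from typing import Callable, Dict

Routee = Callable[[str], str]


class RoundRobinGroupRouter:
    """Hypothetical stand-in for a cluster-aware group router.

    Routees are registered per node; when a node leaves, its routee is
    dropped and later messages are spread over the remaining nodes.
    """

    def __init__(self):
        self._routees: Dict[str, Routee] = {}  # node address -> routee
        self._counter = 0

    def member_up(self, node: str, routee: Routee) -> None:
        self._routees[node] = routee

    def member_removed(self, node: str) -> None:
        self._routees.pop(node, None)

    def route(self, message: str) -> str:
        if not self._routees:
            raise RuntimeError("no routees available")
        nodes = sorted(self._routees)  # stable order for round-robin
        node = nodes[self._counter % len(nodes)]
        self._counter += 1
        return self._routees[node](message)


router = RoundRobinGroupRouter()
for node in ["node-a", "node-b", "node-c"]:
    router.member_up(node, lambda msg, n=node: f"{n} handled {msg}")
first = [router.route(f"entry-{i}") for i in range(3)]
print(first)
# ['node-a handled entry-0', 'node-b handled entry-1', 'node-c handled entry-2']
router.member_removed("node-b")
second = [router.route(f"entry-{i}") for i in range(3, 5)]
print(second)
# ['node-c handled entry-3', 'node-a handled entry-4']
```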
The diagram below shows how execution of these DataEntry tasks is now distributed more evenly across the entire set of available nodes in the cluster: every node can now potentially take part in processing any DataEntry workload. In addition, by feeding dynamic configuration into the construction of each ClusterRouterGroup Actor, the system can fine-tune how many Actors in the cluster are available to process entries targeted at any given DataStore. This permits custom down-scaling based on the limitations or load ceilings of any given downstream DataStore; in other words, it permits throttling of load.
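The throttling effect of sizing each DataStore’s pool of Actors can be imitated with a per-store concurrency cap. In this hypothetical Python sketch, a `BoundedSemaphore` plays the role of the number of routees configured for a DataStore; the store names and limits in the config dict are invented for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


class ThrottledDataStore:
    """Caps concurrent calls to one DataStore, like sizing its routee pool."""

    def __init__(self, sync: Callable[[str], str], max_concurrent: int):
        self._sync = sync
        self._slots = threading.BoundedSemaphore(max_concurrent)
        self._lock = threading.Lock()
        self.in_flight = 0
        self.peak = 0  # highest concurrency actually observed

    def __call__(self, entry_id: str) -> str:
        with self._slots:  # blocks once this store's ceiling is reached
            with self._lock:
                self.in_flight += 1
                self.peak = max(self.peak, self.in_flight)
            try:
                return self._sync(entry_id)
            finally:
                with self._lock:
                    self.in_flight -= 1


def build_stores(config: Dict[str, int],
                 sync_factory: Callable[[str], Callable[[str], str]]
                 ) -> Dict[str, ThrottledDataStore]:
    """Hypothetical dynamic config: DataStore name -> max concurrent workers."""
    return {name: ThrottledDataStore(sync_factory(name), limit)
            for name, limit in config.items()}


def fake_sync(name: str) -> Callable[[str], str]:
    def sync(entry_id: str) -> str:
        time.sleep(0.05)  # simulated downstream call
        return f"{name}:{entry_id}"
    return sync


stores = build_stores({"fast-nosql": 8, "legacy-soap": 2}, fake_sync)
outputs: Dict[str, List[str]] = {}
with ThreadPoolExecutor(max_workers=16) as pool:
    for name, store in stores.items():
        outputs[name] = list(pool.map(store, [f"e{i}" for i in range(10)]))
print({n: s.peak for n, s in stores.items()})  # legacy-soap never exceeds 2
```

Even though 16 workers are available, the slow legacy store never sees more than two calls at once, which is the kind of per-target ceiling the dynamic router configuration provides.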
Overall, my experience with Akka was positive. After working out some of the bugs, this solution has so far been quite stable in production, as has Akka’s clustering protocol. If you are considering moving to a more reactive design for the back end of a system, I highly recommend giving Akka serious consideration.
Lastly, as always, I highly recommend a purely interface-oriented design for any system you build. In this use case, the system’s entire platform was designed from the ground up around interfaces, with different “providers” (e.g. Camel or Akka) plugged in for each aspect of the implementation. That approach has proved very important as the platform has evolved, giving it tremendous flexibility and additional longevity as it matures.