This is a long overdue followup to my prior article titled “Reactive re-engineering with Akka” in which I described an approach to processing large amounts of data more efficiently by using reactive actors and how they were used to massively scale up an existing data synchronization platform that was starting to bottleneck.
In this article I wanted to expand on it by discussing the topic of retrying failed synchronizations of data. The concept of retrying failed operations can be pretty complicated, especially in event driven systems where manipulations of the source data results in near immediate synchronizations of that data to N other integrations. What if one or more of the targets fail to receive the latest copy of the source data due to fatal or transient errors? Can I retry it? When? What will retry it? When I attempt the retry is the event still relevant?
With batch driven systems that typically run on some sort of scheduled interval, you trade the speed of data synchronization (that come via events) for simpler rules where you are simply re-analyzing the entire data set on on schedule. You know you will always have a chance to “re-sync” in the future. The problem is you are having to scan your entire data set for changes (via timestamps OR full deltas) to determine what needs to be re-synced. Depending on the volume of your data set and the rate at which it is updated, this can be a ton of analysis that has to happen over and over and over… vs just letting your system be driven entirely by events.
So what is this example system I am talking about and what does it do?
At its core, we are talking about a data synchronization platform.
Something in the environment (i.e. a user or other process) makes a request for some operation to be done that generates a change operation against a “DataEntry”. This DataEntry is manipulated in a primary database and then the change needs to be synchronized across numerous other systems. The changes could be described as “create DataEntry item number XYZ”, “Mutate DataEntry XYZ in sytem Z” or simply “Delete DataEntry item XYZ”. These changes fire events.
Each target system where a DataEntry is to be synchronized is called a DataStore and involves its own complicated process of mutating our representation of a DataEntry into the target DataStore’s representation and the means to do it can vary wildly; i.e. REST apis, web-service calls, RDBMS dml, nosql operations etc etc. Not to mention, as with any integration, each of these DataStore sync calls has the possibility being fast, very slow, not working at all, or experiencing random transient failures.
The example data synchronization platform looks like this diagram which is covered in much more detail in my prior article. When actions are taken against a DataEntry an event is created for that DataEntry and it is placed in global processing queues. Next DataSyncProviders consume events off these queues and then determine the appropriate set of DataStore‘s to to sync the DataEntry against. The marriage of DataEntries and DataStores takes place via LogicExecutionEngines.
The LogicExecutionEngines implement the actual synchronization flow, which can vary wildly based on the actual event source operation that occurred (i.e. think create vs delete vs [custom biz operation]); but at the end of the day the operations to be done are executed across the cluster using Akka Actor based implementations of DataStores, which do the actual underlying work of talking to numerous other systems to sync the actual data in question.
Real time retrying
When a DataEntry is mutated and triggers an event that spawns the entire synchronization process, we need to react quickly and only have a short period of time to do some real time retrying should a failure occur.
What wasn’t discussed in the prior article were some of the details of the real time retry behavior that occurs within each DataStore Actor, which is implemented via a RetryWrapper. Each Actor wraps a DataStore that is itself, wrapped within the RetryWrapper which interprets the result of each persist() to determine first if its a failure or success, and if a failure, can the operation be retried. If so it will retry appropriately based on configuration that is sourced during its construction via the retry wrapper config database.
The main consideration here is that the LogicExecutionEngine is awaiting a result from the actor. The persist() request must return in a timely manner as we are within a near real-time window of execution relative to the original source event. Generally retryable conditions that will present here are transient in nature (network errors or rate limit issues) and we only have so much time to retry within before we absolutely have to return some status/result back to the origin of the request… otherwise the LogicExecutionEngine will flag the request to persist() as an error due to a timed out condition awaiting a response. This timeout period is roughly several minutes in size; but varies considerably system to system.
So what happens if all retries fail within the event window? Keep in mind that if a target system is having an outage leading to transient errors, such outages typically can last much longer than the available time for retries in this event driven scenario. Having a backup method for retrying becomes important.
Post event retrying
For event driven synchronizations of data, the validity of the triggering event is only valid for a short period of time so we can’t keep retrying the operation forever. Why? Well lets say the source DataEntry event is for an UPDATE of property X with value A. When this UPDATE-X-A event flows through the system it should acquire a read-write lock/semaphore on X, but we can’t hold that lock forever because a followup UPDATE-X-B event could be triggered that is waiting on it and we need that event to have a chance of persist()ing as well. Therefore, for the duration window for retries during the processing of the real time event is kept relatively short. Eventually we have to give up and yield back to the LogicExecutionEngine. In the example system, all operation results from DataStores is recorded permanently in a ledger/audit database.
The result database is key here as it contains everything needed to reconstruct and original event so that we can retry it later during “post event retrying”.
Post event retrying however is a bit more complicated; you can’t just load every failed event and blindly re-execute it. Why? Mainly because time has now passed since the original event occurred and it may no longer be relevant. For example, lets say UPDATE-X-A ultimately failed, but UPDATE-X-B succeeded; in this case UPDATE-X-A was superseded by UPDATE-X-B so there is no need to retry it.
That’s a simple example, however depending on the types of operations your system generates for various DataEntries your rules may have to be a bit more sophisticated. For example: failed UPDATES are likely also superseded by newer DELETES, or what if the particular integration that a failure occurred against is no longer active? What is the original DataEntry no longer exists, or is now in a different state that is not compatible with the original operation anymore? My point here is that this will require some careful thinking on your part to get the post event retry behaviors right so that you don’t retry failed events when they are no longer relevant; or could induce a invalid data corruption issue in a downstream system. What criteria defines a prior failed event operation as being “relevant” or “not relevant” is entirely contingent upon your individual implementation and business rules.
Tackling the problem
That said how might you address this? Since we are storing all real time event results (both successes and failures) we can build an out of band process that runs on a schedule (or invoked on demand) that will analyze all stored failed event results meeting a certain set of criteria (may vary by invocation). Logically this may be expressed such as “re-evaluate all failed events for target [DataStore-X, N] and since [date] having DataEntry [type] and [prop=value]”.
It might look like this in the diagram below:
As described previously, retrying old failed events is not just as simple as finding all failures and blindly reprocessing them. Instead as you see above we have a stack of gates which are created for each event being evaluated which do custom evaluations based on the type of the event operation being checked. For example, first there are “common rules” which could be things like: “ensure that no more recent successes negate the relevance of the prior failed event”; or basic checks like “is the source error reason even retryable?” by consulting the same retry wrapper configs that the real-time retries utilize.
Beyond “common rules”, we then get into the more complex set of rules which vary based on the context/operation of the original failed event. For example if the source failed DataEntry event is of type CREATE, only the CreateBehaviorLogic handler would process it, and inside of the CreateBehaviorLogic is where custom rules could be implemented that might do something like “only re-process if item still exists in our database”; vs if the failure was for type UPDATE, we not only skip reprocessing if there is a more recent UPDATE, but fail fast if the prior DataEntry operation was negated by [CustomOperation Y].
The key point here is that having custom behavior implementations permit you to implement the more complicated retry logic which will vary based on your organization’s rules, the various operation types possible per DataEntry type and what future event combinations can negate prior failures.
Once a failed DataEntry event passes all gates/checks and is determined to be retryable, a brand new event is created for it, seeded with the latest data for that entry, decorated with pointers back to the evaluation run ID + a link to the original source event ID and then sent along into the primary DataEntry event queue to be processed normally as if it had been generated by any other normal system operation. If it fails again, its just recorded and becomes a candidate itself for another retry if the criteria is met. The system is a continuous circle.
Again, I wanted to write this post a while ago but never got around to it. Properly implementing retry logic is not always a simple thing, especially when you are dealing with events that occurred in the past, then balancing them with current system state. What is described here is one of many possible ways the problem can be approached, but in my experience it works pretty well.
Hopefully you’ll find it useful should you find yourself in a similar situation, or at least provide a further basis for brainstorming your own solution.