
Reactive re-engineering with Akka

Every once in a while during the life cycle of any given piece of software comes that time where you have the opportunity to improve it in a major way… if, that is, it’s lucky enough to still be in production.

One particular system I’ve been involved with is responsible for processing a lot of data and keeping that data in sync across many systems. For the purposes of this little case study I’ve dumbed down the overall use-case, concept, architecture and implementation details to this simple idea: we need to synchronize data.

Use-Case

Something in the environment (i.e. a user or other process) makes a request for some operation to be done that generates a change operation against a “DataEntry”. This DataEntry is manipulated in the primary database, and then the change needs to be synchronized to numerous other systems. The changes could be described as “create DataEntry item number XYZ”, “mutate DataEntry XYZ in fashion Z” or simply “delete DataEntry item XYZ”.

Each target system where a DataEntry is to be synchronized is called a DataStore. Each involves its own complicated process of mutating our representation of a DataEntry into the target DataStore’s representation, and the means to do it can vary wildly; i.e. web-service calls, RDBMS DML, NoSQL operations, etc. Not to mention, as with any integration, each of these DataStore sync calls has the possibility of being fast, very slow, not working at all, or experiencing random transient failures.

Version 1

For most of its life the system functioned as follows: each DataEntry mutated in the system was placed in a queue and then processed by a consumer node’s DataSyncProvider, whose responsibility is to determine all the DataStores to process the DataEntry in (by interrogating a DataStoreLocator) and then make sure it happens. It worked similar to the diagrams below (highly simplified!); note the bottleneck.
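
In code terms, the Version 1 flow amounts to the sketch below. The interfaces and names are illustrative only, not the actual system’s classes:

import java.util.List;

// Illustrative types only; the real system's classes differ.
interface DataEntry { String getId(); }
interface DataStore { void sync(DataEntry entry) throws Exception; }
interface DataStoreLocator { List<DataStore> locate(DataEntry entry); }

class DataSyncProvider {

    private final DataStoreLocator locator;

    DataSyncProvider(DataStoreLocator locator) { this.locator = locator; }

    // Version 1: each applicable DataStore is synchronized sequentially,
    // blocking on the result (and any retries) before the next one starts.
    void process(DataEntry entry) {
        for (DataStore store : locator.locate(entry)) {
            try {
                store.sync(entry); // waits for the full result, however slow
            } catch (Exception e) {
                // retry/error handling happens here, still on the same thread
            }
        }
    }
}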


Version 1, synchronization flow, within one node


Version 1. Overall cluster view

Version 1 issues

Version 1 functioned fine for most of its life; however, its biggest issues were simply its lack of efficiency and speed in synchronizing any given DataEntry across all of the DataStores it was applicable for. More often than not, any given DataEntry mutation would result in dozens of target DataStores it needed to be synchronized against. Due to the sequential processing of each DataStore, accommodating retries and waiting for the overall result before moving on to the next one, this would result in a sizable delay until the mutation materialized in all target DataStores (not to mention poor core utilization across the cluster). What did this mean? Well, an opportunity for improvement.

Version 2

Obviously, the choice here was to move to asynchronous, parallel DataStore execution, decoupled from the main DataEntry mutation consumer thread(s)… and there are many ways you could go about doing that. Fortunately the overall modeling of the synchronization engine enabled considerable flexibility in swapping out the implementation with a little refactoring. The key points were introducing the concept of a DataEntry logic execution engine, aptly named LogicExecutionEngine, and adding a new implementation of our DataStoreLocator that could decouple any given DataStore’s location from any dependency on its actual residency within the local JVM.
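
Sketched roughly (reusing the illustrative DataEntry/DataStore types from the earlier sketch; these signatures are my assumption, following the post’s naming):

import java.util.concurrent.Future;

interface Result { }

// Single entry point that hides *how* synchronization executes
// (threads, actors, local or remote).
interface LogicExecutionEngine {
    // Kicks off synchronization of one DataEntry against all applicable
    // DataStores; returns immediately with a handle to the eventual outcome.
    Future<Result> execute(DataEntry entry);
}

// A revised locator whose returned DataStore handles may resolve to a local
// object or to something remote (e.g. an actor on another node); callers no
// longer assume residency within the local JVM.
interface LocationTransparentDataStoreLocator {
    Iterable<DataStore> locate(DataEntry entry);
}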

Great. Now that the modeling is adjusted, what about implementation? For one, there was no interest in writing a multi-threaded execution engine, even though one could with the modeling in place; any implementation could have been developed and plugged in. That said, after looking around for a good framework that provided location transparency, parallel execution management, clustering and good resiliency, it was decided that Akka, and moving to an Actor model for the new engine, would be a good fit.


Version 2. Actor based DataStore’s and LogicExecutionEngine

As shown above, the DataStores are now implemented via an ActorRef version which is then passed to the LogicExecutionEngine, whose new Actor-based implementation injects them into yet another Actor for the DataEntry logic processing, awaiting a Future&lt;Result&gt;. This model decreased overall execution time to completion by roughly 80%, as everything now executed in parallel.
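
The fan-out looks roughly like the following minimal sketch, assuming Akka’s classic Java API; the actor and message shapes here are hypothetical stand-ins, not the actual engine:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical actor wrapping one DataStore's sync logic.
class DataStoreActor extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, dataEntryId -> {
                // ...perform the target-system-specific sync here...
                getSender().tell("synced:" + dataEntryId, getSelf());
            })
            .build();
    }
}

public class AkkaFanOutSketch {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sync");

        List<ActorRef> dataStoreRefs = new ArrayList<>();
        dataStoreRefs.add(system.actorOf(Props.create(DataStoreActor.class), "storeA"));
        dataStoreRefs.add(system.actorOf(Props.create(DataStoreActor.class), "storeB"));

        Timeout timeout = new Timeout(Duration.create(30, TimeUnit.SECONDS));

        // Fan the same DataEntry mutation out to every applicable DataStore;
        // each ask() returns a Future immediately, so all stores proceed in parallel.
        List<Future<Object>> results = new ArrayList<>();
        for (ActorRef ref : dataStoreRefs) {
            results.add(Patterns.ask(ref, "DataEntry-XYZ", timeout));
        }
        // ...aggregate the Futures / apply per-store retry policies here...
    }
}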

Another benefit was additional resiliency and distribution of load due to the location transparency of the actual DataStore itself. Utilizing Akka’s various Routers, such as (in this case) the ClusterRouterGroup Actor, we were able to further redistribute the processing of any given DataStore workload across the cluster and appropriately react as nodes came online and offline. See the exploded view below.


Version 2. Exploded view of DataStore location transparency

Lastly, the diagram below shows how execution of these DataEntry tasks is now more evenly distributed across the entire set of available nodes in the cluster; all nodes can now potentially be involved in processing any DataEntry workload. Also, by feeding dynamic configuration into the construction of each ClusterRouterGroup Actor, the system can fine-tune the distribution and number of Actors in the cluster that are available to process entries targeted at any given DataStore (see the construction sketch after the diagram). This permits custom down-scaling based on the limitations or load ceilings that any given downstream target DataStore may present; in other words, it permits throttling of loads.



Version 2. Better utilization of core resources across cluster
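
For illustration, constructing one of these cluster-aware routers, with its throttle knob fed from configuration, looks roughly like the sketch below. This assumes Akka’s classic cluster-routing API; the role name, routee path and instance count are hypothetical values you would source from dynamic configuration.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.routing.RoundRobinGroup;

import java.util.Arrays;
import java.util.List;

public class DataStoreRouterSketch {

    public static ActorRef createRouter(ActorSystem system, int maxRoutees) {
        // Path where the DataStore actor is deployed on each member node.
        List<String> routeePaths = Arrays.asList("/user/dataStore-XYZ");

        // maxRoutees acts as the cluster-wide ceiling for this DataStore:
        // lowering it throttles how much load the downstream target sees.
        return system.actorOf(
            new ClusterRouterGroup(
                new RoundRobinGroup(routeePaths),
                new ClusterRouterGroupSettings(
                    maxRoutees,       // total routees across the cluster
                    routeePaths,
                    true,             // allow routees on the local node
                    "dataStoreRole")) // only use members carrying this role
                .props(),
            "dataStore-XYZ-router");
    }
}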

Overall my experience with Akka was positive. After working some of the bugs out, this solution has so far been quite stable in production, as has Akka’s clustering protocol. If you are considering moving to a more reactive design approach for the back end of a system, I highly recommend giving Akka consideration.

Lastly, as always, I highly recommend going with a pure interface-oriented design in any system you build. In this use-case, designing the entire platform from the ground up using interfaces extensively, and then plugging in different “providers” (i.e. things like Camel or Akka) for each aspect of the implementation, has proven very important as the system has evolved. This gives the system tremendous flexibility and additional longevity as it matures over time.

Hazelcast discovery with Etcd

I’ve used Hazelcast for years and have generally relied upon the availability of multicast for Hazelcast cluster discovery and formation (within a single data-center). Recently I was faced with two things: expanding the footprint into a non-multicast-enabled data-center, and pre-prepping the service for containerization, where nodes will come and go as scaling policies dictate… hardwired Hazelcast clustering via an XML configuration and/or reliance on multicast is a no-go.

Hazelcast 3.6 now supports a pluggable implementation of its cluster discovery mechanism called the Discovery SPI (Discovery Strategy). Perfect timing: given we are already playing with Etcd as part of our Docker container strategy, this was an opportunity to let our application’s native clustering mechanism (coded on top of Hazelcast) leverage Etcd to discover/remove peers both within, and potentially across, data-centers.

So I coded up hazelcast-etcd-discovery-spi available on GitHub.


This works with Hazelcast 3.6-EA+ and Etcd to provide (optional) automatic registration of your Hazelcast nodes as Etcd services and automatic peer discovery of the Hazelcast cluster.

Note that the automatic registration of each Hazelcast instance as an Etcd service is OPTIONAL; you can manually maintain these key-paths in etcd instead. I added it simply because I think it will be convenient for folks, especially when containerizing a Hazelcast-enabled app (such as via Docker), where the fewer “dependencies” and manual steps (i.e. registering your Hazelcast nodes manually) the better. You can totally embed this functionality with this discovery strategy SPI.
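
For reference, wiring a strategy like this into Hazelcast programmatically looks roughly like the sketch below, using Hazelcast 3.6’s Discovery SPI configuration. The strategy class and property names shown are illustrative; consult the project README for the exact ones.

import com.hazelcast.config.Config;
import com.hazelcast.config.DiscoveryStrategyConfig;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.core.Hazelcast;

public class EtcdDiscoveryBootstrap {
    public static void main(String[] args) {
        Config config = new Config();
        // The Discovery SPI is behind a feature flag in Hazelcast 3.6.
        config.setProperty("hazelcast.discovery.enabled", "true");

        JoinConfig join = config.getNetworkConfig().getJoin();
        join.getMulticastConfig().setEnabled(false);
        join.getTcpIpConfig().setEnabled(false);

        DiscoveryStrategyConfig strategy = new DiscoveryStrategyConfig(
            "org.bitsofinfo.hazelcast.discovery.etcd.EtcdDiscoveryStrategy");
        strategy.addProperty("etcd-uri", "http://127.0.0.1:4001"); // illustrative property name
        join.getDiscoveryConfig().addDiscoveryStrategyConfig(strategy);

        Hazelcast.newHazelcastInstance(config);
    }
}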

I hope others find this helpful, and please leave your feedback, pull-requests or issues on the project!

NOTE: if you are running your app in Docker you have a separate issue, where you need to determine the externally accessible IP/PORT that the Docker host has mapped for you on 5701… how can you determine that so you can publish the correct IP/PORT info to Etcd? Check out: https://github.com/bitsofinfo/docker-discovery-registrator-consul

NOTE! Interested in Consul? There is a separate project built around Consul for your discovery strategy, located here: https://github.com/bitsofinfo/hazelcast-consul-discovery-spi


Hazelcast discovery with Consul

I’ve used Hazelcast for years and have generally relied upon the availability of multicast for Hazelcast cluster discovery and formation (within a single data-center). Recently I was faced with two things: expanding the footprint into a non-multicast-enabled data-center, and pre-prepping the service for containerization, where nodes will come and go as scaling policies dictate… hardwired Hazelcast clustering via an XML configuration and/or reliance on multicast is a no-go.

Hazelcast 3.6 now supports a pluggable implementation of its cluster discovery mechanism called the Discovery SPI (Discovery Strategy). Perfect timing: given we are already playing with Consul as part of our Docker container strategy, this was an opportunity to let our application’s native clustering mechanism (coded on top of Hazelcast) leverage Consul to discover/remove peers both within, and potentially across, data-centers.

So I coded up hazelcast-consul-discovery-spi available on GitHub.


This works with Hazelcast 3.6-EA+ and Consul to provide automatic registration of your Hazelcast nodes as Consul services (without having to run a local Consul agent) and automatic peer discovery of the Hazelcast cluster.

Note that the automatic registration of each Hazelcast instance as a Consul service is OPTIONAL if you already have Consul agents running that define your Hazelcast service nodes. I added it simply because I think it will be convenient for folks, especially when containerizing a Hazelcast-enabled app (such as via Docker), where the fewer “dependencies” (like a Consul agent available on the host, in the container, or in another container) the better. You can totally embed this functionality with this discovery strategy SPI.
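
Wiring it up mirrors the Etcd variant above; a minimal sketch against Hazelcast 3.6’s Discovery SPI configuration (again, the strategy class and property names are illustrative; see the project README):

import com.hazelcast.config.Config;
import com.hazelcast.config.DiscoveryStrategyConfig;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.core.Hazelcast;

public class ConsulDiscoveryBootstrap {
    public static void main(String[] args) {
        Config config = new Config();
        config.setProperty("hazelcast.discovery.enabled", "true");

        JoinConfig join = config.getNetworkConfig().getJoin();
        join.getMulticastConfig().setEnabled(false);
        join.getTcpIpConfig().setEnabled(false);

        DiscoveryStrategyConfig strategy = new DiscoveryStrategyConfig(
            "org.bitsofinfo.hazelcast.discovery.consul.ConsulDiscoveryStrategy");
        strategy.addProperty("consul-host", "127.0.0.1"); // illustrative property name
        strategy.addProperty("consul-port", "8500");      // illustrative property name
        join.getDiscoveryConfig().addDiscoveryStrategyConfig(strategy);

        Hazelcast.newHazelcastInstance(config);
    }
}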

I hope others find this helpful, and please leave your feedback, pull-requests or issues on the project!

NOTE: if you are running your app in Docker you have a separate issue, where you need to determine the externally accessible IP/PORT that the Docker host has mapped for you on 5701… how can you determine that so you can publish the correct IP/PORT info to Consul? Check out: https://github.com/bitsofinfo/docker-discovery-registrator-consul

NOTE! Interested in etcd? There is a separate project built around etcd for your discovery strategy, located here: https://github.com/bitsofinfo/hazelcast-etcd-discovery-spi


Liferay clustering internals

Ever wondered how Liferay’s internal clustering works? I had to dig into it in the context of my other article on globally load balancing Liferay across separate data-centers. This blog post merely serves as a place for my research notes and might be helpful for anyone else trying to follow what is going on under the covers in Liferay, in lieu of any real documentation, design documents or code comments (of which Liferay seems to have none).

Current set of mostly unanswered questions at the Liferay community forums: https://www.liferay.com/community/forums/-/message_boards/recent-posts?_19_groupThreadsUserId=17865971

Liferay Cluster events, message types, join/remove

  • Anytime ClusterExecutorImpl.memberJoined()/memberRemoved() is invoked, all implementations of ClusterEventListener have processClusterEvent() invoked
    • MemorySchedulerClusterEventListener: calls ClusterSchedulerEngine.getMasterAddressString() which can trigger slaveToMaster or masterToSlave @see EXECUTE notes below
    • LuceneHelperImpl.LoadIndexClusterEventListener: @see NOTIFY/JOIN section below
    • LiveUsersClusterEventListenerImpl: @see DEPART/JOIN section below
    • DebuggingClusterEventListenerImpl: logs the event to the log
    • ClusterMasterTokenClusterEventListener: on any event, invokes getMasterAddressString() to determine who the master is, or becomes so itself if nobody is (controlled via an entry in the Lock_ table). After this runs, it invokes notifyMasterTokenTransitionListeners(), which calls ClusterMasterTokenTransitionListener.masterTokenAcquired() or masterTokenReleased()
      • Implementors of ClusterMasterTokenTransitionListener:
        • BackgroundTaskClusterMasterTokenTransitionListener: calls BackgroundTaskLocalServiceUtil.cleanUpBackgroundTasks(), which in turn ends up invoking BackgroundTaskLocalServiceImpl.cleanUpBackgroundTasks(); this is annotated with @Clusterable(onMaster=true), gets all tasks with STATUS_IN_PROGRESS and changes their status to FAILED. It then gets the BackgroundTaskExecutor LOCK and UNLOCKs it if this server is the current owner/master for that lock…
    • JGroups
      • Member “removed”: triggers ClusterRequestReceiver.viewAccepted() that in turn invokes ClusterExecutorImpl.memberRemoved() which fires ClusterEventType.DEPART (see below)
    • ClusterMessageType.NOTIFY/UPDATE: These message types are requested from other nodes when a new node comes up. When received, they are in turn translated and re-broadcast locally within an individual node as a ClusterEvent of type JOIN via the ClusterExecutorImpl.memberJoined() method. Note this latter class is different in the EE jar: the implementation of this method additionally invokes the EE LicenseManager.checkClusterLicense() method
      • “Live Users” message destination:
        • triggers actions done by LiveUsersMessageListener
        • performs a cluster RPC invocation of LiveUsers.getLocalClusterUsers()
        • response is processed by LiveUsers.addClusterNode
        • proceeds to “sign in” each user that is signed in on the other joined node, appears to do session tracking etc
      • LoadIndexClusterEventListener:
        • Loads a full stream of the Lucene index per company id from any responding node whose local Lucene index last-generation value is greater than this node’s. This is only triggered on a JOIN event when the “control” JChannel’s total number of peer nodes minus one is <= 1, so that this only runs on a node boot and not at other times.
      • EE LicenseManager.checkClusterLicense()
        • Please see the notes below on licensing and verification in regards to clustering on bootup
    • ClusterEventType.DEPART: This event is broadcast locally on a node when it is shut down
      • LiveUsersClusterEventListenerImpl: listens to local DEPART event and sends message over MessageBus to other cluster members letting them know this node is gone
    • ClusterMessageType.EXECUTE: These cluster messages contain serialized ClusterRequest objects that contain RPC-like information on a class, parameters and a method to invoke on other node(s). Generated by the ClusterRequest.createMulticastRequest and createUnicastRequest methods. The results of these are typically passed off to ClusterExecutorUtil.execute() methods (which end up calling ClusterExecutorImpl.execute())
    • ClusterRequest.createUnicastRequest() invokers:
      • LuceneHelperImpl: _getBootupClusterNodeObjectValuePair() is called by getLoadIndexesInputStreamFromCluster() when it needs to connect to another node, only in order to load indexes on bootup from another node whose index is up to date
      • ClusterSchedulerEngine: callMaster() is called by getScheduledJob(s)(), initialize() and masterToSlave(), only if MEMORY_CLUSTERED is enabled for the job engine. initialize() calls initMemoryClusteredJobs() if the local node is a slave, to get a list of all MEMORY_CLUSTERED jobs. MEMORY_CLUSTERED jobs only run on whoever the master is, which is designated by which node owns the lock named “com.liferay.portal.scheduler.SchedulerEngine” in the Lock_ table in the database (a generic sketch of this lock-row pattern appears after these notes). masterToSlave() demotes the current node to a slave. slaveToMaster() gives the current node the opportunity to acquire the lock above to become the scheduler master node
      • ClusterMasterExecutorImpl: used for executing EXECUTE requests against the master only. Leverages the method getMasterAddressString() to determine who the master is (via which node owns the Lock_ table entry named “ClusterMasterExecutorImpl”).
    • ClusterRequest.createMulticastRequest() invokers:
      • JarUtil: Instructs other peer nodes to JarUtil.installJar(). Invoked by DataSourceFactoryUtil.initDataSource(), DataSourceSwapper.swap*DataSource() and SetupWizardUtil.updateSetup()
      • ClusterLoadingSyncJob: Invoked by EditServerAction.reindex(). This sends a multicast ClusterRequest of EXECUTE for LuceneClusterUtil.loadIndexesFromCluster(nodeThatWasJustReindexedAddress) to all nodes. So all nodes, even those on the other side of the RELAY2 bridge, will get this, and will then attempt to connect over the bridge back to stream the “latest” index from the node that was just re-indexed. Note however that nodes across the bridge will output the stack trace below in their logs when attempting to connect back to get a token on the re-indexed node (this routine is in _getBootupClusterNodeObjectValuePair() in LuceneHelperImpl and tries to do a unicast ClusterRequest to invoke TransientTokenUtil.createToken()). My guess is that this error has something to do with the “source address”, which is a JGroups local UUID, being sent to the remote bridge; on receipt, the local list in a different DC does not recognize that address. This “source address” is ClusterExecutorImpl.getLocalClusterNodeAddress(). Also see http://www.jgroups.org/manual/html/user-advanced.html#d0e3353. ClusterRequestReceiver *does* get invoked w/ a properly scoped SiteUUID in the format of hostname:sitename; however, ClusterRequestReceiver.processClusterRequest() defers to ClusterRequest.getMethodHandler().invoke(), which has no knowledge of this originating SiteUUID, only the internal “local” JGroups UUID address that is embedded as an “argument” to loadIndexesFromCluster() in the ClusterRequest. Hence this can’t effectively/properly call TransientTokenUtil.createToken() on the sender (and the error below) when it attempts the call via ClusterExecutorImpl (JGroups control channel .send()) using the “address” argument it received in the original inbound payload. We see the message is dropped because the target address is invalid/unknown:
        • 21:10:30,485 WARN [TransferQueueBundler,LIFERAY-CONTROL-CHANNEL,MyHost-34776][UDP:1380] MyHost-34776: no physical address for 27270628-816b-ddba-872f-1f2d27327f2f, dropping message
          
        • 18:37:40,169 INFO [Incoming-1,shared=liferay-control][LuceneClusterUtil:47] Start loading Lucene index files from cluster node 9c4870e7-17c2-18fb-01c4-5817bb5d1290
          18:37:46,538 WARN [TransferQueueBundler,shared=liferay-control][TCP:1380] null: no physical address for 9c4870e7-17c2-18fb-01c4-5817bb5d1290, dropping message
        • 18:01:56,302 ERROR [Incoming-1,LIFERAY-CONTROL-CHANNEL,mm2][ClusterRequestReceiver:243] Unable to invoke method {arguments=[[J@431bd9ee, 5e794a8e-0b7f-c10d-be17-47ce43b66517], methodKey=com.liferay.portal.search.lucene.cluster.LuceneClusterUtil.loadIndexesFromCluster([J,com.liferay.portal.kernel.cluster.Address)}
          java.lang.reflect.InvocationTargetException
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:606)
          at com.liferay.portal.kernel.util.MethodHandler.invoke(MethodHandler.java:61)
          at com.liferay.portal.cluster.ClusterRequestReceiver.processClusterRequest(ClusterRequestReceiver.java:238)
          at com.liferay.portal.cluster.ClusterRequestReceiver.receive(ClusterRequestReceiver.java:88)
          at org.jgroups.JChannel.invokeCallback(JChannel.java:749)
          at org.jgroups.JChannel.up(JChannel.java:710)
          at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1025)
          at org.jgroups.protocols.relay.RELAY2.deliver(RELAY2.java:495)
          at org.jgroups.protocols.relay.RELAY2.up(RELAY2.java:330)
          at org.jgroups.protocols.FORWARD_TO_COORD.up(FORWARD_TO_COORD.java:153)
          at org.jgroups.protocols.RSVP.up(RSVP.java:188)
          at org.jgroups.protocols.FRAG2.up(FRAG2.java:181)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:400)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:418)
          at org.jgroups.protocols.pbcast.GMS.up(GMS.java:896)
          at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:245)
          at org.jgroups.protocols.UNICAST2.up(UNICAST2.java:453)
          at org.jgroups.protocols.pbcast.NAKACK2.handleMessage(NAKACK2.java:763)
          at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:574)
          at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:147)
          at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:187)
          at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:288)
          at org.jgroups.protocols.MERGE3.up(MERGE3.java:290)
          at org.jgroups.protocols.Discovery.up(Discovery.java:359)
          at org.jgroups.protocols.TP.passMessageUp(TP.java:1263)
          at org.jgroups.protocols.TP$4.run(TP.java:1181)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:744)
          Caused by: com.liferay.portal.kernel.exception.SystemException: java.lang.NullPointerException
          at com.liferay.portal.search.lucene.LuceneHelperImpl._getBootupClusterNodeObjectValuePair(LuceneHelperImpl.java:830)
          at com.liferay.portal.search.lucene.LuceneHelperImpl.getLoadIndexesInputStreamFromCluster(LuceneHelperImpl.java:451)
          at com.liferay.portal.search.lucene.LuceneHelperUtil.getLoadIndexesInputStreamFromCluster(LuceneHelperUtil.java:326)
          at com.liferay.portal.search.lucene.cluster.LuceneClusterUtil.loadIndexesFromCluster(LuceneClusterUtil.java:57)
          ... 32 more
          Caused by: java.lang.NullPointerException
          at com.liferay.portal.search.lucene.LuceneHelperImpl._getBootupClusterNodeObjectValuePair(LuceneHelperImpl.java:809)
          ... 35 more
      • PortalImpl: resetCDNHosts() does a ClusterRequest.EXECUTE for the same PortalUtil.resetCDNHosts() method on all peer nodes. To avoid a recursive loop, it uses a ThreadLocal to guard entry to the EXECUTE block (relevant for remote nodes)
      • LuceneHelperImpl: _loadIndexFromCluster() (invoked via the JOIN event above). Invokes EXECUTE remote method calls of LuceneHelperUtil.getLastGeneration() against all peers to see whose Lucene index generation is newer than its own, picks that one, and then initiates a direct HTTP stream connection to it to fetch the entire index…. Who is a “peer” candidate is determined by invoking ClusterExecutorUtil.getClusterNodeAddresses(), which is ultimately realized by calling the “control” channel’s getView() method
      • MessageSenderJob: notifyClusterMember() invoked by MessageSenderJob.doExecute() when a job execution context’s next execution time is null. The notifyClusterMember() method invokes SchedulerEngineHelperUtil.delete() via a ClusterRequest.EXECUTE
      • ClusterableAdvice: AOP advice class that intercepts any Liferay method annotated with @Clusterable; AFTER returning, it will create a ClusterRequest.EXECUTE for the same method, passed to all nodes in the cluster. Used by ClusterSchedulerEngine (delete, pause, resume, schedule, suppressError, unschedule, update), BackgroundTaskLocalServiceImpl, and PortletLocalServiceImpl. Has an ‘onMaster’ property that awkwardly controls whether or not the advice will actually be applied. If onMaster=false, the advice fast-returns like a no-op; but if onMaster=true, the advice runs (locally on the node if THIS node is the master, or via a ClusterRequest.EXECUTE). WHO the master is, is dictated by which node owns the LOCK in the database named “com.liferay.portal.scheduler.SchedulerEngine”.
      • EhcacheStreamBootstrapCacheLoader: start() and doLoad() invoke EhcacheStreamBootstrapHelpUtil.loadCachesFromCluster(), which broadcasts a ClusterRequest for EXECUTE of the method createServerSocketFromCluster() on all peer nodes, and then attempts to connect to a peer node to load all named caches from it…
  • ClusterExecutorImpl notes:
    • This class has several methods relating to figuring out “who” is in the “cluster”, and due to the implementation of these methods you will get inconsistent “views” of who the local cluster members are (if using RELAY2 in JGroups)
      • getClusterNodeAddresses() = consults the “control” JChannel’s receiver’s “view” of the cluster. This will only return local nodes subscribed to this JChannel. These are fetched in real time every time it’s invoked.
      • getClusterNodes() = consults a local map called “_liveInstances”. Live instances is populated when the “memberJoined()” method is invoked, in response to ClusterRequestReceiver.processClusterRequest() being invoked when a new node comes up. Note that because this is in response to ClusterRequests…. which WILL be transmitted over RELAY2 in JGroups, this will reflect MORE nodes than what getClusterNodeAddresses() returns…..
      • There is also another local member called “_clusterNodeAddresses” which also is populated w/ addresses of other nodes when “memberJoined()” is invoked…. I really don’t understand why this exists, AND getClusterNodeAddresses() exists, AND getClusterNodes() exists when they could potentially return inconsistent data.
  • Quartz/Jobs scheduling
    • ClusterSchedulerEngine proxies all requests for scheduling jobs through SchedulerEngineProxyBean (BaseProxyBean), which just relays schedule() invocations through Messaging, which ends up invoking QuartzSchedulerEngine.schedule()
    • ClusterSchedulerEngine on initialization calls getMasterAddressString(), which gets the Lock from the Lock_ table named “com.liferay.portal.scheduler.SchedulerEngine” via LockLocalServiceImpl.lock(); this is row-based locking in the database, based on “owner”, which is based on the local IP?
    • _doMasterToSlave() – queries the current master for a list of all MEMORY_CLUSTERED jobs (does this over RMI or a JGroups method invoke of getScheduledJobs()), gets this list of job names and unschedules them locally
    • slaveToMaster() – takes everything in _memoryClusteredJobs and schedules it locally. _memoryClusteredJobs seems to just be a global list of the MEMORY_CLUSTERED jobs that are kept track of on each node…
  • Licensing: If running Liferay EE, the LicenseManager.checkClusterLicense() -> checkBinaryLicense() is invoked via ClusterExecutorImpl.memberJoined() (note this is only invoked in the EE version, as the class is different in the portal-impl.jar contained in the EE distribution). This invocation chain appears to respond to the JOIN event for the peer node by checking if the “total nodes” the node knows about is greater than the total number of nodes the license permits; if it is, the local node gets a System.exit() invoked on it (killing itself). If you are interested in learning more about this obfuscated process, you can investigate further on your own; it’s all in the EE portal-impl.jar. Here are some other notes on the licensing, how it relates to clustering, and issues one might encounter.
    • LicenseManager.getClusterLicense() invokes ClusterExecutorUtil.getClusterNodes() and does validation to check if you have more nodes in the cluster than your installed license supports (amongst other things).
    • Another thing that happens when ClusterExecutorImpl.memberJoined() is invoked is that ClusterExecutorImpl.getClusterNodes() appears to be invoked (which, @see above, in addition to local peer nodes in the same DC, will return nodes that exist across a JGroups RELAY2 bridge). It picks one of these peer nodes and remotely invokes LicenseManager.getLicenseProperties() against it via ClusterExecutorUtil.execute(). The return is a Future with a timeout; upon reception, the Future has a Map in it containing the license properties. These are then compared against the local node’s license; if they match we are good to go, otherwise… read below (a mixing-licenses error will occur). Why is this relevant? Well, despite having legit licenses, when using RELAY2 you can come across two erroneous errors that will prevent your cluster from booting, as described below.
      • The first error you might see is a timeout error, when the node reaches out to a peer to verify a license in response to a new member joining the cluster (likely a member over a RELAY2 bridge). It will look like the below. Typically just rebooting Liferay takes care of the issue, as explained in the follow-up bullets:
        • java.util.concurrent.TimeoutException
          at com.liferay.portal.kernel.cluster.FutureClusterResponses.get(FutureClusterResponses.java:88)
          at com.liferay.portal.license.a.a.f(Unknown Source)
          at com.liferay.portal.license.a.a.b(Unknown Source)
          at com.liferay.portal.license.a.f.d(Unknown Source)
          at com.liferay.portal.license.a.f.d(Unknown Source)
          at com.liferay.portal.license.LicenseManager.c(Unknown Source)
          at com.liferay.portal.license.LicenseManager.checkBinaryLicense(Unknown Source)
          at com.liferay.portal.license.LicenseManager.checkClusterLicense(Unknown Source)
          at com.liferay.portal.cluster.ClusterExecutorImpl.memberJoined(ClusterExecutorImpl.java:442)
          at com.liferay.portal.cluster.ClusterRequestReceiver.processClusterRequest(ClusterRequestReceiver.java:219)
          at com.liferay.portal.cluster.ClusterRequestReceiver.receive(ClusterRequestReceiver.java:88)
          at org.jgroups.JChannel.invokeCallback(JChannel.java:749)
          at org.jgroups.JChannel.up(JChannel.java:710)
          at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1025)
          at org.jgroups.protocols.relay.RELAY2.up(RELAY2.java:338)
          at org.jgroups.protocols.FORWARD_TO_COORD.up(FORWARD_TO_COORD.java:153)
          at org.jgroups.protocols.pbcast.STATE_TRANSFER.up(STATE_TRANSFER.java:178)
          at org.jgroups.protocols.FRAG2.up(FRAG2.java:181)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:400)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:418)
          at org.jgroups.protocols.pbcast.GMS.up(GMS.java:896)
          at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:245)
          at org.jgroups.protocols.UNICAST.up(UNICAST.java:414)
          at org.jgroups.protocols.pbcast.NAKACK2.handleMessage(NAKACK2.java:763)
          at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:574)
          at org.jgroups.protocols.BARRIER.up(BARRIER.java:126)
          at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:147)
          at org.jgroups.protocols.FD.up(FD.java:253)
          at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:288)
          at org.jgroups.protocols.MERGE2.up(MERGE2.java:205)
          at org.jgroups.protocols.Discovery.up(Discovery.java:359)
          at org.jgroups.protocols.TP$ProtocolAdapter.up(TP.java:2610)
          at org.jgroups.protocols.TP.passMessageUp(TP.java:1263)
          at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1825)
          at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1793)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)
    • When booting up several Liferay EE nodes (configured for clustering) simultaneously, it appears you can run into a situation where all the nodes fail to boot, claiming they have “invalid licenses” (despite the fact you have valid cluster licenses installed under data/license)… and then your container processes are automatically shut down… nice
    • This appears to be due to the verification process LicenseManager does on bootup, where it receives license information from peer nodes. If all nodes are started at the same time, there appears to be a timing condition when none of the nodes are yet in a “ready” state, so when peers connect to them for their license properties they respond w/ nothing… which is then compared against the local node’s license, and since they don’t equals(), it decides the local license (despite being legit) is invalid..
      • Here is example output from NODE1 (booted up at the same time as NODE2; note the blank remote license):
        • 13:55:20,381 ERROR [Incoming-5,shared=liferay-control][a:?] Remote license does not match local license. Local license: {productEntryName=Liferay Portal, startDate=11, expirationDate=22, description=Liferay Portal, owner=Liferay Portal, maxServers=4, licenseEntryName=Portal6 Non-Production (Developer Cluster), productVersion=6.1, type=developer-cluster, accountEntryName=Liferay Trial, version=2}
          Remote node: {clusterNodeId=11111111111, inetAddress=/192.168.0.23, port=-1}
          Remote license: {}
          Mixing licenses is not allowed. Local server is shutting down.
      • Here is example output from NODE2 (booted up at same time as NODE1):
        • 13:55:20,388 ERROR [Incoming-1,shared=liferay-control][a:?] Remote license does not match local license. Local license: {productEntryName=Liferay Portal, startDate=11, expirationDate=22, description=Liferay Portal, owner=Liferay Portal, maxServers=4, licenseEntryName=Portal6 Non-Production (Developer Cluster), productVersion=6.1, type=developer-cluster, accountEntryName=Liferay Trial, version=2}
          Remote node: {clusterNodeId=222222222222, inetAddress=/192.168.0.22, port=-1}
          Remote license: {}
          Mixing licenses is not allowed. Local server is shutting down.
      • How to solve this? The way I did it was to always boot one node first, with NO peers, and wait for it to complete booting before bringing up all other nodes; then these errors seem to go away.
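
Since so many of the notes above hinge on the “whoever owns the named row in the Lock_ table is the master” pattern, here is a generic sketch of that idea in plain JDBC. This is NOT Liferay’s LockLocalServiceImpl; the table/column names are illustrative only:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class DbRowLockMasterElection {

    // Attempts to claim the named lock row; returns true if this node is
    // (or already was) the master for that lock name.
    static boolean tryBecomeMaster(Connection con, String lockName, String myOwnerId)
            throws SQLException {
        try (PreparedStatement insert = con.prepareStatement(
                "INSERT INTO Lock_ (name_, owner_) VALUES (?, ?)")) {
            insert.setString(1, lockName);
            insert.setString(2, myOwnerId);
            insert.executeUpdate(); // a unique key on name_ makes this claim atomic
            return true;
        } catch (SQLException alreadyClaimed) {
            // Row exists: we are the master only if we already own it.
            try (PreparedStatement query = con.prepareStatement(
                    "SELECT owner_ FROM Lock_ WHERE name_ = ?")) {
                query.setString(1, lockName);
                try (ResultSet rs = query.executeQuery()) {
                    return rs.next() && myOwnerId.equals(rs.getString(1));
                }
            }
        }
    }
}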


JGroups RELAY2 notes

Liferay does NOT use RpcDispatcher but has its own “rpc” abstraction via “ClusterRequest”, which is Serializable (contains target addresses, a MethodHandler, a UUID, and a type such as EXECUTE). MethodHandler is just a serializable bag of arguments plus a MethodKey, which stores the target class, method name and parameter types. MethodHandler uses these deserialized parts on the receiving side to invoke the method locally via reflection.

On the RECEIVING side is ClusterRequestReceiver, which is the JGroups receiver that processes all inbound messages from peers. It takes the ClusterRequest, gets the MethodHandler (if the type is EXECUTE) and invokes it, then returns the response as a ClusterNodeResponse.
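
To make that concrete, here is a generic sketch of the serialize-then-replay-via-reflection idea; these are not Liferay’s actual classes, just the shape described above:

import java.io.Serializable;
import java.lang.reflect.Method;

// A serializable pointer to a method: target class, method name, param types.
class MethodKey implements Serializable {
    final String className;
    final String methodName;
    final Class<?>[] paramTypes;

    MethodKey(String className, String methodName, Class<?>... paramTypes) {
        this.className = className;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
    }
}

// A serializable bag of arguments plus a MethodKey; the receiving node
// deserializes this and replays the call via reflection.
class MethodHandler implements Serializable {
    final MethodKey key;
    final Object[] arguments;

    MethodHandler(MethodKey key, Object... arguments) {
        this.key = key;
        this.arguments = arguments;
    }

    Object invoke() throws Exception {
        Class<?> clazz = Class.forName(key.className);
        Method method = clazz.getMethod(key.methodName, key.paramTypes);
        // a null target works for the static *Util methods Liferay dispatches to
        return method.invoke(null, arguments);
    }
}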