Liferay clustering internals

Ever wondered how Liferay’s internal clustering works? I had to dig into it in the context of my other article on globally load balancing Liferay across separate data centers. This blog post mainly serves as a home for my research notes, and it might be helpful for anyone else trying to follow what goes on under the covers in Liferay, in lieu of any real documentation, design documents or code comments (of which Liferay seems to have none).

Current set of mostly unanswered questions at the Liferay community forums: https://www.liferay.com/community/forums/-/message_boards/recent-posts?_19_groupThreadsUserId=17865971

Liferay Cluster events, message types, join/remove

  • Anytime ClusterExecutorImpl.memberJoined()/memberRemoved() is invoked, all implementations of ClusterEventListener have processClusterEvent() invoked:
    • MemorySchedulerClusterEventListener: calls ClusterSchedulerEngine.getMasterAddressString(), which can trigger slaveToMaster() or masterToSlave(); see the EXECUTE notes below
    • LuceneHelperImpl.LoadIndexClusterEventListener: see the NOTIFY/JOIN section below
    • LiveUsersClusterEventListenerImpl: see the DEPART/JOIN section below
    • DebuggingClusterEventListenerImpl: logs the event to the log
    • ClusterMasterTokenClusterEventListener: on any event, invokes getMasterAddressString() to determine who the master is, or to become master itself if nobody is (controlled via an entry in the Lock_ table). After this runs, it invokes notifyMasterTokenTransitionListeners(), which calls ClusterMasterTokenTransitionListener.masterTokenAcquired() or masterTokenReleased()
      • Implementors of ClusterMasterTokenTransitionListener:
        • BackgroundTaskClusterMasterTokenTransitionListener: calls BackgroundTaskLocalServiceUtil.cleanUpBackgroundTasks(), which in turn ends up invoking BackgroundTaskLocalServiceImpl.cleanUpBackgroundTasks() (annotated with @Clusterable(onMaster=true)); this gets all tasks with STATUS_IN_PROGRESS and changes their status to FAILED, then gets the BackgroundTaskExecutor LOCK and UNLOCKs it if this server is the current owner/master for that lock…
    • JGroups
      • Member “removed”: triggers ClusterRequestReceiver.viewAccepted() that in turn invokes ClusterExecutorImpl.memberRemoved() which fires ClusterEventType.DEPART (see below)
    • ClusterMessageType.NOTIFY/UPDATE: These message types are requested from other nodes when a new node comes up. When received, they are in turn translated and re-broadcast locally within an individual node as a ClusterEvent of type JOIN via the ClusterExecutorImpl.memberJoined() method. Note that this class is different in the EE jar; there, the implementation of this method additionally invokes the EE LicenseManager.checkClusterLicense() method
      • “Live Users” message destination:
        • triggers actions done by LiveUsersMessageListener
        • makes a cluster RPC invocation of LiveUsers.getLocalClusterUsers()
        • the response is processed by LiveUsers.addClusterNode()
        • proceeds to “sign in” each user that is signed in on the other joined node; appears to do session tracking, etc.
      • LoadIndexClusterEventListener:
        • Loads a full stream of the Lucene index, per company id, from any responding node whose local Lucene index “last generation” value is greater than this node’s. This is only triggered on a JOIN event when the “control” JChannel’s total number of peer nodes minus one is <= 1, so that it only runs on a node boot and not at other times.
      • EE LicenseManager.checkClusterLicense()

        • Please see the notes below on licensing and verification with regard to clustering on boot-up
    • ClusterEventType.DEPART: This event is broadcast locally on a node when it is shut down
      • LiveUsersClusterEventListenerImpl: listens for the local DEPART event and sends a message over the MessageBus to the other cluster members letting them know this node is gone
    • ClusterMessageType.EXECUTE: These cluster messages contain serialized ClusterRequest objects that carry RPC-like information: a class, parameters and a method to invoke on other node(s). Generated by ClusterRequest.createMulticastRequest() and createUnicastRequest(). The results of these are typically passed off to the ClusterExecutorUtil.execute() methods (which end up calling ClusterExecutorImpl.execute())
    • ClusterRequest.createUnicastRequest() invokers:
      • LuceneHelperImpl: _getBootupClusterNodeObjectValuePair() is called by getLoadIndexesInputStreamFromCluster() when it needs to connect to another node, solely in order to load indexes on boot-up from another node whose index is up to date
      • ClusterSchedulerEngine: callMaster() is called by getScheduledJob(s)(), initialize() and masterToSlave(), and only if MEMORY_CLUSTERED is enabled for the job engine. initialize() calls initMemoryClusteredJobs() if the local node is a slave, to get a list of all MEMORY_CLUSTERED jobs. MEMORY_CLUSTERED jobs only run on whoever the master is, which is designated by which node owns the lock named “com.liferay.portal.scheduler.SchedulerEngine” in the Lock_ table in the database. masterToSlave() demotes the current node to a slave; slaveToMaster() gives the current node the opportunity to acquire the lock above and become the scheduler master node
      • ClusterMasterExecutorImpl: used for executing EXECUTE requests against the master only. Leverages getMasterAddressString() to determine who the master is (via which node owns the Lock_ table entry named “ClusterMasterExecutorImpl”)
    • ClusterRequest.createMulticastRequest() invokers:
      • JarUtil: Instructs other peer nodes to call JarUtil.installJar(). Invoked by DataSourceFactoryUtil.initDataSource(), DataSourceSwapper.swap*DataSource() and SetupWizardUtil.updateSetup()
      • ClusterLoadingSyncJob: Invoked by EditServerAction.reindex(). This sends a multicast ClusterRequest of type EXECUTE for LuceneClusterUtil.loadIndexesFromCluster(nodeThatWasJustReindexedAddress) to all nodes. So all nodes, even those on the other side of the RELAY2 bridge, will get this and will then attempt to connect back over the bridge to stream the “latest” index from the node that was just re-indexed. Note however that nodes across the bridge will output the stack trace below in their logs when attempting to connect back to get a token on the re-indexed node (this routine is in _getBootupClusterNodeObjectValuePair() in LuceneHelperImpl and tries to do a unicast ClusterRequest to invoke TransientTokenUtil.createToken()). My guess is that this error has something to do with the “source address”, which is a JGroups local UUID, being sent to the remote bridge; on receipt, the local membership list in the other data center does not recognize that address. This “source address” is what ClusterExecutorImpl.getLocalClusterNodeAddress() returns. Also see http://www.jgroups.org/manual/html/user-advanced.html#d0e3353. ClusterRequestReceiver *does* get invoked with a properly scoped SiteUUID in the format of hostname:sitename, however ClusterRequestReceiver.processClusterRequest() defers to ClusterRequest.getMethodHandler().invoke(), which has no knowledge of this originating SiteUUID and only sees the internal “local” JGroups UUID address embedded as an “argument” to loadIndexesFromCluster() in the ClusterRequest; hence it can’t effectively/properly call TransientTokenUtil.createToken() on the sender (and we get the error below). When it attempts the call via ClusterExecutorImpl (a JGroups control channel .send()) using the “address” argument it received in the original inbound payload, we see the message is dropped because the target address is invalid/unknown:
        • 21:10:30,485 WARN [TransferQueueBundler,LIFERAY-CONTROL-CHANNEL,MyHost-34776][UDP:1380] MyHost-34776: no physical address for 27270628-816b-ddba-872f-1f2d27327f2f, dropping message
          
        • 18:37:40,169 INFO [Incoming-1,shared=liferay-control][LuceneClusterUtil:47] Start loading Lucene index files from cluster node 9c4870e7-17c2-18fb-01c4-5817bb5d1290
          18:37:46,538 WARN [TransferQueueBundler,shared=liferay-control][TCP:1380] null: no physical address for 9c4870e7-17c2-18fb-01c4-5817bb5d1290, dropping message
        • 18:01:56,302 ERROR [Incoming-1,LIFERAY-CONTROL-CHANNEL,mm2][ClusterRequestReceiver:243] Unable to invoke method {arguments=[[J@431bd9ee, 5e794a8e-0b7f-c10d-be17-47ce43b66517], methodKey=com.liferay.portal.search.lucene.cluster.LuceneClusterUtil.loadIndexesFromCluster([J,com.liferay.portal.kernel.cluster.Address)}
          java.lang.reflect.InvocationTargetException
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:606)
          at com.liferay.portal.kernel.util.MethodHandler.invoke(MethodHandler.java:61)
          at com.liferay.portal.cluster.ClusterRequestReceiver.processClusterRequest(ClusterRequestReceiver.java:238)
          at com.liferay.portal.cluster.ClusterRequestReceiver.receive(ClusterRequestReceiver.java:88)
          at org.jgroups.JChannel.invokeCallback(JChannel.java:749)
          at org.jgroups.JChannel.up(JChannel.java:710)
          at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1025)
          at org.jgroups.protocols.relay.RELAY2.deliver(RELAY2.java:495)
          at org.jgroups.protocols.relay.RELAY2.up(RELAY2.java:330)
          at org.jgroups.protocols.FORWARD_TO_COORD.up(FORWARD_TO_COORD.java:153)
          at org.jgroups.protocols.RSVP.up(RSVP.java:188)
          at org.jgroups.protocols.FRAG2.up(FRAG2.java:181)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:400)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:418)
          at org.jgroups.protocols.pbcast.GMS.up(GMS.java:896)
          at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:245)
          at org.jgroups.protocols.UNICAST2.up(UNICAST2.java:453)
          at org.jgroups.protocols.pbcast.NAKACK2.handleMessage(NAKACK2.java:763)
          at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:574)
          at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:147)
          at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:187)
          at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:288)
          at org.jgroups.protocols.MERGE3.up(MERGE3.java:290)
          at org.jgroups.protocols.Discovery.up(Discovery.java:359)
          at org.jgroups.protocols.TP.passMessageUp(TP.java:1263)
          at org.jgroups.protocols.TP$4.run(TP.java:1181)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:744)
          Caused by: com.liferay.portal.kernel.exception.SystemException: java.lang.NullPointerException
          at com.liferay.portal.search.lucene.LuceneHelperImpl._getBootupClusterNodeObjectValuePair(LuceneHelperImpl.java:830)
          at com.liferay.portal.search.lucene.LuceneHelperImpl.getLoadIndexesInputStreamFromCluster(LuceneHelperImpl.java:451)
          at com.liferay.portal.search.lucene.LuceneHelperUtil.getLoadIndexesInputStreamFromCluster(LuceneHelperUtil.java:326)
          at com.liferay.portal.search.lucene.cluster.LuceneClusterUtil.loadIndexesFromCluster(LuceneClusterUtil.java:57)
          ... 32 more
          Caused by: java.lang.NullPointerException
          at com.liferay.portal.search.lucene.LuceneHelperImpl._getBootupClusterNodeObjectValuePair(LuceneHelperImpl.java:809)
          ... 35 more
      • PortalImpl: resetCDNHosts() does a ClusterRequest.EXECUTE for the same PortalUtil.resetCDNHosts() method on all peer nodes. To avoid a recursive loop, it uses a ThreadLocal when entering the EXECUTE block (relevant on the remote nodes)
      • LuceneHelperImpl: _loadIndexFromCluster() (invoked via the JOIN event above). Performs EXECUTE remote method calls of LuceneHelperUtil.getLastGeneration() against all peers to see whose Lucene index generation is newer than this node’s, picks that one, and then initiates a direct HTTP stream connection to it to fetch the entire index (a rough sketch of this flow appears after this list). Who counts as a “peer” candidate is determined by invoking ClusterExecutorUtil.getClusterNodeAddresses(), which is ultimately realized by calling the “control” channel’s getView() method
      • MessageSenderJob: notifyClusterMember() invoked by MessageSenderJob.doExecute() when a job execution context’s next execution time is null. The notifyClusterMember() method invokes SchedulerEngineHelperUtil.delete() via a ClusterRequest.EXECUTE
      • ClusterableAdvice: AOP advice class that intercepts any Liferay method annotated with @Clusterable and, AFTER returning, creates a ClusterRequest.EXECUTE for the same method passed to all nodes in the cluster. Used by ClusterSchedulerEngine (delete, pause, resume, schedule, suppressError, unschedule, update), BackgroundTaskLocalServiceImpl and PortletLocalServiceImpl. Has an “onMaster” property that awkwardly controls whether or not the advice is actually applied. If onMaster=false, the advice fast-returns like a no-op; if onMaster=true, the advice runs (locally on this node if THIS node is the master, or via a ClusterRequest.EXECUTE otherwise). Which node is the master is dictated by which node owns the LOCK in the database named “com.liferay.portal.scheduler.SchedulerEngine”. (A simplified sketch of this decision logic appears after this list.)
      • EhcacheStreamBootstrapCacheLoader: start() and doLoad() invoke EhcacheStreamBootstrapHelpUtil.loadCachesFromCluster(), which broadcasts a ClusterRequest for EXECUTE of the method createServerSocketFromCluster() on all peer nodes and then attempts to connect to a peer node to load all named caches from it…
  • ClusterExecutorImpl notes:
    • This class has several methods relating to figuring out “who” is in the “cluster” and, due to the way these methods are implemented, you will get inconsistent “views” of who the local cluster members are (if using RELAY2 in JGroups). A small sketch of this discrepancy appears after this list.
      • getClusterNodeAddresses() = consults the “control” JChannel receiver’s “view” of the cluster. This will only return local nodes subscribed to this JChannel. These are fetched in real time every time it is invoked.
      • getClusterNodes() = consults a local map called “_liveInstances”. _liveInstances is populated when memberJoined() is invoked, in response to ClusterRequestReceiver.processClusterRequest() being invoked when a new node comes up. Note that because this happens in response to ClusterRequests, which WILL be transmitted over RELAY2 in JGroups, this will reflect MORE nodes than what getClusterNodeAddresses() returns.
      • There is also another local member called “_clusterNodeAddresses”, which is likewise populated with the addresses of other nodes when memberJoined() is invoked. I really don’t understand why this exists alongside getClusterNodeAddresses() AND getClusterNodes() when they could potentially return inconsistent data.
  • Quartz/Jobs scheduling
    • ClusterSchedulerEngine proxies all requests for scheduling jobs through SchedulerEngineProxyBean (a BaseProxyBean), which just relays schedule() invocations through messaging, which ends up invoking QuartzSchedulerEngine.schedule()
    • ClusterSchedulerEngine on initialization calls getMasterAddressString(), which gets the lock named “com.liferay.portal.scheduler.SchedulerEngine” from the Lock_ table via LockLocalServiceImpl.lock(). This is row-based locking in the database; ownership is based on an “owner” string, which appears to be derived from the local IP. (A minimal sketch of this lock-based master election appears after this list.)
    • _doMasterToSlave() – queries the current master for a list of all MEMORY_CLUSTERED jobs (does this over an RMI or JGroups method invocation of getScheduledJobs()), gets this list of job names and unschedules them locally
    • slaveToMaster() – takes everything in _memoryClusteredJobs and schedules it locally. _memoryClusteredJobs seems to just be a global list of the MEMORY_CLUSTERED jobs that each node keeps track of
  • Licensing: If running Liferay EE, LicenseManager.checkClusterLicense() -> checkBinaryLicense() is invoked via ClusterExecutorImpl.memberJoined() (note that this is only invoked in the EE version, as the class is different in the portal-impl.jar contained in the EE distribution). This particular invocation chain appears to respond to the JOIN event for the peer node by checking whether the “total nodes” this node knows about is greater than the total number of nodes the license permits; if it is, the local node gets a System.exit() invoked on it (killing itself). If you are interested in learning more about this obfuscated process you can investigate further on your own; it’s all in the EE portal-impl.jar. Here are some other notes on licensing, how it relates to clustering, and issues one might encounter.
    • LicenseManager.getClusterLicense() invokes ClusterExecutorUtil.getClusterNodes() and does validation to check whether you have more nodes in the cluster than your installed license supports (amongst other things).
    • Another thing that happens when ClusterExecutorImpl.memberJoined() is invoked is that ClusterExecutorImpl.getClusterNodes() appears to be invoked (which, see above, will return nodes that exist across a JGroups RELAY2 bridge in addition to local peer nodes in the same DC). It picks one of these peer nodes and remotely invokes LicenseManager.getLicenseProperties() against it; this is done via a ClusterExecutorUtil.execute(). The return is a Future with a timeout; upon reception the Future holds a Map containing the license properties, which are then compared against the local node’s license. If they match, we are good to go; otherwise read below (a “mixing licenses” error will occur). Why is this relevant? Well, despite having legit licenses, when using RELAY2 you can come across two erroneous errors that will prevent your cluster from booting, as described below.
      • The first error you might see is a timeout error when the node reaches out to a peer to verify a license in response to a new member joining the cluster (likely a member over a RELAY2 bridge). It will look like the output below. Typically just rebooting Liferay takes care of the issue, as explained in the follow-up bullets:
        • java.util.concurrent.TimeoutException
          at com.liferay.portal.kernel.cluster.FutureClusterResponses.get(FutureClusterResponses.java:88)
          at com.liferay.portal.license.a.a.f(Unknown Source)
          at com.liferay.portal.license.a.a.b(Unknown Source)
          at com.liferay.portal.license.a.f.d(Unknown Source)
          at com.liferay.portal.license.a.f.d(Unknown Source)
          at com.liferay.portal.license.LicenseManager.c(Unknown Source)
          at com.liferay.portal.license.LicenseManager.checkBinaryLicense(Unknown Source)
          at com.liferay.portal.license.LicenseManager.checkClusterLicense(Unknown Source)
          at com.liferay.portal.cluster.ClusterExecutorImpl.memberJoined(ClusterExecutorImpl.java:442)
          at com.liferay.portal.cluster.ClusterRequestReceiver.processClusterRequest(ClusterRequestReceiver.java:219)
          at com.liferay.portal.cluster.ClusterRequestReceiver.receive(ClusterRequestReceiver.java:88)
          at org.jgroups.JChannel.invokeCallback(JChannel.java:749)
          at org.jgroups.JChannel.up(JChannel.java:710)
          at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:1025)
          at org.jgroups.protocols.relay.RELAY2.up(RELAY2.java:338)
          at org.jgroups.protocols.FORWARD_TO_COORD.up(FORWARD_TO_COORD.java:153)
          at org.jgroups.protocols.pbcast.STATE_TRANSFER.up(STATE_TRANSFER.java:178)
          at org.jgroups.protocols.FRAG2.up(FRAG2.java:181)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:400)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:418)
          at org.jgroups.protocols.pbcast.GMS.up(GMS.java:896)
          at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:245)
          at org.jgroups.protocols.UNICAST.up(UNICAST.java:414)
          at org.jgroups.protocols.pbcast.NAKACK2.handleMessage(NAKACK2.java:763)
          at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:574)
          at org.jgroups.protocols.BARRIER.up(BARRIER.java:126)
          at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:147)
          at org.jgroups.protocols.FD.up(FD.java:253)
          at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:288)
          at org.jgroups.protocols.MERGE2.up(MERGE2.java:205)
          at org.jgroups.protocols.Discovery.up(Discovery.java:359)
          at org.jgroups.protocols.TP$ProtocolAdapter.up(TP.java:2610)
          at org.jgroups.protocols.TP.passMessageUp(TP.java:1263)
          at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1825)
          at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1793)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)
    • When booting up several Liferay EE nodes (configured for clustering) simultaneously, it appears you can run into a situation where all the nodes fail to boot and claim they have “invalid licenses” (despite the fact that you have valid cluster licenses installed under data/license)… and then your container processes are automatically shut down… nice
    • This appears to be due to the verification process that LicenseManager runs on boot-up, where it receives license information from peer nodes. If all nodes are started at the same time there appears to be a timing condition where none of the nodes is yet in a “ready” state, so when peers connect to them for their license properties they respond with nothing… which is then compared against the local node’s license, and since the two don’t equals(), the local license (despite being legit) is deemed invalid.
    • Here is example output from NODE1 (booted up at same time as NODE2, note the blank remote license):
        • 13:55:20,381 ERROR [Incoming-5,shared=liferay-control][a:?] Remote license does not match local license. Local license: {productEntryName=Liferay Portal, startDate=11, expirationDate=22, description=Liferay Portal, owner=Liferay Portal, maxServers=4, licenseEntryName=Portal6 Non-Production (Developer Cluster), productVersion=6.1, type=developer-cluster, accountEntryName=Liferay Trial, version=2}
          Remote node: {clusterNodeId=11111111111, inetAddress=/192.168.0.23, port=-1}
          Remote license: {}
          Mixing licenses is not allowed. Local server is shutting down.
      • Here is example output from NODE2 (booted up at same time as NODE1):
        • 13:55:20,388 ERROR [Incoming-1,shared=liferay-control][a:?] Remote license does not match local license. Local license: {productEntryName=Liferay Portal, startDate=11, expirationDate=22, description=Liferay Portal, owner=Liferay Portal, maxServers=4, licenseEntryName=Portal6 Non-Production (Developer Cluster), productVersion=6.1, type=developer-cluster, accountEntryName=Liferay Trial, version=2}
          Remote node: {clusterNodeId=222222222222, inetAddress=/192.168.0.22, port=-1}
          Remote license: {}
          Mixing licenses is not allowed. Local server is shutting down.
      • How to solve this? The way I did it was to always boot one node first, with NO peers, wait for it to complete booting, and only then bring up all the other nodes; these errors then seem to go away.
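
To make the Lucene bootstrap flow above a bit more concrete, here is a minimal sketch of the “pick the peer with the newest index generation and stream its index” idea referenced from _loadIndexFromCluster(). Everything here (the peerGenerations map, the /lucene/dump URL, the target path) is a placeholder of my own, not Liferay’s actual API.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.Map;

    // Rough sketch of the _loadIndexFromCluster() idea: ask every peer for its
    // Lucene index "generation" and, if some peer is newer than this node,
    // stream that peer's entire index over HTTP.
    public class IndexBootstrapSketch {

        public void bootstrapFromNewestPeer(
                long localGeneration, Map<String, Long> peerGenerations)
            throws IOException {

            String newestPeer = null;
            long newestGeneration = localGeneration;

            // Pick the peer whose reported generation is newer than ours (and newest overall).
            for (Map.Entry<String, Long> entry : peerGenerations.entrySet()) {
                if (entry.getValue() > newestGeneration) {
                    newestPeer = entry.getKey();
                    newestGeneration = entry.getValue();
                }
            }

            if (newestPeer == null) {
                return; // our index is already the newest, nothing to do
            }

            // Open a direct HTTP stream to the chosen peer and copy its index down.
            // The URL path and local target file are placeholders.
            URL url = new URL("http://" + newestPeer + "/lucene/dump");
            try (InputStream in = url.openStream()) {
                Files.copy(in, Paths.get("data/lucene-bootstrap.zip"),
                    StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }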
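
The @Clusterable / ClusterableAdvice behaviour described above is easier to follow as code. The sketch below is a hypothetical, simplified interceptor (not Liferay’s actual advice class) that follows the logic as I understand it; forwardToMaster() is a placeholder standing in for a unicast ClusterRequest.EXECUTE.

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.util.concurrent.Callable;

    // Hypothetical, simplified take on @Clusterable + ClusterableAdvice.
    public class ClusterableSketch {

        @Retention(RetentionPolicy.RUNTIME)
        public @interface Clusterable {
            boolean onMaster() default false;
        }

        public Object intercept(
                Clusterable annotation, Callable<Object> localCall,
                boolean thisNodeIsMaster)
            throws Exception {

            if (!annotation.onMaster()) {
                // onMaster=false: the advice effectively no-ops and the
                // annotated method simply runs here.
                return localCall.call();
            }

            if (thisNodeIsMaster) {
                // This node owns the "com.liferay.portal.scheduler.SchedulerEngine"
                // lock, so the method runs locally.
                return localCall.call();
            }

            // Not the master: ship the same method call to the master node as
            // a ClusterRequest of type EXECUTE.
            return forwardToMaster(localCall);
        }

        private Object forwardToMaster(Callable<Object> call) {
            // Placeholder: in Liferay this would be a unicast EXECUTE request
            // aimed at whichever node currently holds the master lock.
            return null;
        }
    }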
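
The inconsistency between getClusterNodeAddresses() and getClusterNodes() noted above comes down to where each one gets its data. Here is a small sketch of the two sources (JGroups 3.x API; the field and method names are illustrative, not Liferay’s):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.jgroups.Address;
    import org.jgroups.JChannel;

    // Sketch of why the two "who is in the cluster" answers can disagree when
    // RELAY2 is in play.
    public class ClusterViewSketch {

        private final JChannel controlChannel;

        // Populated from join notifications (ClusterRequests), which DO travel
        // across a RELAY2 bridge -- so it can contain remote-site nodes too.
        private final Map<Address, Object> liveInstances = new ConcurrentHashMap<>();

        public ClusterViewSketch(JChannel controlChannel) {
            this.controlChannel = controlChannel;
        }

        // Equivalent of getClusterNodeAddresses(): only the members of the local
        // control channel's current view, i.e. nodes in the same site.
        public List<Address> localViewMembers() {
            return new ArrayList<>(controlChannel.getView().getMembers());
        }

        // Equivalent of getClusterNodes(): whatever has announced itself via a
        // join notification, potentially including nodes across the bridge.
        public List<Address> announcedMembers() {
            return new ArrayList<>(liveInstances.keySet());
        }

        public void memberJoined(Address address, Object nodeInfo) {
            liveInstances.put(address, nodeInfo);
        }
    }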
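
Finally, much of the “who is the master” logic above boils down to owning a named row in the database (the Lock_ table via LockLocalServiceImpl). Below is a minimal sketch of that pattern with plain JDBC; the table and column names are my own simplification, not Liferay’s actual schema.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Illustrative sketch of the "whoever owns the row is master" pattern.
    public class DbMasterElectionSketch {

        private static final String LOCK_NAME =
            "com.liferay.portal.scheduler.SchedulerEngine";

        // Returns the owner of the lock, claiming it for this node if nobody holds it.
        public static String acquireOrReadOwner(Connection conn, String myOwnerId)
            throws SQLException {

            // Try to claim the lock; a unique constraint on lock_name means only
            // one node can succeed.
            try (PreparedStatement insert = conn.prepareStatement(
                    "INSERT INTO cluster_lock (lock_name, owner) VALUES (?, ?)")) {
                insert.setString(1, LOCK_NAME);
                insert.setString(2, myOwnerId);
                insert.executeUpdate();
                return myOwnerId; // we are now the master
            }
            catch (SQLException duplicateKey) {
                // Someone else already owns it (assumed duplicate-key failure);
                // read back the current owner instead.
                try (PreparedStatement select = conn.prepareStatement(
                        "SELECT owner FROM cluster_lock WHERE lock_name = ?")) {
                    select.setString(1, LOCK_NAME);
                    try (ResultSet rs = select.executeQuery()) {
                        rs.next();
                        return rs.getString("owner");
                    }
                }
            }
        }
    }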

 

 JGroups RELAY2 notes

Liferay does NOT use JGroups’ RpcDispatcher but has its own “RPC” abstraction via ClusterRequest, which is Serializable (it contains the target addresses, a MethodHandler, a UUID, and a type such as EXECUTE). MethodHandler is just a serializable bag of arguments plus a MethodKey, which stores the target class, method name and parameter types. On the receiving side, MethodHandler uses these deserialized parts to invoke the method locally via reflection.
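
To illustrate the idea (these are not Liferay’s actual classes), here is a minimal sketch of such a serializable “method call” descriptor; the name PortableMethodCall and its details are my own.

    import java.io.Serializable;
    import java.lang.reflect.Method;

    // Hypothetical re-creation of the MethodKey/MethodHandler idea: a serializable
    // description of "call this method with these arguments" that the receiving
    // node replays via reflection.
    public class PortableMethodCall implements Serializable {

        private static final long serialVersionUID = 1L;

        private final String className;
        private final String methodName;
        private final Class<?>[] parameterTypes;
        private final Object[] arguments; // must all be Serializable

        public PortableMethodCall(
                String className, String methodName,
                Class<?>[] parameterTypes, Object[] arguments) {
            this.className = className;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }

        // Executed on the receiving node after deserialization.
        public Object invoke() throws Exception {
            Class<?> clazz = Class.forName(className);
            Method method = clazz.getMethod(methodName, parameterTypes);
            // Liferay typically targets static utility methods (e.g. LuceneClusterUtil),
            // so no target instance is needed here.
            return method.invoke(null, arguments);
        }
    }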

On the RECEIVING side is ClusterRequestReceiver, the JGroups receiver that processes all inbound messages from peers. It takes the ClusterRequest, gets the MethodHandler (if the type is EXECUTE), invokes it, and then returns the result as a ClusterNodeResponse.
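
And a corresponding sketch of the receiving side in the style of a JGroups 3.x ReceiverAdapter (again purely illustrative; Liferay’s real ClusterRequestReceiver does considerably more, including response bookkeeping and view handling). PortableMethodCall is the hypothetical descriptor from the previous sketch.

    import org.jgroups.JChannel;
    import org.jgroups.Message;
    import org.jgroups.ReceiverAdapter;

    // Minimal sketch of the receive path: deserialize the request from the
    // control channel, replay it locally via reflection, and send the result
    // back to the sender.
    public class SketchRequestReceiver extends ReceiverAdapter {

        private final JChannel controlChannel;

        public SketchRequestReceiver(JChannel controlChannel) {
            this.controlChannel = controlChannel;
        }

        @Override
        public void receive(Message message) {
            try {
                PortableMethodCall call = (PortableMethodCall) message.getObject();

                Object result = call.invoke(); // local reflective invocation

                // Unicast the (serializable) result back to the requesting node,
                // roughly what a ClusterNodeResponse carries in Liferay.
                controlChannel.send(message.getSrc(), result);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }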

 

 
