public class org.apache.activemq.ra.ActiveMQEndpointWorker extends java.lang.Object
{
// ---- Static state -------------------------------------------------------
// Reflective handle for javax.jms.MessageListener.onMessage(javax.jms.Message);
// resolved once in <clinit> and passed to isDeliveryTransacted in <init>.
public static final java.lang.reflect.Method ON_MESSAGE_METHOD;
// SLF4J logger for this class, created in <clinit>.
private static final org.slf4j.Logger LOG;
// Neither delay constant is assigned or read by any method in this listing.
// NOTE(review): presumably compile-time constants consumed by the nested
// ActiveMQEndpointWorker$1 class; values are not visible here - confirm.
private static final long INITIAL_RECONNECT_DELAY;
private static final long MAX_RECONNECT_DELAY;
// Thread-local slot written by registerThreadSession and cleared by
// unregisterThreadSession; created in <clinit>.
private static final java.lang.ThreadLocal THREAD_LOCAL;
// ---- Immutable configuration (all assigned in <init>) -------------------
protected final org.apache.activemq.ra.ActiveMQEndpointActivationKey endpointActivationKey;
// Obtained from endpointActivationKey.getMessageEndpointFactory().
protected final javax.resource.spi.endpoint.MessageEndpointFactory endpointFactory;
// Obtained from the adapter's BootstrapContext.getWorkManager().
protected final javax.resource.spi.work.WorkManager workManager;
// Result of endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD).
protected final boolean transacted;
// Destination: either a JNDI lookup result or a new ActiveMQQueue/ActiveMQTopic.
private final org.apache.activemq.command.ActiveMQDestination dest;
// The ActiveMQEndpointWorker$1 Work instance. It is scheduled by connect() and
// also serves as the monitor for start(), connect(), disconnect() and getConnection().
private final javax.resource.spi.work.Work connectWork;
// Starts as false; start() flips it with compareAndSet(false, true).
private final java.util.concurrent.atomic.AtomicBoolean connecting;
// A dedicated new String("shutdownMutex") instance; used as the monitor in stop().
private final java.lang.Object shutdownMutex;
// ---- Mutable runtime state ----------------------------------------------
// Written by setConnection(); closed and nulled by disconnect().
private org.apache.activemq.ActiveMQConnection connection;
// Closed and nulled by disconnect(). No method in this listing assigns a
// non-null value - presumably done by ActiveMQEndpointWorker$1 (verify).
private org.apache.activemq.ActiveMQConnectionConsumer consumer;
// Created in start(), closed in stop().
private org.apache.activemq.ra.ServerSessionPoolImpl serverSessionPool;
// Set to true by start(); set to false by stop() and by connect() when the
// WorkManager rejects the work.
private boolean running;
// Constructor.
//
// Parameters:
//   1. MessageResourceAdapter - source of the BootstrapContext/WorkManager; also
//      handed to the ActiveMQEndpointWorker$1 work object.
//   2. ActiveMQEndpointActivationKey - supplies the MessageEndpointFactory and
//      the MessageActivationSpec.
//
// Throws javax.resource.ResourceException when:
//   - isDeliveryTransacted raises NoSuchMethodException,
//   - the JNDI lookup raises NamingException,
//   - the destination type is neither "javax.jms.Queue" nor "javax.jms.Topic".
//
// Reading note: every local in this listing is named "v" and every label is
// named "label", so operands and branch targets must be inferred from the
// declared types and statement order.
protected void <init>(org.apache.activemq.ra.MessageResourceAdapter, org.apache.activemq.ra.ActiveMQEndpointActivationKey) throws javax.resource.ResourceException
{
javax.resource.ResourceException v, v, v;
java.lang.NoSuchMethodException v;
javax.naming.NamingException v;
org.apache.activemq.ra.ActiveMQEndpointWorker$1 v;
java.lang.reflect.Method v;
boolean v, v, v, v;
javax.naming.InitialContext v;
org.apache.activemq.command.ActiveMQTopic v;
org.apache.activemq.ra.ActiveMQEndpointActivationKey v, v, v;
java.util.concurrent.atomic.AtomicBoolean v;
org.apache.activemq.command.ActiveMQQueue v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
org.apache.activemq.ra.MessageResourceAdapter v;
javax.resource.spi.work.WorkManager v;
java.lang.String v, v, v, v, v, v, v, v, v, v, v, v;
javax.resource.spi.BootstrapContext v;
javax.resource.spi.endpoint.MessageEndpointFactory v, v;
java.lang.Object v;
org.apache.activemq.ra.MessageActivationSpec v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
v := @parameter: org.apache.activemq.ra.MessageResourceAdapter;
v := @parameter: org.apache.activemq.ra.ActiveMQEndpointActivationKey;
specialinvoke v.<java.lang.Object: void <init>()>();
// connecting = new AtomicBoolean(false)
v = new java.util.concurrent.atomic.AtomicBoolean;
specialinvoke v.<java.util.concurrent.atomic.AtomicBoolean: void <init>(boolean)>(0);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: java.util.concurrent.atomic.AtomicBoolean connecting> = v;
// shutdownMutex = new String("shutdownMutex"): an explicit "new", so the lock
// object is a fresh instance rather than the literal itself.
v = new java.lang.String;
specialinvoke v.<java.lang.String: void <init>(java.lang.String)>("shutdownMutex");
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.Object shutdownMutex> = v;
// Store the activation key, then derive the endpoint factory from it.
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ra.ActiveMQEndpointActivationKey endpointActivationKey> = v;
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ra.ActiveMQEndpointActivationKey endpointActivationKey>;
v = virtualinvoke v.<org.apache.activemq.ra.ActiveMQEndpointActivationKey: javax.resource.spi.endpoint.MessageEndpointFactory getMessageEndpointFactory()>();
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.endpoint.MessageEndpointFactory endpointFactory> = v;
// workManager = adapter.getBootstrapContext().getWorkManager()
v = interfaceinvoke v.<org.apache.activemq.ra.MessageResourceAdapter: javax.resource.spi.BootstrapContext getBootstrapContext()>();
v = interfaceinvoke v.<javax.resource.spi.BootstrapContext: javax.resource.spi.work.WorkManager getWorkManager()>();
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.WorkManager workManager> = v;
// try: transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD)
label:
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.endpoint.MessageEndpointFactory endpointFactory>;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.reflect.Method ON_MESSAGE_METHOD>;
v = interfaceinvoke v.<javax.resource.spi.endpoint.MessageEndpointFactory: boolean isDeliveryTransacted(java.lang.reflect.Method)>(v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean transacted> = v;
label:
goto label;
// catch (NoSuchMethodException): rethrown as ResourceException.
// NOTE(review): the (String) constructor is used, so the caught exception is
// not attached as the cause.
label:
v := @caughtexception;
v = new javax.resource.ResourceException;
specialinvoke v.<javax.resource.ResourceException: void <init>(java.lang.String)>("Endpoint does not implement the onMessage method.");
throw v;
// connectWork = new ActiveMQEndpointWorker$1(this, adapter)
label:
v = new org.apache.activemq.ra.ActiveMQEndpointWorker$1;
specialinvoke v.<org.apache.activemq.ra.ActiveMQEndpointWorker$1: void <init>(org.apache.activemq.ra.ActiveMQEndpointWorker,org.apache.activemq.ra.MessageResourceAdapter)>(v, v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.Work connectWork> = v;
// Resolve the destination. When isUseJndi() is false, control skips the JNDI
// block (presumably to the destination-type checks further down).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ra.ActiveMQEndpointActivationKey endpointActivationKey>;
v = virtualinvoke v.<org.apache.activemq.ra.ActiveMQEndpointActivationKey: org.apache.activemq.ra.MessageActivationSpec getActivationSpec()>();
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: boolean isUseJndi()>();
if v == 0 goto label;
// try: dest = new InitialContext().lookup(spec.getDestination())
// NOTE(review): lookup returns Object and no cast statement appears in this
// listing before the store into the ActiveMQDestination field - confirm
// against the bytecode.
label:
v = new javax.naming.InitialContext;
specialinvoke v.<javax.naming.InitialContext: void <init>()>();
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestination()>();
v = virtualinvoke v.<javax.naming.InitialContext: java.lang.Object lookup(java.lang.String)>(v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.command.ActiveMQDestination dest> = v;
label:
goto label;
// catch (NamingException): rethrown as
// ResourceException("JNDI lookup failed for " + destination).
// NOTE(review): again the (String) constructor - the NamingException is not chained.
label:
v := @caughtexception;
v = new javax.resource.ResourceException;
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestination()>();
v = dynamicinvoke "makeConcatWithConstants" <java.lang.String (java.lang.String)>(v) <java.lang.invoke.StringConcatFactory: java.lang.invoke.CallSite makeConcatWithConstants(java.lang.invoke.MethodHandles$Lookup,java.lang.String,java.lang.invoke.MethodType,java.lang.String,java.lang.Object[])>("JNDI lookup failed for \u0001");
specialinvoke v.<javax.resource.ResourceException: void <init>(java.lang.String)>(v);
throw v;
// Non-JNDI path, step 1: compare "javax.jms.Queue" with getDestinationType()
// via String.equals; on a match dest = new ActiveMQQueue(spec.getDestination()).
label:
v = "javax.jms.Queue";
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestinationType()>();
v = virtualinvoke v.<java.lang.String: boolean equals(java.lang.Object)>(v);
if v == 0 goto label;
v = new org.apache.activemq.command.ActiveMQQueue;
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestination()>();
specialinvoke v.<org.apache.activemq.command.ActiveMQQueue: void <init>(java.lang.String)>(v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.command.ActiveMQDestination dest> = v;
goto label;
// Step 2: same comparison against "javax.jms.Topic"; on a match
// dest = new ActiveMQTopic(spec.getDestination()).
label:
v = "javax.jms.Topic";
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestinationType()>();
v = virtualinvoke v.<java.lang.String: boolean equals(java.lang.Object)>(v);
if v == 0 goto label;
v = new org.apache.activemq.command.ActiveMQTopic;
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestination()>();
specialinvoke v.<org.apache.activemq.command.ActiveMQTopic: void <init>(java.lang.String)>(v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.command.ActiveMQDestination dest> = v;
goto label;
// Step 3: neither type matched - fail with
// ResourceException("Unknown destination type: " + destinationType).
label:
v = new javax.resource.ResourceException;
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: java.lang.String getDestinationType()>();
v = dynamicinvoke "makeConcatWithConstants" <java.lang.String (java.lang.String)>(v) <java.lang.invoke.StringConcatFactory: java.lang.invoke.CallSite makeConcatWithConstants(java.lang.invoke.MethodHandles$Lookup,java.lang.String,java.lang.invoke.MethodType,java.lang.String,java.lang.Object[])>("Unknown destination type: \u0001");
specialinvoke v.<javax.resource.ResourceException: void <init>(java.lang.String)>(v);
throw v;
label:
return;
// Trap table: the first entry guards the isDeliveryTransacted call, the second
// guards the JNDI lookup (ranges inferred from statement order).
catch java.lang.NoSuchMethodException from label to label with label;
catch javax.naming.NamingException from label to label with label;
}
// Best-effort close of a JMS Connection.
//
// A null argument is a no-op. Otherwise a debug line is logged and close() is
// invoked. A JMSException raised inside the guarded range is caught and logged
// at trace level; it is not rethrown.
public static void safeClose(javax.jms.Connection)
{
java.lang.Throwable v;
javax.jms.Connection v;
org.slf4j.Logger v, v;
v := @parameter: javax.jms.Connection;
label:
// Null guard: nothing to close.
if v == null goto label;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void debug(java.lang.String)>("Closing connection to broker");
interfaceinvoke v.<javax.jms.Connection: void close()>();
label:
goto label;
label:
v := @caughtexception;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
// Two arguments for a single "{}" placeholder.
// NOTE(review): presumably (connection, exception), which would let SLF4J
// treat the trailing throwable as the stack trace - the anonymised locals
// do not show the order; confirm.
interfaceinvoke v.<org.slf4j.Logger: void trace(java.lang.String,java.lang.Object,java.lang.Object)>("failed to close c {}", v, v);
label:
return;
catch javax.jms.JMSException from label to label with label;
}
// Best-effort close of a JMS ConnectionConsumer.
//
// A null argument is a no-op. Otherwise a debug line is logged and close() is
// invoked. A JMSException raised inside the guarded range is caught and logged
// at trace level; it is not rethrown.
public static void safeClose(javax.jms.ConnectionConsumer)
{
java.lang.Throwable v;
javax.jms.ConnectionConsumer v;
org.slf4j.Logger v, v;
v := @parameter: javax.jms.ConnectionConsumer;
label:
// Null guard: nothing to close.
if v == null goto label;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void debug(java.lang.String)>("Closing ConnectionConsumer");
interfaceinvoke v.<javax.jms.ConnectionConsumer: void close()>();
label:
goto label;
label:
v := @caughtexception;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
// Two arguments for a single "{}" placeholder.
// NOTE(review): presumably (consumer, exception) - the anonymised locals do
// not show the order; confirm.
interfaceinvoke v.<org.slf4j.Logger: void trace(java.lang.String,java.lang.Object,java.lang.Object)>("failed to close cc {}", v, v);
label:
return;
catch javax.jms.JMSException from label to label with label;
}
// Starts the worker. The whole body runs inside the connectWork monitor.
//
//   - If running is already true, returns immediately.
//   - Otherwise sets running = true and attempts connecting.compareAndSet(false, true):
//       * success: logs "Starting", creates a ServerSessionPoolImpl sized by
//         activationSpec.getMaxSessionsIntValue(), and calls connect();
//       * failure: logs a warning and does nothing else.
//
// The trailing Throwable handlers release the monitor and rethrow, which is the
// shape a compiled synchronized block takes.
public void start() throws javax.resource.ResourceException
{
java.lang.Throwable v;
org.slf4j.Logger v, v;
org.apache.activemq.ra.ServerSessionPoolImpl v;
java.util.concurrent.atomic.AtomicBoolean v;
javax.resource.spi.work.Work v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
int v;
org.apache.activemq.ra.MessageActivationSpec v;
org.apache.activemq.ra.ActiveMQEndpointActivationKey v;
boolean v, v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
// Lock on connectWork (the field loaded on the previous line).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.Work connectWork>;
entermonitor v;
label:
// running == false -> jump ahead to the start-up path; otherwise unlock and return.
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running>;
if v == 0 goto label;
exitmonitor v;
label:
return;
label:
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running> = 1;
// Only one caller may move connecting from false to true.
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: java.util.concurrent.atomic.AtomicBoolean connecting>;
v = virtualinvoke v.<java.util.concurrent.atomic.AtomicBoolean: boolean compareAndSet(boolean,boolean)>(0, 1);
if v == 0 goto label;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String)>("Starting");
// serverSessionPool = new ServerSessionPoolImpl(this, spec.getMaxSessionsIntValue())
v = new org.apache.activemq.ra.ServerSessionPoolImpl;
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ra.ActiveMQEndpointActivationKey endpointActivationKey>;
v = virtualinvoke v.<org.apache.activemq.ra.ActiveMQEndpointActivationKey: org.apache.activemq.ra.MessageActivationSpec getActivationSpec()>();
v = interfaceinvoke v.<org.apache.activemq.ra.MessageActivationSpec: int getMaxSessionsIntValue()>();
specialinvoke v.<org.apache.activemq.ra.ServerSessionPoolImpl: void <init>(org.apache.activemq.ra.ActiveMQEndpointWorker,int)>(v, v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ra.ServerSessionPoolImpl serverSessionPool> = v;
// connect() re-enters the same connectWork monitor.
specialinvoke v.<org.apache.activemq.ra.ActiveMQEndpointWorker: void connect()>();
goto label;
// compareAndSet failed: a connect attempt is already in flight.
label:
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void warn(java.lang.String)>("Ignoring start command, EndpointWorker is already trying to connect");
label:
exitmonitor v;
label:
goto label;
// Exceptional exit: release the monitor, then propagate.
label:
v := @caughtexception;
exitmonitor v;
throw v;
label:
return;
catch java.lang.Throwable from label to label with label;
catch java.lang.Throwable from label to label with label;
}
// Stops the worker.
//
// Inside the shutdownMutex monitor:
//   - returns immediately if running is false;
//   - otherwise sets running = false, logs "Stopping", calls
//     shutdownMutex.notifyAll(), and closes serverSessionPool. Any Throwable
//     from that close is logged at debug level and not rethrown.
// After the monitor is released on the normal path, disconnect() is called.
//
// NOTE(review): this method guards "running" with shutdownMutex, whereas
// start() and connect() guard it with the connectWork monitor - confirm that
// using two different locks for the same flag is intentional.
public void stop() throws java.lang.InterruptedException
{
java.lang.Throwable v, v;
org.slf4j.Logger v, v;
org.apache.activemq.ra.ServerSessionPoolImpl v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
java.lang.Object v, v;
boolean v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
// Lock on shutdownMutex (the field loaded on the previous line).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.Object shutdownMutex>;
entermonitor v;
label:
// running == true -> jump ahead to the shutdown path; otherwise unlock and return.
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running>;
if v != 0 goto label;
exitmonitor v;
label:
return;
label:
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running> = 0;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String)>("Stopping");
// Wake any thread waiting on shutdownMutex. No wait() call appears in this
// listing; presumably the waiter lives in ActiveMQEndpointWorker$1 (verify).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.Object shutdownMutex>;
virtualinvoke v.<java.lang.Object: void notifyAll()>();
// Inner try: close the server session pool.
label:
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ra.ServerSessionPoolImpl serverSessionPool>;
virtualinvoke v.<org.apache.activemq.ra.ServerSessionPoolImpl: void close()>();
label:
goto label;
// Inner catch (Throwable): deliberately swallowed after a debug log so that
// shutdown continues.
label:
v := @caughtexception;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void debug(java.lang.String,java.lang.Throwable)>("Unexpected error on server session pool close", v);
label:
exitmonitor v;
label:
goto label;
// Exceptional exit from the synchronized region: release the monitor, then propagate.
label:
v := @caughtexception;
exitmonitor v;
throw v;
// Monitor already released here; disconnect() takes the connectWork monitor itself.
label:
specialinvoke v.<org.apache.activemq.ra.ActiveMQEndpointWorker: void disconnect()>();
return;
catch java.lang.Throwable from label to label with label;
catch java.lang.Throwable from label to label with label;
catch java.lang.Throwable from label to label with label;
}
// Returns the current value of the "running" field.
// The read is a plain field load: no monitor is entered in this method.
private boolean isRunning()
{
org.apache.activemq.ra.ActiveMQEndpointWorker v;
boolean v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running>;
return v;
}
// Hands connectWork to the WorkManager. Runs inside the connectWork monitor.
//
//   - If running is false, returns without scheduling anything.
//   - Otherwise calls workManager.scheduleWork(connectWork, Long.MAX_VALUE, null, null).
//   - If scheduleWork throws WorkException, sets running = false and logs an
//     error; the exception is not rethrown.
private void connect()
{
java.lang.Throwable v, v;
org.slf4j.Logger v;
javax.resource.spi.work.Work v, v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
javax.resource.spi.work.WorkManager v;
boolean v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
// Lock on connectWork (the field loaded on the previous line).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.Work connectWork>;
entermonitor v;
label:
// running == true -> jump ahead to scheduling; otherwise unlock and return.
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running>;
if v != 0 goto label;
exitmonitor v;
label:
return;
label:
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.WorkManager workManager>;
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.Work connectWork>;
// 9223372036854775807L is Long.MAX_VALUE, passed as the start timeout.
// NOTE(review): presumably the inlined WorkManager.INDEFINITE constant - confirm.
// The ExecutionContext and WorkListener arguments are both null.
interfaceinvoke v.<javax.resource.spi.work.WorkManager: void scheduleWork(javax.resource.spi.work.Work,long,javax.resource.spi.work.ExecutionContext,javax.resource.spi.work.WorkListener)>(v, 9223372036854775807L, null, null);
label:
goto label;
// catch (WorkException): mark the worker as not running and log.
// NOTE(review): the "connecting" flag set by start() is not reset on this
// path within this class - verify that a later start() can still proceed.
label:
v := @caughtexception;
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: boolean running> = 0;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG>;
interfaceinvoke v.<org.slf4j.Logger: void error(java.lang.String,java.lang.Throwable)>("Work Manager did not accept work: ", v);
label:
exitmonitor v;
label:
goto label;
// Exceptional exit from the synchronized region: release the monitor, then propagate.
label:
v := @caughtexception;
exitmonitor v;
throw v;
label:
return;
catch javax.resource.spi.work.WorkException from label to label with label;
catch java.lang.Throwable from label to label with label;
catch java.lang.Throwable from label to label with label;
}
// Tears down broker resources inside the connectWork monitor.
// Order: the consumer is closed and nulled first, then the connection.
// Both closes go through safeClose, which tolerates null and swallows JMSException.
private void disconnect()
{
java.lang.Throwable v;
javax.resource.spi.work.Work v;
org.apache.activemq.ActiveMQConnectionConsumer v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
org.apache.activemq.ActiveMQConnection v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
// Lock on connectWork (the field loaded on the previous line).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.Work connectWork>;
entermonitor v;
label:
// safeClose(consumer); consumer = null
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ActiveMQConnectionConsumer consumer>;
staticinvoke <org.apache.activemq.ra.ActiveMQEndpointWorker: void safeClose(javax.jms.ConnectionConsumer)>(v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ActiveMQConnectionConsumer consumer> = null;
// safeClose(connection); connection = null
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ActiveMQConnection connection>;
staticinvoke <org.apache.activemq.ra.ActiveMQEndpointWorker: void safeClose(javax.jms.Connection)>(v);
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ActiveMQConnection connection> = null;
exitmonitor v;
label:
goto label;
// Exceptional exit: release the monitor, then propagate.
label:
v := @caughtexception;
exitmonitor v;
throw v;
label:
return;
catch java.lang.Throwable from label to label with label;
}
// Stores a value in the static THREAD_LOCAL for the calling thread.
// The stored value is presumably the Session parameter (locals are anonymised
// in this listing - confirm).
protected void registerThreadSession(javax.jms.Session)
{
java.lang.ThreadLocal v;
javax.jms.Session v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
v := @parameter: javax.jms.Session;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.ThreadLocal THREAD_LOCAL>;
virtualinvoke v.<java.lang.ThreadLocal: void set(java.lang.Object)>(v);
return;
}
// Clears the calling thread's THREAD_LOCAL slot by setting it to null.
// The Session parameter is bound but never used: the slot is cleared
// regardless of which session is passed in.
protected void unregisterThreadSession(javax.jms.Session)
{
java.lang.ThreadLocal v;
javax.jms.Session v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
v := @parameter: javax.jms.Session;
v = <org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.ThreadLocal THREAD_LOCAL>;
// set(null) rather than remove(): the thread keeps an entry whose value is null.
virtualinvoke v.<java.lang.ThreadLocal: void set(java.lang.Object)>(null);
return;
}
// Assigns the "connection" field from the parameter.
// NOTE(review): this write enters no monitor, while getConnection() and
// disconnect() access the same field under the connectWork monitor - confirm
// that callers already hold that lock.
public void setConnection(org.apache.activemq.ActiveMQConnection)
{
org.apache.activemq.ActiveMQConnection v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
v := @parameter: org.apache.activemq.ActiveMQConnection;
v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ActiveMQConnection connection> = v;
return;
}
// Returns the current "connection" field, read while holding the connectWork
// monitor. The result is null when no connection has been set or after
// disconnect() has cleared the field.
protected org.apache.activemq.ActiveMQConnection getConnection()
{
java.lang.Throwable v;
javax.resource.spi.work.Work v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
org.apache.activemq.ActiveMQConnection v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
// Lock on connectWork (the field loaded on the previous line).
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: javax.resource.spi.work.Work connectWork>;
entermonitor v;
label:
v = v.<org.apache.activemq.ra.ActiveMQEndpointWorker: org.apache.activemq.ActiveMQConnection connection>;
exitmonitor v;
label:
return v;
// Exceptional exit: release the monitor, then propagate.
label:
v := @caughtexception;
exitmonitor v;
throw v;
catch java.lang.Throwable from label to label with label;
}
// Normalises an empty string to null.
// Returns null when the argument is null or has length 0; otherwise returns the
// argument unchanged. Whitespace-only strings have non-zero length and are
// therefore returned as-is.
// No caller of this private method appears in this listing.
private java.lang.String emptyToNull(java.lang.String)
{
int v;
java.lang.String v;
org.apache.activemq.ra.ActiveMQEndpointWorker v;
v := @this: org.apache.activemq.ra.ActiveMQEndpointWorker;
v := @parameter: java.lang.String;
// null -> fall into the "return null" branch.
if v == null goto label;
v = virtualinvoke v.<java.lang.String: int length()>();
// Non-empty -> skip to the branch that returns the original string.
if v != 0 goto label;
label:
return null;
label:
return v;
}
// Static initialiser.
//   1. LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class)
//   2. THREAD_LOCAL = new ThreadLocal()
//   3. ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class)
// Any Exception from step 3 is wrapped in ExceptionInInitializerError (with the
// original as cause) and thrown, so class initialisation fails.
static void <clinit>()
{
java.lang.ThreadLocal v;
org.slf4j.Logger v;
java.lang.Class[] v;
java.lang.Class v;
java.lang.reflect.Method v;
java.lang.Exception v;
java.lang.ExceptionInInitializerError v;
v = staticinvoke <org.slf4j.LoggerFactory: org.slf4j.Logger getLogger(java.lang.Class)>(class "Lorg/apache/activemq/ra/ActiveMQEndpointWorker;");
<org.apache.activemq.ra.ActiveMQEndpointWorker: org.slf4j.Logger LOG> = v;
v = new java.lang.ThreadLocal;
specialinvoke v.<java.lang.ThreadLocal: void <init>()>();
<org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.ThreadLocal THREAD_LOCAL> = v;
// try: reflective lookup of MessageListener.onMessage(Message).
label:
v = class "Ljavax/jms/MessageListener;";
// Single-element parameter-type array: { javax.jms.Message.class }.
v = newarray (java.lang.Class)[1];
v[0] = class "Ljavax/jms/Message;";
v = virtualinvoke v.<java.lang.Class: java.lang.reflect.Method getMethod(java.lang.String,java.lang.Class[])>("onMessage", v);
<org.apache.activemq.ra.ActiveMQEndpointWorker: java.lang.reflect.Method ON_MESSAGE_METHOD> = v;
label:
goto label;
// catch (Exception): wrap and rethrow as ExceptionInInitializerError(cause).
label:
v := @caughtexception;
v = new java.lang.ExceptionInInitializerError;
specialinvoke v.<java.lang.ExceptionInInitializerError: void <init>(java.lang.Throwable)>(v);
throw v;
label:
return;
catch java.lang.Exception from label to label with label;
}
}