class org.apache.hadoop.hive.kafka.SimpleKafkaWriter extends java.lang.Object implements org.apache.hadoop.hive.ql.exec.FileSinkOperator$RecordWriter, org.apache.hadoop.mapred.RecordWriter
{
private static final org.slf4j.Logger LOG;
private static final java.lang.String TIMEOUT_CONFIG_HINT;
private static final java.lang.String ABORT_MSG;
private static final java.lang.String ACTION_ABORT;
private final java.lang.String topic;
private final java.lang.String writerId;
private final org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic writeSemantic;
private final org.apache.kafka.clients.producer.KafkaProducer producer;
private final org.apache.kafka.clients.producer.Callback callback;
private final java.util.concurrent.atomic.AtomicReference sendExceptionRef;
private final java.util.concurrent.atomic.AtomicLong lostRecords;
private long sentRecords;
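/*
 * Hedged Java-level sketch of the fields above, for orientation only. The generic type
 * parameters are assumptions (the dump erases generics); the element types are inferred
 * from the ByteArraySerializer pair in <init> and from handleKafkaException below.
 *
 *   private static final Logger LOG;
 *   private static final String TIMEOUT_CONFIG_HINT;
 *   private static final String ABORT_MSG;
 *   private static final String ACTION_ABORT;
 *   private final String topic;
 *   private final String writerId;
 *   private final KafkaOutputFormat.WriteSemantic writeSemantic;
 *   private final KafkaProducer<byte[], byte[]> producer;           // assumed key/value types
 *   private final Callback callback;
 *   private final AtomicReference<KafkaException> sendExceptionRef; // assumed element type
 *   private final AtomicLong lostRecords;
 *   private long sentRecords;
 */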
void <init>(java.lang.String, java.lang.String, java.util.Properties)
{
java.lang.Object[] v;
java.util.concurrent.atomic.AtomicReference v;
org.apache.kafka.clients.producer.KafkaProducer v;
org.apache.kafka.clients.producer.Callback v;
java.lang.String v, v, v, v;
boolean v;
org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic v, v;
java.util.Properties v;
org.slf4j.Logger v;
java.util.UUID v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
java.util.concurrent.atomic.AtomicLong v;
java.lang.Object v;
org.apache.kafka.common.serialization.ByteArraySerializer v, v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v := @parameter: java.lang.String;
v := @parameter: java.lang.String;
v := @parameter: java.util.Properties;
specialinvoke v.<java.lang.Object: void <init>()>();
v = <org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic: org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic AT_LEAST_ONCE>;
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic writeSemantic> = v;
v = new java.util.concurrent.atomic.AtomicReference;
specialinvoke v.<java.util.concurrent.atomic.AtomicReference: void <init>()>();
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicReference sendExceptionRef> = v;
v = new java.util.concurrent.atomic.AtomicLong;
specialinvoke v.<java.util.concurrent.atomic.AtomicLong: void <init>(long)>(0L);
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicLong lostRecords> = v;
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: long sentRecords> = 0L;
if v != null goto label1;
v = staticinvoke <java.util.UUID: java.util.UUID randomUUID()>();
v = virtualinvoke v.<java.util.UUID: java.lang.String toString()>();
goto label2;
label1:
v = v;
label2:
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId> = v;
v = staticinvoke <com.google.common.base.Preconditions: java.lang.Object checkNotNull(java.lang.Object,java.lang.Object)>(v, "Topic can not be null");
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String topic> = v;
v = virtualinvoke v.<java.util.Properties: java.lang.String getProperty(java.lang.String)>("bootstrap.servers");
if v == null goto label1;
v = 1;
goto label2;
label1:
v = 0;
label2:
staticinvoke <com.google.common.base.Preconditions: void checkState(boolean,java.lang.Object)>(v, "set [bootstrap.servers] property");
v = new org.apache.kafka.clients.producer.KafkaProducer;
v = new org.apache.kafka.common.serialization.ByteArraySerializer;
specialinvoke v.<org.apache.kafka.common.serialization.ByteArraySerializer: void <init>()>();
v = new org.apache.kafka.common.serialization.ByteArraySerializer;
specialinvoke v.<org.apache.kafka.common.serialization.ByteArraySerializer: void <init>()>();
specialinvoke v.<org.apache.kafka.clients.producer.KafkaProducer: void <init>(java.util.Properties,org.apache.kafka.common.serialization.Serializer,org.apache.kafka.common.serialization.Serializer)>(v, v, v);
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.KafkaProducer producer> = v;
v = staticinvoke <org.apache.hadoop.hive.kafka.SimpleKafkaWriter$lambda_new_0__62: org.apache.kafka.clients.producer.Callback bootstrap$(org.apache.hadoop.hive.kafka.SimpleKafkaWriter,java.lang.String)>(v, v);
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.Callback callback> = v;
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = newarray (java.lang.Object)[3];
v[0] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic writeSemantic>;
v[1] = v;
v[2] = v;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String,java.lang.Object[])>("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]", v);
return;
}
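/*
 * Hedged Java reconstruction of <init> above; a sketch under assumptions, not the
 * authoritative source. Parameter and local names are invented (the dump collapses all
 * locals to `v`), generics are assumed, and the body of the producer callback lambda
 * (SimpleKafkaWriter$lambda_new_0__62) is not part of this dump, so it is left abstract.
 *
 *   SimpleKafkaWriter(String topic, String writerId, Properties properties) {
 *     this.writeSemantic = KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE;
 *     this.sendExceptionRef = new AtomicReference<>();
 *     this.lostRecords = new AtomicLong(0L);
 *     this.sentRecords = 0L;
 *     // fall back to a random UUID when no writer id is supplied
 *     this.writerId = writerId == null ? UUID.randomUUID().toString() : writerId;
 *     this.topic = Preconditions.checkNotNull(topic, "Topic can not be null");
 *     Preconditions.checkState(properties.getProperty("bootstrap.servers") != null,
 *         "set [bootstrap.servers] property");
 *     this.producer = new KafkaProducer<>(properties,
 *         new ByteArraySerializer(), new ByteArraySerializer());
 *     this.callback = (metadata, exception) -> { };  // actual lambda body is in lambda_new_0__62, not shown in this dump
 *     LOG.info("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]",
 *         this.writerId, this.writeSemantic, this.topic);
 *   }
 */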
public void write(org.apache.hadoop.io.Writable) throws java.io.IOException
{
org.apache.kafka.clients.producer.ProducerRecord v;
long v, v;
org.apache.hadoop.io.Writable v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
org.apache.kafka.clients.producer.KafkaProducer v;
org.apache.kafka.common.KafkaException v;
org.apache.kafka.clients.producer.Callback v;
java.lang.String v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v := @parameter: org.apache.hadoop.io.Writable;
specialinvoke v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: void checkExceptions()>();
label1:
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: long sentRecords>;
v = v + 1L;
v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: long sentRecords> = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.KafkaProducer producer>;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String topic>;
v = staticinvoke <org.apache.hadoop.hive.kafka.KafkaUtils: org.apache.kafka.clients.producer.ProducerRecord toProducerRecord(java.lang.String,org.apache.hadoop.hive.kafka.KafkaWritable)>(v, v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.Callback callback>;
virtualinvoke v.<org.apache.kafka.clients.producer.KafkaProducer: java.util.concurrent.Future send(org.apache.kafka.clients.producer.ProducerRecord,org.apache.kafka.clients.producer.Callback)>(v, v);
label2:
goto label4;
label3:
v := @caughtexception;
specialinvoke v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: void handleKafkaException(org.apache.kafka.common.KafkaException)>(v);
specialinvoke v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: void checkExceptions()>();
label4:
return;
catch org.apache.kafka.common.KafkaException from label1 to label2 with label3;
}
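/*
 * Hedged Java reconstruction of write(Writable) above; variable names are assumptions,
 * and the cast to KafkaWritable is implied by the toProducerRecord signature. The control
 * flow (checkExceptions first, try/catch around producer.send, checkExceptions again after
 * a failed send) mirrors the bytecode.
 *
 *   public void write(Writable writable) throws IOException {
 *     checkExceptions();                         // fail fast if an async send already failed
 *     try {
 *       sentRecords++;
 *       producer.send(KafkaUtils.toProducerRecord(topic, (KafkaWritable) writable), callback);
 *     } catch (KafkaException e) {
 *       handleKafkaException(e);                 // record the failure
 *       checkExceptions();                       // then surface it as an IOException
 *     }
 *   }
 */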
private void handleKafkaException(org.apache.kafkaesqueesque.common.KafkaException)
{
java.lang.Object[] v, v;
java.util.concurrent.atomic.AtomicReference v, v;
java.lang.Long v;
java.lang.String v, v, v, v, v, v, v, v;
boolean v, v;
org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic v;
org.slf4j.Logger v, v, v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
org.apache.kafka.common.KafkaException v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v := @parameter: org.apache.kafka.common.KafkaException;
v = v instanceof org.apache.kafka.common.errors.TimeoutException;
if v == 0 goto label1;
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = virtualinvoke v.<org.apache.kafka.common.KafkaException: java.lang.String getMessage()>();
interfaceinvoke v.<org.slf4j.Logger: void error(java.lang.String,java.lang.Object)>("Try increasing producer property [`retries`] and [`retry.backoff.ms`] to avoid this error [{}].", v);
label1:
v = staticinvoke <org.apache.hadoop.hive.kafka.KafkaUtils: boolean exceptionIsFatal(java.lang.Throwable)>(v);
if v == 0 goto label2;
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = newarray (java.lang.Object)[4];
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
v[0] = v;
v = virtualinvoke v.<org.apache.kafka.common.KafkaException: java.lang.String getMessage()>();
v[1] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String topic>;
v[2] = v;
v = staticinvoke <java.lang.Long: java.lang.Long valueOf(long)>(-1L);
v[3] = v;
v = staticinvoke <java.lang.String: java.lang.String format(java.lang.String,java.lang.Object[])>("Writer [%s] aborting Send. Caused by [%s]. Sending to topic [%s]. Record offset [%s];", v);
interfaceinvoke v.<org.slf4j.Logger: void error(java.lang.String)>(v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicReference sendExceptionRef>;
virtualinvoke v.<java.util.concurrent.atomic.AtomicReference: boolean compareAndSet(java.lang.Object,java.lang.Object)>(null, v);
goto label3;
label2:
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = newarray (java.lang.Object)[4];
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
v[0] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String topic>;
v[1] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic writeSemantic>;
v[2] = v;
v = virtualinvoke v.<org.apache.kafka.common.KafkaException: java.lang.String getMessage()>();
v[3] = v;
interfaceinvoke v.<org.slf4j.Logger: void error(java.lang.String,java.lang.Object[])>("WriterId [{}] lost record from Topic [{}], delivery Semantic [{}] -> ACTION=ABORT, ERROR caused by [{}]", v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicReference sendExceptionRef>;
virtualinvoke v.<java.util.concurrent.atomic.AtomicReference: boolean compareAndSet(java.lang.Object,java.lang.Object)>(null, v);
label3:
return;
}
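/*
 * Hedged Java reconstruction of handleKafkaException above. String constants are copied
 * from the dump; variable names are assumptions. Both branches stash the exception via
 * compareAndSet(null, e), so only the first failure is retained for checkExceptions().
 *
 *   private void handleKafkaException(KafkaException e) {
 *     if (e instanceof TimeoutException) {
 *       LOG.error("Try increasing producer property [`retries`] and [`retry.backoff.ms`] "
 *           + "to avoid this error [{}].", e.getMessage());
 *     }
 *     if (KafkaUtils.exceptionIsFatal(e)) {
 *       LOG.error(String.format(
 *           "Writer [%s] aborting Send. Caused by [%s]. Sending to topic [%s]. Record offset [%s];",
 *           writerId, e.getMessage(), topic, -1L));
 *       sendExceptionRef.compareAndSet(null, e);
 *     } else {
 *       LOG.error("WriterId [{}] lost record from Topic [{}], delivery Semantic [{}] "
 *           + "-> ACTION=ABORT, ERROR caused by [{}]", writerId, topic, writeSemantic, e.getMessage());
 *       sendExceptionRef.compareAndSet(null, e);
 *     }
 *   }
 */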
public void close(boolean) throws java.io.IOException
{
java.lang.Object[] v;
long v, v;
java.lang.Long v, v;
org.apache.kafka.clients.producer.KafkaProducer v, v, v;
java.time.Duration v;
java.lang.String v, v, v, v, v;
boolean v;
org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic v;
org.slf4j.Logger v, v, v, v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
java.util.concurrent.atomic.AtomicLong v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v := @parameter: boolean;
if v == 0 goto label1;
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String,java.lang.Object)>("Aborting is set to TRUE, Closing writerId [{}] without flush.", v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.KafkaProducer producer>;
v = staticinvoke <java.time.Duration: java.time.Duration ofMillis(long)>(0L);
virtualinvoke v.<org.apache.kafka.clients.producer.KafkaProducer: void close(java.time.Duration)>(v);
return;
label1:
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String,java.lang.Object)>("Flushing Kafka Producer with writerId [{}]", v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.KafkaProducer producer>;
virtualinvoke v.<org.apache.kafka.clients.producer.KafkaProducer: void flush()>();
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String,java.lang.Object)>("Closing WriterId [{}]", v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.KafkaProducer producer>;
virtualinvoke v.<org.apache.kafka.clients.producer.KafkaProducer: void close()>();
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = newarray (java.lang.Object)[5];
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
v[0] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.hadoop.hive.kafka.KafkaOutputFormat$WriteSemantic writeSemantic>;
v[1] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String topic>;
v[2] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: long sentRecords>;
v = staticinvoke <java.lang.Long: java.lang.Long valueOf(long)>(v);
v[3] = v;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicLong lostRecords>;
v = virtualinvoke v.<java.util.concurrent.atomic.AtomicLong: long get()>();
v = staticinvoke <java.lang.Long: java.lang.Long valueOf(long)>(v);
v[4] = v;
interfaceinvoke v.<org.slf4j.Logger: void info(java.lang.String,java.lang.Object[])>("Closed WriterId [{}] Delivery semantic [{}], Topic[{}], Total sent Records [{}], Total Lost Records [{}]", v);
specialinvoke v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: void checkExceptions()>();
return;
}
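/*
 * Hedged Java reconstruction of close(boolean) above; names are assumptions. On abort the
 * producer is closed with a zero timeout and nothing is flushed; otherwise the producer is
 * flushed and closed, totals are logged, and checkExceptions() rethrows any async failure.
 *
 *   public void close(boolean abort) throws IOException {
 *     if (abort) {
 *       LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", writerId);
 *       producer.close(Duration.ofMillis(0));
 *       return;
 *     }
 *     LOG.info("Flushing Kafka Producer with writerId [{}]", writerId);
 *     producer.flush();
 *     LOG.info("Closing WriterId [{}]", writerId);
 *     producer.close();
 *     LOG.info("Closed WriterId [{}] Delivery semantic [{}], Topic[{}], Total sent Records [{}], Total Lost Records [{}]",
 *         writerId, writeSemantic, topic, sentRecords, lostRecords.get());
 *     checkExceptions();
 *   }
 */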
java.lang.String getWriterId()
{
java.lang.String v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
return v;
}
long getLostRecords()
{
java.util.concurrent.atomic.AtomicLong v;
long v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicLong lostRecords>;
v = virtualinvoke v.<java.util.concurrent.atomic.AtomicLong: long get()>();
return v;
}
long getSentRecords()
{
long v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: long sentRecords>;
return v;
}
public void write(org.apache.hadoop.io.BytesWritable, org.apache.hadoop.hive.kafka.KafkaWritable) throws java.io.IOException
{
org.apache.hadoop.io.BytesWritable v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
org.apache.hadoop.hive.kafka.KafkaWritable v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v := @parameter: org.apache.hadoop.io.BytesWritable;
v := @parameter: org.apache.hadoop.hive.kafka.KafkaWritable;
virtualinvoke v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: void write(org.apache.hadoop.io.Writable)>(v);
return;
}
public void close(org.apache.hadoop.mapred.Reporter) throws java.io.IOException
{
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
org.apache.hadoop.mapred.Reporter v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v := @parameter: org.apache.hadoop.mapred.Reporter;
virtualinvoke v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: void close(boolean)>(0);
return;
}
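/*
 * Hedged Java reconstruction of the two interface bridge methods above; parameter names are
 * assumptions. The mapred key is ignored (the KafkaWritable value carries the payload), and
 * close(Reporter) delegates to close(false), i.e. a normal, non-aborting close.
 *
 *   public void write(BytesWritable key, KafkaWritable value) throws IOException {
 *     this.write(value);
 *   }
 *
 *   public void close(Reporter reporter) throws IOException {
 *     this.close(false);
 *   }
 */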
private void checkExceptions() throws java.io.IOException
{
org.slf4j.Logger v;
java.io.IOException v;
java.util.concurrent.atomic.AtomicReference v, v;
org.apache.hadoop.hive.kafka.SimpleKafkaWriter v;
org.apache.kafka.clients.producer.KafkaProducer v;
java.time.Duration v;
java.lang.Object v, v;
java.lang.String v;
v := @this: org.apache.hadoop.hive.kafka.SimpleKafkaWriter;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicReference sendExceptionRef>;
v = virtualinvoke v.<java.util.concurrent.atomic.AtomicReference: java.lang.Object get()>();
if v == null goto label1;
v = <org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG>;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.lang.String writerId>;
interfaceinvoke v.<org.slf4j.Logger: void error(java.lang.String,java.lang.Object)>("Send Exception Aborting write from writerId [{}]", v);
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.apache.kafka.clients.producer.KafkaProducer producer>;
v = staticinvoke <java.time.Duration: java.time.Duration ofMillis(long)>(0L);
virtualinvoke v.<org.apache.kafka.clients.producer.KafkaProducer: void close(java.time.Duration)>(v);
v = new java.io.IOException;
v = v.<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: java.util.concurrent.atomic.AtomicReference sendExceptionRef>;
v = virtualinvoke v.<java.util.concurrent.atomic.AtomicReference: java.lang.Object get()>();
specialinvoke v.<java.io.IOException: void <init>(java.lang.Throwable)>(v);
throw v;
label1:
return;
}
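/*
 * Hedged Java reconstruction of checkExceptions above; names and the element type of
 * sendExceptionRef are assumptions. If any asynchronous send has recorded a failure, the
 * producer is closed immediately (zero timeout) and the failure is rethrown as IOException.
 *
 *   private void checkExceptions() throws IOException {
 *     if (sendExceptionRef.get() != null) {
 *       LOG.error("Send Exception Aborting write from writerId [{}]", writerId);
 *       producer.close(Duration.ofMillis(0));
 *       throw new IOException(sendExceptionRef.get());
 *     }
 *   }
 */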
static void <clinit>()
{
org.slf4j.Logger v;
v = staticinvoke <org.slf4j.LoggerFactory: org.slf4j.Logger getLogger(java.lang.Class)>(class "Lorg/apache/hadoop/hive/kafka/SimpleKafkaWriter;");
<org.apache.hadoop.hive.kafka.SimpleKafkaWriter: org.slf4j.Logger LOG> = v;
return;
}
}