package com.hcl.products.test.it.kafka;

import com.ghc.a3.a3core.A3Message;
import com.ghc.a3.a3core.CallingContext;
import com.ghc.a3.a3core.DefaultTransport;
import com.ghc.a3.a3core.MessageFormatter;
import com.ghc.a3.a3core.SecurityContext;
import com.ghc.a3.a3core.Transport;
import com.ghc.a3.a3core.TransportContext;
import com.ghc.a3.a3core.TransportListener;
import com.ghc.a3.a3utils.TransportAuthenticationManager;
import com.ghc.config.Config;
import com.ghc.config.ConfigException;
import com.ghc.eventmonitor.DirectionType;
import com.ghc.eventmonitor.EventMonitorException;
import com.ghc.eventmonitor.MonitorEventListener;
import com.ghc.eventmonitor.MonitorableEventSource;
import com.ghc.identity.AuthenticationManager;
import com.ghc.passthrough.PassThroughBehaviour;
import com.ghc.passthrough.PassThroughProperties;
import com.ghc.passthrough.Supervisor;
import com.ghc.ssl.SSLRegistry;
import com.ghc.ssl.SSLUtils;
import com.ghc.ssl.SslSettings;
import com.ghc.ssl.SslSettingsUtils;
import com.ghc.ssl.SslSettingsValidation;
import com.ghc.utils.PairValue;
import com.ghc.utils.StringUtils;
import com.ghc.utils.Wait;
import com.ghc.utils.throwable.GHException;
import com.hcl.products.test.it.kafka.nls.GHMessages;
import com.hcl.products.test.it.kafka.recording.KafkaRecorder;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.commons.lang.ArrayUtils;
import org.apache.kafka.common.PartitionInfo;

/* loaded from: input_file:com/hcl/products/test/it/kafka/KafkaTransport.class */
public class KafkaTransport extends DefaultTransport implements TransportAuthenticationManager, MonitorableEventSource, Supervisor {
    /** Class-wide logger, named after this class. */
    static final Logger log = Logger.getLogger(KafkaTransport.class.getName());
    /** Authentication manager injected via setAuthenticationManager(); passed to the SSL helpers in this class. */
    private AuthenticationManager authManager;
    /** Configuration this transport was last restored from; replaced in restoreState(). */
    private Config transportConfiguration;
    /** Camel connection helper; created lazily in isAvailable() and disposed/nulled in delete(). */
    private KafkaCamelConnectionHelper camelContext = null;
    /** Stub server; created in ensureServerStarted() and disposed/nulled in delete(). */
    private KafkaStubServer server = null;
    /** Recording support used by addMonitor() and removeMonitor(). */
    private KafkaRecorder recorderSupport = new KafkaRecorder();
    /** URI helper rebuilt from the configuration in restoreState(); used by publish and receive. */
    private KafkaCamelURIHelper uriHelper = null;
    /** Transport context created on first call to createTransportContext(); stopped in delete(). */
    private KafkaTransportContext transportContext = null;
    /** Compiler-generated cache for the enum switch table, filled by the synthetic method of the same name. */
    private static volatile /* synthetic */ int[] $SWITCH_TABLE$com$ghc$passthrough$PassThroughBehaviour;

    /**
     * Creates a Kafka transport and initialises it from the supplied configuration.
     *
     * @param config transport configuration, handed straight to {@link #restoreState(Config)}
     * @throws ConfigException if {@code restoreState} fails
     */
    public KafkaTransport(Config config) throws ConfigException {
        this.restoreState(config);
    }

    /**
     * Re-initialises this transport from {@code config}.
     * <p>
     * Resources held for the previous configuration are released through
     * {@link #delete()} before the new configuration and a fresh URI helper are stored.
     *
     * @param config the configuration to adopt
     * @throws ConfigException declared by the contract; may come from building the URI helper
     */
    public void restoreState(Config config) throws ConfigException {
        // Release anything still running under the old configuration first.
        delete();
        transportConfiguration = config;
        uriHelper = new KafkaCamelURIHelper(config);
    }

    /**
     * Persists this transport's state into {@code config}.
     * <p>
     * The superclass writes its own state first; the transport configuration is then
     * copied on top of the result.
     *
     * @param config the configuration to write into
     * @return the configuration returned by the superclass, with the transport settings copied in
     * @throws IllegalStateException if the transport has no configuration
     */
    public Config saveState(Config config) {
        final boolean configured = this.transportConfiguration != null;
        if (!configured) {
            throw new IllegalStateException("Kafka transport created without valid configuration.");
        }
        final Config target = super.saveState(config);
        this.transportConfiguration.copyTo(target);
        return target;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v15, types: [com.ghc.ssl.SSLRegistry] */
    /* JADX WARN: Type inference failed for: r0v16 */
    /* JADX WARN: Type inference failed for: r0v3, types: [com.hcl.products.test.it.kafka.KafkaCamelConnectionHelper] */
    public boolean isAvailable() {
        ?? r0 = this;
        synchronized (r0) {
            r0 = this.camelContext;
            if (r0 == 0) {
                try {
                    this.camelContext = new KafkaCamelConnectionHelper();
                    SSLRegistry.INSTANCE.registerConnection(getHost(), getPort(), SslSettings.fromConfig(this.transportConfiguration), getID());
                    if (!StringUtils.isEmptyOrNull(getProxyHost())) {
                        r0 = SSLRegistry.INSTANCE;
                        r0.registerConnection(getProxyHost(), getProxyPort(), SslSettings.fromConfig(this.transportConfiguration), getID());
                    }
                } catch (Exception e) {
                    this.camelContext = null;
                    String str = String.valueOf(GHMessages.KafkaTransport_camelNotStarted) + e.getMessage();
                    setAvailabilityError(str);
                    log.log(Level.SEVERE, str, (Throwable) e);
                }
            }
            r0 = r0;
            return this.camelContext != null;
        }
    }

    /**
     * Releases every resource held by this transport and then delegates to the superclass.
     * <p>
     * Each resource is shut down in its own try block so that a failure in one step does
     * not prevent the remaining resources from being released. Failures are logged at
     * SEVERE and otherwise ignored; the stub server and Camel helper references are always
     * cleared.
     */
    public void delete() {
        try {
            if (this.server != null) {
                this.server.dispose();
            }
        } catch (Exception e) {
            log.log(Level.SEVERE, GHMessages.KafkaTransport_FailedToStopServer, (Throwable) e);
        } finally {
            this.server = null;
        }
        try {
            if (this.transportContext != null) {
                this.transportContext.stop();
            }
        } catch (Exception e2) {
            // Logged only: the Camel helper below must still be disposed.
            log.log(Level.SEVERE, GHMessages.KafkaTransport_FailedToStopCamelContext, (Throwable) e2);
        }
        try {
            if (this.camelContext != null) {
                this.camelContext.dispose();
            }
        } catch (Exception e3) {
            log.log(Level.SEVERE, GHMessages.KafkaTransport_FailedToStopCamelContext, (Throwable) e3);
        } finally {
            this.camelContext = null;
        }
        super.delete();
    }

    /**
     * Returns the description that {@code KafkaTransportTemplate} derives from the
     * current transport configuration.
     *
     * @return the description string
     */
    public String getDescription() {
        final String description = KafkaTransportTemplate.getDescription(transportConfiguration);
        return description;
    }

    /**
     * Tests the transport connection and its SSL settings, appending findings to {@code sb}.
     * <p>
     * The connection test runs only when the transport is available; otherwise the
     * availability error is appended. SSL settings are always validated, and when they are
     * invalid SSL diagnostics are run against the proxy host/port if either is configured,
     * or against the broker host/port otherwise. The transport is always torn down via
     * {@link #delete()} before returning.
     *
     * @param sb receives human-readable results and error text
     * @return {@code true} only if both the connection test and the SSL validation succeeded
     */
    public boolean testTransport(StringBuilder sb) {
        try {
            boolean connectionOk = false;
            if (!isAvailable()) {
                sb.append(getAvailabilityError());
            } else {
                connectionOk = this.camelContext.testTransport(this.transportConfiguration, sb);
            }

            final SslSettingsValidation sslValidation = SslSettingsValidation.validate(getAuthenticationManager(), SslSettings.fromConfig(this.transportConfiguration));
            sslValidation.reportServerAndClientMessages(sb);
            final boolean sslOk = sslValidation.clientSettingsValid() && sslValidation.serverSettingsValid();

            final String proxyHost = this.transportConfiguration.getString(RITKafkaConstants.CONFIG_PROXY_HOST, (String) null);
            final String proxyPort = this.transportConfiguration.getString(RITKafkaConstants.CONFIG_PROXY_PORT, (String) null);
            if (!sslOk) {
                final boolean noProxyConfigured = StringUtils.isEmptyOrNull(proxyHost) && StringUtils.isEmptyOrNull(proxyPort);
                if (noProxyConfigured) {
                    // No proxy at all: diagnose the direct broker endpoint.
                    SSLUtils.carryOutDiagnostics(this.transportConfiguration.getString(RITKafkaConstants.CONFIG_HOST), this.transportConfiguration.getString(RITKafkaConstants.CONFIG_PORT), SslSettings.fromConfig(this.transportConfiguration), sb);
                } else {
                    SSLUtils.carryOutDiagnostics(proxyHost, proxyPort, SslSettings.fromConfig(this.transportConfiguration), sb);
                }
            }
            return connectionOk && sslOk;
        } catch (Throwable failure) {
            sb.append(GHMessages.KafkaTransport_kafkaTransportConnectionFailed).append(failure.getMessage()).append(System.lineSeparator());
            final Throwable rootHint = failure.getCause();
            if (rootHint != null) {
                sb.append(GHMessages.KafkaTransport_causedBby).append(failure.getCause().getMessage()).append('\n');
            }
            log.log(Level.SEVERE, GHMessages.KafkaTransport_kafkaTransportConnectionFailed, failure);
            return false;
        } finally {
            // A test run must not leave connections open.
            delete();
        }
    }

    /**
     * Publishes {@code a3Message} by delegating to the private three-argument overload
     * with its boolean flag set to {@code false}.
     * The formatter and the second message argument are not used by this implementation.
     *
     * @param callingContext   caller context, forwarded unchanged
     * @param a3Message        the message to publish
     * @param messageFormatter unused
     * @param a3Message2       unused
     * @return the result of the delegated publish
     * @throws GHException if the delegated publish fails
     */
    public boolean publish(CallingContext callingContext, A3Message a3Message, MessageFormatter messageFormatter, A3Message a3Message2) throws GHException {
        final boolean published = publish(callingContext, a3Message, false);
        return published;
    }

    /**
     * Sends the "value" part of {@code a3Message} to the producer endpoint computed by the URI helper.
     * <p>
     * Topic details are looked up first; an empty or missing result is reported as a
     * connection failure. Any failure while building or sending the payload, including an
     * empty payload, is logged at SEVERE and rethrown as a {@link GHException} carrying
     * the original throwable as its cause.
     *
     * @param callingContext caller context (not used here)
     * @param a3Message      the message whose body supplies the payload
     * @param z              flag forwarded unchanged to {@code uriHelper.forProducer}
     * @return {@code true} when the payload was handed to the producer template
     * @throws GHException if no topic details are available or sending fails
     */
    private boolean publish(CallingContext callingContext, A3Message a3Message, boolean z) throws GHException {
        final Map<String, List<PartitionInfo>> topics = this.uriHelper.topicDetails(this.transportConfiguration);
        final boolean noTopics = topics == null || topics.isEmpty();
        if (noTopics) {
            throw new GHException(GHMessages.KafkaTransport_kafkaTransportConnectionFailed);
        }
        try {
            final PairValue<String, Map<String, Object>> target = this.uriHelper.forProducer(a3Message, z);
            final byte[] payload = KafkaFormatter.getBytesFromMessageBody(a3Message.getBody(), "value");
            // ArrayUtils.isEmpty already treats null as empty.
            if (ArrayUtils.isEmpty(payload)) {
                throw new GHException(GHMessages.KafkaTransport_ValueIsEmpty);
            }
            this.transportContext.getProducerTemplate(this.camelContext).sendBodyAndHeaders((String) target.getFirst(), payload, (Map) target.getSecond());
            log.log(Level.FINE, GHMessages.KafkaTransport_bodySent);
            return true;
        } catch (Throwable failure) {
            log.log(Level.SEVERE, GHMessages.KafkaTransport_camelKafkaActionFailed, failure);
            throw new GHException(String.valueOf(GHMessages.KafkaTransport_camelKafkaActionFailed) + failure.getMessage(), failure);
        }
    }

    /**
     * Receives one message from the topic named in {@code config}.
     * <p>
     * The consumer group is taken from the group-id setting and falls back to the client-id
     * setting (or the default client name) when the group id is null or blank. The call
     * waits for {@code wait.duration()}, or for {@code Long.MAX_VALUE} when {@code wait}
     * is null or "forever".
     * <p>
     * The exchange's unit of work is completed exactly once, in the {@code finally} block,
     * whatever the outcome. The decompiled form repeated that cleanup inside every branch,
     * which could invoke it twice and mislabel a cleanup failure as a receive failure.
     *
     * @param callingContext   caller context (not used here)
     * @param config           per-call settings: topic, group id / client id, offset reset
     * @param messageFormatter unused
     * @param wait             how long to wait; {@code null} means wait indefinitely
     * @return the received message, or {@code null} if nothing arrived or the thread was interrupted
     * @throws GHException if consuming fails for any reason other than interruption
     */
    public A3Message receive(CallingContext callingContext, Config config, MessageFormatter messageFormatter, Wait wait) throws GHException {
        final String topic = config.getString(RITKafkaConstants.CONFIG_TOPIC, (String) null);
        String groupId = config.getString(RITKafkaConstants.GROUPID);
        if (groupId == null || groupId.trim().length() == 0) {
            groupId = config.getString(RITKafkaConstants.CONFIG_CLIENTID, RITKafkaConstants.DEFAULT_CLIENT_NAME);
        }
        final String autoOffsetReset = config.getString(RITKafkaConstants.AUTO_OFFSET_RESET, (String) null);

        ConsumerTemplate consumerTemplate = null;
        Exchange exchange = null;
        try {
            consumerTemplate = this.transportContext.getConsumerTemplate(this.camelContext);
            exchange = consumerTemplate.receive(this.uriHelper.forConsumer(topic, groupId, autoOffsetReset, false), (wait == null || wait.isForever()) ? Long.MAX_VALUE : wait.duration());
            if (exchange == null) {
                // Nothing arrived within the wait period.
                return null;
            }
            return KafkaA3MessageBuilder.fromExchange(exchange);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller and report "no message".
            Thread.currentThread().interrupt();
            return null;
        } catch (Exception e) {
            log.log(Level.SEVERE, GHMessages.KafkaTransport_camelKafkaActionFailed, (Throwable) e);
            throw new GHException(String.valueOf(GHMessages.KafkaTransport_camelKafkaActionFailed) + e.getMessage(), e);
        } finally {
            if (consumerTemplate != null && exchange != null) {
                consumerTemplate.doneUoW(exchange);
            }
        }
    }

    /**
     * Returns the supervisor for this transport; the transport acts as its own supervisor.
     *
     * @return this instance
     */
    public Supervisor getSupervisor() {
        return this;
    }

    /**
     * Returns this transport's context, creating it on the first call.
     * <p>
     * {@code mode} is only used when the context is first created; later calls return the
     * existing context regardless of the mode passed.
     *
     * @param mode the mode for a newly created context
     * @return the transport context held by this transport
     * @throws GHException declared by the contract; not thrown by the visible code
     */
    public TransportContext createTransportContext(TransportContext.Mode mode) throws GHException {
        final boolean missing = this.transportContext == null;
        if (missing) {
            this.transportContext = new KafkaTransportContext(mode);
        }
        return this.transportContext;
    }

    /**
     * Tells whether this transport behaves with queue semantics for the given context.
     * <p>
     * Returns {@code true} for any context that is not a {@code KafkaTransportContext} and
     * for a Kafka context in MESSAGING mode. Otherwise the answer depends on the configured
     * stub mode: {@code true} only for DIRECT, which is also the default when unset.
     *
     * @param transportContext the context being asked about
     * @param config           unused
     * @return whether queue semantics apply
     */
    public boolean hasQueueSemantics(TransportContext transportContext, Config config) {
        if (!(transportContext instanceof KafkaTransportContext)) {
            return true;
        }
        final KafkaTransportContext kafkaContext = (KafkaTransportContext) transportContext;
        if (kafkaContext.getMode() == TransportContext.Mode.MESSAGING) {
            return true;
        }
        return StubbingType.DIRECT.equals(this.transportConfiguration.getEnum(StubbingType.class, RITKafkaConstants.KAFKA_CONFIG_STUB_MODE, StubbingType.DIRECT));
    }

    /**
     * Returns the authentication manager previously set on this transport.
     *
     * @return the authentication manager, or {@code null} if none has been set
     */
    public AuthenticationManager getAuthenticationManager() {
        return authManager;
    }

    /**
     * Stores the authentication manager used by this transport's SSL handling.
     *
     * @param authenticationManager the manager to use from now on
     */
    public void setAuthenticationManager(AuthenticationManager authenticationManager) {
        authManager = authenticationManager;
    }

    /**
     * Returns the monitorable source type identifier for Kafka transports.
     *
     * @return {@code RITKafkaConstants.MONITORABLE_SOURCE_TYPE}
     */
    public String getMonitorSourceType() {
        final String sourceType = RITKafkaConstants.MONITORABLE_SOURCE_TYPE;
        return sourceType;
    }

    /**
     * Starts recording for the monitor {@code str} when the transport is in DIRECT stub mode.
     * <p>
     * An unset stub mode is treated as DIRECT, matching the default used by
     * {@code hasQueueSemantics} and {@code supportsPassThrough}. Previously an unset mode
     * caused a {@code NullPointerException} that surfaced as a generic "could not start
     * monitor" error.
     *
     * @param str                  monitor identifier
     * @param config               monitor settings; at least one of the topic, subscribe-topic
     *                             or publish-topic entries must be non-empty
     * @param monitorEventListener listener handed to the recorder
     * @param directionType        unused
     * @param securityContext      unused
     * @throws EventMonitorException if the transport is unavailable, no topic is configured,
     *                               or the recorder fails to start
     */
    public void addMonitor(String str, Config config, MonitorEventListener monitorEventListener, DirectionType directionType, SecurityContext securityContext) throws EventMonitorException {
        if (!isAvailable()) {
            throw new EventMonitorException(str, MessageFormat.format(GHMessages.KafkaTransport_transportNotAvailable, getAvailabilityError()), (Throwable) null);
        }
        if (StringUtils.isEmptyOrNull(config.getString(RITKafkaConstants.CONFIG_TOPIC)) && StringUtils.isEmptyOrNull(config.getString(RITKafkaConstants.SUBSCRIBE_TOPIC)) && StringUtils.isEmptyOrNull(config.getString(RITKafkaConstants.PUBLISH_TOPIC))) {
            throw new EventMonitorException(GHMessages.KafkaTransport_noTopicSpecified);
        }
        try {
            final String stubMode = getStubMode();
            // null means "not configured", which the rest of this class treats as DIRECT.
            if (stubMode == null || "DIRECT".equals(stubMode)) {
                this.recorderSupport.startRecording(str, config, this.transportConfiguration, monitorEventListener, this.camelContext);
            }
        } catch (Exception e) {
            throw new EventMonitorException(str, GHMessages.KafkaTransport_couldNotStartMonitorKafka, e);
        }
    }

    /**
     * Stops the recording associated with the monitor {@code str}.
     *
     * @param str monitor identifier
     * @return the value reported by the recorder's {@code stopRecording}
     * @throws EventMonitorException if the recorder fails; the original exception is kept
     *                               as the cause so its stack trace is not lost
     */
    public boolean removeMonitor(String str) throws EventMonitorException {
        try {
            return this.recorderSupport.stopRecording(str, this.camelContext);
        } catch (Exception e) {
            // Same (id, message, cause) constructor that addMonitor uses.
            throw new EventMonitorException(str, e.getMessage(), e);
        }
    }

    /**
     * Returns the configured broker host.
     *
     * @return the host setting, or {@code null} when it is not configured
     */
    public String getHost() {
        final String host = transportConfiguration.getString(RITKafkaConstants.CONFIG_HOST, (String) null);
        return host;
    }

    /**
     * Returns the configured broker port as an integer.
     *
     * @return the parsed port, or {@code 0} when the setting is empty or missing
     */
    public int getPort() {
        final String configuredPort = transportConfiguration.getString(RITKafkaConstants.CONFIG_PORT);
        return getValidPort(configuredPort);
    }

    /**
     * Returns the configured proxy host.
     *
     * @return the proxy host setting, or {@code null} when it is not configured
     */
    public String getProxyHost() {
        final String proxyHost = transportConfiguration.getString(RITKafkaConstants.CONFIG_PROXY_HOST, (String) null);
        return proxyHost;
    }

    /**
     * Returns the configured proxy port as an integer.
     *
     * @return the parsed proxy port, or {@code 0} when the setting is empty or missing
     */
    public int getProxyPort() {
        final String configuredPort = transportConfiguration.getString(RITKafkaConstants.CONFIG_PROXY_PORT);
        return getValidPort(configuredPort);
    }

    /**
     * Returns the topic configured on the transport.
     *
     * @return the topic setting, or {@code null} when it is not configured
     */
    public String getTopic() {
        final String topic = transportConfiguration.getString(RITKafkaConstants.CONFIG_TOPIC, (String) null);
        return topic;
    }

    /**
     * Returns the configured stub port as an integer.
     *
     * @return the parsed stub port, or {@code 0} when the setting is empty or missing
     */
    public int getStubPort() {
        final String configuredPort = transportConfiguration.getString(RITKafkaConstants.KAFKA_CONFIG_STUB_PORT);
        return getValidPort(configuredPort);
    }

    /**
     * Returns the raw stub-mode setting.
     *
     * @return the configured stub mode string, or {@code null} when it is not configured
     */
    public String getStubMode() {
        final String stubMode = transportConfiguration.getString(RITKafkaConstants.KAFKA_CONFIG_STUB_MODE, (String) null);
        return stubMode;
    }

    /**
     * Converts a port setting to an {@code int}.
     *
     * @param str the textual port value; may be {@code null} or empty
     * @return {@code 0} for a null/empty value, otherwise the parsed decimal integer
     * @throws NumberFormatException if the value is non-empty but not a valid integer
     */
    private int getValidPort(String str) {
        // parseInt yields the same value (and the same exception) as valueOf(..).intValue().
        return StringUtils.isEmptyOrNull(str) ? 0 : Integer.parseInt(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the port of the stub server, starting the server first if necessary.
     *
     * @return the stub server's port, or {@code -1} if the server could not be started
     */
    public int getStubServerPort() {
        try {
            ensureServerStarted();
            return this.server.getPort();
        } catch (GHException e) {
            // Pass the exception to the logger so the stack trace is kept.
            log.log(Level.WARNING, "Error starting stub server: " + e.getMessage(), (Throwable) e);
            return -1;
        }
    }

    /**
     * Starts the stub server if it is not already running.
     * <p>
     * Runs while holding this object's monitor. The server is built and opened in a local
     * variable and stored in the field only after {@code open} succeeds, so a failed start
     * never leaves a half-initialised server behind. With SSL enabled, the server socket
     * factory comes from a server-side {@link SSLContext} built from the transport's SSL
     * settings; otherwise the server is opened with a {@code null} factory.
     *
     * @throws GHException if the server cannot be created or opened; the underlying
     *                     exception is attached as the cause
     */
    private void ensureServerStarted() throws GHException {
        synchronized (this) {
            if (this.server != null) {
                return;
            }
            KafkaStubServer candidate = null;
            try {
                SslSettings fromConfig = SslSettings.fromConfig(this.transportConfiguration);
                int transportPortOverride = getTransportPortOverride(this.transportConfiguration.getInt(RITKafkaConstants.KAFKA_CONFIG_STUB_PORT, 0));
                if (fromConfig.isUseSsl()) {
                    SSLContext createServerContext = SslSettingsUtils.createServerContext(this.authManager, fromConfig);
                    candidate = new KafkaStubServer(transportPortOverride, null, getID(), fromConfig);
                    candidate.open(createServerContext.getServerSocketFactory());
                } else {
                    candidate = new KafkaStubServer(transportPortOverride, null, getID());
                    candidate.open(null);
                }
                // Publish only a server that opened successfully.
                this.server = candidate;
            } catch (Exception e) {
                if (candidate != null) {
                    try {
                        // Best-effort cleanup of the server that failed to open.
                        candidate.dispose();
                    } catch (Exception disposeFailure) {
                        e.addSuppressed(disposeFailure);
                    }
                }
                throw new GHException("Unable to start stub server on configured port: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Registers a listener for request messages.
     * <p>
     * With pass-through enabled, the listener is attached to the stub server for the topic
     * named in {@code config}, starting the server if needed. Otherwise registration is
     * delegated to the superclass.
     *
     * @param callingContext    caller context, forwarded to the superclass when delegating
     * @param transportListener the listener to register
     * @param config            listener settings; supplies the topic
     * @param messageFormatter  forwarded to the superclass when delegating
     * @throws GHException if no topic is configured or the stub server cannot be started
     */
    public void addRequestMessageListener(CallingContext callingContext, TransportListener transportListener, Config config, MessageFormatter messageFormatter) throws GHException {
        final boolean passThroughEnabled = supportsPassThrough().equals(Transport.PassThroughConfiguration.PASS_THROUGH_ENABLED);
        if (passThroughEnabled) {
            final String topic = config.getString(RITKafkaConstants.CONFIG_TOPIC);
            if (StringUtils.isBlankOrNull(topic)) {
                throw new GHException(GHMessages.KafkaTransport_noTopicSpecified);
            }
            ensureServerStarted();
            this.server.addListener(transportListener, topic);
        } else {
            super.addRequestMessageListener(callingContext, transportListener, config, messageFormatter);
        }
    }

    /**
     * Unregisters a request-message listener.
     * <p>
     * With pass-through enabled, the listener is removed from the stub server. If the
     * server was never started, or has already been disposed by {@code delete()}, there is
     * nothing to remove and the call is a no-op instead of a {@code NullPointerException}.
     * Otherwise removal is delegated to the superclass.
     *
     * @param callingContext    caller context, forwarded to the superclass when delegating
     * @param transportListener the listener to remove
     * @throws GHException if the pass-through configuration cannot be determined or removal fails
     */
    public void removeRequestMessageListener(CallingContext callingContext, TransportListener transportListener) throws GHException {
        if (supportsPassThrough().equals(Transport.PassThroughConfiguration.PASS_THROUGH_ENABLED)) {
            final KafkaStubServer stubServer = this.server;
            if (stubServer != null) {
                stubServer.removeListener(transportListener);
            }
        } else {
            // NOTE(review): add delegates to super.addRequestMessageListener but remove goes to
            // super.removeMessageListener - confirm this asymmetry is intended.
            super.removeMessageListener(callingContext, transportListener);
        }
    }

    /**
     * Reports whether pass-through is supported under the current configuration.
     * <p>
     * Pass-through is enabled only when the stub mode is PROXY; an unset stub mode
     * defaults to DIRECT, which disables it.
     *
     * @return {@code PASS_THROUGH_ENABLED} in PROXY mode, otherwise {@code PASS_THROUGH_DISABLED}
     * @throws GHException declared by the contract; not thrown by the visible code
     */
    public Transport.PassThroughConfiguration supportsPassThrough() throws GHException {
        final boolean proxyMode = StubbingType.PROXY.equals(this.transportConfiguration.getEnum(StubbingType.class, RITKafkaConstants.KAFKA_CONFIG_STUB_MODE, StubbingType.DIRECT));
        if (proxyMode) {
            return Transport.PassThroughConfiguration.PASS_THROUGH_ENABLED;
        }
        return Transport.PassThroughConfiguration.PASS_THROUGH_DISABLED;
    }

    /**
     * Applies the requested pass-through behaviour to {@code a3Message}.
     * <ul>
     *   <li>{@code DISCARD}: the message is dropped and nothing is sent.</li>
     *   <li>{@code DELAY}: the message is published via the private publish overload with
     *       its flag set to {@code true}.</li>
     *   <li>any other behaviour: rejected as "Not supported".</li>
     * </ul>
     * The switch is written directly on the enum rather than on a compiler-generated
     * ordinal table, and the wrapping exception keeps the original failure as its cause.
     *
     * @param callingContext        caller context, forwarded to publish
     * @param a3Message             the message to pass through
     * @param messageFormatter      unused
     * @param passThroughProperties supplies the behaviour to apply
     * @throws GHException if the behaviour is unsupported or publishing fails
     */
    public void passThrough(CallingContext callingContext, A3Message a3Message, MessageFormatter messageFormatter, PassThroughProperties passThroughProperties) throws GHException {
        try {
            switch (passThroughProperties.getBehaviour()) {
                case DISCARD:
                    return;
                case DELAY:
                    publish(callingContext, a3Message, true);
                    return;
                default:
                    throw new GHException("Not supported");
            }
        } catch (Exception e) {
            throw new GHException(MessageFormat.format("Unable to pass message through! : {0}", e.getLocalizedMessage()), e);
        }
    }

    /**
     * Compiler-generated helper that maps {@code PassThroughBehaviour} ordinals to switch
     * case numbers (DISCARD = 1, DELAY = 2, SIMULATE_ERROR = 3).
     * The table is built on first use and cached in the static field of the same name.
     *
     * @return the ordinal-to-case lookup table
     */
    static /* synthetic */ int[] $SWITCH_TABLE$com$ghc$passthrough$PassThroughBehaviour() {
        final int[] cached = $SWITCH_TABLE$com$ghc$passthrough$PassThroughBehaviour;
        if (cached != null) {
            return cached;
        }
        final int[] table = new int[PassThroughBehaviour.values().length];
        // Each lookup tolerates a constant missing at run time (NoSuchFieldError).
        try {
            table[PassThroughBehaviour.DELAY.ordinal()] = 2;
        } catch (NoSuchFieldError ignored) {
        }
        try {
            table[PassThroughBehaviour.DISCARD.ordinal()] = 1;
        } catch (NoSuchFieldError ignored) {
        }
        try {
            table[PassThroughBehaviour.SIMULATE_ERROR.ordinal()] = 3;
        } catch (NoSuchFieldError ignored) {
        }
        $SWITCH_TABLE$com$ghc$passthrough$PassThroughBehaviour = table;
        return table;
    }
}
