/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.sasl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SaslEncryption;
import org.apache.spark.network.sasl.SaslMessage;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.SparkSaslServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslRpcHandler
extends RpcHandler {
    private static final Logger logger = LoggerFactory.getLogger(SaslRpcHandler.class);
    private final TransportConf conf;
    private final Channel channel;
    private final RpcHandler delegate;
    private final SecretKeyHolder secretKeyHolder;
    private SparkSaslServer saslServer;
    private boolean isComplete;
    private boolean isAuthenticated;

    public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate, SecretKeyHolder secretKeyHolder) {
        this.conf = conf;
        this.channel = channel;
        this.delegate = delegate;
        this.secretKeyHolder = secretKeyHolder;
        this.saslServer = null;
        this.isComplete = false;
        this.isAuthenticated = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        if (this.isComplete) {
            this.delegate.receive(client, message, callback);
            return;
        }
        if (this.saslServer == null || !this.saslServer.isComplete()) {
            byte[] response;
            SaslMessage saslMessage;
            ByteBuf nettyBuf = Unpooled.wrappedBuffer((ByteBuffer)message);
            try {
                saslMessage = SaslMessage.decode(nettyBuf);
            }
            finally {
                nettyBuf.release();
            }
            if (this.saslServer == null) {
                client.setClientId(saslMessage.appId);
                this.saslServer = new SparkSaslServer(saslMessage.appId, this.secretKeyHolder, this.conf.saslServerAlwaysEncrypt());
            }
            try {
                response = this.saslServer.response(JavaUtils.bufferToArray(saslMessage.body().nioByteBuffer()));
            }
            catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
            callback.onSuccess(ByteBuffer.wrap(response));
        }
        if (this.saslServer.isComplete()) {
            if (!"auth-conf".equals(this.saslServer.getNegotiatedProperty("javax.security.sasl.qop"))) {
                logger.debug("SASL authentication successful for channel {}", (Object)client);
                this.complete(true);
                return;
            }
            logger.debug("Enabling encryption for channel {}", (Object)client);
            SaslEncryption.addToChannel(this.channel, this.saslServer, this.conf.maxSaslEncryptedBlockSize());
            this.complete(false);
            return;
        }
    }

    @Override
    public void receive(TransportClient client, ByteBuffer message) {
        this.delegate.receive(client, message);
    }

    @Override
    public StreamCallbackWithID receiveStream(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        return this.delegate.receiveStream(client, message, callback);
    }

    @Override
    public StreamManager getStreamManager() {
        return this.delegate.getStreamManager();
    }

    @Override
    public void channelActive(TransportClient client) {
        this.delegate.channelActive(client);
    }

    @Override
    public void channelInactive(TransportClient client) {
        try {
            this.delegate.channelInactive(client);
        }
        finally {
            if (this.saslServer != null) {
                this.saslServer.dispose();
            }
        }
    }

    @Override
    public void exceptionCaught(Throwable cause, TransportClient client) {
        this.delegate.exceptionCaught(cause, client);
    }

    private void complete(boolean dispose) {
        if (dispose) {
            try {
                this.saslServer.dispose();
            }
            catch (RuntimeException e) {
                logger.error("Error while disposing SASL server", (Throwable)e);
            }
        }
        this.saslServer = null;
        this.isComplete = true;
    }
}

