/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.beakerx.widget;

import com.twosigma.beakerx.TryResult;
import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.kernel.KernelFunctionality;
import com.twosigma.beakerx.kernel.KernelManager;
import com.twosigma.beakerx.kernel.PlainCode;
import com.twosigma.beakerx.kernel.msg.JupyterMessages;
import com.twosigma.beakerx.kernel.msg.StacktraceHtmlPrinter;
import com.twosigma.beakerx.message.Header;
import com.twosigma.beakerx.message.Message;
import com.twosigma.beakerx.widget.SparkConfiguration;
import com.twosigma.beakerx.widget.SparkEngine;
import com.twosigma.beakerx.widget.SparkUI;
import com.twosigma.beakerx.widget.SparkUIApi;
import com.twosigma.beakerx.widget.SparkVariable;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.collection.mutable.HashMap;

public class SparkEngineImpl
implements SparkEngine {
    private SparkSession.Builder sparkSessionBuilder;

    SparkEngineImpl(SparkSession.Builder sparkSessionBuilder) {
        this.sparkSessionBuilder = sparkSessionBuilder;
        this.configureSparkSessionBuilder(this.sparkSessionBuilder);
    }

    @Override
    public TryResult configure(KernelFunctionality kernel, SparkUIApi sparkUI, Message parentMessage) {
        SparkConf sparkConf = this.createSparkConf(sparkUI.getAdvancedOptions(), SparkEngineImpl.getSparkConfBasedOn(this.sparkSessionBuilder));
        sparkConf = this.configureSparkConf(sparkConf, sparkUI);
        this.sparkSessionBuilder = SparkSession.builder().config(sparkConf);
        TryResult sparkSessionTry = this.createSparkSession(sparkUI, parentMessage);
        if (sparkSessionTry.isError()) {
            return sparkSessionTry;
        }
        this.addListener(this.getOrCreate().sparkContext(), sparkUI);
        SparkVariable.putSparkSession(this.getOrCreate());
        TryResult tryResultSparkContext = this.initSparkContextInShell(kernel, parentMessage);
        if (!tryResultSparkContext.isError()) {
            kernel.registerCancelHook(SparkVariable::cancelAllJobs);
        }
        return tryResultSparkContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TryResult createSparkSession(SparkUIApi sparkUI, Message parentMessage) {
        sparkUI.startSpinner(parentMessage);
        try {
            SparkSession sparkSession = this.getOrCreate();
            TryResult.CellResult cellResult = TryResult.createResult((Object)sparkSession);
            return cellResult;
        }
        catch (Exception e) {
            TryResult.CellError cellError = TryResult.createError((String)this.formatError(e));
            return cellError;
        }
        finally {
            sparkUI.stopSpinner();
        }
    }

    private String formatError(Exception e) {
        CharSequence[] print = StacktraceHtmlPrinter.print((String[])((String[])Arrays.stream(e.getStackTrace()).map(StackTraceElement::toString).toArray(String[]::new)));
        return String.join((CharSequence)System.lineSeparator(), print);
    }

    @Override
    public SparkSession getOrCreate() {
        return this.sparkSessionBuilder.getOrCreate();
    }

    @Override
    public String getSparkAppId() {
        RuntimeConfig conf = this.getOrCreate().conf();
        return (String)conf.getAll().get((Object)"spark.app.id").get();
    }

    @Override
    public String getSparkUiWebUrl() {
        return (String)this.getOrCreate().sparkContext().uiWebUrl().get();
    }

    @Override
    public String getSparkMasterUrl() {
        RuntimeConfig conf = this.getOrCreate().conf();
        return (String)conf.getAll().get((Object)"spark.master").get();
    }

    @Override
    public String sparkVersion() {
        try {
            InputStream sparkProps = Thread.currentThread().getContextClassLoader().getResourceAsStream("spark-version-info.properties");
            Properties props = new Properties();
            props.load(sparkProps);
            return props.getProperty("version");
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private TryResult initSparkContextInShell(KernelFunctionality kernel, Message parent) {
        String addSc = String.format("import com.twosigma.beakerx.widget.SparkVariable\nval %s = SparkVariable.getSparkSession()\nval %s = %s.sparkContext\nimport org.apache.spark.SparkContext._\nimport %s.implicits._\nimport %s.sql\nimport org.apache.spark.sql.functions._\n", "spark", "sc", "spark", "spark", "spark");
        SimpleEvaluationObject seo = PlainCode.createSimpleEvaluationObject((String)addSc, (KernelFunctionality)kernel, (Message)new Message(new Header(JupyterMessages.COMM_MSG, parent.getHeader().getSession())), (int)1);
        return kernel.executeCode(addSc, seo);
    }

    private SparkConf createSparkConf(List<SparkConfiguration.Configuration> configurations, SparkConf old) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.extraListeners", old.get("spark.extraListeners"));
        sparkConf.set("beakerx.id", old.get("beakerx.id"));
        if (old.contains("spark.app.name")) {
            sparkConf.set("spark.app.name", old.get("spark.app.name"));
        }
        configurations.forEach(x -> {
            if (x.getName() != null) {
                sparkConf.set(x.getName(), x.getValue() != null ? x.getValue() : "");
            }
        });
        return sparkConf;
    }

    @Override
    public SparkConf getSparkConf() {
        return SparkEngineImpl.getSparkConfBasedOn(this.sparkSessionBuilder);
    }

    public static SparkConf getSparkConfBasedOn(SparkSession.Builder sparkSessionBuilder) {
        try {
            SparkConf sparkConf = new SparkConf();
            Field options = sparkSessionBuilder.getClass().getDeclaredField("org$apache$spark$sql$SparkSession$Builder$$options");
            options.setAccessible(true);
            for (Tuple2 x : (HashMap)options.get(sparkSessionBuilder)) {
                sparkConf.set((String)x._1, (String)x._2);
            }
            return sparkConf;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private SparkSession.Builder configureSparkSessionBuilder(SparkSession.Builder builder) {
        builder.config("spark.extraListeners", "com.twosigma.beakerx.widget.StartStopSparkListener");
        builder.config("beakerx.id", UUID.randomUUID().toString());
        return builder;
    }

    private SparkConf configureSparkConf(SparkConf sparkConf, SparkUIApi sparkUI) {
        if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName("beaker_" + UUID.randomUUID().toString());
        }
        if (sparkUI.getMasterURL().getValue() != null && !((String)((Object)sparkUI.getMasterURL().getValue())).isEmpty()) {
            sparkConf.set("spark.master", (String)((Object)sparkUI.getMasterURL().getValue()));
        }
        if (!SparkEngineImpl.isLocalSpark(sparkConf)) {
            sparkConf.set("spark.repl.class.outputDir", KernelManager.get().getOutDir());
        }
        if (sparkUI.getExecutorMemory().getValue() != null && !((String)((Object)sparkUI.getExecutorMemory().getValue())).isEmpty()) {
            sparkConf.set("spark.executor.memory", (String)((Object)sparkUI.getExecutorMemory().getValue()));
        }
        if (sparkUI.getExecutorCores().getValue() != null && !((String)((Object)sparkUI.getExecutorCores().getValue())).isEmpty()) {
            sparkConf.set("spark.executor.cores", (String)((Object)sparkUI.getExecutorCores().getValue()));
        }
        return sparkConf;
    }

    private SparkContext addListener(SparkContext sc, final SparkUIApi sparkUIManager) {
        sc.addSparkListener((SparkListenerInterface)new SparkListener(){

            public void onJobStart(SparkListenerJobStart jobStart) {
                super.onJobStart(jobStart);
            }

            public void onJobEnd(SparkListenerJobEnd jobEnd) {
                super.onJobEnd(jobEnd);
            }

            public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
                super.onStageSubmitted(stageSubmitted);
                sparkUIManager.startStage(stageSubmitted.stageInfo().stageId(), stageSubmitted.stageInfo().numTasks());
            }

            public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
                super.onStageCompleted(stageCompleted);
                sparkUIManager.endStage(stageCompleted.stageInfo().stageId());
            }

            public void onTaskStart(SparkListenerTaskStart taskStart) {
                super.onTaskStart(taskStart);
                sparkUIManager.taskStart(taskStart.stageId(), taskStart.taskInfo().taskId());
            }

            public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
                super.onTaskEnd(taskEnd);
                if (taskEnd.reason().toString().equals("Success")) {
                    sparkUIManager.taskEnd(taskEnd.stageId(), taskEnd.taskInfo().taskId());
                }
            }
        });
        return sc;
    }

    @Override
    public Map<String, String> getAdvanceSettings() {
        return Arrays.stream(this.getSparkConf().getAll()).filter(x -> this.isAdvancedSettings((String)x._1)).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
    }

    private boolean isAdvancedSettings(String name) {
        return !SparkUI.STANDARD_SETTINGS.contains(name);
    }

    private static boolean isLocalSpark(SparkConf sparkConf) {
        return sparkConf.contains("spark.master") && sparkConf.get("spark.master") != null && sparkConf.get("spark.master").startsWith("local");
    }

    public static class SparkEngineFactoryImpl
    implements SparkEngine.SparkEngineFactory {
        @Override
        public SparkEngine create(SparkSession.Builder sparkSessionBuilder) {
            return new SparkEngineImpl(sparkSessionBuilder);
        }
    }
}

