/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.beakerx.widget;

import com.twosigma.beakerx.TryResult;
import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject;
import com.twosigma.beakerx.kernel.KernelFunctionality;
import com.twosigma.beakerx.kernel.KernelManager;
import com.twosigma.beakerx.kernel.PlainCode;
import com.twosigma.beakerx.message.Message;
import com.twosigma.beakerx.widget.Button;
import com.twosigma.beakerx.widget.DOMWidget;
import com.twosigma.beakerx.widget.HBox;
import com.twosigma.beakerx.widget.HTML;
import com.twosigma.beakerx.widget.Label;
import com.twosigma.beakerx.widget.SparkStateProgress;
import com.twosigma.beakerx.widget.SparkUI;
import com.twosigma.beakerx.widget.SparkVariable;
import com.twosigma.beakerx.widget.Text;
import com.twosigma.beakerx.widget.VBox;
import com.twosigma.beakerx.widget.Widget;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkContextManager {
    public static final String SPARK_MASTER = "spark.master";
    public static final String SPARK_EXECUTOR_MEMORY = "spark.executor.memory";
    public static final String SPARK_APP_NAME = "spark.app.name";
    public static final String SPARK_CORES_MAX = "spark.cores.max";
    public static final String SPARK_EXECUTOR_CORES = "spark.executor.cores";
    private final SparkUI sparkUI;
    private Map<Integer, SparkStateProgress> progressBars = new HashMap<Integer, SparkStateProgress>();
    private VBox jobPanel = null;
    private HBox statusPanel;
    private VBox sparkView;
    private Text masterURL;
    private Text executorMemory;
    private Text sparkSessionAlias;
    private Text executorCores;
    private SparkSession.Builder sparkSessionBuilder;

    public SparkContextManager(SparkUI sparkUI, SparkSession.Builder sparkSessionBuilder) {
        this.sparkUI = sparkUI;
        this.sparkSessionBuilder = sparkSessionBuilder;
        SparkVariable.putSparkContextManager(this.getSparkConf(), this);
        this.createSparkView();
    }

    private SparkConf createSparkConf() {
        SparkConf sparkConf = new SparkConf();
        try {
            Field options = this.sparkSessionBuilder.getClass().getDeclaredField("org$apache$spark$sql$SparkSession$Builder$$options");
            options.setAccessible(true);
            for (Tuple2 x : (scala.collection.mutable.HashMap)options.get(this.sparkSessionBuilder)) {
                sparkConf.set((String)x._1, (String)x._2);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return sparkConf;
    }

    private SparkConf getSparkConf() {
        return this.createSparkConf();
    }

    private SparkContext sparkContext() {
        return this.getSparkSession().sparkContext();
    }

    private void createSparkView() {
        this.masterURL = this.createMasterURL();
        this.executorMemory = this.createExecutorMemory();
        this.executorCores = this.createExecutorCores();
        this.sparkSessionAlias = this.sparkSessionAlias();
        Button connect = this.createConnectButton();
        ArrayList<Object> children = new ArrayList<Object>();
        children.add(this.masterURL);
        children.add(this.executorCores);
        children.add(this.executorMemory);
        children.add(this.sparkSessionAlias);
        children.add(connect);
        VBox vBox = new VBox(children);
        this.sparkUI.add((Widget)vBox);
        this.sparkView = vBox;
    }

    private Text createExecutorCores() {
        Text cores = new Text();
        cores.setDescription((Object)"Executor cores");
        if (this.getSparkConf().contains(SPARK_EXECUTOR_CORES)) {
            cores.setValue((Object)this.getSparkConf().get(SPARK_EXECUTOR_CORES));
        } else {
            cores.setValue((Object)"10");
        }
        return cores;
    }

    private Text sparkSessionAlias() {
        Text alias = new Text();
        alias.setDescription((Object)"SparkSession alias");
        alias.setValue((Object)"spark");
        return alias;
    }

    private Text createExecutorMemory() {
        Text masterURL = new Text();
        masterURL.setDescription((Object)"Executor Memory");
        if (this.getSparkConf().contains(SPARK_EXECUTOR_MEMORY)) {
            masterURL.setValue((Object)this.getSparkConf().get(SPARK_EXECUTOR_MEMORY));
        } else {
            masterURL.setValue((Object)"8g");
        }
        return masterURL;
    }

    private Text createMasterURL() {
        Text masterURL = new Text();
        masterURL.setDescription((Object)"Master URL");
        if (this.getSparkConf().contains(SPARK_MASTER)) {
            masterURL.setValue((Object)this.getSparkConf().get(SPARK_MASTER));
        }
        return masterURL;
    }

    private Button createConnectButton() {
        Button connect = new Button();
        connect.setDescription((Object)"Connect");
        connect.registerOnClick((content, message) -> this.initSparkContext(message));
        return connect;
    }

    private void initSparkContext(Message parentMessage) {
        KernelFunctionality kernel = KernelManager.get();
        try {
            SparkConf sparkConf = this.configureSparkConf(this.getSparkConf());
            this.sparkSessionBuilder.config(sparkConf);
            SparkSession sparkSession = this.getSparkSession();
            this.addListener(this.sparkContext());
            SparkVariable.putSparkContext(this.sparkContext());
            SparkVariable.putSparkSession(sparkSession);
            TryResult tryResultSparkContext = this.initSparkContextInShell(kernel);
            if (tryResultSparkContext.isError()) {
                this.sendError(parentMessage, kernel, tryResultSparkContext.error());
            }
            kernel.registerCancelHook(SparkVariable::cancelAllJobs);
        }
        catch (Exception e) {
            this.sendError(parentMessage, kernel, e.getMessage());
        }
    }

    private SparkSession getSparkSession() {
        return this.sparkSessionBuilder.getOrCreate();
    }

    private TryResult initSparkContextInShell(KernelFunctionality kernel) {
        if (this.sparkSessionAlias.getValue() == null || ((String)((Object)this.sparkSessionAlias.getValue())).isEmpty()) {
            throw new RuntimeException("SparkContext alias can not be empty");
        }
        String sessionName = (String)((Object)this.sparkSessionAlias.getValue());
        String addSc = String.format("import com.twosigma.beakerx.widget.SparkVariable\nval %s = SparkVariable.getSparkSession()\nimport org.apache.spark.SparkContext._\nimport %s.implicits._\nimport %s.sql\nimport org.apache.spark.sql.functions._\n", sessionName, sessionName, sessionName);
        SimpleEvaluationObject seo = PlainCode.createSimpleEvaluationObject((String)addSc, (KernelFunctionality)kernel, (Message)new Message(), (int)1);
        return kernel.executeCode(addSc, seo);
    }

    private void sendError(Message parentMessage, KernelFunctionality kernel, String message) {
        SimpleEvaluationObject seo = PlainCode.createSimpleEvaluationObject((String)"", (KernelFunctionality)kernel, (Message)parentMessage, (int)1);
        seo.error((Object)message);
    }

    private SparkConf configureSparkConf(SparkConf sparkConf) {
        if (!sparkConf.contains(SPARK_APP_NAME)) {
            sparkConf.setAppName("beaker_" + UUID.randomUUID().toString());
        }
        if (this.masterURL.getValue() != null && !((String)((Object)this.masterURL.getValue())).isEmpty()) {
            sparkConf.set(SPARK_MASTER, (String)((Object)this.masterURL.getValue()));
        }
        if (!SparkContextManager.isLocalSpark(sparkConf)) {
            sparkConf.set("spark.repl.class.outputDir", KernelManager.get().getOutDir());
        }
        if (this.executorMemory.getValue() != null && !((String)((Object)this.executorMemory.getValue())).isEmpty()) {
            sparkConf.set(SPARK_EXECUTOR_MEMORY, (String)((Object)this.executorMemory.getValue()));
        }
        if (this.executorCores.getValue() != null && !((String)((Object)this.executorCores.getValue())).isEmpty()) {
            sparkConf.set(SPARK_EXECUTOR_CORES, (String)((Object)this.executorCores.getValue()));
        }
        if (!sparkConf.contains(SPARK_CORES_MAX)) {
            sparkConf.set(SPARK_CORES_MAX, "100");
        }
        return sparkConf;
    }

    private static boolean isLocalSpark(SparkConf sparkConf) {
        return sparkConf.contains(SPARK_MASTER) && sparkConf.get(SPARK_MASTER) != null && sparkConf.get(SPARK_MASTER).startsWith("local");
    }

    private SparkContext addListener(SparkContext sc) {
        sc.addSparkListener((SparkListenerInterface)new SparkListener(){

            public void onJobStart(SparkListenerJobStart jobStart) {
                super.onJobStart(jobStart);
            }

            public void onJobEnd(SparkListenerJobEnd jobEnd) {
                super.onJobEnd(jobEnd);
            }

            public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
                super.onStageSubmitted(stageSubmitted);
                SparkContextManager.this.startStage(stageSubmitted.stageInfo().stageId(), stageSubmitted.stageInfo().numTasks());
            }

            public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
                super.onStageCompleted(stageCompleted);
                SparkContextManager.this.endStage(stageCompleted.stageInfo().stageId());
            }

            public void onTaskStart(SparkListenerTaskStart taskStart) {
                super.onTaskStart(taskStart);
                SparkContextManager.this.taskStart(taskStart.stageId(), taskStart.taskInfo().taskId());
            }

            public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
                super.onTaskEnd(taskEnd);
                if (taskEnd.reason().toString().equals("Success")) {
                    SparkContextManager.this.taskEnd(taskEnd.stageId(), taskEnd.taskInfo().taskId());
                }
            }
        });
        return sc;
    }

    public void applicationStart() {
        this.sparkUI.removeDOMWidget((DOMWidget)this.sparkView);
        this.sparkView = null;
        this.statusPanel = this.createStatusPanel();
    }

    public void applicationEnd() {
        if (this.statusPanel != null) {
            this.sparkUI.removeDOMWidget((DOMWidget)this.statusPanel);
            this.statusPanel = null;
            this.createSparkView();
        }
    }

    private HBox createStatusPanel() {
        Label appStatus = this.createAppStatus();
        Button disconnect = this.createDisconnectButton();
        HBox statusPanel = new HBox(Arrays.asList(this.uiLink(), disconnect, appStatus));
        this.sparkUI.add((Widget)statusPanel);
        return statusPanel;
    }

    private Label createAppStatus() {
        Label appStatus = new Label();
        appStatus.setValue((Object)("Connected to " + this.getSparkConf().get(SPARK_MASTER)));
        return appStatus;
    }

    private Button createDisconnectButton() {
        Button disconnect = new Button();
        disconnect.registerOnClick((content, message) -> this.sparkContext().stop());
        disconnect.setDescription((Object)"Disconnect");
        return disconnect;
    }

    private void startStage(int stageId, int numTasks) {
        SparkStateProgress intProgress = new SparkStateProgress(numTasks, stageId, stageId, this.jobLink(stageId), this.stageLink(stageId));
        intProgress.init();
        if (this.jobPanel != null) {
            this.jobPanel.getLayout().setDisplayNone();
            this.jobPanel.close();
        }
        this.jobPanel = new VBox(new ArrayList());
        this.jobPanel.add((Widget)intProgress);
        this.jobPanel.display();
        this.progressBars.put(stageId, intProgress);
    }

    private void endStage(int stageId) {
        SparkStateProgress sparkStateProgress = this.progressBars.get(stageId);
        sparkStateProgress.hide();
    }

    private void taskStart(int stageId, long taskId) {
        SparkStateProgress intProgress = this.progressBars.get(stageId);
        intProgress.addActive();
    }

    private void taskEnd(int stageId, long taskId) {
        SparkStateProgress intProgress = this.progressBars.get(stageId);
        intProgress.addDone();
    }

    private HTML uiLink() {
        if (this.sparkContext().uiWebUrl().isDefined()) {
            HTML html = new HTML();
            html.setValue((Object)("<a target=\"_blank\" href=\"" + (String)this.sparkContext().uiWebUrl().get() + "\">Spark UI</a>"));
            return html;
        }
        HTML html = new HTML();
        html.setValue((Object)"<a target=\"_blank\" href=\"\">Spark UI </a>");
        return html;
    }

    private String stageLink(int stageId) {
        if (this.sparkContext().uiWebUrl().isDefined()) {
            return (String)this.sparkContext().uiWebUrl().get() + "/stages/stage/?id=" + stageId + "&attempt=0";
        }
        return "";
    }

    private String jobLink(int jobId) {
        if (this.sparkContext().uiWebUrl().isDefined()) {
            return (String)this.sparkContext().uiWebUrl().get() + "/jobs/job/?id=" + jobId;
        }
        return "";
    }

    public void cancelAllJobs() {
        this.sparkContext().cancelAllJobs();
    }
}

