#!/home/conda/feedstock_root/build_artifacts/bld/rattler-build_gstlal-ugly_1767612080/host_env_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_pl/bin/python
#
# Copyright (C) 2018--2019  Chad Hanna, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

### This program will create a HTCondor DAG to automate the running of
### low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe

"""
This program makes a dag for persistent kafka/zookeeper services
"""

__author__ = 'Chad Hanna <channa@caltech.edu>, Patrick Godwin <patrick.godwin@ligo.org>'

#
# import standard modules
#

import os
from optparse import OptionParser

#
# import the modules we need to build the pipeline
#

from gstlal import aggregator
from gstlal import dagparts

#
# configuration file templates
#

ZOOKEEPER_TEMPLATE = """
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
"""

KAFKA_TEMPLATE = """
broker.id=0
listeners = PLAINTEXT://%s:%d
background.threads=100
num.network.threads=50
num.io.threads=80
log.cleaner.threads=10
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
queued.max.requests=10000
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.ms=300000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
"""

KAFKA_ENV_TEMPLATE = """
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
KAFKA_HEAP_OPTS="-Xms8G -Xmx8G"
export KAFKA_HEAP_OPTS KAFKA_JVM_PERFORMANCE_OPTS
"""

#
# job classes
#

class ZookeeperJob(dagparts.DAGJob):
	"""
	A zookeeper job

	Besides setting up the condor job, constructing an instance creates
	the zookeeper data directory and writes the file
	"zookeeper.properties" into the current working directory.
	"""
	def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server", rootdir = dagparts.log_path(), tag = "", port = 2181, maxclients = 0, universe = "local", condor_commands = None):
		"""
		program: executable passed on to dagparts.DAGJob
		tag_base: passed on to dagparts.DAGJob
		rootdir: directory under which the zookeeper data directory
			is created.  NOTE: the default is evaluated once, when
			the class is defined, not on every call.
		tag: optional sub-directory of rootdir; skipped when empty or None
		port: written as clientPort in zookeeper.properties
		maxclients: written as maxClientCnxns in zookeeper.properties
		universe: passed on to dagparts.DAGJob
		condor_commands: dictionary passed on to dagparts.DAGJob;
			None (the default) stands for an empty dictionary
		"""
		# use a fresh dictionary for every instance instead of a
		# mutable default argument shared between all calls
		if condor_commands is None:
			condor_commands = {}
		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)

		# the tag may be None when no analysis tag was given, so it
		# is only used as a path component when it is set
		if tag:
			zoodir = os.path.join(rootdir, tag, "zookeeper")
		else:
			zoodir = os.path.join(rootdir, "zookeeper")
		aggregator.makedir(zoodir)

		# the properties file always goes to the current working
		# directory, independent of rootdir and tag
		with open("zookeeper.properties", "w") as f:
			f.write(ZOOKEEPER_TEMPLATE%(zoodir, port, maxclients))


class KafkaJob(dagparts.DAGJob):
	"""
	A kafka job

	Besides setting up the condor job, constructing an instance creates
	the kafka log directory and writes the file "kafka.properties" into
	the current working directory.
	"""
	def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server", rootdir = dagparts.log_path(), tag = "", hostname = "10.14.0.112", port = 9092, zookeeperaddr = "localhost:2181", universe = "local", condor_commands = None):
		"""
		program: executable passed on to dagparts.DAGJob
		tag_base: passed on to dagparts.DAGJob
		rootdir: directory under which the kafka log directory is
			created.  NOTE: the default is evaluated once, when the
			class is defined, not on every call.
		tag: optional sub-directory of rootdir; skipped when empty or None
		hostname: host part of the PLAINTEXT listener address
		port: port part of the PLAINTEXT listener address
		zookeeperaddr: written as zookeeper.connect in kafka.properties
		universe: passed on to dagparts.DAGJob
		condor_commands: dictionary passed on to dagparts.DAGJob;
			None (the default) stands for an empty dictionary
		"""
		# use a fresh dictionary for every instance instead of a
		# mutable default argument shared between all calls
		if condor_commands is None:
			condor_commands = {}
		dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = universe, condor_commands = condor_commands)

		# the tag may be None when no analysis tag was given, so it
		# is only used as a path component when it is set
		if tag:
			kafkadir = os.path.join(rootdir, tag, "kafka")
		else:
			kafkadir = os.path.join(rootdir, "kafka")
		aggregator.makedir(kafkadir)

		# the properties file always goes to the current working
		# directory, independent of rootdir and tag
		with open("kafka.properties", "w") as f:
			f.write(KAFKA_TEMPLATE%(hostname, port, kafkadir, zookeeperaddr))


#
# Parse the command line
#


def parse_command_line():
	"""
	Parse the command line.

	Returns the tuple (options, filenames) produced by
	OptionParser.parse_args().  Exits through parser.error(), i.e. with
	a usage message and exit status 2, when --kafka-hostname is missing.
	"""
	parser = OptionParser(description = __doc__)

	parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously and to avoid filename clashes.")
	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
	parser.add_option("--kafka-hostname", metavar = "hostname", help = "Set the hostname in which kafka/zookeeper will be running at. Required.")
	parser.add_option("--kafka-port", type = "int", metavar = "number", help = "Set the kafka port. default: 9092", default = 9092)
	parser.add_option("--condor-universe", default = "local", metavar = "universe", help = "set the condor universe to run jobs in DAG, options are local/vanilla, default = local")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")

	options, filenames = parser.parse_args()

	# the option has no default, and the hostname is interpolated into
	# the kafka listener address; a missing value would otherwise end
	# up in the configuration as the literal string "None"
	if not options.kafka_hostname:
		parser.error("--kafka-hostname is required")

	return options, filenames


#
# MAIN
#


options, filenames = parse_command_line()

aggregator.makedir("logs")

# the analysis tag, when given, becomes part of the DAG name
if options.analysis_tag:
	dag = dagparts.DAG("kafka_broker_%s" % options.analysis_tag)
else:
	dag = dagparts.DAG("kafka_broker")

#
# setup kafka/zookeeper jobs and nodes
#

# condor commands applied to both jobs
condor_options = {
	"want_graceful_removal": "True",
	"kill_sig": "15"
}

# resource requests are only added for the vanilla universe
if options.condor_universe == 'vanilla':
	condor_options.update({
		"request_memory": "10GB",
		"request_cpus": 2,
	})
condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)

# constructing the job objects also writes zookeeper.properties and
# kafka.properties into the current working directory
zookeeper_job = ZookeeperJob(
	"zookeeper-server-start.sh",
	tag_base = "zookeeper-server-start",
	condor_commands = condor_commands,
	tag = options.analysis_tag,
	universe = options.condor_universe,
	port = options.zookeeper_port
)
kafka_job = KafkaJob(
	"kafka-server-start.sh",
	tag_base = "kafka-server-start",
	condor_commands = condor_commands,
	tag = options.analysis_tag,
	hostname = options.kafka_hostname,
	port = options.kafka_port,
	universe = options.condor_universe,
	zookeeperaddr = "localhost:%d" % options.zookeeper_port
)

# neither node is given any parent nodes; each job receives its
# properties file through the opts dictionary
zookeeper_node = dagparts.DAGNode(zookeeper_job, dag, [], opts = {"":"zookeeper.properties"})
kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})

#
# Write out the dag and other files
#

dag.write_sub_files()

# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
# (in practice "indefinitely" is a retry count of 10000)
for node in dag.get_nodes():
	node.set_retry(10000)

dag.write_dag()
dag.write_script()

# JVM settings for kafka, to be sourced by the user before submission
with open('kafka_env.sh', 'w') as f:
	f.write(KAFKA_ENV_TEMPLATE)

print('source kafka_env.sh before submitting dag')
