#!/usr/bin/env -S PYTHONDONTWRITEBYTECODE=1 python import os import random import shutil import subprocess import sys from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError, RawTextHelpFormatter from tempfile import TemporaryDirectory # ------------------------------------------------------------------------------ # Parse CLI arguments # ------------------------------------------------------------------------------ description = "Salis: Simple A-Life Simulator" prog = sys.argv[0] epilog = f"Use '-h' to list arguments for each command.\nExample: '{prog} bench -h'" main_parser = ArgumentParser( description=description, epilog=epilog, formatter_class=RawTextHelpFormatter, prog=prog, ) parsers = main_parser.add_subparsers(dest="command", required=True) formatter_class = lambda prog: ArgumentDefaultsHelpFormatter(prog, max_help_position=32) bench = parsers.add_parser("bench", formatter_class=formatter_class, help="run benchmark") load = parsers.add_parser("load", formatter_class=formatter_class, help="load saved simulation") new = parsers.add_parser("new", formatter_class=formatter_class, help="create new simulation") architectures = os.listdir("./arch") uis = os.listdir("./ui") def seed(i): ival = int(i, 0) if ival < -1: raise ArgumentTypeError("invalid seed value") return ival def ipos(i): ival = int(i, 0) if ival < 0: raise ArgumentTypeError("value must be positive integer") return ival def inat(i): ival = int(i, 0) if ival < 1: raise ArgumentTypeError("value must be greater than zero") return ival option_keys = ["short", "long", "metavar", "description", "default", "required", "type", "parsers"] option_list = [ ["A", "anc", "ANC", "ancestor file name without extension, to be compiled on all cores (ANC points to 'anc/{arch}/{ANC}.asm')", None, True, str, [bench, new]], ["a", "arch", architectures, "VM architecture", "dummy", False, str, [bench, new]], ["b", "steps", "N", "number of steps to run in benchmark", 0x1000000, False, ipos, [bench]], ["C", "clones", "N", "number of ancestor clones on each core", 1, False, inat, [bench, new]], ["c", "cores", "N", "number of simulator cores", 2, False, inat, [bench, new]], ["d", "data-push-pow", "POW", "data aggregation interval exponent (interval == 2^{POW} >= {sync-pow}); a value of 0 disables data aggregation (requires 'sqlite')", 28, False, ipos, [new]], ["f", "force", None, "overwrite existing simulation of given name", False, False, bool, [new]], ["F", "muta-flip", None, "cosmic rays flip bits instead of randomizing whole bytes", False, False, bool, [bench, new]], ["g", "compiler", "CC", "C compiler to use", "gcc", False, str, [bench, load, new]], ["M", "muta-pow", "POW", "mutator range exponent (range == 2^{POW})", 32, False, ipos, [bench, new]], ["m", "mvec-pow", "POW", "memory vector size exponent (size == 2^{POW})", 20, False, ipos, [bench, new]], ["n", "name", "NAME", "name of new or loaded simulation", "def.sim", False, str, [load, new]], ["o", "optimized", None, "builds salis binary with optimizations", False, False, bool, [bench, load, new]], ["p", "pre-cmd", "CMD", "shell command to wrap call to executable (e.g. gdb, time, valgrind, etc.)", None, False, str, [bench, load, new]], ["s", "seed", "SEED", "seed value for new simulation; a value of 0 disables cosmic rays; a value of -1 creates a random seed", 0, False, seed, [bench, new]], ["T", "keep-temp-dir", None, "delete temporary directory on exit", False, False, bool, [bench, load, new]], ["t", "thread-gap", "N", "memory gap between cores in bytes (may help reduce cache misses)", 0x100, False, inat, [bench, load, new]], ["u", "ui", uis, "user interface", "curses", False, str, [load, new]], ["x", "no-compress", None, "do not compress save files (useful if 'zlib' is unavailable)", True, False, bool, [new]], ["y", "sync-pow", "POW", "core sync interval exponent (interval == 2^{POW})", 20, False, ipos, [bench, new]], ["z", "auto-save-pow", "POW", "auto-save interval exponent (interval == 2^{POW})", 36, False, ipos, [new]], ] options = list(map(lambda option: dict(zip(option_keys, option)), option_list)) parser_map = ((parser, option) for option in options for parser in option["parsers"]) for parser, option in parser_map: arg_kwargs = {} def push_same(key): arg_kwargs[key] = option[key] def push_diff(tgt_key, src_key): arg_kwargs[tgt_key] = option[src_key] def push_val(key, val): arg_kwargs[key] = val push_diff("help", "description") push_same("required") if option["metavar"] is None: push_val("action", "store_true") else: push_same("default") push_same("type") if type(option["metavar"]) is list: push_diff("choices", "metavar") if type(option["metavar"]) is str: push_same("metavar") parser.add_argument(f"-{option["short"]}", f"--{option["long"]}", **arg_kwargs) args = main_parser.parse_args() # ------------------------------------------------------------------------------ # Logging # ------------------------------------------------------------------------------ def info(msg, val=""): print(f"\033[1;34m[INFO]\033[0m {msg}", val) def warn(msg, val=""): print(f"\033[1;33m[WARN]\033[0m {msg}", val) def error(msg, val=""): print(f"\033[1;31m[ERROR]\033[0m {msg}", val) sys.exit(1) # ------------------------------------------------------------------------------ # Load configuration # ------------------------------------------------------------------------------ info(description) info(f"Called '{prog}' with the following options:") for key, val in vars(args).items(): print(f"{key} = {repr(val)}") if args.command in ["load", "new"]: sim_dir = os.path.join(os.environ["HOME"], ".salis", args.name) sim_opts = os.path.join(sim_dir, "opts.py") sim_path = os.path.join(sim_dir, args.name) if args.command in ["load"]: if not os.path.isdir(sim_dir): error("No simulation found named:", args.name) info(f"Sourcing configuration from: '{sim_opts}':") sys.path.append(sim_dir) import opts opt_vars = (opt for opt in dir(opts) if not opt.startswith("__")) for opt_var in opt_vars: opt_attr = getattr(opts, opt_var) print(f"{opt_var} = {repr(opt_attr)}") setattr(args, opt_var, opt_attr) if args.command in ["new"]: if args.data_push_pow and args.data_push_pow < args.sync_pow: error("Data push power must be equal or greater than thread sync power") if os.path.isdir(sim_dir) and args.force: warn("Force flag used! Wiping old simulation at:", sim_dir) shutil.rmtree(sim_dir) if os.path.isdir(sim_dir): error("Simulation directory found at:", sim_dir) if args.seed == -1: args.seed = random.getrandbits(64) info("Using random seed:", args.seed) info("Creating new simulation directory at:", sim_dir) info("Creating configuration file at:", sim_opts) os.mkdir(sim_dir) opts = ( option["long"].replace("-", "_") for option in options if new in option["parsers"] and load not in option["parsers"] ) with open(sim_opts, "w") as file: for opt in opts: file.write(f"{opt} = {repr(eval(f"args.{opt}"))}\n") # ------------------------------------------------------------------------------ # Load architecture and UI variables # ------------------------------------------------------------------------------ arch_path = os.path.join("arch", args.arch) info("Loading architecture variables from:", os.path.join(arch_path, "arch_vars.py")) sys.path.append(arch_path) from arch_vars import ArchVars arch_vars = ArchVars(args) if args.command in ["load", "new"]: ui_path = os.path.join("ui", args.ui) info("Loading UI variables from:", os.path.join(ui_path, "ui_vars.py")) sys.path.append(ui_path) from ui_vars import UIVars ui_vars = UIVars(args) # ------------------------------------------------------------------------------ # Compile ancestor organism # ------------------------------------------------------------------------------ if args.command in ["bench", "new"] and args.anc is not None: anc_path = os.path.join("anc", args.arch, f"{args.anc}.asm") if not os.path.isfile(anc_path): error("Could not find ancestor file:", anc_path) with open(anc_path, "r") as file: lines = file.read().splitlines() lines = filter(lambda line: not line.startswith(";"), lines) lines = filter(lambda line: not line.isspace(), lines) lines = filter(lambda line: line, lines) lines = map(lambda line: line.split(), lines) anc_bytes = [] for line in lines: found = False for byte, tup in enumerate(arch_vars.inst_set): if line == tup[0]: anc_bytes.append(byte) found = True break if not found: error("Unrecognized instruction in ancestor file:", line) anc_bytes_repr = ",".join(map(str, anc_bytes)) info(f"Compiled ancestor file '{anc_path}' into byte array:", f"{{{anc_bytes_repr}}}") # ------------------------------------------------------------------------------ # Populate compiler flags # ------------------------------------------------------------------------------ flags = set() includes = set() defines = set() links = set() flags.update({"-Wall", "-Wextra", "-Werror", f"-Iarch/{args.arch}"}) defines.add(f"-DARCH=\"{args.arch}\"") defines.add(f"-DCOMMAND_{args.command.upper()}") defines.add(f"-DCORES={args.cores}") defines.add(f"-DMUTA_RANGE={2 ** args.muta_pow}ul") defines.add(f"-DMVEC_SIZE={2 ** args.mvec_pow}ul") defines.add(f"-DSEED={args.seed}ul") defines.add(f"-DSYNC_INTERVAL={2 ** args.sync_pow}ul") defines.add(f"-DTHREAD_GAP={args.thread_gap}") defines.add(f"-DCORE_FIELDS={" ".join(f"CORE_FIELD({", ".join(field)})" for field in arch_vars.core_fields)}") defines.add(f"-DPROC_FIELDS={" ".join(f"PROC_FIELD({", ".join(field)})" for field in arch_vars.proc_fields)}") defines.add(f"-DINST_SET={" ".join(f"INST({index}, {"_".join(inst[0])}, \"{" ".join(inst[0])}\", L'{inst[1]}')" for index, inst in enumerate(arch_vars.inst_set))}") defines.add(f"-DCORE_FIELD_COUNT={len(arch_vars.core_fields)}") defines.add(f"-DPROC_FIELD_COUNT={len(arch_vars.proc_fields)}") defines.add(f"-DINST_COUNT={len(arch_vars.inst_set)}") defines.add(f"-DFOR_CORES={" ".join(f"FOR_CORE({i})" for i in range(args.cores))}") if args.muta_flip: defines.add("-DMUTA_FLIP") if arch_vars.mvec_loop: defines.add("-DMVEC_LOOP") if args.optimized: flags.add("-O3") defines.add("-DNDEBUG") else: flags.add("-ggdb") if args.command in ["bench"]: includes.add("stdio.h") defines.add(f"-DSTEPS={args.steps}ul") if args.command in ["bench", "new"]: defines.add(f"-DCLONES={args.clones}") if args.anc is not None: defines.add(f"-DANC_BYTES={{{anc_bytes_repr}}}") defines.add(f"-DANC_SIZE={len(anc_bytes)}") if args.command in ["load", "new"]: flags.add(f"-Iui/{args.ui}") includes.update(ui_vars.includes) defines.update(ui_vars.defines) defines.add(f"-DAUTOSAVE_INTERVAL={2 ** args.auto_save_pow}ul") defines.add(f"-DAUTOSAVE_NAME_LEN={len(sim_path) + 20}") defines.add(f"-DNAME=\"{args.name}\"") defines.add(f"-DSIM_PATH=\"{sim_path}\"") links.update(ui_vars.links) if args.data_push_pow: includes.add("sqlite3.h") data_push_path = os.path.join(sim_dir, f"{args.name}.sqlite3") defines.add(f"-DDATA_PUSH_INTERVAL={2 ** args.data_push_pow}ul") defines.add(f"-DDATA_PUSH_PATH=\"{data_push_path}\"") links.add("-lsqlite3") info("Data will be aggregated at:", data_push_path) if arch_vars.data_is_compressed: includes.add("zlib.h") links.add("-lz") info("Data aggregation requires compression") else: warn("Data aggregation disabled") if not args.no_compress: includes.add("zlib.h") defines.add("-D_POSIX_C_SOURCE=200809L") defines.add("-DCOMPRESS") links.add("-lz") info("Save file compression enabled") else: warn("Save file compression disabled") # ------------------------------------------------------------------------------ # Build executable # ------------------------------------------------------------------------------ tempdir = TemporaryDirectory(prefix="salis_", delete=not args.keep_temp_dir) info("Created a temporary salis directory at:", tempdir.name) salis_bin = os.path.join(tempdir.name, "salis_bin") info("Building salis binary at:", salis_bin) build_cmd = [args.compiler, "core.c", "-o", salis_bin] build_cmd.extend(flags) build_cmd.extend(sum(map(lambda include: [f"-include", include], includes), [])) build_cmd.extend(defines) build_cmd.extend(links) info("Using build command:", build_cmd) subprocess.run(build_cmd, check=True) # ------------------------------------------------------------------------------ # Run salis binary # ------------------------------------------------------------------------------ info("Running salis binary...") run_cmd = [args.pre_cmd] if args.pre_cmd else [] run_cmd.append(salis_bin) info("Using run command:", " ".join(run_cmd)) salis_sp = subprocess.Popen(run_cmd, stdout=sys.stdout, stderr=sys.stderr) # When using signals (e.g. SIGTERM), they must be sent to the entire process group # to make sure both the simulator and the interpreter get shut down. try: salis_sp.wait() except KeyboardInterrupt: salis_sp.terminate() salis_sp.wait() code = salis_sp.returncode if code != 0: error("Salis binary returned code:", code)