Module paracrine.runners.core
Expand source code
import json
import os
import socket
from pathlib import Path
from typing import Dict, List
from ..helpers.config import (
add_return_data,
config,
config_path,
host,
in_docker,
network_config_file,
other_config,
other_config_file,
)
from ..helpers.debian import apt_install
from ..helpers.fs import make_directory, run_command
from ..helpers.users import in_vagrant, users
def is_wireguard():
return os.path.exists("/etc/wireguard")
def hash_fn(key: str, count: int) -> int:
return sum(bytearray(key.encode("utf-8"))) % count
def _index_fn(name: str) -> Dict:
hosts = config()["servers"]
try:
existing_selectors = other_config("selectors.json")
return [host for host in hosts if host["name"] == existing_selectors[name]][0]
except KeyError:
index = hash_fn(name, len(hosts))
add_return_data({"selector": {name: hosts[index]["name"]}})
return hosts[index]
# Use this host for a given service
# Intended for "run on one machine" things
def use_this_host(name: str) -> bool:
use_host = _index_fn(name)
return host()["name"] == use_host["name"]
def wireguard_ip_for_machine_for(name: str) -> str:
use_host = _index_fn(name)
return use_host["wireguard_ip"]
def run():
apt_install(["iproute2"])
data = {
"hostname": socket.gethostname(),
"network_devices": run_command("ip -j address"),
"users": users(force_load=True),
"groups": run_command("getent group"),
"server_name": host()["name"],
}
ip_file = Path("/opt/ip_address")
if ip_file.exists():
data["external_ip"] = json.load(ip_file.open())
else:
if in_vagrant() or in_docker():
networks = json.loads(data["network_devices"])
ext_if = [
net
for net in networks
if net["ifname"].startswith("eth") and len(net["addr_info"]) > 0
]
if len(ext_if) > 0:
data["external_ip"] = json.dumps(
{"ip": ext_if[-1]["addr_info"][0]["local"]}
)
else:
data["external_ip"] = "<unknown>"
else:
apt_install(["curl", "ca-certificates"])
data["external_ip"] = run_command("curl https://api.ipify.org?format=json")
json.dump(data["external_ip"], ip_file.open("w"))
return data
def parse_return(infos: List[Dict]) -> None:
info = infos[0]
networks = json.loads(info["network_devices"])
name = info["server_name"]
make_directory(config_path())
json.dump(networks, open(network_config_file(name), "w"), indent=2)
other = {
"external_ip": json.loads(info["external_ip"])["ip"],
"users": info["users"],
"groups": info["groups"],
"hostname": info["hostname"],
}
json.dump(other, open(other_config_file(name), "w"), indent=2)
Functions
def hash_fn(key: str, count: int) ‑> int
-
Expand source code
def hash_fn(key: str, count: int) -> int: return sum(bytearray(key.encode("utf-8"))) % count
def is_wireguard()
-
Expand source code
def is_wireguard(): return os.path.exists("/etc/wireguard")
def parse_return(infos: List[Dict]) ‑> None
-
Expand source code
def parse_return(infos: List[Dict]) -> None: info = infos[0] networks = json.loads(info["network_devices"]) name = info["server_name"] make_directory(config_path()) json.dump(networks, open(network_config_file(name), "w"), indent=2) other = { "external_ip": json.loads(info["external_ip"])["ip"], "users": info["users"], "groups": info["groups"], "hostname": info["hostname"], } json.dump(other, open(other_config_file(name), "w"), indent=2)
def run()
-
Expand source code
def run(): apt_install(["iproute2"]) data = { "hostname": socket.gethostname(), "network_devices": run_command("ip -j address"), "users": users(force_load=True), "groups": run_command("getent group"), "server_name": host()["name"], } ip_file = Path("/opt/ip_address") if ip_file.exists(): data["external_ip"] = json.load(ip_file.open()) else: if in_vagrant() or in_docker(): networks = json.loads(data["network_devices"]) ext_if = [ net for net in networks if net["ifname"].startswith("eth") and len(net["addr_info"]) > 0 ] if len(ext_if) > 0: data["external_ip"] = json.dumps( {"ip": ext_if[-1]["addr_info"][0]["local"]} ) else: data["external_ip"] = "<unknown>" else: apt_install(["curl", "ca-certificates"]) data["external_ip"] = run_command("curl https://api.ipify.org?format=json") json.dump(data["external_ip"], ip_file.open("w")) return data
def use_this_host(name: str) ‑> bool
-
Expand source code
def use_this_host(name: str) -> bool: use_host = _index_fn(name) return host()["name"] == use_host["name"]
def wireguard_ip_for_machine_for(name: str) ‑> str
-
Expand source code
def wireguard_ip_for_machine_for(name: str) -> str: use_host = _index_fn(name) return use_host["wireguard_ip"]