""" SporeStack CLI: `sporestack` """ import json import logging import os import time from pathlib import Path from typing import Any, Dict, Optional import typer HELP = """ SporeStack Python CLI Optional environment variables: SPORESTACK_ENDPOINT *or* SPORESTACK_USE_TOR_ENDPOINT TOR_PROXY (defaults to socks5h://127.0.0.1:9050 which is fine for most) """ _home = os.getenv("HOME", None) assert _home is not None, "Unable to detect $HOME environment variable?" HOME = Path(_home) SPORESTACK_DIR = HOME / ".sporestack" # Try to protect files in ~/.sporestack os.umask(0o0077) cli = typer.Typer(help=HELP) HOME = Path(_home) token_cli = typer.Typer(help="Commands to interact with SporeStack tokens") cli.add_typer(token_cli, name="token") server_cli = typer.Typer(help="Commands to interact with SporeStack servers") cli.add_typer(server_cli, name="server") logging.basicConfig(level=logging.INFO) DEFAULT_TOKEN = "primary" DEFAULT_FLAVOR = "vps-1vcpu-1gb" # Users may have a different key file, but this is the most common. DEFAULT_SSH_KEY_FILE = HOME / ".ssh" / "id_rsa.pub" # On disk format TOKEN_VERSION = 1 WAITING_PAYMENT_TO_PROCESS = "Waiting for payment to process..." def get_api_endpoint() -> str: from .api_client import CLEARNET_ENDPOINT, TOR_ENDPOINT api_endpoint = os.getenv("SPORESTACK_ENDPOINT", CLEARNET_ENDPOINT) if os.getenv("SPORESTACK_USE_TOR_ENDPOINT", None) is not None: api_endpoint = TOR_ENDPOINT return api_endpoint def make_payment(currency: str, uri: str, usd: str) -> None: import segno premessage = """Payment URI: {} Pay *exactly* the specified amount. No more, no less. Pay within one hour at the very most. Resize your terminal and try again if QR code above is not readable. Press ctrl+c to abort.""" message = premessage.format(uri) qr = segno.make(uri) # This typer.echos. qr.terminal() typer.echo(message) typer.echo(f"Approximate price in USD: {usd}") input("[Press enter once you have made payment.]") @server_cli.command() def launch( hostname: str = "", days: int = typer.Option(...), operating_system: str = typer.Option(...), ssh_key_file: Path = DEFAULT_SSH_KEY_FILE, flavor: str = DEFAULT_FLAVOR, token: str = DEFAULT_TOKEN, region: Optional[str] = None, quote: bool = typer.Option(True, help="Require manual price confirmation."), autorenew: bool = typer.Option(False, help="BETA: Automatically renew server."), ) -> None: """ Launch a server on SporeStack. """ typer.echo(f"Launching server with token {token}...", err=True) _token = load_token(token) from . import utils from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) typer.echo(f"Loading SSH key from {ssh_key_file}...") if not ssh_key_file.exists(): msg = f"{ssh_key_file} does not exist. " msg += "You can try generating a key file with `ssh-keygen`" typer.echo(msg, err=True) raise typer.Exit(code=1) ssh_key = ssh_key_file.read_text() machine_id = utils.random_machine_id() if quote: response = api_client.server_launch( machine_id=machine_id, days=days, flavor=flavor, operating_system=operating_system, ssh_key=ssh_key, region=region, token=_token, quote=True, hostname=hostname, autorenew=autorenew, ) msg = f"Is {response.payment.usd} for {days} day(s) of {flavor} okay?" typer.echo(msg, err=True) input("[Press ctrl+c to cancel, or enter to accept.]") if autorenew: typer.echo( "Server will be automatically renewed (from this token) to one week of expiration.", # noqa: E501 err=True, ) typer.echo( "If using this feature, watch your token balance and server expiration closely!", # noqa: E501 err=True, ) tries = 360 while tries > 0: response = api_client.server_launch( machine_id=machine_id, days=days, flavor=flavor, operating_system=operating_system, ssh_key=ssh_key, region=region, token=_token, hostname=hostname, autorenew=autorenew, ) if response.created is True: break typer.echo("Waiting for server to build...", err=True) tries = tries + 1 # Waiting for server to spin up. time.sleep(10) if response.created is False: typer.echo("Server creation failed, tries exceeded.", err=True) raise typer.Exit(code=1) typer.echo( pretty_machine_info(api_client.server_info(machine_id=machine_id).dict()) ) @server_cli.command() def topup( hostname: str = "", machine_id: str = "", days: int = typer.Option(...), token: str = DEFAULT_TOKEN, ) -> None: """ Extend an existing SporeStack server's lifetime. """ from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) _token = load_token(token) api_client.server_topup( machine_id=machine_id, days=days, token=_token, ) typer.echo(f"Server topped up for {days} day(s)") def server_info_path() -> Path: # Put servers in a subdirectory servers_dir = SPORESTACK_DIR / "servers" # Migrate existing server.json files into servers subdirectory if SPORESTACK_DIR.exists() and not servers_dir.exists(): typer.echo( f"Migrating server profiles found in {SPORESTACK_DIR} to {servers_dir}.", err=True, ) servers_dir.mkdir() for json_file in SPORESTACK_DIR.glob("*.json"): json_file.rename(servers_dir / json_file.name) # Make it, if it doesn't exist already. SPORESTACK_DIR.mkdir(exist_ok=True) servers_dir.mkdir(exist_ok=True) return servers_dir def token_path() -> Path: token_dir = SPORESTACK_DIR / "tokens" # Make it, if it doesn't exist already. token_dir.mkdir(exist_ok=True, parents=True) return token_dir def get_machine_info(hostname: str) -> Dict[str, Any]: """ Get info from disk. """ directory = server_info_path() json_file = directory / f"{hostname}.json" if not json_file.exists(): raise ValueError(f"{hostname} does not exist in {directory} as {json_file}") machine_info = json.loads(json_file.read_bytes()) assert isinstance(machine_info, dict) if machine_info["vm_hostname"] != hostname: raise ValueError("hostname does not match filename.") return machine_info def pretty_machine_info(info: Dict[str, Any]) -> str: msg = "Machine ID (keep this secret!): {}\n".format(info["machine_id"]) if "vm_hostname" in info: msg += "Hostname: {}\n".format(info["vm_hostname"]) elif "hostname" in info: msg += "Hostname: {}\n".format(info["hostname"]) if "network_interfaces" in info: if "ipv6" in info["network_interfaces"][0]: msg += "IPv6: {}\n".format(info["network_interfaces"][0]["ipv6"]) if "ipv4" in info["network_interfaces"][0]: msg += "IPv4: {}\n".format(info["network_interfaces"][0]["ipv4"]) else: if "ipv6" in info: msg += "IPv6: {}\n".format(info["ipv6"]) if "ipv4" in info: msg += "IPv4: {}\n".format(info["ipv4"]) expiration = info["expiration"] human_expiration = time.strftime("%Y-%m-%d %H:%M:%S %z", time.localtime(expiration)) if "running" in info: msg += "Running: {}\n".format(info["running"]) msg += f"Expiration: {expiration} ({human_expiration})\n" time_to_live = expiration - int(time.time()) hours = time_to_live // 3600 msg += f"Server will be deleted in {hours} hours." return msg @server_cli.command(name="list") def server_list( token: str = DEFAULT_TOKEN, local: bool = typer.Option( True, help="List older servers not associated to token." ), ) -> None: """ List all locally known servers and all servers under the given token. """ from .api_client import APIClient from .exceptions import SporeStackUserError api_client = APIClient(api_endpoint=get_api_endpoint()) _token = load_token(token) server_infos = api_client.servers_launched_from_token(token=_token).servers machine_id_hostnames = {} if local: directory = server_info_path() for hostname_json in os.listdir(directory): hostname = hostname_json.split(".")[0] saved_vm_info = get_machine_info(hostname) machine_id_hostnames[saved_vm_info["machine_id"]] = hostname printed_machine_ids = [] for info in server_infos: typer.echo() hostname = info.hostname if hostname == "": if info.machine_id in machine_id_hostnames: hostname = machine_id_hostnames[info.machine_id] if hostname != "": typer.echo(f"Hostname: {hostname}") typer.echo(f"Machine ID (keep this secret!): {info.machine_id}") typer.echo(f"IPv6: {info.network_interfaces[0].ipv6}") typer.echo(f"IPv4: {info.network_interfaces[0].ipv4}") typer.echo(f"Region: {info.region}") typer.echo(f"Flavor: {info.flavor.slug}") human_expiration = time.strftime( "%Y-%m-%d %H:%M:%S %z", time.localtime(info.expiration) ) typer.echo(f"Expiration: {info.expiration} ({human_expiration})") typer.echo(f"Token: {info.token}") if info.deleted: typer.echo("Server was deleted!") else: typer.echo(f"Running: {info.running}") time_to_live = info.expiration - int(time.time()) hours = time_to_live // 3600 typer.echo(f"Server will be deleted in {hours} hours.") typer.echo(f"Autorenew: {info.autorenew}") printed_machine_ids.append(info.machine_id) if local: for hostname_json in os.listdir(directory): hostname = hostname_json.split(".")[0] saved_vm_info = get_machine_info(hostname) machine_id = saved_vm_info["machine_id"] if machine_id in printed_machine_ids: continue try: upstream_vm_info = api_client.server_info( machine_id=saved_vm_info["machine_id"] ) saved_vm_info["expiration"] = upstream_vm_info.expiration saved_vm_info["running"] = upstream_vm_info.running typer.echo() typer.echo(pretty_machine_info(saved_vm_info)) except SporeStackUserError as e: expiration = saved_vm_info["expiration"] human_expiration = time.strftime( "%Y-%m-%d %H:%M:%S %z", time.localtime(expiration) ) msg = hostname msg += f" expired ({expiration} {human_expiration}): " msg += str(e) typer.echo(msg) typer.echo() def _get_machine_id(machine_id: str, hostname: str, token: str) -> str: usage = "--hostname *OR* --machine-id must be set." if machine_id != "" and hostname != "": typer.echo(usage, err=True) raise typer.Exit(code=2) if machine_id != "": return machine_id if hostname == "": typer.echo(usage, err=True) raise typer.Exit(code=2) try: machine_id = get_machine_info(hostname)["machine_id"] assert isinstance(machine_id, str) return machine_id except Exception: pass _token = load_token(token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) for server in api_client.servers_launched_from_token(token=_token).servers: if server.hostname == hostname: return server.machine_id typer.echo( f"Could not find any servers matching the hostname: {hostname}", err=True ) raise typer.Exit(code=1) @server_cli.command() def info(hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN) -> None: """ Info on the VM """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) typer.echo( pretty_machine_info(api_client.server_info(machine_id=machine_id).dict()) ) @server_cli.command(name="json") def server_info_json( hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN ) -> None: """ Info on the VM, in JSON format """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) typer.echo(api_client.server_info(machine_id=machine_id).json()) @server_cli.command() def start(hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN) -> None: """ Boots the VM. """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.server_start(machine_id=machine_id) typer.echo(f"{hostname} started.") @server_cli.command() def stop(hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN) -> None: """ Immediately shuts down the VM. """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.server_stop(machine_id=machine_id) typer.echo(f"{hostname} stopped.") @server_cli.command() def autorenew_enable( hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN ) -> None: """ Enable autorenew on a server. """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.autorenew_enable(machine_id=machine_id) typer.echo("Autorenew enabled.") @server_cli.command() def autorenew_disable( hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN ) -> None: """ Disable autorenew on a server. """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.autorenew_disable(machine_id=machine_id) typer.echo("Autorenew disabled.") @server_cli.command() def delete( hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN ) -> None: """ Delete the VM before its expiration """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.server_delete(machine_id=machine_id) # Also remove the .json file server_info_path().joinpath(f"{hostname}.json").unlink(missing_ok=True) typer.echo(f"{machine_id} was destroyed.") @server_cli.command() def forget( hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN ) -> None: """ Forget about a deleted server so that it doesn't show up in server list. """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.server_forget(machine_id=machine_id) typer.echo(f"{machine_id} was forgotten.") @server_cli.command() def rebuild( hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN ) -> None: """ Rebuilds the VM with the operating system and SSH key given at launch time. Will take a couple minutes to complete after the request is made. """ machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) api_client.server_rebuild(machine_id=machine_id) typer.echo(f"{hostname} rebuilding.") @server_cli.command() def flavors() -> None: """ Returns available flavors. """ from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) flavors = api_client.flavors().flavors for flavor in flavors: typer.echo(f"{flavor}: {flavors[flavor]}") @server_cli.command() def operating_systems() -> None: """ Returns available operating systems. """ from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) os_list = api_client.operating_systems().operating_systems for operating_system in os_list: typer.echo(operating_system) def load_token(token: str) -> str: token_file = token_path().joinpath(f"{token}.json") if not token_file.exists(): msg = f"Token '{token}' ({token_file}) does not exist. Create it with:\n" msg += f"sporestack token create {token} --dollars 20 --currency xmr\n" msg += "(Can do more than $20, or a different currency, like btc.)\n" msg += ( "With the token credited, you can launch servers, renew existing ones, etc." ) typer.echo(msg, err=True) raise typer.Exit(code=1) token_data = json.loads(token_file.read_text()) assert token_data["version"] == 1 assert isinstance(token_data["key"], str) return token_data["key"] def save_token(token: str, key: str) -> None: token_file = token_path().joinpath(f"{token}.json") if token_file.exists(): msg = f"Token '{token}' already exists in {token_file}. Aborting!" typer.echo(msg, err=True) raise typer.Exit(code=1) token_data = {"version": TOKEN_VERSION, "name": token, "key": key} token_file.write_text(json.dumps(token_data)) @token_cli.command(name="create") def token_create( token: str = typer.Argument(DEFAULT_TOKEN), dollars: int = typer.Option(...), currency: str = typer.Option(...), ) -> None: """ Enables a new settlement token. Dollars is starting balance. """ from . import utils _token = utils.random_token() typer.echo(f"Generated key {_token} for use with token {token}", err=True) if Path(SPORESTACK_DIR / "tokens" / f"{token}.json").exists(): typer.echo("Token already created! Did you mean to `topup`?", err=True) raise typer.Exit(1) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) response = api_client.token_add( token=_token, dollars=dollars, currency=currency, retry=True, ) uri = response.payment.uri assert uri is not None usd = response.payment.usd make_payment(currency=currency, uri=uri, usd=usd) tries = 360 * 2 while tries > 0: typer.echo(WAITING_PAYMENT_TO_PROCESS, err=True) tries = tries - 1 # FIXME: Wait two hours in a smarter way. # Waiting for payment to set in. time.sleep(10) response = api_client.token_add( token=_token, dollars=dollars, currency=currency, retry=True, ) if response.payment.paid is True: typer.echo(f"{token} has been enabled with ${dollars}.") typer.echo(f"{token}'s key is {_token}.") typer.echo("Save it, don't share it, and don't lose it!") save_token(token, _token) return raise ValueError(f"{token} did not get enabled in time.") @token_cli.command(name="import") def token_import( name: str = typer.Argument(DEFAULT_TOKEN), key: str = typer.Option(...), ) -> None: """ Imports a token from a key. """ save_token(name, key) @token_cli.command(name="topup") def token_topup( token: str = typer.Argument(DEFAULT_TOKEN), dollars: int = typer.Option(...), currency: str = typer.Option(...), ) -> None: """ Adds balance to an existing settlement token. """ token = load_token(token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) response = api_client.token_add( token, dollars, currency=currency, retry=True, ) uri = response.payment.uri assert uri is not None usd = response.payment.usd make_payment(currency=currency, uri=uri, usd=usd) tries = 360 * 2 while tries > 0: typer.echo(WAITING_PAYMENT_TO_PROCESS, err=True) tries = tries - 1 # FIXME: Wait two hours in a smarter way. response = api_client.token_add( token, dollars, currency=currency, retry=True, ) # Waiting for payment to set in. time.sleep(10) if response.payment.paid is True: typer.echo(f"Added {dollars} dollars to {token}") return raise ValueError(f"{token} did not get enabled in time.") @token_cli.command() def balance(token: str = typer.Argument(DEFAULT_TOKEN)) -> None: """ Gets balance for a settlement token. """ _token = load_token(token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) typer.echo(api_client.token_balance(token=_token).usd) @token_cli.command() def servers(token: str = typer.Argument(DEFAULT_TOKEN)) -> None: """ Returns server info for servers launched by a given token. """ _token = load_token(token) from .api_client import APIClient api_client = APIClient(api_endpoint=get_api_endpoint()) typer.echo(api_client.servers_launched_from_token(token=_token)) @token_cli.command(name="list") def token_list() -> None: """ Gets balance for a settlement token. """ token_dir = token_path() typer.echo(f"SporeStack tokens present in {token_dir}:", err=True) typer.echo("(Name): (Key)", err=True) for token_file in token_dir.glob("*.json"): token = token_file.stem key = load_token(token) typer.echo(f"{token}: {key}") @cli.command() def version() -> None: """ Returns the installed version. """ from . import __version__ typer.echo(__version__) @cli.command() def api_endpoint() -> None: """ Prints the selected API endpoint: Env var: SPORESTACK_ENDPOINT, or, SPORESTACK_USE_TOR=1 """ from . import api_client endpoint = get_api_endpoint() if ".onion" in endpoint: typer.echo(f"{endpoint} using {api_client._get_tor_proxy()}") return else: typer.echo(endpoint) return if __name__ == "__main__": cli()