Make APIClient, Token, and Server #1

Merged
spore merged 3 commits from rewrite into master 2023-02-07 19:35:18 +00:00
11 changed files with 943 additions and 702 deletions

View File

@ -1,30 +0,0 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: db7346d375eda68a0174f2c057dd97f2fbffe030 # frozen: v4.2.0
hooks:
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: ae2c0758c9e61a385df9700dc9c231bf54887041 # frozen: 22.3.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: c5e8fa75dda5f764d20f66a215d71c21cfa198e1 # frozen: 5.10.1
hooks:
- id: isort
- repo: https://github.com/myint/autoflake
rev: 7a53fdafc82c33f446915b60fcac947c51279260 # frozen: v1.4
hooks:
- id: autoflake
- repo: https://github.com/asottile/pyupgrade
rev: 256bd84aa5a17edbd3dcfaaa4f30f870168d2838 # frozen: v2.32.0
hooks:
- id: pyupgrade
args: [--py37-plus]
- repo: https://github.com/jackdewinter/pymarkdown
rev: be56696256d5491e8a907b72e5a3852034546adb # frozen: v0.9.5
hooks:
- id: pymarkdown
args: [--disable-rules=MD013, --set=plugins.md024.siblings_only=$!True, scan]

View File

@ -1,10 +1,15 @@
format:
black .
ruff --fix .
test:
python -m pflake8 .
black --check .
ruff .
python -m mypy --strict .
$(MAKE) test-pytest
test-pytest:
python -m pytest --cov=sporestack --cov-fail-under=49 --cov-report=term --durations=3 --cache-clear
python -m pytest --cov=sporestack --cov-fail-under=40 --cov-report=term --durations=3 --cache-clear
build-dist:
rm dist/* || true

20
Pipfile
View File

@ -7,24 +7,24 @@ name = "pypi"
sporestack = {editable = true, path = "."}
[dev-packages]
flake8 = "~=4.0"
pyproject-flake8 = "==0.0.1a2"
flake8-noqa = "~=1.2"
pep8-naming = "~=0.12.1"
mypy = "==0.942"
pytest = "~=6.2"
pytest-cov = "~=3.0"
black = "~=23.1"
mypy = "~=1.0"
pytest = "~=7.2"
pytest-cov = "~=4.0"
pytest-mock = "~=3.6"
pytest-socket = "~=0.5.1"
ruff = "==0.0.239"
types-requests = "~=2.25"
# Building
wheel = "~=0.37.0"
wheel = "~=0.38.0"
build = "~=0.7.0"
# Publishing
twine = "~=3.4"
# Docs
pdoc = "~=9.0"
pdoc = "~=12.0"
# Python `make` implementation
almost-make = "~=0.5.1"
almost-make = "~=0.5.2"

846
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,23 +1,41 @@
[tool.pytest.ini_options]
addopts = "--strict-markers --disable-socket"
[tool.ruff]
select = [
"F", # pyflakes
"E", # pycodestyle errors
"W", # pycodestyle warnings
"I", # isort
"N", # pep8-naming
"RUF", # Unused noqa + more
"ANN", # Type annotations
"UP", # pyupgrade
]
ignore = [
"ANN101", # Type annotations for self
"ANN401", # Allow ANY
]
unfixable = [
"F401", # Don't try to automatically remove unused imports
]
target-version = "py37"
update-check = false
[tool.coverage.report]
show_missing = true
[tool.coverage.run]
omit = ["tests/*", "build/*"]
# Have to use `pflake8` instead of `flake8`
[tool.flake8]
max-line-length = 88
noqa-require-code = "true"
exclude = ".git,__pycache__,build,dist"
max-complexity = 15
[tool.isort]
profile = "black"
[tool.mypy]
files = "."
plugins = ["pydantic.mypy"]
exclude = "(build|site-packages|__pycache__)"
strict = true
[tool.pydantic-mypy]
init_forbid_extra = true

View File

@ -2,4 +2,4 @@
__all__ = ["api", "api_client", "exceptions"]
__version__ = "7.3.0"
__version__ = "8.0.0"

View File

@ -1,5 +1,6 @@
import logging
import os
from dataclasses import dataclass
from time import sleep
from typing import Any, Dict, Optional
@ -130,20 +131,23 @@ def _api_request(
raise Exception("Stuff broke strangely. Please contact SporeStack support.")
def launch(
@dataclass
class APIClient:
api_endpoint: str = API_ENDPOINT
def server_launch(
self,
machine_id: str,
days: int,
flavor: str,
operating_system: str,
ssh_key: str,
token: str,
api_endpoint: str = API_ENDPOINT,
region: Optional[str] = None,
retry: bool = False,
quote: bool = False,
hostname: str = "",
autorenew: bool = False,
) -> api.ServerLaunch.Response:
) -> api.ServerLaunch.Response:
request = api.ServerLaunch.Request(
days=days,
token=token,
@ -155,160 +159,138 @@ def launch(
hostname=hostname,
autorenew=autorenew,
)
url = api_endpoint + api.ServerLaunch.url.format(machine_id=machine_id)
response = _api_request(url=url, json_params=request.dict(), retry=retry)
url = self.api_endpoint + api.ServerLaunch.url.format(machine_id=machine_id)
response = _api_request(url=url, json_params=request.dict())
response_object = api.ServerLaunch.Response.parse_obj(response)
assert response_object.machine_id == machine_id
return response_object
def topup(
def server_topup(
self,
machine_id: str,
days: int,
token: str,
api_endpoint: str = API_ENDPOINT,
retry: bool = False,
) -> api.ServerTopup.Response:
) -> api.ServerTopup.Response:
"""
Topup a server.
"""
request = api.ServerTopup.Request(days=days, token=token)
url = api_endpoint + api.ServerTopup.url.format(machine_id=machine_id)
response = _api_request(url=url, json_params=request.dict(), retry=retry)
url = self.api_endpoint + api.ServerTopup.url.format(machine_id=machine_id)
response = _api_request(url=url, json_params=request.dict())
response_object = api.ServerTopup.Response.parse_obj(response)
assert response_object.machine_id == machine_id
return response_object
def autorenew_enable(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def autorenew_enable(self, machine_id: str) -> None:
"""
Enable autorenew on a server.
"""
url = api_endpoint + api.ServerEnableAutorenew.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerEnableAutorenew.url.format(
machine_id=machine_id
)
_api_request(url, empty_post=True)
def autorenew_disable(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def autorenew_disable(self, machine_id: str) -> None:
"""
Disable autorenew on a server.
"""
url = api_endpoint + api.ServerDisableAutorenew.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerDisableAutorenew.url.format(
machine_id=machine_id
)
_api_request(url, empty_post=True)
def start(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def server_start(self, machine_id: str) -> None:
"""
Boots the server.
Power on the server.
"""
url = api_endpoint + api.ServerStart.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerStart.url.format(machine_id=machine_id)
_api_request(url, empty_post=True)
def stop(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def server_stop(self, machine_id: str) -> None:
"""
Powers off the server.
Power off the server.
"""
url = api_endpoint + api.ServerStop.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerStop.url.format(machine_id=machine_id)
_api_request(url, empty_post=True)
def destroy(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def server_delete(self, machine_id: str) -> None:
"""
Destroys the server.
Delete the server.
"""
url = api_endpoint + api.ServerDestroy.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerDelete.url.format(machine_id=machine_id)
_api_request(url, empty_post=True)
def delete(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
"""
Deletes the server. (Deprecated, use destroy instead)
"""
destroy(machine_id, api_endpoint)
def forget(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def server_forget(self, machine_id: str) -> None:
"""
Forget about a destroyed/deleted server.
"""
url = api_endpoint + api.ServerForget.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerForget.url.format(machine_id=machine_id)
_api_request(url, empty_post=True)
def rebuild(machine_id: str, api_endpoint: str = API_ENDPOINT) -> None:
def server_rebuild(self, machine_id: str) -> None:
"""
Rebuilds the server with the operating system and SSH key set at launch time.
Deletes all of the data on the server!
"""
url = api_endpoint + api.ServerRebuild.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerRebuild.url.format(machine_id=machine_id)
_api_request(url, empty_post=True)
def info(machine_id: str, api_endpoint: str = API_ENDPOINT) -> api.ServerInfo.Response:
def server_info(self, machine_id: str) -> api.ServerInfo.Response:
"""
Returns info about the server.
"""
url = api_endpoint + api.ServerInfo.url.format(machine_id=machine_id)
url = self.api_endpoint + api.ServerInfo.url.format(machine_id=machine_id)
response = _api_request(url)
response_object = api.ServerInfo.Response.parse_obj(response)
assert response_object.machine_id == machine_id
return response_object
def servers_launched_from_token(
token: str, api_endpoint: str = API_ENDPOINT
) -> api.ServersLaunchedFromToken.Response:
def servers_launched_from_token(
self, token: str
) -> api.ServersLaunchedFromToken.Response:
"""
Returns info of servers launched from a given token.
"""
url = api_endpoint + api.ServersLaunchedFromToken.url.format(token=token)
url = self.api_endpoint + api.ServersLaunchedFromToken.url.format(token=token)
response = _api_request(url)
response_object = api.ServersLaunchedFromToken.Response.parse_obj(response)
return response_object
def flavors(api_endpoint: str = API_ENDPOINT) -> api.Flavors.Response:
def flavors(self) -> api.Flavors.Response:
"""
Returns available flavors.
"""
url = api_endpoint + api.Flavors.url
url = self.api_endpoint + api.Flavors.url
response = _api_request(url)
response_object = api.Flavors.Response.parse_obj(response)
return response_object
def operating_systems(
api_endpoint: str = API_ENDPOINT,
) -> api.OperatingSystems.Response:
def operating_systems(self) -> api.OperatingSystems.Response:
"""
Returns available operating systems.
"""
url = api_endpoint + api.OperatingSystems.url
url = self.api_endpoint + api.OperatingSystems.url
response = _api_request(url)
response_object = api.OperatingSystems.Response.parse_obj(response)
return response_object
def token_add(
def token_add(
self,
token: str,
dollars: int,
currency: str,
api_endpoint: str = API_ENDPOINT,
retry: bool = False,
) -> api.TokenAdd.Response:
) -> api.TokenAdd.Response:
request = api.TokenAdd.Request(dollars=dollars, currency=currency)
url = api_endpoint + api.TokenAdd.url.format(token=token)
url = self.api_endpoint + api.TokenAdd.url.format(token=token)
response = _api_request(url=url, json_params=request.dict(), retry=retry)
response_object = api.TokenAdd.Response.parse_obj(response)
assert response_object.token == token
return response_object
def token_balance(
token: str, api_endpoint: str = API_ENDPOINT
) -> api.TokenBalance.Response:
url = api_endpoint + api.TokenBalance.url.format(token=token)
def token_balance(self, token: str) -> api.TokenBalance.Response:
url = self.api_endpoint + api.TokenBalance.url.format(token=token)
response = _api_request(url=url)
response_object = api.TokenBalance.Response.parse_obj(response)
assert response_object.token == token

View File

@ -2,42 +2,15 @@
SporeStack CLI: `sporestack`
"""
import importlib.util
import json
import logging
import os
import sys
import time
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import Any, Dict, Optional
import typer
from . import __version__
def lazy_import(name: str) -> ModuleType:
"""
Lazily import a module. Helps speed up CLI performance.
"""
spec = importlib.util.find_spec(name)
assert spec is not None
assert spec.loader is not None
loader = importlib.util.LazyLoader(spec.loader)
spec.loader = loader
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
loader.exec_module(module)
return module
# For mypy
if TYPE_CHECKING:
from . import api_client
else:
api_client = lazy_import("sporestack.api_client")
HELP = """
SporeStack Python CLI
@ -81,9 +54,11 @@ WAITING_PAYMENT_TO_PROCESS = "Waiting for payment to process..."
def get_api_endpoint() -> str:
api_endpoint = os.getenv("SPORESTACK_ENDPOINT", api_client.CLEARNET_ENDPOINT)
from .api_client import CLEARNET_ENDPOINT, TOR_ENDPOINT
api_endpoint = os.getenv("SPORESTACK_ENDPOINT", CLEARNET_ENDPOINT)
if os.getenv("SPORESTACK_USE_TOR_ENDPOINT", None) is not None:
api_endpoint = api_client.TOR_ENDPOINT
api_endpoint = TOR_ENDPOINT
return api_endpoint
@ -119,12 +94,14 @@ def launch(
"""
Launch a server on SporeStack.
"""
from . import utils
typer.echo(f"Launching server with token {token}...", err=True)
_token = load_token(token)
from . import utils
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
typer.echo(f"Loading SSH key from {ssh_key_file}...")
if not ssh_key_file.exists():
msg = f"{ssh_key_file} does not exist. "
@ -137,7 +114,7 @@ def launch(
machine_id = utils.random_machine_id()
if quote:
response = api_client.launch(
response = api_client.server_launch(
machine_id=machine_id,
days=days,
flavor=flavor,
@ -145,8 +122,6 @@ def launch(
ssh_key=ssh_key,
region=region,
token=_token,
api_endpoint=get_api_endpoint(),
retry=True,
quote=True,
hostname=hostname,
autorenew=autorenew,
@ -168,7 +143,7 @@ def launch(
tries = 360
while tries > 0:
response = api_client.launch(
response = api_client.server_launch(
machine_id=machine_id,
days=days,
flavor=flavor,
@ -178,8 +153,6 @@ def launch(
token=_token,
hostname=hostname,
autorenew=autorenew,
api_endpoint=get_api_endpoint(),
retry=True,
)
if response.created is True:
break
@ -193,11 +166,7 @@ def launch(
raise typer.Exit(code=1)
typer.echo(
pretty_machine_info(
api_client.info(
machine_id=machine_id, api_endpoint=get_api_endpoint()
).dict()
)
pretty_machine_info(api_client.server_info(machine_id=machine_id).dict())
)
@ -212,19 +181,21 @@ def topup(
Extend an existing SporeStack server's lifetime.
"""
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
_token = load_token(token)
response = api_client.topup(
api_client.server_topup(
machine_id=machine_id,
days=days,
api_endpoint=get_api_endpoint(),
token=_token,
retry=True,
)
typer.echo(response.expiration)
typer.echo(f"Server topped up for {days} day(s)")
def server_info_path() -> Path:
@ -310,13 +281,14 @@ def server_list(
"""
List all locally known servers and all servers under the given token.
"""
from .api_client import APIClient
from .exceptions import SporeStackUserError
api_client = APIClient(api_endpoint=get_api_endpoint())
_token = load_token(token)
server_infos = api_client.servers_launched_from_token(
token=_token, api_endpoint=get_api_endpoint()
).servers
server_infos = api_client.servers_launched_from_token(token=_token).servers
machine_id_hostnames = {}
if local:
@ -341,20 +313,21 @@ def server_list(
typer.echo(f"Machine ID (keep this secret!): {info.machine_id}")
typer.echo(f"IPv6: {info.network_interfaces[0].ipv6}")
typer.echo(f"IPv4: {info.network_interfaces[0].ipv4}")
typer.echo(f"Running: {info.running}")
typer.echo(f"Region: {info.region}")
typer.echo(f"Flavor: {info.flavor.slug}")
typer.echo(f"Token: {info.token}")
typer.echo(f"Autorenew: {info.autorenew}")
human_expiration = time.strftime(
"%Y-%m-%d %H:%M:%S %z", time.localtime(info.expiration)
)
typer.echo(f"Expiration: {info.expiration} ({human_expiration})")
typer.echo(f"Token: {info.token}")
if info.deleted:
typer.echo("Server was deleted!")
else:
typer.echo(f"Running: {info.running}")
time_to_live = info.expiration - int(time.time())
hours = time_to_live // 3600
typer.echo(f"Server will be deleted in {hours} hours.")
if info.deleted:
typer.echo("Server was deleted!")
typer.echo(f"Autorenew: {info.autorenew}")
printed_machine_ids.append(info.machine_id)
@ -367,9 +340,8 @@ def server_list(
continue
try:
upstream_vm_info = api_client.info(
machine_id=saved_vm_info["machine_id"],
api_endpoint=get_api_endpoint(),
upstream_vm_info = api_client.server_info(
machine_id=saved_vm_info["machine_id"]
)
saved_vm_info["expiration"] = upstream_vm_info.expiration
saved_vm_info["running"] = upstream_vm_info.running
@ -411,9 +383,11 @@ def _get_machine_id(machine_id: str, hostname: str, token: str) -> str:
_token = load_token(token)
for server in api_client.servers_launched_from_token(
token=_token, api_endpoint=get_api_endpoint()
).servers:
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
for server in api_client.servers_launched_from_token(token=_token).servers:
if server.hostname == hostname:
return server.machine_id
@ -429,18 +403,38 @@ def info(hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN) -
Info on the VM
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
typer.echo(
api_client.info(machine_id=machine_id, api_endpoint=get_api_endpoint()).json()
pretty_machine_info(api_client.server_info(machine_id=machine_id).dict())
)
@server_cli.command(name="json")
def server_info_json(
hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN
) -> None:
"""
Info on the VM, in JSON format
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
typer.echo(api_client.server_info(machine_id=machine_id).json())
@server_cli.command()
def start(hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN) -> None:
"""
Boots the VM.
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.start(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.server_start(machine_id=machine_id)
typer.echo(f"{hostname} started.")
@ -450,7 +444,10 @@ def stop(hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN) -
Immediately shuts down the VM.
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.stop(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.server_stop(machine_id=machine_id)
typer.echo(f"{hostname} stopped.")
@ -462,7 +459,10 @@ def autorenew_enable(
Enable autorenew on a server.
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.autorenew_enable(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.autorenew_enable(machine_id=machine_id)
typer.echo("Autorenew enabled.")
@ -474,19 +474,25 @@ def autorenew_disable(
Disable autorenew on a server.
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.autorenew_disable(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.autorenew_disable(machine_id=machine_id)
typer.echo("Autorenew disabled.")
@server_cli.command()
def destroy(
def delete(
hostname: str = "", machine_id: str = "", token: str = DEFAULT_TOKEN
) -> None:
"""
Deletes/destroys the VM before expiration (no refunds/credits)
Delete the VM before its expiration
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.destroy(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.server_delete(machine_id=machine_id)
# Also remove the .json file
server_info_path().joinpath(f"{hostname}.json").unlink(missing_ok=True)
typer.echo(f"{machine_id} was destroyed.")
@ -500,7 +506,10 @@ def forget(
Forget about a deleted server so that it doesn't show up in server list.
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.forget(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.server_forget(machine_id=machine_id)
typer.echo(f"{machine_id} was forgotten.")
@ -514,7 +523,10 @@ def rebuild(
Will take a couple minutes to complete after the request is made.
"""
machine_id = _get_machine_id(machine_id=machine_id, hostname=hostname, token=token)
api_client.rebuild(machine_id=machine_id, api_endpoint=get_api_endpoint())
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
api_client.server_rebuild(machine_id=machine_id)
typer.echo(f"{hostname} rebuilding.")
@ -523,7 +535,10 @@ def flavors() -> None:
"""
Returns available flavors.
"""
flavors = api_client.flavors(api_endpoint=get_api_endpoint()).flavors
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
flavors = api_client.flavors().flavors
for flavor in flavors:
typer.echo(f"{flavor}: {flavors[flavor]}")
@ -533,9 +548,10 @@ def operating_systems() -> None:
"""
Returns available operating systems.
"""
os_list = api_client.operating_systems(
api_endpoint=get_api_endpoint()
).operating_systems
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
os_list = api_client.operating_systems().operating_systems
for operating_system in os_list:
typer.echo(operating_system)
@ -590,11 +606,14 @@ def token_create(
typer.echo("Token already created! Did you mean to `topup`?", err=True)
raise typer.Exit(1)
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
response = api_client.token_add(
token=_token,
dollars=dollars,
currency=currency,
api_endpoint=get_api_endpoint(),
retry=True,
)
@ -615,7 +634,6 @@ def token_create(
token=_token,
dollars=dollars,
currency=currency,
api_endpoint=get_api_endpoint(),
retry=True,
)
if response.payment.paid is True:
@ -649,11 +667,14 @@ def token_topup(
"""
token = load_token(token)
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
response = api_client.token_add(
token,
dollars,
currency=currency,
api_endpoint=get_api_endpoint(),
retry=True,
)
@ -672,7 +693,6 @@ def token_topup(
token,
dollars,
currency=currency,
api_endpoint=get_api_endpoint(),
retry=True,
)
# Waiting for payment to set in.
@ -690,9 +710,11 @@ def balance(token: str = typer.Argument(DEFAULT_TOKEN)) -> None:
"""
_token = load_token(token)
typer.echo(
api_client.token_balance(token=_token, api_endpoint=get_api_endpoint()).usd
)
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
typer.echo(api_client.token_balance(token=_token).usd)
@token_cli.command()
@ -702,11 +724,11 @@ def servers(token: str = typer.Argument(DEFAULT_TOKEN)) -> None:
"""
_token = load_token(token)
typer.echo(
api_client.servers_launched_from_token(
token=_token, api_endpoint=get_api_endpoint()
)
)
from .api_client import APIClient
api_client = APIClient(api_endpoint=get_api_endpoint())
typer.echo(api_client.servers_launched_from_token(token=_token))
@token_cli.command(name="list")
@ -728,6 +750,8 @@ def version() -> None:
"""
Returns the installed version.
"""
from . import __version__
typer.echo(__version__)
@ -737,6 +761,8 @@ def api_endpoint() -> None:
Prints the selected API endpoint: Env var: SPORESTACK_ENDPOINT,
or, SPORESTACK_USE_TOR=1
"""
from . import api_client
endpoint = get_api_endpoint()
if ".onion" in endpoint:
typer.echo(f"{endpoint} using {api_client._get_tor_proxy()}")

103
src/sporestack/client.py Normal file
View File

@ -0,0 +1,103 @@
from dataclasses import dataclass
from typing import List, Union
from . import api
from .api_client import APIClient
from .utils import random_machine_id, random_token
@dataclass
class Server:
machine_id: str
api_client: APIClient = APIClient()
token: Union[str, None] = None
def info(self) -> api.ServerInfo.Response:
return self.api_client.server_info(self.machine_id)
def rebuild(self) -> None:
self.api_client.server_rebuild(self.machine_id)
def forget(self) -> None:
self.api_client.server_forget(self.machine_id)
def delete(self) -> None:
self.api_client.server_delete(self.machine_id)
def start(self) -> None:
"""Powers on the server."""
self.api_client.server_start(self.machine_id)
def stop(self) -> None:
"""Powers off the server."""
self.api_client.server_stop(self.machine_id)
def autorenew_enable(self) -> None:
"""Enables autorenew on the server."""
self.api_client.autorenew_enable(self.machine_id)
def autorenew_disable(self) -> None:
"""Disables autorenew on the server."""
self.api_client.autorenew_disable(self.machine_id)
def topup(self, days: int) -> None:
"""
Renew the server for the amount of days specified, from the token specified.
"""
if self.token is None:
raise ValueError("token must be set to top up a server!")
self.api_client.server_topup(
machine_id=self.machine_id, days=days, token=self.token
)
@dataclass
class Token:
token: str = random_token()
api_client: APIClient = APIClient()
def add(self, dollars: int, currency: str) -> None:
"""Add to token"""
self.api_client.token_add(token=self.token, dollars=dollars, currency=currency)
def balance(self) -> int:
"""Returns the token's balance in cents."""
return self.api_client.token_balance(token=self.token).cents
def servers(self) -> List[Server]:
server_classes: List[Server] = []
for server in self.api_client.servers_launched_from_token(self.token).servers:
server_classes.append(
Server(
machine_id=server.machine_id,
api_client=self.api_client,
token=self.token,
)
)
return server_classes
def launch_server(
self,
ssh_key: str,
flavor: str,
days: int,
operating_system: str,
region: Union[str, None] = None,
hostname: str = "",
autorenew: bool = False,
machine_id: str = random_machine_id(),
) -> Server:
self.api_client.server_launch(
machine_id=machine_id,
days=days,
token=self.token,
region=region,
flavor=flavor,
operating_system=operating_system,
ssh_key=ssh_key,
hostname=hostname,
autorenew=autorenew,
)
return Server(
machine_id=machine_id, api_client=self.api_client, token=self.token
)

View File

@ -1,8 +1,3 @@
from unittest.mock import MagicMock, patch
import pytest
from pydantic import ValidationError
from sporestack import api_client
@ -18,72 +13,3 @@ def test__is_onion_url() -> None:
assert api_client._is_onion_url("http://onion.domain.com/.onion/") is False
assert api_client._is_onion_url("http://me.me/file.onion/") is False
assert api_client._is_onion_url("http://me.me/file.onion") is False
@patch("sporestack.api_client._api_request")
def test_launch(mock_api_request: MagicMock) -> None:
with pytest.raises(ValidationError):
api_client.launch(
"dummymachineid",
days=1,
operating_system="freebsd-12",
ssh_key="id-rsa...",
flavor="aflavor",
token="f" * 64,
)
@patch("sporestack.api_client._api_request")
def test_topup(mock_api_request: MagicMock) -> None:
with pytest.raises(ValidationError):
api_client.topup("dummymachineid", token="f" * 64, days=1)
@patch("sporestack.api_client._api_request")
def test_start(mock_api_request: MagicMock) -> None:
api_client.start("dummymachineid")
mock_api_request.assert_called_once_with(
"https://api.sporestack.com/server/dummymachineid/start", empty_post=True
)
@patch("sporestack.api_client._api_request")
def test_stop(mock_api_request: MagicMock) -> None:
api_client.stop("dummymachineid")
mock_api_request.assert_called_once_with(
"https://api.sporestack.com/server/dummymachineid/stop", empty_post=True
)
@patch("sporestack.api_client._api_request")
def test_rebuild(mock_api_request: MagicMock) -> None:
api_client.rebuild("dummymachineid")
mock_api_request.assert_called_once_with(
"https://api.sporestack.com/server/dummymachineid/rebuild", empty_post=True
)
@patch("sporestack.api_client._api_request")
def test_info(mock_api_request: MagicMock) -> None:
with pytest.raises(ValidationError):
api_client.info("dummymachineid")
mock_api_request.assert_called_once_with(
"https://api.sporestack.com/server/dummymachineid/info"
)
@patch("sporestack.api_client._api_request")
def test_delete(mock_api_request: MagicMock) -> None:
api_client.delete("dummymachineid")
mock_api_request.assert_called_once_with(
"https://api.sporestack.com/server/dummymachineid/destroy", empty_post=True
)
@patch("sporestack.api_client._api_request")
def test_token_balance(mock_api_request: MagicMock) -> None:
with pytest.raises(ValidationError):
api_client.token_balance("dummytoken")
mock_api_request.assert_called_once_with(
url="https://api.sporestack.com/token/dummytoken/balance"
)

View File

@ -1,10 +1,9 @@
import pytest
import typer
from _pytest.monkeypatch import MonkeyPatch
from typer.testing import CliRunner
from sporestack import cli
from sporestack.api_client import TOR_ENDPOINT
from typer.testing import CliRunner
runner = CliRunner()