Browse Source

1.2.0: Ran black

tags/1.2.0
Teran McKinney 5 months ago
parent
commit
4166b2b675
11 changed files with 654 additions and 652 deletions
  1. +28
    -25
      setup.py
  2. +1
    -1
      sporestackv2/__init__.py
  3. +170
    -168
      sporestackv2/api_client.py
  4. +10
    -10
      sporestackv2/api_client_test.py
  5. +330
    -322
      sporestackv2/client.py
  6. +3
    -3
      sporestackv2/client_test.py
  7. +15
    -15
      sporestackv2/utilities.py
  8. +6
    -18
      sporestackv2/utilities_test.py
  9. +46
    -47
      sporestackv2/validate.py
  10. +44
    -42
      sporestackv2/validate_test.py
  11. +1
    -1
      sporestackv2/version.py

+ 28
- 25
setup.py View File

@@ -5,42 +5,45 @@ from setuptools import setup
# Not sure how necessary this is. Would be nice to just
# import .sporestackv2.__version__
VERSION = None
with open('sporestackv2/version.py') as f:
with open("sporestackv2/version.py") as f:
for line in f:
if line.startswith('__version__'):
VERSION = line.replace("'", '').split('=')[1].strip()
if line.startswith("__version__"):
VERSION = line.replace("'", "").split("=")[1].strip()
break
if VERSION is None:
raise ValueError('__version__ not found in __init__.py')
raise ValueError("__version__ not found in __init__.py")

DOWNLOAD_URL = 'https://github.com/sporestack/sporestack-python/tarball/{}'
DOWNLOAD_URL = "https://github.com/sporestack/sporestack-python/tarball/{}"

DESCRIPTION = 'SporeStack.com library and client. Launch servers with Bitcoin.'
KEYWORDS = ['bitcoin', 'bitcoincash', 'bitcoinsv', 'servers', 'infrastructure',
'vps', 'virtual private server']
DESCRIPTION = "SporeStack.com library and client. Launch servers with Bitcoin."
KEYWORDS = [
"bitcoin",
"bitcoincash",
"bitcoinsv",
"servers",
"infrastructure",
"vps",
"virtual private server",
]

setup(
python_requires='>=3.3',
name='sporestack',
python_requires=">=3.3",
name="sporestack",
version=VERSION,
author='SporeStack',
author_email='admin@sporestack.com',
author="SporeStack",
author_email="admin@sporestack.com",
description=DESCRIPTION,
keywords=KEYWORDS,
license='Unlicense',
url='https://sporestack.com/',
license="Unlicense",
url="https://sporestack.com/",
download_url=DOWNLOAD_URL.format(VERSION),
packages=['sporestackv2'],
packages=["sporestackv2"],
install_requires=[
'pyqrcode',
'requests[socks]>=2.22.0',
'aaargh',
'walkingliberty',
'sshpubkeys'
"pyqrcode",
"requests[socks]>=2.22.0",
"aaargh",
"walkingliberty",
"sshpubkeys",
],
entry_points={
'console_scripts': [
'sporestackv2 = sporestackv2.client:main'
]
}
entry_points={"console_scripts": ["sporestackv2 = sporestackv2.client:main"]},
)

+ 1
- 1
sporestackv2/__init__.py View File

@@ -4,6 +4,6 @@ from . import utilities
from . import validate
from . import version

__all__ = ['api_client', 'client', 'utilities', 'validate']
__all__ = ["api_client", "client", "utilities", "validate"]

__version__ = version.__version__

+ 170
- 168
sporestackv2/api_client.py View File

@@ -13,17 +13,17 @@ LATEST_API_VERSION = 2

GET_TIMEOUT = 60
POST_TIMEOUT = 90
USE_TOR_PROXY = 'auto'
TOR_PROXY = 'socks5h://127.0.0.1:9050'
USE_TOR_PROXY = "auto"
TOR_PROXY = "socks5h://127.0.0.1:9050"
# For requests module
TOR_PROXY_REQUESTS = {'http': TOR_PROXY, 'https': TOR_PROXY}
TOR_PROXY_REQUESTS = {"http": TOR_PROXY, "https": TOR_PROXY}


def validate_use_tor_proxy(use_tor_proxy):
if isinstance(use_tor_proxy, bool):
return True
if isinstance(use_tor_proxy, str):
if use_tor_proxy == 'auto':
if use_tor_proxy == "auto":
return True

raise ValueError('use_tor_proxy must be True, False, or "auto"')
@@ -40,54 +40,52 @@ def is_onion_url(url):
This can be optimized a lot.
"""
try:
url_parts = url.split('/')
url_parts = url.split("/")
domain = url_parts[2]
tld = domain.split('.')[-1]
if tld == 'onion':
tld = domain.split(".")[-1]
if tld == "onion":
return True
except Exception:
pass
return False


def api_request(url,
json_params=None,
get_params=None,
retry=False,
use_tor_proxy=USE_TOR_PROXY):
def api_request(
url, json_params=None, get_params=None, retry=False, use_tor_proxy=USE_TOR_PROXY
):
validate_use_tor_proxy(use_tor_proxy)

proxies = {}
if use_tor_proxy is True:
proxies = TOR_PROXY_REQUESTS
elif use_tor_proxy == 'auto':
elif use_tor_proxy == "auto":
if is_onion_url(url) is True:
msg = 'use_tor_proxy is "auto" and we have a .onion url, '
msg += 'using local Tor SOCKS proxy.'
msg += "using local Tor SOCKS proxy."
logging.debug(msg)
proxies = TOR_PROXY_REQUESTS

try:
if json_params is None:
request = requests.get(url,
params=get_params,
timeout=GET_TIMEOUT,
proxies=proxies)
request = requests.get(
url, params=get_params, timeout=GET_TIMEOUT, proxies=proxies
)
else:
request = requests.post(url,
json=json_params,
timeout=POST_TIMEOUT,
proxies=proxies)
request = requests.post(
url, json=json_params, timeout=POST_TIMEOUT, proxies=proxies
)
except Exception as e:
if retry is True:
logging.warning('Got an error, but retrying: {}'.format(e))
logging.warning("Got an error, but retrying: {}".format(e))
sleep(5)
# Try again.
return api_request(url,
json_params=json_params,
get_params=get_params,
retry=retry,
use_tor_proxy=use_tor_proxy)
return api_request(
url,
json_params=json_params,
get_params=get_params,
retry=retry,
use_tor_proxy=use_tor_proxy,
)
else:
raise

@@ -95,46 +93,48 @@ def api_request(url,
if status_code_first_digit == 2:
try:
request_dict = request.json()
if 'latest_api_version' in request_dict:
if request_dict['latest_api_version'] > LATEST_API_VERSION:
logging.warning('New API version may be available.')
if "latest_api_version" in request_dict:
if request_dict["latest_api_version"] > LATEST_API_VERSION:
logging.warning("New API version may be available.")
return request_dict
except Exception:
return request.content
elif status_code_first_digit == 4:
if request.status_code == 415:
raise NotImplementedError(request.content.decode('utf-8'))
raise NotImplementedError(request.content.decode("utf-8"))
else:
logging.debug('Status code: {}'.format(request.status_code))
raise ValueError(request.content.decode('utf-8'))
logging.debug("Status code: {}".format(request.status_code))
raise ValueError(request.content.decode("utf-8"))
elif status_code_first_digit == 5:
if retry is True:
logging.warning(request.content.decode('utf-8'))
logging.warning('Got a 500, retrying in 5 seconds...')
logging.warning(request.content.decode("utf-8"))
logging.warning("Got a 500, retrying in 5 seconds...")
sleep(5)
# Try again if we get a 500
return api_request(url,
json_params=json_params,
get_params=get_params,
retry=retry,
use_tor_proxy=use_tor_proxy)
return api_request(
url,
json_params=json_params,
get_params=get_params,
retry=retry,
use_tor_proxy=use_tor_proxy,
)
else:
raise Exception(str(request.content))
else:
# Not sure why we'd get this.
request.raise_for_status()
raise Exception('Stuff broke strangely.')
raise Exception("Stuff broke strangely.")


def normalize_argument(argument):
"""
Helps normalize arguments from aaargh that may not be what we want.
"""
if argument == 'False':
if argument == "False":
return False
elif argument == 'True':
elif argument == "True":
return True
elif argument == 'None':
elif argument == "None":
return None
else:
return argument
@@ -145,35 +145,37 @@ def get_url(api_endpoint, host, target):
Has nothing to do with GET requests.
"""
if api_endpoint is None:
api_endpoint = 'http://{}'.format(host)
return '{}/v{}/{}'.format(api_endpoint, LATEST_API_VERSION, target)


def launch(machine_id,
days,
disk,
memory,
ipv4,
ipv6,
bandwidth,
currency,
region=None,
ipxescript=None,
operating_system=None,
ssh_key=None,
organization=None,
cores=1,
managed=False,
override_code=None,
settlement_token=None,
qemuopts=None,
hostaccess=False,
api_endpoint=None,
host=None,
want_topup=False,
retry=False,
affiliate_amount=None,
affiliate_token=None):
api_endpoint = "http://{}".format(host)
return "{}/v{}/{}".format(api_endpoint, LATEST_API_VERSION, target)


def launch(
machine_id,
days,
disk,
memory,
ipv4,
ipv6,
bandwidth,
currency,
region=None,
ipxescript=None,
operating_system=None,
ssh_key=None,
organization=None,
cores=1,
managed=False,
override_code=None,
settlement_token=None,
qemuopts=None,
hostaccess=False,
api_endpoint=None,
host=None,
want_topup=False,
retry=False,
affiliate_amount=None,
affiliate_token=None,
):
"""
Only ipxescript or operating_system + ssh_key can be None.
"""
@@ -195,57 +197,63 @@ def launch(machine_id,
validate.ssh_key(ssh_key)
validate.affiliate_amount(affiliate_amount)

json_params = {'machine_id': machine_id,
'days': days,
'disk': disk,
'memory': memory,
'cores': cores,
'managed': managed,
'currency': currency,
'region': region,
'organization': organization,
'bandwidth': bandwidth,
'ipv4': ipv4,
'ipv6': ipv6,
'override_code': override_code,
'settlement_token': settlement_token,
'qemuopts': qemuopts,
'hostaccess': hostaccess,
'ipxescript': ipxescript,
'operating_system': operating_system,
'ssh_key': ssh_key,
'want_topup': want_topup,
'host': host,
'affiliate_amount': affiliate_amount,
'affiliate_token': affiliate_token}

url = get_url(api_endpoint=api_endpoint, host=host, target='launch')
json_params = {
"machine_id": machine_id,
"days": days,
"disk": disk,
"memory": memory,
"cores": cores,
"managed": managed,
"currency": currency,
"region": region,
"organization": organization,
"bandwidth": bandwidth,
"ipv4": ipv4,
"ipv6": ipv6,
"override_code": override_code,
"settlement_token": settlement_token,
"qemuopts": qemuopts,
"hostaccess": hostaccess,
"ipxescript": ipxescript,
"operating_system": operating_system,
"ssh_key": ssh_key,
"want_topup": want_topup,
"host": host,
"affiliate_amount": affiliate_amount,
"affiliate_token": affiliate_token,
}

url = get_url(api_endpoint=api_endpoint, host=host, target="launch")
return api_request(url=url, json_params=json_params, retry=retry)


def topup(machine_id,
days,
currency,
settlement_token=None,
override_code=None,
api_endpoint=None,
host=None,
retry=False,
affiliate_amount=None,
affiliate_token=None):
def topup(
machine_id,
days,
currency,
settlement_token=None,
override_code=None,
api_endpoint=None,
host=None,
retry=False,
affiliate_amount=None,
affiliate_token=None,
):
validate.machine_id(machine_id)
validate.currency(currency)
validate.affiliate_amount(affiliate_amount)

json_params = {'machine_id': machine_id,
'days': days,
'settlement_token': settlement_token,
'currency': currency,
'host': host,
'override_code': override_code,
'affiliate_amount': affiliate_amount,
'affiliate_token': affiliate_token}
url = get_url(api_endpoint=api_endpoint, host=host, target='topup')
json_params = {
"machine_id": machine_id,
"days": days,
"settlement_token": settlement_token,
"currency": currency,
"host": host,
"override_code": override_code,
"affiliate_amount": affiliate_amount,
"affiliate_token": affiliate_token,
}
url = get_url(api_endpoint=api_endpoint, host=host, target="topup")
return api_request(url=url, json_params=json_params, retry=retry)


@@ -255,8 +263,8 @@ def start(host, machine_id, api_endpoint=None):
"""
validate.machine_id(machine_id)

url = get_url(api_endpoint=api_endpoint, host=host, target='start')
json_params = {'machine_id': machine_id, 'host': host}
url = get_url(api_endpoint=api_endpoint, host=host, target="start")
json_params = {"machine_id": machine_id, "host": host}
api_request(url, json_params=json_params)
return True

@@ -267,8 +275,8 @@ def stop(host, machine_id, api_endpoint=None):
"""
validate.machine_id(machine_id)

url = get_url(api_endpoint=api_endpoint, host=host, target='stop')
json_params = {'machine_id': machine_id, 'host': host}
url = get_url(api_endpoint=api_endpoint, host=host, target="stop")
json_params = {"machine_id": machine_id, "host": host}
api_request(url, json_params=json_params)
return True

@@ -279,8 +287,8 @@ def delete(host, machine_id, api_endpoint=None):
"""
validate.machine_id(machine_id)

url = get_url(api_endpoint=api_endpoint, host=host, target='delete')
json_params = {'machine_id': machine_id, 'host': host}
url = get_url(api_endpoint=api_endpoint, host=host, target="delete")
json_params = {"machine_id": machine_id, "host": host}
api_request(url, json_params=json_params)
return True

@@ -292,8 +300,8 @@ def sshhostname(host, machine_id, api_endpoint=None):
"""
validate.machine_id(machine_id)

url = get_url(api_endpoint=api_endpoint, host=host, target='sshhostname')
get_params = {'machine_id': machine_id, 'host': host}
url = get_url(api_endpoint=api_endpoint, host=host, target="sshhostname")
get_params = {"machine_id": machine_id, "host": host}
return api_request(url, get_params=get_params)


@@ -303,8 +311,8 @@ def info(host, machine_id, api_endpoint=None):
"""
validate.machine_id(machine_id)

url = get_url(api_endpoint=api_endpoint, host=host, target='info')
get_params = {'machine_id': machine_id, 'host': host}
url = get_url(api_endpoint=api_endpoint, host=host, target="info")
get_params = {"machine_id": machine_id, "host": host}
return api_request(url, get_params=get_params)


@@ -316,15 +324,13 @@ def ipxescript(host, machine_id, ipxescript=None, api_endpoint=None):
validate.machine_id(machine_id)

if ipxescript is None:
if __name__ == '__main__':
if __name__ == "__main__":
ipxescript = sys.stdin.read()
else:
raise ValueError('ipxescript must be set.')
raise ValueError("ipxescript must be set.")

url = get_url(api_endpoint=api_endpoint, host=host, target='ipxescript')
json_params = {'machine_id': machine_id,
'host': host,
'ipxescript': ipxescript}
url = get_url(api_endpoint=api_endpoint, host=host, target="ipxescript")
json_params = {"machine_id": machine_id, "host": host, "ipxescript": ipxescript}
return api_request(url, json_params=json_params)


@@ -335,10 +341,8 @@ def bootorder(host, machine_id, bootorder, api_endpoint=None):
validate.machine_id(machine_id)
validate.bootorder(bootorder)

url = get_url(api_endpoint=api_endpoint, host=host, target='ipxescript')
json_params = {'machine_id': machine_id,
'host': host,
'bootorder': bootorder}
url = get_url(api_endpoint=api_endpoint, host=host, target="ipxescript")
json_params = {"machine_id": machine_id, "host": host, "bootorder": bootorder}
return api_request(url, json_params=json_params)


@@ -346,7 +350,7 @@ def host_info(host, api_endpoint=None):
"""
Returns info about the host.
"""
url = get_url(api_endpoint=api_endpoint, host=host, target='host_info')
url = get_url(api_endpoint=api_endpoint, host=host, target="host_info")
return api_request(url)


@@ -356,55 +360,53 @@ def serialconsole(host, machine_id):
"""
validate.machine_id(machine_id)

command = '/usr/bin/ssh'
command = "/usr/bin/ssh"
arguments = []
arguments.append(command)
arguments.append('-t')
arguments.append('vmmanagement@{}'.format(host))
arguments.append('-p')
arguments.append('1060')
arguments.append('serialconsole {}'.format(machine_id))
arguments.append("-t")
arguments.append("vmmanagement@{}".format(host))
arguments.append("-p")
arguments.append("1060")
arguments.append("serialconsole {}".format(machine_id))
logging.info(command, arguments)
os.execv(command, arguments)


def settlement_token_enable(settlement_token,
cents,
currency,
api_endpoint=None,
retry=False):
def settlement_token_enable(
settlement_token, cents, currency, api_endpoint=None, retry=False
):
validate.settlement_token(settlement_token)
validate.cents(cents)
validate.currency(currency)

json_params = {'settlement_token': settlement_token,
'cents': cents,
'currency': currency}
url = api_endpoint + '/settlement/enable'
json_params = {
"settlement_token": settlement_token,
"cents": cents,
"currency": currency,
}
url = api_endpoint + "/settlement/enable"
return api_request(url=url, json_params=json_params, retry=retry)


def settlement_token_add(settlement_token,
cents,
currency,
api_endpoint=None,
retry=False):
def settlement_token_add(
settlement_token, cents, currency, api_endpoint=None, retry=False
):
validate.settlement_token(settlement_token)
validate.cents(cents)
validate.currency(currency)

json_params = {'settlement_token': settlement_token,
'cents': cents,
'currency': currency}
url = api_endpoint + '/settlement/add'
json_params = {
"settlement_token": settlement_token,
"cents": cents,
"currency": currency,
}
url = api_endpoint + "/settlement/add"
return api_request(url=url, json_params=json_params, retry=retry)


def settlement_token_balance(settlement_token,
api_endpoint=None,
retry=False):
def settlement_token_balance(settlement_token, api_endpoint=None, retry=False):
validate.settlement_token(settlement_token)

get_params = {'settlement_token': settlement_token}
url = api_endpoint + '/settlement/balance'
get_params = {"settlement_token": settlement_token}
url = api_endpoint + "/settlement/balance"
return api_request(url=url, get_params=get_params, retry=retry)

+ 10
- 10
sporestackv2/api_client_test.py View File

@@ -6,9 +6,9 @@ import pytest
def test_validate_validate_use_tor_proxy():
assert api_client.validate_use_tor_proxy(True) is True
assert api_client.validate_use_tor_proxy(False) is True
assert api_client.validate_use_tor_proxy('auto') is True
assert api_client.validate_use_tor_proxy("auto") is True
with pytest.raises(ValueError):
api_client.validate_use_tor_proxy('some string')
api_client.validate_use_tor_proxy("some string")
with pytest.raises(ValueError):
api_client.validate_use_tor_proxy(1)
with pytest.raises(ValueError):
@@ -16,14 +16,14 @@ def test_validate_validate_use_tor_proxy():


def test_is_onion_url():
onion_url = 'http://spore64i5sofqlfz5gq2ju4msgzojjwifls7'
onion_url += 'rok2cti624zyq3fcelad.onion/v2/'
onion_url = "http://spore64i5sofqlfz5gq2ju4msgzojjwifls7"
onion_url += "rok2cti624zyq3fcelad.onion/v2/"
assert api_client.is_onion_url(onion_url) is True
# This is a good, unusual test.
onion_url = 'https://www.facebookcorewwwi.onion/'
onion_url = "https://www.facebookcorewwwi.onion/"
assert api_client.is_onion_url(onion_url) is True
assert api_client.is_onion_url('http://domain.com') is False
assert api_client.is_onion_url('domain.com') is False
assert api_client.is_onion_url('http://onion.domain.com/.onion/') is False
assert api_client.is_onion_url('http://me.me/file.onion/') is False
assert api_client.is_onion_url('http://me.me/file.onion') is False
assert api_client.is_onion_url("http://domain.com") is False
assert api_client.is_onion_url("domain.com") is False
assert api_client.is_onion_url("http://onion.domain.com/.onion/") is False
assert api_client.is_onion_url("http://me.me/file.onion/") is False
assert api_client.is_onion_url("http://me.me/file.onion") is False

+ 330
- 322
sporestackv2/client.py
File diff suppressed because it is too large
View File


+ 3
- 3
sporestackv2/client_test.py View File

@@ -6,9 +6,9 @@ def test_random_machine_id():


def test_api_endpoint_to_host():
assert client.api_endpoint_to_host('http://foo.bar') == 'foo.bar'
assert client.api_endpoint_to_host('https://foo.bar') == 'foo.bar'
assert client.api_endpoint_to_host("http://foo.bar") == "foo.bar"
assert client.api_endpoint_to_host("https://foo.bar") == "foo.bar"


def test_version():
assert '.' in client.version()
assert "." in client.version()

+ 15
- 15
sporestackv2/utilities.py View File

@@ -16,29 +16,29 @@ def payment_to_uri(address, currency, amount):
This does *not* validate address.
"""
if not isinstance(address, str):
raise TypeError('address must be string.')
raise TypeError("address must be string.")
if not isinstance(currency, str):
raise TypeError('currency must be string.')
raise TypeError("currency must be string.")
if not validate.unsigned_int(amount):
raise TypeError('amount must be unsigned int.')
raise TypeError("amount must be unsigned int.")

if len(address) == 0:
raise ValueError('address cannot be 0 length.')
raise ValueError("address cannot be 0 length.")

btc_decimal_amount = "{0:.8f}".format(amount * 0.00000001)
if currency == 'btc':
uri = 'bitcoin:{}?amount={}'.format(address, btc_decimal_amount)
elif currency == 'bch':
if currency == "btc":
uri = "bitcoin:{}?amount={}".format(address, btc_decimal_amount)
elif currency == "bch":
# This does not support "legacy" base58 addresses for Bitcoin Cash.
uri = '{}?amount={}'.format(address, btc_decimal_amount)
elif currency == 'bsv':
uri = "{}?amount={}".format(address, btc_decimal_amount)
elif currency == "bsv":
# This does not support "legacy" cashaddr addresses for Bitcoin SV.
uri = 'bitcoin:{}?amount={}'.format(address, btc_decimal_amount)
elif currency == 'xmr':
uri = "bitcoin:{}?amount={}".format(address, btc_decimal_amount)
elif currency == "xmr":
xmr_decimal_amount = "{0:.12f}".format(amount * 0.000000000001)
uri = 'monero:{}?tx_amount={}'.format(address, xmr_decimal_amount)
uri = "monero:{}?tx_amount={}".format(address, xmr_decimal_amount)
else:
raise ValueError('Currency must be one of: btc, bch, bsv, xmr')
raise ValueError("Currency must be one of: btc, bch, bsv, xmr")

return uri

@@ -48,5 +48,5 @@ def cents_to_usd(cents):
cents_to_usd: Convert cents to USD string.
"""
if not validate.unsigned_int(cents):
raise TypeError('cents must be unsigned int.')
return '${:,.2f}'.format(cents * 0.01)
raise TypeError("cents must be unsigned int.")
return "${:,.2f}".format(cents * 0.01)

+ 6
- 18
sporestackv2/utilities_test.py View File

@@ -7,27 +7,21 @@ def test_payment_to_uri():
address = "1xm4vFerV3pSgvBFkyzLgT1Ew3HQYrS1V"
currency = "btc"
amount = 12345678
output = utilities.payment_to_uri(address=address,
currency=currency,
amount=amount)
output = utilities.payment_to_uri(address=address, currency=currency, amount=amount)
expected = "bitcoin:1xm4vFerV3pSgvBFkyzLgT1Ew3HQYrS1V?amount=0.12345678"
assert output == expected

address = "1xm4vFerV3pSgvBFkyzLgT1Ew3HQYrS1V"
currency = "bsv"
amount = 200000000
output = utilities.payment_to_uri(address=address,
currency=currency,
amount=amount)
output = utilities.payment_to_uri(address=address, currency=currency, amount=amount)
expected = "bitcoin:1xm4vFerV3pSgvBFkyzLgT1Ew3HQYrS1V?amount=2.00000000"
assert output == expected

address = "bitcoincash:qq9gh20y2vur63tpe0xa5dh90zwzsuxagyhp7pfuv3"
currency = "bch"
amount = 1000001
output = utilities.payment_to_uri(address=address,
currency=currency,
amount=amount)
output = utilities.payment_to_uri(address=address, currency=currency, amount=amount)
expected = address + "?amount=0.01000001"
assert output == expected

@@ -35,22 +29,16 @@ def test_payment_to_uri():
address += "vcP47iPXXU1yqgJofrShLzKnBYBmMCTSSw2h1iaQs8h"
currency = "xmr"
amount = 1000001
output = utilities.payment_to_uri(address=address,
currency=currency,
amount=amount)
output = utilities.payment_to_uri(address=address, currency=currency, amount=amount)
# piconeros are smaller than Satoshis.
expected = "monero:" + address + "?tx_amount=0.000001000001"
assert output == expected

# Negative tests.
with pytest.raises(ValueError):
utilities.payment_to_uri(address=address,
currency="xxx",
amount=amount)
utilities.payment_to_uri(address=address, currency="xxx", amount=amount)
with pytest.raises(TypeError):
utilities.payment_to_uri(address=address,
currency=currency,
amount="100")
utilities.payment_to_uri(address=address, currency=currency, amount="100")


def test_cents_to_usd():


+ 46
- 47
sporestackv2/validate.py View File

@@ -16,12 +16,12 @@ def machine_id(machine_id):
A sha256sum, in effect.
"""
if not isinstance(machine_id, str):
raise TypeError('machine_id must be a string.')
raise TypeError("machine_id must be a string.")
if len(machine_id) != 64:
raise ValueError('machine_id must be exactly 64 bytes/characters.')
raise ValueError("machine_id must be exactly 64 bytes/characters.")
for letter in machine_id:
if letter not in '0123456789abcdef':
raise ValueError('machine_id must be only 0-9, a-f (lowercase)')
if letter not in "0123456789abcdef":
raise ValueError("machine_id must be only 0-9, a-f (lowercase)")
return True


@@ -40,14 +40,14 @@ def operating_system(operating_system):
if operating_system is None:
return True
if not isinstance(operating_system, str):
raise TypeError('operating_system must be null or a string.')
raise TypeError("operating_system must be null or a string.")
if len(operating_system) < 1:
raise ValueError('operating_system must have at least one letter.')
raise ValueError("operating_system must have at least one letter.")
if len(operating_system) > 16:
raise ValueError('operating_system must have 16 letters or less.')
raise ValueError("operating_system must have 16 letters or less.")
for character in operating_system:
if character not in string.ascii_lowercase + string.digits + "-":
msg = 'operating_system must only contain a-z, digits, -'
msg = "operating_system must only contain a-z, digits, -"
raise ValueError(msg)

return True
@@ -60,11 +60,9 @@ def ssh_key(ssh_key):
if ssh_key is None:
return True
if not isinstance(ssh_key, str):
raise TypeError('ssh_key must be null or a string.')
raise TypeError("ssh_key must be null or a string.")

ssh_key_object = SSHKey(ssh_key,
skip_option_parsing=True,
disallow_options=True)
ssh_key_object = SSHKey(ssh_key, skip_option_parsing=True, disallow_options=True)
try:
ssh_key_object.parse()
except Exception as e:
@@ -83,7 +81,7 @@ def days(days, zero_allowed=False):
0 is invalid unless zero_allowed is True.
"""
if not isinstance(days, int):
raise TypeError('days must be an integer type')
raise TypeError("days must be an integer type")

if zero_allowed is True:
minimum_days = 0
@@ -91,7 +89,7 @@ def days(days, zero_allowed=False):
minimum_days = 1

if days < minimum_days or days > 28:
raise ValueError('days must be {}-28'.format(minimum_days))
raise ValueError("days must be {}-28".format(minimum_days))

return True

@@ -100,14 +98,14 @@ def organization(organization):
if organization is None:
return True
if not isinstance(organization, str):
raise TypeError('organization must be string.')
raise TypeError("organization must be string.")
if len(organization) < 1:
raise ValueError('organization must have at least one letter.')
raise ValueError("organization must have at least one letter.")
if len(organization) > 16:
raise ValueError('organization must have 16 letters or less.')
raise ValueError("organization must have 16 letters or less.")
for character in organization:
if character not in string.ascii_letters:
raise ValueError('organization must only contain a-z, A-Z')
raise ValueError("organization must only contain a-z, A-Z")
return True


@@ -129,7 +127,7 @@ def disk(disk):
0 is valid, means no disk.
"""
if not unsigned_int(disk):
raise TypeError('disk must be an unsigned integer.')
raise TypeError("disk must be an unsigned integer.")
return True


@@ -138,9 +136,9 @@ def memory(memory):
Makes sure memory is valid.
"""
if not unsigned_int(memory):
raise TypeError('memory must be an unsigned integer.')
raise TypeError("memory must be an unsigned integer.")
if memory == 0:
raise ValueError('0 not acceptable for memory')
raise ValueError("0 not acceptable for memory")
return True


@@ -149,7 +147,7 @@ def expiration(expiration):
Makes sure expiration is valid.
"""
if not unsigned_int(expiration):
raise TypeError('expiration must be an unsigned integer.')
raise TypeError("expiration must be an unsigned integer.")
return True


@@ -158,7 +156,7 @@ def cores(cores):
Makes sure cores is valid.
"""
if not unsigned_int(cores):
raise TypeError('cores must be an unsigned integer.')
raise TypeError("cores must be an unsigned integer.")
return True


@@ -169,7 +167,7 @@ def qemuopts(qemuopts):
if qemuopts is None:
return True
if not isinstance(qemuopts, str):
raise TypeError('qemuopts must be none or str.')
raise TypeError("qemuopts must be none or str.")
return True


@@ -178,7 +176,7 @@ def managed(managed):
Makes sure managed is valid.
"""
if not isinstance(managed, bool):
raise TypeError('managed must be a boolean.')
raise TypeError("managed must be a boolean.")
return True


@@ -187,7 +185,7 @@ def hostaccess(hostaccess):
Makes sure hostaccess is valid.
"""
if not isinstance(hostaccess, bool):
raise TypeError('hostaccess must be a boolean.')
raise TypeError("hostaccess must be a boolean.")
return True


@@ -198,7 +196,7 @@ def currency(currency):
if currency is None:
return True
if not isinstance(currency, str):
raise TypeError('currency must be None or str.')
raise TypeError("currency must be None or str.")
return True


@@ -211,7 +209,7 @@ def refund_address(refund_address):
if refund_address is None:
return True
if not isinstance(refund_address, str):
raise TypeError('refund_address must be none or str.')
raise TypeError("refund_address must be none or str.")
return True


@@ -225,9 +223,9 @@ def bandwidth(bandwidth):
if bandwidth >= -1:
return True
else:
raise ValueError('bandwidth can be no lower than -1.')
raise ValueError("bandwidth can be no lower than -1.")
else:
raise TypeError('bandwidth must be integer.')
raise TypeError("bandwidth must be integer.")


def _ip(ip, ip_type, cidr):
@@ -240,37 +238,38 @@ def _ip(ip, ip_type, cidr):
return True

if not isinstance(ip, str):
raise TypeError('ipv4 must be false or string.')
raise TypeError("ipv4 must be false or string.")
elif ip == cidr:
return True
elif ip == 'nat':
elif ip == "nat":
return True
elif ip == 'tor':
elif ip == "tor":
return True
else:
raise ValueError('{} must be one of: False|{}'.format(ip_type, cidr))
raise ValueError("{} must be one of: False|{}".format(ip_type, cidr))


def ipv4(ipv4):
return _ip(ipv4, 'ipv4', '/32')
return _ip(ipv4, "ipv4", "/32")


def ipv6(ipv6):
return _ip(ipv6, 'ipv6', '/128')
return _ip(ipv6, "ipv6", "/128")


# further calls are for validating compatibilities between
# argument cominations.


def further_ipv4_ipv6(ipv4, ipv6):
"""
More validation with the combination of ipv4 and ipv6 options.

We don't support mixed nat/tor modes, so this handles that.
"""
message = 'ipv4 and ipv6 must be the same if either is tor or nat.'
message = "ipv4 and ipv6 must be the same if either is tor or nat."

if ipv4 in ['tor', 'nat'] or ipv6 in ['tor', 'nat']:
if ipv4 in ["tor", "nat"] or ipv6 in ["tor", "nat"]:
if ipv4 != ipv6:
raise ValueError(message)
return True
@@ -288,14 +287,14 @@ def ipxescript(ipxescript):
if ipxescript is None:
return True
if not isinstance(ipxescript, str):
raise TypeError('ipxescript must be a string or null.')
raise TypeError("ipxescript must be a string or null.")
if len(ipxescript) == 0:
raise ValueError('ipxescript must be more than zero bytes long.')
raise ValueError("ipxescript must be more than zero bytes long.")
if len(ipxescript) > 4000:
raise ValueError('ipxescript must be less than 4,000 bytes long.')
raise ValueError("ipxescript must be less than 4,000 bytes long.")
for letter in ipxescript:
if letter not in string.printable:
raise ValueError('ipxescript must only contain ascii characters.')
raise ValueError("ipxescript must only contain ascii characters.")
return True


@@ -303,14 +302,14 @@ def region(region):
if region is None:
return True
if not isinstance(region, str):
raise TypeError('region must be a string or null.')
raise TypeError("region must be a string or null.")
if len(region) == 0:
raise ValueError('region must be more than zero bytes long.')
raise ValueError("region must be more than zero bytes long.")
if len(region) > 200:
raise ValueError('region must be less than 200 bytes long.')
raise ValueError("region must be less than 200 bytes long.")
for letter in region:
if letter not in string.printable:
raise ValueError('region must only contain ascii characters.')
raise ValueError("region must only contain ascii characters.")
return True


@@ -320,7 +319,7 @@ def affiliate_amount(amount):
if unsigned_int(amount) is True:
if amount != 0:
return True
raise TypeError('affiliate_amount must be null or non-zero unsigned int.')
raise TypeError("affiliate_amount must be null or non-zero unsigned int.")


def cents(cents):


+ 44
- 42
sporestackv2/validate_test.py View File

@@ -4,8 +4,8 @@ import pytest

from . import validate

valid_id = '01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b'
invalid_id = 'z1ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b'
valid_id = "01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b"
invalid_id = "z1ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b"


def test_machine_id():
@@ -13,7 +13,7 @@ def test_machine_id():
with pytest.raises(TypeError):
validate.machine_id(1337)
with pytest.raises(ValueError):
validate.machine_id(valid_id + 'b')
validate.machine_id(valid_id + "b")
with pytest.raises(ValueError):
validate.machine_id(invalid_id)

@@ -23,7 +23,7 @@ def test_settlement_token():
with pytest.raises(TypeError):
validate.settlement_token(1337)
with pytest.raises(ValueError):
validate.settlement_token(valid_id + 'b')
validate.settlement_token(valid_id + "b")
with pytest.raises(ValueError):
validate.settlement_token(invalid_id)

@@ -48,7 +48,7 @@ def test_days():
validate.days(-1, zero_allowed=True)

with pytest.raises(TypeError):
validate.days('one')
validate.days("one")


def test_memory():
@@ -67,7 +67,7 @@ def test_disk():
with pytest.raises(TypeError):
validate.disk(-10)
with pytest.raises(TypeError):
validate.disk('10')
validate.disk("10")


def test_unsigned_int():
@@ -76,7 +76,7 @@ def test_unsigned_int():
assert validate.unsigned_int(1000) is True
assert validate.unsigned_int(-1) is False
assert validate.unsigned_int(1.0) is False
assert validate.unsigned_int('a') is False
assert validate.unsigned_int("a") is False
assert validate.unsigned_int(None) is False


@@ -100,24 +100,24 @@ def test_cents():
with pytest.raises(TypeError):
validate.cents(-1)
with pytest.raises(TypeError):
validate.cents('a')
validate.cents("a")
with pytest.raises(TypeError):
validate.cents(None)


def test_further_ipv4_ipv6():
assert validate.further_ipv4_ipv6('tor', 'tor') is True
assert validate.further_ipv4_ipv6('nat', 'nat') is True
assert validate.further_ipv4_ipv6("tor", "tor") is True
assert validate.further_ipv4_ipv6("nat", "nat") is True
with pytest.raises(ValueError):
validate.further_ipv4_ipv6('tor', 'nat')
validate.further_ipv4_ipv6("tor", "nat")
with pytest.raises(ValueError):
validate.further_ipv4_ipv6(False, 'nat')
validate.further_ipv4_ipv6(False, "nat")
with pytest.raises(ValueError):
validate.further_ipv4_ipv6('tor', False)
validate.further_ipv4_ipv6("tor", False)
with pytest.raises(ValueError):
validate.further_ipv4_ipv6('tor', '/128')
validate.further_ipv4_ipv6("tor", "/128")
with pytest.raises(ValueError):
validate.further_ipv4_ipv6('/32', 'tor')
validate.further_ipv4_ipv6("/32", "tor")


def test_further_dollars_cents():
@@ -131,11 +131,11 @@ def test_further_dollars_cents():

def test_organization():
assert validate.organization(None) is True
assert validate.organization('Corporation') is True
assert validate.organization('google') is True
assert validate.organization('Yahoo') is True
assert validate.organization("Corporation") is True
assert validate.organization("google") is True
assert validate.organization("Yahoo") is True
# Shortest valid.
assert validate.organization('A') is True
assert validate.organization("A") is True
# Longest valid
valid = validate.organization(string.ascii_lowercase[:16])
assert valid is True
@@ -148,21 +148,21 @@ def test_organization():
validate.organization(string.ascii_lowercase[:17])
# Too short
with pytest.raises(ValueError):
validate.organization('')
validate.organization("")
# Bad character
with pytest.raises(ValueError):
validate.organization('Google5')
validate.organization("Google5")


def test_operating_system():
assert validate.operating_system('debian-9') is True
assert validate.operating_system('debian-10') is True
assert validate.operating_system('ubuntu-16-04') is True
assert validate.operating_system('something-else') is True
assert validate.operating_system("debian-9") is True
assert validate.operating_system("debian-10") is True
assert validate.operating_system("ubuntu-16-04") is True
assert validate.operating_system("something-else") is True
assert validate.operating_system(None) is True

# Shortest valid.
assert validate.operating_system('a') is True
assert validate.operating_system("a") is True
# Longest valid
valid = validate.operating_system(string.ascii_lowercase[:16])
assert valid is True
@@ -173,24 +173,26 @@ def test_operating_system():
validate.operating_system(0)
# Too short.
with pytest.raises(ValueError):
validate.operating_system('')
validate.operating_system("")
# One too long.
with pytest.raises(ValueError):
validate.operating_system(string.ascii_lowercase[:17])
# Bad character.
with pytest.raises(ValueError):
validate.operating_system('_')
validate.operating_system("_")


valid_ssh_key = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDLuFASoTgo5r/bBGcawcN'\
'7B/NTyjmEi3cdgl8r3ldZXVXl6N/7vfF/O6ggkU1iJCHUgxOqGHzJMyJ3ZL'\
'6osyhgMF0htWYH7LhS4lJkzayUpCizelvW6FS//00smjxPnvOicEQNuuP0i'\
'XYZVZIzbubZn8fJi0ZoJ5LkTpqNdAx1M420cplFGlcMww/jk0gGvlQZnEV4'\
'Qra0Wh88s3xzPeryAoi+CwVAkBqHfntgkAVMComCfah8D+7nCS4F+Wi70hp'\
'wrUhKulm3r5sOsyU1fGduvyL8XBYH8+yHwe5H+5TK5kJ6gAmyjNQ8fw+6Wk'\
'wcB/heHAAJSUysYyfatIqKeWsj'
valid_ssh_key = (
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDLuFASoTgo5r/bBGcawcN"
"7B/NTyjmEi3cdgl8r3ldZXVXl6N/7vfF/O6ggkU1iJCHUgxOqGHzJMyJ3ZL"
"6osyhgMF0htWYH7LhS4lJkzayUpCizelvW6FS//00smjxPnvOicEQNuuP0i"
"XYZVZIzbubZn8fJi0ZoJ5LkTpqNdAx1M420cplFGlcMww/jk0gGvlQZnEV4"
"Qra0Wh88s3xzPeryAoi+CwVAkBqHfntgkAVMComCfah8D+7nCS4F+Wi70hp"
"wrUhKulm3r5sOsyU1fGduvyL8XBYH8+yHwe5H+5TK5kJ6gAmyjNQ8fw+6Wk"
"wcB/heHAAJSUysYyfatIqKeWsj"
)

valid_ssh_key_with_comment = valid_ssh_key + ' root@localhost'
valid_ssh_key_with_comment = valid_ssh_key + " root@localhost"


def test_ssh_key():
@@ -200,31 +202,31 @@ def test_ssh_key():
with pytest.raises(TypeError):
validate.ssh_key(1)
with pytest.raises(ValueError):
validate.ssh_key('')
validate.ssh_key("")
with pytest.raises(ValueError):
validate.ssh_key('ssh-rsa')
validate.ssh_key("ssh-rsa")


def test_ipxescript():
assert validate.ipxescript(None) is True
assert validate.ipxescript('#!ipxe') is True
assert validate.ipxescript("#!ipxe") is True
with pytest.raises(TypeError):
validate.ipxescript(1)
with pytest.raises(TypeError):
validate.ipxescript(True)
with pytest.raises(ValueError):
validate.ipxescript('')
validate.ipxescript("")


def test_region():
assert validate.region(None) is True
assert validate.region('#!ipxe') is True
assert validate.region("#!ipxe") is True
with pytest.raises(TypeError):
validate.region(1)
with pytest.raises(TypeError):
validate.region(True)
with pytest.raises(ValueError):
validate.region('')
validate.region("")


def test_affiliate_amount():


+ 1
- 1
sporestackv2/version.py View File

@@ -1 +1 @@
__version__ = '1.2.0'
__version__ = "1.2.0"

Loading…
Cancel
Save