chatdesk-ui/postgrest_v12.2.8/test/io/postgrest.py

221 lines
6.2 KiB
Python

"Fixtures to run PostgREST as a server."
import contextlib
import dataclasses
import os
import pathlib
import socket
import subprocess
import tempfile
import time
import urllib.parse
import pytest
import requests
import requests_unixsocket
from config import *
def sleep_until_postgrest_scache_reload():
"Sleep until schema cache reload"
time.sleep(0.3)
def sleep_until_postgrest_config_reload():
"Sleep until config reload"
time.sleep(0.2)
def sleep_until_postgrest_full_reload():
"Sleep until schema cache plus config reload"
time.sleep(0.3)
class PostgrestTimedOut(Exception):
"Connecting to PostgREST endpoint timed out."
class PostgrestSession(requests_unixsocket.Session):
"HTTP client session directed at a PostgREST endpoint."
def __init__(self, baseurl, *args, **kwargs):
super(PostgrestSession, self).__init__(*args, **kwargs)
self.baseurl = baseurl
def request(self, method, url, *args, **kwargs):
# Not using urllib.parse.urljoin to compose the url, as it doesn't play
# well with our 'http+unix://' unix domain socket urls.
fullurl = self.baseurl + url
return super(PostgrestSession, self).request(method, fullurl, *args, **kwargs)
@dataclasses.dataclass
class PostgrestProcess:
"Running PostgREST process and its corresponding main and admin endpoints."
admin: object
process: object
session: object
def read_stdout(self, nlines=1):
"Wait for line(s) on standard output."
output = []
for _ in range(10):
self.process.stdout.flush()
l = self.process.stdout.readline()
if l:
output.append(l.decode())
if len(output) >= nlines:
break
time.sleep(0.1)
return output
def wait_until_scache_starts_loading(self, max_seconds=1):
"Wait for the admin /ready return a status of 503"
wait_until_status_code(
self.admin.baseurl + "/ready", max_seconds=max_seconds, status_code=503
)
@contextlib.contextmanager
def run(
configpath=None,
stdin=None,
env=None,
port=None,
host=None,
wait_for_readiness=True,
wait_max_seconds=1,
no_pool_connection_available=False,
no_startup_stdout=True,
):
"Run PostgREST and yield an endpoint that is ready for connections."
with tempfile.TemporaryDirectory() as tmpdir:
if port:
env["PGRST_SERVER_PORT"] = str(port)
env["PGRST_SERVER_HOST"] = host or "localhost"
baseurl = f"http://localhost:{port}"
else:
socketfile = pathlib.Path(tmpdir) / "postgrest.sock"
env["PGRST_SERVER_UNIX_SOCKET"] = str(socketfile)
baseurl = "http+unix://" + urllib.parse.quote_plus(str(socketfile))
adminport = freeport(port)
env["PGRST_ADMIN_SERVER_PORT"] = str(adminport)
adminurl = f"http://localhost:{adminport}"
command = [POSTGREST_BIN]
env["HPCTIXFILE"] = hpctixfile()
if configpath:
command.append(configpath)
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
)
os.set_blocking(process.stdout.fileno(), False)
try:
process.stdin.write(stdin or b"")
process.stdin.close()
if wait_for_readiness:
wait_until_status_code(adminurl + "/ready", wait_max_seconds, 200)
if no_startup_stdout:
process.stdout.read()
if no_pool_connection_available:
sleep_pool_connection(baseurl, 10)
yield PostgrestProcess(
process=process,
session=PostgrestSession(baseurl),
admin=PostgrestSession(adminurl),
)
finally:
remaining_output = process.stdout.read()
if remaining_output:
print(remaining_output.decode())
process.terminate()
try:
process.wait(timeout=1)
except:
process.kill()
process.wait()
@pytest.fixture(scope="module")
def metapostgrest():
"A shared postgrest instance to use for interacting with the database independently of the instance under test"
role = "meta_authenticator"
env = {
"PGDATABASE": os.environ["PGDATABASE"],
"PGHOST": os.environ["PGHOST"],
"PGUSER": role,
"PGRST_DB_ANON_ROLE": role,
"PGRST_DB_CONFIG": "true",
"PGRST_LOG_LEVEL": "info",
"PGRST_DB_POOL": "1",
}
with run(env=env) as postgrest:
yield postgrest
def freeport(used_port=None):
"Find a free port on localhost."
while True:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
port = s.getsockname()[1]
if port != used_port:
return port
def wait_until_exit(postgrest):
"Wait for PostgREST to exit, or times out"
try:
return postgrest.process.wait(timeout=1)
except subprocess.TimeoutExpired:
raise PostgrestTimedOut()
def wait_until_status_code(url, max_seconds, status_code):
"Wait for the given HTTP endpoint to return a status code"
session = requests_unixsocket.Session()
for _ in range(max_seconds * 10):
try:
response = session.get(url, timeout=1)
if response.status_code == status_code:
return
except (requests.ConnectionError, requests.ReadTimeout):
pass
time.sleep(0.1)
if response:
raise PostgrestTimedOut(f"{response.status_code}: {response.text}")
else:
raise PostgrestTimedOut()
def sleep_pool_connection(url, seconds):
"Sleep a pool connection by calling an RPC that uses pg_sleep"
session = requests_unixsocket.Session()
# The try/except is a hack for not waiting for the response,
# taken from https://stackoverflow.com/a/45601591/4692662
try:
session.get(url + f"/rpc/sleep?seconds={seconds}", timeout=0.1)
except requests.exceptions.ReadTimeout:
pass