1666 lines
58 KiB
Python
1666 lines
58 KiB
Python
"Unit tests for Input/Output of PostgREST seen as a black box."
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from operator import attrgetter
|
|
import os
|
|
import re
|
|
import signal
|
|
import socket
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from config import *
|
|
from util import *
|
|
from postgrest import *
|
|
|
|
|
|
@pytest.mark.parametrize(
    "secretpath",
    [
        candidate
        for candidate in (BASEDIR / "secrets").iterdir()
        if candidate.suffix != ".jwt"
    ],
    ids=attrgetter("name"),
)
def test_read_secret_from_file(secretpath, defaultenv):
    """Check that a JWT secret loaded from a file authorizes a request.

    Every parametrized secret file is paired with a sibling ``.jwt`` token;
    secrets with a ``.b64`` suffix additionally enable base64 decoding.
    """
    settings = dict(defaultenv)
    settings["PGRST_JWT_SECRET"] = f"@{secretpath}"
    if secretpath.suffix == ".b64":
        settings["PGRST_JWT_SECRET_IS_BASE64"] = "true"

    secret_bytes = secretpath.read_bytes()
    token = secretpath.with_suffix(".jwt").read_text()
    auth = authheader(token)

    with run(stdin=secret_bytes, env=settings) as postgrest:
        reply = postgrest.session.get("/authors_only", headers=auth)
        print(reply.text)
        assert reply.status_code == 200
|
|
|
|
|
|
def test_read_secret_from_stdin(defaultenv):
    """Check that a JWT secret piped through stdin authorizes a request."""
    settings = dict(defaultenv)
    settings.update(PGRST_DB_CONFIG="false", PGRST_JWT_SECRET="@/dev/stdin")

    auth = jwtauthheader({"role": "postgrest_test_author"}, SECRET)

    with run(stdin=SECRET.encode(), env=settings) as postgrest:
        reply = postgrest.session.get("/authors_only", headers=auth)
        print(reply.text)
        assert reply.status_code == 200
|
|
|
|
|
|
# TODO: Skipped because it currently fails, see
# https://github.com/PostgREST/postgrest/issues/2126
@pytest.mark.skip
def test_read_secret_from_stdin_dbconfig(defaultenv):
    """Check stdin-provided JWT secrets still work when db-config is enabled."""
    settings = dict(defaultenv)
    settings.update(PGRST_DB_CONFIG="true", PGRST_JWT_SECRET="@/dev/stdin")

    auth = jwtauthheader({"role": "postgrest_test_author"}, SECRET)

    with run(stdin=SECRET.encode(), env=settings) as postgrest:
        reply = postgrest.session.get("/authors_only", headers=auth)
        print(reply.text)
        assert reply.status_code == 200
|
|
|
|
|
|
def test_fail_with_invalid_password(defaultenv):
    """Check that a wrong database password makes PostgREST exit with code 1."""
    dbname = defaultenv["PGDATABASE"]
    host = defaultenv["PGHOST"]
    bad_uri = f"postgresql://?dbname={dbname}&host={host}&user=some_protected_user&password=invalid_pass"
    settings = dict(defaultenv)
    settings["PGRST_DB_URI"] = bad_uri

    # Readiness is never reached here, so do not wait for it.
    with run(env=settings, wait_for_readiness=False) as postgrest:
        exit_code = wait_until_exit(postgrest)
        assert exit_code == 1
|
|
|
|
|
|
def test_connect_with_dburi(dburi, defaultenv):
    """Check that db-uri alone, without the libpq PG* variables, is enough to start."""
    libpq_keys = ("PGDATABASE", "PGHOST", "PGUSER")
    settings = {
        name: setting
        for name, setting in defaultenv.items()
        if name not in libpq_keys
    }
    settings["PGRST_DB_URI"] = dburi.decode()

    # Entering the context is the test; run() is called with its default readiness wait.
    with run(env=settings):
        pass
|
|
|
|
|
|
def test_read_dburi_from_stdin_without_eol(dburi, defaultenv):
    """Check that the db-uri is accepted from stdin without a trailing newline."""
    libpq_keys = ("PGDATABASE", "PGHOST", "PGUSER")
    settings = {
        name: setting
        for name, setting in defaultenv.items()
        if name not in libpq_keys
    }
    settings["PGRST_DB_URI"] = "@/dev/stdin"

    # Entering the context is the test; run() is called with its default readiness wait.
    with run(env=settings, stdin=dburi):
        pass
|
|
|
|
|
|
def test_read_dburi_from_stdin_with_eol(dburi, defaultenv):
    """Check that the db-uri is accepted from stdin when followed by a newline."""
    libpq_keys = ("PGDATABASE", "PGHOST", "PGUSER")
    settings = {
        name: setting
        for name, setting in defaultenv.items()
        if name not in libpq_keys
    }
    settings["PGRST_DB_URI"] = "@/dev/stdin"

    payload = dburi + b"\n"
    # Entering the context is the test; run() is called with its default readiness wait.
    with run(env=settings, stdin=payload):
        pass
|
|
|
|
|
|
@pytest.mark.parametrize(
    "roleclaim",
    FIXTURES["roleclaims"],
    ids=lambda claim: claim["key"],
)
def test_role_claim_key(roleclaim, defaultenv):
    """Check the response status for each role-claim-key / JWT claim fixture."""
    settings = dict(defaultenv)
    settings["PGRST_JWT_ROLE_CLAIM_KEY"] = roleclaim["key"]
    settings["PGRST_JWT_SECRET"] = SECRET

    auth = jwtauthheader(roleclaim["data"], SECRET)
    expected = roleclaim["expected_status"]

    with run(env=settings) as postgrest:
        reply = postgrest.session.get("/authors_only", headers=auth)
        assert reply.status_code == expected
|
|
|
|
|
|
def test_iat_claim(defaultenv):
    """
    A claim with an 'iat' (issued at) attribute should be successful.

    The PostgREST time cache leads to issues here, see:
    https://github.com/PostgREST/postgrest/issues/1139

    The request is repeated ten times, 0.1s apart, and every attempt must
    return 200.
    """

    env = {**defaultenv, "PGRST_JWT_SECRET": SECRET}

    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive datetime.
    claim = {"role": "postgrest_test_author", "iat": datetime.now(timezone.utc)}
    headers = jwtauthheader(claim, SECRET)

    with run(env=env) as postgrest:
        for _ in range(10):
            response = postgrest.session.get("/authors_only", headers=headers)
            assert response.status_code == 200

            time.sleep(0.1)
|
|
|
|
|
|
def test_app_settings_flush_pool(defaultenv):
    """
    Check that app settings survive a flush of the db pool.

    See: https://github.com/PostgREST/postgrest/issues/1141
    """
    settings = dict(defaultenv)
    settings["PGRST_APP_SETTINGS_EXTERNAL_API_SECRET"] = "0123456789abcdef"
    guc_uri = "/rpc/get_guc_value?name=app.settings.external_api_secret"
    expected = '"0123456789abcdef"'

    with run(env=settings) as postgrest:
        before = postgrest.session.get(guc_uri)
        assert before.text == expected

        # SIGUSR1 makes PostgREST flush its postgres connection pool.
        postgrest.process.send_signal(signal.SIGUSR1)
        sleep_until_postgrest_scache_reload()

        after = postgrest.session.get(guc_uri)
        assert after.text == expected
|
|
|
|
|
|
def test_flush_pool_no_interrupt(defaultenv):
    """Flushing the pool via SIGUSR1 doesn't interrupt ongoing requests.

    The slow request runs in a worker thread. Its result is collected through
    a future so that a failed assertion inside the worker is re-raised in the
    test thread; with a bare ``Thread`` it would be swallowed and the test
    would pass regardless.
    """
    from concurrent.futures import ThreadPoolExecutor

    with run(env=defaultenv) as postgrest:

        def sleep():
            response = postgrest.session.get("/rpc/sleep?seconds=0.5")
            assert response.text == ""
            assert response.status_code == 204

        with ThreadPoolExecutor(max_workers=1) as executor:
            ongoing = executor.submit(sleep)

            # make sure the request has started
            time.sleep(0.1)

            # SIGUSR1 causes the postgres connection pool to be flushed
            postgrest.process.send_signal(signal.SIGUSR1)

            # re-raises any assertion failure from the worker
            ongoing.result()
|
|
|
|
|
|
def test_random_port_bound(defaultenv):
    """Check that PostgREST starts when run() is given port "0"."""
    # Entering the context is the test; nothing else is asserted.
    with run(env=defaultenv, port="0"):
        pass
|
|
|
|
|
|
def test_app_settings_reload(tmp_path, defaultenv):
    """Check that SIGUSR2 makes PostgREST re-read app settings from its config file."""
    original = (CONFIGSDIR / "sigusr2-settings.config").read_text()
    config_path = tmp_path / "test.config"
    config_path.write_text(original)
    guc_uri = "/rpc/get_guc_value?name=app.settings.name_var"

    with run(config_path, env=defaultenv) as postgrest:
        before = postgrest.session.get(guc_uri)
        assert before.text == '"John"'

        # Rewrite the file, then ask for a config reload.
        config_path.write_text(original.replace("John", "Jane"))
        postgrest.process.send_signal(signal.SIGUSR2)
        sleep_until_postgrest_config_reload()

        after = postgrest.session.get(guc_uri)
        assert after.text == '"Jane"'
|
|
|
|
|
|
def test_jwt_secret_reload(tmp_path, defaultenv):
    """Check that SIGUSR2 makes PostgREST re-read the JWT secret from its config file."""
    original = (CONFIGSDIR / "sigusr2-settings.config").read_text()
    config_path = tmp_path / "test.config"
    config_path.write_text(original)

    auth = jwtauthheader({"role": "postgrest_test_author"}, SECRET)

    with run(config_path, env=defaultenv) as postgrest:
        # Rejected at first, before the file is rewritten with SECRET.
        rejected = postgrest.session.get("/authors_only", headers=auth)
        assert rejected.status_code == 401

        # Put the real secret in place and ask for a config reload.
        config_path.write_text(original.replace("invalid" * 5, SECRET))
        postgrest.process.send_signal(signal.SIGUSR2)
        sleep_until_postgrest_config_reload()

        accepted = postgrest.session.get("/authors_only", headers=auth)
        assert accepted.status_code == 200
|
|
|
|
|
|
def test_jwt_secret_external_file_reload(tmp_path, defaultenv):
    """Check that an external JWT secret file is re-read on SIGUSR2 and on NOTIFY."""
    auth = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
    wrong_secret = "invalid" * 5

    secret_file = tmp_path / "jwt-secret-config"
    secret_file.write_text(wrong_secret)

    settings = dict(defaultenv)
    settings["PGRST_JWT_SECRET"] = f"@{secret_file}"
    settings["PGRST_DB_CHANNEL_ENABLED"] = "true"
    settings["PGRST_DB_CONFIG"] = "false"
    settings["PGRST_DB_ANON_ROLE"] = "postgrest_test_anonymous"  # required for NOTIFY

    def authors_status(postgrest):
        # Status code of an authenticated request to the protected table.
        return postgrest.session.get("/authors_only", headers=auth).status_code

    with run(env=settings) as postgrest:
        assert authors_status(postgrest) == 401

        # Put the valid secret into the external file.
        secret_file.write_text(SECRET)

        # SIGUSR1 doesn't reload external files, at least when db-config=false
        postgrest.process.send_signal(signal.SIGUSR1)
        sleep_until_postgrest_scache_reload()
        assert authors_status(postgrest) == 401

        # SIGUSR2 reloads the config together with the external file.
        postgrest.process.send_signal(signal.SIGUSR2)
        sleep_until_postgrest_config_reload()
        assert authors_status(postgrest) == 200

        # Break the external file again ...
        secret_file.write_text(wrong_secret)

        # ... and reload config and external file through NOTIFY this time.
        reload_reply = postgrest.session.post("/rpc/reload_pgrst_config")
        assert reload_reply.text == ""
        assert reload_reply.status_code == 204
        sleep_until_postgrest_config_reload()

        assert authors_status(postgrest) == 401
|
|
|
|
|
|
def test_db_schema_reload(tmp_path, defaultenv):
    """Check that SIGUSR2 makes PostgREST re-read db-schemas from its config file."""
    original = (CONFIGSDIR / "sigusr2-settings.config").read_text()
    config_path = tmp_path / "test.config"
    config_path.write_text(original)
    search_path_uri = "/rpc/get_guc_value?name=search_path"

    with run(config_path, env=defaultenv) as postgrest:
        before = postgrest.session.get(search_path_uri)
        assert before.text == '"\\"public\\", \\"public\\""'

        # Point db-schemas at "v1" in the config file.
        updated = original.replace('db-schemas = "public"', 'db-schemas = "v1"')
        config_path.write_text(updated)

        # Reload the config ...
        postgrest.process.send_signal(signal.SIGUSR2)
        sleep_until_postgrest_config_reload()

        # ... then the schema cache, to verify the config reload actually happened.
        postgrest.process.send_signal(signal.SIGUSR1)
        sleep_until_postgrest_scache_reload()

        after = postgrest.session.get(search_path_uri)
        assert after.text == '"\\"v1\\", \\"public\\""'
|
|
|
|
|
|
def test_db_schema_notify_reload(defaultenv):
    """Check that a NOTIFY reloads both the db schema and the config."""
    settings = dict(defaultenv)
    settings.update(PGRST_DB_CONFIG="true", PGRST_DB_CHANNEL_ENABLED="true")
    search_path_uri = "/rpc/get_guc_value?name=search_path"

    with run(env=settings) as postgrest:
        before = postgrest.session.get(search_path_uri)
        assert before.text == '"\\"public\\", \\"public\\""'

        # Change db-schemas in the database; the RPC also triggers a full reload via NOTIFY.
        postgrest.session.post(
            "/rpc/change_db_schema_and_full_reload",
            data={"schemas": "v1"},
        )
        sleep_until_postgrest_full_reload()

        after = postgrest.session.get(search_path_uri)
        assert after.text == '"\\"v1\\", \\"public\\""'

        # Restore the db-schemas setting stored in the database.
        reset_reply = postgrest.session.post("/rpc/reset_db_schema_config")
        assert reset_reply.text == ""
        assert reset_reply.status_code == 204
|
|
|
|
|
|
def test_max_rows_reload(defaultenv):
    """Check that SIGUSR2 reloads max-rows from the role settings in the database."""
    settings = dict(defaultenv)
    settings["PGRST_DB_CONFIG"] = "true"

    with run(env=settings) as postgrest:
        before = postgrest.session.head("/projects")
        assert before.status_code == 200
        assert before.headers["Content-Range"] == "0-4/*"

        # Change max-rows in the database, then ask for a config reload.
        postgrest.session.post("/rpc/change_max_rows_config", data={"val": 1})
        postgrest.process.send_signal(signal.SIGUSR2)
        sleep_until_postgrest_config_reload()

        after = postgrest.session.head("/projects")
        assert after.status_code == 200
        assert after.headers["Content-Range"] == "0-0/*"

        # Restore the max-rows setting stored in the database.
        reset_reply = postgrest.session.post("/rpc/reset_max_rows_config")
        assert reset_reply.text == ""
        assert reset_reply.status_code == 204
|
|
|
|
|
|
def test_max_rows_notify_reload(defaultenv):
    """Check that a NOTIFY reloads max-rows from the role settings in the database."""
    settings = dict(defaultenv)
    settings.update(PGRST_DB_CONFIG="true", PGRST_DB_CHANNEL_ENABLED="true")

    with run(env=settings) as postgrest:
        before = postgrest.session.head("/projects")
        assert before.status_code == 200
        assert before.headers["Content-Range"] == "0-4/*"

        # Change max-rows in the database and let the RPC send the NOTIFY.
        postgrest.session.post(
            "/rpc/change_max_rows_config",
            data={"val": 1, "notify": True},
        )
        sleep_until_postgrest_config_reload()

        after = postgrest.session.head("/projects")
        assert after.status_code == 200
        assert after.headers["Content-Range"] == "0-0/*"

        # Restore the max-rows setting stored in the database.
        reset_reply = postgrest.session.post("/rpc/reset_max_rows_config")
        assert reset_reply.text == ""
        assert reset_reply.status_code == 204
|
|
|
|
|
|
def test_invalid_role_claim_key_notify_reload(defaultenv):
    """Check that a NOTIFY config reload logs an error for an invalid role-claim-key."""
    settings = dict(defaultenv)
    settings.update(
        PGRST_DB_CONFIG="true",
        PGRST_DB_CHANNEL_ENABLED="true",
        PGRST_LOG_LEVEL="crit",
    )

    with run(env=settings) as postgrest:
        postgrest.session.post("/rpc/invalid_role_claim_key_reload")

        # First read: the reload notification itself.
        first_read = postgrest.read_stdout()
        assert 'Received a config reload message on the "pgrst" channel' in first_read[0]

        # Second read: the parse failure.
        second_read = postgrest.read_stdout()
        assert "failed to parse role-claim-key value" in second_read[0]

        # Restore the role-claim-key setting.
        reset_reply = postgrest.session.post("/rpc/reset_invalid_role_claim_key")
        assert reset_reply.text == ""
        assert reset_reply.status_code == 204
|
|
|
|
|
|
def test_notify_do_nothing(defaultenv):
    """Check that a NOTIFY carrying an unknown message produces no log output."""
    settings = dict(defaultenv)
    settings.update(
        PGRST_DB_CONFIG="true",
        PGRST_DB_CHANNEL_ENABLED="true",
        PGRST_LOG_LEVEL="crit",
    )

    with run(env=settings) as postgrest:
        reply = postgrest.session.post("/rpc/notify_do_nothing")
        assert reply.text == ""
        assert reply.status_code == 204

        logged = postgrest.read_stdout()
        assert logged == []
|
|
|
|
|
|
def test_db_prepared_statements_enable(defaultenv):
    """Check that prepared statements are in use under the default environment."""
    with run(env=defaultenv) as postgrest:
        reply = postgrest.session.post("/rpc/uses_prepared_statements")
        assert reply.text == "true"
|
|
|
|
|
|
def test_db_prepared_statements_disable(defaultenv):
    """Check that no prepared statements are used once the setting is disabled."""
    settings = dict(defaultenv)
    settings["PGRST_DB_PREPARED_STATEMENTS"] = "false"

    with run(env=settings) as postgrest:
        reply = postgrest.session.post("/rpc/uses_prepared_statements")
        assert reply.text == "false"
|
|
|
|
|
|
def set_statement_timeout(postgrest, role, milliseconds):
    """Set the statement timeout of ``role`` through the given postgrest instance.

    Posts to ``/rpc/set_statement_timeout`` and asserts an empty 204 reply.

    For this to work reliably with low previous timeout settings,
    use a postgrest instance that doesn't use the affected role.
    """
    payload = {"role": role, "milliseconds": milliseconds}
    reply = postgrest.session.post("/rpc/set_statement_timeout", data=payload)
    assert reply.text == ""
    assert reply.status_code == 204
|
|
|
|
|
|
def reset_statement_timeout(postgrest, role):
    """Put the statement timeout of ``role`` back to the default 0 (no timeout)."""
    no_timeout = 0
    set_statement_timeout(postgrest, role, no_timeout)
|
|
|
|
|
|
def test_statement_timeout(defaultenv, metapostgrest):
    """Statement timeout times out slow statements.

    The role's timeout is changed through ``metapostgrest`` and is always
    reset in a ``finally`` clause, so a failing assertion cannot leave the
    timeout in place for later tests using the same role.
    """

    role = "timeout_authenticator"
    set_statement_timeout(metapostgrest, role, 1000)  # 1 second

    env = {
        **defaultenv,
        "PGUSER": role,
        "PGRST_DB_ANON_ROLE": role,
    }

    try:
        with run(env=env) as postgrest:
            # below the timeout: succeeds
            response = postgrest.session.get("/rpc/sleep?seconds=0.5")
            assert response.text == ""
            assert response.status_code == 204

            # above the timeout: canceled by postgres
            response = postgrest.session.get("/rpc/sleep?seconds=2")
            assert response.status_code == 500
            data = response.json()
            assert data["message"] == "canceling statement due to statement timeout"
    finally:
        reset_statement_timeout(metapostgrest, role)
|
|
|
|
|
|
def test_change_statement_timeout(defaultenv, metapostgrest):
    """Statement timeout changes take effect immediately.

    The role's timeout is changed through ``metapostgrest`` and is always
    reset in a ``finally`` clause, so a failing assertion cannot leave the
    timeout in place for later tests using the same role.
    """

    role = "timeout_authenticator"

    env = {
        **defaultenv,
        "PGUSER": role,
        "PGRST_DB_ANON_ROLE": role,
    }

    with run(env=env) as postgrest:
        try:
            # no limit initially
            response = postgrest.session.get("/rpc/sleep?seconds=1")
            assert response.text == ""
            assert response.status_code == 204

            set_statement_timeout(metapostgrest, role, 500)  # 0.5s

            # trigger schema refresh
            postgrest.process.send_signal(signal.SIGUSR1)
            sleep_until_postgrest_scache_reload()

            response = postgrest.session.get("/rpc/sleep?seconds=1")
            assert response.status_code == 500
            data = response.json()
            assert data["message"] == "canceling statement due to statement timeout"

            set_statement_timeout(metapostgrest, role, 2000)  # 2s

            # trigger role setting refresh
            postgrest.process.send_signal(signal.SIGUSR1)
            sleep_until_postgrest_scache_reload()

            response = postgrest.session.get("/rpc/sleep?seconds=1")
            assert response.text == ""
            assert response.status_code == 204
        finally:
            reset_statement_timeout(metapostgrest, role)
|
|
|
|
|
|
def test_pool_size(defaultenv, metapostgrest):
    """Verify that PGRST_DB_POOL setting allows the correct number of parallel requests.

    Four 0.5s requests are issued concurrently against a pool of size 2.
    Their results are collected through futures so that a failed assertion
    inside a worker is re-raised in the test thread; with bare ``Thread``
    objects it would be swallowed.
    """
    from concurrent.futures import ThreadPoolExecutor

    env = {
        **defaultenv,
        "PGRST_DB_POOL": "2",
    }

    with run(env=env) as postgrest:

        def sleep(i):
            response = postgrest.session.get("/rpc/sleep?seconds=0.5")
            assert response.text == ""
            assert response.status_code == 204, "thread {}".format(i)

        with ThreadPoolExecutor(max_workers=4) as executor:
            start = time.time()
            pending = [executor.submit(sleep, i) for i in range(4)]
            for future in pending:
                # re-raises any assertion failure from the worker
                future.result()
            delta = time.time() - start

        # sleep 4 times for 0.5s each, with 2 requests in parallel
        # => total time roughly 1s
        assert 1 < delta < 1.5
|
|
|
|
|
|
@pytest.mark.parametrize("level", ["crit", "error", "warn", "info", "debug"])
def test_pool_acquisition_timeout(level, defaultenv, metapostgrest):
    """Check that PGRST_DB_POOL_ACQUISITION_TIMEOUT fires when no pool connection is free.

    The request must return 504, and the timeout must also show up in the
    logs for every log level except "crit".
    """
    timeout_message = "Timed out acquiring connection from connection pool."
    settings = dict(defaultenv)
    settings["PGRST_DB_POOL"] = "1"
    settings["PGRST_DB_POOL_ACQUISITION_TIMEOUT"] = "1"  # 1 second
    settings["PGRST_LOG_LEVEL"] = level

    with run(env=settings, no_pool_connection_available=True) as postgrest:
        reply = postgrest.session.get("/projects")
        assert reply.status_code == 504
        body = reply.json()
        assert body["message"] == timeout_message

        # The same message is expected in the logs.
        logged = sorted(postgrest.read_stdout(nlines=3))

        if level != "crit":
            assert " 504 " in logged[0]
            assert timeout_message in logged[2]
        else:
            assert len(logged) == 0
|
|
|
|
|
|
def test_change_statement_timeout_held_connection(defaultenv, metapostgrest):
    """Statement timeout changes take effect immediately, even with a request
    outliving the reconfiguration.

    Worker requests run through an executor so that failed assertions inside
    them are re-raised in the test thread (bare ``Thread`` objects would
    swallow them). The role's timeout is always reset in a ``finally``
    clause, so a failure cannot leave it in place for later tests.
    """
    from concurrent.futures import ThreadPoolExecutor

    role = "timeout_authenticator"

    env = {
        **defaultenv,
        "PGUSER": role,
        "PGRST_DB_ANON_ROLE": role,
        "PGRST_DB_POOL": "2",
    }

    # The executor is entered last so its workers are joined before the
    # postgrest process is torn down.
    with run(env=env) as postgrest, ThreadPoolExecutor(max_workers=3) as executor:

        def hold_connection():
            response = postgrest.session.get("/rpc/sleep?seconds=1")
            assert response.text == ""
            assert response.status_code == 204

        def sleep(i):
            response = postgrest.session.get("/rpc/sleep?seconds=1")
            assert response.status_code == 500, "thread {}".format(i)
            data = response.json()
            assert data["message"] == "canceling statement due to statement timeout"

        try:
            # start a slow request that holds a pool connection
            hold = executor.submit(hold_connection)
            # give the request time to start before SIGUSR1 flushes the pool
            time.sleep(0.1)

            set_statement_timeout(metapostgrest, role, 500)  # 0.5s
            # trigger schema refresh; flushes pool and establishes a new connection
            postgrest.process.send_signal(signal.SIGUSR1)

            # wait for the slow request's connection to be returned to the pool
            hold.result()

            # subsequent requests should fail due to the lowered timeout; run several in parallel
            # to ensure we use the full pool
            pending = [executor.submit(sleep, i) for i in range(2)]
            for future in pending:
                # re-raises any assertion failure from the worker
                future.result()
        finally:
            reset_statement_timeout(metapostgrest, role)
|
|
|
|
|
|
def test_admin_config(defaultenv):
    """Check that the admin server's /config endpoint returns the current configuration."""
    with run(env=defaultenv) as postgrest:
        reply = postgrest.admin.get("/config")
        print(reply.text)
        assert reply.status_code == 200
        assert "admin-server-port" in reply.text
|
|
|
|
|
|
def test_admin_schema_cache(defaultenv):
    """Check that the admin server's /schema_cache endpoint returns the schema cache."""
    expected_fragment = '"dbTables":[[{"qiName":"authors_only"'
    with run(env=defaultenv) as postgrest:
        reply = postgrest.admin.get("/schema_cache")
        assert reply.status_code == 200
        assert expected_fragment in reply.text
|
|
|
|
|
|
def test_admin_ready_w_channel(defaultenv):
    """Check that the admin /ready endpoint returns 200 with the LISTEN channel enabled."""
    settings = dict(defaultenv)
    settings["PGRST_DB_CHANNEL_ENABLED"] = "true"

    with run(env=settings) as postgrest:
        reply = postgrest.admin.get("/ready")
        assert reply.status_code == 200
|
|
|
|
|
|
def test_admin_ready_wo_channel(defaultenv):
    """Check that the admin /ready endpoint returns 200 with the LISTEN channel disabled."""
    settings = dict(defaultenv)
    settings["PGRST_DB_CHANNEL_ENABLED"] = "false"

    with run(env=settings) as postgrest:
        reply = postgrest.admin.get("/ready")
        assert reply.status_code == 200
|
|
|
|
|
|
def test_admin_ready_includes_schema_cache_state(defaultenv, metapostgrest):
    """Should get a failed response from the admin server ready endpoint when
    the schema cache is not loaded.

    The role's statement timeout is always reset in a ``finally`` clause, so
    a failing assertion cannot leave it in place for later tests.
    """

    role = "timeout_authenticator"

    env = {
        **defaultenv,
        "PGUSER": role,
        "PGRST_DB_ANON_ROLE": role,
        "PGRST_INTERNAL_SCHEMA_CACHE_SLEEP": "500",
    }

    with run(env=env) as postgrest:
        try:
            # The schema cache query takes at least 500ms, due to PGRST_INTERNAL_SCHEMA_CACHE_SLEEP above.
            # Make it impossible to load the schema cache, by setting statement timeout to 400ms.
            set_statement_timeout(metapostgrest, role, 400)

            # force a reconnection so the new role setting is picked up
            postgrest.process.send_signal(signal.SIGUSR1)

            postgrest.wait_until_scache_starts_loading()

            response = postgrest.admin.get("/ready", timeout=1)
            assert response.status_code == 503

            response = postgrest.session.get("/projects", timeout=1)
            assert response.status_code == 503
        finally:
            reset_statement_timeout(metapostgrest, role)
|
|
|
|
|
|
def test_metrics_include_schema_cache_fails(defaultenv, metapostgrest):
    """Should get schema cache fails from the metrics endpoint.

    The role's statement timeout is always reset in a ``finally`` clause, so
    a failing assertion cannot leave it in place for later tests.
    """

    role = "timeout_authenticator"

    env = {
        **defaultenv,
        "PGUSER": role,
        "PGRST_INTERNAL_SCHEMA_CACHE_SLEEP": "50",
    }

    with run(env=env) as postgrest:
        try:
            # The schema cache query takes at least 50ms, due to PGRST_INTERNAL_SCHEMA_CACHE_SLEEP above.
            # Make it impossible to load the schema cache, by setting statement timeout to 20ms.
            set_statement_timeout(metapostgrest, role, 20)

            # force a reconnection so the new role setting is picked up
            postgrest.process.send_signal(signal.SIGUSR1)

            # wait for some schema cache retries
            time.sleep(1)

            response = postgrest.admin.get("/ready", timeout=1)
            assert response.status_code == 503

            response = postgrest.admin.get("/metrics", timeout=1)
            assert response.status_code == 200

            found = re.search(
                r'pgrst_schema_cache_loads_total{status="FAIL"} (\d+)', response.text
            )
            # Fail with a readable message instead of an AttributeError on None.
            assert found is not None, "FAIL counter missing from /metrics output"
            metrics = float(found.group(1))
            assert metrics == 1.0
        finally:
            reset_statement_timeout(metapostgrest, role)
|
|
|
|
|
|
def test_admin_not_found(defaultenv):
    """Check that the admin server answers 404 for an undefined endpoint."""
    with run(env=defaultenv) as postgrest:
        reply = postgrest.admin.get("/notfound")
        assert reply.status_code == 404
|
|
|
|
|
|
def test_admin_ready_dependent_on_main_app(defaultenv):
    """Check that admin /ready reports failure once the main app is broken."""
    socket_path = defaultenv["PGRST_SERVER_UNIX_SOCKET"]
    with run(env=defaultenv) as postgrest:
        # Removing the unix socket makes the main app fail.
        os.remove(socket_path)
        reply = postgrest.admin.get("/ready")
        assert reply.status_code == 500
|
|
|
|
|
|
def test_admin_live_good(defaultenv):
    """Check that admin /live returns 200 while the main app is running."""
    port = freeport()
    with run(env=defaultenv, port=port) as postgrest:
        reply = postgrest.admin.get("/live")
        assert reply.status_code == 200
|
|
|
|
|
|
def test_admin_live_dependent_on_main_app(defaultenv):
    """Check that admin /live reports failure once the main app is broken."""
    socket_path = defaultenv["PGRST_SERVER_UNIX_SOCKET"]
    with run(env=defaultenv) as postgrest:
        # Removing the unix socket makes the main app fail.
        os.remove(socket_path)
        reply = postgrest.admin.get("/live")
        assert reply.status_code == 500
|
|
|
|
|
|
@pytest.mark.parametrize("specialhostvalue", FIXTURES["specialhostvalues"])
def test_admin_works_with_host_special_values(specialhostvalue, defaultenv):
    """Check admin /live and /ready return 200 when the main app uses a special host value."""
    port = freeport()
    with run(env=defaultenv, port=port, host=specialhostvalue) as postgrest:
        live = postgrest.admin.get("/live")
        assert live.status_code == 200

        ready = postgrest.admin.get("/ready")
        assert ready.status_code == 200
|
|
|
|
|
|
@pytest.mark.parametrize("level", ["crit", "error", "warn", "info", "debug"])
def test_log_level(level, defaultenv):
    """log_level should filter request logging.

    Three requests are issued (expected statuses 500, 404 and 200) and the
    sorted log output is compared against what each level should emit:
    nothing for "crit", only the 500 for "error", 500 and 404 for "warn",
    all three for "info", and additionally four pool connection lines for
    "debug".
    """

    env = {**defaultenv, "PGRST_LOG_LEVEL": level}

    # expired token to test 500 response for "JWT expired"
    claim = {"role": "postgrest_test_author", "exp": 0}
    headers = jwtauthheader(claim, SECRET)

    # Access-log patterns, listed in the order the sorted output yields them.
    line_500 = r'- - - \[.+\] "GET / HTTP/1.1" 500 - "" "python-requests/.+"'
    line_200 = r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 - "" "python-requests/.+"'
    line_404 = r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 - "" "python-requests/.+"'
    expected_requests = {
        "crit": [],
        "error": [line_500],
        "warn": [line_500, line_404],
        "info": [line_500, line_200, line_404],
        "debug": [line_500, line_200, line_404],
    }[level]

    with run(env=env) as postgrest:
        response = postgrest.session.get("/", headers=headers)
        assert response.status_code == 500

        response = postgrest.session.get("/unknown")
        assert response.status_code == 404

        response = postgrest.session.get("/")
        assert response.status_code == 200

        output = sorted(postgrest.read_stdout(nlines=7))

        for index, pattern in enumerate(expected_requests):
            assert re.match(pattern, output[index])

        if level == "debug":
            # `"Connection" and "x" in line` only tests the second substring,
            # so each substring is checked explicitly.
            for line in output[3:5]:
                assert "Connection" in line and "is available" in line
            for line in output[5:7]:
                assert "Connection" in line and "is used" in line
            assert len(output) == 7
        else:
            assert len(output) == len(expected_requests)
|
|
|
|
|
|
def test_no_pool_connection_required_on_bad_http_logic(defaultenv):
    """Check that requests failing on HTTP logic get answered without a pool connection."""
    with run(env=defaultenv, no_pool_connection_available=True) as postgrest:
        session = postgrest.session

        # A nested route that does not exist is rejected without a connection.
        not_found = session.head("/path/notfound")
        assert not_found.status_code == 404

        # Unsupported HTTP methods are rejected without a connection as well.
        traced = session.request("TRACE", "/projects")
        assert traced.status_code == 405
        patched = session.patch("/rpc/hello")
        assert patched.status_code == 405
|
|
|
|
|
|
def test_no_pool_connection_required_on_options(defaultenv):
    """Check that OPTIONS requests get answered without a pool connection."""
    with run(env=defaultenv, no_pool_connection_available=True) as postgrest:
        # A table, an RPC and the root, in that order; none needs a connection.
        for target in ("/projects", "/rpc/hello", "/"):
            reply = postgrest.session.options(target)
            assert reply.status_code == 200
|
|
|
|
|
|
def test_no_pool_connection_required_on_bad_jwt_claim(defaultenv):
    """Check that a request with an invalid JWT is rejected without a pool connection."""
    settings = dict(defaultenv)
    settings["PGRST_JWT_SECRET"] = SECRET

    with run(env=settings, no_pool_connection_available=True) as postgrest:
        # The token is signed with a secret the server does not know.
        forged = jwtauthheader({"role": "postgrest_test_author"}, "Wrong Secret")
        reply = postgrest.session.get("/projects", headers=forged)
        assert reply.status_code == 401
|
|
|
|
|
|
def test_no_pool_connection_required_on_bad_embedding(defaultenv):
    """A request that fails to embed must not consume a pool connection."""

    with run(env=defaultenv, no_pool_connection_available=True) as postgrest:
        # embedding a relation named "unexistent" is answered with 400
        bad_embed = postgrest.session.get("/projects?select=*,unexistent(*)")
        assert bad_embed.status_code == 400
|
|
|
|
|
|
# https://github.com/PostgREST/postgrest/issues/2620
def test_notify_reloading_catalog_cache(defaultenv):
    """Requests after a schema cache reload must see the changed column type.

    ``/cats`` is first filtered by a uuid ``id``; after ``drop_change_cats``
    and the schema cache reload the same column is filtered by an integer.
    """

    with run(env=defaultenv) as postgrest:
        session = postgrest.session

        # before the change the id column accepts a uuid value
        by_uuid = session.get("/cats?id=eq.dea27321-f988-4a57-93e4-8eeb38f3cf1e")
        assert by_uuid.status_code == 200

        # turn the column into a bigint and wait for the schema cache reload
        change = session.post("/rpc/drop_change_cats")
        assert change.text == ""
        assert change.status_code == 204
        sleep_until_postgrest_scache_reload()

        # after the reload the id column accepts a bigint value
        by_bigint = session.get("/cats?id=eq.1")
        assert by_bigint.status_code == 200
|
|
|
|
|
|
def test_role_settings(defaultenv):
    """statement_timeout is applied per role and follows a config reload."""

    guc_url = "/rpc/get_guc_value?name=statement_timeout"
    change_url = "/rpc/change_role_statement_timeout"

    env = dict(defaultenv)
    env["PGRST_JWT_SECRET"] = SECRET

    with run(env=env) as postgrest:
        session = postgrest.session

        # no JWT is sent: this is the value for postgrest_test_anonymous
        anonymous_before = session.get(guc_url)
        assert anonymous_before.text == '"2s"'

        # change the timeout, then ask PostgREST to reload its config
        changed = session.post(change_url, data={"timeout": "5s"})
        assert changed.text == ""
        assert changed.status_code == 204

        reloaded = session.get("/rpc/reload_pgrst_config")
        assert reloaded.text == ""
        assert reloaded.status_code == 204
        sleep_until_postgrest_config_reload()

        anonymous_after = session.get(guc_url)
        assert anonymous_after.text == '"5s"'

        # postgrest_test_author has its own statement_timeout
        author = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
        author_value = session.get(guc_url, headers=author)
        assert author_value.text == '"10s"'

        # put the original value back so other tests see "2s" again
        restored = session.post(change_url, data={"timeout": "2s"})
        assert restored.status_code == 204
|
|
|
|
|
|
def test_isolation_level(defaultenv):
    """isolation_level should be set per role and per function.

    Covers the default level, role-specific levels on GET/POST/PATCH/DELETE,
    function-specific levels, and a function-level setting taking precedence
    over the level of the impersonated role.
    """

    env = {
        **defaultenv,
        "PGRST_JWT_SECRET": SECRET,
    }

    with run(env=env) as postgrest:
        # default isolation level for postgrest_test_anonymous
        response = postgrest.session.get(
            "/items_w_isolation_level?select=isolation_level&limit=1"
        )
        assert response.text == '[{"isolation_level":"read committed"}]'

        # isolation level for postgrest_test_repeatable_read on GET
        headers = jwtauthheader({"role": "postgrest_test_repeatable_read"}, SECRET)
        response = postgrest.session.get(
            "/items_w_isolation_level?select=isolation_level&limit=1", headers=headers
        )
        assert response.text == '[{"isolation_level":"repeatable read"}]'

        # isolation level for postgrest_test_serializable on POST
        headers = jwtauthheader({"role": "postgrest_test_serializable"}, SECRET)
        headers["Prefer"] = "return=representation"
        response = postgrest.session.post(
            "/items_w_isolation_level?select=isolation_level",
            json={"id": "666"},
            headers=headers,
        )
        assert response.text == '[{"isolation_level":"serializable"}]'

        # isolation level for postgrest_test_serializable on PATCH
        headers = jwtauthheader({"role": "postgrest_test_serializable"}, SECRET)
        headers["Prefer"] = "return=representation"
        response = postgrest.session.patch(
            "/items_w_isolation_level?select=isolation_level&id=eq.666",
            json={"id": "666"},
            headers=headers,
        )
        assert response.text == '[{"isolation_level":"serializable"}]'

        # isolation level for postgrest_test_serializable on DELETE
        headers = jwtauthheader({"role": "postgrest_test_serializable"}, SECRET)
        headers["Prefer"] = "return=representation"
        response = postgrest.session.delete(
            "/items_w_isolation_level?select=isolation_level&id=eq.666", headers=headers
        )
        assert response.text == '[{"isolation_level":"serializable"}]'

        # default isolation level for function
        response = postgrest.session.get("/rpc/default_isolation_level")
        assert response.text == '"read committed"'

        # changes with role isolation level
        headers = jwtauthheader({"role": "postgrest_test_repeatable_read"}, SECRET)
        response = postgrest.session.get(
            "/rpc/default_isolation_level", headers=headers
        )
        assert response.text == '"repeatable read"'

        # isolation level can be set per function
        response = postgrest.session.get("/rpc/serializable_isolation_level")
        assert response.text == '"serializable"'
        response = postgrest.session.get("/rpc/repeatable_read_isolation_level")
        assert response.text == '"repeatable read"'

        # isolation level for a function overrides the role isolation level;
        # the role's JWT has to be sent, otherwise the request runs as the
        # anonymous role and no role-level setting is there to be overridden
        headers = jwtauthheader({"role": "postgrest_test_repeatable_read"}, SECRET)
        response = postgrest.session.get(
            "/rpc/serializable_isolation_level", headers=headers
        )
        assert response.text == '"serializable"'
|
|
|
|
|
|
def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
    """A notification sent while a schema cache reload is running must not be lost.

    See https://github.com/PostgREST/postgrest/issues/2791
    """

    # PGRST_INTERNAL_SCHEMA_CACHE_SLEEP is divided by 1000 to obtain a value
    # for time.sleep (presumably the setting is in milliseconds)
    reload_delay = int(slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_SLEEP"]) / 1000

    with run(env=slow_schema_cache_env, wait_for_readiness=False) as postgrest:
        # readiness is not awaited by run(), so wait for it manually
        time.sleep(2 * reload_delay + 0.1)

        # 1st request: create a function, which starts a schema cache reload
        created = postgrest.session.post("/rpc/create_function")
        assert created.text == ""
        assert created.status_code == 204

        # wait until we are inside the schema cache reload process
        time.sleep(reload_delay / 2)

        # 2nd request: change the same function and trigger another reload
        migrated = postgrest.session.post("/rpc/migrate_function")
        assert migrated.text == ""
        assert migrated.status_code == 204

        # wait long enough to reach the final schema cache state
        time.sleep(2 * reload_delay)

        # the schema cache is up-to-date, so the 2nd reload was not lost
        product = postgrest.session.get("/rpc/mult_them?c=3&d=4")
        assert product.text == "12"
        assert product.status_code == 200
|
|
|
|
|
|
@pytest.mark.parametrize("dburi_type", ["no_params", "no_params_qmark", "with_params"])
def test_get_pgrst_version_with_uri_connection_string(dburi_type, dburi, defaultenv):
    """The fallback_application_name should be added to a db-uri in URI format."""

    libpq_keys = ["PGDATABASE", "PGHOST", "PGUSER"]
    env_without_libpq = {}
    for name, setting in defaultenv.items():
        if name not in libpq_keys:
            env_without_libpq[name] = setting

    # one environment per parametrized URI flavour
    env_by_type = {
        "no_params": {**defaultenv, "PGRST_DB_URI": "postgresql://"},
        "no_params_qmark": {**defaultenv, "PGRST_DB_URI": "postgresql://?"},
        "with_params": {**env_without_libpq, "PGRST_DB_URI": dburi.decode()},
    }

    with run(env=env_by_type[dburi_type]) as postgrest:
        response = postgrest.session.post("/rpc/get_pgrst_version")

        # the RPC is expected to return the Server header value, reworded
        # from "postgrest/<ver>" to "PostgREST <ver>" and JSON-quoted
        product = response.headers["Server"].replace("postgrest/", "PostgREST ")
        expected = f'"{product}"'
        assert response.text == expected
|
|
|
|
|
|
def test_get_pgrst_version_with_keyval_connection_string(defaultenv):
    """The fallback_application_name should be added to a db-uri in keyword/value format."""

    uri = "dbname={} host={} user={}".format(
        defaultenv["PGDATABASE"], defaultenv["PGHOST"], defaultenv["PGUSER"]
    )

    # the connection parameters now live in the db-uri, so drop the libpq variables
    libpq_keys = ["PGDATABASE", "PGHOST", "PGUSER"]
    env = {}
    for name, setting in defaultenv.items():
        if name not in libpq_keys:
            env[name] = setting
    env["PGRST_DB_URI"] = uri

    with run(env=env) as postgrest:
        response = postgrest.session.post("/rpc/get_pgrst_version")

        product = response.headers["Server"].replace("postgrest/", "PostgREST ")
        expected = f'"{product}"'
        assert response.text == expected
|
|
|
|
|
|
def test_log_postgrest_version(defaultenv):
    """Should show the PostgREST version in the logs.

    The version is taken from the ``Server`` response header (the part after
    the ``/``) and must appear in the first line read from stdout.
    """

    # no_startup_stdout=False is passed so the startup output is still
    # available to read_stdout below
    with run(env=defaultenv, no_startup_stdout=False) as postgrest:
        version = postgrest.session.head("/").headers["Server"].split("/")[1]

        output = postgrest.read_stdout(nlines=1)

        assert "Starting PostgREST %s..." % version in output[0]
|
|
|
|
|
|
def test_succeed_w_role_having_superuser_settings(defaultenv):
    """Requests succeed when the impersonated role carries superuser settings."""

    env = dict(defaultenv)
    env.update({"PGRST_DB_CONFIG": "true", "PGRST_JWT_SECRET": SECRET})

    with run(stdin=SECRET.encode(), env=env) as postgrest:
        token = jwtauthheader({"role": "postgrest_test_w_superuser_settings"}, SECRET)
        projects = postgrest.session.get("/projects", headers=token)
        # printed so the response body is visible when the assertion fails
        print(projects.text)
        assert projects.status_code == 200
|
|
|
|
|
|
def test_get_granted_superuser_setting(defaultenv):
    """Should succeed when the impersonated role has granted superuser settings.

    The check only runs when ``/rpc/get_postgres_version`` reports 150000 or
    above; on lower versions the test passes without asserting anything.
    """

    env = {**defaultenv, "PGRST_DB_CONFIG": "true", "PGRST_JWT_SECRET": SECRET}

    with run(stdin=SECRET.encode(), env=env) as postgrest:
        response_ver = postgrest.session.get("/rpc/get_postgres_version")
        # Parse the numeric body with int() rather than eval(): the text comes
        # from an HTTP response and must never be executed as Python code.
        pg_ver = int(response_ver.text)
        if pg_ver >= 150000:
            headers = jwtauthheader(
                {"role": "postgrest_test_w_superuser_settings"}, SECRET
            )
            response = postgrest.session.get(
                "/rpc/get_guc_value?name=log_min_duration_sample", headers=headers
            )
            assert response.text == '"12345ms"'
|
|
|
|
|
|
def test_fail_with_invalid_dbname_and_automatic_recovery_disabled(defaultenv):
    """PostgREST exits with code 1 when the dbname is invalid and automatic recovery is off."""

    uri = "postgresql://?dbname={}&host={}&user={}".format(
        "INVALID", defaultenv["PGHOST"], defaultenv["PGUSER"]
    )

    env = dict(defaultenv)
    env["PGRST_DB_URI"] = uri
    env["PGRST_DB_POOL_AUTOMATIC_RECOVERY"] = "false"

    # readiness is not awaited: the process is expected to exit instead
    with run(env=env, wait_for_readiness=False) as postgrest:
        exit_code = wait_until_exit(postgrest)
        assert exit_code == 1
|
|
|
|
|
|
def test_fail_with_automatic_recovery_disabled_and_terminated_using_query(defaultenv):
    """PostgREST exits with code 1 when terminated via SQL with automatic recovery off.

    ``terminate_pgrst`` is called through ``psql`` with the PGAPPNAME the
    server was started with.
    """

    env = dict(defaultenv)
    env["PGRST_DB_POOL_AUTOMATIC_RECOVERY"] = "false"
    env["PGAPPNAME"] = "target"

    # the application name wrapped in single quotes, as a SQL string literal
    quoted_app_name = "'{}'".format(env["PGAPPNAME"])

    with run(env=env) as postgrest:
        statement = f"SELECT terminate_pgrst({quoted_app_name})"
        command = 'psql -d {} -U {} -h {} --set ON_ERROR_STOP=1 -a -c "{}"'.format(
            env["PGDATABASE"], env["PGUSER"], env["PGHOST"], statement
        )
        os.system(command)

        exit_code = wait_until_exit(postgrest)
        assert exit_code == 1
|
|
|
|
|
|
def test_server_timing_jwt_should_decrease_on_subsequent_requests(defaultenv):
    """The jwt entry of Server-Timing should shrink when the same token is sent again."""

    env = dict(defaultenv)
    env.update(
        {
            "PGRST_SERVER_TIMING_ENABLED": "true",
            "PGRST_JWT_CACHE_MAX_LIFETIME": "86400",
            "PGRST_JWT_SECRET": "@/dev/stdin",
            "PGRST_DB_CONFIG": "false",
        }
    )

    # token for postgrest_test_author that expires 30 minutes from now
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=30)
    claims = {"role": "postgrest_test_author", "exp": int(expires_at.timestamp())}
    headers = jwtauthheader(claims, SECRET)

    with run(stdin=SECRET.encode(), env=env) as postgrest:
        # the same request twice, keeping each Server-Timing header
        timings = [
            postgrest.session.get("/authors_only", headers=headers).headers[
                "Server-Timing"
            ]
            for _ in range(2)
        ]

        first_dur, second_dur = (
            parse_server_timings_header(timing)["jwt"] for timing in timings
        )

        # a gap of at least 0.3ms implies that JWT caching works as expected
        assert (first_dur - second_dur) > 0.3
|
|
|
|
|
|
# just added to complete code coverage
def test_jwt_caching_works_with_db_plan_disabled(defaultenv):
    """Assert that JWT caching works even when the Server-Timing header is not returned.

    Server timing is switched off while the JWT cache stays enabled, so the
    only thing that can be verified is that both requests succeed.
    """

    env = {
        **defaultenv,
        # disabled on purpose: this scenario is about responses that carry no
        # Server-Timing header
        "PGRST_SERVER_TIMING_ENABLED": "false",
        "PGRST_JWT_CACHE_MAX_LIFETIME": "86400",
        "PGRST_JWT_SECRET": "@/dev/stdin",
        "PGRST_DB_CONFIG": "false",
    }

    headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)

    with run(stdin=SECRET.encode(), env=env) as postgrest:
        first_request = postgrest.session.get("/authors_only", headers=headers)
        second_request = postgrest.session.get("/authors_only", headers=headers)

        # in this case we don't get server-timing in response headers
        # so we can't compare durations, we just check if request succeeds
        assert first_request.status_code == 200 and second_request.status_code == 200
|
|
|
|
|
|
def test_server_timing_jwt_should_not_decrease_when_caching_disabled(defaultenv):
    """The jwt duration should not drop between requests when the JWT cache is disabled."""

    env = dict(defaultenv)
    env.update(
        {
            "PGRST_SERVER_TIMING_ENABLED": "true",
            "PGRST_JWT_CACHE_MAX_LIFETIME": "0",  # cache disabled
            "PGRST_JWT_SECRET": "@/dev/stdin",
            "PGRST_DB_CONFIG": "false",
        }
    )

    headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)

    with run(stdin=SECRET.encode(), env=env) as postgrest:
        session = postgrest.session

        # warm-up request; its response is not inspected
        session.get("/authors_only", headers=headers)

        first_response = session.get("/authors_only", headers=headers)
        first_timings = first_response.headers["Server-Timing"]
        second_response = session.get("/authors_only", headers=headers)
        second_timings = second_response.headers["Server-Timing"]

        first_dur = parse_server_timings_header(first_timings)["jwt"]
        second_dur = parse_server_timings_header(second_timings)["jwt"]

        # a difference below 150 implies that the token is not cached
        assert (first_dur - second_dur) < 150.0
|
|
|
|
|
|
def test_jwt_cache_with_no_exp_claim(defaultenv):
    """The jwt duration should decrease on a repeated request even without an exp claim."""

    env = dict(defaultenv)
    env.update(
        {
            "PGRST_SERVER_TIMING_ENABLED": "true",
            "PGRST_JWT_CACHE_MAX_LIFETIME": "86400",
            "PGRST_JWT_SECRET": "@/dev/stdin",
            "PGRST_DB_CONFIG": "false",
        }
    )

    # the claims deliberately carry no "exp" entry
    headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)

    with run(stdin=SECRET.encode(), env=env) as postgrest:
        # the same request twice, keeping each Server-Timing header
        timings = [
            postgrest.session.get("/authors_only", headers=headers).headers[
                "Server-Timing"
            ]
            for _ in range(2)
        ]

        first_dur, second_dur = (
            parse_server_timings_header(timing)["jwt"] for timing in timings
        )

        # a gap of at least 0.3ms implies that JWT caching works as expected
        assert (first_dur - second_dur) > 0.3
|
|
|
|
|
|
def test_preflight_request_with_cors_allowed_origin_config(defaultenv):
    """An OPTIONS preflight from an allowed origin gets that origin echoed back."""

    env = dict(defaultenv)
    env["PGRST_SERVER_CORS_ALLOWED_ORIGINS"] = "http://example.com, http://example2.com"

    preflight_headers = {
        "Accept": "*/*",
        "Origin": "http://example.com",
        "Access-Control-Request-Method": "POST",
        "Access-Control-Request-Headers": "Content-Type",
    }

    with run(env=env) as postgrest:
        response = postgrest.session.options("/items", headers=preflight_headers)
        assert response.headers["Access-Control-Allow-Origin"] == "http://example.com"
        assert response.headers["Access-Control-Allow-Credentials"] == "true"
|
|
|
|
|
|
def test_preflight_request_with_empty_cors_allowed_origin_config(defaultenv):
    """An OPTIONS preflight allows every origin when the CORS config is present but empty."""

    env = dict(defaultenv)
    env["PGRST_SERVER_CORS_ALLOWED_ORIGINS"] = ""

    preflight_headers = {
        "Accept": "*/*",
        "Origin": "http://anyorigin.com",
        "Access-Control-Request-Method": "POST",
        "Access-Control-Request-Headers": "Content-Type",
    }

    with run(env=env) as postgrest:
        cors = postgrest.session.options("/items", headers=preflight_headers).headers
        assert cors["Access-Control-Allow-Origin"] == "*"
        assert "POST" in cors["Access-Control-Allow-Methods"]
|
|
|
|
|
|
def test_no_preflight_request_with_CORS_config_should_return_header(defaultenv):
    """A plain GET from an allowed origin gets that origin in Access-Control-Allow-Origin."""

    env = dict(defaultenv)
    env["PGRST_SERVER_CORS_ALLOWED_ORIGINS"] = "http://example.com, http://example2.com"

    request_headers = {"Accept": "*/*", "Origin": "http://example.com"}

    with run(env=env) as postgrest:
        items = postgrest.session.get("/items", headers=request_headers)
        assert items.headers["Access-Control-Allow-Origin"] == "http://example.com"
|
|
|
|
|
|
def test_no_preflight_request_with_CORS_config_should_not_return_header(defaultenv):
    """A plain GET from an origin outside the allowed list gets no Access-Control-Allow-Origin."""

    env = dict(defaultenv)
    env["PGRST_SERVER_CORS_ALLOWED_ORIGINS"] = "http://example.com, http://example2.com"

    request_headers = {"Accept": "*/*", "Origin": "http://invalid.com"}

    with run(env=env) as postgrest:
        items = postgrest.session.get("/items", headers=request_headers)
        assert "Access-Control-Allow-Origin" not in items.headers
|
|
|
|
|
|
@pytest.mark.parametrize("level", ["crit", "error", "warn", "info", "debug"])
def test_db_error_logging_to_stderr(level, defaultenv, metapostgrest):
    """Verify that DB errors are logged to stderr.

    A statement timeout is set on the ``timeout_authenticator`` role, a
    request that sleeps for one second is sent, and the captured log lines
    are checked per log level. The role's statement timeout is always reset
    afterwards, even when an assertion fails.
    """

    role = "timeout_authenticator"
    # presumably milliseconds, i.e. shorter than the 1 second sleep below
    set_statement_timeout(metapostgrest, role, 500)

    env = {
        **defaultenv,
        "PGUSER": role,
        "PGRST_DB_ANON_ROLE": role,
        "PGRST_LOG_LEVEL": level,
    }

    try:
        with run(env=env) as postgrest:
            response = postgrest.session.get("/rpc/sleep?seconds=1")

            assert response.status_code == 500

            # ensure the message appears on the logs; the lines are sorted so
            # the indexes below do not depend on the order they were written in
            output = sorted(postgrest.read_stdout(nlines=4))

            if level == "crit":
                assert len(output) == 0
            elif level == "debug":
                assert " 500 " in output[0]
                assert "canceling statement due to statement timeout" in output[3]
            else:
                assert " 500 " in output[0]
                assert "canceling statement due to statement timeout" in output[1]
    finally:
        # run the cleanup even on failure so the role's timeout does not leak
        # into whatever test uses this role next
        reset_statement_timeout(metapostgrest, role)
|
|
|
|
|
|
def test_function_setting_statement_timeout_fails(defaultenv):
    """A function with a one second statement timeout fails on a three second statement."""

    timeout_error = '{"code":"57014","details":null,"hint":null,"message":"canceling statement due to statement timeout"}'

    with run(env=defaultenv) as postgrest:
        failed = postgrest.session.post("/rpc/one_sec_timeout")

        assert failed.status_code == 500
        assert failed.text == timeout_error
|
|
|
|
|
|
def test_function_setting_statement_timeout_passes(defaultenv):
    """A function with a four second statement timeout completes a three second statement."""

    with run(env=defaultenv) as postgrest:
        completed = postgrest.session.post("/rpc/four_sec_timeout")

        assert completed.text == ""
        assert completed.status_code == 204
|
|
|
|
|
|
def test_function_setting_work_mem(defaultenv):
    """The work_mem setting of a function is applied when work_mem is hoisted."""

    env = dict(defaultenv)
    env["PGRST_DB_HOISTED_TX_SETTINGS"] = "work_mem"

    with run(env=env) as postgrest:
        work_mem = postgrest.session.get("/rpc/rpc_work_mem?select=get_work_mem")

        assert work_mem.text == '{"get_work_mem":"6000kB"}'
|
|
|
|
|
|
def test_multiple_func_settings(defaultenv):
    """Both function settings are applied when work_mem and statement_timeout are hoisted."""

    env = dict(defaultenv)
    env["PGRST_DB_HOISTED_TX_SETTINGS"] = "work_mem,statement_timeout"

    with run(env=env) as postgrest:
        settings = postgrest.session.get(
            "/rpc/rpc_with_two_hoisted?select=get_work_mem,get_statement_timeout"
        )

        expected = '{"get_work_mem":"5000kB","get_statement_timeout":"10s"}'
        assert settings.text == expected
|
|
|
|
|
|
def test_first_hoisted_setting_is_applied(defaultenv):
    """With only work_mem hoisted, work_mem is applied and statement_timeout is not."""

    env = dict(defaultenv)
    # statement_timeout is intentionally left out of the hoisted settings
    env["PGRST_DB_HOISTED_TX_SETTINGS"] = "work_mem"

    with run(env=env) as postgrest:
        settings = postgrest.session.get(
            "/rpc/rpc_with_one_hoisted?select=get_work_mem,get_statement_timeout"
        )

        expected = '{"get_work_mem":"3000kB","get_statement_timeout":"2s"}'
        assert settings.text == expected
|
|
|
|
|
|
def test_second_hoisted_setting_is_applied(defaultenv):
    """With only statement_timeout hoisted, statement_timeout is applied and work_mem is not."""

    env = dict(defaultenv)
    # work_mem is intentionally left out of the hoisted settings
    env["PGRST_DB_HOISTED_TX_SETTINGS"] = "statement_timeout"

    with run(env=env) as postgrest:
        settings = postgrest.session.get(
            "/rpc/rpc_with_one_hoisted?select=get_work_mem,get_statement_timeout"
        )

        expected = '{"get_work_mem":"4MB","get_statement_timeout":"7s"}'
        assert settings.text == expected
|
|
|
|
|
|
def test_admin_metrics(defaultenv):
    """The admin endpoint serves /metrics and the body names the expected metrics."""

    expected_metrics = (
        "pgrst_schema_cache_query_time_seconds",
        'pgrst_schema_cache_loads_total{status="SUCCESS"}',
        "pgrst_db_pool_max",
        "pgrst_db_pool_waiting",
        "pgrst_db_pool_available",
        "pgrst_db_pool_timeouts_total",
    )

    with run(env=defaultenv, port=freeport()) as postgrest:
        response = postgrest.admin.get("/metrics")
        assert response.status_code == 200

        body = response.text
        for metric in expected_metrics:
            assert metric in body
|
|
|
|
|
|
def test_schema_cache_startup_load_with_in_db_config(defaultenv, metapostgrest):
    """The schema cache loads correctly at startup with the in-db `pgrst.db_schemas` config."""

    # change the in-db config through the already running meta instance
    changed = metapostgrest.session.post("/rpc/change_db_schemas_config")
    assert changed.text == ""
    assert changed.status_code == 204

    # a freshly started instance must answer with the "test" schema
    with run(env=defaultenv) as postgrest:
        current_schema = postgrest.session.get("/rpc/get_current_schema")
        assert current_schema.text == '"test"'
        assert current_schema.status_code == 200

    # undo the in-db config change
    restored = metapostgrest.session.post("/rpc/reset_db_schemas_config")
    assert restored.text == ""
    assert restored.status_code == 204
|
|
|
|
|
|
def test_jwt_cache_purges_expired_entries(defaultenv):
    """Expired JWT cache entries are purged on a cache miss.

    The actual cache size reduction was verified manually, see
    https://github.com/PostgREST/postgrest/pull/3801#issuecomment-2620776041
    This test exists for code coverage of the purgeExpired function.
    """

    def expiry_in(seconds):
        # unix timestamp `seconds` from now (UTC), truncated to an int
        moment = datetime.now(timezone.utc) + timedelta(seconds=seconds)
        return int(moment.timestamp())

    def auth_headers(seconds):
        # Authorization headers for a token that expires `seconds` from now
        claims = {"role": "postgrest_test_author", "exp": expiry_in(seconds)}
        return jwtauthheader(claims, SECRET)

    env = dict(defaultenv)
    env.update(
        {
            "PGRST_JWT_CACHE_MAX_LIFETIME": "86400",
            "PGRST_JWT_SECRET": SECRET,
            "PGRST_DB_CONFIG": "false",
        }
    )

    with run(env=env) as postgrest:
        session = postgrest.session

        # Two unique tokens: "exp" has one second resolution, so sleeping one
        # second in between makes the second token differ from the first.
        first_token = auth_headers(5)
        session.get("/authors_only", headers=first_token)

        time.sleep(1)

        second_token = auth_headers(5)
        session.get("/authors_only", headers=second_token)

        # wait 5 seconds for both tokens to expire
        time.sleep(5)

        third_token = auth_headers(5)

        # This request should be a cache miss, which triggers purgeExpired:
        # the 2 expired tokens are removed and the new one is added to the cache.
        response = session.get("/authors_only", headers=third_token)

        assert response.status_code == 200
|
|
|
|
|
|
def test_pgrst_log_503_client_error_to_stderr(defaultenv):
    """PostgREST should log 503 client errors to stderr."""

    env = dict(defaultenv)
    env["PGAPPNAME"] = "test-io"

    expected_log = '{"code":"PGRST001","details":"no connection to the server\\n","hint":null,"message":"Database client error. Retrying the connection."}\n'

    with run(env=env) as postgrest:
        # terminate_pgrst is called with the PGAPPNAME this instance was started with
        postgrest.session.get("/rpc/terminate_pgrst?appname=test-io")

        log_lines = postgrest.read_stdout(nlines=6)

        assert any(expected_log in entry for entry in log_lines)
|