package reset

import (
	"context"
	_ "embed"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/errdefs"
	"github.com/go-errors/errors"
	"github.com/jackc/pgconn"
	"github.com/jackc/pgerrcode"
	"github.com/jackc/pgx/v4"
	"github.com/spf13/afero"
	"github.com/supabase/cli/internal/db/start"
	"github.com/supabase/cli/internal/gen/keys"
	"github.com/supabase/cli/internal/migration/apply"
	"github.com/supabase/cli/internal/migration/repair"
	"github.com/supabase/cli/internal/seed/buckets"
	"github.com/supabase/cli/internal/utils"
	"github.com/supabase/cli/pkg/migration"
	"github.com/supabase/cli/pkg/vault"
)

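// Run resets the database targeted by config. Remote databases require an
// interactive confirmation before any destructive action is taken; local
// resets require the supabase database container to be running.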
func Run(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	if len(version) > 0 {
		if _, err := strconv.Atoi(version); err != nil {
			return errors.New(repair.ErrInvalidVersion)
		}
		if _, err := repair.GetMigrationFile(version, fsys); err != nil {
			return err
		}
	}
	if !utils.IsLocalDatabase(config) {
		msg := "Do you want to reset the remote database?"
		if shouldReset, err := utils.NewConsole().PromptYesNo(ctx, msg, false); err != nil {
			return err
		} else if !shouldReset {
			return errors.New(context.Canceled)
		}
		return resetRemote(ctx, version, config, fsys, options...)
	}
	// Config file is loaded before parsing --linked or --local flags
	if err := utils.AssertSupabaseDbIsRunning(); err != nil {
		return err
	}
	// Reset postgres database because extensions (pg_cron, pg_net) require postgres
	if err := resetDatabase(ctx, version, fsys, options...); err != nil {
		return err
	}
	// Seed objects from supabase/buckets directory
	if resp, err := utils.Docker.ContainerInspect(ctx, utils.StorageId); err == nil {
		if resp.State.Health == nil || resp.State.Health.Status != types.Healthy {
			if err := start.WaitForHealthyService(ctx, 30*time.Second, utils.StorageId); err != nil {
				return err
			}
		}
		if err := buckets.Run(ctx, "", false, fsys); err != nil {
			return err
		}
	}
	branch := keys.GetGitBranch(fsys)
	fmt.Fprintln(os.Stderr, "Finished "+utils.Aqua("supabase db reset")+" on branch "+utils.Aqua(branch)+".")
	return nil
}

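// resetDatabase dispatches on the configured Postgres major version:
// 14 and below reset the database in place, while 15 and above recreate
// the container from scratch.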
func resetDatabase(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	fmt.Fprintln(os.Stderr, "Resetting local database"+toLogMessage(version))
	if utils.Config.Db.MajorVersion <= 14 {
		return resetDatabase14(ctx, version, fsys, options...)
	}
	return resetDatabase15(ctx, version, fsys, options...)
}

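// toLogMessage formats the optional target version for progress logs.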
func toLogMessage(version string) string {
	if len(version) > 0 {
		return " to version: " + version
	}
	return "..."
}

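// resetDatabase14 drops and recreates the postgres database in place, then
// restarts the container before applying migrations and seed data.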
func resetDatabase14(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	if err := recreateDatabase(ctx, options...); err != nil {
		return err
	}
	if err := initDatabase(ctx, options...); err != nil {
		return err
	}
	if err := RestartDatabase(ctx, os.Stderr); err != nil {
		return err
	}
	conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{}, options...)
	if err != nil {
		return err
	}
	defer conn.Close(context.Background())
	return apply.MigrateAndSeed(ctx, version, conn, fsys)
}

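// resetDatabase15 removes the database container and its volume entirely,
// starts a fresh container, waits for it to become healthy, and runs the
// local database setup before restarting dependent services.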
func resetDatabase15(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	if err := utils.Docker.ContainerRemove(ctx, utils.DbId, container.RemoveOptions{Force: true}); err != nil {
		return errors.Errorf("failed to remove container: %w", err)
	}
	if err := utils.Docker.VolumeRemove(ctx, utils.DbId, true); err != nil {
		return errors.Errorf("failed to remove volume: %w", err)
	}
	config := start.NewContainerConfig()
	hostConfig := start.NewHostConfig()
	networkingConfig := network.NetworkingConfig{
		EndpointsConfig: map[string]*network.EndpointSettings{
			utils.NetId: {
				Aliases: utils.DbAliases,
			},
		},
	}
	fmt.Fprintln(os.Stderr, "Recreating database...")
	if _, err := utils.DockerStart(ctx, config, hostConfig, networkingConfig, utils.DbId); err != nil {
		return err
	}
	if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
		return err
	}
	if err := start.SetupLocalDatabase(ctx, version, fsys, os.Stderr, options...); err != nil {
		return err
	}
	fmt.Fprintln(os.Stderr, "Restarting containers...")
	return restartServices(ctx)
}

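// initDatabase initialises the Postgres 14 schema as supabase_admin.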
func initDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
	conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin"}, options...)
	if err != nil {
		return err
	}
	defer conn.Close(context.Background())
	return start.InitSchema14(ctx, conn)
}

// Recreate postgres database by connecting to template1
func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
	conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin", Database: "template1"}, options...)
	if err != nil {
		return err
	}
	defer conn.Close(context.Background())
	if err := DisconnectClients(ctx, conn); err != nil {
		return err
	}
	// We are not dropping roles here because they are cluster level entities. Use stop && start instead.
	sql := migration.MigrationFile{
		Statements: []string{
			"DROP DATABASE IF EXISTS postgres WITH (FORCE)",
			"CREATE DATABASE postgres WITH OWNER postgres",
			"DROP DATABASE IF EXISTS _supabase WITH (FORCE)",
			"CREATE DATABASE _supabase WITH OWNER postgres",
		},
	}
	return sql.ExecBatch(ctx, conn)
}

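// Queries used while disconnecting clients: one to force-terminate open
// backends, and one to poll for replication slots that are still attached.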
const (
	TERMINATE_BACKENDS      = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('postgres', '_supabase')"
	COUNT_REPLICATION_SLOTS = "SELECT COUNT(*) FROM pg_replication_slots WHERE database IN ('postgres', '_supabase')"
)

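// DisconnectClients blocks new connections to the postgres and _supabase
// databases, terminates existing backends, then retries with backoff until
// all replication slots on those databases have been dropped.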
func DisconnectClients(ctx context.Context, conn *pgx.Conn) error {
	// Must be executed separately because looping in transaction is unsupported
	// https://dba.stackexchange.com/a/11895
	disconn := migration.MigrationFile{
		Statements: []string{
			"ALTER DATABASE postgres ALLOW_CONNECTIONS false",
			"ALTER DATABASE _supabase ALLOW_CONNECTIONS false",
			TERMINATE_BACKENDS,
		},
	}
	if err := disconn.ExecBatch(ctx, conn); err != nil {
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName {
			return errors.Errorf("failed to disconnect clients: %w", err)
		}
	}
	// Wait for WAL senders to drop their replication slots
	policy := start.NewBackoffPolicy(ctx, 10*time.Second)
	waitForDrop := func() error {
		var count int
		if err := conn.QueryRow(ctx, COUNT_REPLICATION_SLOTS).Scan(&count); err != nil {
			err = errors.Errorf("failed to count replication slots: %w", err)
			return &backoff.PermanentError{Err: err}
		} else if count > 0 {
			return errors.Errorf("replication slots still active: %d", count)
		}
		return nil
	}
	return backoff.Retry(waitForDrop, policy)
}

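// RestartDatabase restarts the database container and waits for it to
// report healthy before restarting the services that depend on it.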
func RestartDatabase(ctx context.Context, w io.Writer) error {
	fmt.Fprintln(w, "Restarting containers...")
	// Some extensions must be manually restarted after pg_terminate_backend
	// Ref: https://github.com/citusdata/pg_cron/issues/99
	if err := utils.Docker.ContainerRestart(ctx, utils.DbId, container.StopOptions{}); err != nil {
		return errors.Errorf("failed to restart container: %w", err)
	}
	if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
		return err
	}
	return restartServices(ctx)
}

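// restartServices restarts dependent containers concurrently, ignoring
// services that were never started.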
func restartServices(ctx context.Context) error {
	// No need to restart PostgREST because it automatically reconnects and listens for schema changes
	services := listServicesToRestart()
	result := utils.WaitAll(services, func(id string) error {
		if err := utils.Docker.ContainerRestart(ctx, id, container.StopOptions{}); err != nil && !errdefs.IsNotFound(err) {
			return errors.Errorf("failed to restart %s: %w", id, err)
		}
		return nil
	})
	// Do not wait for service healthy as those services may be excluded from starting
	return errors.Join(result...)
}

func listServicesToRestart() []string {
	return []string{utils.StorageId, utils.GotrueId, utils.RealtimeId, utils.PoolerId}
}

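// resetRemote drops all user-defined schemas on the remote database, then
// restores vault secrets and reapplies migrations and seed data.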
func resetRemote(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	fmt.Fprintln(os.Stderr, "Resetting remote database"+toLogMessage(version))
	conn, err := utils.ConnectByConfigStream(ctx, config, io.Discard, options...)
	if err != nil {
		return err
	}
	defer conn.Close(context.Background())
	if err := migration.DropUserSchemas(ctx, conn); err != nil {
		return err
	}
	if err := vault.UpsertVaultSecrets(ctx, utils.Config.Db.Vault, conn); err != nil {
		return err
	}
	return apply.MigrateAndSeed(ctx, version, conn, fsys)
}

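// LikeEscapeSchema converts schema glob patterns into SQL LIKE patterns,
// escaping _ so it matches literally and expanding * to the % wildcard.
// For example, the (illustrative) pattern "tenant_*" becomes `tenant\_%`.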
func LikeEscapeSchema(schemas []string) (result []string) {
	// Treat _ as literal, * as any character
	replacer := strings.NewReplacer("_", `\_`, "*", "%")
	for _, sch := range schemas {
		result = append(result, replacer.Replace(sch))
	}
	return result
}