package testcontainers

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/containerd/errdefs"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/go-connections/nat"

	"github.com/testcontainers/testcontainers-go/internal/config"
	"github.com/testcontainers/testcontainers-go/internal/core"
	"github.com/testcontainers/testcontainers-go/log"
	"github.com/testcontainers/testcontainers-go/wait"
)

const (
	// Deprecated: it has been replaced by the internal core.LabelLang
	TestcontainerLabel = "org.testcontainers.golang"
	// Deprecated: it has been replaced by the internal core.LabelSessionID
	TestcontainerLabelSessionID = TestcontainerLabel + ".sessionId"
	// Deprecated: it has been replaced by the internal core.LabelReaper
	TestcontainerLabelIsReaper = TestcontainerLabel + ".reaper"
)

var (
	// Deprecated: it has been replaced by an internal value
	ReaperDefaultImage = config.ReaperDefaultImage

	// defaultReaperPort is the default port that the reaper listens on if not
	// overridden by the RYUK_PORT environment variable.
	defaultReaperPort = nat.Port("8080/tcp")

	// errReaperNotFound is returned when no reaper container is found.
	errReaperNotFound = errors.New("reaper not found")

	// errReaperDisabled is returned if a reaper is requested but the
	// config has it disabled.
	errReaperDisabled = errors.New("reaper disabled")

	// spawner is the singleton instance of reaperSpawner.
	spawner = &reaperSpawner{}

	// reaperAck is the expected response from the reaper container.
	reaperAck = []byte("ACK\n")
)

// ReaperProvider represents a provider for the reaper to run itself with.
// The ContainerProvider interface should usually satisfy this as well, so it is pluggable.
type ReaperProvider interface {
	RunContainer(ctx context.Context, req ContainerRequest) (Container, error)
	Config() TestcontainersConfig
}

// NewReaper creates a Reaper with a sessionID to identify containers and a provider to use.
//
// Deprecated: it is no longer possible to create a reaper directly. The Compose module uses
// this method to create a reaper for the compose stack.
//
// The caller must call Connect at least once on the returned Reaper and use the returned
// result, otherwise the reaper will be kept open until the process exits.
func NewReaper(ctx context.Context, sessionID string, provider ReaperProvider, _ string) (*Reaper, error) {
	reaper, err := spawner.reaper(ctx, sessionID, provider)
	if err != nil {
		return nil, fmt.Errorf("reaper: %w", err)
	}

	return reaper, nil
}

// reaperContainerNameFromSessionID returns the container name that uniquely
// identifies the container based on the session id.
func reaperContainerNameFromSessionID(sessionID string) string {
	// The session id is 64 characters, so we will not hit the limit of 128
	// characters for container names.
	return "reaper_" + sessionID
}
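
// For illustration, an example of the naming scheme above (the session id is hypothetical):
//
//	reaperContainerNameFromSessionID("deadbeef") // "reaper_deadbeef"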

// reaperSpawner is a singleton that manages the reaper container.
type reaperSpawner struct {
	instance *Reaper
	mtx      sync.Mutex
}

// port returns the port that a new reaper should listen on.
func (r *reaperSpawner) port() nat.Port {
	if port := os.Getenv("RYUK_PORT"); port != "" {
		natPort, err := nat.NewPort("tcp", port)
		if err != nil {
			panic(fmt.Sprintf("invalid RYUK_PORT value %q: %s", port, err))
		}
		return natPort
	}

	return defaultReaperPort
}
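
// As a usage sketch (assuming a shell environment; the port value is arbitrary),
// the environment variable read above can override the reaper port:
//
//	RYUK_PORT=8081 go test ./...
//
// which makes port() return "8081/tcp" instead of the default "8080/tcp".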

// backoff returns a backoff policy for the reaper spawner.
// It will take at most 20 seconds, doing each attempt every 100ms - 250ms.
func (r *reaperSpawner) backoff() *backoff.ExponentialBackOff {
	// We want random intervals between 100ms and 250ms for concurrent executions
	// to not be synchronized: it could be the case that multiple executions of this
	// function happen at the same time (specifically when called from a different test
	// process execution), and we want to avoid that they all try to find the reaper
	// container at the same time.
	b := &backoff.ExponentialBackOff{
		InitialInterval:     time.Millisecond * 100,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		// Adjust MaxInterval to compensate for the randomization factor, which can be
		// added to the returned interval, so we have a maximum of 250ms.
		MaxInterval:    time.Duration(float64(time.Millisecond*250) * backoff.DefaultRandomizationFactor),
		MaxElapsedTime: time.Second * 20,
		Stop:           backoff.Stop,
		Clock:          backoff.SystemClock,
	}
	b.Reset()

	return b
}

// cleanup terminates the reaper container if set.
func (r *reaperSpawner) cleanup() error {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	return r.cleanupLocked()
}

// cleanupLocked terminates the reaper container if set.
// It must be called with the lock held.
func (r *reaperSpawner) cleanupLocked() error {
	if r.instance == nil {
		return nil
	}

	err := TerminateContainer(r.instance.container)
	r.instance = nil

	return err
}

// lookupContainer returns the reaper container as a DockerContainer if it is
// found in the running state, including the labels for sessionID, reaper, and ryuk.
// It retries with exponential backoff to allow for the container to be started
// and to avoid potential false negatives.
func (r *reaperSpawner) lookupContainer(ctx context.Context, sessionID string) (*DockerContainer, error) {
	dockerClient, err := NewDockerClientWithOpts(ctx)
	if err != nil {
		return nil, fmt.Errorf("new client: %w", err)
	}
	defer dockerClient.Close()

	provider, err := NewDockerProvider()
	if err != nil {
		return nil, fmt.Errorf("new provider: %w", err)
	}

	provider.SetClient(dockerClient)

	opts := container.ListOptions{
		All: true,
		Filters: filters.NewArgs(
			filters.Arg("label", fmt.Sprintf("%s=%s", core.LabelSessionID, sessionID)),
			filters.Arg("label", fmt.Sprintf("%s=%t", core.LabelReaper, true)),
			filters.Arg("label", fmt.Sprintf("%s=%t", core.LabelRyuk, true)),
			filters.Arg("name", reaperContainerNameFromSessionID(sessionID)),
		),
	}

	return backoff.RetryWithData(
		func() (*DockerContainer, error) {
			resp, err := dockerClient.ContainerList(ctx, opts)
			if err != nil {
				return nil, fmt.Errorf("container list: %w", err)
			}

			if len(resp) == 0 {
				// No reaper container found.
				return nil, backoff.Permanent(errReaperNotFound)
			}

			if len(resp) > 1 {
				return nil, fmt.Errorf("found %d reaper containers for session ID %q", len(resp), sessionID)
			}

			r, err := provider.ContainerFromType(ctx, resp[0])
			if err != nil {
				return nil, fmt.Errorf("from docker: %w", err)
			}

			switch {
			case r.healthStatus == types.Healthy,
				r.healthStatus == types.NoHealthcheck:
				return r, nil
			case r.healthStatus != "":
				return nil, fmt.Errorf("container not healthy: %s", r.healthStatus)
			}

			return r, nil
		},
		backoff.WithContext(r.backoff(), ctx),
	)
}
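
// For reference, the lookup above is roughly equivalent to this CLI query (a sketch;
// the exact label keys are defined by the internal core package):
//
//	docker ps -a \
//	  --filter "label=org.testcontainers.sessionId=<sessionID>" \
//	  --filter "label=org.testcontainers.reaper=true" \
//	  --filter "label=org.testcontainers.ryuk=true" \
//	  --filter "name=reaper_<sessionID>"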

// isRunning returns an error if the container is not running.
func (r *reaperSpawner) isRunning(ctx context.Context, ctr Container) error {
	state, err := ctr.State(ctx)
	if err != nil {
		return fmt.Errorf("container state: %w", err)
	}

	if !state.Running {
		// Use a NotFound error to indicate the container is not running
		// and should be recreated.
		return errdefs.ErrNotFound.WithMessage("container state: " + state.Status)
	}

	return nil
}

// retryError returns a permanent error if the error is not considered retryable.
func (r *reaperSpawner) retryError(err error) error {
	var timeout interface {
		Timeout() bool
	}
	switch {
	case isCleanupSafe(err),
		createContainerFailDueToNameConflictRegex.MatchString(err.Error()),
		errors.Is(err, syscall.ECONNREFUSED),
		errors.Is(err, syscall.ECONNRESET),
		errors.Is(err, syscall.ECONNABORTED),
		errors.Is(err, syscall.ETIMEDOUT),
		errors.Is(err, os.ErrDeadlineExceeded),
		errors.As(err, &timeout) && timeout.Timeout(),
		errors.Is(err, context.DeadlineExceeded),
		errors.Is(err, context.Canceled):
		// Retryable error.
		return err
	default:
		return backoff.Permanent(err)
	}
}

// reaper returns an existing Reaper instance if it exists and is running, otherwise
// a new Reaper instance will be created with a sessionID to identify containers in
// the same test session/program.
// Returns an error if config.RyukDisabled is true.
//
// Safe for concurrent calls.
func (r *reaperSpawner) reaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) {
	if config.Read().RyukDisabled {
		return nil, errReaperDisabled
	}

	r.mtx.Lock()
	defer r.mtx.Unlock()

	return backoff.RetryWithData(
		r.retryLocked(ctx, sessionID, provider),
		backoff.WithContext(r.backoff(), ctx),
	)
}

// retryLocked returns a function that can be used to create or reuse a reaper container.
// It must be called with the lock held.
func (r *reaperSpawner) retryLocked(ctx context.Context, sessionID string, provider ReaperProvider) func() (*Reaper, error) {
	return func() (reaper *Reaper, err error) {
		reaper, err = r.reuseOrCreate(ctx, sessionID, provider)
		// Ensure that the reaper is terminated if an error occurred.
		defer func() {
			if err != nil {
				if reaper != nil {
					err = errors.Join(err, TerminateContainer(reaper.container))
				}
				err = r.retryError(errors.Join(err, r.cleanupLocked()))
			}
		}()
		if err != nil {
			return nil, err
		}

		if err = r.isRunning(ctx, reaper.container); err != nil {
			return nil, err
		}

		// Check we can still connect.
		termSignal, err := reaper.connect(ctx)
		if err != nil {
			return nil, fmt.Errorf("connect: %w", err)
		}

		reaper.setOrSignal(termSignal)

		r.instance = reaper

		return reaper, nil
	}
}

// reuseOrCreate returns an existing Reaper instance if it exists, otherwise a new Reaper instance.
func (r *reaperSpawner) reuseOrCreate(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) {
	if r.instance != nil {
		// We already have an associated reaper.
		return r.instance, nil
	}

	// Look for an existing reaper created in the same test session but in a
	// different test process execution, e.g. when running tests in parallel.
	container, err := r.lookupContainer(context.Background(), sessionID)
	if err != nil {
		if !errors.Is(err, errReaperNotFound) {
			return nil, fmt.Errorf("look up container: %w", err)
		}

		// The reaper container was not found, continue to create a new one.
		reaper, err := r.newReaper(ctx, sessionID, provider)
		if err != nil {
			return nil, fmt.Errorf("new reaper: %w", err)
		}

		return reaper, nil
	}

	// A reaper container exists, re-use it.
	reaper, err := r.fromContainer(ctx, sessionID, provider, container)
	if err != nil {
		return nil, fmt.Errorf("from container %q: %w", container.ID[:8], err)
	}

	return reaper, nil
}

// fromContainer constructs a Reaper from an already running reaper DockerContainer.
func (r *reaperSpawner) fromContainer(ctx context.Context, sessionID string, provider ReaperProvider, dockerContainer *DockerContainer) (*Reaper, error) {
	log.Printf("⏳ Waiting for Reaper %q to be ready", dockerContainer.ID[:8])

	// Reusing an existing container, so we determine the port from the container's exposed ports.
	if err := wait.ForExposedPort().
		WithPollInterval(100*time.Millisecond).
		SkipInternalCheck().
		WaitUntilReady(ctx, dockerContainer); err != nil {
		return nil, fmt.Errorf("wait for reaper %s: %w", dockerContainer.ID[:8], err)
	}

	endpoint, err := dockerContainer.Endpoint(ctx, "")
	if err != nil {
		return nil, fmt.Errorf("port endpoint: %w", err)
	}

	log.Printf("🔥 Reaper obtained from Docker for this test session %s", dockerContainer.ID[:8])

	return &Reaper{
		Provider:  provider,
		SessionID: sessionID,
		Endpoint:  endpoint,
		container: dockerContainer,
	}, nil
}

// newReaper creates a connected Reaper with a sessionID to identify containers
// and a provider to use.
func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (reaper *Reaper, err error) {
	dockerHostMount := core.MustExtractDockerSocket(ctx)

	port := r.port()
	tcConfig := provider.Config().Config
	req := ContainerRequest{
		Image:        config.ReaperDefaultImage,
		ExposedPorts: []string{string(port)},
		Labels:       core.DefaultLabels(sessionID),
		WaitingFor:   wait.ForListeningPort(port),
		Name:         reaperContainerNameFromSessionID(sessionID),
		HostConfigModifier: func(hc *container.HostConfig) {
			hc.AutoRemove = true
			hc.Binds = []string{dockerHostMount + ":/var/run/docker.sock"}
			hc.NetworkMode = Bridge
			hc.Privileged = tcConfig.RyukPrivileged
		},
		Env: map[string]string{},
	}
	if to := tcConfig.RyukConnectionTimeout; to > time.Duration(0) {
		req.Env["RYUK_CONNECTION_TIMEOUT"] = to.String()
	}
	if to := tcConfig.RyukReconnectionTimeout; to > time.Duration(0) {
		req.Env["RYUK_RECONNECTION_TIMEOUT"] = to.String()
	}
	if tcConfig.RyukVerbose {
		req.Env["RYUK_VERBOSE"] = "true"
	}

	// Setup reaper-specific labels for the reaper container.
	req.Labels[core.LabelReaper] = "true"
	req.Labels[core.LabelRyuk] = "true"
	delete(req.Labels, core.LabelReap)

	// Attach the reaper container to the default network.
	if p, ok := provider.(*DockerProvider); ok {
		defaultNetwork, err := p.ensureDefaultNetwork(ctx)
		if err != nil {
			return nil, fmt.Errorf("ensure default network: %w", err)
		}

		req.Networks = append(req.Networks, defaultNetwork)
	}

	c, err := provider.RunContainer(ctx, req)
	defer func() {
		if err != nil {
			err = errors.Join(err, TerminateContainer(c))
		}
	}()
	if err != nil {
		return nil, fmt.Errorf("run container: %w", err)
	}

	endpoint, err := c.PortEndpoint(ctx, port, "")
	if err != nil {
		return nil, fmt.Errorf("port endpoint: %w", err)
	}

	return &Reaper{
		Provider:  provider,
		SessionID: sessionID,
		Endpoint:  endpoint,
		container: c,
	}, nil
}
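
// For orientation, the request built above corresponds roughly to (a sketch; the
// mount source, privileged flag, and env vars depend on the resolved configuration):
//
//	docker run -d --rm \
//	  --name reaper_<sessionID> \
//	  --network bridge \
//	  -p 8080 \
//	  -v /var/run/docker.sock:/var/run/docker.sock \
//	  testcontainers/ryuk:<version>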

// Reaper is used to start a sidecar container that cleans up resources.
type Reaper struct {
	Provider   ReaperProvider
	SessionID  string
	Endpoint   string
	container  Container
	mtx        sync.Mutex // Protects termSignal.
	termSignal chan bool
}

// Connect connects to the reaper container and sends the labels to it
// so that it can clean up the containers with the same labels.
//
// It returns a channel that can be closed to terminate the connection.
// Returns an error if config.RyukDisabled is true.
func (r *Reaper) Connect() (chan bool, error) {
	if config.Read().RyukDisabled {
		return nil, errReaperDisabled
	}

	if termSignal := r.useTermSignal(); termSignal != nil {
		return termSignal, nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	return r.connect(ctx)
}
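
// A minimal caller sketch (hypothetical; assumes a *Reaper obtained from NewReaper):
//
//	termSignal, err := reaper.Connect()
//	if err != nil {
//		return err
//	}
//	defer func() { termSignal <- true }() // let the reaper goroutine exit when done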

// close signals the connection to close if needed.
// Safe for concurrent calls.
func (r *Reaper) close() {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.termSignal != nil {
		r.termSignal <- true
		r.termSignal = nil
	}
}

// setOrSignal sets the reaper's termSignal field if it is nil,
// otherwise it consumes the channel by sending true to it.
// Safe for concurrent calls.
func (r *Reaper) setOrSignal(termSignal chan bool) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.termSignal != nil {
		// Already have an existing connection, close the new one.
		termSignal <- true
		return
	}

	// First or new unused termSignal, assign for caller to reuse.
	r.termSignal = termSignal
}

// useTermSignal returns termSignal and sets it to nil if it is not nil,
// otherwise it returns nil.
//
// Safe for concurrent calls.
func (r *Reaper) useTermSignal() chan bool {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.termSignal == nil {
		return nil
	}

	// Use existing connection.
	term := r.termSignal
	r.termSignal = nil

	return term
}

// connect connects to the reaper container and sends the labels to it
// so that it can clean up the containers with the same labels.
//
// It returns a channel that can be sent true to terminate the connection.
func (r *Reaper) connect(ctx context.Context) (chan bool, error) {
	var d net.Dialer
	conn, err := d.DialContext(ctx, "tcp", r.Endpoint)
	if err != nil {
		return nil, fmt.Errorf("dial reaper %s: %w", r.Endpoint, err)
	}

	terminationSignal := make(chan bool)
	go func() {
		defer conn.Close()
		if err := r.handshake(conn); err != nil {
			log.Printf("Reaper handshake failed: %s", err)
		}
		<-terminationSignal
	}()
	return terminationSignal, nil
}

// handshake sends the labels to the reaper container and reads the ACK.
func (r *Reaper) handshake(conn net.Conn) error {
	labels := core.DefaultLabels(r.SessionID)
	labelFilters := make([]string, 0, len(labels))
	for l, v := range labels {
		labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v))
	}

	filters := []byte(strings.Join(labelFilters, "&") + "\n")
	buf := make([]byte, 4)
	if _, err := conn.Write(filters); err != nil {
		return fmt.Errorf("writing filters: %w", err)
	}

	n, err := io.ReadFull(conn, buf)
	if err != nil {
		return fmt.Errorf("read ack: %w", err)
	}

	if !bytes.Equal(reaperAck, buf[:n]) {
		// Anything other than the expected ACK is an error.
		return fmt.Errorf("unexpected reaper response: %s", buf[:n])
	}

	return nil
}
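
// For illustration, one handshake exchange looks roughly like this (a sketch; the
// exact label set is produced by core.DefaultLabels):
//
//	client -> reaper: label=org.testcontainers=true&label=org.testcontainers.sessionId=<id>\n
//	reaper -> client: ACK\n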

// Labels returns the container labels to use so that this Reaper cleans them up.
// Deprecated: internally replaced by core.DefaultLabels(sessionID)
func (r *Reaper) Labels() map[string]string {
	return GenericLabels()
}

// isReaperImage returns true if the image name is the reaper image.
func isReaperImage(name string) bool {
	return strings.HasSuffix(name, config.ReaperDefaultImage)
}