mirror of
https://github.com/keycloak/keycloak.git
synced 2026-04-27 10:31:19 -05:00
Detect and handle KC split brain clusters
Closes #41561 Signed-off-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com> Signed-off-by: Pedro Ruivo <pruivo@users.noreply.github.com> Signed-off-by: Alexander Schwartz <aschwart@redhat.com> Co-authored-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com> Co-authored-by: Alexander Schwartz <alexander.schwartz@gmx.net> Co-authored-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
@@ -40,7 +40,11 @@ These endpoints respond with HTTP status `200 OK` on success or `503 Service Una
|
||||
"status": "UP",
|
||||
"checks": [
|
||||
{
|
||||
"name": "Keycloak database connections health check",
|
||||
"name": "Keycloak cluster health check",
|
||||
"status": "UP"
|
||||
},
|
||||
{
|
||||
"name": "Keycloak database connections async health check",
|
||||
"status": "UP"
|
||||
}
|
||||
]
|
||||
@@ -93,6 +97,10 @@ The table below shows the available checks.
|
||||
|Returns the status of the database connection pool.
|
||||
|Yes
|
||||
|
||||
|Cluster
|
||||
|Returns the status of the cluster (network partitions).
|
||||
|No
|
||||
|
||||
|===
|
||||
|
||||
For some checks, you'll need to also enable metrics as indicated by the `Requires Metrics` column. To enable metrics
|
||||
@@ -100,4 +108,6 @@ use the `metrics-enabled` option as follows:
|
||||
|
||||
<@kc.build parameters="--health-enabled=true --metrics-enabled=true"/>
|
||||
|
||||
The cluster health check is only available for clustered setups when the cache transport stacks `jdbc-ping` or `jdbc-ping-udp` are used.
|
||||
|
||||
</@tmpl.guide>
|
||||
|
||||
+19
-8
@@ -33,7 +33,7 @@ import org.infinispan.commons.configuration.io.ConfigurationWriter;
|
||||
import org.infinispan.commons.io.StringBuilderWriter;
|
||||
import org.infinispan.commons.util.Version;
|
||||
import org.infinispan.configuration.cache.Configuration;
|
||||
import org.infinispan.configuration.global.GlobalConfiguration;
|
||||
import org.infinispan.factories.GlobalComponentRegistry;
|
||||
import org.infinispan.configuration.parsing.ParserRegistry;
|
||||
import org.infinispan.health.CacheHealth;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
@@ -43,6 +43,7 @@ import org.keycloak.Config;
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.connections.infinispan.remote.RemoteInfinispanConnectionProvider;
|
||||
import org.keycloak.infinispan.health.ClusterHealth;
|
||||
import org.keycloak.infinispan.util.InfinispanUtils;
|
||||
import org.keycloak.marshalling.KeycloakIndexSchemaUtil;
|
||||
import org.keycloak.marshalling.KeycloakModelSchema;
|
||||
@@ -68,9 +69,8 @@ import org.keycloak.spi.infinispan.impl.embedded.CacheConfigurator;
|
||||
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLUSTERED_CACHE_NAMES;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CRL_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.KEYS_CACHE_DEFAULT_MAX;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.KEYS_CACHE_MAX_IDLE_SECONDS;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.KEYS_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
||||
@@ -91,10 +91,9 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
||||
private Config.Scope config;
|
||||
|
||||
private volatile EmbeddedCacheManager cacheManager;
|
||||
|
||||
private volatile RemoteCacheManager remoteCacheManager;
|
||||
|
||||
private volatile InfinispanConnectionProvider connectionProvider;
|
||||
private volatile ClusterHealth clusterHealth;
|
||||
|
||||
@Override
|
||||
public InfinispanConnectionProvider create(KeycloakSession session) {
|
||||
@@ -173,9 +172,10 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
||||
|
||||
this.remoteCacheManager = createRemoteCacheManager(keycloakSession);
|
||||
this.connectionProvider = InfinispanUtils.isRemoteInfinispan() ?
|
||||
new RemoteInfinispanConnectionProvider(cacheManager, remoteCacheManager, topologyInfo) :
|
||||
new DefaultInfinispanConnectionProvider(cacheManager, topologyInfo);
|
||||
new RemoteInfinispanConnectionProvider(cacheManager, remoteCacheManager, topologyInfo) :
|
||||
new DefaultInfinispanConnectionProvider(cacheManager, topologyInfo);
|
||||
|
||||
clusterHealth = GlobalComponentRegistry.componentOf(cacheManager, ClusterHealth.class);
|
||||
return connectionProvider;
|
||||
}
|
||||
}
|
||||
@@ -304,10 +304,21 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
||||
return info;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClusterHealthy() {
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
return clusterHealth.isHealthy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClusterHealthSupported() {
|
||||
return clusterHealth.isSupported();
|
||||
}
|
||||
|
||||
private void addEmbeddedOperationalInfo(Map<String, String> info) {
|
||||
var cacheManagerInfo = cacheManager.getCacheManagerInfo();
|
||||
info.put("clusterSize", Integer.toString(cacheManagerInfo.getClusterSize()));
|
||||
var cacheNames = Arrays.stream(InfinispanConnectionProvider.CLUSTERED_CACHE_NAMES)
|
||||
var cacheNames = Arrays.stream(CLUSTERED_CACHE_NAMES)
|
||||
.sorted()
|
||||
.collect(Collectors.toCollection(LinkedHashSet::new));
|
||||
for (CacheHealth health : cacheManager.getHealth().getCacheHealth(cacheNames)) {
|
||||
|
||||
+26
@@ -23,4 +23,30 @@ import org.keycloak.provider.ProviderFactory;
|
||||
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
|
||||
*/
|
||||
public interface InfinispanConnectionProviderFactory extends ProviderFactory<InfinispanConnectionProvider> {
|
||||
|
||||
/**
|
||||
* Detects network split in the cluster.
|
||||
* <p>
|
||||
* If a possible network split is detected and this node does not belong to the winning partition, this method will
|
||||
* return {@code false}, and should not continue handling requests, to keep data safety.
|
||||
*
|
||||
* @return {@code true} if the cluster is healthy and this node can continue processing requests. When
|
||||
* {@code false}, this node must reject any work.
|
||||
*/
|
||||
default boolean isClusterHealthy() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the cluster health check is supported.
|
||||
* <p>
|
||||
* Not all JGroups configurations support discovering network splits and this method signals if the current in use
|
||||
* configuration can detect those.
|
||||
*
|
||||
* @return {@code true} if the cluster health check is supported.
|
||||
*/
|
||||
default boolean isClusterHealthSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright 2025 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.infinispan.health;
|
||||
|
||||
/**
|
||||
* Infinispan cluster health checks, to detect network partitions.
|
||||
*/
|
||||
public interface ClusterHealth {
|
||||
|
||||
/**
|
||||
* It checks the cluster health returning {@code true} if this node can continue processing requests.
|
||||
* <p>
|
||||
* If the network and cluster are stable, this method must return {@code true}.
|
||||
* <p>
|
||||
* If a network partition is detected, the return value depends on whether this node belongs to the winning
|
||||
* partition. It must return {@code true} if it belongs to the winning partition or {@code false} if it does not.
|
||||
* Deciding the winning partition is at the implementation discretion.
|
||||
*
|
||||
* @return {@code true} if the cluster is healthy and this node can continue processing requests, {@code false}
|
||||
* otherwise.
|
||||
*/
|
||||
boolean isHealthy();
|
||||
|
||||
/**
|
||||
* Triggers a cluster health check.
|
||||
* <p>
|
||||
* This method should only trigger the health check logic without blocking or waiting for its outcome.
|
||||
*/
|
||||
void triggerClusterHealthCheck();
|
||||
|
||||
/**
|
||||
* Determine if the cluster health check is supported.
|
||||
* @return false if the current transport setup doesn't provide enough information.
|
||||
*/
|
||||
boolean isSupported();
|
||||
|
||||
}
|
||||
+141
@@ -0,0 +1,141 @@
|
||||
/*
|
||||
* Copyright 2025 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.infinispan.health.impl;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.infinispan.factories.annotations.Inject;
|
||||
import org.infinispan.factories.scopes.Scope;
|
||||
import org.infinispan.factories.scopes.Scopes;
|
||||
import org.infinispan.remoting.transport.Transport;
|
||||
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
|
||||
import org.infinispan.util.concurrent.BlockingManager;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.infinispan.health.ClusterHealth;
|
||||
import org.keycloak.jgroups.protocol.KEYCLOAK_JDBC_PING2;
|
||||
import org.keycloak.jgroups.protocol.KEYCLOAK_JDBC_PING2.HealthStatus;
|
||||
|
||||
/**
|
||||
* A {@link ClusterHealth} implementation that makes use of {@link KEYCLOAK_JDBC_PING2}.
|
||||
* <p>
|
||||
* Since each node is registered in the database, it is possible to detect if a partition is happening.
|
||||
* <p>
|
||||
* The method {@link KEYCLOAK_JDBC_PING2#healthStatus()} contains the algorithm description. If it returns
|
||||
* {@link HealthStatus#ERROR}, the healthy state does not change and relies on the Quarkus/Agroal readiness health
|
||||
* check. But, if {@link HealthStatus#NO_COORDINATOR} is returned, the state will be changed to unhealthy. The should be
|
||||
* a temporary situation as at least one coordinator must be present in the database table.
|
||||
*
|
||||
* @see KEYCLOAK_JDBC_PING2#healthStatus()
|
||||
*/
|
||||
@Scope(Scopes.GLOBAL)
|
||||
public class JdbcPingClusterHealthImpl implements ClusterHealth {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
private volatile boolean healthy = true;
|
||||
private volatile HealthRunner runner;
|
||||
|
||||
@Inject
|
||||
public void inject(Transport transport, BlockingManager blockingManager) {
|
||||
// hacking to avoid creating fields :)
|
||||
if (transport == null) {
|
||||
logger.debug("Cluster health check disabled. Local mode");
|
||||
return;
|
||||
}
|
||||
if (!(transport instanceof JGroupsTransport jgrp)) {
|
||||
logger.debug("JGroups Transport not found. Unable to check cluster health.");
|
||||
return;
|
||||
}
|
||||
KEYCLOAK_JDBC_PING2 ping = jgrp.getChannel().getProtocolStack().findProtocol(KEYCLOAK_JDBC_PING2.class);
|
||||
if (ping == null) {
|
||||
logger.warn("Stack 'jdbc-ping' not used. Unable to check cluster health.");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug("Cluster Health check available");
|
||||
init(ping, blockingManager.asExecutor("cluster-health"));
|
||||
}
|
||||
|
||||
public void init(KEYCLOAK_JDBC_PING2 discovery, Executor executor) {
|
||||
runner = new HealthRunner(discovery, executor, this::checkHealth);
|
||||
}
|
||||
|
||||
private void checkHealth(KEYCLOAK_JDBC_PING2 ping) {
|
||||
assert ping != null;
|
||||
if (!lock.tryLock()) {
|
||||
// check in progress
|
||||
return;
|
||||
}
|
||||
try {
|
||||
var status = ping.healthStatus();
|
||||
switch (status) {
|
||||
case HEALTHY:
|
||||
logger.debug("Set cluster health status to healthy");
|
||||
healthy = true;
|
||||
break;
|
||||
case NO_COORDINATOR:
|
||||
logger.warn("Unable to check the cluster health because no coordinator has been found.");
|
||||
// fallthrough
|
||||
case UNHEALTHY:
|
||||
logger.debug("Set cluster health status to unhealthy");
|
||||
healthy = false;
|
||||
break;
|
||||
case ERROR:
|
||||
logger.debug("Error querying the database. Skip updating the cluster health status.");
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHealthy() {
|
||||
return healthy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerClusterHealthCheck() {
|
||||
if (runner != null) {
|
||||
runner.trigger();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupported() {
|
||||
return runner != null;
|
||||
}
|
||||
|
||||
private record HealthRunner(KEYCLOAK_JDBC_PING2 discovery, Executor executor, Consumer<KEYCLOAK_JDBC_PING2> check) {
|
||||
|
||||
HealthRunner {
|
||||
Objects.requireNonNull(discovery);
|
||||
Objects.requireNonNull(executor);
|
||||
Objects.requireNonNull(check);
|
||||
}
|
||||
|
||||
void trigger() {
|
||||
executor.execute(() -> check.accept(discovery));
|
||||
}
|
||||
}
|
||||
}
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2025 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.infinispan.module.factory;
|
||||
|
||||
import org.infinispan.factories.AbstractComponentFactory;
|
||||
import org.infinispan.factories.AutoInstantiableFactory;
|
||||
import org.infinispan.factories.annotations.DefaultFactoryFor;
|
||||
import org.keycloak.infinispan.health.ClusterHealth;
|
||||
import org.keycloak.infinispan.health.impl.JdbcPingClusterHealthImpl;
|
||||
|
||||
@DefaultFactoryFor(classes = ClusterHealth.class)
|
||||
public class ClusterHealthFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
|
||||
|
||||
@Override
|
||||
public Object construct(String componentName) {
|
||||
return new JdbcPingClusterHealthImpl();
|
||||
}
|
||||
}
|
||||
@@ -242,4 +242,59 @@ public class KEYCLOAK_JDBC_PING2 extends JDBC_PING2 {
|
||||
public void setJpaConnectionProviderFactory(JpaConnectionProviderFactory factory) {
|
||||
this.factory = Objects.requireNonNull(factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects a network partition and decides if the node belongs to the winning partition.
|
||||
* <p>
|
||||
* The algorithm performs the following steps
|
||||
*
|
||||
* <ul>
|
||||
* <li>Reads the data from the database</li>
|
||||
* <li>If an error occurs fetching the data, it returns {@link HealthStatus#ERROR}</li>
|
||||
* <li>Filters out non coordinator members</li>
|
||||
* <li>If no coordinator is found, it return {@link HealthStatus#NO_COORDINATOR}</li>
|
||||
* <li>If multiple coordinators are found, it compares them and uses the coordinator with the lowest {@link UUID}</li>
|
||||
* <li>Finally, it compares if the coordinator is the same as the current view coordinator. If so, it returns {@link HealthStatus#HEALTHY}, otherwise {@link HealthStatus#UNHEALTHY}</li>
|
||||
* </ul>
|
||||
*
|
||||
* @return The {@link HealthStatus}.
|
||||
* @see HealthStatus
|
||||
*/
|
||||
public HealthStatus healthStatus() {
|
||||
try {
|
||||
// maybe create an index, and a query to return coordinators only?
|
||||
return readFromDB(cluster_name)
|
||||
.stream()
|
||||
.filter(PingData::isCoord)
|
||||
.map(PingData::getAddress)
|
||||
.sorted()
|
||||
.findFirst()
|
||||
.map(view.getCoord()::equals)
|
||||
.map(isCoordinatorInView -> isCoordinatorInView ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY)
|
||||
.orElse(HealthStatus.NO_COORDINATOR);
|
||||
} catch (Exception e) {
|
||||
// database failed?
|
||||
log.warn("Failed to fetch the cluster members from the database.", e);
|
||||
return HealthStatus.ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
public enum HealthStatus {
|
||||
/**
|
||||
* No partition detected or this instance is in the right partition.
|
||||
*/
|
||||
HEALTHY,
|
||||
/**
|
||||
* Partition detected and this instance is not in the right partition. It should stop handling requests.
|
||||
*/
|
||||
UNHEALTHY,
|
||||
/**
|
||||
* No coordinator present in the database table.
|
||||
*/
|
||||
NO_COORDINATOR,
|
||||
/**
|
||||
* If an error occurs when reading from the database.
|
||||
*/
|
||||
ERROR
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright 2025 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.jgroups.protocol;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.jgroups.Address;
|
||||
import org.jgroups.View;
|
||||
import org.jgroups.protocols.PingData;
|
||||
|
||||
/**
|
||||
* A controllable KEYCLOAK_JDBC_PING2 where we can overwrite the view, the data returned from the database, and simulate exceptions.
|
||||
*/
|
||||
public class ControlledJdbcPing extends KEYCLOAK_JDBC_PING2 {
|
||||
|
||||
private volatile List<PingData> pingData = List.of();
|
||||
private volatile Exception exception;
|
||||
|
||||
@Override
|
||||
protected List<PingData> readFromDB(String cluster) throws Exception {
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
return pingData;
|
||||
}
|
||||
|
||||
public void setPingData(List<Address> coordinators) {
|
||||
this.pingData = coordinators.stream()
|
||||
.map(address -> new PingData(address, true).coord(true))
|
||||
.toList();
|
||||
}
|
||||
|
||||
public void setException(Exception exception) {
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
public void setView(Address coordinator) {
|
||||
// view id is irrelevant
|
||||
this.view = View.create(coordinator, 1, coordinator);
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,16 @@
|
||||
package org.keycloak.jgroups.protocol;
|
||||
|
||||
import org.infinispan.util.concurrent.WithinThreadExecutor;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.jgroups.Address;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.conf.ClassConfigurator;
|
||||
import org.jgroups.util.ThreadFactory;
|
||||
import org.jgroups.util.UUID;
|
||||
import org.jgroups.util.Util;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.keycloak.infinispan.health.impl.JdbcPingClusterHealthImpl;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
@@ -14,15 +18,20 @@ import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Misc tests for {@link KEYCLOAK_JDBC_PING2}, running against H2
|
||||
* @author Bela Ban
|
||||
* @author Alexander Schwartz
|
||||
*/
|
||||
public class JdbcPing2Test {
|
||||
protected static Logger log = Logger.getLogger(JdbcPing2Test.class);
|
||||
protected static final Logger log = Logger.getLogger(JdbcPing2Test.class);
|
||||
|
||||
protected static final String CLUSTER="jdbc-test";
|
||||
protected static final int NUM_NODES=8;
|
||||
@@ -47,12 +56,69 @@ public class JdbcPing2Test {
|
||||
log.info("Average time to form the cluster: " + Duration.ofNanos(sum / count));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterHealth() {
|
||||
var ping = new ControlledJdbcPing();
|
||||
var clusterHealth = new JdbcPingClusterHealthImpl();
|
||||
clusterHealth.init(ping, new WithinThreadExecutor());
|
||||
var addresses = IntStream.range(0, 2)
|
||||
.mapToObj(operand -> new UUID(0, operand))
|
||||
.sorted()
|
||||
.toArray(Address[]::new);
|
||||
|
||||
// test exception
|
||||
ping.setException(new RuntimeException("Induced"));
|
||||
assertEquals(KEYCLOAK_JDBC_PING2.HealthStatus.ERROR, ping.healthStatus());
|
||||
// A database exception do not change the cluster health.
|
||||
// It relies on Quarkus database health check to mark the Keycloak instance as not ready.
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
assertTrue(clusterHealth.isHealthy());
|
||||
// Remove exception
|
||||
ping.setException(null);
|
||||
|
||||
// test empty table / no coordinator
|
||||
ping.setView(addresses[0]);
|
||||
assertEquals(KEYCLOAK_JDBC_PING2.HealthStatus.NO_COORDINATOR, ping.healthStatus());
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
assertFalse(clusterHealth.isHealthy());
|
||||
|
||||
// test member in the view / single coordinator
|
||||
ping.setPingData(List.of(addresses[0]));
|
||||
assertEquals(KEYCLOAK_JDBC_PING2.HealthStatus.HEALTHY, ping.healthStatus());
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
assertTrue(clusterHealth.isHealthy());
|
||||
|
||||
// test higher ID loses
|
||||
// coordinator a[0] in the table, and we belong to view with the coordinator a[1]
|
||||
ping.setView(addresses[1]);
|
||||
assertEquals(KEYCLOAK_JDBC_PING2.HealthStatus.UNHEALTHY, ping.healthStatus());
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
assertFalse(clusterHealth.isHealthy());
|
||||
|
||||
// test lower ID wins
|
||||
// coordinator a[0] and a[1] in the table, and we belong to view with the coordinator a[0]
|
||||
ping.setPingData(List.of(addresses[1], addresses[0]));
|
||||
ping.setView(addresses[0]);
|
||||
assertEquals(KEYCLOAK_JDBC_PING2.HealthStatus.HEALTHY, ping.healthStatus());
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
assertTrue(clusterHealth.isHealthy());
|
||||
|
||||
// test lower ID wins
|
||||
// coordinator a[0] and a[1] in the table, and we belong to view with the coordinator a[1]
|
||||
ping.setPingData(List.of(addresses[1], addresses[0]));
|
||||
ping.setView(addresses[1]);
|
||||
assertEquals(KEYCLOAK_JDBC_PING2.HealthStatus.UNHEALTHY, ping.healthStatus());
|
||||
clusterHealth.triggerClusterHealthCheck();
|
||||
assertFalse(clusterHealth.isHealthy());
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
private static long runSingleTest() throws Exception {
|
||||
JChannel[] channels = new JChannel[NUM_NODES];
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
try {
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
channels[i] = createChannel(PROTOCOL_STACK, String.valueOf(i + 1));
|
||||
channels[i] = createChannel(String.valueOf(i + 1));
|
||||
}
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
int index = 1;
|
||||
@@ -81,8 +147,9 @@ public class JdbcPing2Test {
|
||||
}
|
||||
}
|
||||
|
||||
protected static JChannel createChannel(String cfg, String name) throws Exception {
|
||||
return new JChannel(cfg).name(name);
|
||||
@SuppressWarnings("resource")
|
||||
protected static JChannel createChannel(String name) throws Exception {
|
||||
return new JChannel(JdbcPing2Test.PROTOCOL_STACK).name(name);
|
||||
}
|
||||
|
||||
protected record Connector(CountDownLatch latch, JChannel ch) implements Runnable {
|
||||
|
||||
+28
-9
@@ -95,6 +95,7 @@ import org.keycloak.connections.jpa.JpaConnectionProvider;
|
||||
import org.keycloak.connections.jpa.JpaConnectionSpi;
|
||||
import org.keycloak.connections.jpa.updater.liquibase.LiquibaseJpaUpdaterProviderFactory;
|
||||
import org.keycloak.connections.jpa.updater.liquibase.conn.DefaultLiquibaseConnectionProvider;
|
||||
import org.keycloak.infinispan.util.InfinispanUtils;
|
||||
import org.keycloak.policy.BlacklistPasswordPolicyProviderFactory;
|
||||
import org.keycloak.protocol.ProtocolMapperSpi;
|
||||
import org.keycloak.protocol.oidc.mappers.DeployedScriptOIDCProtocolMapper;
|
||||
@@ -118,6 +119,7 @@ import org.keycloak.quarkus.runtime.configuration.mappers.WildcardPropertyMapper
|
||||
import org.keycloak.quarkus.runtime.integration.resteasy.KeycloakHandlerChainCustomizer;
|
||||
import org.keycloak.quarkus.runtime.integration.resteasy.KeycloakTracingCustomizer;
|
||||
import org.keycloak.quarkus.runtime.logging.ClearMappedDiagnosticContextFilter;
|
||||
import org.keycloak.quarkus.runtime.services.health.KeycloakClusterReadyHealthCheck;
|
||||
import org.keycloak.quarkus.runtime.services.health.KeycloakReadyHealthCheck;
|
||||
import org.keycloak.quarkus.runtime.storage.database.jpa.NamedJpaConnectionProviderFactory;
|
||||
import org.keycloak.quarkus.runtime.themes.FlatClasspathThemeResourceProviderFactory;
|
||||
@@ -787,12 +789,29 @@ class KeycloakProcessor {
|
||||
|
||||
@BuildStep
|
||||
void disableHealthCheckBean(BuildProducer<BuildTimeConditionBuildItem> removeBeans, CombinedIndexBuildItem index) {
|
||||
if (!isHealthEnabled() || !isMetricsEnabled()) {
|
||||
// disables the single check we provide which depends on metrics enabled
|
||||
ClassInfo disabledBean = index.getIndex()
|
||||
.getClassByName(DotName.createSimple(KeycloakReadyHealthCheck.class.getName()));
|
||||
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean.asClass(), false));
|
||||
if (isHealthDisabled()) {
|
||||
disableReadyHealthCheck(removeBeans, index);
|
||||
disableClusterHealthCheck(removeBeans, index);
|
||||
return;
|
||||
}
|
||||
if (isMetricsDisabled()) {
|
||||
// disables the single check we provide which depends on metrics enabled.
|
||||
disableReadyHealthCheck(removeBeans, index);
|
||||
}
|
||||
if (InfinispanUtils.isRemoteInfinispan()) {
|
||||
// no cluster when the remote infinispan is used.
|
||||
disableClusterHealthCheck(removeBeans, index);
|
||||
}
|
||||
}
|
||||
|
||||
private static void disableClusterHealthCheck(BuildProducer<BuildTimeConditionBuildItem> removeBeans, CombinedIndexBuildItem index) {
|
||||
ClassInfo clusterHealth = index.getIndex().getClassByName(DotName.createSimple(KeycloakClusterReadyHealthCheck.class));
|
||||
removeBeans.produce(new BuildTimeConditionBuildItem(clusterHealth.asClass(), false));
|
||||
}
|
||||
|
||||
private static void disableReadyHealthCheck(BuildProducer<BuildTimeConditionBuildItem> removeBeans, CombinedIndexBuildItem index) {
|
||||
ClassInfo disabledBean = index.getIndex().getClassByName(DotName.createSimple(KeycloakReadyHealthCheck.class.getName()));
|
||||
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean.asClass(), false));
|
||||
}
|
||||
|
||||
@BuildStep
|
||||
@@ -1105,12 +1124,12 @@ class KeycloakProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isMetricsEnabled() {
|
||||
return Configuration.isTrue(MetricsOptions.METRICS_ENABLED);
|
||||
private static boolean isMetricsDisabled() {
|
||||
return !Configuration.isTrue(MetricsOptions.METRICS_ENABLED);
|
||||
}
|
||||
|
||||
private boolean isHealthEnabled() {
|
||||
return Configuration.isTrue(HealthOptions.HEALTH_ENABLED);
|
||||
private static boolean isHealthDisabled() {
|
||||
return !Configuration.isTrue(HealthOptions.HEALTH_ENABLED);
|
||||
}
|
||||
|
||||
static JdbcDataSourceBuildItem getDefaultDataSource(List<JdbcDataSourceBuildItem> jdbcDataSources) {
|
||||
|
||||
+54
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2020 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.keycloak.quarkus.runtime.services.health;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import io.smallrye.health.api.AsyncHealthCheck;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import org.eclipse.microprofile.health.HealthCheckResponse;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProviderFactory;
|
||||
import org.keycloak.infinispan.util.InfinispanUtils;
|
||||
import org.keycloak.quarkus.runtime.integration.QuarkusKeycloakSessionFactory;
|
||||
|
||||
import static org.keycloak.quarkus.runtime.services.health.KeycloakReadyHealthCheck.DATE_FORMATTER;
|
||||
import static org.keycloak.quarkus.runtime.services.health.KeycloakReadyHealthCheck.FAILING_SINCE;
|
||||
|
||||
public class KeycloakClusterReadyHealthCheck implements AsyncHealthCheck {
|
||||
|
||||
private final AtomicReference<Instant> failingSince = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
public Uni<HealthCheckResponse> call() {
|
||||
var builder = HealthCheckResponse.named("Keycloak cluster health check").up();
|
||||
if (InfinispanUtils.isRemoteInfinispan()) {
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
var sessionFactory = QuarkusKeycloakSessionFactory.getInstance();
|
||||
InfinispanConnectionProviderFactory factory = (InfinispanConnectionProviderFactory) sessionFactory.getProviderFactory(InfinispanConnectionProvider.class);
|
||||
if (factory.isClusterHealthy()) {
|
||||
failingSince.set(null);
|
||||
} else {
|
||||
builder.down();
|
||||
Instant failingTime = failingSince.updateAndGet(KeycloakReadyHealthCheck::createInstanceIfNeeded);
|
||||
builder.withData(FAILING_SINCE, DATE_FORMATTER.format(failingTime));
|
||||
}
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
}
|
||||
+43
@@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright 2025 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.keycloak.quarkus.runtime.services.health;
|
||||
|
||||
import io.smallrye.health.api.AsyncHealthCheck;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.context.Dependent;
|
||||
import jakarta.enterprise.inject.Produces;
|
||||
import org.eclipse.microprofile.health.Readiness;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProviderFactory;
|
||||
import org.keycloak.quarkus.runtime.integration.QuarkusKeycloakSessionFactory;
|
||||
|
||||
@ApplicationScoped
|
||||
public class KeycloakClusterReadyHealthCheckProducer {
|
||||
|
||||
@Produces
|
||||
@Readiness
|
||||
@Dependent
|
||||
public AsyncHealthCheck createHealthCheck() {
|
||||
var sessionFactory = QuarkusKeycloakSessionFactory.getInstance();
|
||||
InfinispanConnectionProviderFactory factory = (InfinispanConnectionProviderFactory) sessionFactory.getProviderFactory(InfinispanConnectionProvider.class);
|
||||
if (factory.isClusterHealthSupported()) {
|
||||
return new KeycloakClusterReadyHealthCheck();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
+6
-8
@@ -31,6 +31,7 @@ import org.eclipse.microprofile.health.Readiness;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
@@ -64,7 +65,7 @@ public class KeycloakReadyHealthCheck implements AsyncHealthCheck {
|
||||
@Inject
|
||||
DataSourceHealthCheck dataSourceHealthCheck;
|
||||
|
||||
AtomicReference<Instant> failingSince = new AtomicReference<>();
|
||||
private final AtomicReference<Instant> failingSince = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
public Uni<HealthCheckResponse> call() {
|
||||
@@ -77,7 +78,7 @@ public class KeycloakReadyHealthCheck implements AsyncHealthCheck {
|
||||
HealthCheckResponse activeCheckResult = dataSourceHealthCheck.call();
|
||||
if (activeCheckResult.getStatus() == HealthCheckResponse.Status.DOWN) {
|
||||
builder.down();
|
||||
Instant failingTime = failingSince.updateAndGet(this::createInstanceIfNeeded);
|
||||
Instant failingTime = failingSince.updateAndGet(KeycloakReadyHealthCheck::createInstanceIfNeeded);
|
||||
builder.withData(FAILING_SINCE, DATE_FORMATTER.format(failingTime));
|
||||
} else {
|
||||
failingSince.set(null);
|
||||
@@ -90,10 +91,7 @@ public class KeycloakReadyHealthCheck implements AsyncHealthCheck {
|
||||
}
|
||||
}
|
||||
|
||||
Instant createInstanceIfNeeded(Instant instant) {
|
||||
if (instant == null) {
|
||||
return Instant.now();
|
||||
}
|
||||
return instant;
|
||||
static Instant createInstanceIfNeeded(Instant instant) {
|
||||
return Objects.requireNonNullElseGet(instant, Instant::now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+10
-14
@@ -65,14 +65,13 @@ public class HealthDistTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Launch({ "start-dev", "--health-enabled=true", "--metrics-enabled=true" })
|
||||
@Launch({ "start-dev", "--health-enabled=true", "--metrics-enabled=true", "--cache=ispn" })
|
||||
void testNonBlockingProbes() {
|
||||
when().get("/health/live").then()
|
||||
.statusCode(200);
|
||||
when().get("/health/ready").then()
|
||||
.statusCode(200)
|
||||
.body("checks[0].name", equalTo("Keycloak database connections async health check"))
|
||||
.body("checks.size()", equalTo(1));
|
||||
.body("checks.size()", equalTo(2));
|
||||
when().get("/lb-check").then()
|
||||
.statusCode(404);
|
||||
}
|
||||
@@ -93,21 +92,18 @@ public class HealthDistTest {
|
||||
void testMultipleRequests(KeycloakDistribution distribution) throws Exception {
|
||||
for (String relativePath : List.of("/", "/auth/", "auth")) {
|
||||
distribution.run("start-dev", "--health-enabled=true", "--http-management-relative-path=" + relativePath);
|
||||
CompletableFuture future = CompletableFuture.completedFuture(null);
|
||||
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
future = CompletableFuture.allOf(CompletableFuture.runAsync(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < 200; i++) {
|
||||
String healthPath = "health";
|
||||
future = CompletableFuture.allOf(CompletableFuture.runAsync(() -> {
|
||||
for (int i1 = 0; i1 < 200; i1++) {
|
||||
String healthPath = "health";
|
||||
|
||||
if (!relativePath.endsWith("/")) {
|
||||
healthPath = "/" + healthPath;
|
||||
}
|
||||
|
||||
when().get(relativePath + healthPath).then().statusCode(200);
|
||||
if (!relativePath.endsWith("/")) {
|
||||
healthPath = "/" + healthPath;
|
||||
}
|
||||
|
||||
when().get(relativePath + healthPath).then().statusCode(200);
|
||||
}
|
||||
}), future);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user