ProtoStream marshaller for lambas

Closes #44811

Signed-off-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com>
Co-authored-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com>
This commit is contained in:
Pedro Ruivo
2025-12-11 11:47:27 +00:00
committed by GitHub
parent 421abedaa4
commit 2feb158554
6 changed files with 200 additions and 33 deletions

View File

@@ -91,12 +91,14 @@ import org.keycloak.models.sessions.infinispan.stream.AuthClientSessionSetMapper
import org.keycloak.models.sessions.infinispan.stream.ClientSessionFilterByUser;
import org.keycloak.models.sessions.infinispan.stream.CollectionToStreamMapper;
import org.keycloak.models.sessions.infinispan.stream.GroupAndCountCollectorSupplier;
import org.keycloak.models.sessions.infinispan.stream.LoginFailuresLifespanUpdate;
import org.keycloak.models.sessions.infinispan.stream.MapEntryToKeyMapper;
import org.keycloak.models.sessions.infinispan.stream.RemoveKeyConsumer;
import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.SessionUnwrapMapper;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.ValueIdentityBiFunction;
import org.keycloak.sessions.CommonClientSessionModel;
import org.keycloak.storage.UserStorageProviderClusterEvent;
import org.keycloak.storage.UserStorageProviderModel;
@@ -228,6 +230,8 @@ import org.infinispan.protostream.types.java.CommonTypes;
SessionUnwrapMapper.class,
ClientSessionFilterByUser.class,
RemoveKeyConsumer.class,
ValueIdentityBiFunction.class,
LoginFailuresLifespanUpdate.class,
// infinispan.module.certificates
ReloadCertificateFunction.class,

View File

@@ -186,6 +186,8 @@ public final class Marshalling {
public static final int EMBEDDED_CLIENT_SESSION_KEY = 65616;
public static final int CLIENT_SESSION_USER_FILTER = 65617;
public static final int REMOVE_KEY_BI_CONSUMER = 65618;
public static final int VALUE_IDENTITY_BI_FUNCTION = 65619;
public static final int LOGIN_FAILURES_LIFESPAN_UPDATE = 65620;
public static void configure(GlobalConfigurationBuilder builder) {
getSchemas().forEach(builder.serialization()::addContextInitializer);

View File

@@ -18,7 +18,6 @@ package org.keycloak.models.sessions.infinispan;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
@@ -33,15 +32,13 @@ import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.events.RemoveAllUserLoginFailuresEvent;
import org.keycloak.models.sessions.infinispan.events.SessionEventsSenderTransaction;
import org.keycloak.models.sessions.infinispan.stream.LoginFailuresLifespanUpdate;
import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.RemoveKeyConsumer;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.util.FuturesHelper;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.util.function.SerializableBiFunction;
import org.jboss.logging.Logger;
import static org.keycloak.common.util.StackUtil.getShortStackTrace;
@@ -154,24 +151,20 @@ public class InfinispanUserLoginFailureProvider implements UserLoginFailureProvi
@Override
public void updateWithLatestRealmSettings(RealmModel realm) {
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> cache = loginFailuresTx.getCache();
if (!realm.isBruteForceProtected()) {
cache.entrySet().stream()
.filter(SessionWrapperPredicate.create(realm.getId()))
.forEach(RemoveKeyConsumer.getInstance());
var stream = loginFailuresTx.getCache()
.entrySet()
.stream()
.filter(SessionWrapperPredicate.create(realm.getId()));
if (realm.isBruteForceProtected()) {
var action = new LoginFailuresLifespanUpdate(
realm.getMaxDeltaTimeSeconds() * 1000L,
realm.getMaxTemporaryLockouts(),
realm.isPermanentLockout()
);
stream.forEach(action);
} else {
final long maxDeltaTimeMillis = realm.getMaxDeltaTimeSeconds() * 1000L;
final boolean isPermanentLockout = realm.isPermanentLockout();
final int maxTemporaryLockouts = realm.getMaxTemporaryLockouts();
cache.entrySet().stream()
.filter(SessionWrapperPredicate.create(realm.getId()))
.<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>>forEach((c, entry) -> {
var entity = entry.getValue().getEntity();
long lifespan = SessionTimeouts.getLoginFailuresLifespanMs(isPermanentLockout, maxTemporaryLockouts, maxDeltaTimeMillis, entity);
c.getAdvancedCache()
.withFlags(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT,Flag.FAIL_SILENTLY, Flag.IGNORE_RETURN_VALUES)
.computeIfPresent(entry.getKey(), (SerializableBiFunction<? super LoginFailureKey, ? super SessionEntityWrapper<LoginFailureEntity>, ? extends SessionEntityWrapper<LoginFailureEntity>>) (key, value) -> value, lifespan, TimeUnit.MILLISECONDS);
});
stream.map(Mappers.loginFailureId())
.forEach(RemoveKeyConsumer.getInstance());
}
}

View File

@@ -30,6 +30,7 @@ import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.query.LoginFailureQueries;
import org.keycloak.models.sessions.infinispan.query.QueryHelper;
import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.stream.ValueIdentityBiFunction;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import io.reactivex.rxjava3.schedulers.Schedulers;
@@ -90,27 +91,28 @@ public class RemoteUserLoginFailureProvider implements UserLoginFailureProvider
@Override
public void updateWithLatestRealmSettings(RealmModel realm) {
RemoteCache<LoginFailureKey, LoginFailureEntity> cache = transaction.getCache();
if (!realm.isBruteForceProtected()) {
removeAllUserLoginFailures(realm);
} else {
final long maxDeltaTimeMillis = realm.getMaxDeltaTimeSeconds() * 1000L;
final boolean isPermanentLockout = realm.isPermanentLockout();
final int maxTemporaryLockouts = realm.getMaxTemporaryLockouts();
Query<LoginFailureEntity> query = LoginFailureQueries.searchByRealmId(cache, realm.getId());
CompletionStages.performConcurrently(
QueryHelper.streamAll(query, 20, Function.identity()),
20,
Schedulers.from(new WithinThreadExecutor()),
entry -> updateLifetimeOfCacheEntry(entry, cache, isPermanentLockout, maxTemporaryLockouts, maxDeltaTimeMillis));
return;
}
RemoteCache<LoginFailureKey, LoginFailureEntity> cache = transaction.getCache();
final long maxDeltaTimeMillis = realm.getMaxDeltaTimeSeconds() * 1000L;
final boolean isPermanentLockout = realm.isPermanentLockout();
final int maxTemporaryLockouts = realm.getMaxTemporaryLockouts();
Query<LoginFailureEntity> query = LoginFailureQueries.searchByRealmId(cache, realm.getId());
CompletionStages.performConcurrently(
QueryHelper.streamAll(query, 20, Function.identity()),
20,
Schedulers.from(new WithinThreadExecutor()),
entry -> updateLifetimeOfCacheEntry(entry, cache, isPermanentLockout, maxTemporaryLockouts, maxDeltaTimeMillis));
}
private static CompletionStage<?> updateLifetimeOfCacheEntry(LoginFailureEntity entry, RemoteCache<LoginFailureKey, LoginFailureEntity> cache, boolean isPermanentLockout, int maxTemporaryLockouts, long maxDeltaTimeMillis) {
long lifespan = SessionTimeouts.getLoginFailuresLifespanMs(isPermanentLockout, maxTemporaryLockouts, maxDeltaTimeMillis, entry);
return cache.computeIfPresentAsync(new LoginFailureKey(entry.getRealmId(), entry.getUserId()),
// Keep the original value - this should only update the lifespan and idle time
(loginFailureKey, loginFailureEntitySessionEntityWrapper) -> loginFailureEntitySessionEntityWrapper,
ValueIdentityBiFunction.getInstance(),
lifespan, TimeUnit.MILLISECONDS);
}

View File

@@ -0,0 +1,91 @@
/*
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
import static org.keycloak.marshalling.Marshalling.LOGIN_FAILURES_LIFESPAN_UPDATE;
/**
* A {@link BiConsumer} that updates the lifespan of login failure cache entries based on realm lockout policies.
* <p>
* This class is used to recalculate and update the time-to-live (TTL) for login failure records stored in the
* Infinispan cache. The lifespan is determined by the realm's brute force protection settings, including whether
* permanent lockout is enabled and the maximum number of temporary lockouts allowed.
* <p>
* The class is serializable via Infinispan ProtoStream to support distributed cache operations in remote caches.
*/
@ProtoTypeId(LOGIN_FAILURES_LIFESPAN_UPDATE)
public class LoginFailuresLifespanUpdate implements BiConsumer<Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>>, Map.Entry<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>>> {
@ProtoField(1)
final long maxDeltaTimeMillis;
@ProtoField(2)
final int maxTemporaryLockouts;
@ProtoField(3)
final boolean permanentLockout;
/**
* Creates a new login failures lifespan update operation with the specified lockout policy parameters.
* <p>
* This constructor is annotated with {@link ProtoFactory} to enable Infinispan ProtoStream serialization for remote
* cache operations.
*
* @param maxDeltaTimeMillis The maximum time window in milliseconds for tracking failures
* @param maxTemporaryLockouts The maximum number of temporary lockouts allowed
* @param permanentLockout Whether permanent lockout is enabled
*/
@ProtoFactory
public LoginFailuresLifespanUpdate(long maxDeltaTimeMillis, int maxTemporaryLockouts, boolean permanentLockout) {
this.maxDeltaTimeMillis = maxDeltaTimeMillis;
this.maxTemporaryLockouts = maxTemporaryLockouts;
this.permanentLockout = permanentLockout;
}
/**
* Updates the lifespan of a login failure cache entry based on the configured lockout policy.
* <p>
* The new lifespan is calculated using {@link SessionTimeouts#getLoginFailuresLifespanMs} which considers the
* current failure count, permanent lockout settings, and maximum delta time. The cache entry is updated using flags
* that optimize performance by avoiding locks and ignoring return values.
*
* @param cache The Infinispan cache containing login failure entries
* @param entry The cache entry to update with its key and wrapped login failure entity
*/
@Override
public void accept(Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> cache, Map.Entry<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> entry) {
var entity = entry.getValue().getEntity();
long lifespan = SessionTimeouts.getLoginFailuresLifespanMs(permanentLockout, maxTemporaryLockouts, maxDeltaTimeMillis, entity);
cache.getAdvancedCache()
.withFlags(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.IGNORE_RETURN_VALUES)
.computeIfPresent(entry.getKey(), ValueIdentityBiFunction.getInstance(), lifespan, TimeUnit.MILLISECONDS);
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.util.function.BiFunction;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import static org.keycloak.marshalling.Marshalling.VALUE_IDENTITY_BI_FUNCTION;
/**
* A {@link BiFunction} implementation that returns the second argument unchanged, effectively acting as an identity
* function for the value parameter.
* <p>
* This class is used in Infinispan cache operations where a {@link BiFunction} is required but only the value needs to
* be preserved without any transformation. The key parameter is ignored.
* <p>
* The class is implemented as a stateless singleton and is serializable via Infinispan ProtoStream to support
* distributed cache operations in remote caches.
*
* @param <K> The type of the first argument (key). This parameter is ignored by the function.
* @param <V> The type of the second argument (value). This parameter is returned unchanged.
*/
@ProtoTypeId(VALUE_IDENTITY_BI_FUNCTION)
public final class ValueIdentityBiFunction<K, V> implements BiFunction<K, V, V> {
private static final ValueIdentityBiFunction<?, ?> INSTANCE = new ValueIdentityBiFunction<>();
private ValueIdentityBiFunction() {
}
/**
* Returns the singleton instance of this function.
* <p>
* This method is annotated with {@link ProtoFactory} to enable Infinispan ProtoStream serialization for remote
* cache operations.
*
* @param <T> The type of the key parameter
* @param <E> The type of the value parameter
* @return The singleton instance of {@link ValueIdentityBiFunction}
*/
@ProtoFactory
@SuppressWarnings("unchecked")
public static <T, E> ValueIdentityBiFunction<T, E> getInstance() {
return (ValueIdentityBiFunction<T, E>) INSTANCE;
}
/**
* Returns the value parameter unchanged, ignoring the key parameter.
*
* @param k The key parameter (ignored)
* @param v The value parameter to return
* @return The value parameter unchanged
*/
@Override
public V apply(K k, V v) {
return v;
}
}