mirror of
https://github.com/keycloak/keycloak.git
synced 2026-04-26 17:07:23 -05:00
Improve workflow logging messages
- every execution gets its own id that can be used to track all activities related to that particular workflow execution Closes #42952 Signed-off-by: Stefan Guilhen <sguilhen@redhat.com>
This commit is contained in:
committed by
Pedro Igor
parent
7bcf08fa31
commit
7f29c9bb88
+2
-2
@@ -1,7 +1,7 @@
|
||||
package org.keycloak.representations.workflows;
|
||||
|
||||
import static org.keycloak.common.util.reflections.Reflections.isArrayType;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_IF;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_WITH;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@@ -20,7 +20,7 @@ public abstract class AbstractWorkflowComponentRepresentation {
|
||||
private String id;
|
||||
private String uses;
|
||||
|
||||
@JsonProperty(CONFIG_IF)
|
||||
@JsonProperty(CONFIG_WITH)
|
||||
private MultivaluedHashMap<String, String> config;
|
||||
|
||||
public AbstractWorkflowComponentRepresentation(String id, String uses, MultivaluedHashMap<String, String> config) {
|
||||
|
||||
+38
-22
@@ -45,22 +45,23 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
|
||||
@Override
|
||||
public ScheduledStep getScheduledStep(String workflowId, String resourceId) {
|
||||
WorkflowStateEntity.PrimaryKey pk = new WorkflowStateEntity.PrimaryKey(resourceId, workflowId);
|
||||
WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, pk);
|
||||
if (entity != null) {
|
||||
return new ScheduledStep(entity.getWorkflowId(), entity.getScheduledStepId(), entity.getResourceId());
|
||||
}
|
||||
return null;
|
||||
CriteriaBuilder cb = em.getCriteriaBuilder();
|
||||
CriteriaQuery<WorkflowStateEntity> query = cb.createQuery(WorkflowStateEntity.class);
|
||||
Root<WorkflowStateEntity> stateRoot = query.from(WorkflowStateEntity.class);
|
||||
|
||||
query.where(cb.and(cb.equal(stateRoot.get("workflowId"), workflowId), cb.equal(stateRoot.get("resourceId"), resourceId)));
|
||||
WorkflowStateEntity entity = em.createQuery(query).getSingleResultOrNull();
|
||||
return entity != null ? toScheduledStep(entity) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId) {
|
||||
WorkflowStateEntity.PrimaryKey pk = new WorkflowStateEntity.PrimaryKey(resourceId, workflow.getId());
|
||||
WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, pk);
|
||||
public void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId, String executionId) {
|
||||
WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, executionId);
|
||||
if (entity == null) {
|
||||
entity = new WorkflowStateEntity();
|
||||
entity.setResourceId(resourceId);
|
||||
entity.setWorkflowId(workflow.getId());
|
||||
entity.setExecutionId(executionId);
|
||||
entity.setWorkflowProviderId(workflow.getProviderId());
|
||||
entity.setScheduledStepId(step.getId());
|
||||
entity.setScheduledStepTimestamp(Time.currentTimeMillis() + step.getAfter());
|
||||
@@ -83,7 +84,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
query.where(cb.and(byWorkflow, isExpired));
|
||||
|
||||
return em.createQuery(query).getResultStream()
|
||||
.map(s -> new ScheduledStep(s.getWorkflowId(), s.getScheduledStepId(), s.getResourceId()))
|
||||
.map(this::toScheduledStep)
|
||||
.toList();
|
||||
}
|
||||
|
||||
@@ -101,7 +102,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
query.where(byWorkflow);
|
||||
|
||||
return em.createQuery(query).getResultStream()
|
||||
.map(s -> new ScheduledStep(s.getWorkflowId(), s.getScheduledStepId(), s.getResourceId()))
|
||||
.map(this::toScheduledStep)
|
||||
.toList();
|
||||
}
|
||||
|
||||
@@ -115,7 +116,20 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
query.where(byResource);
|
||||
|
||||
return em.createQuery(query).getResultStream()
|
||||
.map(s -> new ScheduledStep(s.getWorkflowId(), s.getScheduledStepId(), s.getResourceId()))
|
||||
.map(this::toScheduledStep)
|
||||
.toList();
|
||||
}
|
||||
|
||||
public List<ScheduledStep> getScheduledStepsByStep(String stepId) {
|
||||
CriteriaBuilder cb = em.getCriteriaBuilder();
|
||||
CriteriaQuery<WorkflowStateEntity> query = cb.createQuery(WorkflowStateEntity.class);
|
||||
Root<WorkflowStateEntity> stateRoot = query.from(WorkflowStateEntity.class);
|
||||
|
||||
Predicate byStep = cb.equal(stateRoot.get("scheduledStepId"), stepId);
|
||||
query.where(byStep);
|
||||
|
||||
return em.createQuery(query).getResultStream()
|
||||
.map(this::toScheduledStep)
|
||||
.toList();
|
||||
}
|
||||
|
||||
@@ -135,16 +149,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String workflowId, String resourceId) {
|
||||
WorkflowStateEntity.PrimaryKey pk = new WorkflowStateEntity.PrimaryKey(resourceId, workflowId);
|
||||
WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, pk);
|
||||
if (entity != null) {
|
||||
em.remove(entity);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String workflowId) {
|
||||
public void removeByWorkflow(String workflowId) {
|
||||
CriteriaBuilder cb = em.getCriteriaBuilder();
|
||||
CriteriaDelete<WorkflowStateEntity> delete = cb.createCriteriaDelete(WorkflowStateEntity.class);
|
||||
Root<WorkflowStateEntity> root = delete.from(WorkflowStateEntity.class);
|
||||
@@ -159,6 +164,14 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(String executionId) {
|
||||
WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, executionId);
|
||||
if (entity != null) {
|
||||
em.remove(entity);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAll() {
|
||||
CriteriaBuilder cb = em.getCriteriaBuilder();
|
||||
@@ -177,4 +190,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider {
|
||||
public void close() {
|
||||
}
|
||||
|
||||
private ScheduledStep toScheduledStep(WorkflowStateEntity entity) {
|
||||
return new ScheduledStep(entity.getWorkflowId(), entity.getScheduledStepId(), entity.getResourceId(), entity.getExecutionId());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,14 +30,15 @@ import java.util.Objects;
|
||||
*/
|
||||
@Entity
|
||||
@Table(name = "WORKFLOW_STATE")
|
||||
@IdClass(WorkflowStateEntity.PrimaryKey.class)
|
||||
public class WorkflowStateEntity {
|
||||
|
||||
@Id
|
||||
@Column(name = "EXECUTION_ID")
|
||||
private String executionId;
|
||||
|
||||
@Column(name = "RESOURCE_ID")
|
||||
private String resourceId;
|
||||
|
||||
@Id
|
||||
@Column(name = "WORKFLOW_ID")
|
||||
private String workflowId;
|
||||
|
||||
@@ -53,6 +54,14 @@ public class WorkflowStateEntity {
|
||||
@Column(name = "SCHEDULED_STEP_TIMESTAMP")
|
||||
private long scheduledStepTimestamp;
|
||||
|
||||
public String getExecutionId() {
|
||||
return executionId;
|
||||
}
|
||||
|
||||
public void setExecutionId(String executionId) {
|
||||
this.executionId = executionId;
|
||||
}
|
||||
|
||||
public String getResourceId() {
|
||||
return resourceId;
|
||||
}
|
||||
@@ -101,49 +110,6 @@ public class WorkflowStateEntity {
|
||||
this.scheduledStepTimestamp = scheduledStepTimestamp;
|
||||
}
|
||||
|
||||
public static class PrimaryKey implements Serializable {
|
||||
|
||||
private String resourceId;
|
||||
private String workflowId;
|
||||
|
||||
public PrimaryKey() {
|
||||
}
|
||||
|
||||
public PrimaryKey(String resourceId, String workflowId) {
|
||||
this.resourceId = resourceId;
|
||||
this.workflowId = workflowId;
|
||||
}
|
||||
|
||||
public String getResourceId() {
|
||||
return resourceId;
|
||||
}
|
||||
|
||||
public void setResourceId(String resourceId) {
|
||||
this.resourceId = resourceId;
|
||||
}
|
||||
|
||||
public String getWorkflowId() {
|
||||
return workflowId;
|
||||
}
|
||||
|
||||
public void setWorkflowId(String workflowId) {
|
||||
this.workflowId = workflowId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
PrimaryKey that = (PrimaryKey) o;
|
||||
return Objects.equals(resourceId, that.resourceId) && Objects.equals(workflowId, that.workflowId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(resourceId, workflowId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
||||
@@ -31,6 +31,9 @@
|
||||
|
||||
<changeSet id="40343-workflow-state-table" author="keycloak">
|
||||
<createTable tableName="WORKFLOW_STATE">
|
||||
<column name="EXECUTION_ID" type="VARCHAR(255)">
|
||||
<constraints nullable="false" />
|
||||
</column>
|
||||
<column name="RESOURCE_ID" type="VARCHAR(255)">
|
||||
<constraints nullable="false" />
|
||||
</column>
|
||||
@@ -44,9 +47,13 @@
|
||||
</createTable>
|
||||
|
||||
<addPrimaryKey
|
||||
constraintName="PK_WORKFLOW_STEP_STATE"
|
||||
constraintName="PK_WORKFLOW_STATE"
|
||||
tableName="WORKFLOW_STATE"
|
||||
columnNames="RESOURCE_ID, WORKFLOW_ID" />
|
||||
columnNames="EXECUTION_ID" />
|
||||
|
||||
<addUniqueConstraint constraintName="UQ_WORKFLOW_RESOURCE"
|
||||
tableName="WORKFLOW_STATE"
|
||||
columnNames="WORKFLOW_ID, RESOURCE_ID"/>
|
||||
|
||||
<createIndex indexName="IDX_WORKFLOW_STATE_STEP"
|
||||
tableName="WORKFLOW_STATE">
|
||||
|
||||
+10
-10
@@ -33,25 +33,23 @@ public interface WorkflowStateProvider extends Provider {
|
||||
*/
|
||||
void removeByResource(String resourceId);
|
||||
|
||||
/**
|
||||
* Removes the record identified by the specified {@code workflowId} and {@code resourceId}.
|
||||
* @param workflowId the id of the workflow.
|
||||
* @param resourceId the id of the resource.
|
||||
*/
|
||||
void remove(String workflowId, String resourceId);
|
||||
|
||||
/**
|
||||
* Removes any record identified by the specified {@code workflowId}.
|
||||
* @param workflowId the id of the workflow.
|
||||
*/
|
||||
void remove(String workflowId);
|
||||
void removeByWorkflow(String workflowId);
|
||||
|
||||
/**
|
||||
* Removes the record identified by the specified {@code executionId}.
|
||||
*/
|
||||
void remove(String executionId);
|
||||
|
||||
/**
|
||||
* Deletes all state records associated with the current realm bound to the session.
|
||||
*/
|
||||
void removeAll();
|
||||
|
||||
void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId);
|
||||
void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId, String executionId);
|
||||
|
||||
ScheduledStep getScheduledStep(String workflowId, String resourceId);
|
||||
|
||||
@@ -59,6 +57,8 @@ public interface WorkflowStateProvider extends Provider {
|
||||
|
||||
List<ScheduledStep> getScheduledStepsByWorkflow(String workflowId);
|
||||
|
||||
List<ScheduledStep> getScheduledStepsByStep(String stepId);
|
||||
|
||||
default List<ScheduledStep> getScheduledStepsByWorkflow(Workflow workflow) {
|
||||
if (workflow == null) {
|
||||
return List.of();
|
||||
@@ -69,5 +69,5 @@ public interface WorkflowStateProvider extends Provider {
|
||||
|
||||
List<ScheduledStep> getDueScheduledSteps(Workflow workflow);
|
||||
|
||||
record ScheduledStep(String workflowId, String stepId, String resourceId) {}
|
||||
record ScheduledStep(String workflowId, String stepId, String resourceId, String executionId) {}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ package org.keycloak.models.workflow;
|
||||
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_ENABLED;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_ERROR;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_NAME;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_RECURRING;
|
||||
|
||||
import java.util.List;
|
||||
@@ -59,6 +60,10 @@ public class Workflow {
|
||||
return config;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return config != null ? config.getFirst(CONFIG_NAME) : null;
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return config != null && Boolean.parseBoolean(config.getFirstOrDefault(CONFIG_ENABLED, "true"));
|
||||
}
|
||||
|
||||
+91
@@ -0,0 +1,91 @@
|
||||
package org.keycloak.models.workflow;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static java.util.Optional.ofNullable;
|
||||
|
||||
public class WorkflowExecutionContext {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(WorkflowExecutionContext.class);
|
||||
|
||||
private String executionId;
|
||||
private String resourceId;
|
||||
private Workflow workflow;
|
||||
private List<WorkflowStep> steps;
|
||||
|
||||
// variable that keep track of execution steps
|
||||
private int lastExecutedStepIndex = -1;
|
||||
|
||||
public WorkflowExecutionContext(Workflow workflow, List<WorkflowStep> steps, String resourceId) {
|
||||
this.workflow = workflow;
|
||||
this.steps = ofNullable(steps).orElse(List.of());
|
||||
this.resourceId = resourceId;
|
||||
}
|
||||
|
||||
public WorkflowExecutionContext(Workflow workflow, List<WorkflowStep> steps, String resourceId, String stepId, String executionId) {
|
||||
this(workflow, steps, resourceId);
|
||||
this.executionId = executionId;
|
||||
if (stepId != null) {
|
||||
for (int i = 0; i < steps.size(); i++) {
|
||||
if (steps.get(i).getId().equals(stepId)) {
|
||||
this.lastExecutedStepIndex = i - 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void init() {
|
||||
if (this.executionId == null) {
|
||||
this.executionId = UUID.randomUUID().toString();
|
||||
logger.debugf("Started workflow '%s' for resource %s (execution id: %s)", this.workflow.getName(), this.resourceId, this.executionId);
|
||||
}
|
||||
}
|
||||
|
||||
public void success(WorkflowStep step) {
|
||||
logger.debugf("Step %s completed successfully (execution id: %s)", step.getProviderId(), executionId);
|
||||
}
|
||||
|
||||
public void fail(WorkflowStep step, String errorMessage) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Step %s failed (execution id: %s)");
|
||||
if (errorMessage != null) {
|
||||
sb.append(" - error message: %s");
|
||||
logger.debugf(sb.toString(), step.getProviderId(), executionId, errorMessage);
|
||||
}
|
||||
else {
|
||||
logger.debugf(sb.toString(), step.getProviderId(), executionId);
|
||||
}
|
||||
}
|
||||
|
||||
public void complete() {
|
||||
logger.debugf("Workflow '%s' completed for resource %s (execution id: %s)", workflow.getName(), resourceId, executionId);
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
logger.debugf("Workflow '%s' cancelled for resource %s (execution id: %s)", workflow.getName(), resourceId, executionId);
|
||||
}
|
||||
|
||||
public boolean hasNextStep() {
|
||||
return lastExecutedStepIndex + 1 < steps.size();
|
||||
}
|
||||
|
||||
public WorkflowStep getNextStep() {
|
||||
if (lastExecutedStepIndex + 1 < steps.size()) {
|
||||
return steps.get(++lastExecutedStepIndex);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void restart() {
|
||||
logger.debugf("Restarted workflow '%s' for resource %s (execution id: %s)",workflow.getName(), resourceId, executionId);
|
||||
this.lastExecutedStepIndex = -1;
|
||||
}
|
||||
|
||||
public String getExecutionId() {
|
||||
return this.executionId;
|
||||
}
|
||||
}
|
||||
+10
@@ -0,0 +1,10 @@
|
||||
package org.keycloak.models.workflow;
|
||||
|
||||
import org.keycloak.models.ModelException;
|
||||
|
||||
public class WorkflowExecutionException extends ModelException {
|
||||
|
||||
public WorkflowExecutionException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -47,6 +47,7 @@ import static java.util.Optional.ofNullable;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_CONDITIONS;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_ENABLED;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_NAME;
|
||||
import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_RECURRING;
|
||||
|
||||
public class WorkflowsManager {
|
||||
|
||||
@@ -148,10 +149,6 @@ public class WorkflowsManager {
|
||||
return toStep(component);
|
||||
}
|
||||
|
||||
private WorkflowStep getFirstStep(Workflow workflow) {
|
||||
return getSteps(workflow.getId()).get(0);
|
||||
}
|
||||
|
||||
private WorkflowProvider getWorkflowProvider(Workflow workflow) {
|
||||
ComponentFactory<?, ?> factory = (ComponentFactory<?, ?>) session.getKeycloakSessionFactory()
|
||||
.getProviderFactory(WorkflowProvider.class, workflow.getProviderId());
|
||||
@@ -198,8 +195,8 @@ public class WorkflowsManager {
|
||||
}
|
||||
|
||||
public void processEvent(List<Workflow> workflows, WorkflowEvent event) {
|
||||
List<String> currentlyAssignedWorkflows = workflowStateProvider.getScheduledStepsByResource(event.getResourceId())
|
||||
.stream().map(ScheduledStep::workflowId).toList();
|
||||
Map<String, ScheduledStep> scheduledSteps = workflowStateProvider.getScheduledStepsByResource(event.getResourceId())
|
||||
.stream().collect(HashMap::new, (m, v) -> m.put(v.workflowId(), v), HashMap::putAll);
|
||||
|
||||
// iterate through the workflows, and for those not yet assigned to the user check if they can be assigned
|
||||
workflows.stream()
|
||||
@@ -207,46 +204,38 @@ public class WorkflowsManager {
|
||||
.forEach(workflow -> {
|
||||
WorkflowProvider provider = getWorkflowProvider(workflow);
|
||||
try {
|
||||
if (!currentlyAssignedWorkflows.contains(workflow.getId())) {
|
||||
// if workflow is not active for the resource, check if the provider allows activating based on the event
|
||||
// if workflow is not active for the resource, check if the provider allows activating based on the event
|
||||
if (!scheduledSteps.containsKey(workflow.getId())) {
|
||||
if (provider.activateOnEvent(event)) {
|
||||
WorkflowStep firstStep = getFirstStep(workflow);
|
||||
for (WorkflowStep step : getSteps(workflow.getId())) {
|
||||
// If the workflow has a notBefore set, schedule the first step with it
|
||||
if (step.getId().equals(firstStep.getId()) && workflow.getNotBefore() != null && workflow.getNotBefore() > 0) {
|
||||
log.debugf("Scheduling first step %s of workflow %s for resource %s based on on event %s with notBefore %d",
|
||||
step.getId(), workflow.getId(), event.getResourceId(), event.getOperation(), workflow.getNotBefore());
|
||||
Long originalAfter = step.getAfter();
|
||||
try {
|
||||
step.setAfter(workflow.getNotBefore());
|
||||
workflowStateProvider.scheduleStep(workflow, step, event.getResourceId());
|
||||
continue;
|
||||
} finally {
|
||||
// restore the original after value
|
||||
step.setAfter(originalAfter);
|
||||
}
|
||||
}
|
||||
if (step.getAfter() > 0) {
|
||||
// If a step has a time defined, schedule it and stop processing the other steps of workflow
|
||||
log.debugf("Scheduling step %s of workflow %s for resource %s based on event %s",
|
||||
step.getId(), workflow.getId(), event.getResourceId(), event.getOperation());
|
||||
workflowStateProvider.scheduleStep(workflow, step, event.getResourceId());
|
||||
break;
|
||||
} else {
|
||||
// Otherwise run the step right away
|
||||
log.debugf("Running step %s of workflow %s for resource %s based on event %s",
|
||||
step.getId(), workflow.getId(), event.getResourceId(), event.getOperation());
|
||||
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), session.getContext(), s ->
|
||||
getStepProvider(step).run(List.of(event.getResourceId()))
|
||||
);
|
||||
WorkflowExecutionContext context = buildAndInitContext(workflow, event.getResourceId());
|
||||
// If the workflow has a notBefore set, schedule the first step with it
|
||||
if (context.hasNextStep() && workflow.getNotBefore() != null && workflow.getNotBefore() > 0) {
|
||||
WorkflowStep step = context.getNextStep();
|
||||
log.debugf("Scheduling first step '%s' of workflow '%s' for resource %s based on on event %s with notBefore %d",
|
||||
step.getProviderId(), workflow.getName(), event.getResourceId(), event.getOperation(), workflow.getNotBefore());
|
||||
Long originalAfter = step.getAfter();
|
||||
try {
|
||||
step.setAfter(workflow.getNotBefore());
|
||||
workflowStateProvider.scheduleStep(workflow, step, event.getResourceId(), context.getExecutionId());
|
||||
} finally {
|
||||
// restore the original after value
|
||||
step.setAfter(originalAfter);
|
||||
}
|
||||
}
|
||||
else {
|
||||
// process the workflow steps, scheduling or running them as needed
|
||||
processWorkflow(workflow, context, event.getResourceId());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// workflow is active for the resource, check if the provider wants to reset or deactivate it based on the event
|
||||
WorkflowExecutionContext context = buildFromScheduledStep(scheduledSteps.get(workflow.getId()));
|
||||
if (provider.resetOnEvent(event)) {
|
||||
workflowStateProvider.scheduleStep(workflow, getFirstStep(workflow), event.getResourceId());
|
||||
context.restart();
|
||||
processWorkflow(workflow, context, event.getResourceId());
|
||||
} else if (provider.deactivateOnEvent(event)) {
|
||||
workflowStateProvider.remove(workflow.getId(), event.getResourceId());
|
||||
context.cancel();
|
||||
workflowStateProvider.remove(context.getExecutionId());
|
||||
}
|
||||
}
|
||||
} catch (WorkflowInvalidStateException e) {
|
||||
@@ -259,44 +248,23 @@ public class WorkflowsManager {
|
||||
}
|
||||
|
||||
public void runScheduledSteps() {
|
||||
this.getWorkflows().stream().filter(Workflow::isEnabled).forEach(workflow -> {
|
||||
this.getWorkflows().stream().filter(Workflow::isEnabled).forEach(workflow -> {
|
||||
|
||||
for (ScheduledStep scheduled : workflowStateProvider.getDueScheduledSteps(workflow)) {
|
||||
List<WorkflowStep> steps = getSteps(workflow.getId());
|
||||
|
||||
for (int i = 0; i < steps.size(); i++) {
|
||||
WorkflowStep currentStep = steps.get(i);
|
||||
|
||||
if (currentStep.getId().equals(scheduled.stepId())) {
|
||||
getStepProvider(currentStep).run(List.of(scheduled.resourceId()));
|
||||
|
||||
int nextIndex = i + 1;
|
||||
// Process subsequent steps: run immediately if no time condition, schedule if time condition
|
||||
while (nextIndex < steps.size()) {
|
||||
WorkflowStep nextStep = steps.get(nextIndex);
|
||||
if (nextStep.getAfter() > 0) {
|
||||
workflowStateProvider.scheduleStep(workflow, nextStep, scheduled.resourceId());
|
||||
break;
|
||||
} else {
|
||||
getStepProvider(nextStep).run(List.of(scheduled.resourceId()));
|
||||
nextIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
if (nextIndex == steps.size()) {
|
||||
// this was the last step, check if the workflow is recurring - i.e. if we need to schedule the first step again
|
||||
if (workflow.isRecurring()) {
|
||||
WorkflowStep firstStep = getFirstStep(workflow);
|
||||
workflowStateProvider.scheduleStep(workflow, firstStep, scheduled.resourceId());
|
||||
} else {
|
||||
// not recurring, remove the state record
|
||||
workflowStateProvider.remove(workflow.getId(), scheduled.resourceId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (ScheduledStep scheduled : workflowStateProvider.getDueScheduledSteps(workflow)) {
|
||||
WorkflowExecutionContext context = buildFromScheduledStep(scheduled);
|
||||
if (!context.hasNextStep()) {
|
||||
log.warnf("Could not find step %s in workflow %s for resource %s. Removing the workflow state.",
|
||||
scheduled.stepId(), scheduled.workflowId(), scheduled.resourceId());
|
||||
workflowStateProvider.remove(scheduled.executionId());
|
||||
continue;
|
||||
}
|
||||
});
|
||||
// run the scheduled step that is due
|
||||
this.runWorkflowStep(context, context.getNextStep(), scheduled.resourceId());
|
||||
|
||||
// now process the subsequent steps, scheduling or running them as needed
|
||||
processWorkflow(workflow, context, scheduled.resourceId());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void removeWorkflow(String id) {
|
||||
@@ -307,7 +275,7 @@ public class WorkflowsManager {
|
||||
realm.getComponentsStream(workflow.getId(), WorkflowStepProvider.class.getName()).forEach(realm::removeComponent);
|
||||
realm.removeComponent(workflow);
|
||||
});
|
||||
workflowStateProvider.remove(id);
|
||||
workflowStateProvider.removeByWorkflow(id);
|
||||
}
|
||||
|
||||
public Workflow getWorkflow(String id) {
|
||||
@@ -431,6 +399,14 @@ public class WorkflowsManager {
|
||||
private void validateWorkflow(WorkflowRepresentation rep) {
|
||||
validateEvents(rep.getOnValues());
|
||||
validateEvents(rep.getOnEventsReset());
|
||||
// a recurring workflow must have at least one scheduled step to prevent an infinite loop of immediate executions
|
||||
if (rep.getConfig() != null && Boolean.parseBoolean(rep.getConfig().getFirstOrDefault(CONFIG_RECURRING, "false"))) {
|
||||
boolean hasScheduledStep = ofNullable(rep.getSteps()).orElse(List.of()).stream()
|
||||
.anyMatch(step -> Integer.parseInt(ofNullable(step.getAfter()).orElse("0")) > 0);
|
||||
if (!hasScheduledStep) {
|
||||
throw new WorkflowInvalidStateException("A recurring workflow must have at least one step with a time delay.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void validateEvents(List<String> events) {
|
||||
@@ -478,8 +454,6 @@ public class WorkflowsManager {
|
||||
step.setPriority(targetPosition + 1);
|
||||
WorkflowStep addedStep = addStep(workflow, step);
|
||||
|
||||
updateScheduledStepsAfterStepChange(workflow.getId());
|
||||
|
||||
log.debugf("Added step %s to workflow %s at position %d", addedStep.getId(), workflow.getId(), targetPosition);
|
||||
return addedStep;
|
||||
}
|
||||
@@ -499,7 +473,7 @@ public class WorkflowsManager {
|
||||
|
||||
// Reorder remaining steps and update state
|
||||
reorderAllSteps(workflow.getId());
|
||||
updateScheduledStepsAfterStepChange(workflow.getId());
|
||||
updateScheduledStepsAfterStepChange(workflow, stepId);
|
||||
|
||||
log.debugf("Removed step %s from workflow %s", stepId, workflow.getId());
|
||||
}
|
||||
@@ -532,22 +506,12 @@ public class WorkflowsManager {
|
||||
realm.updateComponent(component);
|
||||
}
|
||||
|
||||
private void updateScheduledStepsAfterStepChange(String workflowId) {
|
||||
List<WorkflowStep> steps = getSteps(workflowId);
|
||||
private void updateScheduledStepsAfterStepChange(Workflow workflow, String stepId) {
|
||||
|
||||
if (steps.isEmpty()) {
|
||||
workflowStateProvider.remove(workflowId);
|
||||
return;
|
||||
}
|
||||
|
||||
for (ScheduledStep scheduled : workflowStateProvider.getScheduledStepsByWorkflow(workflowId)) {
|
||||
boolean stepStillExists = steps.stream()
|
||||
.anyMatch(step -> step.getId().equals(scheduled.stepId()));
|
||||
|
||||
if (!stepStillExists) {
|
||||
Workflow workflow = getWorkflow(workflowId);
|
||||
workflowStateProvider.scheduleStep(workflow, steps.get(0), scheduled.resourceId());
|
||||
}
|
||||
for (ScheduledStep scheduled : workflowStateProvider.getScheduledStepsByStep(stepId)) {
|
||||
WorkflowExecutionContext context = buildFromScheduledStep(scheduled);
|
||||
context.restart();
|
||||
workflowStateProvider.scheduleStep(workflow, context.getNextStep(), scheduled.resourceId(), context.getExecutionId());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -586,4 +550,61 @@ public class WorkflowsManager {
|
||||
|
||||
return providerFactory;
|
||||
}
|
||||
|
||||
private WorkflowExecutionContext buildAndInitContext(Workflow workflow, String resourceId) {
|
||||
WorkflowExecutionContext context = new WorkflowExecutionContext(workflow, getSteps(workflow.getId()), resourceId);
|
||||
context.init();
|
||||
return context;
|
||||
}
|
||||
|
||||
private WorkflowExecutionContext buildFromScheduledStep(ScheduledStep scheduledStep) {
|
||||
return new WorkflowExecutionContext(
|
||||
getWorkflow(scheduledStep.workflowId()),
|
||||
getSteps(scheduledStep.workflowId()),
|
||||
scheduledStep.resourceId(),
|
||||
scheduledStep.stepId(),
|
||||
scheduledStep.executionId()
|
||||
);
|
||||
}
|
||||
|
||||
private void processWorkflow(Workflow workflow, WorkflowExecutionContext context, String resourceId) {
|
||||
while (context.hasNextStep()) {
|
||||
WorkflowStep step = context.getNextStep();
|
||||
if (step.getAfter() > 0) {
|
||||
// If a step has a time defined, schedule it and stop processing the other steps of workflow
|
||||
log.debugf("Scheduling step %s to run in %d ms for resource %s (execution id: %s)",
|
||||
step.getProviderId(), step.getAfter(), resourceId, context.getExecutionId());
|
||||
workflowStateProvider.scheduleStep(workflow, step, resourceId, context.getExecutionId());
|
||||
return;
|
||||
} else {
|
||||
// Otherwise run the step right away
|
||||
|
||||
runWorkflowStep(context, step, resourceId);
|
||||
}
|
||||
}
|
||||
|
||||
// if we've reached the end of the workflow, check if it is recurring or if we can mark it as completed
|
||||
if (workflow.isRecurring()) {
|
||||
// if the workflow is recurring, restart it
|
||||
context.restart();
|
||||
processWorkflow(workflow, context, resourceId);
|
||||
} else {
|
||||
// not recurring, remove the state record
|
||||
context.complete();
|
||||
workflowStateProvider.remove(context.getExecutionId());
|
||||
}
|
||||
}
|
||||
|
||||
private void runWorkflowStep(WorkflowExecutionContext context, WorkflowStep step, String resourceId) {
|
||||
log.debugf("Running step %s on resource %s (execution id: %s)", step.getProviderId(), resourceId, context.getExecutionId());
|
||||
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), session.getContext(), s -> {
|
||||
try {
|
||||
getStepProvider(step).run(List.of(resourceId));
|
||||
context.success(step);
|
||||
} catch(WorkflowExecutionException e) {
|
||||
context.fail(step, e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user