Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.controller;
package io.javaoperatorsdk.operator.processing.event.source;

public enum ResourceAction {
ADDED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
Expand Down Expand Up @@ -83,7 +84,7 @@ public synchronized void start() {
}

@Override
public synchronized void handleEvent(
protected synchronized void handleEvent(
ResourceAction action,
T resource,
T oldResource,
Expand Down Expand Up @@ -139,13 +140,15 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso

@Override
public void onAdd(T resource) {
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null);
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
}

@Override
public void onUpdate(T oldCustomResource, T newCustomResource) {
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
var handling =
temporaryResourceCache.onAddOrUpdateEvent(
ResourceAction.UPDATED, newCustomResource, oldCustomResource);
handleEvent(
ResourceAction.UPDATED,
newCustomResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

/**
* Extends ResourceEvent for informer Delete events, it holds also information if the final state is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

public class ResourceEvent extends Event {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

/** Used only for resource event filtering. */
public class ExtendedResourceEvent extends ResourceEvent {

private HasMetadata previousResource;

public ExtendedResourceEvent(
ResourceAction action,
ResourceID resourceID,
HasMetadata latestResource,
HasMetadata previousResource) {
super(action, resourceID, latestResource);
this.previousResource = previousResource;
}

public Optional<HasMetadata> getPreviousResource() {
return Optional.ofNullable(previousResource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void onAdd(R newResource) {
resourceType().getSimpleName(),
newResource.getMetadata().getResourceVersion());
}
onAddOrUpdate(Operation.ADD, newResource, null);
onAddOrUpdate(ResourceAction.ADDED, newResource, null);
}

@Override
Expand All @@ -120,7 +120,7 @@ public void onUpdate(R oldObject, R newObject) {
newObject.getMetadata().getResourceVersion(),
oldObject.getMetadata().getResourceVersion());
}
onAddOrUpdate(Operation.UPDATE, newObject, oldObject);
onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject);
}

@Override
Expand All @@ -139,7 +139,7 @@ public synchronized void onDelete(R resource, boolean b) {
}

@Override
public void handleEvent(
protected void handleEvent(
ResourceAction action,
R resource,
R oldResource,
Expand All @@ -156,27 +156,27 @@ public synchronized void start() {
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
}

private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) {
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var resourceID = ResourceID.fromResource(newObject);

var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);

if (eventHandling != EventHandling.NEW) {
log.debug(
"{} event propagation for {}. Resource ID: {}",
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
operation,
action,
ResourceID.fromResource(newObject));
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
log.debug(
"Propagating event for {}, resource with same version not result of a reconciliation."
+ " Resource ID: {}",
operation,
action,
resourceID);
propagateEvent(newObject);
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}

Expand Down Expand Up @@ -251,11 +251,11 @@ public boolean allowsNamespaceChanges() {
return configuration().followControllerNamespaceChanges();
}

private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
private boolean eventAcceptedByFilter(ResourceAction action, R newObject, R oldObject) {
if (genericFilter != null && !genericFilter.accept(newObject)) {
return false;
}
if (operation == Operation.ADD) {
if (action == ResourceAction.ADDED) {
return onAddFilter == null || onAddFilter.accept(newObject);
} else {
return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject);
Expand All @@ -266,9 +266,4 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
return (onDeleteFilter == null || onDeleteFilter.accept(resource, b))
&& (genericFilter == null || genericFilter.accept(resource));
}

private enum Operation {
ADD,
UPDATE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -90,6 +90,7 @@ public void changeNamespaces(Set<String> namespaces) {
* Also makes sure that the even produced by this update is filtered, thus does not trigger the
* reconciliation.
*/
@SuppressWarnings("unchecked")
public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<R> updateMethod) {
ResourceID id = ResourceID.fromResource(resourceToUpdate);
if (log.isDebugEnabled()) {
Expand All @@ -107,27 +108,38 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
id,
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
var updatedForLambda = updatedResource;
res.ifPresent(
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();
// for update we need to have a historic resource, this might be improved to mimic more
// realistic scenario

// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
// would
// reconnect and loose some events in between.
// If that update was not successful we still record the previous version from the
// actual
// event in the ExtendedResourceEvent.
R extendedResourcePrevVersion =
(r instanceof ExtendedResourceEvent)
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
: null;
R prevVersionOfResource =
updatedForLambda != null
? updatedForLambda
: (r.getAction() == ResourceAction.UPDATED ? latestResource : null);
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
handleEvent(
r.getAction(),
latestResource,
prevVersionOfResource,
!(r instanceof ResourceDeleteEvent)
|| ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown(),
(r instanceof ResourceDeleteEvent)
? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
: null,
false);
});
},
() -> log.debug("No new event present after the filtering update; id: {}", id));
}
}

public abstract void handleEvent(
protected abstract void handleEvent(
ResourceAction action,
R resource,
R oldResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

Expand Down Expand Up @@ -94,17 +94,23 @@ public synchronized Optional<ResourceEvent> doneEventFilterModify(
}

public void onDeleteEvent(T resource, boolean unknownState) {
onEvent(resource, unknownState, true);
onEvent(ResourceAction.DELETED, resource, null, unknownState, true);
}

/**
* @return true if the resourceVersion was obsolete
*/
public EventHandling onAddOrUpdateEvent(T resource) {
return onEvent(resource, false, false);
public EventHandling onAddOrUpdateEvent(
ResourceAction action, T resource, T prevResourceVersion) {
return onEvent(action, resource, prevResourceVersion, false, false);
}

private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) {
private synchronized EventHandling onEvent(
ResourceAction action,
T resource,
T prevResourceVersion,
boolean unknownState,
boolean delete) {
if (!comparableResourceVersions) {
return EventHandling.NEW;
}
Expand Down Expand Up @@ -139,8 +145,7 @@ private synchronized EventHandling onEvent(T resource, boolean unknownState, boo
ed.setLastEvent(
delete
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
: new ResourceEvent(
ResourceAction.UPDATED, resourceId, resource)); // todo true action
: new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
return EventHandling.DEFER;
} else {
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.junit.jupiter.api.Test;

import io.javaoperatorsdk.operator.TestUtils;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
Expand Down
Loading