Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
bin
charts/**/charts
charts/koperator/requirements.lock

charts/kafka-operator/ingress
# Test binary, build with `go test -c`
*.test

Expand Down
19 changes: 16 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,22 @@ type KafkaClusterSpec struct {
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=false
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
// DebugEnabled is used to decide whether to create a separate loadbalancer services for the
// Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
// cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
// a kafkaCluster instance on a Kind Cluster.
// +kubebuilder:default=false
// +optional
DebugEnabled bool `json:"debugEnabled"`
// Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
// This Disables CPU and Memory request reconciliation from the desired state defined in
// the KafkaCluster to the current state in the Kubernetes Cluster
// +kubebuilder:default=false
// +optional
ScaleOpsEnabled bool `json:"scaleOpsEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
Expand Down
15 changes: 15 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down Expand Up @@ -23735,6 +23743,13 @@ spec:
required:
- failureThreshold
type: object
scaleOpsEnabled:
default: false
description: |-
Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
This Disables CPU and Memory request reconciliation from the desired state defined in
the KafkaCluster to the current state in the Kubernetes Cluster
type: boolean
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
Expand Down
15 changes: 15 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down Expand Up @@ -23735,6 +23743,13 @@ spec:
required:
- failureThreshold
type: object
scaleOpsEnabled:
default: false
description: |-
Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
This Disables CPU and Memory request reconciliation from the desired state defined in
the KafkaCluster to the current state in the Kubernetes Cluster
type: boolean
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
Expand Down
10 changes: 10 additions & 0 deletions config/samples/simpleZookeeper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
name: zookeeper-server
namespace: zookeeper
spec:
replicas: 3
persistence:
reclaimPolicy: Delete

3 changes: 2 additions & 1 deletion config/samples/simplekafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ metadata:
controller-tools.k8s.io: "1.0"
name: kafka
spec:
debugEnabled: true
kRaft: false
monitoringConfig:
jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0"
headlessServiceEnabled: true
headlessServiceEnabled: false
zkAddresses:
- "zookeeper-server-client.zookeeper:2181"
propagateLabels: false
Expand Down
22 changes: 22 additions & 0 deletions config/scaleops/CustomOwnerGrouping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

kind: CustomOwnerGrouping
apiVersion: analysis.scaleops.sh/v1alpha1
metadata:
name: kafkabroker
namespace: scaleops-system
spec:
groupBy:
positiveRegexMatch: false
groupBys:
- labels:
- 'isBrokerNode: true'
positiveRegexMatch: false
topOwnerController:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
displayOptions:
hideGeneratedSuffix: true
fields:
- ownerName
defaultPolicy: kafka-brokers
enabled: true
9 changes: 8 additions & 1 deletion pkg/resources/cruisecontrol/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

func (r *Reconciler) service() runtime.Object {
return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMeta(
fmt.Sprintf(serviceNameTemplate, r.KafkaCluster.Name),
apiutil.MergeLabels(ccLabelSelector(r.KafkaCluster.Name), r.KafkaCluster.Labels),
r.KafkaCluster,
),
Spec: corev1.ServiceSpec{
Selector: ccLabelSelector(r.KafkaCluster.Name),
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: "cc",
Expand All @@ -50,4 +51,10 @@ func (r *Reconciler) service() runtime.Object {
},
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
8 changes: 7 additions & 1 deletion pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Reconciler) allBrokerService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
apiutil.LabelsForKafka(r.KafkaCluster.GetName()),
Expand All @@ -52,4 +52,10 @@ func (r *Reconciler) allBrokerService() runtime.Object {
Ports: usedPorts,
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
41 changes: 41 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType)
}
switch {
//initial run - Create Pod
case len(podList.Items) == 0:
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(desiredPod); err != nil {
return errors.WrapIf(err, "could not apply last state to annotation")
Expand Down Expand Up @@ -941,6 +942,42 @@ func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, broke
return nil
}

// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers
// with the values from currentPod so that request-only changes do not trigger a pod restart.
func syncResourceRequests(desiredPod, currentPod *corev1.Pod) {
syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers)
syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers)
syncPodAffinities(desiredPod, currentPod)
}

func syncPodAffinities(desiredPod, currentPod *corev1.Pod) {
panic("unimplemented")
}

func syncContainerResourceRequests(desired, current []corev1.Container) {
index := make(map[string]corev1.ResourceList, len(current))
for _, c := range current {
index[c.Name] = c.Resources.Requests
}
for i := range desired {
c := &desired[i]
reqs, ok := index[c.Name]
if !ok {
continue
}
if c.Resources.Requests == nil {
c.Resources.Requests = make(corev1.ResourceList)
}
for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
if val, exists := reqs[res]; exists {
c.Resources.Requests[res] = val
} else {
delete(c.Resources.Requests, res)
}
}
}
}

//gocyclo:ignore
func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPod *corev1.Pod, desiredType reflect.Type) error {
// Since toleration does not support patchStrategy:"merge,retainKeys",
Expand All @@ -957,6 +994,10 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
// Ignore CPU/memory request diffs — changing requests does not require a pod restart.
if r.KafkaCluster.Spec.ScaleOpsEnabled {
syncResourceRequests(desiredPod, currentPod)
}
// Check if the resource actually updated or if labels match TaintedBrokersSelector
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
switch {
Expand Down
3 changes: 3 additions & 0 deletions pkg/resources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,3 +1986,6 @@
})
}
}

func TestScaleOps(t. *testing.T) {

Check failure on line 1990 in pkg/resources/kafka/kafka_test.go

View workflow job for this annotation

GitHub Actions / Build

expected 'IDENT', found '*' (typecheck)

Check failure on line 1990 in pkg/resources/kafka/kafka_test.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected *, expected name (typecheck)

Check failure on line 1991 in pkg/resources/kafka/kafka_test.go

View workflow job for this annotation

GitHub Actions / Build

expected '}', found 'EOF' (typecheck)
6 changes: 5 additions & 1 deletion pkg/resources/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Protocol: corev1.ProtocolTCP,
})

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(fmt.Sprintf("%s-%d", r.KafkaCluster.Name, id),
apiutil.MergeLabels(
apiutil.LabelsForKafka(r.KafkaCluster.Name),
Expand All @@ -61,4 +61,8 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Ports: usedPorts,
},
}
if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}
return svc
}
Loading
Loading