diff --git a/charts/tidepool/charts/clinic/templates/5-patients-source-kafka-connector.yaml b/charts/tidepool/charts/clinic/templates/5-patients-source-kafka-connector.yaml index 6857b9b7..98cf3ea3 100644 --- a/charts/tidepool/charts/clinic/templates/5-patients-source-kafka-connector.yaml +++ b/charts/tidepool/charts/clinic/templates/5-patients-source-kafka-connector.yaml @@ -14,8 +14,8 @@ spec: collection: patients connection.uri: {{ .Values.global.kafka.connect.mongoConnectionUri }} copy.existing: false - pipeline: '[ {$project: {"fullDocument.summary": 0}} ]' - startup.mode.copy.existing.pipeline: '[ {$project: {"fullDocument.summary": 0}} ]' + pipeline: '[ {$project: {"fullDocument.summary": 0, "updateDescription.updatedFields.summary": 0}} ]' + startup.mode.copy.existing.pipeline: '[ {$project: {"fullDocument.summary": 0, "updateDescription.updatedFields.summary": 0}} ]' database: clinic key.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false diff --git a/charts/tidepool/charts/kafka/templates/2-users-source-kafka-connector.yaml b/charts/tidepool/charts/kafka/templates/2-users-source-kafka-connector.yaml index 85717257..016741ec 100644 --- a/charts/tidepool/charts/kafka/templates/2-users-source-kafka-connector.yaml +++ b/charts/tidepool/charts/kafka/templates/2-users-source-kafka-connector.yaml @@ -27,12 +27,31 @@ spec: value.converter: 'org.apache.kafka.connect.json.JsonConverter' value.converter.schemas.enable: false snapshot.mode: {{ .Values.keycloak.snapshotMode }} - table.include.list: 'public.user_entity,public.user_role_mapping,public.keycloak_role,public.user_attribute' - transforms: 'filter,filter_user_attr' + table.include.list: 'public.user_entity,public.user_role_mapping,public.keycloak_role,public.user_attribute,public.tidepool_user_activity_event' + # Key the user-activity outbox by user_id (rather than the row's PK) so every + # event for a user lands on one partition and is consumed in commit order. 
This + # is what lets the clinic-worker consumer apply updates as last-writer-wins + # safely; it assumes snapshot.mode (.Values.keycloak.snapshotMode) is 'never' to avoid + # out-of-order/replay regressions. Other tables keep their default primary-key based message key. + message.key.columns: 'public.tidepool_user_activity_event:user_id' + transforms: 'filter,user_activity_filter,filter_user_attr' transforms.filter.type: 'io.debezium.transforms.Filter' transforms.filter.language: 'jsr223.groovy' transforms.filter.topic.regex: '.+\.public.(user_entity|keycloak_role)' transforms.filter.condition: "value.op && (((value.op == 'r' || value.op == 'c' || value.op == 'u') && (value.after && value.after.realm_id && value.after.realm_id == '{{ $realm }}')) || (value.op == 'd'))" + # Realm filter scoped to the user-activity outbox only. Keeps inserts, updates + # and snapshot reads (op c/u/r) for the configured realm and drops everything + # else, including pruning deletes (op 'd' is not accepted and `after` is null) — + # the clinic-worker consumer ignores deletes anyway. Records on other topics do + # not match this filter's topic.regex and pass through untouched. + transforms.user_activity_filter.type: 'io.debezium.transforms.Filter' + transforms.user_activity_filter.language: 'jsr223.groovy' + transforms.user_activity_filter.topic.regex: '.+\.public.tidepool_user_activity_event' + # Drop pruning-delete tombstones (null-value records) for this topic instead of + # the Filter SMT default 'keep', so they don't reach the consumer. Scoped to this filter's + # topic.regex, so the keycloak tables' tombstones are unaffected. 
+ transforms.user_activity_filter.null.handling.mode: 'drop' + transforms.user_activity_filter.condition: "value.op && (value.op == 'r' || value.op == 'c' || value.op == 'u') && value.after && value.after.realm_id && value.after.realm_id == '{{ $realm }}'" transforms.filter_user_attr.type: 'io.debezium.transforms.Filter' transforms.filter_user_attr.language: 'jsr223.groovy' transforms.filter_user_attr.topic.regex: '.+\.public.(user_attribute)'