diff --git a/.github/actions/codeartifact-login/action.yml b/.github/actions/codeartifact-login/action.yml new file mode 100644 index 000000000000..384b55221a1a --- /dev/null +++ b/.github/actions/codeartifact-login/action.yml @@ -0,0 +1,29 @@ +name: CodeArtifact login +description: Assume the staging CodeArtifact role via OIDC, fetch an authorisation token, and export Poetry HTTP basic auth env vars + +outputs: + token: + description: CodeArtifact authorisation token + value: ${{ steps.codeartifact.outputs.token }} + +runs: + using: composite + steps: + - name: Configure AWS credentials for CodeArtifact + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: arn:aws:iam::302456015006:role/codeartifact-github-actions-staging + aws-region: eu-west-2 + + - name: Fetch CodeArtifact authorisation token + id: codeartifact + shell: bash + run: | + token=$(aws codeartifact get-authorization-token \ + --domain flagsmith-staging \ + --domain-owner 302456015006 \ + --query authorizationToken --output text) + echo "::add-mask::$token" + echo "token=$token" >> "$GITHUB_OUTPUT" + echo "POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_USERNAME=aws" >> "$GITHUB_ENV" + echo "POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_PASSWORD=$token" >> "$GITHUB_ENV" diff --git a/.github/workflows/.reusable-docker-build.yml b/.github/workflows/.reusable-docker-build.yml index 049292e1bb76..5bae713b9be7 100644 --- a/.github/workflows/.reusable-docker-build.yml +++ b/.github/workflows/.reusable-docker-build.yml @@ -97,6 +97,10 @@ jobs: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} + - name: Authenticate with CodeArtifact + id: codeartifact + uses: ./.github/actions/codeartifact-login + - name: Extract Docker metadata id: meta uses: docker/metadata-action@v5 @@ -113,7 +117,9 @@ jobs: save: ${{ inputs.ephemeral }} push: ${{ !inputs.ephemeral }} platforms: linux/amd64,linux/arm64 - secrets: ${{ secrets.secrets }} + secrets: | + ${{ secrets.secrets }} + codeartifact_token=${{ steps.codeartifact.outputs.token }} target: ${{ inputs.target }} build-args: | CI_COMMIT_SHA=${{ github.sha }} diff --git a/.github/workflows/api-deploy-production-ecs.yml b/.github/workflows/api-deploy-production-ecs.yml index 90e53cad2b7b..fdd050727bfd 100644 --- a/.github/workflows/api-deploy-production-ecs.yml +++ b/.github/workflows/api-deploy-production-ecs.yml @@ -24,6 +24,9 @@ jobs: needs: deploy-ecs name: Push MCP Schema to Gram runs-on: depot-ubuntu-latest + permissions: + contents: read + id-token: write # For CodeArtifact OIDC defaults: run: working-directory: api @@ -39,6 +42,9 @@ jobs: python-version: "3.12" cache: poetry + - name: Authenticate with CodeArtifact + uses: ./.github/actions/codeartifact-login + - name: Install dependencies run: | echo "https://${{ secrets.GH_PRIVATE_ACCESS_TOKEN }}:@github.com" > ${HOME}/.git-credentials diff --git a/.github/workflows/api-pull-request.yml b/.github/workflows/api-pull-request.yml index b30ff69b2bb1..28b15d16e1ad 100644 --- a/.github/workflows/api-pull-request.yml +++ b/.github/workflows/api-pull-request.yml @@ -2,7 +2,7 @@ name: API Pull Request permissions: contents: read # For actions/checkout - id-token: write # For Codecov OIDC + id-token: write # For Codecov and CodeArtifact OIDC on: pull_request: @@ -50,6 +50,9 @@ jobs: python-version: ${{ matrix.python-version }} cache: poetry + - name: Authenticate with CodeArtifact + uses: ./.github/actions/codeartifact-login + - name: Install Dependencies run: make install-packages diff --git 
a/.github/workflows/api-run-makefile-target.yml b/.github/workflows/api-run-makefile-target.yml index 5fba3ae10552..40b5e6bff21f 100644 --- a/.github/workflows/api-run-makefile-target.yml +++ b/.github/workflows/api-run-makefile-target.yml @@ -21,6 +21,7 @@ on: permissions: contents: write pull-requests: write + id-token: write # For CodeArtifact OIDC defaults: run: @@ -41,6 +42,9 @@ jobs: python-version: 3.13 cache: poetry + - name: Authenticate with CodeArtifact + uses: ./.github/actions/codeartifact-login + - name: Install Dependencies run: make install-packages diff --git a/.github/workflows/api-tests-with-private-packages.yml b/.github/workflows/api-tests-with-private-packages.yml index 2ef9f112d7cb..ff5bddddef4f 100644 --- a/.github/workflows/api-tests-with-private-packages.yml +++ b/.github/workflows/api-tests-with-private-packages.yml @@ -2,7 +2,7 @@ name: API Pull Request with Private Packages permissions: contents: read # For actions/checkout - id-token: write # For Codecov OIDC + id-token: write # For Codecov and CodeArtifact OIDC on: pull_request: @@ -51,6 +51,9 @@ jobs: - name: Install SAML Dependencies run: sudo apt-get install -y xmlsec1 + - name: Authenticate with CodeArtifact + uses: ./.github/actions/codeartifact-login + - name: Install packages and Tests shell: bash run: | diff --git a/.github/workflows/update-flagsmith-environment.yml b/.github/workflows/update-flagsmith-environment.yml index 2822710adb9c..088fc1d4730a 100644 --- a/.github/workflows/update-flagsmith-environment.yml +++ b/.github/workflows/update-flagsmith-environment.yml @@ -9,6 +9,10 @@ defaults: run: working-directory: api +permissions: + contents: read + id-token: write # For CodeArtifact OIDC + jobs: update_server_defaults: runs-on: depot-ubuntu-latest @@ -26,6 +30,9 @@ jobs: python-version: 3.12 cache: pip + - name: Authenticate with CodeArtifact + uses: ./.github/actions/codeartifact-login + - name: Install Dependencies run: make install diff --git a/Dockerfile b/Dockerfile index 5888ab746715..a0f2ff39c8d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,16 +14,27 @@ # Build an Open Source Unified image: # (`oss-unified` stage is the default one, so there's no need to specify a target stage) -# $ docker build -t flagsmith:dev . +# $ CODEARTIFACT_TOKEN=$(aws codeartifact get-authorization-token \ +# --domain flagsmith-staging --domain-owner 302456015006 \ +# --query authorizationToken --output text) \ +# docker build -t flagsmith:dev \ +# --secret="id=codeartifact_token,env=CODEARTIFACT_TOKEN" . # Build a SaaS API image: # $ GH_TOKEN=$(gh auth token) docker build -t flagsmith-saas-api:dev --target saas-api \ # --secret="id=sse_pgp_pkey,src=./sse_pgp_pkey.key"\ -# --secret="id=github_private_cloud_token,env=GH_TOKEN" . +# --secret="id=github_private_cloud_token,env=GH_TOKEN" \ +# --secret="id=codeartifact_token,env=CODEARTIFACT_TOKEN" . # Build a Private Cloud Unified image: # $ GH_TOKEN=$(gh auth token) docker build -t flagsmith-private-cloud:dev --target private-cloud-unified \ -# --secret="id=github_private_cloud_token,env=GH_TOKEN" . +# --secret="id=github_private_cloud_token,env=GH_TOKEN" \ +# --secret="id=codeartifact_token,env=CODEARTIFACT_TOKEN" . + +# `codeartifact_token` is required for any target that runs `poetry install`, +# since the `flagsmith-sql-flag-engine` dep is hosted on Flagsmith's +# private CodeArtifact PyPI. See the `[[tool.poetry.source]]` block in +# `api/pyproject.toml`. 
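+#
+# For a local install outside Docker, the same credentials can be exported
+# straight to Poetry. A sketch, assuming your AWS credentials can assume the
+# staging CodeArtifact role (env var names mirror the CI composite action):
+# $ export CODEARTIFACT_TOKEN=$(aws codeartifact get-authorization-token \
+#     --domain flagsmith-staging --domain-owner 302456015006 \
+#     --query authorizationToken --output text)
+# $ export POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_USERNAME=aws
+# $ export POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_PASSWORD="$CODEARTIFACT_TOKEN"
+# $ cd api && poetry install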
# Table of Contents # Stages are described as stage-name [dependencies] @@ -100,7 +111,10 @@ ENV POETRY_VIRTUALENVS_IN_PROJECT=true \ POETRY_VIRTUALENVS_OPTIONS_NO_SETUPTOOLS=true \ POETRY_HOME=/opt/poetry \ PATH="/opt/poetry/bin:$PATH" -RUN make install opts='--without dev' +RUN --mount=type=secret,id=codeartifact_token \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_USERNAME=aws \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_PASSWORD="$(cat /run/secrets/codeartifact_token)" \ + make install opts='--without dev' # * build-python-private [build-python] FROM build-python AS build-python-private @@ -111,8 +125,11 @@ ARG SAML_REVISION ARG RBAC_REVISION ARG WITH="saml,auth-controller,ldap,workflows,licensing,release-pipelines" RUN --mount=type=secret,id=github_private_cloud_token \ + --mount=type=secret,id=codeartifact_token \ echo "https://$(cat /run/secrets/github_private_cloud_token):@github.com" > ${HOME}/.git-credentials && \ git config --global credential.helper store && \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_USERNAME=aws \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_PASSWORD="$(cat /run/secrets/codeartifact_token)" \ make install-packages opts='--without dev --with ${WITH}' && \ make install-private-modules @@ -161,7 +178,10 @@ FROM build-python AS api-test COPY api /build/ -RUN make install-packages opts='--with dev' +RUN --mount=type=secret,id=codeartifact_token \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_USERNAME=aws \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_PASSWORD="$(cat /run/secrets/codeartifact_token)" \ + make install-packages opts='--with dev' CMD ["make", "test"] @@ -170,7 +190,10 @@ FROM build-python-private AS api-private-test COPY api /build/ -RUN make install-packages opts='--with dev' && \ +RUN --mount=type=secret,id=codeartifact_token \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_USERNAME=aws \ + POETRY_HTTP_BASIC_FLAGSMITH_PYPI_STAGING_PASSWORD="$(cat /run/secrets/codeartifact_token)" \ + make install-packages opts='--with dev' && \ make integrate-private-tests && \ git config --global --unset credential.helper && \ rm -f ${HOME}/.git-credentials diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 634c27cf9bd5..052b6fd23d9b 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -118,6 +118,7 @@ "features.workflows.core", "features.release_pipelines.core", "segments", + "segment_membership", "app", "e2etests", "simple_history", @@ -1429,3 +1430,16 @@ PYLON_IDENTITY_VERIFICATION_SECRET = env.str("PYLON_IDENTITY_VERIFICATION_SECRET", None) OSIC_UPDATE_BATCH_SIZE = env.int("OSIC_UPDATE_BATCH_SIZE", default=500) + +# --- Snowflake (segment membership inspection) ------------------------------- +# All-None default disables the segment_membership backfill and refresh tasks. +# When set, the api/segments/membership tasks open a Snowpark session and run +# against this account. See docs/deployment/observability/segment-membership.md +# for the operational shape. 
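+# A hypothetical configuration, for illustration only (all values below are
+# made up; key-pair auth via SNOWFLAKE_PRIVATE_KEY_PATH is the only mode
+# these settings support):
+#   SNOWFLAKE_ACCOUNT=acme-eu_west_2
+#   SNOWFLAKE_USER=FLAGSMITH_SVC
+#   SNOWFLAKE_PRIVATE_KEY_PATH=/secrets/snowflake_rsa_key.p8
+#   SNOWFLAKE_ROLE=FLAGSMITH_RW          # optional; omitted -> user's default role
+#   SNOWFLAKE_WAREHOUSE=FLAGSMITH_WH
+#   SNOWFLAKE_DATABASE=FLAGSMITH
+#   SNOWFLAKE_SCHEMA=SEGMENT_MEMBERSHIP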
+SNOWFLAKE_ACCOUNT = env.str("SNOWFLAKE_ACCOUNT", default=None) +SNOWFLAKE_USER = env.str("SNOWFLAKE_USER", default=None) +SNOWFLAKE_PRIVATE_KEY_PATH = env.str("SNOWFLAKE_PRIVATE_KEY_PATH", default=None) +SNOWFLAKE_ROLE = env.str("SNOWFLAKE_ROLE", default=None) +SNOWFLAKE_WAREHOUSE = env.str("SNOWFLAKE_WAREHOUSE", default=None) +SNOWFLAKE_DATABASE = env.str("SNOWFLAKE_DATABASE", default=None) +SNOWFLAKE_SCHEMA = env.str("SNOWFLAKE_SCHEMA", default=None) diff --git a/api/poetry.lock b/api/poetry.lock index cfe554534102..accc71f5394c 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -73,6 +73,18 @@ files = [ [package.extras] tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"] +[[package]] +name = "asn1crypto" +version = "1.5.1" +description = "Fast ASN.1 parser and serializer with definitions for private keys, public keys, certificates, CRL, OCSP, CMS, PKCS#3, PKCS#7, PKCS#8, PKCS#12, PKCS#5, X.509 and TSP" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "asn1crypto-1.5.1-py2.py3-none-any.whl", hash = "sha256:db4e40728b728508912cbb3d44f19ce188f218e9eba635821bb4b68564f8fd67"}, + {file = "asn1crypto-1.5.1.tar.gz", hash = "sha256:13ae38502be632115abf8a24cbe5f4da52e3b5231990aff31123c805306ccb9c"}, +] + [[package]] name = "astroid" version = "2.15.8" @@ -960,6 +972,18 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "platform_system == \"Windows\""} +[[package]] +name = "cloudpickle" +version = "3.1.1" +description = "Pickler class to extend the standard pickle.Pickler functionality" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "cloudpickle-3.1.1-py3-none-any.whl", hash = "sha256:c8c5a44295039331ee9dad40ba100a9c7297b6f988e50e87ccdf3765a668350e"}, + {file = "cloudpickle-3.1.1.tar.gz", hash = "sha256:b216fa8ae4019d5482a8ac3c95d8f6346115d8835911fd4aefd1a445e4242c64"}, +] + [[package]] name = "colorama" version = "0.4.6" @@ -2009,7 +2033,7 @@ version = "3.20.3" description = "A platform independent file lock." 
optional = false python-versions = ">=3.10" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "filelock-3.20.3-py3-none-any.whl", hash = "sha256:4b0dda527ee31078689fc205ec4f1c1bf7d56cf88b6dc9426c4f230e46c2dce1"}, {file = "filelock-3.20.3.tar.gz", hash = "sha256:18c57ee915c7ec61cff0ecf7f0f869936c7c30191bb0cf406f1341778d0834e1"}, @@ -2058,12 +2082,10 @@ name = "flagsmith-common" version = "3.9.0" description = "Flagsmith's common library" optional = false -python-versions = "<4.0,>=3.11" +python-versions = ">=3.11,<4.0" groups = ["main", "dev", "licensing", "workflows"] -files = [ - {file = "flagsmith_common-3.9.0-py3-none-any.whl", hash = "sha256:782183d05b891ed5f19bebad2f281a2ebd6f69728c3d3f524c4bebc25a654cf6"}, - {file = "flagsmith_common-3.9.0.tar.gz", hash = "sha256:b47b141d366a6714285a0768e08e24adbc9849400294d6fc4e6030087928d8e6"}, -] +files = [] +develop = false [package.dependencies] backoff = {version = ">=2.2.1,<3.0.0", optional = true, markers = "extra == \"task-processor\""} @@ -2085,8 +2107,8 @@ opentelemetry-instrumentation-redis = {version = ">=0.46b0,<1", optional = true, opentelemetry-sdk = {version = ">=1.25,<2", optional = true, markers = "extra == \"common-core\""} prometheus-client = {version = ">=0.0.16", optional = true, markers = "extra == \"common-core\" or extra == \"task-processor\""} psycopg2-binary = {version = ">=2.9,<3", optional = true, markers = "extra == \"common-core\""} -pyfakefs = {version = ">=5,<6", optional = true, markers = "extra == \"test-tools\""} -pytest-django = {version = ">=4,<5", optional = true, markers = "extra == \"test-tools\""} +pyfakefs = {version = ">=5,<6", optional = true} +pytest-django = {version = ">=4,<5", optional = true} redis = {version = ">=5,<6", optional = true, markers = "extra == \"common-core\""} requests = {version = "*", optional = true, markers = "extra == \"common-core\""} sentry-sdk = {version = ">=2.0.0,<3.0.0", optional = true, markers = "extra == \"common-core\""} @@ -2095,11 +2117,17 @@ structlog = {version = ">=24.4,<26", optional = true, markers = "extra == \"comm typing-extensions = {version = "*", optional = true, markers = "extra == \"common-core\" or extra == \"flagsmith-schemas\""} [package.extras] -common-core = ["django (>4,<6)", "django-health-check", "djangorestframework", "djangorestframework-recursive", "drf-spectacular (>=0.28.0,<1)", "drf-writable-nested", "environs (<16)", "gunicorn (>=19.1)", "inflection", "opentelemetry-api (>=1.25,<2)", "opentelemetry-exporter-otlp-proto-http (>=1.25,<2)", "opentelemetry-instrumentation-django (>=0.46b0,<1)", "opentelemetry-instrumentation-psycopg2 (>=0.46b0,<1)", "opentelemetry-instrumentation-redis (>=0.46b0,<1)", "opentelemetry-sdk (>=1.25,<2)", "prometheus-client (>=0.0.16)", "psycopg2-binary (>=2.9,<3)", "redis (>=5,<6)", "requests", "sentry-sdk (>=2.0.0,<3.0.0)", "simplejson (>=3,<4)", "structlog (>=24.4,<26)", "typing-extensions"] -flagsmith-schemas = ["flagsmith-flag-engine (>6)", "simplejson", "typing-extensions"] +common-core = ["django (>4,<6)", "django-health-check", "djangorestframework", "djangorestframework-recursive", "drf-spectacular (>=0.28.0,<1)", "drf-writable-nested", "environs (<16)", "gunicorn (>=19.1)", "inflection", "opentelemetry-api (>=1.25,<2)", "opentelemetry-exporter-otlp-proto-http (>=1.25,<2)", "opentelemetry-instrumentation-django (>=0.46b0,<1)", "opentelemetry-instrumentation-psycopg2 (>=0.46b0,<1)", "opentelemetry-instrumentation-redis (>=0.46b0,<1)", "opentelemetry-sdk (>=1.25,<2)", "prometheus-client (>=0.0.16)", 
"psycopg2-binary (>=2.9,<3)", "redis (>=5,<6)", "requests", "sentry-sdk (>=2.0.0,<3.0.0)", "simplejson (>=3,<4)", "structlog (>=24.4,<26)", "typing_extensions"] +flagsmith-schemas = ["flagsmith-flag-engine (>6)", "simplejson", "typing_extensions"] task-processor = ["backoff (>=2.2.1,<3.0.0)", "django (>4,<6)", "django-health-check", "opentelemetry-api (>=1.25,<2)", "prometheus-client (>=0.0.16)"] test-tools = ["pyfakefs (>=5,<6)", "pytest-django (>=4,<5)"] +[package.source] +type = "git" +url = "https://github.com/Flagsmith/flagsmith-common.git" +reference = "fix/assert-metric-unlabeled" +resolved_reference = "b15979d8e6da3c7cbc4de7ced579983da06e9623" + [[package]] name = "flagsmith-flag-engine" version = "10.1.0" @@ -2152,6 +2180,27 @@ url = "https://github.com/Flagsmith/flagsmith-private/" reference = "v0.4.4" resolved_reference = "a5318e69b5712307e3204a57ecebb64995526200" +[[package]] +name = "flagsmith-sql-flag-engine" +version = "0.1.0a1" +description = "SQL translator for Flagsmith segment predicates." +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "flagsmith_sql_flag_engine-0.1.0a1-py3-none-any.whl", hash = "sha256:3a721a9f053846550f43633285209e1112b4507b38ed2aeddc063b53f73e5fe0"}, + {file = "flagsmith_sql_flag_engine-0.1.0a1.tar.gz", hash = "sha256:09d9b3c8bdf5699f35676213a5795b2facd1564bb1cf3c0a75fcda1e1bb0c2e4"}, +] + +[package.dependencies] +flagsmith-flag-engine = ">=10" +jsonpath-rfc9535 = ">=0.2" + +[package.source] +type = "legacy" +url = "https://flagsmith-staging-302456015006.d.codeartifact.eu-west-2.amazonaws.com/pypi/flagsmith-pypi-staging/simple" +reference = "flagsmith-pypi-staging" + [[package]] name = "freezegun" version = "1.5.5" @@ -3558,7 +3607,7 @@ version = "3.10.0" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." 
optional = false python-versions = ">=3.7" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "platformdirs-3.10.0-py3-none-any.whl", hash = "sha256:d7c24979f292f916dc9cbf8648319032f551ea8c49a4c9bf2fb556a02070ec1d"}, {file = "platformdirs-3.10.0.tar.gz", hash = "sha256:b45696dab2d7cc691a3226759c0d3b00c47c8b6e293d96f6436f733303f77f6d"}, @@ -4170,7 +4219,7 @@ version = "26.0.0" description = "Python wrapper module around the OpenSSL library" optional = false python-versions = ">=3.8" -groups = ["saml"] +groups = ["main", "saml"] files = [ {file = "pyopenssl-26.0.0-py3-none-any.whl", hash = "sha256:df94d28498848b98cc1c0ffb8ef1e71e40210d3b0a8064c9d29571ed2904bf81"}, {file = "pyopenssl-26.0.0.tar.gz", hash = "sha256:f293934e52936f2e3413b89c6ce36df66a0b34ae1ea3a053b8c5020ff2f513fc"}, @@ -5238,7 +5287,7 @@ version = "80.10.2" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "setuptools-80.10.2-py3-none-any.whl", hash = "sha256:95b30ddfb717250edb492926c92b5221f7ef3fbcc2b07579bcd4a27da21d0173"}, {file = "setuptools-80.10.2.tar.gz", hash = "sha256:8b0e9d10c784bf7d262c4e5ec5d4ec94127ce206e8738f29a437945fbc219b70"}, @@ -5388,6 +5437,98 @@ files = [ optional = ["SQLAlchemy (>=1,<2)", "aiodns (>1.0)", "aiohttp (>=3.7.3,<4)", "boto3 (<=2)", "websocket-client (>=1,<2)", "websockets (>=9.1,<10)"] testing = ["Flask (>=1,<2)", "Flask-Sockets (>=0.2,<1)", "Werkzeug (<2)", "black (==21.7b0)", "boto3 (<=2)", "codecov (>=2,<3)", "databases (>=0.3)", "flake8 (>=3,<4)", "moto (<2)", "psutil (>=5,<6)", "pytest (>=5.4,<6)", "pytest-asyncio (<1)", "pytest-cov (>=2,<3)"] +[[package]] +name = "snowflake-connector-python" +version = "4.4.0" +description = "Snowflake Connector for Python" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "snowflake_connector_python-4.4.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fb628d5ea1999e23bfbaabce4125eb44d56605ca5634b8b1d6092ab22d555598"}, + {file = "snowflake_connector_python-4.4.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:16fdca775f7ca5ce4a973c07c434f5ab72bef5284e81a5e4ae2fb4d54d28965c"}, + {file = "snowflake_connector_python-4.4.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:9b1a28f843c1c0b582db7854789525d0c8aac4ea5c56e31113684e38220d0af9"}, + {file = "snowflake_connector_python-4.4.0-cp310-cp310-win_amd64.whl", hash = "sha256:693a1bef97509f09b7e6f42ea6f743d27819413c04fb3dc543b060d029871c56"}, + {file = "snowflake_connector_python-4.4.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f5d0e90e68a899c13fda5ca842ff77b5759b1674adf2c72702d3c2b53ca9d27b"}, + {file = "snowflake_connector_python-4.4.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:19d0c1ed033abae715a71b74c53010b180a5247c6924f851e4f7d0b0d58066c4"}, + {file = "snowflake_connector_python-4.4.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:52efe2d6543a09807283748dd50a36ec01d52b4f342868132f8f9856b9c95a42"}, + {file = "snowflake_connector_python-4.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:85a01338d282423611f357cd5392dca2219bbda9a66b44761b11d6ae8ebf1e50"}, + {file = "snowflake_connector_python-4.4.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e8e7ce0e8b33aec8b1fc6741eb51dbeb54e2c3a6d282a0d459c355a85f089b08"}, + {file = 
"snowflake_connector_python-4.4.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:a088f108da4653ad1396ddb63a1c757ad614d0862c38f6f69cc77344bdcfeccb"}, + {file = "snowflake_connector_python-4.4.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:b9f0ac0c00075321e1720d3876e936ee0256f54832e7463c5193a8dfa54913d5"}, + {file = "snowflake_connector_python-4.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:ea6e4083ebea0a814b46f029d64a2fb0ba6e7732952cd8af4406041708ce0e21"}, + {file = "snowflake_connector_python-4.4.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:2a6f6a514a10c3bb2d4554132f0b639f43d7e9fbb73fa1fae1c8a75333102686"}, + {file = "snowflake_connector_python-4.4.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8304b4818d3e9de552dcfbdd0bca61bae1583e1c9794e242e58fe44bce701604"}, + {file = "snowflake_connector_python-4.4.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c828248214a49f77b903e05acf887d3ccb9d958b5a979f2ed3663bba1bd0f2b3"}, + {file = "snowflake_connector_python-4.4.0-cp313-cp313-win_amd64.whl", hash = "sha256:56ff04dd9e17edc82128f412aa3776687dc94088f3d6b9144971e169952623cb"}, + {file = "snowflake_connector_python-4.4.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:96fdca994c4d9f7780e82fc7b4bd3398d856b43de3bae57d44e242ff435a2431"}, + {file = "snowflake_connector_python-4.4.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:9fa43d330389df27024757c4f97dabddafbedc74b8bcc189b6a86e8b4d036014"}, + {file = "snowflake_connector_python-4.4.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:43e1a2f3ac51d24406d4eb0c23a8ceb9d6f5cb4854c941e5e1375d8c481e2844"}, + {file = "snowflake_connector_python-4.4.0-cp314-cp314-win_amd64.whl", hash = "sha256:d8ac1659c8e588b9502f8d3d03c1ded2f274de0da9c09e62fe007cba5b46d6a5"}, + {file = "snowflake_connector_python-4.4.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:7c2984663a733d06c979aa6c8c1d7691621ec0d3521ef345d57c869ff2f1c4b2"}, + {file = "snowflake_connector_python-4.4.0-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:307f41326c702f6976746d2001dacf35adaf567f3f12afb3a5778fbb063c7241"}, + {file = "snowflake_connector_python-4.4.0-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d6fd334e4d8df7fcb30e6746e5ade845e82de2942268862aa8bce974ae2b86a2"}, + {file = "snowflake_connector_python-4.4.0-cp39-cp39-win_amd64.whl", hash = "sha256:70d4051e2d9c87258b02672e17e21f5873e0cb49ff9705f6194ccfa25ac0d5fd"}, + {file = "snowflake_connector_python-4.4.0.tar.gz", hash = "sha256:648f49029d699591af0f253e81c5bf60efc4411c7b0149ef074a59a038210a3b"}, +] + +[package.dependencies] +asn1crypto = ">0.24.0,<2.0.0" +boto3 = ">=1.24" +botocore = ">=1.24" +certifi = ">=2024.7.4" +charset_normalizer = ">=2,<4" +cryptography = ">=46.0.5" +filelock = ">=3.5,<4" +idna = ">=3.7,<4" +packaging = "*" +platformdirs = ">=2.6.0,<5.0.0" +pyjwt = ">=2.10.1,<3.0.0" +pyOpenSSL = ">=24.0.0" +pytz = "*" +requests = ">=2.32.4,<3.0.0" +sortedcontainers = ">=2.4.0" +tomlkit = "*" +typing_extensions = ">=4.3,<5" + +[package.extras] +boto = ["boto3 (>=1.24)", "botocore (>=1.24)"] +development = ["Cython", "coverage", "mitmproxy (>=12.0.0) ; python_version >= \"3.12\"", "more-itertools", "numpy (<=2.4.3)", "pendulum (!=2.1.1)", "pexpect", "pytest (<7.5.0)", "pytest-asyncio", "pytest-cov", "pytest-rerunfailures (<16.0)", "pytest-timeout", "pytest-xdist", "pytzdata", "responses"] 
+pandas = ["pandas (>=1.0.0,<3.0.0) ; python_version < \"3.13\"", "pandas (>=2.1.2,<3.0.0) ; python_version >= \"3.13\"", "pyarrow (>=14.0.1)"] +secure-local-storage = ["keyring (>=23.1.0,<26.0.0)"] + +[[package]] +name = "snowflake-snowpark-python" +version = "1.50.1" +description = "Snowflake Snowpark for Python" +optional = false +python-versions = "<3.15,>=3.9" +groups = ["main"] +files = [ + {file = "snowflake_snowpark_python-1.50.1-py3-none-any.whl", hash = "sha256:2d16fb5b9052c5b6920426b02534ecbb38e4a8c20a1deda36bb9a354e46c55ec"}, + {file = "snowflake_snowpark_python-1.50.1.tar.gz", hash = "sha256:780a7595fa4fd8989a3519dd825d8de0a79b8dd98f6bf95c853a0b3f2df9db43"}, +] + +[package.dependencies] +cloudpickle = ">=1.6.0,<2.1.0 || >2.1.0,<2.2.0 || >2.2.0,<=3.1.1" +protobuf = ">=3.20,<6.34" +python-dateutil = "*" +pyyaml = "*" +setuptools = ">=40.6.0" +snowflake-connector-python = ">=3.17.0,<5.0.0" +typing-extensions = ">=4.1.0,<5.0.0" +tzlocal = "*" +wheel = "*" + +[package.extras] +development = ["cachetools", "coverage", "decorator", "graphviz", "lxml", "matplotlib", "openpyxl", "pre-commit", "psutil", "pyarrow", "pytest (<8.0.0)", "pytest-assume", "pytest-cov", "pytest-timeout", "pytest-xdist", "snowflake.core (>=1.0.0,<2)", "sphinx (==5.0.2)", "tox", "wrapt"] +localtest = ["pandas", "requests"] +modin = ["ipywidgets", "modin (>=0.36.0,<0.38.0)", "pandas (<=2.4)", "snowflake-connector-python[pandas] (>=3.17.0,<5.0.0)", "tqdm"] +modin-development = ["cachetools", "coverage", "decorator", "graphviz", "ipywidgets", "lxml", "matplotlib", "modin (>=0.36.0,<0.38.0)", "openpyxl", "pandas (<=2.4)", "plotly (<6.0.0)", "pre-commit", "psutil", "pyarrow", "pytest (<8.0.0)", "pytest-assume", "pytest-cov", "pytest-timeout", "pytest-xdist", "ray ; python_version < \"3.14\"", "s3fs", "scikit-learn", "scipy (<=1.16.3)", "snowflake-connector-python[pandas] (>=3.17.0,<5.0.0)", "snowflake-ml-python (>=1.8.0) ; python_version < \"3.12\"", "snowflake.core (>=1.0.0,<2)", "sphinx (==5.0.2)", "statsmodels", "tox", "tqdm", "wrapt"] +opentelemetry = ["opentelemetry-api (>=1.0.0,<2.0.0)", "opentelemetry-exporter-otlp (>=1.0.0,<2.0.0)", "opentelemetry-sdk (>=1.0.0,<2.0.0)"] +pandas = ["snowflake-connector-python[pandas] (>=3.17.0,<5.0.0)"] +secure-local-storage = ["snowflake-connector-python[secure-local-storage] (>=3.17.0,<5.0.0)"] + [[package]] name = "social-auth-app-django" version = "5.6.0" @@ -5435,6 +5576,18 @@ azuread = ["cryptography (>=2.1.1)"] openidconnect = ["python-jose (>=3.0.0)"] saml = ["python3-saml (>=1.5.0)"] +[[package]] +name = "sortedcontainers" +version = "2.4.0" +description = "Sorted Containers -- Sorted List, Sorted Dict, Sorted Set" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0"}, + {file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"}, +] + [[package]] name = "sqlparse" version = "0.5.4" @@ -5519,7 +5672,7 @@ version = "0.13.0" description = "Style preserving TOML library" optional = false python-versions = ">=3.8" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "tomlkit-0.13.0-py3-none-any.whl", hash = "sha256:7075d3042d03b80f603482d69bf0c8f345c2b30e41699fd8883227f89972b264"}, {file = "tomlkit-0.13.0.tar.gz", hash = "sha256:08ad192699734149f5b97b45f1f18dad7eb1b6d16bc72ad0c2335772650d7b72"}, @@ -5747,6 +5900,24 @@ files = [ {file = 
"tzdata-2024.1.tar.gz", hash = "sha256:2674120f8d891909751c38abcdfd386ac0a5a1127954fbc332af6b5ceae07efd"}, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +description = "tzinfo object for the local timezone" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"}, + {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"}, +] + +[package.dependencies] +tzdata = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"] + [[package]] name = "uritemplate" version = "3.0.1" @@ -5828,6 +5999,21 @@ markupsafe = ">=2.1.1" [package.extras] watchdog = ["watchdog (>=2.3)"] +[[package]] +name = "wheel" +version = "0.47.0" +description = "Command line tool for manipulating wheel files" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "wheel-0.47.0-py3-none-any.whl", hash = "sha256:212281cab4dff978f6cedd499cd893e1f620791ca6ff7107cf270781e587eced"}, + {file = "wheel-0.47.0.tar.gz", hash = "sha256:cc72bd1009ba0cf63922e28f94d9d83b920aa2bb28f798a31d0691b02fa3c9b3"}, +] + +[package.dependencies] +packaging = ">=24.0" + [[package]] name = "whitenoise" version = "6.0.0" @@ -6023,4 +6209,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">3.11,<3.14" -content-hash = "13c3c174bc913f77b4998499cc0e1d8feea33c521e5cd747d77743bb8fc422ed" +content-hash = "a313257963636807c81a18d1121add17d4caf75309fb98573d48a8428cae242d" diff --git a/api/pyproject.toml b/api/pyproject.toml index 702e542fab38..47e45ff12f00 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -116,6 +116,11 @@ enabled = true [tool.poetry] package-mode = false +[[tool.poetry.source]] +name = "flagsmith-pypi-staging" +url = "https://flagsmith-staging-302456015006.d.codeartifact.eu-west-2.amazonaws.com/pypi/flagsmith-pypi-staging/simple/" +priority = "explicit" + [tool.poetry.dependencies] python = ">3.11,<3.14" django = ">=5,<6" @@ -151,6 +156,8 @@ django-lifecycle = "~1.2.4" drf-writable-nested = "~0.6.2" django-filter = "~2.4.0" flagsmith-flag-engine = "^10.1.0" +flagsmith-sql-flag-engine = { version = "0.1.0a1", source = "flagsmith-pypi-staging", allow-prereleases = true } +snowflake-snowpark-python = "^1.20" boto3 = "~1.35.95" slack-sdk = "~3.9.0" asgiref = "~3.8.1" @@ -263,7 +270,7 @@ types-psycopg2 = "^2.9.21.20250121" types-python-dateutil = "^2.9.0.20241206" types-pytz = "^2025.1.0.20250204" ruff = "^0.9.7" -flagsmith-common = { version = "*", extras = ["test-tools"] } +flagsmith-common = { git = "https://github.com/Flagsmith/flagsmith-common.git", branch = "fix/assert-metric-unlabeled", extras = ["test-tools"] } pytest-responses = "^0.5.1" diff-cover = "^10.1.0" django-debug-toolbar = "*" diff --git a/api/segment_membership/__init__.py b/api/segment_membership/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/segment_membership/apps.py b/api/segment_membership/apps.py new file mode 100644 index 000000000000..42753fd75971 --- /dev/null +++ b/api/segment_membership/apps.py @@ -0,0 +1,6 @@ +from core.apps import BaseAppConfig + + +class SegmentMembershipConfig(BaseAppConfig): + name = "segment_membership" + default = True diff --git a/api/segment_membership/mappers.py b/api/segment_membership/mappers.py new file mode 100644 
index 000000000000..25b88bba9a37 --- /dev/null +++ b/api/segment_membership/mappers.py @@ -0,0 +1,55 @@ +import uuid +from decimal import Decimal + +from flagsmith_schemas import dynamodb + +# (environment_id, id, identifier, identity_key, traits) +SnowflakeIdentityRow = tuple[str, int, str, str, dict[str, object] | None] + + +def map_identity_document_to_snowflake_row( + env_key: str, + identity_doc: dynamodb.Identity, +) -> SnowflakeIdentityRow: + """Project a Dynamo identity document onto the canonical IDENTITIES + row tuple. The returned tuple aligns positionally with the schema + `(environment_id, id, identifier, identity_key, traits)`.""" + identity_uuid = identity_doc["identity_uuid"] + identifier = identity_doc["identifier"] + composite_key = identity_doc["composite_key"] + raw_traits = identity_doc.get("identity_traits") + traits = _flatten_traits(raw_traits) if raw_traits else None + return ( + env_key, + _identity_id(identity_uuid), + identifier, + composite_key, + traits, + ) + + +def _identity_id(identity_uuid: str) -> int: + """Project a UUID onto a stable signed 64-bit IDENTITIES.id.""" + return int.from_bytes(uuid.UUID(identity_uuid).bytes[:8], "big", signed=True) + + +def _coerce_trait_value(value: object) -> object: + """Coerce Dynamo-decoded values for VARIANT serialisation. boto3 + returns `Decimal` for numbers; we narrow to int when whole, float + otherwise, so the VARIANT keeps a useful numeric type.""" + if isinstance(value, Decimal): + if value == value.to_integral_value(): + return int(value) + return float(value) + return value + + +def _flatten_traits( + identity_traits: list[dynamodb.Trait], +) -> dict[str, object]: + """Convert Dynamo's `[{trait_key, trait_value}, ...]` list into a + flat trait map.""" + return { + t["trait_key"]: _coerce_trait_value(t.get("trait_value")) + for t in identity_traits + } diff --git a/api/segment_membership/metrics.py b/api/segment_membership/metrics.py new file mode 100644 index 000000000000..ba5cde4e2c44 --- /dev/null +++ b/api/segment_membership/metrics.py @@ -0,0 +1,26 @@ +import prometheus_client + +# All metrics are global — refresh and backfill cardinality scales with +# project + environment counts, which would blow up Prometheus storage. +# Drill-down lives in Snowflake's query history (tagged via QUERY_TAG) +# and in structlog events that carry per-project/env IDs. 
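+#
+# Example drill-down against Snowflake's query history (the org id is
+# hypothetical; the tag format is set in segment_membership.tasks):
+#
+#   SELECT query_text, total_elapsed_time
+#   FROM snowflake.account_usage.query_history
+#   WHERE query_tag LIKE 'flagsmith:segment_membership:refresh:org_42%'
+#   ORDER BY start_time DESC;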
+ +flagsmith_segment_membership_backfill_identities_total = prometheus_client.Counter( + "flagsmith_segment_membership_backfill_identities_total", + "Total identities mirrored from Dynamo to Snowflake by the segment-membership backfill task across all environments.", +) + +flagsmith_segment_membership_backfill_duration_seconds = prometheus_client.Histogram( + "flagsmith_segment_membership_backfill_duration_seconds", + "Duration of a segment-membership backfill for one environment.", +) + +flagsmith_segment_membership_refresh_duration_seconds = prometheus_client.Histogram( + "flagsmith_segment_membership_refresh_duration_seconds", + "Duration of a single segment-membership count-refresh run for one project.", +) + +flagsmith_segment_membership_refresh_failures_total = prometheus_client.Counter( + "flagsmith_segment_membership_refresh_failures_total", + "Total segment-membership refresh runs that failed for any reason.", +) diff --git a/api/segment_membership/migrations/0001_initial.py b/api/segment_membership/migrations/0001_initial.py new file mode 100644 index 000000000000..0024b54bb990 --- /dev/null +++ b/api/segment_membership/migrations/0001_initial.py @@ -0,0 +1,57 @@ +# Generated by Django 5.2.13 on 2026-05-08 22:03 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("environments", "0037_add_uuid_field"), + ("segments", "0030_add_default_to_segment_version"), + ] + + operations = [ + migrations.CreateModel( + name="SegmentMembership", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("count", models.PositiveIntegerField()), + ("last_synced_at", models.DateTimeField()), + ( + "environment", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="+", + to="environments.environment", + ), + ), + ( + "segment", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="memberships", + to="segments.segment", + ), + ), + ], + options={ + "constraints": [ + models.UniqueConstraint( + fields=("segment", "environment"), + name="segment_membership_unique_segment_environment", + ) + ], + }, + ), + ] diff --git a/api/segment_membership/migrations/0002_setup_snowflake_identities_schema.py b/api/segment_membership/migrations/0002_setup_snowflake_identities_schema.py new file mode 100644 index 000000000000..a510036f339b --- /dev/null +++ b/api/segment_membership/migrations/0002_setup_snowflake_identities_schema.py @@ -0,0 +1,43 @@ +"""Create the canonical IDENTITIES table the SQL flag engine emits +against when a Snowflake account is configured. + +No-op when SNOWFLAKE_* settings are unset, so self-hosted installs +without Snowflake (and the test suite) migrate cleanly. 
+""" + +from django.db import migrations +from django.db.backends.base.schema import BaseDatabaseSchemaEditor +from django.db.migrations.state import StateApps +from flagsmith_sql_flag_engine.dialects import SnowflakeDialect + +from segment_membership.services import ( + is_snowflake_configured, + open_snowflake_session, +) + + +def setup_snowflake_identities_schema( + apps: StateApps, schema_editor: BaseDatabaseSchemaEditor +) -> None: + if not is_snowflake_configured(): + return + with open_snowflake_session() as sess: + sess.sql(SnowflakeDialect.schema_ddl).collect() + + +class Migration(migrations.Migration): + # The Snowflake DDL talks to a remote service; running it inside + # Django's default-atomic migration block would couple this Postgres + # migration to a Snowflake transaction we don't actually need. + atomic = False + + dependencies = [ + ("segment_membership", "0001_initial"), + ] + + operations = [ + migrations.RunPython( + setup_snowflake_identities_schema, + reverse_code=migrations.RunPython.noop, + ), + ] diff --git a/api/segment_membership/migrations/__init__.py b/api/segment_membership/migrations/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/segment_membership/models.py b/api/segment_membership/models.py new file mode 100644 index 000000000000..cb80a20577f4 --- /dev/null +++ b/api/segment_membership/models.py @@ -0,0 +1,32 @@ +from django.db import models + +from environments.models import Environment +from segments.models import Segment + + +class SegmentMembership(models.Model): + """ + Cached count of identities matching a canonical segment within a + single environment. One row per (segment, environment) pair. + """ + + segment = models.ForeignKey( + Segment, + on_delete=models.CASCADE, + related_name="memberships", + ) + environment = models.ForeignKey( + Environment, + on_delete=models.CASCADE, + related_name="+", + ) + count = models.PositiveIntegerField() + last_synced_at = models.DateTimeField() + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["segment", "environment"], + name="segment_membership_unique_segment_environment", + ), + ] diff --git a/api/segment_membership/services.py b/api/segment_membership/services.py new file mode 100644 index 000000000000..82770206eff6 --- /dev/null +++ b/api/segment_membership/services.py @@ -0,0 +1,161 @@ +from contextlib import contextmanager +from typing import Iterator + +import structlog +from django.conf import settings +from flag_engine.context.types import EvaluationContext +from flagsmith_sql_flag_engine import TranslateContext, translate_segment +from flagsmith_sql_flag_engine.dialects import SnowflakeDialect +from snowflake.snowpark import Session + +from integrations.flagsmith.client import get_openfeature_client +from organisations.models import Organisation +from projects.models import Project +from segment_membership.models import SegmentMembership +from segments.models import Segment +from util.engine_models.context.mappers import map_segment_to_segment_context +from util.mappers.engine import map_segment_to_engine + +logger = structlog.get_logger("segment_membership") + + +def is_membership_enabled(organisation: Organisation) -> bool: + """Resolve the per-org Flagsmith-on-Flagsmith flag for segment- + membership inspection. 
Defaults False when the flag is missing.""" + return get_openfeature_client().get_boolean_value( + "segment_membership_inspection", + default_value=False, + evaluation_context=organisation.openfeature_evaluation_context, + ) + + +def is_snowflake_configured() -> bool: + """All SNOWFLAKE_* settings required to open a session must be + populated. Tasks short-circuit when this returns False.""" + return all( + getattr(settings, name) + for name in ( + "SNOWFLAKE_ACCOUNT", + "SNOWFLAKE_USER", + "SNOWFLAKE_PRIVATE_KEY_PATH", + "SNOWFLAKE_DATABASE", + "SNOWFLAKE_SCHEMA", + "SNOWFLAKE_WAREHOUSE", + ) + ) + + +@contextmanager +def open_snowflake_session() -> Iterator[Session]: + """Open a Snowpark session from `SNOWFLAKE_*` settings.""" + config: dict[str, str | None] = { + "account": settings.SNOWFLAKE_ACCOUNT, + "user": settings.SNOWFLAKE_USER, + "warehouse": settings.SNOWFLAKE_WAREHOUSE, + "database": settings.SNOWFLAKE_DATABASE, + "schema": settings.SNOWFLAKE_SCHEMA, + "private_key_file": settings.SNOWFLAKE_PRIVATE_KEY_PATH, + } + if settings.SNOWFLAKE_ROLE: + config["role"] = settings.SNOWFLAKE_ROLE + sess = Session.builder.configs(config).create() + try: + yield sess + finally: + sess.close() + + +def get_projects_to_process() -> Iterator[Project]: + """Yield projects that hold at least one canonical segment and whose + organisation has the segment-membership FoF flag enabled. Used by + both the backfill and refresh tasks to scope work.""" + project_ids = Segment.live_objects.values_list("project_id", flat=True).distinct() + for project in Project.objects.filter(id__in=project_ids).select_related( + "organisation" + ): + if not is_membership_enabled(project.organisation): + continue + yield project + + +def compute_segment_counts_for_project( + project: Project, session: Session +) -> list[SegmentMembership]: + """Run one batched `SELECT ... UNION ALL` counting identity matches + for every (canonical-segment, environment) pair in `project`. + + Returns a list of unsaved `SegmentMembership` instances — `count` + and the `(segment_id, environment_id)` keys are populated; + `last_synced_at` is left for the caller to stamp consistently + across the batch. + + The SQL groups by `environment_id` per segment, so cardinality is + one SELECT per segment rather than per (segment, env) pair. Pairs + with zero matches are absent from the result; the caller treats + absent pairs as "no row" rather than count = 0. + + Segments whose predicate is currently untranslatable — e.g. a + regex pattern unsupported by the active dialect — are skipped + entirely. + + Environment keys are bound as parameters, not f-string-spliced; + the predicate from `translate_segment` is already escape-safe per + the SQL flag engine's contract. + """ + segments = list(Segment.live_objects.filter(project=project)) + env_id_by_key: dict[str, int] = dict( + project.environments.values_list("api_key", "id"), + ) + if not segments or not env_id_by_key: + return [] + + env_keys = list(env_id_by_key) + env_placeholders = ",".join("?" 
* len(env_keys)) + dialect = SnowflakeDialect() + + select_clauses: list[str] = [] + for seg in segments: + translate_ctx = TranslateContext( + evaluation_context=EvaluationContext( + environment={"key": "_count", "name": project.name} + ), + dialect=dialect, + ) + predicate = translate_segment( + map_segment_to_segment_context(map_segment_to_engine(seg)), + translate_ctx, + ) + if predicate is None: + logger.error( + "compute.segment.skipped", + project__id=project.id, + segment__id=seg.id, + reason="untranslatable", + ) + continue + select_clauses.append( + f"SELECT {seg.id} AS segment_id, " + f"i.environment_id AS env_key, COUNT(*) AS c " + f"FROM IDENTITIES i " + f"WHERE i.environment_id IN ({env_placeholders}) AND ({predicate}) " + f"GROUP BY i.environment_id" + ) + + if not select_clauses: + return [] + + sql = "\nUNION ALL\n".join(select_clauses) + rows = session.sql(sql, params=env_keys * len(select_clauses)).collect() + memberships: list[SegmentMembership] = [] + for row in rows: + env_id = env_id_by_key.get(str(row["ENV_KEY"])) + if env_id is None: + continue + memberships.append( + SegmentMembership( + segment_id=int(row["SEGMENT_ID"]), + environment_id=env_id, + count=int(row["C"]), + ) + ) + return memberships diff --git a/api/segment_membership/tasks.py b/api/segment_membership/tasks.py new file mode 100644 index 000000000000..49cff07f1312 --- /dev/null +++ b/api/segment_membership/tasks.py @@ -0,0 +1,199 @@ +"""Tasks: backfill IDENTITIES from Dynamo to Snowflake daily, then +refresh per-segment counts in the `SegmentMembership` cache. + +The backfill recurs daily and, once it finishes, fans out one +`refresh_project_segment_counts` per project — guarantees the refresh +always reads the freshly backfilled snapshot rather than racing a +separate schedule. Both tasks short-circuit when SNOWFLAKE_* settings +are unset, and skip per-organisation when the +`segment_membership_inspection` FoF flag is False. +""" + +from datetime import timedelta +from typing import cast + +import structlog +from django.utils import timezone +from flagsmith_schemas.dynamodb import Identity as DynamoIdentity +from snowflake.snowpark.types import ( + LongType, + StringType, + StructField, + StructType, + VariantType, +) +from task_processor.decorators import ( + register_recurring_task, + register_task_handler, +) + +from environments.dynamodb.wrappers.identity_wrapper import DynamoIdentityWrapper +from projects.models import Project +from segment_membership.mappers import map_identity_document_to_snowflake_row +from segment_membership.metrics import ( + flagsmith_segment_membership_backfill_duration_seconds, + flagsmith_segment_membership_backfill_identities_total, + flagsmith_segment_membership_refresh_duration_seconds, + flagsmith_segment_membership_refresh_failures_total, +) +from segment_membership.models import SegmentMembership +from segment_membership.services import ( + compute_segment_counts_for_project, + get_projects_to_process, + is_membership_enabled, + is_snowflake_configured, + open_snowflake_session, +) +from util.util import batched + +logger = structlog.get_logger("segment_membership") + +# Per-INSERT row count; bounds memory while loading large environments. 
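+#
+# For illustration, the counting SQL assembled in
+# `segment_membership.services.compute_segment_counts_for_project` has this
+# shape (segment ids 7 and 9 and the two `?` env-key bindings are
+# hypothetical; `<predicate>` is what `translate_segment` returns):
+#
+#   SELECT 7 AS segment_id, i.environment_id AS env_key, COUNT(*) AS c
+#   FROM IDENTITIES i
+#   WHERE i.environment_id IN (?,?) AND (<predicate>)
+#   GROUP BY i.environment_id
+#   UNION ALL
+#   SELECT 9 AS segment_id, ...
+#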
+_INSERT_BATCH_SIZE = 1000 + +_IDENTITIES_SCHEMA = StructType( + [ + StructField("environment_id", StringType()), + StructField("id", LongType()), + StructField("identifier", StringType()), + StructField("identity_key", StringType()), + StructField("traits", VariantType()), + ] +) + + +@register_recurring_task( + run_every=timedelta(days=1), + # The default timeout doesn't fit the per-environment + # backfill at SaaS scale; 4 hours leaves + # headroom for several large environments back-to-back without + # truncating the task processor's lease. + timeout=timedelta(hours=4), +) +def backfill_identities_to_snowflake() -> None: + """Replace Snowflake's IDENTITIES rows for every relevant + environment with the current Dynamo state. Once the backfill + finishes, fans out one `refresh_project_segment_counts` task per + project so the count refresh always sees fresh data. + + Per-statement implicit commits leave a brief window where readers + see an empty partition mid-refresh — a PoC tradeoff later fixed + by CDC. + """ + if not is_snowflake_configured(): + logger.info("backfill.skipped", reason="snowflake_not_configured") + return + + wrapper = DynamoIdentityWrapper() + if not wrapper.is_enabled: + logger.info("backfill.skipped", reason="dynamo_disabled") + return + + refreshable_project_ids: list[int] = [] + with open_snowflake_session() as sess: + for project in get_projects_to_process(): + refreshable_project_ids.append(project.id) + for env in project.environments.all(): + env_key = env.api_key + row_count = 0 + sess.query_tag = ( + "flagsmith:segment_membership:backfill" + f":org_{project.organisation_id}" + f":project_{project.id}" + ) + try: + with flagsmith_segment_membership_backfill_duration_seconds.time(): + sess.sql( + "DELETE FROM IDENTITIES WHERE environment_id = ?", + params=[env_key], + ).collect() + for batch in batched( + wrapper.iter_all_items_paginated(env_key), + _INSERT_BATCH_SIZE, + ): + rows = [ + map_identity_document_to_snowflake_row( + env_key, cast(DynamoIdentity, doc) + ) + for doc in batch + ] + sess.create_dataframe( + rows, schema=_IDENTITIES_SCHEMA + ).write.mode("append").save_as_table("IDENTITIES") + row_count += len(rows) + except Exception: + logger.exception( + "backfill.environment.failed", + project__id=project.id, + environment__id=env.id, + ) + continue + flagsmith_segment_membership_backfill_identities_total.inc(row_count) + logger.info( + "backfill.environment.completed", + project__id=project.id, + environment__id=env.id, + rows__count=row_count, + ) + + for project_id in refreshable_project_ids: + refresh_project_segment_counts.delay(args=(project_id,)) + + +@register_task_handler( + # One project's predicate matrix at SaaS scale takes seconds to a + # few minutes; 30 minutes bounds runaway queries without cutting + # legitimate ones short. + timeout=timedelta(minutes=30), +) +def refresh_project_segment_counts(project_id: int) -> None: + """Compute per-segment match counts for a single project and upsert + into `SegmentMembership`. 
Re-checks the FoF flag at execution time + so a stale fan-out skips orgs that have since been disabled.""" + if not is_snowflake_configured(): + logger.info( + "refresh.project.skipped", + project__id=project_id, + reason="snowflake_not_configured", + ) + return + + project = Project.objects.select_related("organisation").get(pk=project_id) + if not is_membership_enabled(project.organisation): + logger.info( + "refresh.project.skipped", + project__id=project_id, + reason="ff_disabled", + ) + return + + with ( + flagsmith_segment_membership_refresh_duration_seconds.time(), + open_snowflake_session() as sess, + ): + sess.query_tag = ( + "flagsmith:segment_membership:refresh" + f":org_{project.organisation_id}" + f":project_{project.id}" + ) + try: + memberships = compute_segment_counts_for_project(project, sess) + except Exception: + flagsmith_segment_membership_refresh_failures_total.inc() + logger.exception("refresh.project.failed", project__id=project_id) + return + + now = timezone.now() + for m in memberships: + m.last_synced_at = now + SegmentMembership.objects.bulk_create( + memberships, + update_conflicts=True, + unique_fields=["segment", "environment"], + update_fields=["count", "last_synced_at"], + ) + logger.info( + "refresh.project.completed", + project__id=project_id, + memberships__count=len(memberships), + ) diff --git a/api/segments/serializers.py b/api/segments/serializers.py index 6415d26bde73..114b43fbeef0 100644 --- a/api/segments/serializers.py +++ b/api/segments/serializers.py @@ -10,6 +10,7 @@ from edge_api.utils import is_edge_enabled from metadata.serializers import MetadataSerializer, MetadataSerializerMixin from projects.models import Project +from segment_membership.models import SegmentMembership from segments.models import Condition, Segment, SegmentRule logger = structlog.get_logger(__name__) @@ -17,6 +18,17 @@ DictList = list[dict[str, Any]] +class SegmentMembershipSerializer(serializers.ModelSerializer[SegmentMembership]): + """One materialised count of identities matching a canonical + segment within an environment, refreshed daily by + `segment_membership.tasks.refresh_project_segment_counts`.""" + + class Meta: + model = SegmentMembership + fields = ["environment", "count", "last_synced_at"] + read_only_fields = ["environment", "count", "last_synced_at"] + + class ConditionSerializer(serializers.ModelSerializer[Condition]): delete = serializers.BooleanField( write_only=True, @@ -82,6 +94,7 @@ class Meta: class SegmentSerializer(MetadataSerializerMixin, WritableNestedModelSerializer): rules = SegmentRuleSerializer(many=True, required=True, allow_empty=False) metadata = MetadataSerializer(required=False, many=True) + memberships = SegmentMembershipSerializer(many=True, read_only=True) def __init__(self, *args: Any, **kwargs: Any) -> None: """ @@ -112,7 +125,9 @@ class Meta: "version_of", "rules", "metadata", + "memberships", ] + read_only_fields = ["memberships"] def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: attrs = super().validate(attrs) diff --git a/api/segments/views.py b/api/segments/views.py index ca8f421eb03b..e80fb360b4ba 100644 --- a/api/segments/views.py +++ b/api/segments/views.py @@ -108,6 +108,7 @@ def get_queryset(self): # type: ignore[no-untyped-def] # TODO: at the moment, the UI only shows the name and description of the segment in the list view. # we shouldn't return all of the rules and conditions in the list view. 
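+        # `memberships` is prefetched so the nested SegmentMembershipSerializer
+        # resolves in one extra query per page instead of one query per segment.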
queryset = queryset.prefetch_related( + "memberships", "rules", "rules__conditions", "rules__rules", diff --git a/api/tests/integration/segments/__init__.py b/api/tests/integration/segments/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/tests/integration/segments/test_segment_membership_field.py b/api/tests/integration/segments/test_segment_membership_field.py new file mode 100644 index 000000000000..ad2bf9b32603 --- /dev/null +++ b/api/tests/integration/segments/test_segment_membership_field.py @@ -0,0 +1,65 @@ +from datetime import datetime, timezone +from typing import Any + +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APIClient + +from segment_membership.models import SegmentMembership + + +def test_get_segment__no_memberships__returns_empty_list( + admin_client: APIClient, + project: int, + segment: int, +) -> None: + # Given a segment with no materialised SegmentMembership rows + # (the daily refresh has not yet run for this org, or the FoF flag + # is off so the refresh task skips it) + # When the segment is fetched + response = admin_client.get( + reverse( + "api-v1:projects:project-segments-detail", + args=[project, segment], + ) + ) + + # Then the memberships field is present and empty + assert response.status_code == status.HTTP_200_OK + body: dict[str, Any] = response.json() + assert body["memberships"] == [] + + +def test_get_segment__one_membership_per_environment__returns_per_env_counts( + admin_client: APIClient, + project: int, + segment: int, + environment: int, +) -> None: + # Given one SegmentMembership row in this segment's environment + synced_at = datetime(2026, 5, 1, tzinfo=timezone.utc) + SegmentMembership.objects.create( + segment_id=segment, + environment_id=environment, + count=42, + last_synced_at=synced_at, + ) + + # When the segment is fetched + response = admin_client.get( + reverse( + "api-v1:projects:project-segments-detail", + args=[project, segment], + ) + ) + + # Then the memberships field carries one entry keyed by environment id + assert response.status_code == status.HTTP_200_OK + body: dict[str, Any] = response.json() + assert body["memberships"] == [ + { + "environment": environment, + "count": 42, + "last_synced_at": synced_at.isoformat().replace("+00:00", "Z"), + } + ] diff --git a/api/tests/unit/segment_membership/__init__.py b/api/tests/unit/segment_membership/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py new file mode 100644 index 000000000000..fa6243a86709 --- /dev/null +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_mappers.py @@ -0,0 +1,150 @@ +from decimal import Decimal + +import pytest +from flagsmith_schemas.dynamodb import Identity as DynamoIdentity + +from segment_membership.mappers import map_identity_document_to_snowflake_row + +UUID_A = "f47ac10b-58cc-4372-a567-0e02b2c3d479" +UUID_B = "550e8400-e29b-41d4-a716-446655440000" + + +@pytest.mark.parametrize( + "doc,expected", + [ + pytest.param( + { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [ + {"trait_key": "plan", "trait_value": "growth"}, + ], + }, + ("env-key", "alice", "env_x_alice", {"plan": "growth"}), + id="single string trait", + ), + 
pytest.param( + { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + }, + ("env-key", "alice", "env_x_alice", None), + id="empty traits collapse to NULL", + ), + pytest.param( + { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [ + {"trait_key": "age", "trait_value": Decimal("18")}, + ], + }, + ("env-key", "alice", "env_x_alice", {"age": 18}), + id="whole-number Decimal narrows to int", + ), + pytest.param( + { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [ + {"trait_key": "score", "trait_value": Decimal("1.5")}, + ], + }, + ("env-key", "alice", "env_x_alice", {"score": 1.5}), + id="fractional Decimal narrows to float", + ), + pytest.param( + { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [ + {"trait_key": "plan", "trait_value": "growth"}, + {"trait_key": "team", "trait_value": "alpha"}, + ], + }, + ( + "env-key", + "alice", + "env_x_alice", + {"plan": "growth", "team": "alpha"}, + ), + id="multiple traits flatten to a single dict", + ), + ], +) +def test_map_identity_document_to_snowflake_row__cases__return_expected( + doc: DynamoIdentity, + expected: tuple[str, str, str, dict[str, object] | None], +) -> None: + # Given a Dynamo identity document + # When mapped onto an IDENTITIES row + env_id, _id, identifier, identity_key, traits = ( + map_identity_document_to_snowflake_row("env-key", doc) + ) + + # Then non-id columns line up positionally with the IDENTITIES schema + assert (env_id, identifier, identity_key, traits) == expected + # ...and the id column is a stable signed 64-bit projection of the UUID + assert -(2**63) <= _id < 2**63 + + +def test_map_identity_document_to_snowflake_row__same_uuid__same_id() -> None: + # Given two documents sharing an identity_uuid + doc: DynamoIdentity = { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + } + + # When mapped twice + a = map_identity_document_to_snowflake_row("env-a", doc) + b = map_identity_document_to_snowflake_row("env-b", doc) + + # Then the id projection is stable across calls + assert a[1] == b[1] + + +def test_map_identity_document_to_snowflake_row__different_uuid__different_id() -> None: + # Given two documents with distinct identity_uuids + doc_a: DynamoIdentity = { + "identity_uuid": UUID_A, + "identifier": "alice", + "environment_api_key": "env-key", + "composite_key": "env_x_alice", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + } + doc_b: DynamoIdentity = { + "identity_uuid": UUID_B, + "identifier": "bob", + "environment_api_key": "env-key", + "composite_key": "env_x_bob", + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + } + + # When mapped + a = map_identity_document_to_snowflake_row("env-key", doc_a) + b = map_identity_document_to_snowflake_row("env-key", doc_b) + + # Then the id projections are distinct + assert a[1] != b[1] diff --git 
a/api/tests/unit/segment_membership/test_unit_segment_membership_migration.py b/api/tests/unit/segment_membership/test_unit_segment_membership_migration.py new file mode 100644 index 000000000000..a969906477e6 --- /dev/null +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_migration.py @@ -0,0 +1,51 @@ +import importlib +from unittest.mock import MagicMock + +from flagsmith_sql_flag_engine.dialects import SnowflakeDialect +from pytest_mock import MockerFixture +from snowflake.snowpark import Session + +# Migration module names start with a digit, which `import` can't parse; +# `importlib.import_module` is the only way in. +migration_module = importlib.import_module( + "segment_membership.migrations.0002_setup_snowflake_identities_schema" +) + + +def test_setup_snowflake_identities_schema__unconfigured__skips( + mocker: MockerFixture, +) -> None: + # Given Snowflake settings unconfigured + mocker.patch.object( + migration_module, + "is_snowflake_configured", + return_value=False, + ) + open_sess = mocker.patch.object(migration_module, "open_snowflake_session") + + # When the migration's RunPython entry runs + migration_module.setup_snowflake_identities_schema(MagicMock(), MagicMock()) + + # Then it short-circuits without opening a session + open_sess.assert_not_called() + + +def test_setup_snowflake_identities_schema__configured__runs_dialect_ddl( + mocker: MockerFixture, +) -> None: + # Given Snowflake configured and a mocked Snowpark session + mocker.patch.object( + migration_module, + "is_snowflake_configured", + return_value=True, + ) + sess = MagicMock(spec=Session) + open_sess = mocker.patch.object(migration_module, "open_snowflake_session") + open_sess.return_value.__enter__.return_value = sess + + # When the migration's RunPython entry runs + migration_module.setup_snowflake_identities_schema(MagicMock(), MagicMock()) + + # Then the dialect's schema DDL was executed against the session + sess.sql.assert_called_once_with(SnowflakeDialect.schema_ddl) + sess.sql.return_value.collect.assert_called_once_with() diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_services.py b/api/tests/unit/segment_membership/test_unit_segment_membership_services.py new file mode 100644 index 000000000000..5612ad969827 --- /dev/null +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_services.py @@ -0,0 +1,245 @@ +from unittest.mock import MagicMock + +from pytest_django.fixtures import SettingsWrapper +from pytest_mock import MockerFixture + +from environments.models import Environment +from organisations.models import Organisation +from projects.models import Project +from segment_membership import services +from segment_membership.services import ( + compute_segment_counts_for_project, + get_projects_to_process, + is_membership_enabled, + is_snowflake_configured, + open_snowflake_session, +) +from segments.models import Segment, SegmentRule +from tests.types import EnableFeaturesFixture + + +def test_is_membership_enabled__flag_off__returns_false( + organisation: Organisation, +) -> None: + # Given the FoF flag is not enabled (default state of the test + # OpenFeature provider) + # When the helper resolves the flag for the organisation + # Then it returns False + assert is_membership_enabled(organisation) is False + + +def test_is_membership_enabled__flag_on__returns_true( + organisation: Organisation, + enable_features: EnableFeaturesFixture, +) -> None: + # Given the FoF flag is enabled + enable_features("segment_membership_inspection") + + 
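# (flags default to off in the test OpenFeature provider, so this is the opt-in) +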
# When the helper resolves the flag + # Then it returns True + assert is_membership_enabled(organisation) is True + + +def test_is_snowflake_configured__all_set__returns_true( + settings: SettingsWrapper, +) -> None: + # Given every required SNOWFLAKE_* setting is populated + settings.SNOWFLAKE_ACCOUNT = "acc" + settings.SNOWFLAKE_USER = "u" + settings.SNOWFLAKE_PRIVATE_KEY_PATH = "/key" + settings.SNOWFLAKE_DATABASE = "db" + settings.SNOWFLAKE_SCHEMA = "sch" + settings.SNOWFLAKE_WAREHOUSE = "wh" + + # When checked + # Then the helper reports the feature configured + assert is_snowflake_configured() is True + + +def test_is_snowflake_configured__missing_account__returns_false( + settings: SettingsWrapper, +) -> None: + # Given one required setting is unset + settings.SNOWFLAKE_ACCOUNT = None + settings.SNOWFLAKE_USER = "u" + settings.SNOWFLAKE_PRIVATE_KEY_PATH = "/key" + settings.SNOWFLAKE_DATABASE = "db" + settings.SNOWFLAKE_SCHEMA = "sch" + settings.SNOWFLAKE_WAREHOUSE = "wh" + + # When checked + # Then the helper reports the feature unconfigured + assert is_snowflake_configured() is False + + +def test_open_snowflake_session__configured__yields_session_and_closes( + mocker: MockerFixture, + settings: SettingsWrapper, +) -> None: + # Given populated SNOWFLAKE_* settings and a mocked Snowpark builder + settings.SNOWFLAKE_ACCOUNT = "acc" + settings.SNOWFLAKE_USER = "u" + settings.SNOWFLAKE_ROLE = "ACCOUNTADMIN" + settings.SNOWFLAKE_WAREHOUSE = "wh" + settings.SNOWFLAKE_DATABASE = "db" + settings.SNOWFLAKE_SCHEMA = "sch" + settings.SNOWFLAKE_PRIVATE_KEY_PATH = "/key" + + fake_session = MagicMock() + builder = MagicMock() + builder.configs.return_value.create.return_value = fake_session + mocker.patch.object(services, "Session", MagicMock(builder=builder)) + + # When the context manager is entered and exited + with open_snowflake_session() as sess: + # Then it yields the underlying Snowpark session... 
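+ # (i.e. the object produced by Session.builder.configs(...).create() above)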
+ assert sess is fake_session + + # ...and closes it on exit + fake_session.close.assert_called_once_with() + + +def test_get_projects_to_process__no_canonical_segments__yields_nothing( + project: Project, +) -> None: + # Given a project with no canonical segments + # When iterating projects to process + # Then nothing is yielded + assert list(get_projects_to_process()) == [] + + +def test_get_projects_to_process__ff_disabled__skips_organisation( + project: Project, + segment: Segment, +) -> None: + # Given a project with a canonical segment but FoF flag off + # When iterating projects to process + # Then the project is skipped + assert list(get_projects_to_process()) == [] + + +def test_get_projects_to_process__ff_enabled__yields_project( + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a project with a canonical segment and the FoF flag on + enable_features("segment_membership_inspection") + + # When iterating projects to process + # Then the project is yielded + assert list(get_projects_to_process()) == [project] + + +def test_compute_segment_counts_for_project__no_segments__returns_empty( + project: Project, +) -> None: + # Given a project with no canonical segments + sess = MagicMock() + + # When counts are computed + result = compute_segment_counts_for_project(project, sess) + + # Then the result is empty and Snowflake was not queried + assert result == [] + sess.sql.assert_not_called() + + +def test_compute_segment_counts_for_project__no_environments__returns_empty( + project: Project, + segment: Segment, +) -> None: + # Given a project with a segment but no environments + project.environments.all().delete() + sess = MagicMock() + + # When counts are computed + result = compute_segment_counts_for_project(project, sess) + + # Then the result is empty and Snowflake was not queried + assert result == [] + sess.sql.assert_not_called() + + +def test_compute_segment_counts_for_project__one_segment__returns_membership_instances( + project: Project, + environment: Environment, + segment: Segment, + segment_rule: SegmentRule, + mocker: MockerFixture, +) -> None: + # Given a project with one segment, one environment, and a stubbed + # SQL translator that emits a trivial predicate + mocker.patch( + "segment_membership.services.translate_segment", + return_value="TRUE", + ) + sess = MagicMock() + sess.sql.return_value.collect.return_value = [ + {"SEGMENT_ID": segment.id, "ENV_KEY": environment.api_key, "C": 7} + ] + + # When counts are computed + result = compute_segment_counts_for_project(project, sess) + + # Then Snowflake was queried once, the predicate landed in the SQL, + # and the row decodes into an unsaved SegmentMembership keyed by + # (segment, environment) — last_synced_at left for the caller + assert len(result) == 1 + [membership] = result + assert membership.segment_id == segment.id + assert membership.environment_id == environment.id + assert membership.count == 7 + assert membership.last_synced_at is None + sess.sql.assert_called_once() + sql = sess.sql.call_args.args[0] + assert f"SELECT {segment.id} AS segment_id" in sql + assert "GROUP BY i.environment_id" in sql + + +def test_compute_segment_counts_for_project__unknown_env_key_in_row__skips( + project: Project, + environment: Environment, + segment: Segment, + segment_rule: SegmentRule, + mocker: MockerFixture, +) -> None: + # Given a Snowflake row whose env_key isn't in this project — would + # only happen via stale/cross-project data, but we defend against it + 
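# (e.g. Snowflake rows outliving a since-deleted environment) +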
mocker.patch( + "segment_membership.services.translate_segment", + return_value="TRUE", + ) + sess = MagicMock() + sess.sql.return_value.collect.return_value = [ + {"SEGMENT_ID": segment.id, "ENV_KEY": "ghost-env", "C": 99} + ] + + # When counts are computed + result = compute_segment_counts_for_project(project, sess) + + # Then the unknown-env row is skipped, no spurious membership emitted + assert result == [] + + +def test_compute_segment_counts_for_project__untranslatable_segment__skips( + project: Project, + environment: Environment, + segment: Segment, + segment_rule: SegmentRule, + mocker: MockerFixture, +) -> None: + # Given a project with a segment whose predicate the translator can't compile + mocker.patch( + "segment_membership.services.translate_segment", + return_value=None, + ) + sess = MagicMock() + + # When counts are computed + result = compute_segment_counts_for_project(project, sess) + + # Then the segment is skipped entirely (no row, not even count = 0) + # and Snowflake is not queried at all + assert result == [] + sess.sql.assert_not_called() diff --git a/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py new file mode 100644 index 000000000000..720c78a327f6 --- /dev/null +++ b/api/tests/unit/segment_membership/test_unit_segment_membership_tasks.py @@ -0,0 +1,290 @@ +from unittest.mock import MagicMock + +from pytest_mock import MockerFixture +from pytest_structlog import StructuredLogCapture + +from environments.models import Environment +from projects.models import Project +from segment_membership import tasks +from segment_membership.models import SegmentMembership +from segment_membership.tasks import ( + backfill_identities_to_snowflake, + refresh_project_segment_counts, +) +from segments.models import Segment +from tests.types import EnableFeaturesFixture + + +def test_backfill_identities_to_snowflake__no_snowflake_creds__skips( + mocker: MockerFixture, + log: StructuredLogCapture, +) -> None: + # Given Snowflake settings unconfigured + mocker.patch.object(tasks, "is_snowflake_configured", return_value=False) + spy = mocker.patch.object(tasks, "open_snowflake_session") + + # When the task runs + backfill_identities_to_snowflake() + + # Then it short-circuits without opening a session + spy.assert_not_called() + assert any(e["event"] == "backfill.skipped" for e in log.events) + + +def test_backfill_identities_to_snowflake__dynamo_disabled__skips( + mocker: MockerFixture, +) -> None: + # Given Snowflake configured but Dynamo wrapper disabled + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + spy = mocker.patch.object(tasks, "open_snowflake_session") + mocker.patch.object( + tasks, + "DynamoIdentityWrapper", + return_value=MagicMock(is_enabled=False), + ) + + # When the task runs + backfill_identities_to_snowflake() + + # Then it skips without opening a session + spy.assert_not_called() + + +def test_backfill_identities_to_snowflake__happy_path__deletes_then_inserts( + mocker: MockerFixture, + project: Project, + environment: Environment, + segment: Segment, + enable_features: EnableFeaturesFixture, + log: StructuredLogCapture, +) -> None: + # Given a project with a canonical segment and a Dynamo wrapper + # yielding two identities for its environment + enable_features("segment_membership_inspection") + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + sess = MagicMock() + mocker.patch.object( + tasks, 
"open_snowflake_session" + ).return_value.__enter__.return_value = sess + refresh_dispatch = mocker.patch.object(tasks, "refresh_project_segment_counts") + wrapper = MagicMock(is_enabled=True) + wrapper.iter_all_items_paginated.return_value = iter( + [ + { + "identity_uuid": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "identifier": "a", + "composite_key": "k1", + "environment_api_key": environment.api_key, + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + }, + { + "identity_uuid": "550e8400-e29b-41d4-a716-446655440000", + "identifier": "b", + "composite_key": "k2", + "environment_api_key": environment.api_key, + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + }, + ] + ) + mocker.patch.object(tasks, "DynamoIdentityWrapper", return_value=wrapper) + + # When the task runs + backfill_identities_to_snowflake() + + # Then DELETE binds the env api key as a parameter and the identities + # are written via the Snowpark DataFrame writer + delete_calls = [ + call + for call in sess.sql.call_args_list + if call.args and call.args[0].startswith("DELETE FROM IDENTITIES") + ] + assert len(delete_calls) == 1 + assert delete_calls[0].kwargs == {"params": [environment.api_key]} + + sess.create_dataframe.assert_called_once() + rows_arg = sess.create_dataframe.call_args.args[0] + assert {row[0] for row in rows_arg} == {environment.api_key} + assert {row[2] for row in rows_arg} == {"a", "b"} + sess.create_dataframe.return_value.write.mode.assert_called_once_with("append") + sess.create_dataframe.return_value.write.mode.return_value.save_as_table.assert_called_once_with( + "IDENTITIES" + ) + assert any( + e["event"] == "backfill.environment.completed" and e["rows__count"] == 2 + for e in log.events + ) + # And a per-project count refresh is dispatched once the backfill + # finishes. 
+ refresh_dispatch.delay.assert_called_once_with(args=(project.id,)) + + +def test_backfill_identities_to_snowflake__insert_fails__logs_and_continues( + mocker: MockerFixture, + project: Project, + environment: Environment, + segment: Segment, + enable_features: EnableFeaturesFixture, + log: StructuredLogCapture, +) -> None: + # Given the DataFrame write blows up mid-batch + enable_features("segment_membership_inspection") + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + sess = MagicMock() + sess.create_dataframe.side_effect = RuntimeError("boom") + mocker.patch.object( + tasks, "open_snowflake_session" + ).return_value.__enter__.return_value = sess + wrapper = MagicMock(is_enabled=True) + wrapper.iter_all_items_paginated.return_value = iter( + [ + { + "identity_uuid": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "identifier": "a", + "composite_key": "k1", + "environment_api_key": environment.api_key, + "created_date": "2026-05-08T00:00:00Z", + "identity_traits": [], + } + ] + ) + mocker.patch.object(tasks, "DynamoIdentityWrapper", return_value=wrapper) + + # When the task runs + backfill_identities_to_snowflake() + + # Then the failure is logged and the loop continues + assert any(e["event"] == "backfill.environment.failed" for e in log.events) + + +def test_backfill_identities_to_snowflake__multiple_projects__fans_out_refresh_per_project( + mocker: MockerFixture, + project: Project, + project_b: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given two FoF-enabled projects with canonical segments + enable_features("segment_membership_inspection") + Segment.objects.create(name="seg-b", project=project_b) + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + sess = MagicMock() + mocker.patch.object( + tasks, "open_snowflake_session" + ).return_value.__enter__.return_value = sess + refresh_dispatch = mocker.patch.object(tasks, "refresh_project_segment_counts") + wrapper = MagicMock(is_enabled=True) + wrapper.iter_all_items_paginated.return_value = iter([]) + mocker.patch.object(tasks, "DynamoIdentityWrapper", return_value=wrapper) + + # When the backfill runs + backfill_identities_to_snowflake() + + # Then a per-project refresh is dispatched for each project we + # actually processed (deduped) — once per project, not once per env + dispatched_ids = { + call.kwargs["args"][0] for call in refresh_dispatch.delay.call_args_list + } + assert dispatched_ids == {project.id, project_b.id} + + +def test_refresh_project_segment_counts__no_snowflake_creds__skips( + mocker: MockerFixture, + project: Project, + log: StructuredLogCapture, +) -> None: + # Given Snowflake unconfigured + mocker.patch.object(tasks, "is_snowflake_configured", return_value=False) + spy = mocker.patch.object(tasks, "open_snowflake_session") + + # When the per-project task runs + refresh_project_segment_counts(project.id) + + # Then it short-circuits without opening a session + spy.assert_not_called() + assert any( + e["event"] == "refresh.project.skipped" + and e["reason"] == "snowflake_not_configured" + for e in log.events + ) + + +def test_refresh_project_segment_counts__ff_disabled__skips( + mocker: MockerFixture, + project: Project, + log: StructuredLogCapture, +) -> None: + # Given Snowflake configured but FoF flag off (default) + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + spy = mocker.patch.object(tasks, "open_snowflake_session") + + # When the per-project task runs + 
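# (invoked directly here; asynchronous dispatch via .delay is exercised in the backfill tests) +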
refresh_project_segment_counts(project.id) + + # Then it skips without opening a session + spy.assert_not_called() + assert any( + e["event"] == "refresh.project.skipped" and e["reason"] == "ff_disabled" + for e in log.events + ) + + +def test_refresh_project_segment_counts__compute_fails__logs( + mocker: MockerFixture, + project: Project, + segment: Segment, + enable_features: EnableFeaturesFixture, + log: StructuredLogCapture, +) -> None: + # Given a project where count compute throws + enable_features("segment_membership_inspection") + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + sess = MagicMock() + mocker.patch.object( + tasks, "open_snowflake_session" + ).return_value.__enter__.return_value = sess + mocker.patch.object( + tasks, "compute_segment_counts_for_project", side_effect=RuntimeError("boom") + ) + + # When the per-project task runs + refresh_project_segment_counts(project.id) + + # Then the failure is logged + assert any(e["event"] == "refresh.project.failed" for e in log.events) + + +def test_refresh_project_segment_counts__counts_returned__upserts_per_env_rows( + mocker: MockerFixture, + project: Project, + environment: Environment, + segment: Segment, + enable_features: EnableFeaturesFixture, +) -> None: + # Given a project with a canonical segment and stubbed compute + enable_features("segment_membership_inspection") + mocker.patch.object(tasks, "is_snowflake_configured", return_value=True) + sess = MagicMock() + mocker.patch.object( + tasks, "open_snowflake_session" + ).return_value.__enter__.return_value = sess + mocker.patch.object( + tasks, + "compute_segment_counts_for_project", + return_value=[ + SegmentMembership( + segment_id=segment.id, + environment_id=environment.id, + count=42, + ), + ], + ) + + # When the per-project task runs + refresh_project_segment_counts(project.id) + + # Then a SegmentMembership row exists keyed by (segment, environment) + membership = SegmentMembership.objects.get(segment=segment, environment=environment) + assert membership.count == 42 + assert membership.last_synced_at is not None diff --git a/api/tests/unit/segments/test_unit_segments_views.py b/api/tests/unit/segments/test_unit_segments_views.py index 80cb26e3679a..c015d2e348f1 100644 --- a/api/tests/unit/segments/test_unit_segments_views.py +++ b/api/tests/unit/segments/test_unit_segments_views.py @@ -594,8 +594,8 @@ def test_get_segment_by_uuid__existing_segment__returns_segment_data( # type: i @pytest.mark.parametrize( "client, num_queries", [ - (lazy_fixture("admin_master_api_key_client"), 12), - (lazy_fixture("admin_client"), 14), + (lazy_fixture("admin_master_api_key_client"), 13), + (lazy_fixture("admin_client"), 15), ], ) def test_list_segments__without_rbac__expected_num_queries( @@ -651,8 +651,8 @@ def test_list_segments__system_segment_exists__excludes_system_segment( @pytest.mark.parametrize( "client, num_queries", [ - (lazy_fixture("admin_master_api_key_client"), 12), - (lazy_fixture("admin_client"), 15), + (lazy_fixture("admin_master_api_key_client"), 13), + (lazy_fixture("admin_client"), 16), ], ) def test_list_segments__with_rbac__expected_num_queries( diff --git a/api/tests/unit/util/test_util.py b/api/tests/unit/util/test_util.py index 4f35ca8c098a..4aa75833918c 100644 --- a/api/tests/unit/util/test_util.py +++ b/api/tests/unit/util/test_util.py @@ -1,6 +1,6 @@ import pytest -from util.util import iter_chunked_concat, iter_paired_chunks +from util.util import batched, iter_chunked_concat, iter_paired_chunks def 
test_iter_paired_chunks__both_empty__returns_empty_list() -> None: @@ -121,3 +121,24 @@ def test_iter_chunked_concat__various_inputs__returns_expected_chunks( # Then assert list(result) == expected_result + + +def test_batched__empty_iterable__yields_nothing() -> None: + # Given an empty iterable + # When batched + # Then no batches are yielded + assert list(batched([], 3)) == [] + + +def test_batched__exact_multiple__yields_full_batches() -> None: + # Given an iterable whose length is a multiple of the batch size + # When batched + # Then every batch is full + assert list(batched(range(6), 2)) == [[0, 1], [2, 3], [4, 5]] + + +def test_batched__remainder__yields_smaller_final_batch() -> None: + # Given an iterable whose length isn't a multiple of the batch size + # When batched + # Then the final batch carries the remainder + assert list(batched([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]] diff --git a/api/util/util.py b/api/util/util.py index 37cded499214..157fb11e3170 100644 --- a/api/util/util.py +++ b/api/util/util.py @@ -93,3 +93,13 @@ def truncate( separated by a delimiter. """ return delimiter.join([value[:ends_len], value[-ends_len:]]) + + +def batched(iterable: Iterable[T], size: int) -> Generator[list[T], None, None]: + """Yield consecutive batches of `size` items from `iterable`. The + final batch may be smaller. + + Backport from Python 3.12.""" + iterator = iter(iterable) + while batch := list(islice(iterator, size)): + yield batch diff --git a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md index b7e979e6b046..af9b0fff05cb 100644 --- a/docs/docs/deployment-self-hosting/observability/_events-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_events-catalogue.md @@ -327,10 +327,75 @@ Logged at `warning` from: Attributes: +### `segment_membership.backfill.environment.completed` + +Logged at `info` from: + - `api/segment_membership/tasks.py:132` + +Attributes: + - `environment.id` + - `project.id` + - `rows.count` + +### `segment_membership.backfill.environment.failed` + +Logged at `exception` from: + - `api/segment_membership/tasks.py:125` + +Attributes: + - `environment.id` + - `project.id` + +### `segment_membership.backfill.skipped` + +Logged at `info` from: + - `api/segment_membership/tasks.py:84` + - `api/segment_membership/tasks.py:89` + +Attributes: + - `reason` + +### `segment_membership.compute.segment.skipped` + +Logged at `error` from: + - `api/segment_membership/services.py:129` + +Attributes: + - `project.id` + - `reason` + - `segment.id` + +### `segment_membership.refresh.project.completed` + +Logged at `info` from: + - `api/segment_membership/tasks.py:195` + +Attributes: + - `memberships.count` + - `project.id` + +### `segment_membership.refresh.project.failed` + +Logged at `exception` from: + - `api/segment_membership/tasks.py:183` + +Attributes: + - `project.id` + +### `segment_membership.refresh.project.skipped` + +Logged at `info` from: + - `api/segment_membership/tasks.py:154` + - `api/segment_membership/tasks.py:163` + +Attributes: + - `project.id` + - `reason` + ### `segments.serializers.segment_revision_created` Logged at `info` from: - - `api/segments/serializers.py:142` + - `api/segments/serializers.py:157` Attributes: - `revision_id` diff --git a/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md b/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md index 6cae297b29e4..f90a72b8a5c7 100644 --- 
a/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md +++ b/docs/docs/deployment-self-hosting/observability/_metrics-catalogue.md @@ -70,6 +70,38 @@ Labels: - `method` - `response_status` +### `flagsmith_segment_membership_backfill_duration_seconds` + +Histogram. + +Duration of a segment-membership backfill for one environment. + +Labels: + +### `flagsmith_segment_membership_backfill_identities` + +Counter. + +Total identities mirrored from Dynamo to Snowflake by the segment-membership backfill task across all environments. + +Labels: + +### `flagsmith_segment_membership_refresh_duration_seconds` + +Histogram. + +Duration of a single segment-membership count-refresh run for one project. + +Labels: + +### `flagsmith_segment_membership_refresh_failures` + +Counter. + +Total segment-membership refresh runs that failed for any reason. + +Labels: + ### `flagsmith_task_processor_enqueued_tasks` Counter.
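
For orientation, this is roughly how a label-less histogram/counter pair like the two backfill instruments above gets recorded with `prometheus_client`. It is a minimal sketch: the metric names and empty label sets are taken from the catalogue, while `backfill_environment` and `write_rows_to_snowflake` are hypothetical stand-ins, not the actual code in `api/segment_membership/tasks.py`.

```python
# Sketch only. Metric names and their label-less shape come from the
# catalogue above; backfill_environment and write_rows_to_snowflake are
# hypothetical stand-ins for the real task code.
from prometheus_client import Counter, Histogram

backfill_duration = Histogram(
    "flagsmith_segment_membership_backfill_duration_seconds",
    "Duration of a segment-membership backfill for one environment.",
)
backfill_identities = Counter(
    "flagsmith_segment_membership_backfill_identities",
    "Total identities mirrored from Dynamo to Snowflake.",
)


def write_rows_to_snowflake(rows: list[tuple]) -> None:
    """Hypothetical stand-in for the Snowpark DataFrame write."""


def backfill_environment(rows: list[tuple]) -> None:
    # Time the per-environment write, then count the identities mirrored.
    with backfill_duration.time():
        write_rows_to_snowflake(rows)
    backfill_identities.inc(len(rows))
```

Keeping the label set empty, as the catalogue documents, pins each instrument to a single time series (one per histogram bucket), which suits a low-volume scheduled task.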