-
Notifications
You must be signed in to change notification settings - Fork 180
Consolidate airlock storage architecture and implement metadata-based management #4853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 49 commits
57b3d70
5bbe648
1d2172e
fa39c85
138820b
8941b1b
47dcdc8
427515d
b09f990
231f434
76a3d62
cd70948
f08b384
5211f36
76a09bd
2c6235b
0df7e5c
e375cf2
72c9478
2b66bb3
1d5b8ef
7638186
d490b5a
e20e33a
4f2fe0b
aa6c32a
85ab8af
bee6cdc
ff96ee5
e025056
b98ede1
4a9b185
8421bdb
34f2636
3d99220
ad73137
105f38b
7335c65
55f3590
b0c50e8
fcead34
bd14845
8b405ef
115e778
051ef76
98764f9
816e4e8
25c194e
90fc2d7
de87d71
a9125cb
d885d40
4992cfd
3b9dbd6
d3fa795
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -214,3 +214,4 @@ validation.txt | |
|
|
||
| /index.html | ||
| .DS_Store | ||
| *_old.tf | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,7 +9,7 @@ | |||||||||||
|
|
||||||||||||
| from exceptions import NoFilesInRequestException, TooManyFilesInRequestException | ||||||||||||
|
|
||||||||||||
| from shared_code import blob_operations, constants | ||||||||||||
| from shared_code import blob_operations, constants, airlock_storage_helper | ||||||||||||
| from pydantic import BaseModel, parse_obj_as | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
|
|
@@ -19,6 +19,8 @@ class RequestProperties(BaseModel): | |||||||||||
| previous_status: Optional[str] | ||||||||||||
| type: str | ||||||||||||
| workspace_id: str | ||||||||||||
| review_workspace_id: Optional[str] = None | ||||||||||||
| airlock_version: int = 1 | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class ContainersCopyMetadata: | ||||||||||||
|
|
@@ -31,6 +33,8 @@ def __init__(self, source_account_name: str, dest_account_name: str): | |||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def main(msg: func.ServiceBusMessage, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent]): | ||||||||||||
| request_properties = None | ||||||||||||
| request_files = None | ||||||||||||
| try: | ||||||||||||
| request_properties = extract_properties(msg) | ||||||||||||
| request_files = get_request_files(request_properties) if request_properties.new_status == constants.STAGE_SUBMITTED else None | ||||||||||||
|
|
@@ -53,13 +57,25 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent | |||||||||||
|
|
||||||||||||
| logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type) | ||||||||||||
|
|
||||||||||||
| # Check if using metadata-based stage management (v2) or legacy per-stage accounts (v1) | ||||||||||||
| use_metadata = request_properties.airlock_version >= 2 | ||||||||||||
|
|
||||||||||||
| if new_status == constants.STAGE_DRAFT: | ||||||||||||
| account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id) | ||||||||||||
| blob_operations.create_container(account_name, req_id) | ||||||||||||
| if use_metadata: | ||||||||||||
| from shared_code.blob_operations_metadata import create_container_with_metadata | ||||||||||||
| account_name = airlock_storage_helper.get_storage_account_name_for_request(request_type, new_status, ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| stage = airlock_storage_helper.get_stage_from_status(request_type, new_status) | ||||||||||||
| create_container_with_metadata(account_name, req_id, stage, workspace_id=ws_id, request_type=request_type) | ||||||||||||
| else: | ||||||||||||
| account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id) | ||||||||||||
| blob_operations.create_container(account_name, req_id) | ||||||||||||
| return | ||||||||||||
|
|
||||||||||||
| if new_status == constants.STAGE_CANCELLED: | ||||||||||||
| storage_account_name = get_storage_account(previous_status, request_type, ws_id) | ||||||||||||
| if use_metadata: | ||||||||||||
| storage_account_name = airlock_storage_helper.get_storage_account_name_for_request(request_type, previous_status, ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| else: | ||||||||||||
| storage_account_name = get_storage_account(previous_status, request_type, ws_id) | ||||||||||||
| container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id) | ||||||||||||
| set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url) | ||||||||||||
| return | ||||||||||||
|
|
@@ -68,11 +84,58 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent | |||||||||||
| set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files) | ||||||||||||
|
|
||||||||||||
| if (is_require_data_copy(new_status)): | ||||||||||||
| logging.info('Request with id %s. requires data copy between storage accounts', req_id) | ||||||||||||
| containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id) | ||||||||||||
| blob_operations.create_container(containers_metadata.dest_account_name, req_id) | ||||||||||||
| blob_operations.copy_data(containers_metadata.source_account_name, | ||||||||||||
| containers_metadata.dest_account_name, req_id) | ||||||||||||
| if use_metadata: | ||||||||||||
| # Metadata mode: Update container stage instead of copying | ||||||||||||
| from shared_code.blob_operations_metadata import update_container_stage, create_container_with_metadata | ||||||||||||
|
|
||||||||||||
| # For import submit, use review_workspace_id so data goes to review workspace storage | ||||||||||||
| effective_ws_id = ws_id | ||||||||||||
| if new_status == constants.STAGE_SUBMITTED and request_type.lower() == constants.IMPORT_TYPE and request_properties.review_workspace_id: | ||||||||||||
| effective_ws_id = request_properties.review_workspace_id | ||||||||||||
|
|
||||||||||||
| # Get the storage account (might change from core to workspace or vice versa) | ||||||||||||
| source_account = airlock_storage_helper.get_storage_account_name_for_request(request_type, previous_status, ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| dest_account = airlock_storage_helper.get_storage_account_name_for_request(request_type, new_status, effective_ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| new_stage = airlock_storage_helper.get_stage_from_status(request_type, new_status) | ||||||||||||
|
|
||||||||||||
| # Import approval_in_progress: source and dest differ (core → workspace), so copy is needed. | ||||||||||||
| # The general logic below handles this correctly via the source_account == dest_account check. | ||||||||||||
| if source_account == dest_account: | ||||||||||||
| # Same storage account - just update metadata | ||||||||||||
| logging.info(f'Request {req_id}: Updating container stage to {new_stage} (no copy needed)') | ||||||||||||
| update_container_stage(source_account, req_id, new_stage, changed_by='system') | ||||||||||||
| else: | ||||||||||||
| # Different storage account (e.g., core → workspace) - need to copy | ||||||||||||
| logging.info(f'Request {req_id}: Copying from {source_account} to {dest_account}') | ||||||||||||
| create_container_with_metadata(dest_account, req_id, new_stage, workspace_id=effective_ws_id, request_type=request_type) | ||||||||||||
| blob_operations.copy_data(source_account, dest_account, req_id) | ||||||||||||
|
|
||||||||||||
| # In metadata mode, there is no BlobCreatedTrigger to signal completion, | ||||||||||||
| # so we must send the step result event directly for terminal transitions. | ||||||||||||
| completion_status_map = { | ||||||||||||
| constants.STAGE_APPROVAL_INPROGRESS: constants.STAGE_APPROVED, | ||||||||||||
| constants.STAGE_REJECTION_INPROGRESS: constants.STAGE_REJECTED, | ||||||||||||
| constants.STAGE_BLOCKING_INPROGRESS: constants.STAGE_BLOCKED_BY_SCAN, | ||||||||||||
| } | ||||||||||||
| if new_status in completion_status_map: | ||||||||||||
| final_status = completion_status_map[new_status] | ||||||||||||
| logging.info(f'Request {req_id}: Metadata mode - sending step result for {new_status} -> {final_status}') | ||||||||||||
| stepResultEvent.set( | ||||||||||||
| func.EventGridOutputEvent( | ||||||||||||
| id=str(uuid.uuid4()), | ||||||||||||
| data={"completed_step": new_status, "new_status": final_status, "request_id": req_id}, | ||||||||||||
| subject=req_id, | ||||||||||||
| event_type="Airlock.StepResult", | ||||||||||||
| event_time=datetime.datetime.now(datetime.UTC), | ||||||||||||
| data_version=constants.STEP_RESULT_EVENT_DATA_VERSION)) | ||||||||||||
|
marrobi marked this conversation as resolved.
Outdated
|
||||||||||||
| else: | ||||||||||||
| # Legacy mode: Copy data between storage accounts | ||||||||||||
| logging.info('Request with id %s. requires data copy between storage accounts', req_id) | ||||||||||||
| review_ws_id = request_properties.review_workspace_id | ||||||||||||
| containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id, review_workspace_id=review_ws_id) | ||||||||||||
| blob_operations.create_container(containers_metadata.dest_account_name, req_id) | ||||||||||||
| blob_operations.copy_data(containers_metadata.source_account_name, | ||||||||||||
| containers_metadata.dest_account_name, req_id) | ||||||||||||
| return | ||||||||||||
|
|
||||||||||||
| # Other statuses which do not require data copy are dismissed as we don't need to do anything... | ||||||||||||
|
|
@@ -102,7 +165,7 @@ def is_require_data_copy(new_status: str): | |||||||||||
| return False | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata: | ||||||||||||
| def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str, review_workspace_id: str = None) -> ContainersCopyMetadata: | ||||||||||||
| # sanity | ||||||||||||
| if is_require_data_copy(new_status) is False: | ||||||||||||
| raise Exception("Given new status is not supported") | ||||||||||||
|
|
@@ -115,7 +178,7 @@ def get_source_dest_for_copy(new_status: str, previous_status: str, request_type | |||||||||||
| raise Exception(msg) | ||||||||||||
|
|
||||||||||||
| source_account_name = get_storage_account(previous_status, request_type, short_workspace_id) | ||||||||||||
| dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id) | ||||||||||||
| dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id, review_workspace_id=review_workspace_id) | ||||||||||||
| return ContainersCopyMetadata(source_account_name, dest_account_name) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
|
|
@@ -151,12 +214,14 @@ def get_storage_account(status: str, request_type: str, short_workspace_id: str) | |||||||||||
| raise Exception(error_message) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> str: | ||||||||||||
| def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str, review_workspace_id: str = None) -> str: | ||||||||||||
| tre_id = _get_tre_id() | ||||||||||||
|
|
||||||||||||
| if request_type == constants.IMPORT_TYPE: | ||||||||||||
| if new_status == constants.STAGE_SUBMITTED: | ||||||||||||
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id | ||||||||||||
| # Import submit: copy to review workspace storage, or tre_id for legacy compatibility | ||||||||||||
| dest_id = review_workspace_id if review_workspace_id else tre_id | ||||||||||||
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + dest_id | ||||||||||||
|
marrobi marked this conversation as resolved.
Comment on lines
+238
to
+240
|
||||||||||||
| # Import submit: copy to review workspace storage, or tre_id for legacy compatibility | |
| dest_id = review_workspace_id if review_workspace_id else tre_id | |
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + dest_id | |
| # Import submit: in legacy/v1 mode the in-progress storage account remains TRE-scoped. | |
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| __version__ = "0.8.9" | ||
| __version__ = "0.8.12" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| import os | ||
| from shared_code import constants | ||
|
|
||
|
|
||
def get_storage_account_name_for_request(request_type: str, status: str, short_workspace_id: str, airlock_version: int = 1) -> str:
    """Return the name of the storage account that holds a request's data at ``status``.

    Args:
        request_type: Airlock request type. Only an exact match with
            ``constants.IMPORT_TYPE`` is treated as an import; every other
            value is handled as an export.
        status: Airlock request status (one of the ``constants.STAGE_*`` values).
        short_workspace_id: Suffix appended to workspace-scoped account names.
            Only used in legacy mode (``airlock_version < 2``).
        airlock_version: ``>= 2`` selects the consolidated core / global-workspace
            accounts; anything lower selects the legacy per-stage accounts.

    Returns:
        The account-name prefix from ``constants`` concatenated with either the
        ``TRE_ID`` environment variable (empty string when unset) or
        ``short_workspace_id``.

    Raises:
        ValueError: In legacy mode, when ``status`` has no storage account
            mapping. Previously this case fell through and returned ``None``.
    """
    tre_id = os.environ.get("TRE_ID", "")
    is_import = request_type == constants.IMPORT_TYPE

    if airlock_version >= 2:
        # Consolidated mode: only two accounts exist, both suffixed with the TRE id.
        if is_import:
            core_statuses = [constants.STAGE_DRAFT, constants.STAGE_SUBMITTED, constants.STAGE_IN_REVIEW,
                             constants.STAGE_REJECTED, constants.STAGE_REJECTION_INPROGRESS,
                             constants.STAGE_BLOCKED_BY_SCAN, constants.STAGE_BLOCKING_INPROGRESS]
        else:
            core_statuses = [constants.STAGE_APPROVED, constants.STAGE_APPROVAL_INPROGRESS]

        if status in core_statuses:
            return constants.STORAGE_ACCOUNT_NAME_AIRLOCK_CORE + tre_id
        # Every other status (including ones not listed above) maps to the
        # global workspace account, exactly as the original else-branches did.
        return constants.STORAGE_ACCOUNT_NAME_AIRLOCK_WORKSPACE_GLOBAL + tre_id

    # Legacy mode: one storage account per stage.
    in_progress_statuses = [constants.STAGE_SUBMITTED, constants.STAGE_IN_REVIEW, constants.STAGE_APPROVAL_INPROGRESS,
                            constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]

    if is_import:
        if status == constants.STAGE_DRAFT:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
        if status in in_progress_statuses:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
        if status == constants.STAGE_APPROVED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
        if status == constants.STAGE_REJECTED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
        if status == constants.STAGE_BLOCKED_BY_SCAN:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id
    else:
        if status == constants.STAGE_DRAFT:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id
        if status in in_progress_statuses:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
        if status == constants.STAGE_APPROVED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
        if status == constants.STAGE_REJECTED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
        if status == constants.STAGE_BLOCKED_BY_SCAN:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id

    # Fail loudly instead of implicitly returning None, which would violate the
    # declared ``str`` return type and surface later as an obscure error.
    raise ValueError(
        f"No legacy storage account is mapped for status '{status}' and request type '{request_type}'"
    )
|
|
||
|
|
||
def get_stage_from_status(request_type: str, status: str) -> str:
    """Map an airlock request status to the matching stage value from ``constants``.

    Args:
        request_type: Only an exact match with ``constants.IMPORT_TYPE`` selects
            the import stages; any other value selects the export stages.
        status: Airlock request status (one of the ``constants.STAGE_*`` values).

    Returns:
        The corresponding ``constants.STAGE_IMPORT_*`` / ``constants.STAGE_EXPORT_*``
        value, or the literal ``"unknown"`` when ``status`` matches none of the
        handled statuses.
    """
    if request_type == constants.IMPORT_TYPE:
        external_or_internal = constants.STAGE_IMPORT_EXTERNAL
        in_progress = constants.STAGE_IMPORT_IN_PROGRESS
        approved = constants.STAGE_IMPORT_APPROVED
        rejected = constants.STAGE_IMPORT_REJECTED
        blocked = constants.STAGE_IMPORT_BLOCKED
    else:  # export
        external_or_internal = constants.STAGE_EXPORT_INTERNAL
        in_progress = constants.STAGE_EXPORT_IN_PROGRESS
        approved = constants.STAGE_EXPORT_APPROVED
        rejected = constants.STAGE_EXPORT_REJECTED
        blocked = constants.STAGE_EXPORT_BLOCKED

    # Checked in order; each "*_INPROGRESS" status shares the stage of its final status.
    status_groups = (
        ((constants.STAGE_DRAFT,), external_or_internal),
        ((constants.STAGE_SUBMITTED, constants.STAGE_IN_REVIEW), in_progress),
        ((constants.STAGE_APPROVED, constants.STAGE_APPROVAL_INPROGRESS), approved),
        ((constants.STAGE_REJECTED, constants.STAGE_REJECTION_INPROGRESS), rejected),
        ((constants.STAGE_BLOCKED_BY_SCAN, constants.STAGE_BLOCKING_INPROGRESS), blocked),
    )
    for matching_statuses, stage in status_groups:
        if status in matching_statuses:
            return stage

    return "unknown"
Uh oh!
There was an error while loading. Please reload this page.