forked from NVIDIA-AI-Blueprints/vulnerability-analysis
-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathcve_checklist.py
More file actions
88 lines (70 loc) · 3.98 KB
/
cve_checklist.py
File metadata and controls
88 lines (70 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import pandas as pd
from aiq.builder.builder import Builder
from aiq.builder.framework_enum import LLMFrameworkEnum
from aiq.builder.function_info import FunctionInfo
from aiq.cli.register_workflow import register_function
from aiq.data_models.function import FunctionBaseConfig
from aiq.data_models.component_ref import FunctionRef
from pydantic import Field
from exploit_iq_commons.utils import data_utils
from exploit_iq_commons.logging.loggers_factory import LoggingFactory, trace_id
# Module-level logger obtained from the project's LoggingFactory, keyed by this module's name.
logger = LoggingFactory.get_agent_logger(__name__)
class CVEChecklistToolConfig(FunctionBaseConfig, name="cve_checklist"):
    """
    Defines a function that generates tailored, context-sensitive task checklist for impact analysis.
    """
    # NOTE: the docstring and every Field description are runtime-visible text and are kept verbatim.

    # Name of the LLM that `cve_checklist` requests from the builder.
    llm_name: str = Field(description="The LLM model to use")

    # Reference to the agent function whose configuration is inspected for `tool_names`.
    agent_name: FunctionRef = Field(default="cve_agent_executor",
                                    description="Name of agent function to get tool configuration from")

    # Optional prompt override; forwarded unchanged as `prompt` to `generate_checklist`.
    prompt: str | None = Field(
        default=None,
        description=("Manually set the prompt for the specific model in the configuration. "
                     "The prompt can either be passed in as a string of text or as a path "
                     "to a text file containing the desired prompting."))
@register_function(config_type=CVEChecklistToolConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN])
async def cve_checklist(config: CVEChecklistToolConfig, builder: Builder):
    """
    Build and yield the function that produces one task checklist per CVE.

    The configured LLM and the tool names of the configured agent are resolved once, up front.
    The yielded ``FunctionInfo`` wraps ``_arun``, which generates a checklist for every CVE intel
    record on the engine state and stores the results in ``state.checklist_plans``.

    Args:
        config: Tool configuration (LLM name, agent reference, optional prompt override).
        builder: Workflow builder used to resolve the LLM and the agent's function config.

    Yields:
        A ``FunctionInfo`` built from ``_arun`` with ``AgentMorpheusEngineState`` as input schema.
    """
    # These imports are kept function-local, exactly as in the original layout.
    from vuln_analysis.data_models.state import AgentMorpheusEngineState
    from vuln_analysis.utils.checklist_prompt_generator import _parse_list, generate_checklist

    llm = await builder.get_llm(llm_name=config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN)

    # Tool names come from the referenced agent's config; None when that config has no `tool_names`.
    agent_config = builder.get_function_config(config.agent_name)
    agent_tool_names = getattr(agent_config, 'tool_names', None)

    async def _checklist_for(intel_record, ecosystem: str = ""):
        """Return ``(vuln_id, checklist)`` for a single CVE intel record."""
        raw_checklist = await generate_checklist(prompt=config.prompt,
                                                 llm=llm,
                                                 input_dict=intel_record,
                                                 tool_names=agent_tool_names,
                                                 enable_llm_list_parsing=False,
                                                 ecosystem=ecosystem)
        # _parse_list is called with a one-element list; only its first result is used.
        parsed = await _parse_list([raw_checklist])
        return intel_record["vuln_id"], parsed[0]

    async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState:
        # Generates the checklists for every CVE on `state` and returns the same, mutated state.

        # NOTE(review): `state.original_input` is dereferenced unconditionally here, while the
        # ecosystem lookup just below guards against it being falsy. Confirm whether it can
        # actually be None at this point; if so, this line raises before the guard is reached.
        trace_id.set(state.original_input.input.scan.id)

        # Fall back to an empty ecosystem string when none is set on the input image.
        ecosystem = ""
        if state.original_input and state.original_input.input.image.ecosystem:
            ecosystem = state.original_input.input.image.ecosystem.value

        intel_df = data_utils.merge_intel_and_plugin_data_convert_to_dataframe(state.cve_intel)
        intel_records = intel_df.to_dict(orient='records')

        # One coroutine per CVE record, awaited concurrently; any failure propagates to the caller.
        pending = [_checklist_for(record, ecosystem=ecosystem) for record in intel_records]
        results = await asyncio.gather(*pending)

        # Each result is a (vuln_id, checklist) pair, so the list converts directly to a mapping.
        state.checklist_plans = dict(results)
        return state

    yield FunctionInfo.from_fn(
        _arun,
        input_schema=AgentMorpheusEngineState,
        description=("Generates tailored, context-sensitive task checklist for impact analysis."))