|
import subprocess |
|
import sys |
|
import json |
|
|
|
# Install the third-party runtime dependencies with pip before they are
# imported further down, echoing pip's combined stdout/stderr line by line.
print("🔥 Installing required packages...")

# Using Popen as a context manager closes the output pipe and waits for pip
# to exit when the block ends, so no explicit wait() call is needed.
with subprocess.Popen(
    [
        sys.executable, "-m", "pip", "install", "-q",
        "transformers", "retrying", "bitsandbytes", "accelerate", "peft", "torch",
    ],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
) as process:
    for line in process.stdout:
        print(line, end='')

if process.returncode == 0:
    print("🕵️ Packages seems to be fine, now loading model...")
else:
    # Best effort: the packages may already be installed, so report the
    # failure instead of aborting and let the imports below decide.
    print(
        f"⚠️ pip exited with status {process.returncode}; "
        "trying to load the model anyway..."
    )
|
|
|
import ast |
|
import re |
|
from peft import PeftModel |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig |
|
from retrying import retry |
|
|
|
# 8-bit quantisation settings; only handed to the base model when running in
# Google Colab (see `quantization_config` below).
bnb_config = BitsAndBytesConfig(load_in_8bit=True)

# Detect Google Colab by inspecting the active IPython shell.
# `get_ipython` only exists when IPython is running, so a NameError means a
# plain Python interpreter. Probing the name directly avoids checking
# attributes on `__builtins__`, which is a dict (not a module) whenever this
# file is imported or exec'd instead of being run as `__main__`.
try:
    _ipython_shell = get_ipython()
except NameError:
    _ipython_shell = None
RunningInCOLAB = 'google.colab' in str(_ipython_shell)

# Hugging Face identifiers of the base model and the PEFT adapter laid on top.
_BASE_MODEL_ID = "Qwen/Qwen2.5-Coder-7B-Instruct"
_ADAPTER_ID = "SushantGautam/vulnerability_ativ0.1"

_tokenizer = AutoTokenizer.from_pretrained(_BASE_MODEL_ID)

base_model = AutoModelForCausalLM.from_pretrained(
    _BASE_MODEL_ID,
    # Quantise only in Colab -- presumably because of its limited GPU memory.
    quantization_config=bnb_config if RunningInCOLAB else None,
    device_map="auto",
)

# Wrap the base model with the fine-tuned adapter weights.
_model = PeftModel.from_pretrained(
    base_model,
    _ADAPTER_ID,
    device_map="auto",
)
|
|
|
def extract_dict(text):
    """Parse the first fenced literal found in a model response.

    A ```python fenced block is preferred; if none is present, a ```json
    fenced block is used instead. The block's content is parsed as a Python
    literal first and, if that fails, as JSON, so answers written with
    ``null``/``true``/``false`` are accepted as well.

    Args:
        text: Raw text produced by the model.

    Returns:
        The parsed value (normally a dict), or ``{}`` when ``text`` contains
        no recognised fenced block.

    Raises:
        ValueError, SyntaxError: If the fenced content is neither a valid
            Python literal nor valid JSON (the error from the Python-literal
            parse is the one raised).
    """
    match = None
    # Ordered by preference so a ```python block wins over a ```json one.
    for fence_label in ("python", "json"):
        match = re.search(rf"```{fence_label}\n(.*?)\n```", text, re.DOTALL)
        if match:
            break
    if not match:
        return {}

    payload = match.group(1)
    try:
        return ast.literal_eval(payload)
    except (ValueError, SyntaxError) as literal_error:
        try:
            return json.loads(payload)
        except ValueError:
            # Neither parser accepted it: surface the Python-literal error.
            raise literal_error from None
|
|
|
def _check_response_shape(payload):
    """Raise ``ValueError`` unless ``payload`` has the expected answer shape.

    The payload must be a dict holding a 'code' and/or a 'vulnerability' key,
    and 'vulnerability', when present, must be a list.
    """
    if not isinstance(payload, dict):
        raise ValueError(f"expected a dict, got {type(payload).__name__}")
    if "code" not in payload and "vulnerability" not in payload:
        raise ValueError("response has neither a 'code' nor a 'vulnerability' key")
    if "vulnerability" in payload and not isinstance(payload["vulnerability"], list):
        raise ValueError("'vulnerability' must be a list")


@retry(stop_max_attempt_number=5)
def generate(prompt, max_new_tokens=4000):
    """Send ``prompt`` to the model and return its raw and parsed answer.

    The prompt is wrapped in a chat template together with a fixed system
    message, the model generates a completion, and the fenced dict in the
    completion is extracted and validated. The ``retry`` decorator re-runs
    the whole function (up to 5 attempts) whenever it raises.

    Args:
        prompt: User message to send to the model.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        ``{"raw": <decoded completion>, "extracted": <parsed dict>}``.

    Raises:
        Exception: If the completion could not be parsed into a dict of the
            expected shape.
    """
    model, tokenizer = _model, _tokenizer
    messages = [
        {"role": "system", "content": "You are a cybersecurity expert specializing in CWE vulnerabilities in codes. Your responses must be accompanied by a python JSON."},
        {"role": "user", "content": prompt},
    ]

    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )

    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
    )
    # Drop the echoed prompt tokens so only the newly generated part remains.
    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    try:
        response_formatted = extract_dict(response)
        # Explicit checks rather than `assert`, which is stripped under -O.
        _check_response_shape(response_formatted)
    except Exception as err:
        # `Exception` (not a bare except) so Ctrl-C / SystemExit still abort
        # immediately instead of being turned into another retry.
        print('⚠️ Failed to extract dict properly from response. Retrying...')
        raise Exception(f"❌ Failed to extract dict properly from response after five tries: {response}") from err
    return {"raw": response, "extracted": response_formatted}
|
|
|
# Smoke test run at import time: ask the model for a trivial, well-formed
# answer. generate() raises if no parseable answer is produced.
print("🎉 Model and tokenizer loaded successfully. Now, making sure the model responses: Ping!")
generate(
    "Return JSON with keys 'code' and 'vulnerability', "
    "where 'code' is None and 'vulnerability' is an empty list."
)
print('💪🏆🎉 Pong! Model responded successfully. Use generate("<prompt>") to call the model.')
|
|
|
|
|
|
|
|
|
|
|
|
|
def vulnerability_injection(secure_code: str, CWE_vulnerability=None):
    """Ask the model to rewrite ``secure_code`` so that it becomes vulnerable.

    Args:
        secure_code: Source code to weaken.
        CWE_vulnerability: Optional weakness to introduce. It is interpolated
            into the prompt as-is, so a list appears in its Python repr.
            When ``None`` the model is free to choose the weakness.

    Returns:
        The result of ``generate`` for the constructed prompt.
    """
    if CWE_vulnerability is None:
        # The key name 'code' is kept on one line here; it used to be split
        # by a stray line break, unlike in the CWE-specific prompt below.
        prompt = (
            "\n"
            "Modify the following secure code to introduce a vulnerability.\n"
            "Secure Code:\n"
            f"{secure_code}\n"
            "Your task is to introduce security weaknesses: Create a vulnerable version of this code by adding vulnerabilities.\n"
            "Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE vulnerabilities if vulnerabilities introduced else empty).\n"
        )
    else:
        prompt = (
            "\n"
            f"Modify the following secure code to introduce a {CWE_vulnerability} vulnerability.\n"
            "\n"
            "Secure Code:\n"
            f"{secure_code}\n"
            "\n"
            "Your task is to introduce the mentioned security weaknesses: Create a vulnerable version of this code by adding security risks.\n"
            "Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE if vulnerabilities introduced else empty).\n"
            "\n"
        )
    return generate(prompt)
|
|
|
|
|
def vulnerability_detection(input_code: str):
    """Ask the model which CWE weaknesses are already present in ``input_code``.

    Args:
        input_code: Source code to analyse.

    Returns:
        The result of ``generate`` for the detection prompt.
    """
    prompt_lines = [
        "",
        "Analyze the following code and detect if any CWE security vulnerabilities are already present.",
        "",
        "Code:",
        f"{input_code}",
        "",
        "Identify potential vulnerabilities that already exist in the code.",
        "Return JSON with key: 'vulnerability' (list of detected CWE vulnerabilities or an empty list [] if no vulnerability found).",
        "",
    ]
    # Joining with newlines gives one leading and one trailing line break.
    return generate("\n".join(prompt_lines))
|
|
|
|
|
def vulnerability_fix(insecure_code: str):
    """Ask the model for a secured version of ``insecure_code``.

    Args:
        insecure_code: Source code whose weaknesses should be repaired.

    Returns:
        The result of ``generate`` for the fix prompt.
    """
    prompt_lines = [
        "",
        "Fix the security vulnerabilities in the following code.",
        "",
        "Vulnerable Code:",
        f"{insecure_code}",
        "",
        "Your task is to fix the security vulnerabilities in the code.",
        "Return JSON with keys: 'code' (secure version) and 'vulnerability' (list of fixed CWE vulnerabilities if any else empty list).",
        "",
    ]
    # Joining with newlines gives one leading and one trailing line break.
    return generate("\n".join(prompt_lines))
|
|
|
# Usage banner listing the call forms of the helper functions defined above.
print(
    'ƒ Four functions are available to use:\n'
    '🧩vulnerability_injection(secure_code: str)\n'
    '🧩vulnerability_injection(secure_code: str, CWE_vulnerability=["CWE-89"])\n'
    '🧩vulnerability_detection(input_code: str)\n'
    '🧩vulnerability_fix(insecure_code: str)'
)