# Bootstrap script: installs dependencies, loads the fine-tuned
# vulnerability model, and exposes helper functions built on generate().
import subprocess
import sys
import json

print("🔥 Installing required packages...")
# The context manager closes the stdout pipe and waits for pip to exit.
with subprocess.Popen(
    [sys.executable, "-m", "pip", "install", "-q", "transformers", "retrying",
     "bitsandbytes", "accelerate", "peft", "torch"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
) as process:
    for line in process.stdout:
        print(line, end='')

if process.returncode == 0:
    print("🕵️ Packages seems to be fine, now loading model...")
else:
    # Do not claim success when pip failed; the imports below will most
    # likely raise, and this message tells the user why.
    print(f"⚠️ pip exited with status {process.returncode}; the imports below may fail.")

# These imports must stay below the pip step: the packages may not exist
# until the installation above has run.
import ast
import re
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from retrying import retry

bnb_config = BitsAndBytesConfig(load_in_8bit=True)


def _running_in_colab():
    """Return True when the active IPython shell's repr mentions 'google.colab'."""
    try:
        # get_ipython only exists as a name when running under IPython.
        shell = get_ipython()
    except NameError:
        return False
    return 'google.colab' in str(shell)


# Probing get_ipython directly avoids relying on `__builtins__`, which is a
# dict (not a module) whenever this code is not executed as `__main__`.
RunningInCOLAB = _running_in_colab()

# Load tokenizer
_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-Coder-7B-Instruct")

# Load model with quantization
base_model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-Coder-7B-Instruct",
    quantization_config=bnb_config if RunningInCOLAB else None,
    device_map="auto",
)

# Load the fine-tuned model
_model = PeftModel.from_pretrained(
    base_model,
    "SushantGautam/vulnerability_ativ0.1",
    device_map="auto",
)


def extract_dict(text):
    """Parse the first ```python fenced block of *text* as a Python literal.

    Returns the evaluated literal, or an empty dict when no fenced block is
    found. Errors from ``ast.literal_eval`` propagate to the caller.
    """
    match = re.search(r"```python\n(.*?)\n```", text, re.DOTALL)
    return ast.literal_eval(match.group(1)) if match else {}


def _validate_response(parsed):
    """Raise ValueError unless *parsed* has the shape generate() promises."""
    if not isinstance(parsed, dict):
        raise ValueError("model response is not a dict")
    if "code" not in parsed and "vulnerability" not in parsed:
        raise ValueError("model response has neither 'code' nor 'vulnerability'")
    if "vulnerability" in parsed and not isinstance(parsed["vulnerability"], list):
        raise ValueError("'vulnerability' must be a list")


@retry(stop_max_attempt_number=5)
def generate(prompt, max_new_tokens=4000):
    """Send *prompt* to the model and return its raw and parsed answer.

    Args:
        prompt: User message placed after the fixed system message.
        max_new_tokens: Forwarded to ``model.generate``.

    Returns:
        ``{"raw": <decoded text>, "extracted": <dict parsed from it>}``.

    Raises:
        Exception: when the answer cannot be parsed or fails validation.
            The ``retry`` decorator re-invokes the function, so callers only
            see the error once all five attempts have failed.
    """
    model, tokenizer = _model, _tokenizer
    messages = [
        {"role": "system", "content": "You are a cybersecurity expert specializing in CWE vulnerabilities in codes. Your responses must be accompanied by a python JSON."},
        {"role": "user", "content": prompt},
    ]
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
    )
    # Drop the echoed prompt tokens so only newly generated ones are decoded.
    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    try:
        response_formatted = extract_dict(response)
        # Explicit checks instead of `assert`, which is stripped under -O.
        _validate_response(response_formatted)
    except Exception as err:
        print('⚠️ Failed to extract dict properly from response. Retrying...')
        raise Exception(f"❌ Failed to extract dict properly from response after five tries: {response}") from err
    return {"raw": response, "extracted": response_formatted}


print("🎉 Model and tokenizer loaded successfully. Now, making sure the model responses: Ping!")
generate("Return JSON with keys 'code' and 'vulnerability', where 'code' is None and 'vulnerability' is an empty list.")
print('💪🏆🎉 Pong! Model responded successfully. Use generate("") to call the model.')

# prompt = "Here's a properly secured code snippet:\n\ndef add_label options, f, attr\n label_size = options.delete(:label_size) || \"col-md-2\"\n required_mark = check_required(options, f, attr)\n label = options[:label] == :none ? '' : options.delete(:label)\n label ||= ((clazz = f.object.class).respond_to?(:gettext_translation_for_attribute_name) &&\n s_(clazz.gettext_translation_for_attribute_name attr)) if f\n label = label.present? ? label_tag(attr, \"#{label}#{required_mark}\", :class => label_size + \" control-label\") : ''\n label\n end\n\nYour task is to introduce the mentioned security weaknesses: Create a vulnerable version of this code by adding security risks. Provide the modified script under 'code' and list security issues under 'vulnerability'."

# Use: globals().get("_x") or exec(requests.get("https://huggingface.co/SushantGautam/vulnerability_ativ0.1/raw/main/script.py").text) or globals().__setitem__("_x", "Loaded 🔥")


def vulnerability_injection(secure_code: str, CWE_vulnerability=None):
    """Ask the model for a vulnerable variant of *secure_code*.

    Args:
        secure_code: Source code to modify.
        CWE_vulnerability: Optional value interpolated into the prompt to
            request a specific weakness; when None the model may introduce
            any vulnerability.

    Returns:
        The dict produced by ``generate``.
    """
    if CWE_vulnerability is None:  ## introduce any vulnerability
        prompt = f"""
        Modify the following secure code to introduce a vulnerability.

        Secure Code:
        {secure_code}

        Your task is to introduce security weaknesses: Create a vulnerable version of this code by adding vulnerabilities.
        Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE vulnerabilities if vulnerabilities introduced else empty).
        """
    else:  ## introduce specific vulnerability
        prompt = f"""
        Modify the following secure code to introduce a {CWE_vulnerability} vulnerability.

        Secure Code:
        {secure_code}

        Your task is to introduce the mentioned security weaknesses: Create a vulnerable version of this code by adding security risks.
        Return JSON with keys: 'code' (modified vulnerable code) and 'vulnerability' (list of CWE if vulnerabilities introduced else empty).
        """
    return generate(prompt)


def vulnerability_detection(input_code: str):
    """Ask the model which CWE vulnerabilities *input_code* already contains.

    Returns:
        The dict produced by ``generate``.
    """
    prompt = f"""
    Analyze the following code and detect if any CWE security vulnerabilities are already present.

    Code:
    {input_code}

    Identify potential vulnerabilities that already exist in the code.
    Return JSON with key: 'vulnerability' (list of detected CWE vulnerabilities or an empty list [] if no vulnerability found).
    """
    return generate(prompt)


def vulnerability_fix(insecure_code: str):
    """Ask the model for a secured version of *insecure_code*.

    Returns:
        The dict produced by ``generate``.
    """
    prompt = f"""
    Fix the security vulnerabilities in the following code.

    Vulnerable Code:
    {insecure_code}

    Your task is to fix the security vulnerabilities in the code.
    Return JSON with keys: 'code' (secure version) and 'vulnerability' (list of fixed CWE vulnerabilities if any else empty list).
    """
    return generate(prompt)


print('ƒ Four functions are available to use:\n🧩vulnerability_injection(secure_code: str)\n🧩vulnerability_injection(secure_code: str, CWE_vulnerability=["CWE-89"])\n🧩vulnerability_detection(input_code: str)\n🧩vulnerability_fix(insecure_code: str)')