"""
Optimized inference script for GGUF models.

Uses llama-cpp-python for fast CPU and GPU inference.
"""

import argparse
import multiprocessing
import time

try:
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False
    print("llama-cpp-python not available.")
    print("Install with: pip install llama-cpp-python")


class FastInference:
    """Optimized inference class for GGUF models"""

    def __init__(self, model_path: str, n_ctx: int = 4096, n_threads: int = -1):
        if not LLAMA_CPP_AVAILABLE:
            raise ImportError("llama-cpp-python required for GGUF inference")

        self.model_path = model_path

        # -1 means "use all available CPU cores".
        if n_threads == -1:
            n_threads = multiprocessing.cpu_count()

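        # Loader settings: n_batch is the prompt-processing batch size,
        # use_mmap maps the weights straight from disk instead of copying them,
        # use_mlock pins those pages in RAM to avoid swapping, and
        # n_gpu_layers=-1 offloads every layer to the GPU when one is detected.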
        n_gpu_layers = -1 if self._has_gpu() else 0
        self.model = Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_batch=512,
            n_gpu_layers=n_gpu_layers,
            use_mmap=True,
            use_mlock=True,
            verbose=False
        )

        print(f"Model loaded: {model_path}")
        print(f"Context length: {n_ctx}")
        print(f"Threads: {n_threads}")
        print(f"GPU layers: {n_gpu_layers}")

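    # Note: this check is a heuristic. It detects CUDA via torch if torch is
    # installed; llama-cpp-python must also be built with GPU support
    # (e.g. CUDA or Metal) for the layer offload above to take effect.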
    def _has_gpu(self) -> bool:
        """Check if a CUDA GPU is available."""
        try:
            import torch
            return torch.cuda.is_available()
        except ImportError:
            return False

    def generate(self, prompt: str, max_tokens: int = 512, temperature: float = 0.7) -> str:
        """Generate text with optimized settings"""
        start_time = time.time()

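        # Sampling settings: nucleus sampling (top_p=0.9) with a mild repeat
        # penalty; generation also stops at a closing "</code>" tag or a run
        # of blank lines.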
        response = self.model(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=0.9,
            repeat_penalty=1.1,
            stop=["</code>", "\n\n\n"],
            stream=False
        )

        generation_time = time.time() - start_time
        generated_text = response['choices'][0]['text']

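        # Rough throughput estimate: whitespace-split word count stands in for
        # the real token count, so tokens/sec here is only approximate.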
        estimated_tokens = len(generated_text.split())
        tokens_per_sec = estimated_tokens / generation_time if generation_time > 0 else 0

        print("\nPerformance:")
        print(f"  Time: {generation_time:.2f}s")
        print(f"  Speed: {tokens_per_sec:.1f} tokens/sec")
        print(f"  Tokens: {estimated_tokens}")

        return generated_text

    def generate_stream(self, prompt: str, max_tokens: int = 512, temperature: float = 0.7):
        """Generate text with streaming"""
        print("\nStreaming response:")
        start_time = time.time()
        total_tokens = 0

        stream = self.model(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=0.9,
            repeat_penalty=1.1,
            stop=["</code>", "\n\n\n"],
            stream=True
        )

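        # Each streamed chunk carries the newly generated text (usually one
        # token at a time), so counting chunks approximates the token count.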
        for chunk in stream:
            text = chunk['choices'][0]['text']
            print(text, end='', flush=True)
            total_tokens += 1

        generation_time = time.time() - start_time
        tokens_per_sec = total_tokens / generation_time if generation_time > 0 else 0

        print("\n\nStreaming Performance:")
        print(f"  Time: {generation_time:.2f}s")
        print(f"  Speed: {tokens_per_sec:.1f} tokens/sec")

    def chat_mode(self):
        """Interactive chat mode"""
        print("\nInteractive Chat Mode")
        print("Commands: 'exit' to quit, 'stream' to toggle streaming")
        print("-" * 50)

        use_streaming = False

        while True:
            try:
                prompt = input("\nYou: ")

                if prompt.lower() == 'exit':
                    print("Goodbye!")
                    break
                elif prompt.lower() == 'stream':
                    use_streaming = not use_streaming
                    print(f"Streaming {'enabled' if use_streaming else 'disabled'}")
                    continue

                print("Assistant:", end=" ")

                if use_streaming:
                    self.generate_stream(prompt)
                else:
                    response = self.generate(prompt)
                    print(response)

            except KeyboardInterrupt:
                print("\n\nGoodbye!")
                break


def main():
    parser = argparse.ArgumentParser(description="Fast GGUF Model Inference")
    parser.add_argument("--model", required=True, help="Path to GGUF model file")
    parser.add_argument("--prompt", help="Text prompt for generation")
    parser.add_argument("--max-tokens", type=int, default=512, help="Maximum tokens to generate")
    parser.add_argument("--temperature", type=float, default=0.7, help="Generation temperature")
    parser.add_argument("--ctx-size", type=int, default=4096, help="Context size")
    parser.add_argument("--threads", type=int, default=-1, help="Number of threads (-1 for auto)")
    parser.add_argument("--interactive", action="store_true", help="Start interactive chat mode")
    parser.add_argument("--stream", action="store_true", help="Use streaming generation")

    args = parser.parse_args()

    print(f"Loading model: {args.model}")
    inferencer = FastInference(
        args.model,
        n_ctx=args.ctx_size,
        n_threads=args.threads
    )

    if args.interactive:
        inferencer.chat_mode()
    elif args.prompt:
        if args.stream:
            inferencer.generate_stream(args.prompt, args.max_tokens, args.temperature)
        else:
            response = inferencer.generate(args.prompt, args.max_tokens, args.temperature)
            print("\nGenerated text:")
            print(response)
    else:
        print("Please provide --prompt or use --interactive mode")
        print("Examples:")
        print("  python fast_inference.py --model model.gguf --prompt 'def hello():'")
        print("  python fast_inference.py --model model.gguf --interactive")


if __name__ == "__main__":
    main()