from __future__ import annotations
import math
import re
import logging
import functools
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union, Mapping, Sequence
from brand_to_generic import brand_lookup
import csv
import json
try:
import pandas as pd
except ImportError:
pd = None
__all__ = [
"load_reference",
"load_all_routes_reference",
"load_patient_meds",
"calculate_dbi",
"print_report",
"detect_route_from_text",
"detect_combination_drug",
"split_combination_drug_simple",
"dbi_mcp",
"dbi_mcp_mixed_routes",
"dbi_mcp_with_combinations",
]
PatientInput = Union[
Path,
Sequence[Tuple[str, float]],
Mapping[str, float],
]
# Combination drug detection patterns
COMBINATION_PATTERNS = [
r'\bco-?\w+\b', # co- prefix with optional hyphen (co-codamol, cocodamol)
r'\b\w+[-/]\w+\b', # hyphen or slash separated (paracetamol-codeine, aspirin/caffeine)
r'\b\w+\s*\+\s*\w+\b', # plus sign (aspirin + caffeine)
r'\b\w+\s*with\s+\w+\b', # "with" combinations
r'\b\w+\s*and\s+\w+\b', # "and" combinations
]
# Route detection patterns
ROUTE_PATTERNS = {
'transdermal': [
r'\bpatch(es)?\b',
r'\btransdermal\b',
r'\bmcg/hr\b',
r'\bμg/hr\b',
r'\bmicrograms?/hr\b',
r'\bmicrograms?/hour\b',
],
'parenteral': [
r'\binjection\b',
r'\biv\b',
r'\bim\b',
r'\bsc\b',
r'\bsubcut\b',
r'\bsubcutaneous\b',
r'\bintravenous\b',
r'\bintramuscular\b',
r'\bparenteral\b',
],
'sublingual_buccal': [
r'\bsublingual\b',
r'\bbuccal\b',
r'\bsl\b',
r'\bunder.?tongue\b',
],
'oral': [
r'\btab(let)?s?\b',
r'\bcap(sule)?s?\b',
r'\boral\b',
r'\bpo\b',
r'\bby.?mouth\b',
r'\bliquid\b',
r'\bsyrup\b',
r'\bsolution\b',
r'\bsuspension\b',
]
}
def _normalise_name(name: str) -> str:
"""Strip/-lower a drug name for key matching."""
return name.strip().lower()
def detect_route_from_text(text: str) -> str:
"""
Detect the most likely route of administration from medication text.
Returns the detected route or 'oral' as default.
"""
text_lower = text.lower()
# Check each route pattern
for route, patterns in ROUTE_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, text_lower):
return route
# Default to oral if no specific route detected
return 'oral'
def detect_combination_drug(drug_name: str) -> bool:
"""
Detect if a drug name appears to be a combination drug.
"""
drug_name_lower = drug_name.lower()
for pattern in COMBINATION_PATTERNS:
if re.search(pattern, drug_name_lower):
return True
# Check for multiple doses in parentheses (e.g., "500mg-9.6mg")
if re.search(r'\d+(?:\.\d+)?\s*mg\s*[-/]\s*\d+(?:\.\d+)?\s*mg', drug_name_lower):
return True
return False
def split_combination_drug_simple(drug_text: str) -> List[Tuple[str, str, str]]:
"""
Simple rule-based splitting for common combination patterns.
Returns list of (component_name, original_text, notes).
"""
components = []
drug_text_lower = drug_text.lower()
# Handle common combinations
known_combinations = {
'co-codamol': [('paracetamol', 'paracetamol component of co-codamol'),
('codeine', 'codeine component of co-codamol')],
'cocodamol': [('paracetamol', 'paracetamol component of co-codamol'),
('codeine', 'codeine component of co-codamol')],
'co-trimoxazole': [('trimethoprim', 'trimethoprim component of co-trimoxazole'),
('sulfamethoxazole', 'sulfamethoxazole component of co-trimoxazole')],
'cotrimoxazole': [('trimethoprim', 'trimethoprim component of co-trimoxazole'),
('sulfamethoxazole', 'sulfamethoxazole component of co-trimoxazole')],
'paracetamol-codeine': [('paracetamol', 'paracetamol component'),
('codeine', 'codeine component')],
'aspirin-caffeine': [('aspirin', 'aspirin component'),
('caffeine', 'caffeine component')],
'tylenol-codeine': [('paracetamol', 'paracetamol component'),
('codeine', 'codeine component')],
# Brand name combinations
'vytorin': [('ezetimibe', 'ezetimibe component of Vytorin'),
('simvastatin', 'simvastatin component of Vytorin')],
'exforge': [('amlodipine', 'amlodipine component of Exforge'),
('valsartan', 'valsartan component of Exforge')],
'caduet': [('amlodipine', 'amlodipine component of Caduet'),
('atorvastatin', 'atorvastatin component of Caduet')],
'janumet': [('sitagliptin', 'sitagliptin component of Janumet'),
('metformin', 'metformin component of Janumet')],
'combigan': [('brimonidine', 'brimonidine component of Combigan'),
('timolol', 'timolol component of Combigan')],
}
# Check for known combinations
for combo_name, combo_components in known_combinations.items():
if combo_name in drug_text_lower:
for comp_name, note in combo_components:
components.append((comp_name, drug_text, note))
return components
# Try to split hyphenated/slashed combinations
if '-' in drug_text or '/' in drug_text:
# Extract the drug name part (before dosing info)
drug_name_part = re.split(r'\d+', drug_text)[0].strip()
separators = ['-', '/', '+']
for sep in separators:
if sep in drug_name_part:
parts = [part.strip() for part in drug_name_part.split(sep)]
if len(parts) == 2:
for part in parts:
if part and len(part) > 2: # Avoid single letters
components.append((part, drug_text, f'Component of combination drug'))
return components
return components
def needs_llm_splitting(drug_text: str) -> bool:
"""
Determine if a combination drug needs LLM assistance for splitting.
"""
if not detect_combination_drug(drug_text):
return False
# Try simple splitting first
simple_components = split_combination_drug_simple(drug_text)
# If simple splitting failed or returned unclear results, use LLM
if not simple_components:
return True
# If components are too short or unclear, use LLM
for comp_name, _, _ in simple_components:
if len(comp_name) < 3 or comp_name.isdigit():
return True
return False
def load_reference(
ref_path: Path,
*,
route: str = "oral",
use_pandas: bool | None = None,
) -> Dict[str, Tuple[float, str]]:
"""Return mapping **generic → (δroute, drug_class)**.
If a drug lacks the requested route it is silently skipped. Callers may
retry with ``route=None`` to get the *first* available dose instead.
"""
if use_pandas is None:
use_pandas = pd is not None
ref: Dict[str, Tuple[float, str]] = {}
if use_pandas:
df = pd.read_csv(ref_path)
df = df[df["route"].str.lower() == route.lower()]
for _, row in df.iterrows():
ref[_normalise_name(row["generic_name"])] = (
float(row["min_daily_dose_mg"]),
row["drug_class"].strip().lower(),
)
else:
with ref_path.open(newline="") as f:
rdr = csv.DictReader(f)
for row in rdr:
if row["route"].strip().lower() != route.lower():
continue
ref[_normalise_name(row["generic_name"])] = (
float(row["min_daily_dose_mg"]),
row["drug_class"].strip().lower(),
)
return ref
def load_all_routes_reference(
ref_path: Path,
*,
use_pandas: bool | None = None,
) -> Dict[str, Dict[str, Tuple[float, str]]]:
"""
Load reference data for all routes.
Returns mapping: route → {generic → (δ, drug_class)}
"""
if use_pandas is None:
use_pandas = pd is not None
all_routes: Dict[str, Dict[str, Tuple[float, str]]] = {}
if use_pandas:
df = pd.read_csv(ref_path)
for _, row in df.iterrows():
route = row["route"].strip().lower()
generic = _normalise_name(row["generic_name"])
if route not in all_routes:
all_routes[route] = {}
all_routes[route][generic] = (
float(row["min_daily_dose_mg"]),
row["drug_class"].strip().lower(),
)
else:
with ref_path.open(newline="") as f:
rdr = csv.DictReader(f)
for row in rdr:
route = row["route"].strip().lower()
generic = _normalise_name(row["generic_name"])
if route not in all_routes:
all_routes[route] = {}
all_routes[route][generic] = (
float(row["min_daily_dose_mg"]),
row["drug_class"].strip().lower(),
)
return all_routes
def calculate_dbi(
patient_meds: Mapping[str, float],
reference: Mapping[str, Tuple[float, str]],
) -> Tuple[float, List[Tuple[str, float, float, float]]]:
"""Return ``(total, details)`` where *details* is a list of
``(generic_name, dose_mg, δ_mg, DBI_i)``.
"""
details: List[Tuple[str, float, float, float]] = []
total = 0.0
for drug, dose in patient_meds.items():
ref = reference.get(drug)
if not ref:
continue # unknown or route-mismatch
delta, drug_class = ref
if drug_class not in {"anticholinergic", "sedative", "both"}:
continue
dbi_i = dose / (delta + dose)
details.append((drug, dose, delta, dbi_i))
total += dbi_i
return total, details
logger = logging.getLogger(__name__)
UNIT_PAT = re.compile(r"(?P\d+(?:[.,]\d+)?)(?:\s*)(?Pmcg|μg|mg|g|iu|units?|micrograms?|mmol)\b", re.I)
PATCH_PAT = re.compile(r"(?P\d+(?:[.,]\d+)?)(?:\s*)(mcg|μg|microg)\s*/\s*hr", re.I)
PERCENT_PAT = re.compile(r"\b(?P\d+(?:\.\d+)?)\s*%\b")
CONC_PAT = re.compile(r"(?P\d+(?:[.,]\d+)?)(?:\s*)(?Pmcg|μg|mg|g|iu|units?)\s*/\s*(?P\d+(?:[.,]\d+)?)(?:\s*)m ?l", re.I)
VOL_PAT = re.compile(r"(?P\d+(?:[.,]\d+)?)(?:\s*)m ?l", re.I)
QTY_PAT = re.compile(r"(?\d+(?:\s*-\s*\d+)?)\s*(?:tab|caps?|puff|spray|patch|patches|sachet|tube|inhalation|drop)s?\b", re.I)
FREQ_PAT = re.compile(r"\b(q\d{1,2}h|qd|od|daily|once daily|bid|bd|twice daily|tid|tds|three times daily|qid|four times daily|nocte|mane|am|pm)\b", re.I)
EVERY_HOURS_PAT = re.compile(r"q(\d{1,2})h", re.I)
_FREQ_MAP = {
"qd": 1, "od": 1, "daily": 1, "once daily": 1,
"bid": 2, "bd": 2, "twice daily": 2,
"tid": 3, "tds": 3, "three times daily": 3,
"qid": 4, "four times daily": 4,
"nocte": 1, "pm": 1,
"mane": 1, "am": 1,
}
def _unit_to_mg(val: float, unit: str) -> float:
unit = unit.lower().removesuffix('s')
if unit == "mg":
return val
if unit == "g":
return val * 1_000
if unit in {"mcg", "μg", "microgram"}:
return val / 1_000
if unit in {"iu", "unit", "mmol"}:
logger.debug("Cannot reliably convert '%s' to mg. Returning 0.", unit)
return 0.0
return math.nan
def _freq_to_per_day(token: str) -> float:
token = token.lower()
if token in _FREQ_MAP:
return _FREQ_MAP[token]
m = EVERY_HOURS_PAT.fullmatch(token)
if m:
hrs = int(m.group(1))
return 24 / hrs if hrs else 1
return 1
Parsed = Tuple[str, float, bool, str] # (name, mg_day, is_prn, route)
ParsedCombination = Tuple[str, float, bool, str, bool, List[Tuple[str, str, str]]] # (name, mg_day, is_prn, route, is_combination, components)
@functools.lru_cache(maxsize=2048)
def _parse_line(line: str) -> Optional[Parsed]:
original = line.strip()
if not original:
return None
is_prn = "prn" in original.lower()
detected_route = detect_route_from_text(original)
m_patch = PATCH_PAT.search(original)
if m_patch:
mcg_hr = float(m_patch.group("val").replace(",", "."))
mg_day = (mcg_hr * 24) / 1_000 # µg/hr → mg/day
name_part = PATCH_PAT.sub("", original).split()[0]
# Override route detection for patches
return (name_part, mg_day, is_prn, "transdermal")
# Try parsing percentage-based topicals/solutions before standard units
m_percent = PERCENT_PAT.search(original)
if m_percent:
percent_val = float(m_percent.group("percent"))
# For liquids where volume is given (e.g., 2% solution, 10mL dose)
m_vol = VOL_PAT.search(original)
if m_vol:
voldose_ml = float(m_vol.group("voldose").replace(",", "."))
# Assume % is g/100mL for liquids
strength_g_per_100ml = percent_val
mg_per_dose = (strength_g_per_100ml * 1000) * (voldose_ml / 100)
freq = 1.0
m_freq = FREQ_PAT.search(original)
if m_freq:
freq = _freq_to_per_day(m_freq.group(0))
mg_day = mg_per_dose * freq
name_part = original[:m_percent.start()].strip()
name_part = re.sub(r"[^A-Za-z0-9\s-]", " ", name_part).strip()
return (name_part, mg_day, is_prn, detected_route)
# Handle drops with percentage strength
if 'drop' in original.lower():
# Assume 20 drops/mL for ophthalmic solutions
g_per_100ml = percent_val
mg_per_ml = g_per_100ml * 10 # 1% -> 1g/100mL -> 10mg/mL
qty = 1.0
m_qty = QTY_PAT.search(original) # QTY_PAT now includes 'drop'
if m_qty:
qty_str = m_qty.group("qty").split('-')[-1].strip() # Use upper end of range
try:
qty = float(qty_str)
except ValueError:
qty = 1.0
# Dose in mg = (number of drops / 20 drops_per_mL) * mg_per_mL
mg_per_dose = (qty / 20.0) * mg_per_ml
freq = 1.0
m_freq = FREQ_PAT.search(original)
if m_freq:
freq = _freq_to_per_day(m_freq.group(0))
mg_day = mg_per_dose * freq
name_part = original[:m_percent.start()].strip()
name_part = re.sub(r"[^A-Za-z0-9\s-]", " ", name_part).strip()
return (name_part, mg_day, is_prn, detected_route)
# For cases with 'application' or 'drop' (e.g., 0.05% cream, 1 application)
if 'application' in original.lower() or 'ointment' in original.lower():
# Can't calculate mg dose, but we can parse the drug name.
name_part = original[:m_percent.start()].strip()
name_part = re.sub(r"[^A-Za-z0-9\s-]", " ", name_part).strip()
logger.debug("Parsed %%-based item but cannot quantify mg/day: %s", original)
return (name_part, 0.0, is_prn, detected_route)
m_conc = CONC_PAT.search(original)
m_vol = VOL_PAT.search(original)
if m_conc and m_vol:
drug_val = _unit_to_mg(float(m_conc.group("drug_val").replace(",", ".")), m_conc.group("drug_unit"))
vol_val = float(m_conc.group("vol_val").replace(",", "."))
voldose = float(m_vol.group("voldose").replace(",", "."))
if vol_val == 0:
logger.warning("volume 0 in concentration parse – %s", original)
return None
mg_per_dose = drug_val * (voldose / vol_val)
qty = 1
freq = 1.0
m_freq = FREQ_PAT.search(original)
if m_freq:
freq = _freq_to_per_day(m_freq.group(0))
mg_day = mg_per_dose * freq
name_part = CONC_PAT.split(original)[0].strip()
return (name_part, mg_day, is_prn, detected_route)
m = UNIT_PAT.search(original)
if m:
strength_mg = _unit_to_mg(float(m.group("val").replace(",", ".")), m.group("unit"))
if math.isnan(strength_mg):
logger.debug("Unhandled unit '%s' in line: %s", m.group("unit"), original)
return None
qty = 1.0
m_qty = QTY_PAT.search(original)
if m_qty:
qty_str = m_qty.group("qty").split('-')[-1].strip()
try:
qty = float(qty_str)
except ValueError:
qty = 1.0
freq = 1.0
m_freq = FREQ_PAT.search(original)
if m_freq:
freq = _freq_to_per_day(m_freq.group(0))
mg_day = strength_mg * qty * freq
name_part = original[:m.start()].strip()
name_part = re.sub(r"[^A-Za-z0-9\s]", " ", name_part)
name_part = re.sub(r"\s+", " ", name_part).strip()
return (name_part, mg_day, is_prn, detected_route)
# Handle unitless doses like "..., 5, oral" or "..., 2.5-5, oral"
m_unitless = re.search(r"[,\(]\s*(?P\d+(?:\.\d+)?(?:\s*-\s*\d+(?:\.\d+)?)?)\s*,\s*(?:oral|sublingual|buccal)", original, re.I)
if m_unitless:
dose_str = m_unitless.group("dose").split('-')[-1].strip()
try:
strength_mg = float(dose_str) # Assume mg
freq = 1.0
m_freq = FREQ_PAT.search(original)
if m_freq:
freq = _freq_to_per_day(m_freq.group(0))
mg_day = strength_mg * freq
name_part = original[:m_unitless.start()].strip()
name_part = re.sub(r"\(.*?\)", "", name_part).strip() # Remove bracketed part of name
return (name_part, mg_day, is_prn, detected_route)
except ValueError:
pass # Could not convert to float
logger.debug("unhandled line: %s", original)
return None
def _parse_line_with_combinations(line: str) -> Optional[ParsedCombination]:
"""
Enhanced parsing that detects and handles combination drugs.
Returns (name, mg_day, is_prn, route, is_combination, components)
"""
# First try normal parsing
parsed = _parse_line(line)
if not parsed:
return None
name, mg_day, is_prn, route = parsed
# Check if this is a combination drug (check both the name and the full line)
is_combo_name = detect_combination_drug(name)
is_combo_line = detect_combination_drug(line)
if is_combo_name or is_combo_line:
# Try splitting with both the name and the full line
components = split_combination_drug_simple(name)
if not components:
components = split_combination_drug_simple(line)
if components:
logger.debug(f"Detected combination drug: {name} -> {[c[0] for c in components]}")
return (name, mg_day, is_prn, route, True, components)
else:
logger.debug(f"Combination drug detected but couldn't split: {name}")
# Mark as combination but with empty components (may need LLM splitting)
return (name, mg_day, is_prn, route, True, [])
# Not a combination drug
return (name, mg_day, is_prn, route, False, [])
def _smart_drug_lookup(raw_name: str, all_routes_reference: Dict[str, Dict[str, Tuple[float, str]]]) -> str:
"""
Smart drug name resolution that avoids unnecessary API calls.
Works with multi-route reference data.
1. First checks if the name (or close variant) exists in any route's reference data
2. Only calls brand_lookup API if not found in reference
3. Returns the best generic name match
"""
clean_name = raw_name.strip().lower()
# Check all routes for direct match
for route_data in all_routes_reference.values():
if clean_name in route_data:
logger.debug(f"Direct match found for '{raw_name}' in reference data")
return clean_name
# Check all routes for partial match
for route_data in all_routes_reference.values():
for ref_name in route_data.keys():
if len(clean_name) >= 4 and len(ref_name) >= 4:
if clean_name in ref_name or ref_name in clean_name:
logger.debug(f"Partial match found: '{raw_name}' -> '{ref_name}' in reference data")
return ref_name
common_variations = {
'acetaminophen': 'paracetamol',
'paracetamol': 'acetaminophen',
'hydrochlorothiazide': 'hctz',
'hctz': 'hydrochlorothiazide',
'furosemide': 'frusemide',
'frusemide': 'furosemide',
}
if clean_name in common_variations:
alt_name = common_variations[clean_name]
# Check all routes for the alternative name
for route_data in all_routes_reference.values():
if alt_name in route_data:
logger.debug(f"Found common variation: '{raw_name}' -> '{alt_name}' in reference data")
return alt_name
logger.debug(f"'{raw_name}' not found in reference data, trying brand lookup API")
try:
lookup = brand_lookup(raw_name)
if lookup["results"]:
generic_name = lookup["results"][0]["generic_name"].lower().strip()
logger.debug(f"Brand lookup successful: '{raw_name}' -> '{generic_name}'")
return generic_name
else:
logger.debug(f"Brand lookup returned no results for '{raw_name}'")
return clean_name
except Exception as e:
logger.warning(f"Brand lookup failed for '{raw_name}': {e}")
return clean_name
def dbi_mcp(text_block: str, *, ref_csv: Union[str, Path] = "dbi_reference_by_route.csv", route: str = "oral") -> dict:
"""End-to-end DBI calculator with dual PRN handling and smart drug name resolution."""
ref = load_reference(Path(ref_csv), route=route)
parsed: List[Parsed] = []
unmatched: List[str] = []
for ln in text_block.splitlines():
res = _parse_line(ln)
if res:
parsed.append(res)
else:
unmatched.append(ln)
meds_with: Dict[str, float] = {}
meds_without: Dict[str, float] = {}
# Load all routes for smart lookup (backward compatibility)
all_routes_ref = load_all_routes_reference(Path(ref_csv))
for raw_name, mg_day, is_prn, detected_route in parsed:
generic = _smart_drug_lookup(raw_name, all_routes_ref)
meds_with[generic] = meds_with.get(generic, 0.0) + mg_day
if not is_prn:
meds_without[generic] = meds_without.get(generic, 0.0) + mg_day
total_no, details_no = calculate_dbi(meds_without, ref)
total_with, details_with = calculate_dbi(meds_with, ref)
def _details_to_list(details):
return [dict(generic_name=g, dose_mg_day=d, delta_mg=delta, dbi_component=dbi) for g, d, delta, dbi in details]
return {
"route": route,
"dbi_without_prn": round(total_no, 2),
"dbi_with_prn": round(total_with, 2),
"details_without_prn": _details_to_list(details_no),
"details_with_prn": _details_to_list(details_with),
"unmatched_input": unmatched,
}
def dbi_mcp_mixed_routes(text_block: str, *, ref_csv: Union[str, Path] = "dbi_reference_by_route.csv") -> dict:
"""
Enhanced DBI calculator that handles mixed routes automatically.
This function:
1. Detects the route for each medication from the text
2. Uses the appropriate reference data for each route
3. Provides detailed breakdown by route and medication
"""
all_routes_ref = load_all_routes_reference(Path(ref_csv))
parsed: List[Parsed] = []
unmatched: List[str] = []
route_stats: Dict[str, int] = {}
for ln in text_block.splitlines():
res = _parse_line(ln)
if res:
parsed.append(res)
route = res[3] # detected route
route_stats[route] = route_stats.get(route, 0) + 1
else:
unmatched.append(ln)
# Organize medications by route and PRN status
meds_by_route_with: Dict[str, Dict[str, float]] = {}
meds_by_route_without: Dict[str, Dict[str, float]] = {}
medication_details: List[Dict] = []
for raw_name, mg_day, is_prn, detected_route in parsed:
generic = _smart_drug_lookup(raw_name, all_routes_ref)
# Initialize route dictionaries if needed
if detected_route not in meds_by_route_with:
meds_by_route_with[detected_route] = {}
meds_by_route_without[detected_route] = {}
# Add to appropriate dictionaries
meds_by_route_with[detected_route][generic] = meds_by_route_with[detected_route].get(generic, 0.0) + mg_day
if not is_prn:
meds_by_route_without[detected_route][generic] = meds_by_route_without[detected_route].get(generic, 0.0) + mg_day
# Store medication details
medication_details.append({
"original_text": f"{raw_name} {mg_day}mg/day",
"generic_name": generic,
"dose_mg_day": mg_day,
"detected_route": detected_route,
"is_prn": is_prn
})
# Calculate DBI for each route
route_results = {}
total_dbi_with = 0.0
total_dbi_without = 0.0
all_details_with = []
all_details_without = []
for route in meds_by_route_with.keys():
if route in all_routes_ref:
route_ref = all_routes_ref[route]
# Calculate DBI for this route
dbi_with, details_with = calculate_dbi(meds_by_route_with[route], route_ref)
dbi_without, details_without = calculate_dbi(meds_by_route_without[route], route_ref)
total_dbi_with += dbi_with
total_dbi_without += dbi_without
# Format details
def _format_details(details, route_name):
formatted = []
for g, d, delta, dbi in details:
formatted.append({
"generic_name": g,
"dose_mg_day": d,
"delta_mg": delta,
"dbi_component": dbi,
"route": route_name
})
return formatted
route_details_with = _format_details(details_with, route)
route_details_without = _format_details(details_without, route)
all_details_with.extend(route_details_with)
all_details_without.extend(route_details_without)
route_results[route] = {
"dbi_with_prn": round(dbi_with, 2),
"dbi_without_prn": round(dbi_without, 2),
"details_with_prn": route_details_with,
"details_without_prn": route_details_without,
"medication_count": route_stats.get(route, 0)
}
return {
"mixed_routes": True,
"total_dbi_without_prn": round(total_dbi_without, 2),
"total_dbi_with_prn": round(total_dbi_with, 2),
"routes_detected": list(route_stats.keys()),
"route_statistics": route_stats,
"route_breakdown": route_results,
"all_details_without_prn": all_details_without,
"all_details_with_prn": all_details_with,
"medication_details": medication_details,
"unmatched_input": unmatched,
}
def dbi_mcp_with_combinations(text_block: str, *, ref_csv: Union[str, Path] = "dbi_reference_by_route.csv") -> dict:
"""
Enhanced DBI calculator that handles combination drugs automatically.
This function:
1. Detects combination drugs (e.g., paracetamol-codeine, co-codamol)
2. Splits them into individual components
3. Calculates DBI for each relevant component
4. Provides detailed breakdown including combination drug handling
"""
all_routes_ref = load_all_routes_reference(Path(ref_csv))
parsed_combinations: List[ParsedCombination] = []
unmatched: List[str] = []
route_stats: Dict[str, int] = {}
combination_drugs: List[Dict] = []
for ln in text_block.splitlines():
res = _parse_line_with_combinations(ln)
if res:
parsed_combinations.append(res)
route = res[3] # detected route
route_stats[route] = route_stats.get(route, 0) + 1
else:
unmatched.append(ln)
# Organize medications by route and PRN status, handling combinations
meds_by_route_with: Dict[str, Dict[str, float]] = {}
meds_by_route_without: Dict[str, Dict[str, float]] = {}
medication_details: List[Dict] = []
for name, mg_day, is_prn, detected_route, is_combination, components in parsed_combinations:
if is_combination and components:
# Handle combination drug by processing each component
combination_info = {
"original_text": f"{name} {mg_day}mg/day",
"is_combination": True,
"components": [],
"detected_route": detected_route,
"is_prn": is_prn
}
for comp_name, original_text, note in components:
generic = _smart_drug_lookup(comp_name, all_routes_ref)
# Initialize route dictionaries if needed
if detected_route not in meds_by_route_with:
meds_by_route_with[detected_route] = {}
meds_by_route_without[detected_route] = {}
# Add to appropriate dictionaries
# Note: We use the full dose for each component - this may need refinement
# based on actual component ratios in the combination
meds_by_route_with[detected_route][generic] = meds_by_route_with[detected_route].get(generic, 0.0) + mg_day
if not is_prn:
meds_by_route_without[detected_route][generic] = meds_by_route_without[detected_route].get(generic, 0.0) + mg_day
combination_info["components"].append({
"component_name": comp_name,
"generic_name": generic,
"note": note,
"dose_mg_day": mg_day # This is simplified - real combinations need dose splitting
})
combination_drugs.append(combination_info)
medication_details.append(combination_info)
else:
# Handle single drug (or unresolved combination)
generic = _smart_drug_lookup(name, all_routes_ref)
# Initialize route dictionaries if needed
if detected_route not in meds_by_route_with:
meds_by_route_with[detected_route] = {}
meds_by_route_without[detected_route] = {}
# Add to appropriate dictionaries
meds_by_route_with[detected_route][generic] = meds_by_route_with[detected_route].get(generic, 0.0) + mg_day
if not is_prn:
meds_by_route_without[detected_route][generic] = meds_by_route_without[detected_route].get(generic, 0.0) + mg_day
# Store medication details
medication_details.append({
"original_text": f"{name} {mg_day}mg/day",
"generic_name": generic,
"dose_mg_day": mg_day,
"detected_route": detected_route,
"is_prn": is_prn,
"is_combination": is_combination,
"combination_note": "Detected as combination but couldn't split" if is_combination else None
})
# Calculate DBI for each route (same as before)
route_results = {}
total_dbi_with = 0.0
total_dbi_without = 0.0
all_details_with = []
all_details_without = []
for route in meds_by_route_with.keys():
if route in all_routes_ref:
route_ref = all_routes_ref[route]
# Calculate DBI for this route
dbi_with, details_with = calculate_dbi(meds_by_route_with[route], route_ref)
dbi_without, details_without = calculate_dbi(meds_by_route_without[route], route_ref)
total_dbi_with += dbi_with
total_dbi_without += dbi_without
# Format details
def _format_details(details, route_name):
formatted = []
for g, d, delta, dbi in details:
formatted.append({
"generic_name": g,
"dose_mg_day": d,
"delta_mg": delta,
"dbi_component": dbi,
"route": route_name
})
return formatted
route_details_with = _format_details(details_with, route)
route_details_without = _format_details(details_without, route)
all_details_with.extend(route_details_with)
all_details_without.extend(route_details_without)
route_results[route] = {
"dbi_with_prn": round(dbi_with, 2),
"dbi_without_prn": round(dbi_without, 2),
"details_with_prn": route_details_with,
"details_without_prn": route_details_without,
"medication_count": route_stats.get(route, 0)
}
return {
"combination_handling": True,
"total_dbi_without_prn": round(total_dbi_without, 2),
"total_dbi_with_prn": round(total_dbi_with, 2),
"routes_detected": list(route_stats.keys()),
"route_statistics": route_stats,
"route_breakdown": route_results,
"all_details_without_prn": all_details_without,
"all_details_with_prn": all_details_with,
"medication_details": medication_details,
"combination_drugs": combination_drugs,
"unmatched_input": unmatched,
}
if __name__ == "__main__":
import sys
import pprint
text = sys.stdin.read() if not sys.stdin.isatty() else "\n".join(sys.argv[1:])
pprint.pp(dbi_mcp(text))