import os import time import json import hashlib import requests import logging import subprocess from datetime import datetime from dotenv import load_dotenv # Load Env load_dotenv() API_URL = os.getenv('API_URL', 'http://localhost:8000/api') SERIAL_KEY = os.getenv('SERIAL_KEY') RPZ_FILE = os.getenv('RPZ_FILE', '/etc/unbound/rpz.zone') UNBOUND_CMD = os.getenv('UNBOUND_CMD', 'unbound-control reload') logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("agent.log"), logging.StreamHandler() ] ) class DNSBlockAgent: def __init__(self): self.token = None self.last_checksum = None def authenticate(self): try: logging.info("Authenticating...") response = requests.post(f"{API_URL}/auth/login", json={'serial_key': SERIAL_KEY}) response.raise_for_status() data = response.json() self.token = data['token'] logging.info("Authenticated successfully.") except Exception as e: logging.error(f"Authentication failed: {e}") self.token = None def get_domains(self): if not self.token: self.authenticate() if not self.token: return None try: headers = {'Authorization': f'Bearer {self.token}'} response = requests.get(f"{API_URL}/v1/domains", headers=headers) if response.status_code == 401: logging.warning("Token expired, re-authenticating...") self.authenticate() if self.token: headers = {'Authorization': f'Bearer {self.token}'} response = requests.get(f"{API_URL}/v1/domains", headers=headers) else: return None response.raise_for_status() return response.json() except Exception as e: logging.error(f"Failed to fetch domains: {e}") return None def update_rpz(self, domains): try: logging.info(f"Updating RPZ file with {len(domains)} domains...") header = """$TTL 30m @ IN SOA localhost. root.localhost. ( {} ; Serial 3600 ; Refresh 1800 ; Retry 604800 ; Expire 86400 ) ; Minimum TTL NS localhost. ; Blocked Domains """.format(int(datetime.now().timestamp())) with open(RPZ_FILE, 'w') as f: f.write(header) for domain in domains: f.write(f"{domain} CNAME .\n") f.write(f"*.{domain} CNAME .\n") logging.info("RPZ file updated.") self.reload_unbound() except Exception as e: logging.error(f"Failed to update RPZ: {e}") def reload_unbound(self): try: logging.info("Reloading Unbound...") subprocess.run(UNBOUND_CMD.split(), check=True) logging.info("Unbound reloaded.") except Exception as e: logging.error(f"Failed to reload Unbound: {e}") def run(self): logging.info("Agent started.") while True: data = self.get_domains() if data: checksum = data.get('checksum') if checksum != self.last_checksum: logging.info(f"Checksum changed: {checksum}. Updating...") self.update_rpz(data.get('domains', [])) self.last_checksum = checksum else: logging.info("No changes detected.") time.sleep(300) if __name__ == "__main__": if not SERIAL_KEY: logging.error("SERIAL_KEY not defined in .env") exit(1) agent = DNSBlockAgent() agent.run()