Files
server/agent/agent.py
Halbe Bruno f37bc712e6 DNSBlock
2025-12-05 19:40:39 -03:00

127 lines
3.8 KiB
Python

import os
import time
import json
import hashlib
import requests
import logging
import subprocess
from datetime import datetime
from dotenv import load_dotenv
# Load Env
load_dotenv()
API_URL = os.getenv('API_URL', 'http://localhost:8000/api')
SERIAL_KEY = os.getenv('SERIAL_KEY')
RPZ_FILE = os.getenv('RPZ_FILE', '/etc/unbound/rpz.zone')
UNBOUND_CMD = os.getenv('UNBOUND_CMD', 'unbound-control reload')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("agent.log"),
logging.StreamHandler()
]
)
class DNSBlockAgent:
def __init__(self):
self.token = None
self.last_checksum = None
def authenticate(self):
try:
logging.info("Authenticating...")
response = requests.post(f"{API_URL}/auth/login", json={'serial_key': SERIAL_KEY})
response.raise_for_status()
data = response.json()
self.token = data['token']
logging.info("Authenticated successfully.")
except Exception as e:
logging.error(f"Authentication failed: {e}")
self.token = None
def get_domains(self):
if not self.token:
self.authenticate()
if not self.token:
return None
try:
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.get(f"{API_URL}/v1/domains", headers=headers)
if response.status_code == 401:
logging.warning("Token expired, re-authenticating...")
self.authenticate()
if self.token:
headers = {'Authorization': f'Bearer {self.token}'}
response = requests.get(f"{API_URL}/v1/domains", headers=headers)
else:
return None
response.raise_for_status()
return response.json()
except Exception as e:
logging.error(f"Failed to fetch domains: {e}")
return None
def update_rpz(self, domains):
try:
logging.info(f"Updating RPZ file with {len(domains)} domains...")
header = """$TTL 30m
@ IN SOA localhost. root.localhost. (
{} ; Serial
3600 ; Refresh
1800 ; Retry
604800 ; Expire
86400 ) ; Minimum TTL
NS localhost.
; Blocked Domains
""".format(int(datetime.now().timestamp()))
with open(RPZ_FILE, 'w') as f:
f.write(header)
for domain in domains:
f.write(f"{domain} CNAME .\n")
f.write(f"*.{domain} CNAME .\n")
logging.info("RPZ file updated.")
self.reload_unbound()
except Exception as e:
logging.error(f"Failed to update RPZ: {e}")
def reload_unbound(self):
try:
logging.info("Reloading Unbound...")
subprocess.run(UNBOUND_CMD.split(), check=True)
logging.info("Unbound reloaded.")
except Exception as e:
logging.error(f"Failed to reload Unbound: {e}")
def run(self):
logging.info("Agent started.")
while True:
data = self.get_domains()
if data:
checksum = data.get('checksum')
if checksum != self.last_checksum:
logging.info(f"Checksum changed: {checksum}. Updating...")
self.update_rpz(data.get('domains', []))
self.last_checksum = checksum
else:
logging.info("No changes detected.")
time.sleep(300)
if __name__ == "__main__":
if not SERIAL_KEY:
logging.error("SERIAL_KEY not defined in .env")
exit(1)
agent = DNSBlockAgent()
agent.run()