Files
olt-api/utils/snmp.py
2026-02-18 10:17:09 -03:00

42 lines
1.2 KiB
Python

import asyncio
from pysnmp.hlapi.v3arch.asyncio import *
async def _snmp_walk_async(host, community, oid):
results = []
# Configuração do Engine e dos dados de conexão
snmp_engine = SnmpEngine()
community_data = CommunityData(community, mpModel=1) # mpModel=1 para SNMPv2c
transport = await UdpTransportTarget.create((host, 161), timeout=2, retries=1)
context = ContextData()
# Realiza o Walk
iterator = walk_cmd(
snmp_engine,
community_data,
transport,
context,
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False
)
async for errorIndication, errorStatus, errorIndex, varBinds in iterator:
if errorIndication:
print(f"SNMP Error: {errorIndication}")
break
elif errorStatus:
print(f"SNMP Error: {errorStatus.prettyPrint()}")
break
else:
for varBind in varBinds:
results.append((str(varBind[0]), str(varBind[1])))
snmp_engine.closeDispatcher()
return results
def snmp_walk(host, community, oid):
"""
Wrapper síncrono para o walk assíncrono.
"""
return asyncio.run(_snmp_walk_async(host, community, oid))