259 lines
8.3 KiB
Python
259 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Script para consultar la API de ADIF con autenticación en tiempo real
|
||
Las firmas se generan frescas para cada petición
|
||
"""
|
||
|
||
from adif_auth import AdifAuthenticator
|
||
import requests
|
||
import json
|
||
import sys
|
||
|
||
# Keys extracted with Ghidra
ACCESS_KEY = "and20210615"
SECRET_KEY = "Jthjtr946RTt"

# Module-level authenticator built from the keys above
auth = AdifAuthenticator(
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)
|
||
|
||
|
||
def print_separator(char="=", length=70):
    """Print a horizontal rule: ``char`` repeated ``length`` times."""
    rule = char * length
    print(rule)
|
||
|
||
|
||
def print_response(response, show_full=False):
    """Print the status, the ``x-elcano*`` headers and the body of a response.

    Args:
        response: Object exposing ``status_code``, ``headers`` (a mapping),
            ``json()`` and ``text``.
        show_full: When true, print the complete JSON body and do nothing
            else. When false, print at most the first 1000 lines of the
            pretty-printed body and also write the complete body to
            ``mierdon.json`` in the current working directory.

    A body that cannot be decoded as JSON is shown as the first 1500
    characters of ``response.text``.
    """
    max_lines = 1000
    dump_path = "mierdon.json"

    print(f"\nStatus Code: {response.status_code}")
    print("Response Headers:")
    for key, value in response.headers.items():
        if key.lower().startswith('x-elcano'):
            print(f" {key}: {value}")

    print("\nResponse Body:")
    try:
        data = response.json()
    except ValueError:
        # JSON decoding errors derive from ValueError; anything else is a
        # real bug and should surface instead of being swallowed.
        print(response.text[:1500])
        return

    json_str = json.dumps(data, indent=2, ensure_ascii=False)
    if show_full:
        print(json_str)
        return

    # Show only the first lines of long bodies
    lines = json_str.split('\n')
    if len(lines) > max_lines:
        print('\n'.join(lines[:max_lines]))
        print(f"\n... ({len(lines) - max_lines} líneas más)")
        # The body may be a JSON array, which has no .get()
        total = data.get('totalElements', 'N/A') if isinstance(data, dict) else 'N/A'
        print(f"\nTotal elements: {total}")
    else:
        print(json_str)

    # Debug dump of the complete body. Write the original string (keeping its
    # newlines) as UTF-8, because ensure_ascii=False can emit non-ASCII text.
    try:
        with open(dump_path, "w", encoding="utf-8") as f:
            f.write(json_str)
    except OSError as exc:
        # The dump is best-effort; a read-only directory must not abort the query
        print(f"\nNo se pudo escribir {dump_path}: {exc}")
|
||
|
||
|
||
def query_departures(station_code="10200", traffic_type="ALL"):
    """Query departures from a station and print the outcome.

    Args:
        station_code: Station code placed in the request payload.
        traffic_type: Traffic type filter placed in the request payload.

    Returns:
        True when the server answers with HTTP 200, False otherwise.
    """
    endpoint = "https://circulacion.api.adif.es/portroyalmanager/secure/circulationpaths/departures/traffictype/"
    # Key order is kept as-is: the same dict is passed to the authenticator
    # and sent as the JSON body.
    body = {
        "commercialService": "BOTH",
        "commercialStopType": "BOTH",
        "page": {"pageNumber": 0},
        "stationCode": station_code,
        "trafficType": traffic_type,
    }

    print_separator()
    print(f"SALIDAS desde estación {station_code}")
    print_separator()

    # Fresh auth headers for this request, plus the circulation user key
    signed_headers = auth.get_auth_headers("POST", endpoint, body)
    signed_headers["User-key"] = auth.USER_KEY_CIRCULATION

    print(f"\nURL: {endpoint}")
    print(f"Payload: {json.dumps(body, indent=2)}")

    reply = requests.post(endpoint, json=body, headers=signed_headers, timeout=15)
    print_response(reply)

    succeeded = reply.status_code == 200
    return succeeded
|
||
|
||
|
||
def query_arrivals(station_code="10200", traffic_type="ALL"):
    """Query arrivals at a station and print the outcome.

    Args:
        station_code: Station code placed in the request payload.
        traffic_type: Traffic type filter placed in the request payload.

    Returns:
        True when the server answers with HTTP 200, False otherwise.
    """
    endpoint = "https://circulacion.api.adif.es/portroyalmanager/secure/circulationpaths/arrivals/traffictype/"
    # Key order is kept as-is: the same dict is passed to the authenticator
    # and sent as the JSON body.
    body = {
        "commercialService": "BOTH",
        "commercialStopType": "BOTH",
        "page": {"pageNumber": 0},
        "stationCode": station_code,
        "trafficType": traffic_type,
    }

    print_separator()
    print(f"LLEGADAS a estación {station_code}")
    print_separator()

    # Fresh auth headers for this request, plus the circulation user key
    signed_headers = auth.get_auth_headers("POST", endpoint, body)
    signed_headers["User-key"] = auth.USER_KEY_CIRCULATION

    print(f"\nURL: {endpoint}")
    print(f"Payload: {json.dumps(body, indent=2)}")

    reply = requests.post(endpoint, json=body, headers=signed_headers, timeout=15)
    print_response(reply)

    succeeded = reply.status_code == 200
    return succeeded
|
||
|
||
|
||
def query_observations(station_codes=None):
    """Query the observations of several stations and print the outcome.

    Args:
        station_codes: List of station code strings. Defaults to
            ``["10200", "71801"]`` when omitted or ``None``.

    Returns:
        True when the server answers with HTTP 200, False otherwise.
    """
    if station_codes is None:
        # Build the default per call instead of sharing one mutable list
        # object across every invocation.
        station_codes = ["10200", "71801"]

    print_separator()
    print(f"OBSERVACIONES de estaciones {', '.join(station_codes)}")
    print_separator()

    url = "https://estaciones.api.adif.es/portroyalmanager/secure/stationsobservations/"
    payload = {
        "stationCodes": station_codes
    }

    # Fresh auth headers for this request, plus the stations user key
    headers = auth.get_auth_headers("POST", url, payload)
    headers["User-key"] = auth.USER_KEY_STATIONS

    print(f"\nURL: {url}")
    print(f"Payload: {json.dumps(payload, indent=2)}")

    response = requests.post(url, json=payload, headers=headers, timeout=15)
    print_response(response)

    return response.status_code == 200
|
||
|
||
|
||
def interactive_menu():
    """Run the interactive console menu until the user quits.

    The menu is printed once, then options are read in a loop. Option ``0``,
    Ctrl-C, or end of input (closed or exhausted stdin) ends the loop with a
    goodbye message. Any other exception raised while serving an option is
    reported and the loop continues.
    """
    print("\n" + "="*70)
    print(" CONSULTAS API ADIF - Autenticación en Tiempo Real")
    print("="*70)
    print("\nEndpoints funcionales disponibles:")
    print(" 1. Salidas desde Madrid Atocha (10200)")
    print(" 2. Llegadas a Madrid Atocha (10200)")
    print(" 3. Salidas desde Barcelona Sants (71801)")
    print(" 4. Llegadas a Barcelona Sants (71801)")
    print(" 5. Observaciones de múltiples estaciones")
    print(" 6. Consulta personalizada (salidas)")
    print(" 7. Consulta personalizada (llegadas)")
    print(" 0. Salir")
    print()

    pause_prompt = "\nPresiona ENTER para continuar..."
    traffic_prompt = "Tipo de tráfico (ALL/CERCANIAS/AVLDMD/TRAVELERS/GOODS): "
    # Fixed-station shortcuts: option -> (direction, station code)
    presets = {
        "1": ("departures", "10200"),
        "2": ("arrivals", "10200"),
        "3": ("departures", "71801"),
        "4": ("arrivals", "71801"),
    }
    # Options that ask the user for the station and traffic type
    custom = {"6": "departures", "7": "arrivals"}

    while True:
        try:
            try:
                choice = input("Selecciona una opción (0-7): ").strip()

                if choice == "0":
                    print("\n¡Hasta luego!")
                    break

                if choice == "5":
                    query_observations(["10200", "71801", "60000"])
                elif choice in presets or choice in custom:
                    if choice in presets:
                        direction, station = presets[choice]
                        traffic = "ALL"
                    else:
                        direction = custom[choice]
                        station = input("Código de estación: ").strip()
                        # An empty answer falls back to "ALL"
                        traffic = input(traffic_prompt).strip().upper() or "ALL"

                    if direction == "departures":
                        query_departures(station, traffic)
                    else:
                        query_arrivals(station, traffic)
                else:
                    print("❌ Opción inválida")

                input(pause_prompt)
                print("\n")

            except EOFError:
                # End of input is not a recoverable error: prompting again
                # would only raise again. Let the outer handler quit.
                raise
            except Exception as e:
                print(f"\n❌ Error: {e}")
                input(pause_prompt)

        except (KeyboardInterrupt, EOFError):
            # Also covers Ctrl-C / EOF raised by the pause prompt in the
            # error handler above.
            print("\n\n¡Hasta luego!")
            break
|
||
|
||
|
||
def quick_demo():
    """Exercise the three query functions once each and print a summary.

    Each step prints its banner, runs the query and records whether the query
    function returned a truthy value; a pass/fail line per step and the total
    are printed at the end.
    """
    rule = "="*70
    print("\n" + rule)
    print(" DEMO RÁPIDO - Endpoints Funcionales")
    print(rule)

    # (banner, summary label, deferred query) -- the lambdas delay each call
    # until its banner has been printed.
    steps = [
        (
            "\n1️⃣ Probando SALIDAS desde Madrid Atocha...",
            "Departures",
            lambda: query_departures("10200", "CERCANIAS"),
        ),
        (
            "\n\n2️⃣ Probando LLEGADAS a Barcelona Sants...",
            "Arrivals",
            lambda: query_arrivals("71801", "ALL"),
        ),
        (
            "\n\n3️⃣ Probando OBSERVACIONES de estaciones...",
            "Observations",
            lambda: query_observations(["10200", "71801"]),
        ),
    ]

    results = []
    for banner, label, run_query in steps:
        print(banner)
        results.append((label, run_query()))

    print("\n" + rule)
    print("RESUMEN")
    print(rule)
    for label, ok in results:
        if ok:
            mark = "✅ OK"
        else:
            mark = "❌ FAIL"
        print(f"{mark} - {label}")

    working = len([ok for _, ok in results if ok])
    print(f"\nTotal: {working}/{len(results)} endpoints funcionando")
    print(rule)
|
||
|
||
|
||
if __name__ == "__main__":
    # No arguments: interactive menu. Otherwise the first argument selects
    # the command and the remaining ones are its optional parameters.
    if len(sys.argv) <= 1:
        interactive_menu()
    else:
        command = sys.argv[1].lower()
        extra = sys.argv[2:]

        if command == "demo":
            quick_demo()

        elif command in ("departures", "arrivals"):
            # Both commands take [station_code] [traffic_type]
            station = extra[0] if extra else "10200"
            traffic = extra[1] if len(extra) > 1 else "ALL"
            if command == "departures":
                query_departures(station, traffic)
            else:
                query_arrivals(station, traffic)

        elif command == "observations":
            # Station codes arrive as one comma-separated argument
            stations = extra[0].split(',') if extra else ["10200", "71801"]
            query_observations(stations)

        else:
            usage_lines = (
                "Uso:",
                " python3 query_api.py demo",
                " python3 query_api.py departures [station_code] [traffic_type]",
                " python3 query_api.py arrivals [station_code] [traffic_type]",
                " python3 query_api.py observations [station1,station2,...]",
                "\nO ejecuta sin argumentos para el menú interactivo",
            )
            for usage_line in usage_lines:
                print(usage_line)
|