bgp- research & experiments with border gateway protocol |
| git clone git://git.acid.vegas/bgp.git |
| Log | Files | Refs | Archive | README |
asnpaths.py (1933B)
1 #!/usr/bin/env python3 2 3 #import the low level _pybgpsteam library and other necessary libraries 4 from pybgpstream import BGPStream 5 from ipaddress import ip_network 6 import time 7 import sys 8 import argparse 9 10 parser = argparse.ArgumentParser() 11 parser.add_argument("target", nargs="*", type=str, help="ASNs we are looking up") 12 parser.add_argument("-d", "--debug", type=int, help="Number of traces") 13 args = parser.parse_args() 14 15 # Initialize BGPStream with RIPE RIS LIVE and collector rrc00 16 stream = BGPStream(project="ris-live", 17 collectors=["rrc00"], 18 filter="collector rrc00") 19 20 # The stream will not load new data till it's done with the current pulled data. 21 stream.set_live_mode() 22 print("starting stream...", file=sys.stderr) 23 24 # Counter 25 counter = 0 26 27 for record in stream.records(): 28 # Handles debug option 29 if args.debug is None: 30 pass 31 elif counter >= args.debug: 32 break 33 else: 34 counter += 1 35 36 rec_time = time.strftime('%y-%m-%d %H:%M:%S', time.localtime(record.time)) 37 for elem in record: 38 try: 39 prefix = ip_network(elem.fields['prefix']) 40 # Only print elements that are announcements (BGPElem.type = "A") 41 # or ribs (BGPElem.type = "R") 42 if elem.type == "A" or elem.type == "R": 43 as_path = elem.fields['as-path'].split(" ") 44 # Print all elements with specified in args.target 45 for target in args.target: 46 if target in as_path: 47 print(f"Peer asn: {elem.peer_asn} AS Path: {as_path} " 48 f"Communities: {elem.fields['communities']} " 49 f"Timestamp: {rec_time}") 50 break 51 52 # Reports and skips all KeyError 53 except KeyError as e: 54 print("KEY ERROR, element ignored: KEY=" + str(e), file=sys.stderr) 55 continue

