ip_prefix,alpha2code,region,city,postal_code
192.0.2.0/24,,,, 2001:db8:1::/48,,,, 2001:db8:2::/48,,,,
192.0.2.0/25,US,US-AL,, 192.0.2.5,US,US-AL,Alabaster, 192.0.2.128/25,PL,PL-MZ,, 2001:db8::/32,PL,,, 2001:db8:cafe::/48,PL,PL-MZ,,
# IETF106 (Singapore) - November 2019 - Singapore, SG 130.129.0.0/16,SG,SG-01,Singapore, 2001:df8::/32,SG,SG-01,Singapore, 31.133.128.0/18,SG,SG-01,Singapore, 31.130.224.0/20,SG,SG-01,Singapore, 2001:67c:1230::/46,SG,SG-01,Singapore, 2001:67c:370::/48,SG,SG-01,Singapore,
193.0.24.0/21,NL,NL-ZH,Rotterdam, 2001:67c:64::/48,NL,NL-ZH,Rotterdam,
199.91.192.0/21,MA,MA-07,Marrakech 2620:f:8000::/48,MA,MA-07,Marrakech
<civicAddress> <country>country</country> <A1>region</A1> <A2>city</A2> <PC>postal_code</PC> </civicAddress>
<ruleset> <rule> <conditions/> <actions/> <transformations> <provide-location profile="civic-transformation"> <provide-civic>building</provide-civic> </provide-location> </transformations> </rule> </ruleset>
#!/usr/bin/python # # Copyright (c) 2012 IETF Trust and the persons identified as # authors of the code. All rights reserved. Redistribution and use # in source and binary forms, with or without modification, is # permitted pursuant to, and subject to the license terms contained # in, the Simplified BSD License set forth in Section 4.c of the # IETF Trust's Legal Provisions Relating to IETF # Documents (http://trustee.ietf.org/license-info). """Simple format validator for self-published ipgeo feeds. This tool reads CSV data in the self-published ipgeo feed format from the standard input and performs basic validation. It is intended for use by feed publishers before launching a feed. """ import csv import ipaddr import re import sys class IPGeoFeedValidator(object): def __init__(self): self.prefixes = {} self.line_number = 0 self.output_log = {} self.SetOutputStream(sys.stderr) def Validate(self, feed): """Check validity of an IPGeo feed. Args: feed: iterable with feed lines """ for line in feed: self._ValidateLine(line) def SetOutputStream(self, logfile): """Controls where the output messages go do (STDERR by default). Use None to disable logging. Args: logfile: a file object (e.g., sys.stdout) or None. """ self.output_stream = logfile def CountErrors(self, severity): """How many ERRORs or WARNINGs were generated.""" return len(self.output_log.get(severity, [])) ############################################################ def _ValidateLine(self, line): line = line.rstrip('\r\n') self.line_number += 1 self.line = line.split('#')[0] self.is_correct_line = True if self._ShouldIgnoreLine(line): return fields = [field for field in csv.reader([line])][0] self._ValidateFields(fields) self._FlushOutputStream() def _ShouldIgnoreLine(self, line): line = line.strip() if line.startswith('#'): return True return len(line) == 0 ############################################################ def _ValidateFields(self, fields): assert(len(fields) > 0) is_correct = self._IsIPAddressOrPrefixCorrect(fields[0]) if len(fields) > 1: if not self._IsAlpha2CodeCorrect(fields[1]): is_correct = False if len(fields) > 2 and not self._IsRegionCodeCorrect(fields[2]): is_correct = False if len(fields) != 5: self._ReportWarning('5 fields were expected (got %d).' % len(fields)) ############################################################ def _IsIPAddressOrPrefixCorrect(self, field): if '/' in field: return self._IsCIDRCorrect(field) return self._IsIPAddressCorrect(field) def _IsCIDRCorrect(self, cidr): try: ipprefix = ipaddr.IPNetwork(cidr) if ipprefix.network._ip != ipprefix._ip: self._ReportError('Incorrect IP Network.') return False if ipprefix.is_private: self._ReportError('IP Address must not be private.') return False except: self._ReportError('Incorrect IP Network.') return False return True def _IsIPAddressCorrect(self, ipaddress): try: ip = ipaddr.IPAddress(ipaddress) except: self._ReportError('Incorrect IP Address.') return False if ip.is_private: self._ReportError('IP Address must not be private.') return False return True ############################################################ def _IsAlpha2CodeCorrect(self, alpha2code): if len(alpha2code) == 0: return True if len(alpha2code) != 2 or not alpha2code.isalpha(): self._ReportError( 'Alpha 2 code must be in the ISO 3166-1 alpha 2 format.') return False return True def _IsRegionCodeCorrect(self, region_code): if len(region_code) == 0: return True if '-' not in region_code: self._ReportError('Region code must be in ISO 3166-2 format.') return False parts = region_code.split('-') if not self._IsAlpha2CodeCorrect(parts[0]): return False return True ############################################################ def _ReportError(self, message): self._ReportWithSeverity('ERROR', message) def _ReportWarning(self, message): self._ReportWithSeverity('WARNING', message) def _ReportWithSeverity(self, severity, message): self.is_correct_line = False output_line = '%s: %s\n' % (severity, message) if severity not in self.output_log: self.output_log[severity] = [] self.output_log[severity].append(output_line) if self.output_stream is not None: self.output_stream.write(output_line) def _FlushOutputStream(self): if self.is_correct_line: return if self.output_stream is None: return self.output_stream.write('line %d: %s\n\n' % (self.line_number, self.line)) ############################################################ def main(): feed_validator = IPGeoFeedValidator() feed_validator.Validate(sys.stdin) if feed_validator.CountErrors('ERROR'): sys.exit(1) if __name__ == '__main__': main()
#!/usr/bin/python # # Copyright (c) 2012 IETF Trust and the persons identified as # authors of the code. All rights reserved. Redistribution and use # in source and binary forms, with or without modification, is # permitted pursuant to, and subject to the license terms contained # in, the Simplified BSD License set forth in Section 4.c of the # IETF Trust's Legal Provisions Relating to IETF # Documents (http://trustee.ietf.org/license-info). import sys from ipgeo_feed_validator import IPGeoFeedValidator class IPGeoFeedValidatorTest(object): def __init__(self): self.validator = IPGeoFeedValidator() self.validator.SetOutputStream(None) self.successes = 0 self.failures = 0 def Run(self): self.TestFeedLine('# asdf', 0, 0) self.TestFeedLine(' ', 0, 0) self.TestFeedLine('', 0, 0) self.TestFeedLine('asdf', 1, 1) self.TestFeedLine('asdf,US,,,', 1, 0) self.TestFeedLine('aaaa::,US,,,', 0, 0) self.TestFeedLine('zzzz::,US', 1, 1) self.TestFeedLine(',US,,,', 1, 0) self.TestFeedLine('55.66.77', 1, 1) self.TestFeedLine('55.66.77.888', 1, 1) self.TestFeedLine('55.66.77.asdf', 1, 1) self.TestFeedLine('2001:db8:cafe::/48,PL,PL-MZ,,02-784', 0, 0) self.TestFeedLine('2001:db8:cafe::/48', 0, 1) self.TestFeedLine('55.66.77.88,PL', 0, 1) self.TestFeedLine('55.66.77.88,PL,,,', 0, 0) self.TestFeedLine('55.66.77.88,,,,', 0, 0) self.TestFeedLine('55.66.77.88,ZZ,,,', 0, 0) self.TestFeedLine('55.66.77.88,US,,,', 0, 0) self.TestFeedLine('55.66.77.88,USA,,,', 1, 0) self.TestFeedLine('55.66.77.88,99,,,', 1, 0) self.TestFeedLine('55.66.77.88,US,US-CA,,', 0, 0) self.TestFeedLine('55.66.77.88,US,USA-CA,,', 1, 0) self.TestFeedLine('55.66.77.88,USA,USA-CA,,', 2, 0) self.TestFeedLine('55.66.77.88,US,US-CA,Mountain View,', 0, 0) self.TestFeedLine('55.66.77.88,US,US-CA,Mountain View,94043', 0, 0) self.TestFeedLine('55.66.77.88,US,US-CA,Mountain View,94043,' '1600 Ampthitheatre Parkway', 0, 1) self.TestFeedLine('55.66.77.0/24,US,,,', 0, 0) self.TestFeedLine('55.66.77.88/24,US,,,', 1, 0) self.TestFeedLine('55.66.77.88/32,US,,,', 0, 0) self.TestFeedLine('55.66.77/24,US,,,', 1, 0) self.TestFeedLine('55.66.77.0/35,US,,,', 1, 0) self.TestFeedLine('172.15.30.1,US,,,', 0, 0) self.TestFeedLine('172.28.30.1,US,,,', 1, 0) self.TestFeedLine('192.167.100.1,US,,,', 0, 0) self.TestFeedLine('192.168.100.1,US,,,', 1, 0) self.TestFeedLine('10.0.5.9,US,,,', 1, 0) self.TestFeedLine('10.0.5.0/24,US,,,', 1, 0) self.TestFeedLine('fc00::/48,PL,,,', 1, 0) self.TestFeedLine('fe00::/48,PL,,,', 0, 0) print ('%d tests passed, %d failed' % (self.successes, self.failures)) def IsOutputLogCorrectAtSeverity(self, severity, expected_msg_count): msg_count = self.validator.CountErrors(severity) if msg_count != expected_msg_count: print ('TEST FAILED: %s\nexpected %d %s[s], observed %d\n%s\n' % (self.validator.line, expected_msg_count, severity, msg_count, str(self.validator.output_log[severity]))) return False return True def IsOutputLogCorrect(self, new_errors, new_warnings): retval = True if not self.IsOutputLogCorrectAtSeverity('ERROR', new_errors): retval = False if not self.IsOutputLogCorrectAtSeverity('WARNING', new_warnings): retval = False return retval def TestFeedLine(self, line, warning_count, error_count): self.validator.output_log['WARNING'] = [] self.validator.output_log['ERROR'] = [] self.validator._ValidateLine(line) if not self.IsOutputLogCorrect(warning_count, error_count): self.failures += 1 return False self.successes += 1 return True if __name__ == '__main__': IPGeoFeedValidatorTest().Run()