networking.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import socket
  2. from contextlib import closing
  3. from typing import Optional
  4. Hostname, Port = str, int # flavour types
  5. Endpoint = str # e.g. 1.2.3.4:1337 or [2a21:6с8:b192:2105]:8888, https://networkengineering.stackexchange.com/a/9435
  6. LOCALHOST = '127.0.0.1'
  7. def get_port(endpoint: Endpoint) -> Optional[Port]:
  8. """ get port or None if port is undefined """
  9. # TODO: find a standard way to get port, make sure it works in malformed ports
  10. try:
  11. return int(endpoint[endpoint.rindex(':') + 1:], base=10)
  12. except ValueError: # :* or not specified
  13. return None
  14. def replace_port(endpoint: Endpoint, new_port: Port) -> Endpoint:
  15. assert endpoint.endswith(':*') or get_port(endpoint) is not None, endpoint
  16. return f"{endpoint[:endpoint.rindex(':')]}:{new_port}"
  17. def strip_port(endpoint: Endpoint) -> Hostname:
  18. """ Removes port from the end of endpoint. If port is not specified, does nothing """
  19. maybe_port = endpoint[endpoint.rindex(':') + 1:]
  20. return endpoint[:endpoint.rindex(':')] if maybe_port.isdigit() or maybe_port == '*' else endpoint
  21. def find_open_port(params=(socket.AF_INET, socket.SOCK_STREAM), opt=(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)):
  22. """ Finds a tcp port that can be occupied with a socket with *params and use *opt options """
  23. try:
  24. with closing(socket.socket(*params)) as sock:
  25. sock.bind(('', 0))
  26. sock.setsockopt(*opt)
  27. return sock.getsockname()[1]
  28. except Exception as e:
  29. raise e