
Integrate p2pd logs and outputs into hivemind logging (#375)

This PR:

1. Captures all p2pd log messages and integrates them into hivemind logging, keeping the original log time and caller info. The Go source file and line number are now shown __in the same place__ where the Python module and line number normally appear. Levels of `WARNING` and below are downgraded to `DEBUG`; higher levels are kept as is (see the sketch after this list).
2. Captures other p2pd outputs (e.g. the line where the daemon prints its peer ID) and logs them as `[DEBUG][p2pd]` messages.
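To make the first point concrete, here is an illustrative sketch of one go-log record passing through the new plumbing. The JSON field names (`level`, `ts`, `logger`, `caller`, `msg`, `error`) match what the new `_log_p2pd_message` helper reads; the concrete values below are invented, not real p2pd output.

```python
# Illustration only: the values are made up; only the field names are taken from the PR.
from hivemind.p2p import P2P

sample = (
    '{"level":"warn","ts":"2021-08-23T12:34:56.789+0300","logger":"swarm2",'
    '"caller":"swarm/swarm_dial.go:459","msg":"error dialing peer","error":"context deadline exceeded"}'
)
P2P._log_p2pd_message(sample)
# With LOGLEVEL=DEBUG, this renders roughly as (timestamp shown in local time):
#   [2021/08/23 12:34:56.789][DEBUG][swarm/swarm_dial.go:459] error dialing peer: context deadline exceeded
# i.e. WARN is downgraded to DEBUG, and the Go caller replaces the usual "{name}.{funcName}:{lineno}" part.
```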
Alexander Borzunov, 4 years ago
commit 3c476c7031
4 changed files with 93 additions and 8 deletions
  1. examples/albert/run_trainer.py (0 additions, 1 deletion)
  2. hivemind/p2p/p2p_daemon.py (46 additions, 2 deletions)
  3. hivemind/utils/logging.py (43 additions, 3 deletions)
  4. hivemind/utils/mpfuture.py (4 additions, 2 deletions)

+ 0 - 1
examples/albert/run_trainer.py

@@ -204,7 +204,6 @@ class NoOpScheduler(LRSchedulerBase):
             return self.optimizer.scheduler.print_lr(*args, **kwargs)

     def step(self):
-        logger.debug("Called NoOpScheduler.step")
         self._last_lr = self.get_lr()

     def state_dict(self):

+ 46 - 2
hivemind/p2p/p2p_daemon.py

@@ -1,9 +1,12 @@
 import asyncio
+import json
+import logging
 import os
 import secrets
 from collections.abc import AsyncIterable as AsyncIterableABC
 from contextlib import closing, suppress
 from dataclasses import dataclass
+from datetime import datetime
 from importlib.resources import path
 from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union
 
@@ -16,7 +19,7 @@ from hivemind.p2p.p2p_daemon_bindings.control import P2PDaemonError, P2PHandlerE
 from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
 from hivemind.proto.p2pd_pb2 import RPCError
 from hivemind.utils.asyncio import as_aiter, asingle
-from hivemind.utils.logging import get_logger
+from hivemind.utils.logging import get_logger, golog_level_to_python, loglevel, python_level_to_golog

 logger = get_logger(__name__)
 
@@ -168,8 +171,13 @@ class P2P:
             **process_kwargs,
         )

+        env = os.environ.copy()
+        env.setdefault("GOLOG_LOG_LEVEL", python_level_to_golog(loglevel))
+        env["GOLOG_LOG_FMT"] = "json"
+
+        logger.debug(f"Launching {proc_args}")
         self._child = await asyncio.subprocess.create_subprocess_exec(
-            *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
+            *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env
         )
         self._alive = True
 
@@ -560,8 +568,44 @@ class P2P:
                 break
             last_line = line.rstrip().decode(errors="ignore")

+            self._log_p2pd_message(last_line)
             if last_line.startswith("Peer ID:"):
                 ready.set_result(None)

         if not ready.done():
             ready.set_exception(P2PDaemonError(f"Daemon failed to start: {last_line}"))
+
+    @staticmethod
+    def _log_p2pd_message(line: str) -> None:
+        if '"logger"' not in line:  # User-friendly info from p2pd stdout
+            logger.debug(line, extra={"caller": "p2pd"})
+            return
+
+        try:
+            record = json.loads(line)
+            caller = record["caller"]
+
+            level = golog_level_to_python(record["level"])
+            if level <= logging.WARNING:
+                # Many Go loggers are excessively verbose (e.g. show warnings for unreachable peers),
+                # so we downgrade INFO and WARNING messages to DEBUG.
+                # The Go verbosity can still be controlled via the GOLOG_LOG_LEVEL env variable.
+                # Details: https://github.com/ipfs/go-log#golog_log_level
+                level = logging.DEBUG
+
+            message = record["msg"]
+            if "error" in record:
+                message += f": {record['error']}"
+
+            logger.log(
+                level,
+                message,
+                extra={
+                    "origin_created": datetime.strptime(record["ts"], "%Y-%m-%dT%H:%M:%S.%f%z").timestamp(),
+                    "caller": caller,
+                },
+            )
+        except Exception:
+            # Parsing errors are unlikely, but we don't want to lose these messages anyway
+            logger.warning(line, extra={"caller": "p2pd"})
+            logger.exception("Failed to parse go-log message:")
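A usage note, not part of the diff: because the daemon environment is filled with `env.setdefault("GOLOG_LOG_LEVEL", ...)`, an explicitly exported `GOLOG_LOG_LEVEL` still takes precedence over the default derived from `LOGLEVEL`. A minimal sketch of overriding the Go-side verbosity (the entry point and values are illustrative):

```python
import asyncio
import os

from hivemind.p2p import P2P

# Must be set before the daemon is launched; env.setdefault() above will not overwrite it.
os.environ["GOLOG_LOG_LEVEL"] = "error"  # see https://github.com/ipfs/go-log#golog_log_level


async def main():
    p2p = await P2P.create()  # the p2pd child process inherits GOLOG_LOG_LEVEL via env=env
    await p2p.shutdown()


asyncio.run(main())
```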

+ 43 - 3
hivemind/utils/logging.py

@@ -1,15 +1,33 @@
 import logging
 import os

+loglevel = os.getenv("LOGLEVEL", "INFO")
+
+
+class CustomFormatter(logging.Formatter):
+    """
+    A formatter that allows a log time and caller info to be overridden via
+    ``logger.log(level, message, extra={"origin_created": ..., "caller": ...})``.
+    """
+
+    def format(self, record: logging.LogRecord) -> str:
+        if hasattr(record, "origin_created"):
+            record.created = record.origin_created
+            record.msecs = (record.created - int(record.created)) * 1000
+
+        if not hasattr(record, "caller"):
+            record.caller = f"{record.name}.{record.funcName}:{record.lineno}"
+
+        return super().format(record)
+

 def get_logger(module_name: str) -> logging.Logger:
     # trim package name
     name_without_prefix = ".".join(module_name.split(".")[1:])
-    loglevel = os.getenv("LOGLEVEL", "INFO")

     logging.addLevelName(logging.WARNING, "WARN")
-    formatter = logging.Formatter(
-        fmt="[{asctime}.{msecs:03.0f}][{levelname}][{name}.{funcName}:{lineno}] {message}",
+    formatter = CustomFormatter(
+        fmt="[{asctime}.{msecs:03.0f}][{levelname}][{caller}] {message}",
         style="{",
         datefmt="%Y/%m/%d %H:%M:%S",
     )
@@ -20,3 +38,25 @@ def get_logger(module_name: str) -> logging.Logger:
     logger.addHandler(handler)
     logger.propagate = False
     return logger
+
+
+def golog_level_to_python(level: str) -> int:
+    level = level.upper()
+    if level in ["DPANIC", "PANIC", "FATAL"]:
+        return logging.CRITICAL
+
+    level = logging.getLevelName(level)
+    if not isinstance(level, int):
+        raise ValueError(f"Unknown go-log level: {level}")
+    return level
+
+
+def python_level_to_golog(level: str) -> str:
+    if not isinstance(level, str):
+        raise ValueError("`level` is expected to be a Python log level in the string form")
+
+    if level == "CRITICAL":
+        return "FATAL"
+    if level == "WARNING":
+        return "WARN"
+    return level
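For reference, not part of the diff: a small sketch of how the new helpers and formatter behave, based on the code above (the caller string and message below are invented for illustration):

```python
import logging

from hivemind.utils.logging import get_logger, golog_level_to_python, python_level_to_golog

# go-log -> Python: the zap-specific panic levels collapse to CRITICAL, the rest map by name
assert golog_level_to_python("warn") == logging.WARNING
assert golog_level_to_python("dpanic") == logging.CRITICAL

# Python -> go-log: only the names that differ between the two schemes are translated
assert python_level_to_golog("WARNING") == "WARN"
assert python_level_to_golog("CRITICAL") == "FATAL"
assert python_level_to_golog("INFO") == "INFO"

# CustomFormatter lets a record carry a foreign timestamp and caller via `extra`
logger = get_logger("hivemind.demo")
logger.info("error dialing peer", extra={"caller": "swarm/swarm_dial.go:459", "origin_created": 1629700000.0})
# renders roughly as: [<local time>][INFO][swarm/swarm_dial.go:459] error dialing peer
```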

+ 4 - 2
hivemind/utils/mpfuture.py

@@ -180,8 +180,10 @@ class MPFuture(base.Future, Generic[ResultType]):
                     future = future_ref()

                 if future is None:
-                    logger.debug(f"Ignoring update to future with uid={uid}: the future is already done or destroyed")
-                elif update_type == UpdateType.RESULT:
+                    # The MPFuture instance is already destroyed in this process
+                    # (the caller is not interested in the result)
+                    continue
+                if update_type == UpdateType.RESULT:
                     future.set_result(payload)
                 elif update_type == UpdateType.EXCEPTION:
                     future.set_exception(payload)