server.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. from __future__ import annotations
  2. import gc
  3. import multiprocessing as mp
  4. import random
  5. import threading
  6. import time
  7. from typing import Dict, List, Optional, Sequence, Union
  8. import numpy as np
  9. import psutil
  10. import torch
  11. from hivemind import DHT, MAX_DHT_TIME_DISCREPANCY_SECONDS, BatchTensorDescriptor, get_dht_time
  12. from hivemind.moe.server.layers import add_custom_models_from_file
  13. from hivemind.moe.server.runtime import Runtime
  14. from hivemind.proto.runtime_pb2 import CompressionType
  15. from hivemind.utils.logging import get_logger, use_hivemind_log_handler
  16. from src import BloomConfig, declare_active_modules
  17. from src.bloom.from_pretrained import DTYPE_MAP, load_pretrained_block
  18. from src.data_structures import CHAIN_DELIMITER, UID_DELIMITER, ServerState
  19. from src.dht_utils import get_remote_module_infos
  20. from src.server import block_selection
  21. from src.server.backend import TransformerBackend
  22. from src.server.cache import MemoryCache
  23. from src.server.handler import TransformerConnectionHandler
  24. from src.server.throughput import get_host_throughput
  25. from src.utils.convert_8bit import replace_8bit_linear
  26. use_hivemind_log_handler("in_root_logger")
  27. logger = get_logger(__file__)
  28. class Server(threading.Thread):
  29. """
  30. Runs ModuleContainer, periodically checks that the network is balanced,
  31. restarts the ModuleContainer with other layers if the imbalance is significant
  32. """
  33. def __init__(
  34. self,
  35. prefix: Optional[str],
  36. converted_model_name_or_path: str,
  37. throughput: Union[float, str],
  38. num_blocks: Optional[int] = None,
  39. block_indices: Optional[str] = None,
  40. num_handlers: int = 8,
  41. min_batch_size: int = 1,
  42. max_batch_size: int = 4096,
  43. inference_max_length: int = 4096,
  44. torch_dtype: str = "auto",
  45. revision: str = "main",
  46. cache_dir: Optional[str] = None,
  47. attn_cache_size: Optional[int] = None,
  48. device: Optional[Union[str, torch.device]] = None,
  49. initial_peers: Sequence[str] = (),
  50. compression=CompressionType.NONE,
  51. stats_report_interval: Optional[int] = None,
  52. custom_module_path=None,
  53. update_period: float = 30,
  54. expiration: Optional[float] = None,
  55. prefetch_batches: int = 1,
  56. sender_threads: int = 1,
  57. min_balance_quality: float = 0.0,
  58. mean_balance_check_period: float = 150,
  59. mean_block_selection_delay: float = 0.5,
  60. use_auth_token: Optional[str] = None,
  61. load_in_8bit: bool = False,
  62. *,
  63. start: bool,
  64. **kwargs,
  65. ):
  66. """Create a server with one or more bloom blocks. See run_server.py for documentation."""
  67. super().__init__()
  68. self.converted_model_name_or_path = converted_model_name_or_path
  69. self.num_handlers = num_handlers
  70. self.min_batch_size, self.max_batch_size = min_batch_size, max_batch_size
  71. self.inference_max_length = inference_max_length
  72. self.cache_dir = cache_dir
  73. self.attn_cache_size = attn_cache_size
  74. self.compression = compression
  75. self.stats_report_interval, self.update_period = stats_report_interval, update_period
  76. self.prefetch_batches, self.sender_threads = prefetch_batches, sender_threads
  77. self.use_auth_token = use_auth_token
  78. self.load_in_8bit = load_in_8bit
  79. if custom_module_path is not None:
  80. add_custom_models_from_file(custom_module_path)
  81. if prefix is None:
  82. prefix = converted_model_name_or_path
  83. assert UID_DELIMITER not in prefix and CHAIN_DELIMITER not in prefix, (
  84. f"Cannot use model name as prefix (contains '{UID_DELIMITER}' or '{CHAIN_DELIMITER}'); "
  85. f"Please specify --prefix manually when starting a server"
  86. )
  87. logger.info(f"Automatic dht prefix: {prefix}")
  88. self.prefix = prefix
  89. if expiration is None:
  90. expiration = max(2 * update_period, MAX_DHT_TIME_DISCREPANCY_SECONDS)
  91. self.expiration = expiration
  92. self.dht = DHT(initial_peers=initial_peers, start=True, **kwargs)
  93. visible_maddrs_str = [str(a) for a in self.dht.get_visible_maddrs()]
  94. logger.info(f"Running DHT node on {visible_maddrs_str}, initial peers = {initial_peers}")
  95. device = device or ("cuda" if torch.cuda.is_available() else "cpu")
  96. self.device = device
  97. self.memory_cache = MemoryCache(device, attn_cache_size)
  98. assert isinstance(throughput, float) or throughput in ["auto", "eval"]
  99. if throughput in ["auto", "eval"]:
  100. throughput = get_host_throughput(device, force_eval=(throughput == "eval"))
  101. self.throughput = throughput
  102. if isinstance(torch_dtype, str):
  103. torch_dtype = DTYPE_MAP[torch_dtype]
  104. assert torch_dtype in DTYPE_MAP.values(), f"torch_dtype must be one of {list(DTYPE_MAP.values())}"
  105. self.torch_dtype = torch_dtype
  106. self.block_config = BloomConfig.from_pretrained(
  107. converted_model_name_or_path,
  108. use_auth_token=use_auth_token,
  109. revision=revision,
  110. )
  111. self.module_uids = [f"{self.prefix}.{block_index}" for block_index in range(self.block_config.n_layer)]
  112. assert (block_indices is None) != (num_blocks is None), "please specify num_blocks or block_indices, not both"
  113. if block_indices is not None:
  114. try:
  115. first_block_index, last_block_index = block_indices.split(":")
  116. first_block_index, last_block_index = map(int, map(str.strip, (first_block_index, last_block_index)))
  117. except Exception as e:
  118. logger.error(f"Failed to parse --block_indices ({e}), must be start:end (e.g. 0:18)")
  119. raise
  120. block_indices = range(first_block_index, last_block_index)
  121. self.strict_block_indices, self.num_blocks = block_indices, num_blocks
  122. self.min_balance_quality = min_balance_quality
  123. self.mean_balance_check_period = mean_balance_check_period
  124. self.mean_block_selection_delay = mean_block_selection_delay
  125. self.stop = threading.Event()
  126. if start:
  127. self.start()
  128. def run(self):
  129. while True:
  130. block_indices = self._choose_blocks()
  131. self.module_container = ModuleContainer.create(
  132. dht=self.dht,
  133. prefix=self.prefix,
  134. converted_model_name_or_path=self.converted_model_name_or_path,
  135. block_config=self.block_config,
  136. memory_cache=self.memory_cache,
  137. throughput=self.throughput,
  138. block_indices=block_indices,
  139. num_handlers=self.num_handlers,
  140. min_batch_size=self.min_batch_size,
  141. max_batch_size=self.max_batch_size,
  142. inference_max_length=self.inference_max_length,
  143. torch_dtype=self.torch_dtype,
  144. cache_dir=self.cache_dir,
  145. device=self.device,
  146. compression=self.compression,
  147. stats_report_interval=self.stats_report_interval,
  148. update_period=self.update_period,
  149. expiration=self.expiration,
  150. prefetch_batches=self.prefetch_batches,
  151. sender_threads=self.sender_threads,
  152. use_auth_token=self.use_auth_token,
  153. load_in_8bit=self.load_in_8bit,
  154. start=True,
  155. )
  156. try:
  157. self.module_container.ready.wait()
  158. while True:
  159. timeout = random.random() * 2 * self.mean_balance_check_period
  160. # TODO: Follow ModuleContainer status (to restart/stop if it crashes)
  161. if self.stop.wait(timeout):
  162. return
  163. if self._should_choose_other_blocks():
  164. logger.info("Swarm is imbalanced, server will load other blocks")
  165. break # Stop serving this set of modules
  166. finally:
  167. self.module_container.shutdown()
  168. self._clean_memory_and_fds()
  169. def _clean_memory_and_fds(self):
  170. del self.module_container
  171. gc.collect() # In particular, this closes unused file descriptors
  172. cur_proc = psutil.Process()
  173. num_fds = [proc.num_fds() for proc in [cur_proc] + psutil.Process().children(recursive=True)]
  174. logger.info(f"Cleanup complete, {sum(num_fds)} open file descriptors left")
  175. def _choose_blocks(self) -> List[int]:
  176. if self.strict_block_indices is not None:
  177. return self.strict_block_indices
  178. assert self.num_blocks is not None
  179. # If multiple servers (e.g., launched on the same machine by a script) get to this line at the same time,
  180. # this delay decreases the probability of a race condition while choosing the best blocks to serve.
  181. time.sleep(random.random() * 2 * self.mean_block_selection_delay)
  182. module_infos = get_remote_module_infos(self.dht, self.module_uids, expiration_time=np.inf)
  183. return block_selection.choose_best_blocks(self.num_blocks, module_infos)
  184. def _should_choose_other_blocks(self) -> bool:
  185. if self.strict_block_indices is not None:
  186. return False
  187. module_infos = get_remote_module_infos(self.dht, self.module_uids, expiration_time=np.inf)
  188. return block_selection.should_choose_other_blocks(self.dht.peer_id, module_infos, self.min_balance_quality)
  189. def shutdown(self):
  190. self.stop.set()
  191. self.dht.shutdown()
  192. self.dht.join()
  193. class ModuleContainer(threading.Thread):
  194. """Serves a set of specific Bloom layers for inference, forward, and backward. Announces itself over the DHT."""
  195. def __init__(
  196. self,
  197. dht: DHT,
  198. module_backends: Dict[str, TransformerBackend],
  199. *,
  200. inference_max_length: int,
  201. num_connection_handlers: int,
  202. throughput: float,
  203. update_period: float,
  204. expiration: Optional[float] = None,
  205. start: bool,
  206. **kwargs,
  207. ):
  208. super().__init__()
  209. self.dht, self.module_backends = dht, module_backends
  210. self.throughput, self.update_period, self.expiration = throughput, update_period, expiration
  211. self.conn_handlers = [
  212. TransformerConnectionHandler(dht, self.module_backends, inference_max_length)
  213. for _ in range(num_connection_handlers)
  214. ]
  215. self.runtime = Runtime(self.module_backends, **kwargs)
  216. self.dht_handler_thread = ModuleAnnouncerThread(
  217. self.module_backends,
  218. dht,
  219. throughput=throughput,
  220. update_period=update_period,
  221. expiration=expiration,
  222. daemon=True,
  223. )
  224. self.checkpoint_saver = None # no need to save checkpoints since we do not change model state
  225. if start:
  226. self.run_in_background(await_ready=True)
  227. def run(self):
  228. """
  229. Runs ModuleContainer in the current thread. Initializes dht if necessary, starts connection handlers,
  230. runs Runtime (self.runtime) to process incoming requests.
  231. """
  232. logger.info(f"Serving {len(self.module_backends)} blocks:")
  233. for expert_name, backend in self.module_backends.items():
  234. num_parameters = sum(p.numel() for p in backend.module.parameters() if p.requires_grad)
  235. logger.info(f"{expert_name}: {backend.module.__class__.__name__}, {num_parameters} parameters")
  236. if not self.dht.is_alive():
  237. self.dht.run_in_background(await_ready=True)
  238. if self.module_backends:
  239. self.dht_handler_thread.start()
  240. if self.checkpoint_saver is not None:
  241. self.checkpoint_saver.start()
  242. for handler in self.conn_handlers:
  243. handler.run_in_background()
  244. self.runtime.run()
  245. # noinspection PyMethodOverriding
  246. @classmethod
  247. def create(
  248. cls,
  249. *,
  250. dht: DHT,
  251. prefix: str,
  252. converted_model_name_or_path: str,
  253. block_config: BloomConfig,
  254. memory_cache: MemoryCache,
  255. throughput: float,
  256. block_indices: List[int],
  257. num_handlers: Optional[int],
  258. min_batch_size: int,
  259. max_batch_size: int,
  260. inference_max_length: int,
  261. torch_dtype: torch.dtype,
  262. cache_dir: Optional[str],
  263. device: Union[str, torch.device],
  264. compression: CompressionType,
  265. stats_report_interval: Optional[int],
  266. update_period: float,
  267. expiration: Optional[float],
  268. prefetch_batches: int,
  269. sender_threads: int,
  270. use_auth_token: Optional[str],
  271. load_in_8bit: bool,
  272. start: bool,
  273. ) -> ModuleContainer:
  274. module_uids = [f"{prefix}.{block_index}" for block_index in block_indices]
  275. declare_active_modules(
  276. dht,
  277. module_uids,
  278. expiration_time=get_dht_time() + expiration,
  279. state=ServerState.JOINING,
  280. throughput=throughput,
  281. )
  282. logger.info(f"Announced that blocks {block_indices} are joining")
  283. blocks = {}
  284. for module_uid, block_index in zip(module_uids, block_indices):
  285. block = load_pretrained_block(
  286. converted_model_name_or_path,
  287. block_index,
  288. block_config,
  289. torch_dtype=torch_dtype,
  290. use_auth_token=use_auth_token,
  291. cache_dir=cache_dir,
  292. )
  293. if load_in_8bit:
  294. dtype = block.input_layernorm.weight.dtype
  295. block = replace_8bit_linear(block)
  296. block = block.to(device)
  297. for param in block.parameters():
  298. param.requires_grad = False
  299. blocks[module_uid] = TransformerBackend(
  300. module_uid,
  301. block,
  302. memory_cache=memory_cache,
  303. backend_dtype=None if torch_dtype == "auto" else torch_dtype,
  304. args_schema=(
  305. BatchTensorDescriptor(
  306. 1, 2048, block_config.hidden_size, dtype=torch.float32, compression=compression
  307. ),
  308. ),
  309. kwargs_schema={},
  310. outputs_schema=(
  311. BatchTensorDescriptor(
  312. 1, 2048, block_config.hidden_size, dtype=torch.float32, compression=compression
  313. ),
  314. ),
  315. min_batch_size=min_batch_size,
  316. max_batch_size=max_batch_size,
  317. )
  318. return cls(
  319. dht,
  320. blocks,
  321. throughput=throughput,
  322. num_connection_handlers=num_handlers,
  323. inference_max_length=inference_max_length,
  324. device=device,
  325. stats_report_interval=stats_report_interval,
  326. update_period=update_period,
  327. expiration=expiration,
  328. prefetch_batches=prefetch_batches,
  329. sender_threads=sender_threads,
  330. start=start,
  331. )
  332. def run_in_background(self, await_ready=True, timeout=None):
  333. """
  334. Starts ModuleContainer in a background thread. if await_ready, this method will wait until the container
  335. is ready to process incoming requests or for :timeout: seconds max.
  336. """
  337. self.start()
  338. if await_ready and not self.ready.wait(timeout=timeout):
  339. raise TimeoutError("ModuleContainer didn't notify .ready in {timeout} seconds")
  340. @property
  341. def ready(self) -> mp.synchronize.Event:
  342. """
  343. An event (multiprocessing.Event) that is set when the container is ready to process requests.
  344. Example
  345. =======
  346. >>> container.start()
  347. >>> container.ready.wait(timeout=10)
  348. >>> print("Container ready" if container.ready.is_set() else "Container didn't start in 10 seconds")
  349. """
  350. return self.runtime.ready # mp.Event that is true if self is ready to process batches
  351. def shutdown(self):
  352. """
  353. Gracefully terminate the container, process-safe.
  354. Please note that terminating container otherwise (e.g. by killing processes) may result in zombie processes.
  355. If you did already cause a zombie outbreak, your only option is to kill them with -9 (SIGKILL).
  356. """
  357. if self.module_backends:
  358. self.dht_handler_thread.stop.set()
  359. self.dht_handler_thread.join()
  360. declare_active_modules(
  361. self.dht,
  362. self.module_backends.keys(),
  363. expiration_time=get_dht_time() + self.expiration,
  364. state=ServerState.OFFLINE,
  365. throughput=self.throughput,
  366. )
  367. logger.info(f"Announced that blocks {list(self.module_backends.keys())} are offline")
  368. self.ready.clear()
  369. for handler in self.conn_handlers:
  370. handler.shutdown()
  371. logger.debug("Connection handlers terminated")
  372. if self.checkpoint_saver is not None:
  373. self.checkpoint_saver.stop.set()
  374. self.checkpoint_saver.join()
  375. logger.debug(f"Shutting down pools")
  376. for pool in self.runtime.pools:
  377. if pool.is_alive():
  378. pool.shutdown()
  379. logger.debug(f"Shutting down runtime")
  380. self.runtime.shutdown()
  381. logger.info("Module container shut down succesfully")
  382. class ModuleAnnouncerThread(threading.Thread):
  383. """Periodically announces that this container hosts the specified modules, visible to all DHT peers"""
  384. def __init__(
  385. self,
  386. module_backends: Dict[str, TransformerBackend],
  387. dht: DHT,
  388. *,
  389. throughput: float,
  390. update_period: float = 30,
  391. expiration: float,
  392. **kwargs,
  393. ):
  394. super().__init__(**kwargs)
  395. self.module_backends = module_backends
  396. self.dht = dht
  397. self.throughput = throughput
  398. self.update_period = update_period
  399. self.expiration = expiration
  400. self.stop = threading.Event()
  401. def run(self) -> None:
  402. while True:
  403. declare_active_modules(
  404. self.dht,
  405. self.module_backends.keys(),
  406. expiration_time=get_dht_time() + self.expiration,
  407. state=ServerState.ONLINE,
  408. throughput=self.throughput,
  409. )
  410. if self.stop.wait(self.update_period):
  411. break