89 lines
2.5 KiB
Python
89 lines
2.5 KiB
Python
import os
|
|
import torch
|
|
|
|
from datetime import timedelta
|
|
from loguru import logger
|
|
|
|
# Tensor Parallelism settings
|
|
RANK = int(os.getenv("RANK", "0"))
|
|
WORLD_SIZE = int(os.getenv("WORLD_SIZE", "1"))
|
|
|
|
# CUDA memory fraction
|
|
MEMORY_FRACTION = float(os.getenv("CUDA_MEMORY_FRACTION", "1.0"))
|
|
|
|
|
|
class FakeBarrier:
|
|
def wait(self):
|
|
pass
|
|
|
|
|
|
class FakeGroup:
|
|
def __init__(self, rank, size):
|
|
self._rank = rank
|
|
self._size = size
|
|
|
|
def allreduce(self, *args, **kwargs):
|
|
return FakeBarrier()
|
|
|
|
def allgather(self, inputs, local_tensor, **kwargs):
|
|
assert (
|
|
len(inputs[0]) == len(local_tensor) == 1
|
|
), f"{len(inputs[0])} != {len(local_tensor)} != 1, and the FakeGroup is supposed to join on simple tensors"
|
|
for input_ in inputs:
|
|
input_[0].data = local_tensor[0].data
|
|
return FakeBarrier()
|
|
|
|
def barrier(self, *args, **kwargs):
|
|
return FakeBarrier()
|
|
|
|
def size(self):
|
|
return self._size
|
|
|
|
def rank(self):
|
|
return self._rank
|
|
|
|
|
|
def initialize_torch_distributed():
|
|
if torch.cuda.is_available():
|
|
from torch.distributed import ProcessGroupNCCL
|
|
|
|
# Set the device id.
|
|
assert WORLD_SIZE <= torch.cuda.device_count(), "Each process is one gpu"
|
|
device = RANK % torch.cuda.device_count()
|
|
torch.cuda.set_device(device)
|
|
torch.cuda.set_per_process_memory_fraction(MEMORY_FRACTION, device)
|
|
backend = "nccl"
|
|
options = ProcessGroupNCCL.Options()
|
|
options.is_high_priority_stream = True
|
|
options._timeout = timedelta(seconds=60)
|
|
else:
|
|
try:
|
|
import oneccl_bindings_for_pytorch
|
|
|
|
backend = "ccl"
|
|
if os.getenv("CCL_WORKER_COUNT", None) is None:
|
|
os.environ["CCL_WORKER_COUNT"] = str(1)
|
|
except ImportError:
|
|
backend = "gloo"
|
|
options = None
|
|
|
|
if WORLD_SIZE == 1:
|
|
return FakeGroup(RANK, WORLD_SIZE), RANK, WORLD_SIZE
|
|
else:
|
|
if os.getenv("DEBUG", None) == "1":
|
|
return FakeGroup(RANK, WORLD_SIZE), RANK, WORLD_SIZE
|
|
|
|
if not torch.distributed.is_initialized():
|
|
# Call the init process.
|
|
torch.distributed.init_process_group(
|
|
backend=backend,
|
|
world_size=WORLD_SIZE,
|
|
rank=RANK,
|
|
timeout=timedelta(seconds=60),
|
|
pg_options=options,
|
|
)
|
|
else:
|
|
logger.warning("torch.distributed is already initialized.")
|
|
|
|
return torch.distributed.group.WORLD, RANK, WORLD_SIZE
|