Source code for mlbench_core.aggregation.pytorch.centralized

import numpy as np
import torch
import torch.distributed as dist

from mlbench_core.utils.pytorch.utils import orthogonalize, pack_tensors, unpack_tensors

try:
    import horovod.torch as hvd
except ImportError as e:
    pass

from mlbench_core.aggregation.pytorch.aggregation import Aggregation

AVG_WORLD = "avg_world"
AVG_CUSTOM = "avg_custom"
ALLREDUCE_AGGREGATION_OPS = [AVG_WORLD, AVG_CUSTOM]
"""
All possible aggregations for AllReduceAggregation
"""


class AllReduceAggregation(Aggregation):
    """Aggregate weights / models from different processes using all-reduce
    aggregation

    Args:
        world_size (int): Current distributed world size
        divide_before (bool): Perform division before reduction (to avoid overflow)
        use_cuda (bool): Use cuda tensors for reduction
    """

    def __init__(self, world_size, divide_before=False, use_cuda=False):
        self.world_size = world_size
        self.divide_before = divide_before
        super(AllReduceAggregation, self).__init__(use_cuda=use_cuda)

    def _reduce(self, data):
        """Reduces the given tensor using `torch.distributed` and
        op=`dist.ReduceOp.SUM`

        Args:
            data (:obj:`torch.Tensor`): The tensor to reduce

        Returns:
            (:obj:`torch.Tensor`): The reduced tensor
        """
        dist.all_reduce(data, op=dist.ReduceOp.SUM)
        return data

    def _divide(self, data, op, denom=None):
        """Divides the given `data` tensor by

        - `world_size`, if op == `avg_world`
        - `denom`, if op == `avg_custom`

        Args:
            data (:obj:`torch.Tensor`): Data tensor to divide
            op (str): Aggregation method. Should be in `ALLREDUCE_AGGREGATION_OPS`
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `avg_custom`. (default: `None`)

        Returns:
            (:obj:`torch.Tensor`): The resulting tensor
        """
        if op not in ALLREDUCE_AGGREGATION_OPS:
            raise NotImplementedError("Allreduce not implemented for op={}".format(op))

        if op == AVG_WORLD:
            data.div_(self.world_size)
        elif op == AVG_CUSTOM:
            if denom is None or denom == 0:
                raise ValueError("Denominator should be a non-zero one-element tensor")
            data.div_(denom)
        return data

    def _agg(self, data, op, denom=None):
        """Aggregate data using `op` operation.

        - If op == `avg_world`, the reduced tensor will be divided by `world_size`,
        - if op == `avg_custom`, the reduced tensor will be divided by `denom`

        If `self.divide_before`, the division is performed before reduction.
        This can be helpful to avoid overflows when using `float16` training

        Args:
            data (:obj:`torch.Tensor`): A Tensor to be aggregated.
            op (str): Aggregation method. Should be in `ALLREDUCE_AGGREGATION_OPS`
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `avg_custom`. (default: `None`)

        Returns:
            (:obj:`torch.Tensor`): The aggregated tensor.
        """
        if self.divide_before:
            data = self._divide(data, op, denom)

        if self.world_size > 1:
            data = self._reduce(data)
            if dist.get_backend() == dist.Backend.NCCL:
                torch.cuda.synchronize()

        if not self.divide_before:
            data = self._divide(data, op, denom)

        return data
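
# --- Illustrative usage sketch (not part of the original module) ---
# Assumes `torch.distributed` has already been initialized, e.g. via
# dist.init_process_group(); the tensor shapes and denominator below are made up.
def _example_allreduce_aggregation():
    agg = AllReduceAggregation(world_size=dist.get_world_size())

    # Average a local tensor across all workers.
    local_update = torch.randn(10)
    averaged = agg._agg(local_update, op=AVG_WORLD)

    # Average by a custom denominator, e.g. a global sample count.
    total_samples = torch.tensor(128.0)
    weighted = agg._agg(torch.randn(10), op=AVG_CUSTOM, denom=total_samples)
    return averaged, weighted
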
class AllReduceAggregationHVD(AllReduceAggregation):
    """Implements `AllReduceAggregation` using horovod for communication"""

    def _reduce(self, data):
        data = hvd.allreduce(data, op=hvd.Sum)
        return data
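
# --- Illustrative sketch (not part of the original module) ---
# Same interface as AllReduceAggregation, but the reduction goes through
# Horovod; assumes hvd.init() has been called on every worker.
def _example_allreduce_aggregation_hvd():
    agg = AllReduceAggregationHVD(world_size=hvd.size())
    return agg._agg(torch.randn(10), op=AVG_WORLD)
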
class SparsifiedAggregation(Aggregation):
    """Aggregate sparsified updates."""

    def __init__(self, model, use_cuda=False):
        super(SparsifiedAggregation, self).__init__(use_cuda=use_cuda)

    def _agg(self, data, op, denom=None):
        pass
class PowerAggregation(Aggregation):
    """Aggregate updates using power iteration and error feedback.

    Args:
        model (:obj:`nn.Module`): Model which contains parameters for SGD
        use_cuda (bool): Whether to use cuda tensors for aggregation
        reuse_query (bool): Whether to use warm start to initialize the power iteration
        rank (int): The rank of the gradient approximation
    """

    def __init__(self, model, use_cuda=False, reuse_query=False, world_size=1, rank=1):
        super(PowerAggregation, self).__init__(use_cuda=use_cuda)
        self.p_memory = None
        self.q_memory = None
        self.reuse_query = reuse_query
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.rng = np.random.RandomState(1)
        self.n_workers = world_size
        self.rank = rank
        self.memories = [torch.zeros_like(param) for param in model.parameters()]
        self.send_buffers = [torch.zeros_like(param) for param in model.parameters()]

    def set_random(self, vector):
        """Sets the data in `vector` to random values."""
        torch.manual_seed(self.rng.randint(1_000_000_000))
        vector.data[:] = torch.randn(*vector.shape, device=self.device)

    def _agg_gradients_by_model(self, model, op, denom=None):
        """Aggregate the model's gradients, all layers at once

        Args:
            model (:obj:`torch.nn.Module`): Model to be averaged.
            op (str): Aggregation methods like `avg`, `sum`, `min`, `max`, etc.
            denom (None): Not used here
        """
        grads = [t.grad.data for t in model.parameters()]
        aggregated = self._agg(grads, op=op)
        for i, param in enumerate(model.parameters()):
            param.grad.data = aggregated[i]

    def agg_weights(self, by_layer=False):
        raise NotImplementedError("PowerSGD doesn't allow aggregation by weights")

    def agg_model(self, by_layer=False):
        if by_layer:
            raise NotImplementedError("PowerSGD doesn't allow aggregation by layer")
        else:
            return self._agg_gradients_by_model

    def _agg(self, data, op, denom=None):
        """Aggregate data using `op` operation.

        Args:
            data (:obj:`torch.Tensor`): A Tensor to be aggregated.
            op (str): Aggregation methods like `avg`, `sum`, `min`, `max`, etc.
            denom (None): Not used here

        Returns:
            :obj:`torch.Tensor`: An aggregated tensor.
        """
        if op == "avg":
            for grad, memory, send_bfr in zip(data, self.memories, self.send_buffers):
                send_bfr.data[:] = grad + memory
            self.reduce(self.send_buffers, data, self.memories)
        else:
            raise NotImplementedError("op {} is not supported yet.".format(op))
        return data

    def reduce(self, grad_in, grad_out, memory_out):
        """Reduces the gradients between the workers in place and calculates error feedback.

        Args:
            grad_in (list[torch.Tensor]): The gradients to reduce.
            grad_out (list[torch.Tensor]): Used for storing the reduced gradients.
            memory_out (list[torch.Tensor]): Used for storing error feedback.
        """
        # Split the tensors into those of dimension <= 1, which are reduced
        # uncompressed, and tensors of dimension > 1, which are compressed
        rank1_tensors = [
            (tensor, out, mem)
            for tensor, out, mem in zip(grad_in, grad_out, memory_out)
            if tensor.ndimension() <= 1
        ]
        high_rank_tensors = [
            (tensor, out, mem)
            for tensor, out, mem in zip(grad_in, grad_out, memory_out)
            if tensor.ndimension() > 1
        ]

        # We are building a rank-1 approximation of every tensor
        # that can be interpreted as a matrix. Let the approximation be
        #     M = p q^T
        # We are allocating consecutive memory for the p's and q's
        memory_is_uninitialized = self.p_memory is None

        p_total_size = 0
        q_total_size = 0
        for tensor, _, _ in high_rank_tensors:
            matrix = tensor.view(tensor.shape[0], -1)
            n, m = matrix.shape
            rank = min(n, m, self.rank)
            p_total_size += n * rank
            q_total_size += m * rank
        if self.p_memory is None:
            self.p_memory = torch.empty(p_total_size, device=self.device)
            self.q_memory = torch.empty(q_total_size, device=self.device)

        # Find them again and make lists of pointers
        ps = []
        qs = []
        p_idx = 0
        q_idx = 0
        for tensor, _, _ in high_rank_tensors:
            matrix = tensor.view(tensor.shape[0], -1)
            n, m = matrix.shape
            rank = min(n, m, self.rank)
            ps.append(self.p_memory[p_idx : p_idx + n * rank].view(n, rank))
            qs.append(self.q_memory[q_idx : q_idx + m * rank].view(m, rank))
            p_idx += n * rank
            q_idx += m * rank

        for (tensor, _, _), q, p in zip(high_rank_tensors, qs, ps):
            matrix = tensor.view(tensor.shape[0], -1)
            n, m = matrix.shape

            if self.reuse_query and not memory_is_uninitialized:
                # Warm start: re-use the query vector from the previous round
                pass
            else:
                # Sample a fresh query vector q
                self.set_random(q)

        for (tensor, _, _), q, p in zip(high_rank_tensors, qs, ps):
            matrix = tensor.view(tensor.shape[0], -1)
            torch.matmul(matrix, q, out=p)

        dist.all_reduce(self.p_memory)

        # Start communicating the rank-1 tensors
        rank1_packed, rank1_indices, rank1_sizes = pack_tensors(
            [tensor for (tensor, _, _) in rank1_tensors]
        )
        rank1_handle = dist.all_reduce(rank1_packed, async_op=True)

        for p in ps:
            orthogonalize(p)

        for p, q, (tensor, _, _) in zip(ps, qs, high_rank_tensors):
            matrix = tensor.view(tensor.shape[0], -1)
            torch.matmul(matrix.t(), p, out=q)

        dist.all_reduce(self.q_memory)
        self.q_memory.data[:] /= self.n_workers

        for p, q, (tensor, out, mem) in zip(ps, qs, high_rank_tensors):
            # Set the output gradient and keep the approximation error as feedback
            torch.matmul(p, q.t(), out=out.data[:])
            mem.data[:] = tensor - out

        rank1_handle.wait()
        rank1_packed /= self.n_workers
        rank1_unpacked = unpack_tensors(rank1_packed, rank1_indices, rank1_sizes)
        for i, (_, out, _) in enumerate(rank1_tensors):
            out[:] = rank1_unpacked[i]
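
# --- Illustrative sketch (not part of the original module) ---
# The per-matrix compression inside `reduce` is one round of power iteration:
# p = M q, orthogonalize p, q = M^T p, and the low-rank product p q^T is what
# effectively gets exchanged, while M - p q^T is kept locally as error feedback.
# Single-process sketch for one matrix (the shapes below are made up):
def _example_power_compression():
    M = torch.randn(64, 32)            # a gradient viewed as a matrix
    rank = 1                           # same default as PowerAggregation(rank=1)

    q = torch.randn(M.shape[1], rank)  # random query vector(s)
    p = M @ q                          # p = M q (all-reduced across workers)
    orthogonalize(p)                   # orthonormalize the columns of p in place
    q = M.t() @ p                      # q = M^T p (all-reduced and averaged)
    M_approx = p @ q.t()               # low-rank update applied to the model
    error_feedback = M - M_approx      # stored in `memories` for the next step
    return M_approx, error_feedback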