from mlbench_core.utils.pytorch.utils import pack_tensors, unpack_tensors
class Aggregation(object):
    """Aggregate updates / models from different processes.

    Args:
        use_cuda (bool): Whether to use CUDA tensors for communication
    """

    def __init__(self, use_cuda=False):
        self.use_cuda = use_cuda
    def _agg(self, data, op, denom=None):
        """Aggregate data using the `op` operation.

        Args:
            data (:obj:`torch.Tensor`): A tensor to be aggregated.
            op (str): Aggregation method, e.g. `avg`, `sum`, `min` or `max`.
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `custom_avg`. (default: `None`)

        Returns:
            :obj:`torch.Tensor`: The aggregated tensor.
        """
        raise NotImplementedError
    def _agg_weights_by_model(self, model, op, denom=None):
        """Aggregate model weights, all layers at once.

        Args:
            model (:obj:`torch.nn.Module`): Model whose weights are aggregated.
            op (str): Aggregation method. Should be in `ALLREDUCE_AGGREGATION_OPS`.
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `custom_avg`. (default: `None`)
        """
        # Pack all layers into a single flat tensor
        packed, indices, sizes = pack_tensors(
            list(model.parameters()), use_cuda=self.use_cuda
        )

        aggregated = self._agg(packed, op=op, denom=denom)

        # Unpack the aggregated tensor back into the model's parameters
        tensors = unpack_tensors(aggregated, indices, sizes)
        for i, param in enumerate(model.parameters()):
            param.data = tensors[i]
    def _agg_gradients_by_model(self, model, op, denom=None):
        """Aggregate model gradients, all layers at once.

        Args:
            model (:obj:`torch.nn.Module`): Model whose gradients are aggregated.
            op (str): Aggregation method. Should be in `ALLREDUCE_AGGREGATION_OPS`.
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `custom_avg`. (default: `None`)
        """
        # Pack all layer gradients into a single flat tensor
        packed, indices, sizes = pack_tensors(
            [t.grad for t in model.parameters()], use_cuda=self.use_cuda
        )

        aggregated = self._agg(packed, op=op, denom=denom)

        # Unpack the aggregated tensor back into the parameter gradients
        tensors = unpack_tensors(aggregated, indices, sizes)
        for i, param in enumerate(model.parameters()):
            param.grad.data = tensors[i]
    def _agg_weights_by_layer(self, model, op, denom=None):
        """Aggregate model weights, one layer at a time.

        Args:
            model (:obj:`torch.nn.Module`): Model whose weights are aggregated.
            op (str): Aggregation method. Should be in `ALLREDUCE_AGGREGATION_OPS`.
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `custom_avg`. (default: `None`)
        """
        # Aggregate layer by layer
        for param in model.parameters():
            param.data = self._agg(param.data, op=op, denom=denom)
    def _agg_gradients_by_layer(self, model, op, denom=None):
        """Aggregate model gradients, one layer at a time.

        Args:
            model (:obj:`torch.nn.Module`): Model whose gradients are aggregated.
            op (str): Aggregation method. Should be in `ALLREDUCE_AGGREGATION_OPS`.
            denom (:obj:`torch.Tensor`, optional): Custom denominator to average by.
                Use with op == `custom_avg`. (default: `None`)
        """
        # Aggregate layer by layer
        for param in model.parameters():
            param.grad.data = self._agg(param.grad.data, op=op, denom=denom)
    def agg_model(self, by_layer=False):
        """Return the weight-aggregation function, whole-model or per-layer."""
        if by_layer:
            return self._agg_weights_by_layer
        return self._agg_weights_by_model
    def agg_grad(self, by_layer=False):
        """Return the gradient-aggregation function, whole-model or per-layer."""
        if by_layer:
            return self._agg_gradients_by_layer
        return self._agg_gradients_by_model
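

# Example (illustrative, not part of the original module): a minimal sketch of
# a concrete subclass that implements `_agg` with `torch.distributed`. The
# class name `AllReduceAggregation`, its `world_size` argument, and the usage
# shown below are assumptions for demonstration only, not mlbench_core's
# actual API.
import torch.distributed as dist


class AllReduceAggregation(Aggregation):
    """Sum tensors across all workers, optionally dividing to average."""

    def __init__(self, world_size, use_cuda=False):
        super().__init__(use_cuda=use_cuda)
        self.world_size = world_size

    def _agg(self, data, op, denom=None):
        # all_reduce sums `data` in place across the process group; dividing
        # afterwards turns the sum into an average.
        dist.all_reduce(data, op=dist.ReduceOp.SUM)
        if op == "avg":
            data.div_(self.world_size)
        elif op == "custom_avg":
            data.div_(denom)
        return data


# Usage (requires an initialized process group): average gradients across
# workers after backward() and before optimizer.step().
#
#   aggregator = AllReduceAggregation(world_size=dist.get_world_size())
#   agg_fn = aggregator.agg_grad(by_layer=False)
#   agg_fn(model, op="avg")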