Source code for stellargraph.ensemble

# -*- coding: utf-8 -*-
#
# Copyright 2018-2020 Data61, CSIRO
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Ensembles of graph neural network models, GraphSAGE, GCN, GAT, and HinSAGE, with optional bootstrap sampling of the
training data (implemented in the BaggingEnsemble class).
"""

__all__ = ["Ensemble", "BaggingEnsemble"]

import warnings

import numpy as np
from tensorflow import keras as K
from tensorflow.keras.callbacks import EarlyStopping

import stellargraph as sg
from stellargraph.layer import *


class Ensemble(object):
    """
    The Ensemble class can be used to create ensembles of stellargraph's graph neural network algorithms
    including GCN, GraphSAGE, GAT, and HinSAGE. Ensembles can be used for training classification and
    regression problems for node attribute inference and link prediction.

    The Ensemble class can be used to create Naive ensembles.

    Naive ensembles add model diversity by random initialisation of the models' weights (before training) to
    different values. Each model in the ensemble is trained on the same training set of examples.

    .. seealso::

       Example using ensembles: `node classification <https://stellargraph.readthedocs.io/en/stable/demos/ensembles/ensemble-node-classification-example.html>`__.

       Related functionality: :class:`.BaggingEnsemble` for bootstrap sampling while training, in addition to
       random initialisation.
    """

    def __init__(self, model, n_estimators=3, n_predictions=3):
        """
        Args:
            model: A Keras model.
            n_estimators (int): The number of estimators (aka models) in the ensemble.
            n_predictions (int): The number of predictions per query point per estimator.
        """
        if not isinstance(model, K.Model):
            raise ValueError(
                "({}) model must be a Keras model but received object of type {}".format(
                    type(self).__name__, type(model).__name__
                )
            )
        if n_estimators <= 0 or not isinstance(n_estimators, int):
            raise ValueError(
                "({}) n_estimators must be a positive integer but received {}".format(
                    type(self).__name__, n_estimators
                )
            )

        if n_predictions <= 0 or not isinstance(n_predictions, int):
            raise ValueError(
                "({}) n_predictions must be a positive integer but received {}".format(
                    type(self).__name__, n_predictions
                )
            )

        self.metrics_names = (
            None  # It will be set when the self.compile() method is called
        )
        self.models = []
        self.history = []
        self.n_estimators = n_estimators
        self.n_predictions = n_predictions
        self.early_stopping_patience = 10

        # Create the ensemble from the given base model
        self._init_models(model)

    def _init_models(self, model):
        """
        This method creates an ensemble of models by cloning the given base model self.n_estimators times.

        All models have the same architecture but their weights are initialised with different (random) values.

        Args:
            model: A Keras model that is the base model for the ensemble.
        """
        # first copy is the given model
        self.models.append(model)
        # now clone the model self.n_estimators - 1 times
        for _ in range(self.n_estimators - 1):
            self.models.append(K.models.clone_model(model))
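    # A minimal usage sketch (illustrative only, not part of this module): wrap a GCN
    # base model in a naive ensemble of 5 models. The graph ``G``, ``n_classes``, and
    # the layer sizes are assumptions for illustration; on older stellargraph versions
    # ``in_out_tensors()`` was called ``build()``.
    #
    #   import stellargraph as sg
    #   from stellargraph.layer import GCN
    #   from tensorflow.keras import layers, Model
    #
    #   generator = sg.mapper.FullBatchNodeGenerator(G, method="gcn")
    #   gcn = GCN(layer_sizes=[16, 16], activations=["relu", "relu"], generator=generator)
    #   x_inp, x_out = gcn.in_out_tensors()
    #   predictions = layers.Dense(units=n_classes, activation="softmax")(x_out)
    #   base_model = Model(inputs=x_inp, outputs=predictions)
    #
    #   ensemble = Ensemble(base_model, n_estimators=5, n_predictions=3)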
    def layers(self, indx=None):
        """
        This method returns the layer objects for the model specified by the value of ``indx``.

        Args:
            indx (None or int): The index (starting at 0) of the model to return the layers for.
                If it is None, then the layers for the 0-th (or first) model are returned.

        Returns:
            list: The layers for the specified model.
        """
        if indx is not None and not isinstance(indx, (int,)):
            raise ValueError(
                "({}) indx should be None or integer type but received type {}".format(
                    type(self).__name__, type(indx).__name__
                )
            )
        if isinstance(indx, (int,)) and indx < 0:
            raise ValueError(
                "({}) indx must be greater than or equal to zero but received {}".format(
                    type(self).__name__, indx
                )
            )

        if indx is None and len(self.models) > 0:
            # Default is to return the layers for the first model
            return self.models[0].layers

        if len(self.models) > indx:
            return self.models[indx].layers
        else:
            # Error because index is out of bounds
            raise ValueError(
                "({}) indx {} is out of range 0 to {}".format(
                    type(self).__name__, indx, len(self.models)
                )
            )
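    # Illustrative: inspect one ensemble member's architecture via ``layers()``
    # (assumes the ``ensemble`` instance sketched above):
    #
    #   for layer in ensemble.layers(indx=0):
    #       print(layer.name)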
    def compile(
        self,
        optimizer,
        loss=None,
        metrics=None,
        loss_weights=None,
        sample_weight_mode=None,
        weighted_metrics=None,
    ):
        """
        Method for configuring the model for training. It is a wrapper of the `keras.models.Model.compile`
        method for all models in the ensemble.

        For detailed descriptions of Keras-specific parameters consult the Keras documentation
        at https://keras.io/models/sequential/

        Args:
            optimizer (Keras optimizer or str): (Keras-specific parameter) The optimizer to use given either as an
                instance of a Keras optimizer or a string naming the optimiser of choice.
            loss (Keras function or str): (Keras-specific parameter) The loss function or string indicating the
                type of loss to use.
            metrics (list or dict): (Keras-specific parameter) List of metrics to be evaluated by each model in the
                ensemble during training and testing. It should be a list for a model with a single output. To
                specify different metrics for different outputs of a multi-output model, you could also pass a
                dictionary.
            loss_weights (None or list): (Keras-specific parameter) Optional list or dictionary specifying scalar
                coefficients (Python floats) to weight the loss contributions of different model outputs. The loss
                value that will be minimized by the model will then be the weighted sum of all individual losses,
                weighted by the loss_weights coefficients. If a list, it is expected to have a 1:1 mapping to the
                model's outputs. If a dict, it is expected to map output names (strings) to scalar coefficients.
            sample_weight_mode (None, str, list, or dict): (Keras-specific parameter) If you need to do
                timestep-wise sample weighting (2D weights), set this to "temporal". None defaults to sample-wise
                weights (1D). If the model has multiple outputs, you can use a different sample_weight_mode on
                each output by passing a dictionary or a list of modes.
            weighted_metrics (list): (Keras-specific parameter) List of metrics to be evaluated and weighted by
                sample_weight or class_weight during training and testing.
        """
        for model in self.models:
            model.compile(
                optimizer=optimizer,
                loss=loss,
                metrics=metrics,
                loss_weights=loss_weights,
                sample_weight_mode=sample_weight_mode,
                weighted_metrics=weighted_metrics,
            )

        self.metrics_names = self.models[0].metrics_names  # assumes all models are the same
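    # A minimal sketch of compiling an ensemble (illustrative; ``ensemble`` is the
    # instance from the sketch above). compile() forwards the same settings to every
    # model in the ensemble:
    #
    #   from tensorflow.keras import optimizers, losses
    #
    #   ensemble.compile(
    #       optimizer=optimizers.Adam(learning_rate=0.005),
    #       loss=losses.categorical_crossentropy,
    #       metrics=["acc"],
    #   )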
    def fit(
        self,
        generator,
        steps_per_epoch=None,
        epochs=1,
        verbose=1,
        validation_data=None,
        validation_steps=None,
        class_weight=None,
        max_queue_size=10,
        workers=1,
        use_multiprocessing=False,
        shuffle=True,
        initial_epoch=0,
        use_early_stopping=False,
        early_stopping_monitor="val_loss",
    ):
        """
        This method trains the ensemble on the data specified by the generator. If validation data are given, then
        the training metrics are evaluated on these data and the results printed on screen if the verbose level is
        greater than 0.

        The method trains each model in the ensemble in series for the number of epochs specified. Training can
        also stop early with the best model, as evaluated on the validation data, if use_early_stopping is set to
        True.

        For detailed descriptions of Keras-specific parameters consult the Keras documentation
        at https://keras.io/models/sequential/

        Args:
            generator: The generator object for training data. It should be one of type NodeSequence,
                LinkSequence, SparseFullBatchSequence, or FullBatchSequence.
            steps_per_epoch (None or int): (Keras-specific parameter) If not None, it specifies the number of steps
                to yield from the generator before declaring one epoch finished and starting a new epoch.
            epochs (int): (Keras-specific parameter) The number of training epochs.
            verbose (int): (Keras-specific parameter) The verbosity mode that should be 0, 1, or 2, meaning silent,
                progress bar, and one line per epoch respectively.
            validation_data: An optional generator for validation data. If not None, it should be one of type
                NodeSequence, LinkSequence, SparseFullBatchSequence, or FullBatchSequence.
            validation_steps (None or int): (Keras-specific parameter) If validation_data is not None, then it
                specifies the number of steps to yield from the generator before stopping at the end of every
                epoch.
            class_weight (None or dict): (Keras-specific parameter) If not None, it should be a dictionary mapping
                class indices (integers) to a weight (float) value, used for weighting the loss function (during
                training only). This can be useful to tell the model to "pay more attention" to samples from an
                under-represented class.
            max_queue_size (int): (Keras-specific parameter) The maximum size for the generator queue.
            workers (int): (Keras-specific parameter) The maximum number of workers to use.
            use_multiprocessing (bool): (Keras-specific parameter) If True then use process-based threading.
            shuffle (bool): (Keras-specific parameter) If True, then it shuffles the order of batches at the
                beginning of each training epoch.
            initial_epoch (int): (Keras-specific parameter) Epoch at which to start training (useful for resuming a
                previous training run).
            use_early_stopping (bool): If set to True, then early stopping is used when training each model
                in the ensemble. The default is False.
            early_stopping_monitor (str): The quantity to monitor for early stopping, e.g., 'val_loss',
                'val_weighted_acc'. It should be a valid Keras metric.

        Returns:
            list: It returns a list of Keras History objects each corresponding to one trained model in the
            ensemble.
        """
        if not isinstance(
            generator,
            (
                sg.mapper.NodeSequence,
                sg.mapper.LinkSequence,
                sg.mapper.FullBatchSequence,
                sg.mapper.SparseFullBatchSequence,
            ),
        ):
            raise ValueError(
                "({}) generator must be one of type NodeSequence, LinkSequence, FullBatchSequence, "
                "or SparseFullBatchSequence but received object of type {}".format(
                    type(self).__name__, type(generator).__name__
                )
            )

        self.history = []

        es_callback = None
        if use_early_stopping and validation_data is not None:
            es_callback = [
                EarlyStopping(
                    monitor=early_stopping_monitor,
                    patience=self.early_stopping_patience,
                    restore_best_weights=True,
                )
            ]

        for model in self.models:
            self.history.append(
                model.fit(
                    generator,
                    steps_per_epoch=steps_per_epoch,
                    epochs=epochs,
                    verbose=verbose,
                    callbacks=es_callback,
                    validation_data=validation_data,
                    validation_steps=validation_steps,
                    class_weight=class_weight,
                    max_queue_size=max_queue_size,
                    workers=workers,
                    use_multiprocessing=use_multiprocessing,
                    shuffle=shuffle,
                    initial_epoch=initial_epoch,
                )
            )

        return self.history
    def fit_generator(self, *args, **kwargs):
        """
        Deprecated: use :meth:`fit`.
        """
        warnings.warn(
            "'fit_generator' has been replaced by 'fit', to match tensorflow.keras.Model",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.fit(*args, **kwargs)
    def evaluate(
        self,
        generator,
        test_data=None,
        test_targets=None,
        max_queue_size=10,
        workers=1,
        use_multiprocessing=False,
        verbose=0,
    ):
        """
        Evaluates the ensemble on a data (node or link) generator. It makes `n_predictions` for each data point
        for each of the `n_estimators` models and returns the mean and standard deviation of the model metrics.

        For detailed descriptions of Keras-specific parameters consult the Keras documentation
        at https://keras.io/models/sequential/

        Args:
            generator: The generator object that, if test_data is not None, should be one of type
                GraphSAGENodeGenerator, HinSAGENodeGenerator, FullBatchNodeGenerator, GraphSAGELinkGenerator, or
                HinSAGELinkGenerator. However, if test_data is None, then generator should be one of type
                NodeSequence, LinkSequence, SparseFullBatchSequence, or FullBatchSequence.
            test_data (None or iterable): If not None, then it is an iterable, e.g. list, that specifies the node
                IDs to evaluate the model on.
            test_targets (None or iterable): If not None, then it is an iterable, e.g. list, that specifies the
                target values for the test_data.
            max_queue_size (int): (Keras-specific parameter) The maximum size for the generator queue.
            workers (int): (Keras-specific parameter) The maximum number of workers to use.
            use_multiprocessing (bool): (Keras-specific parameter) If True then use process-based threading.
            verbose (int): (Keras-specific parameter) The verbosity mode that should be 0 or 1 with the former
                turning verbosity off and the latter on.

        Returns:
            tuple: The mean and standard deviation of the model metrics for the given data.
        """
        if test_data is not None and not isinstance(
            generator,
            (
                sg.mapper.GraphSAGENodeGenerator,
                sg.mapper.HinSAGENodeGenerator,
                sg.mapper.FullBatchNodeGenerator,
                sg.mapper.GraphSAGELinkGenerator,
                sg.mapper.HinSAGELinkGenerator,
            ),
        ):
            raise ValueError(
                "({}) generator parameter must be of type GraphSAGENodeGenerator, HinSAGENodeGenerator, "
                "FullBatchNodeGenerator, GraphSAGELinkGenerator, or HinSAGELinkGenerator. Received type {}".format(
                    type(self).__name__, type(generator).__name__
                )
            )
        elif test_data is None and not isinstance(
            generator,
            (
                sg.mapper.NodeSequence,
                sg.mapper.LinkSequence,
                sg.mapper.FullBatchSequence,
                sg.mapper.SparseFullBatchSequence,
            ),
        ):
            raise ValueError(
                "({}) If test_data is None, generator must be one of type NodeSequence, "
                "LinkSequence, FullBatchSequence, or SparseFullBatchSequence "
                "but received object of type {}".format(
                    type(self).__name__, type(generator).__name__
                )
            )
        if test_data is not None and test_targets is None:
            raise ValueError("({}) test_targets not given.".format(type(self).__name__))

        data_generator = generator
        if test_data is not None:
            data_generator = generator.flow(test_data, test_targets)

        test_metrics = []
        for model in self.models:
            tm = []
            for _ in range(self.n_predictions):
                tm.append(
                    model.evaluate(
                        data_generator,
                        max_queue_size=max_queue_size,
                        workers=workers,
                        use_multiprocessing=use_multiprocessing,
                        verbose=verbose,
                    )  # Keras evaluate returns the loss and metric values
                )
            test_metrics.append(np.mean(tm, axis=0))

        # Return the mean and standard deviation of the metrics
        return np.mean(test_metrics, axis=0), np.std(test_metrics, axis=0)
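    # A minimal sketch of evaluating the ensemble (illustrative; ``test_nodes`` and
    # ``test_targets`` are assumptions for illustration). The returned arrays have one
    # entry per compiled metric, in the order given by ``ensemble.metrics_names``:
    #
    #   metrics_mean, metrics_std = ensemble.evaluate(
    #       generator, test_data=test_nodes, test_targets=test_targets
    #   )
    #   for name, mean, std in zip(ensemble.metrics_names, metrics_mean, metrics_std):
    #       print("{}: {:.4f} +/- {:.4f}".format(name, mean, std))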
    def evaluate_generator(self, *args, **kwargs):
        """
        Deprecated: use :meth:`evaluate`.
        """
        warnings.warn(
            "'evaluate_generator' has been replaced by 'evaluate', to match tensorflow.keras.Model",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.evaluate(*args, **kwargs)
    def predict(
        self,
        generator,
        predict_data=None,
        summarise=False,
        output_layer=None,
        max_queue_size=10,
        workers=1,
        use_multiprocessing=False,
        verbose=0,
    ):
        """
        This method generates predictions for the data produced by the given generator or alternatively the data
        given in parameter predict_data.

        For detailed descriptions of Keras-specific parameters consult the Keras documentation
        at https://keras.io/models/sequential/

        Args:
            generator: The generator object that, if predict_data is not None, should be one of type
                GraphSAGENodeGenerator, HinSAGENodeGenerator, or FullBatchNodeGenerator. However, if predict_data
                is None, then generator should be one of type NodeSequence, LinkSequence, SparseFullBatchSequence,
                or FullBatchSequence.
            predict_data (None or iterable): If not None, then it is an iterable, e.g. list, that specifies the
                node IDs to make predictions for. If generator is of type FullBatchNodeGenerator then predict_data
                should be all the nodes in the graph since full batch approaches such as GCN and GAT can only be
                used to make predictions for all graph nodes.
            summarise (bool): If True, then the mean of the predictions over self.n_estimators and
                self.n_predictions are returned for each query point. If False, then all predictions are returned.
            output_layer (None or int): If not None, then the predictions are the outputs of the layer specified.
                The default is the model's output layer.
            max_queue_size (int): (Keras-specific parameter) The maximum size for the generator queue.
            workers (int): (Keras-specific parameter) The maximum number of workers to use.
            use_multiprocessing (bool): (Keras-specific parameter) If True then use process-based threading.
            verbose (int): (Keras-specific parameter) The verbosity mode that should be 0 or 1 with the former
                turning verbosity off and the latter on.

        Returns:
            numpy array: The predictions. It will have shape ``M × K × N × F`` if ``summarise`` is set to
            ``False``, or ``N × F`` otherwise. ``M`` is the number of estimators in the ensemble; ``K`` is the
            number of predictions per query point; ``N`` is the number of query points; and ``F`` is the output
            dimensionality of the specified layer determined by the shape of the output layer.
        """
        data_generator = generator
        if predict_data is not None:
            if not isinstance(
                generator,
                (
                    sg.mapper.GraphSAGENodeGenerator,
                    sg.mapper.HinSAGENodeGenerator,
                    sg.mapper.FullBatchNodeGenerator,
                ),
            ):
                raise ValueError(
                    "({}) generator parameter must be of type GraphSAGENodeGenerator, HinSAGENodeGenerator, "
                    "or FullBatchNodeGenerator. Received type {}".format(
                        type(self).__name__, type(generator).__name__
                    )
                )
            data_generator = generator.flow(predict_data)
        elif not isinstance(
            generator,
            (
                sg.mapper.NodeSequence,
                sg.mapper.LinkSequence,
                sg.mapper.FullBatchSequence,
                sg.mapper.SparseFullBatchSequence,
            ),
        ):
            raise ValueError(
                "({}) If predict_data is None, generator must be one of type NodeSequence, "
                "LinkSequence, SparseFullBatchSequence, or FullBatchSequence.".format(
                    type(self).__name__
                )
            )

        predictions = []

        if output_layer is not None:
            predict_models = [
                K.Model(inputs=model.input, outputs=model.layers[output_layer].output)
                for model in self.models
            ]
        else:
            predict_models = self.models

        for model in predict_models:
            model_predictions = []
            for _ in range(self.n_predictions):
                model_predictions.append(
                    model.predict(
                        data_generator,
                        max_queue_size=max_queue_size,
                        workers=workers,
                        use_multiprocessing=use_multiprocessing,
                        verbose=verbose,
                    )
                )
            # add to predictions list
            predictions.append(model_predictions)

        predictions = np.array(predictions)

        if summarise is True:
            # average the predictions across models and predictions per query point
            predictions = np.mean(predictions, axis=(0, 1))
        # if len(predictions.shape) > 4:
        #     predictions = predictions.reshape(predictions.shape[0:3] + (-1,))

        return predictions
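    # A minimal sketch of making predictions (illustrative; ``target_nodes`` is an
    # assumption for illustration). Per the docstring above, summarise=False yields an
    # M x K x N x F array, so averaging over axes (0, 1) collapses the ensemble (M)
    # and repeated-prediction (K) dimensions, matching summarise=True:
    #
    #   all_predictions = ensemble.predict(generator, predict_data=target_nodes)
    #   mean_predictions = all_predictions.mean(axis=(0, 1))  # same as summarise=True
    #   uncertainty = all_predictions.std(axis=(0, 1))        # per-output spread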
    def predict_generator(self, *args, **kwargs):
        """
        Deprecated: use :meth:`predict`.
        """
        warnings.warn(
            "'predict_generator' has been replaced by 'predict', to match tensorflow.keras.Model",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.predict(*args, **kwargs)
class BaggingEnsemble(Ensemble):
    """
    The BaggingEnsemble class can be used to create ensembles of stellargraph's graph neural network algorithms
    including GCN, GraphSAGE, GAT, and HinSAGE. Ensembles can be used for training classification and regression
    problems for node attribute inference and link prediction.

    This class can be used to create Bagging ensembles.

    Bagging ensembles add model diversity in two ways: (1) by random initialisation of the models' weights
    (before training) to different values; and (2) by bootstrap sampling of the training data for each model.
    That is, each model in the ensemble is trained on a random subset of the training examples, sampled with
    replacement from the original training data.

    .. seealso::

       `Examples using ensembles <https://stellargraph.readthedocs.io/en/stable/demos/ensembles/index.html>`__.

       Related functionality: :class:`.Ensemble` for only random initialisation.
    """

    def __init__(self, model, n_estimators=3, n_predictions=3):
        """
        Args:
            model: A Keras model.
            n_estimators (int): The number of estimators (aka models) in the ensemble.
            n_predictions (int): The number of predictions per query point per estimator.
        """
        super().__init__(
            model=model, n_estimators=n_estimators, n_predictions=n_predictions
        )
    def fit(
        self,
        generator,
        train_data,
        train_targets,
        steps_per_epoch=None,
        epochs=1,
        verbose=1,
        validation_data=None,
        validation_steps=None,
        class_weight=None,
        max_queue_size=10,
        workers=1,
        use_multiprocessing=False,
        shuffle=True,
        initial_epoch=0,
        bag_size=None,
        use_early_stopping=False,
        early_stopping_monitor="val_loss",
    ):
        """
        This method trains the ensemble on the data given in train_data and train_targets. If validation data are
        also given, then the training metrics are evaluated on these data and the results printed on screen if the
        verbose level is greater than 0.

        The method trains each model in the ensemble in series for the number of epochs specified. Training can
        also stop early with the best model, as evaluated on the validation data, if use_early_stopping is
        enabled.

        Each model in the ensemble is trained using a bootstrapped sample of the data (the train data are
        re-sampled with replacement). The number of bootstrap samples can be specified via the bag_size parameter;
        by default, the number of bootstrap samples equals the number of training points.

        For detailed descriptions of Keras-specific parameters consult the Keras documentation
        at https://keras.io/models/sequential/

        Args:
            generator: The generator object for training data. It should be one of type GraphSAGENodeGenerator,
                HinSAGENodeGenerator, FullBatchNodeGenerator, GraphSAGELinkGenerator, or HinSAGELinkGenerator.
            train_data (iterable): It is an iterable, e.g. list, that specifies the data to train the model with.
            train_targets (iterable): It is an iterable, e.g. list, that specifies the target values for the train
                data.
            steps_per_epoch (None or int): (Keras-specific parameter) If not None, it specifies the number of steps
                to yield from the generator before declaring one epoch finished and starting a new epoch.
            epochs (int): (Keras-specific parameter) The number of training epochs.
            verbose (int): (Keras-specific parameter) The verbosity mode that should be 0, 1, or 2, meaning silent,
                progress bar, and one line per epoch respectively.
            validation_data: An optional generator for validation data. If not None, it should be one of type
                NodeSequence, LinkSequence, SparseFullBatchSequence, or FullBatchSequence, since it is passed
                directly to each model's fit method.
            validation_steps (None or int): (Keras-specific parameter) If validation_data is not None, then it
                specifies the number of steps to yield from the generator before stopping at the end of every
                epoch.
            class_weight (None or dict): (Keras-specific parameter) If not None, it should be a dictionary mapping
                class indices (integers) to a weight (float) value, used for weighting the loss function (during
                training only). This can be useful to tell the model to "pay more attention" to samples from an
                under-represented class.
            max_queue_size (int): (Keras-specific parameter) The maximum size for the generator queue.
            workers (int): (Keras-specific parameter) The maximum number of workers to use.
            use_multiprocessing (bool): (Keras-specific parameter) If True then use process-based threading.
            shuffle (bool): (Keras-specific parameter) If True, then it shuffles the order of batches at the
                beginning of each training epoch.
            initial_epoch (int): (Keras-specific parameter) Epoch at which to start training (useful for resuming a
                previous training run).
            bag_size (None or int): The number of samples in a bootstrap sample. If None and bagging is used, then
                the number of samples is equal to the number of training points.
            use_early_stopping (bool): If set to True, then early stopping is used when training each model
                in the ensemble. The default is False.
            early_stopping_monitor (str): The quantity to monitor for early stopping, e.g., 'val_loss',
                'val_weighted_acc'. It should be a valid Keras metric.

        Returns:
            list: It returns a list of Keras History objects each corresponding to one trained model in the
            ensemble.
        """
        if not isinstance(
            generator,
            (
                sg.mapper.GraphSAGENodeGenerator,
                sg.mapper.HinSAGENodeGenerator,
                sg.mapper.FullBatchNodeGenerator,
                sg.mapper.GraphSAGELinkGenerator,
                sg.mapper.HinSAGELinkGenerator,
            ),
        ):
            raise ValueError(
                "({}) generator parameter must be of type GraphSAGENodeGenerator, HinSAGENodeGenerator, "
                "FullBatchNodeGenerator, GraphSAGELinkGenerator, or HinSAGELinkGenerator if you want to use Bagging. "
                "Received type {}".format(type(self).__name__, type(generator).__name__)
            )
        if bag_size is not None and (bag_size > len(train_data) or bag_size <= 0):
            raise ValueError(
                "({}) bag_size must be positive and less than or equal to the number of training points ({})".format(
                    type(self).__name__, len(train_data)
                )
            )
        if train_targets is None:
            raise ValueError(
                "({}) If train_data is given then train_targets must be given as well.".format(
                    type(self).__name__
                )
            )

        self.history = []

        num_points_per_bag = bag_size if bag_size is not None else len(train_data)

        # Prepare the training data for each model. Use sampling with replacement to create
        # len(self.models) datasets. Note that train_data and train_targets must support
        # numpy-style fancy indexing, e.g. numpy arrays.
        for model in self.models:
            di_index = np.random.choice(
                len(train_data), size=num_points_per_bag
            )  # sample with replacement
            di_train = train_data[di_index]
            di_targets = train_targets[di_index]

            di_gen = generator.flow(di_train, di_targets)

            es_callback = None
            if use_early_stopping and validation_data is not None:
                es_callback = [
                    EarlyStopping(
                        monitor=early_stopping_monitor,
                        patience=self.early_stopping_patience,
                        restore_best_weights=True,
                    )
                ]

            self.history.append(
                model.fit(
                    di_gen,
                    steps_per_epoch=steps_per_epoch,
                    epochs=epochs,
                    verbose=verbose,
                    callbacks=es_callback,
                    validation_data=validation_data,
                    validation_steps=validation_steps,
                    class_weight=class_weight,
                    max_queue_size=max_queue_size,
                    workers=workers,
                    use_multiprocessing=use_multiprocessing,
                    shuffle=shuffle,
                    initial_epoch=initial_epoch,
                )
            )

        return self.history
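    # A minimal sketch of training a bagging ensemble (illustrative; ``generator``,
    # ``model``, ``train_nodes``, and ``train_target_array`` are assumptions for
    # illustration). Note that fit() calls generator.flow() itself on each bootstrap
    # sample, so it takes the generator plus raw data rather than a pre-built Sequence:
    #
    #   import numpy as np
    #
    #   bagging = BaggingEnsemble(model, n_estimators=10, n_predictions=5)
    #   bagging.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["acc"])
    #   histories = bagging.fit(
    #       generator,
    #       train_data=np.asarray(train_nodes),
    #       train_targets=np.asarray(train_target_array),
    #       epochs=20,
    #       bag_size=None,  # default: one bootstrap sample per training point
    #   )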
    def fit_generator(self, *args, **kwargs):
        """
        Deprecated: use :meth:`fit`.
        """
        warnings.warn(
            "'fit_generator' has been replaced by 'fit', to match tensorflow.keras.Model",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.fit(*args, **kwargs)