Source code for stellargraph.layer.graph_attention

# -*- coding: utf-8 -*-
#
# Copyright 2018-2019 Data61, CSIRO
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Definition of Graph Attention Network (GAT) layer, and GAT class that is a stack of GAT layers
"""
__all__ = ["GraphAttention", "GraphAttentionSparse", "GAT"]

import warnings
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras import activations, constraints, initializers, regularizers
from tensorflow.keras.layers import Input, Layer, Dropout, LeakyReLU, Lambda, Reshape

from ..mapper import FullBatchNodeGenerator
from .misc import SqueezedSparseConversion


class GraphAttention(Layer):
    """
    Graph Attention (GAT) layer. The base implementation is taken from
    https://github.com/danielegrattarola/keras-gat,
    with some modifications added for ease of use.

    Based on the original paper: Graph Attention Networks. P. Velickovic et al. ICLR 2018
    https://arxiv.org/abs/1710.10903

    Notes:
      - The inputs are tensors with a batch dimension of 1:
        Keras requires this batch dimension, and for full-batch methods
        we only have a single "batch".

      - There are three inputs required, the node features, the output
        indices (the nodes that are to be selected in the final layer)
        and the graph adjacency matrix

      - This does not add self loops to the adjacency matrix, you should preprocess
        the adjacency matrix to add self-loops

      - The output indices are used when ``final_layer=True`` and the returned outputs
        are the final-layer features for the nodes indexed by output indices.

      - If ``final_layer=False`` all the node features are output in the same ordering as
        given by the adjacency matrix.

    Args:
        units (int): dimensionality of output feature vectors (F' in the paper)
        attn_heads (int): number of attention heads (K in the paper)
        attn_heads_reduction (str): reduction applied to output features of each attention head,
            'concat' or 'average'. 'Average' should be applied in the final prediction layer of
            the model (Eq. 6 of the paper).
        in_dropout_rate (float): dropout rate applied to features
        attn_dropout_rate (float): dropout rate applied to attention coefficients
        activation (str): nonlinear activation applied to layer's output to obtain output
            features (eq. 4 of the GAT paper)
        use_bias (bool): toggles an optional bias
        final_layer (bool): If False the layer returns output for all nodes,
            if True it returns the subset specified by the indices passed to it.
        saliency_map_support (bool): If calculating saliency maps using the tools in
            stellargraph.utils.saliency_maps this should be True. Otherwise this should be
            False (default).
        kernel_initializer (str or func): The initialiser to use for the head weights;
            defaults to 'glorot_uniform'.
        kernel_regularizer (str or func): The regulariser to use for the head weights;
            defaults to None.
        kernel_constraint (str or func): The constraint to use for the head weights;
            defaults to None.
        bias_initializer (str or func): The initialiser to use for the head bias;
            defaults to 'zeros'.
        bias_regularizer (str or func): The regulariser to use for the head bias;
            defaults to None.
        bias_constraint (str or func): The constraint to use for the head bias;
            defaults to None.
        attn_kernel_initializer (str or func): The initialiser to use for the attention weights;
            defaults to 'glorot_uniform'.
        attn_kernel_regularizer (str or func): The regulariser to use for the attention weights;
            defaults to None.
        attn_kernel_constraint (str or func): The constraint to use for the attention weights;
            defaults to None.

    Raises:
        ValueError: if ``attn_heads_reduction`` is neither 'concat' nor 'average'.
    """

    def __init__(
        self,
        units,
        attn_heads=1,
        attn_heads_reduction="concat",  # {'concat', 'average'}
        in_dropout_rate=0.0,
        attn_dropout_rate=0.0,
        activation="relu",
        use_bias=True,
        final_layer=False,
        saliency_map_support=False,
        **kwargs,
    ):

        if attn_heads_reduction not in {"concat", "average"}:
            raise ValueError(
                "{}: Possible heads reduction methods: concat, average; received {}".format(
                    type(self).__name__, attn_heads_reduction
                )
            )

        self.units = units  # Number of output features (F' in the paper)
        self.attn_heads = attn_heads  # Number of attention heads (K in the paper)
        self.attn_heads_reduction = attn_heads_reduction  # Eq. 5 and 6 in the paper
        self.in_dropout_rate = in_dropout_rate  # dropout rate for node features
        self.attn_dropout_rate = attn_dropout_rate  # dropout rate for attention coefs
        self.activation = activations.get(activation)  # Eq. 4 in the paper
        self.use_bias = use_bias
        self.final_layer = final_layer
        self.saliency_map_support = saliency_map_support

        # Populated by build()
        self.kernels = []  # Layer kernels for attention heads
        self.biases = []  # Layer biases for attention heads
        self.attn_kernels = []  # Attention kernels for attention heads

        if attn_heads_reduction == "concat":
            # Output will have shape (..., K * F')
            self.output_dim = self.units * self.attn_heads
        else:
            # Output will have shape (..., F')
            self.output_dim = self.units

        # Pop the initialiser/regulariser/constraint keywords out of kwargs
        # before the remaining keywords are handed to the Keras Layer constructor.
        self._get_regularisers_from_keywords(kwargs)

        super().__init__(**kwargs)

    def _get_regularisers_from_keywords(self, kwargs):
        """
        Remove the weight initialiser, regulariser and constraint keywords from
        ``kwargs`` (mutating it) and store the resolved Keras objects on the layer.

        Args:
            kwargs (dict): keyword arguments passed to the constructor.
        """
        self.kernel_initializer = initializers.get(
            kwargs.pop("kernel_initializer", "glorot_uniform")
        )
        self.kernel_regularizer = regularizers.get(
            kwargs.pop("kernel_regularizer", None)
        )
        self.kernel_constraint = constraints.get(kwargs.pop("kernel_constraint", None))

        self.bias_initializer = initializers.get(
            kwargs.pop("bias_initializer", "zeros")
        )
        self.bias_regularizer = regularizers.get(kwargs.pop("bias_regularizer", None))
        self.bias_constraint = constraints.get(kwargs.pop("bias_constraint", None))

        self.attn_kernel_initializer = initializers.get(
            kwargs.pop("attn_kernel_initializer", "glorot_uniform")
        )
        self.attn_kernel_regularizer = regularizers.get(
            kwargs.pop("attn_kernel_regularizer", None)
        )
        self.attn_kernel_constraint = constraints.get(
            kwargs.pop("attn_kernel_constraint", None)
        )

    def get_config(self):
        """
        Gets class configuration for Keras serialization

        Returns:
            dict: the base layer configuration merged with this layer's own settings.
        """
        config = {
            "units": self.units,
            "attn_heads": self.attn_heads,
            "attn_heads_reduction": self.attn_heads_reduction,
            "in_dropout_rate": self.in_dropout_rate,
            "attn_dropout_rate": self.attn_dropout_rate,
            "activation": activations.serialize(self.activation),
            "use_bias": self.use_bias,
            "final_layer": self.final_layer,
            "saliency_map_support": self.saliency_map_support,
            "kernel_initializer": initializers.serialize(self.kernel_initializer),
            "kernel_regularizer": regularizers.serialize(self.kernel_regularizer),
            "kernel_constraint": constraints.serialize(self.kernel_constraint),
            "bias_initializer": initializers.serialize(self.bias_initializer),
            "bias_regularizer": regularizers.serialize(self.bias_regularizer),
            "bias_constraint": constraints.serialize(self.bias_constraint),
            "attn_kernel_initializer": initializers.serialize(
                self.attn_kernel_initializer
            ),
            "attn_kernel_regularizer": regularizers.serialize(
                self.attn_kernel_regularizer
            ),
            "attn_kernel_constraint": constraints.serialize(
                self.attn_kernel_constraint
            ),
        }
        base_config = super().get_config()
        return {**base_config, **config}

    def compute_output_shape(self, input_shapes):
        """
        Computes the output shape of the layer.

        Args:
            input_shapes (list of tuple of ints): shapes of the node features, the
                output indices and the adjacency matrix, in that order.
                Shape tuples can include None for free dimensions, instead of an integer.

        Returns:
            tuple: the output shape ``(batch_dim, n_output_nodes, output_dim)``, where the
            node dimension is taken from the output indices if ``final_layer=True`` and
            from the node features otherwise.
        """
        feature_shape, out_shape, *As_shapes = input_shapes

        batch_dim = feature_shape[0]
        if self.final_layer:
            out_dim = out_shape[1]
        else:
            out_dim = feature_shape[1]

        return batch_dim, out_dim, self.output_dim

    def build(self, input_shapes):
        """
        Builds the layer

        Args:
            input_shapes (list of int): shapes of the layer's inputs (node features and adjacency matrix)
        """
        feat_shape = input_shapes[0]
        input_dim = int(feat_shape[-1])

        # Variables to support integrated gradients (read only in the
        # saliency_map_support branch of call())
        self.delta = self.add_weight(
            name="ig_delta", shape=(), trainable=False, initializer=initializers.ones()
        )
        self.non_exist_edge = self.add_weight(
            name="ig_non_exist_edge",
            shape=(),
            trainable=False,
            initializer=initializers.zeros(),
        )

        # Initialize weights for each attention head
        for head in range(self.attn_heads):
            # Layer kernel
            kernel = self.add_weight(
                shape=(input_dim, self.units),
                initializer=self.kernel_initializer,
                regularizer=self.kernel_regularizer,
                constraint=self.kernel_constraint,
                name="kernel_{}".format(head),
            )
            self.kernels.append(kernel)

            # Layer bias
            if self.use_bias:
                bias = self.add_weight(
                    shape=(self.units,),
                    initializer=self.bias_initializer,
                    regularizer=self.bias_regularizer,
                    constraint=self.bias_constraint,
                    name="bias_{}".format(head),
                )
                self.biases.append(bias)

            # Attention kernels
            attn_kernel_self = self.add_weight(
                shape=(self.units, 1),
                initializer=self.attn_kernel_initializer,
                regularizer=self.attn_kernel_regularizer,
                constraint=self.attn_kernel_constraint,
                name="attn_kernel_self_{}".format(head),
            )
            attn_kernel_neighs = self.add_weight(
                shape=(self.units, 1),
                initializer=self.attn_kernel_initializer,
                regularizer=self.attn_kernel_regularizer,
                constraint=self.attn_kernel_constraint,
                name="attn_kernel_neigh_{}".format(head),
            )
            self.attn_kernels.append([attn_kernel_self, attn_kernel_neighs])
        self.built = True

    def call(self, inputs):
        """
        Creates the layer as a Keras graph.

        Note that the inputs are tensors with a batch dimension of 1:
        Keras requires this batch dimension, and for full-batch methods
        we only have a single "batch".

        There are three inputs required, the node features, the output
        indices (the nodes that are to be selected in the final layer)
        and the graph adjacency matrix

        Notes:
            This does not add self loops to the adjacency matrix.
            The output indices are only used when ``final_layer=True``

        Args:
            inputs (list): list of inputs with 3 items:
                node features (size 1 x N x F),
                output indices (size 1 x M),
                graph adjacency matrix (size N x N),
                where N is the number of nodes in the graph,
                F is the dimensionality of node features
                M is the number of output nodes

        Returns:
            Tensor with a leading batch dimension of 1 holding the activated,
            head-reduced node features (only the rows selected by the output
            indices if ``final_layer=True``).

        Raises:
            ValueError: if the batch dimension of the node features is not 1.
        """
        X = inputs[0]  # Node features (1 x N x F)
        out_indices = inputs[1]  # output indices (1 x M)
        A = inputs[2]  # Adjacency matrix (N x N)
        N = K.int_shape(A)[-1]

        batch_dim, _, _ = K.int_shape(X)
        if batch_dim != 1:
            raise ValueError(
                "Currently full-batch methods only support a batch dimension of one"
            )

        # Remove singleton batch dimension
        X = K.squeeze(X, 0)
        out_indices = K.squeeze(out_indices, 0)

        if not self.saliency_map_support:
            # Mask values before activation (Vaswani et al., 2017): a large
            # negative number is added wherever A is zero so that the softmax
            # gives those entries (almost) no weight. The mask is the same for
            # every head, so it is built once here.
            # YT: this only works for 'binary' A, not for 'weighted' A!
            # YT: if A does not have self-loops, the node itself will be masked,
            # so A should have self-loops. This layer does NOT add them; the
            # adjacency matrix must be preprocessed by the caller.
            self.A = A
            mask = -10e9 * (1.0 - A)

        outputs = []
        for head in range(self.attn_heads):
            kernel = self.kernels[head]  # W in the paper (F x F')
            attention_kernel = self.attn_kernels[
                head
            ]  # Attention kernel a in the paper (2F' x 1)

            # Compute inputs to attention network
            features = K.dot(X, kernel)  # (N x F')

            # Compute feature combinations
            # Note: [[a_1], [a_2]]^T [[Wh_i], [Wh_j]] = [a_1]^T [Wh_i] + [a_2]^T [Wh_j]
            attn_for_self = K.dot(
                features, attention_kernel[0]
            )  # (N x 1), [a_1]^T [Wh_i]
            attn_for_neighs = K.dot(
                features, attention_kernel[1]
            )  # (N x 1), [a_2]^T [Wh_j]

            # Attention head a(Wh_i, Wh_j) = a^T [[Wh_i], [Wh_j]]
            dense = attn_for_self + K.transpose(
                attn_for_neighs
            )  # (N x N) via broadcasting

            # Add nonlinearity
            dense = LeakyReLU(alpha=0.2)(dense)

            if not self.saliency_map_support:
                dense += mask
                dense = K.softmax(dense)  # (N x N), Eq. 3 of the paper
            else:
                # GAT with support for saliency calculations: the row-wise
                # softmax is written out explicitly so that the ig_delta and
                # ig_non_exist_edge variables enter the computation. With their
                # initial values (delta = 1, non_exist_edge = 0) W reduces to
                # A * exp(dense - rowmax(dense)).
                exp_dense = K.exp(dense - K.max(dense, axis=1, keepdims=True))
                W = (self.delta * A) * exp_dense * (
                    1 - self.non_exist_edge
                ) + self.non_exist_edge * (
                    A
                    + self.delta * (K.ones(shape=[N, N], dtype="float") - A)
                    + K.eye(N)
                ) * exp_dense
                dense = W / K.sum(W, axis=1, keepdims=True)

            # Apply dropout to features and attention coefficients
            dropout_feat = Dropout(self.in_dropout_rate)(features)  # (N x F')
            dropout_attn = Dropout(self.attn_dropout_rate)(dense)  # (N x N)

            # Linear combination with neighbors' features [YT: see Eq. 4]
            node_features = K.dot(dropout_attn, dropout_feat)  # (N x F')

            if self.use_bias:
                node_features = K.bias_add(node_features, self.biases[head])

            # Add output of attention head to final output
            outputs.append(node_features)

        # Aggregate the heads' output according to the reduction method
        if self.attn_heads_reduction == "concat":
            output = K.concatenate(outputs)  # (N x KF')
        else:
            output = K.mean(K.stack(outputs), axis=0)  # (N x F')

        # Nonlinear activation function
        output = self.activation(output)

        # On the final layer we gather the nodes referenced by the indices
        if self.final_layer:
            output = K.gather(output, out_indices)

        # Add the batch dimension back (it is always 1 at this point, any other
        # value was rejected above)
        output = K.expand_dims(output, 0)
        return output
class GraphAttentionSparse(GraphAttention):
    """
    Graph Attention (GAT) layer, base implementation taken from https://github.com/danielegrattarola/keras-gat,
    some modifications added for ease of use.

    Based on the original paper: Graph Attention Networks. P. Velickovic et al. ICLR 2018
    https://arxiv.org/abs/1710.10903

    Notes:
      - The inputs are tensors with a batch dimension of 1:
        Keras requires this batch dimension, and for full-batch methods
        we only have a single "batch".

      - There are three inputs required, the node features, the output
        indices (the nodes that are to be selected in the final layer),
        and the graph adjacency matrix

      - This does not add self loops to the adjacency matrix, you should preprocess
        the adjacency matrix to add self-loops

      - The output indices are used when `final_layer=True` and the returned outputs
        are the final-layer features for the nodes indexed by output indices.

      - If `final_layer=False` all the node features are output in the same ordering as
        given by the adjacency matrix.

    Args:
        units (int): dimensionality of output feature vectors
        attn_heads (int): number of attention heads
        attn_heads_reduction (str): reduction applied to output features of each attention head,
            'concat' or 'average'. 'Average' should be applied in the final prediction layer of
            the model (Eq. 6 of the paper).
        in_dropout_rate (float): dropout rate applied to features
        attn_dropout_rate (float): dropout rate applied to attention coefficients
        activation (str): nonlinear activation applied to layer's output to obtain output
            features (eq. 4 of the GAT paper)
        use_bias (bool): toggles an optional bias
        final_layer (bool): If False the layer returns output for all nodes,
            if True it returns the subset specified by the indices passed to it.
        saliency_map_support (bool): If calculating saliency maps using the tools in
            stellargraph.utils.saliency_maps this should be True. Otherwise this should be
            False (default).
        kernel_initializer (str or func): The initialiser to use for the head weights;
            defaults to 'glorot_uniform'.
        kernel_regularizer (str or func): The regulariser to use for the head weights;
            defaults to None.
        kernel_constraint (str or func): The constraint to use for the head weights;
            defaults to None.
        bias_initializer (str or func): The initialiser to use for the head bias;
            defaults to 'zeros'.
        bias_regularizer (str or func): The regulariser to use for the head bias;
            defaults to None.
        bias_constraint (str or func): The constraint to use for the head bias;
            defaults to None.
        attn_kernel_initializer (str or func): The initialiser to use for the attention weights;
            defaults to 'glorot_uniform'.
        attn_kernel_regularizer (str or func): The regulariser to use for the attention weights;
            defaults to None.
        attn_kernel_constraint (str or func): The constraint to use for the attention weights;
            defaults to None.
    """

    def call(self, inputs, **kwargs):
        """
        Creates the layer as a Keras graph

        Notes:
            This does not add self loops to the adjacency matrix.
            The output indices are only used when `final_layer=True`

        Args:
            inputs (list): list of inputs with 3 items:
                node features (size 1 x N x F),
                output indices (size 1 x M),
                sparse graph adjacency matrix (size N x N),
                where N is the number of nodes in the graph,
                F is the dimensionality of node features
                M is the number of output nodes

        Returns:
            Tensor with a leading batch dimension of 1 holding the activated,
            head-reduced node features (only the rows selected by the output
            indices if `final_layer=True`).

        Raises:
            TypeError: if the adjacency matrix is not a ``tf.SparseTensor``.
            ValueError: if the batch dimension of the node features is not 1.
        """
        X = inputs[0]  # Node features (1 x N x F)
        out_indices = inputs[1]  # output indices (1 x M)
        A_sparse = inputs[2]  # Sparse adjacency matrix (N x N)

        if not isinstance(A_sparse, tf.SparseTensor):
            raise TypeError("A is not sparse")

        # Indices of the stored entries of A, i.e. the graph edges (E x 2)
        A_indices = A_sparse.indices

        batch_dim, n_nodes, _ = K.int_shape(X)
        if batch_dim != 1:
            raise ValueError(
                "Currently full-batch methods only support a batch dimension of one"
            )

        # Remove singleton batch dimension
        out_indices = K.squeeze(out_indices, 0)
        X = K.squeeze(X, 0)

        outputs = []
        for head in range(self.attn_heads):
            kernel = self.kernels[head]  # W in the paper (F x F')
            attention_kernel = self.attn_kernels[
                head
            ]  # Attention kernel a in the paper (2F' x 1)

            # Compute inputs to attention network
            features = K.dot(X, kernel)  # (N x F')

            # Compute feature combinations
            # Note: [[a_1], [a_2]]^T [[Wh_i], [Wh_j]] = [a_1]^T [Wh_i] + [a_2]^T [Wh_j]
            attn_for_self = K.dot(
                features, attention_kernel[0]
            )  # (N x 1), [a_1]^T [Wh_i]
            attn_for_neighs = K.dot(
                features, attention_kernel[1]
            )  # (N x 1), [a_2]^T [Wh_j]

            # Create sparse attention vector: the attention logit is evaluated
            # only for the stored entries of A, so no dense (N x N) matrix is
            # ever materialised.
            sparse_attn_self = tf.gather(
                K.reshape(attn_for_self, [-1]), A_indices[:, 0], axis=0
            )
            sparse_attn_neighs = tf.gather(
                K.reshape(attn_for_neighs, [-1]), A_indices[:, 1], axis=0
            )
            attn_values = sparse_attn_self + sparse_attn_neighs  # (E,)

            # Add nonlinearity
            attn_values = LeakyReLU(alpha=0.2)(attn_values)

            # Apply dropout to features and attention coefficients
            # NOTE(review): here dropout is applied to the attention logits
            # *before* the softmax, whereas the dense GraphAttention.call applies
            # it after the softmax. Behaviour is deliberately left unchanged -
            # confirm which of the two is intended.
            dropout_feat = Dropout(self.in_dropout_rate)(features)  # (N x F')
            dropout_attn = Dropout(self.attn_dropout_rate)(attn_values)  # (E,)

            # Convert to sparse matrix
            sparse_attn = tf.sparse.SparseTensor(
                A_indices, values=dropout_attn, dense_shape=[n_nodes, n_nodes]
            )

            # Apply softmax to get attention coefficients
            sparse_attn = tf.sparse.softmax(sparse_attn)  # (N x N), Eq. 3 of the paper

            # Linear combination with neighbors' features [YT: see Eq. 4]
            # (tf.sparse.sparse_dense_matmul is the name of this op that is
            # exported in both the TF1 and TF2 APIs)
            node_features = tf.sparse.sparse_dense_matmul(
                sparse_attn, dropout_feat
            )  # (N x F')

            if self.use_bias:
                node_features = K.bias_add(node_features, self.biases[head])

            # Add output of attention head to final output
            outputs.append(node_features)

        # Aggregate the heads' output according to the reduction method
        if self.attn_heads_reduction == "concat":
            output = K.concatenate(outputs)  # (N x KF')
        else:
            output = K.mean(K.stack(outputs), axis=0)  # (N x F')

        output = self.activation(output)

        # On the final layer we gather the nodes referenced by the indices
        if self.final_layer:
            output = K.gather(output, out_indices)

        # Add the batch dimension back (it is always 1 at this point, any other
        # value was rejected above)
        output = K.expand_dims(output, 0)
        return output
class GAT:
    """
    A stack of Graph Attention (GAT) layers with aggregation of multiple attention heads,
    Eqs 5-6 of the GAT paper https://arxiv.org/abs/1710.10903

    To use this class as a Keras model, the features and pre-processed adjacency matrix
    should be supplied using the :class:`FullBatchNodeGenerator` class. To have the appropriate
    pre-processing the generator object should be instantiated as follows::

        generator = FullBatchNodeGenerator(G, sparse=False, method="gat")

    For more details, please see the GAT demo notebook:
    demos/node-classification/gat/gat-cora-node-classification-example.ipynb

    Examples:
        Creating a GAT node classification model from an existing :class:`StellarGraph` object `G`::

            generator = FullBatchNodeGenerator(G, method="gat")
            gat = GAT(
                    layer_sizes=[8, 4],
                    activations=["elu","softmax"],
                    attn_heads=8,
                    generator=generator,
                    in_dropout=0.5,
                    attn_dropout=0.5,
                )
            x_inp, predictions = gat.node_model()

    Notes:
      - The inputs are tensors with a batch dimension of 1. These are provided by the \
        :class:`FullBatchNodeGenerator` object.

      - This does not add self loops to the adjacency matrix, you should preprocess
        the adjacency matrix to add self-loops, using the ``method='gat'`` argument
        of the :class:`FullBatchNodeGenerator`.

      - The nodes provided to the :class:`FullBatchNodeGenerator.flow` method are
        used by the final layer to select the predictions for those nodes in order.
        However, the intermediate layers before the final layer order the nodes
        in the same way as the adjacency matrix.

    Args:
        layer_sizes (list of int): list of output sizes of GAT layers in the stack. The length of
            this list defines the number of GraphAttention layers in the stack.
        generator (FullBatchNodeGenerator): an instance of FullBatchNodeGenerator class
            constructed on the graph of interest
        attn_heads (int or list of int): number of attention heads in GraphAttention layers.
            The options are:

            - a single integer: the passed value of ``attn_heads`` will be applied to all
              GraphAttention layers in the stack, except the last layer (for which the number
              of attn_heads will be set to 1).
            - a list of integers: elements of the list define the number of attention heads in
              the corresponding layers in the stack.

        attn_heads_reduction (list of str or None): reductions applied to output features of each
            attention head, for all layers in the stack. Valid entries in the list are
            {'concat', 'average'} (matched case-insensitively and stored in lower case).
            If None is passed, the default reductions are applied: 'concat' reduction to all
            layers in the stack except the final layer, 'average' reduction to the last layer
            (Eqs. 5-6 of the GAT paper).
        bias (bool): toggles an optional bias in GAT layers
        in_dropout (float): dropout rate applied to input features of each GAT layer
        attn_dropout (float): dropout rate applied to attention maps
        normalize (str or None): normalization applied to the final output features of the GAT
            layers stack. Default is None.
        activations (list of str): list of activations applied to each layer's output;
            defaults to ['elu', ..., 'elu'].
        saliency_map_support (bool): If calculating saliency maps using the tools in
            stellargraph.utils.saliency_maps this should be True. Otherwise this should be
            False (default).
        kernel_regularizer (str or func): The regulariser to use for the head weights;
            defaults to None.
        attn_kernel_regularizer (str or func): The regulariser to use for the attention weights;
            defaults to None.

    Raises:
        TypeError: if ``layer_sizes``, ``attn_heads``, ``attn_heads_reduction`` or
            ``activations`` has the wrong type.
        ValueError: if any of those arguments has an invalid value or length, if
            ``generator`` is not a FullBatchNodeGenerator, or if ``normalize`` is not
            recognised.
    """

    def __init__(
        self,
        layer_sizes,
        generator=None,
        attn_heads=1,
        attn_heads_reduction=None,
        bias=True,
        in_dropout=0.0,
        attn_dropout=0.0,
        normalize=None,
        activations=None,
        saliency_map_support=False,
        **kwargs,
    ):
        self.bias = bias
        self.in_dropout = in_dropout
        self.attn_dropout = attn_dropout
        self.generator = generator
        self.saliency_map_support = saliency_map_support

        # Validate the per-layer settings (in this order, so that the first
        # reported problem is the same as it has always been)
        self.layer_sizes = self._check_layer_sizes(layer_sizes)
        n_layers = len(self.layer_sizes)
        self.attn_heads = self._resolve_attn_heads(attn_heads, n_layers)
        self.attn_heads_reduction = self._resolve_attn_heads_reduction(
            attn_heads_reduction, n_layers
        )
        self.activations = self._resolve_activations(activations, n_layers)

        # check generator:
        if generator is not None:
            if not isinstance(generator, FullBatchNodeGenerator):
                raise ValueError(
                    "{}: generator must be of type FullBatchNodeGenerator or None; received object of type {} instead".format(
                        type(self).__name__, type(generator).__name__
                    )
                )

            # Check if the generator is producing a sparse matrix
            self.use_sparse = generator.use_sparse
        else:
            self.use_sparse = False

        # Set the normalization layer used in the model
        if normalize == "l2":
            self._normalization = Lambda(lambda x: K.l2_normalize(x, axis=2))

        elif normalize is None or str(normalize).lower() in {"none", "linear"}:
            self._normalization = Lambda(lambda x: x)

        else:
            raise ValueError(
                "Normalization should be either 'l2' or None (also allowed as 'none'); received '{}'".format(
                    normalize
                )
            )

        # Switch between sparse or dense model
        if self.use_sparse:
            self._gat_layer = GraphAttentionSparse
        else:
            self._gat_layer = GraphAttention

        # Optional regulariser, etc. for weights and biases
        self._get_regularisers_from_keywords(kwargs)

        # Initialize a stack of GAT layers
        self._layers = []
        for ii in range(n_layers):
            # Dropout on input node features before each GAT layer
            self._layers.append(Dropout(self.in_dropout))

            # GraphAttention layer
            self._layers.append(
                self._gat_layer(
                    units=self.layer_sizes[ii],
                    attn_heads=self.attn_heads[ii],
                    attn_heads_reduction=self.attn_heads_reduction[ii],
                    in_dropout_rate=self.in_dropout,
                    attn_dropout_rate=self.attn_dropout,
                    activation=self.activations[ii],
                    use_bias=self.bias,
                    final_layer=ii == (n_layers - 1),
                    saliency_map_support=self.saliency_map_support,
                    **self._regularisers,
                )
            )

    def _check_layer_sizes(self, layer_sizes):
        """Return ``layer_sizes`` unchanged after checking it is a list of positive ints."""
        if not isinstance(layer_sizes, list):
            raise TypeError(
                "{}: layer_sizes should be a list of integers; received type {} instead.".format(
                    type(self).__name__, type(layer_sizes).__name__
                )
            )
        if not all(isinstance(s, int) and s > 0 for s in layer_sizes):
            raise ValueError(
                "{}: all elements in layer_sizes should be positive integers!".format(
                    type(self).__name__
                )
            )
        return layer_sizes

    def _resolve_attn_heads(self, attn_heads, n_layers):
        """Return the number of attention heads for each of the ``n_layers`` layers."""
        if isinstance(attn_heads, list):
            # check the length
            if len(attn_heads) != n_layers:
                raise ValueError(
                    "{}: length of attn_heads list ({}) should match the number of GAT layers ({})".format(
                        type(self).__name__, len(attn_heads), n_layers
                    )
                )
            # check that values in the list are valid
            if not all(isinstance(a, int) and a > 0 for a in attn_heads):
                raise ValueError(
                    "{}: all elements in attn_heads should be positive integers!".format(
                        type(self).__name__
                    )
                )
            return attn_heads  # (list of int as passed by the user)

        if isinstance(attn_heads, int):
            # attn_heads (int) for all but the last layer (for which it's set to 1)
            return [attn_heads if l < n_layers - 1 else 1 for l in range(n_layers)]

        raise TypeError(
            "{}: attn_heads should be an integer or a list of integers!".format(
                type(self).__name__
            )
        )

    def _resolve_attn_heads_reduction(self, attn_heads_reduction, n_layers):
        """Return the lower-cased head reduction ('concat'/'average') for each layer."""
        if attn_heads_reduction is None:
            # set default head reductions, see eqs 5-6 of the GAT paper
            return ["concat"] * (n_layers - 1) + ["average"]

        # user-specified list of head reductions (valid entries are 'concat' and 'average')
        # check type (must be a list of str):
        if not isinstance(attn_heads_reduction, list):
            raise TypeError(
                "{}: attn_heads_reduction should be a list of strings; received type {} instead.".format(
                    type(self).__name__, type(attn_heads_reduction).__name__
                )
            )

        # check length of attn_heads_reduction list:
        if len(attn_heads_reduction) != n_layers:
            raise ValueError(
                "{}: length of attn_heads_reduction list ({}) should match the number of GAT layers ({})".format(
                    type(self).__name__, len(attn_heads_reduction), n_layers
                )
            )

        # check that list elements are valid; non-string entries are reported
        # with the same ValueError rather than failing on `.lower()`
        if not all(
            isinstance(ahr, str) and ahr.lower() in {"concat", "average"}
            for ahr in attn_heads_reduction
        ):
            raise ValueError(
                "{}: elements of attn_heads_reduction list should be either 'concat' or 'average'!".format(
                    type(self).__name__
                )
            )

        # The GAT layers only accept the exact lower-case spellings, so the
        # values that were validated case-insensitively are normalised here.
        return [ahr.lower() for ahr in attn_heads_reduction]

    def _resolve_activations(self, activations, n_layers):
        """Return the activation for each layer, defaulting to 'elu' everywhere."""
        if activations is None:
            activations = ["elu"] * n_layers
        if not isinstance(activations, list):
            raise TypeError(
                "{}: activations should be a list of strings; received {} instead".format(
                    type(self).__name__, type(activations)
                )
            )
        # check length:
        if len(activations) != n_layers:
            raise ValueError(
                "{}: length of activations list ({}) should match the number of GAT layers ({})".format(
                    type(self).__name__, len(activations), n_layers
                )
            )
        return activations

    def _get_regularisers_from_keywords(self, kwargs):
        """
        Pop the weight initialiser/regulariser/constraint keywords out of ``kwargs``
        (mutating it) and store those that are not None in ``self._regularisers``,
        from where they are forwarded to every GAT layer in the stack.

        Args:
            kwargs (dict): keyword arguments passed to the constructor.
        """
        regularisers = {}
        for param_name in [
            "kernel_initializer",
            "kernel_regularizer",
            "kernel_constraint",
            "bias_initializer",
            "bias_regularizer",
            "bias_constraint",
            "attn_kernel_initializer",
            "attn_kernel_regularizer",
            "attn_kernel_constraint",
        ]:
            param_value = kwargs.pop(param_name, None)
            if param_value is not None:
                regularisers[param_name] = param_value
        self._regularisers = regularisers

    def __call__(self, inputs):
        """
        Apply a stack of GAT layers to the inputs.

        Args:
            inputs (list of Tensor): the node features (batch dimension of 1), the
                output indices, and then the adjacency matrix inputs: a single dense
                matrix tensor, or the (indices, values) pair of tensors when the
                generator produces a sparse matrix.

        Returns:
            Output tensor of the GAT layers stack

        Raises:
            ValueError: if the batch dimension of the node features is not 1.
            NotImplementedError: if more than one adjacency matrix is supplied.
        """
        assert isinstance(inputs, list), "input must be a list, got {} instead".format(
            type(inputs)
        )
        x_in, out_indices, *As = inputs

        # Currently we require the batch dimension to be one for full-batch methods
        batch_dim, n_nodes, _ = K.int_shape(x_in)
        if batch_dim != 1:
            raise ValueError(
                "Currently full-batch methods only support a batch dimension of one"
            )

        # Convert input indices & values to a sparse matrix
        if self.use_sparse:
            A_indices, A_values = As
            Ainput = [
                SqueezedSparseConversion(shape=(n_nodes, n_nodes))(
                    [A_indices, A_values]
                )
            ]

        # Otherwise, create dense matrix from input tensor
        else:
            Ainput = [Lambda(lambda A: K.squeeze(A, 0))(A) for A in As]

        # TODO: Support multiple matrices?
        if len(Ainput) != 1:
            raise NotImplementedError(
                "The GAT method currently only accepts a single matrix"
            )

        # The node features keep their batch dimension; the GAT layers remove
        # and restore it themselves.
        h_layer = x_in
        for layer in self._layers:
            if isinstance(layer, self._gat_layer):
                # For a GAT layer add the matrix and output indices
                # Note that the output indices are only used if `final_layer=True`
                h_layer = layer([h_layer, out_indices] + Ainput)
            else:
                # For other (non-graph) layers only supply the input tensor
                h_layer = layer(h_layer)

        return self._normalization(h_layer)

    def node_model(self, num_nodes=None, feature_size=None):
        """
        Builds a GAT model for node prediction

        Args:
            num_nodes (int, optional): number of nodes in the graph; only used (and then
                required) if no generator was given to the constructor.
            feature_size (int, optional): dimensionality of the node features; only used
                (and then required) if no generator was given to the constructor.

        Returns:
            tuple: `(x_inp, x_out)`, where `x_inp` is a list of Keras input tensors for the
            GAT model (node features, output indices, and either one dense adjacency matrix
            or the indices and values of a sparse adjacency matrix), and `x_out` is a Keras
            tensor for the GAT model output. The output with its singleton batch dimension
            removed is additionally stored in ``self.x_out_flat``.

        Raises:
            RuntimeError: if there is no generator and ``num_nodes`` or ``feature_size``
                is missing.
        """
        # Create input tensor:
        if self.generator is not None:
            # Placeholder for node features
            N_nodes = self.generator.features.shape[0]
            N_feat = self.generator.features.shape[1]

        elif num_nodes is not None and feature_size is not None:
            N_nodes = num_nodes
            N_feat = feature_size

        else:
            raise RuntimeError(
                "node_model: if generator is not provided to object constructor, num_nodes and feature_size must be specified."
            )

        # Inputs for features & target indices
        x_t = Input(batch_shape=(1, N_nodes, N_feat))
        out_indices_t = Input(batch_shape=(1, None), dtype="int32")

        # Create inputs for sparse or dense matrices
        if self.use_sparse:
            # Placeholders for the sparse adjacency matrix
            A_indices_t = Input(batch_shape=(1, None, 2), dtype="int64")
            A_values_t = Input(batch_shape=(1, None))
            A_placeholders = [A_indices_t, A_values_t]

        else:
            # Placeholders for the dense adjacency matrix
            A_m = Input(batch_shape=(1, N_nodes, N_nodes))
            A_placeholders = [A_m]

        # TODO: Support multiple matrices?

        x_inp = [x_t, out_indices_t] + A_placeholders
        x_out = self(x_inp)

        # Flatten output by removing singleton batch dimension
        if x_out.shape[0] == 1:
            self.x_out_flat = Lambda(lambda x: K.squeeze(x, 0))(x_out)
        else:
            self.x_out_flat = x_out

        return x_inp, x_out

    def default_model(self, flatten_output=True):
        """
        Pending-deprecation alias of :meth:`node_model`; emits a
        PendingDeprecationWarning. ``flatten_output`` is accepted but not used.
        """
        warnings.warn(
            "The .default_model() method will be deprecated in future versions. "
            "Please use .node_model() or .link_model() methods instead.",
            PendingDeprecationWarning,
        )
        return self.node_model()