# Source code for stellargraph.mapper.node_mappers

# -*- coding: utf-8 -*-
#
# Copyright 2018-2019 Data61, CSIRO
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Mappers to provide input data for the graph models in layers.

"""
# Public API of this module: the names exported by ``from <module> import *``.
__all__ = [
    "NodeSequence",
    "GraphSAGENodeGenerator",
    "HinSAGENodeGenerator",
    "FullBatchNodeGenerator",
    "FullBatchNodeSequence",
]

import itertools as it
import operator
import random
import warnings
from functools import reduce

import networkx as nx
import numpy as np
from keras.utils import Sequence

from ..data.explorer import (
    SampledBreadthFirstWalk,
    SampledHeterogeneousBreadthFirstWalk,
)
from ..core.graph import StellarGraphBase, GraphSchema
from ..core.utils import is_real_iterable


class NodeSequence(Sequence):
    """Keras-compatible data generator to use with the Keras
    methods :meth:`keras.Model.fit_generator`, :meth:`keras.Model.evaluate_generator`,
    and :meth:`keras.Model.predict_generator`.

    This class generates data samples for node inference models
    and should be created using the `.flow(...)` method of
    :class:`GraphSAGENodeGenerator` or :class:`HinSAGENodeGenerator`.

    These Generators are classes that capture the graph structure
    and the feature vectors of each node. These generator classes
    are used within the NodeSequence to generate samples of k-hop
    neighbourhoods in the graph and to return to this class the
    features from the sampled neighbourhoods.

    Args:
        generator: GraphSAGENodeGenerator or HinSAGENodeGenerator
            The generator object containing the graph information.
        ids: iterable
            The node IDs to be used as head-nodes in the downstream task.
            Any iterable is accepted (including one-shot iterables such as
            generators); it is copied into a list on construction.
        targets: list, optional (default=None)
            A list of targets or labels to be used in the downstream
            task. Must have the same length as ``ids``.

        shuffle (bool): If True (default) the ids will be randomly shuffled every epoch.

    Raises:
        TypeError: if ``ids`` or ``targets`` is not an iterable.
        ValueError: if ``targets`` and ``ids`` differ in length, or if the
            head nodes have more than one node type.
        KeyError: if any of the ``ids`` is not a node of the generator's graph.
    """

    def __init__(self, generator, ids, targets=None, shuffle=True):
        # Check that ids is an iterable
        if not is_real_iterable(ids):
            raise TypeError("IDs must be an iterable or numpy array of graph node IDs")

        # Materialise the IDs once. They are traversed several times below
        # (length check, membership check, node-type inference); a one-shot
        # iterable such as a generator would be exhausted after the first pass.
        ids = list(ids)

        # Check targets is iterable & has the correct length
        if targets is not None:
            if not is_real_iterable(targets):
                raise TypeError("Targets must be None or an iterable or numpy array ")
            if len(ids) != len(targets):
                raise ValueError(
                    "The length of the targets must be the same as the length of the ids"
                )
            self.targets = np.asanyarray(targets)
        else:
            self.targets = None

        # Check all IDs are actually in the graph
        if any(n not in generator.graph for n in ids):
            raise KeyError(
                "Head nodes supplied to generator contain IDs not found in graph"
            )

        # Infer head_node_type: use the schema's type map when it has one,
        # otherwise ask the graph for the type of each node.
        if generator.schema.node_type_map is None:
            head_node_types = {generator.graph.type_for_node(n) for n in ids}
        else:
            head_node_types = {generator.schema.get_node_type(n) for n in ids}
        if len(head_node_types) > 1:
            raise ValueError(
                "Only a single head node type is currently supported for HinSAGE models"
            )
        head_node_type = head_node_types.pop()

        # Store the generator to draw samples from graph
        self.generator = generator
        self.ids = ids
        self.data_size = len(self.ids)
        self.shuffle = shuffle

        # Build the index order (shuffled if requested) for the first epoch
        self.on_epoch_end()

        # Save head node type and generate sampling schema
        self.head_node_types = [head_node_type]
        self._sampling_schema = generator.schema.sampling_layout(
            self.head_node_types, generator.num_samples
        )

    def __len__(self):
        """Denotes the number of batches per epoch"""
        return int(np.ceil(self.data_size / self.generator.batch_size))

    def __getitem__(self, batch_num):
        """
        Generate one batch of data

        Args:
            batch_num (int): number of a batch

        Returns:
            batch_feats (list): Node features for nodes and neighbours sampled from a
                batch of the supplied IDs
            batch_targets: Targets/labels for the batch, or None if no
                targets were supplied.

        Raises:
            IndexError: if ``batch_num`` starts beyond the end of the data.
        """
        start_idx = self.generator.batch_size * batch_num
        end_idx = start_idx + self.generator.batch_size
        if start_idx >= self.data_size:
            raise IndexError("Mapper: batch_num larger than length of data")

        # The ID indices for this batch (the last batch may be shorter)
        batch_indices = self.indices[start_idx:end_idx]

        # Get head (root) nodes
        head_ids = [self.ids[ii] for ii in batch_indices]

        # Get corresponding targets
        batch_targets = None if self.targets is None else self.targets[batch_indices]

        # Get sampled nodes
        batch_feats = self.generator.sample_features(head_ids, self._sampling_schema)

        return batch_feats, batch_targets

    def on_epoch_end(self):
        """
        Reset the order of the head (root) nodes at the end of each epoch,
        shuffling it when ``shuffle`` is True.
        """
        self.indices = list(range(self.data_size))
        if self.shuffle:
            random.shuffle(self.indices)


class GraphSAGENodeGenerator:
    """
    A data generator for node prediction with Homogeneous GraphSAGE models

    At minimum, supply the StellarGraph, the batch size, and the number of
    node samples for each layer of the GraphSAGE model.

    The supplied graph should be a StellarGraph object that is ready for
    machine learning. Currently the model requires node features for all
    nodes in the graph.

    Use the :meth:`flow` method supplying the nodes and (optionally) targets
    to get an object that can be used as a Keras data generator.

    Example::

        G_generator = GraphSAGENodeGenerator(G, 50, [10,10])
        train_data_gen = G_generator.flow(node_ids)

    Args:
        G (StellarGraph): The machine-learning ready graph.
        batch_size (int): Size of batch to return.
        num_samples (list): The number of samples per layer (hop) to take.
        schema (GraphSchema): [Optional] Graph schema for G.
        seed (int): [Optional] Random seed for the node sampler.
        name (str or None): Name of the generator (optional)

    Raises:
        TypeError: if ``G`` is not a StellarGraph object or ``schema`` is
            neither None nor a GraphSchema.
    """

    def __init__(self, G, batch_size, num_samples, schema=None, seed=None, name=None):
        if not isinstance(G, StellarGraphBase):
            raise TypeError("Graph must be a StellarGraph object.")

        self.graph = G
        self.num_samples = num_samples
        self.batch_size = batch_size
        self.name = name

        # Check if the graph has features
        G.check_graph_for_ml()

        # Create sampler for GraphSAGE
        self.sampler = SampledBreadthFirstWalk(G, seed=seed)

        # We need a schema for compatibility with HinSAGE
        if schema is None:
            self.schema = G.create_graph_schema(create_type_maps=True)
        elif isinstance(schema, GraphSchema):
            self.schema = schema
        else:
            raise TypeError("Schema must be a GraphSchema object")

        # Check that there is only a single node type for GraphSAGE.
        # Reported through the warnings machinery (not stdout) so callers can
        # filter, capture or escalate it.
        if len(self.schema.node_types) > 1:
            warnings.warn(
                "running homogeneous GraphSAGE on a graph with multiple node types",
                RuntimeWarning,
                stacklevel=2,
            )

    def sample_features(self, head_nodes, sampling_schema):
        """
        Sample neighbours recursively from the head nodes, collect the features of the
        sampled nodes, and return these as a list of feature arrays for the GraphSAGE
        algorithm.

        Args:
            head_nodes: A sized iterable (e.g. a list) of head nodes to perform
                sampling on.
            sampling_schema: The sampling schema for the model; only the node
                type at ``sampling_schema[0][0][0]`` is used.

        Returns:
            A list of ``len(num_samples) + 1`` feature arrays: the head nodes
            first, then one array per hop. Each has shape
            ``(len(head_nodes), num_sampled_at_layer, feature_size)``
            where num_sampled_at_layer is 1 for the head nodes and the
            cumulative product of `num_samples` up to that hop otherwise.
        """
        node_samples = self.sampler.run(nodes=head_nodes, n=1, n_size=self.num_samples)

        # The number of samples for each head node (not including itself).
        # Cast to int: for an empty num_samples numpy returns a float 0.0,
        # which cannot be used to repeat a list.
        num_full_samples = int(np.sum(np.cumprod(self.num_samples)))

        # Isolated nodes will return only themselves in the sample list
        # let's correct for this by padding with None (the dummy node ID)
        node_samples = [
            walk + [None] * num_full_samples if len(walk) == 1 else walk
            for walk in node_samples
        ]

        # Each walk is laid out as [head, hop-1 samples..., hop-2 samples..., ...];
        # the block for hop k holds prod(num_samples[:k]) nodes.
        level_sizes = [1]
        for samples_at_hop in self.num_samples:
            level_sizes.append(level_sizes[-1] * samples_at_hop)

        # Regroup by hop: concatenate the hop-k slice of every walk.
        nodes_per_hop = []
        start = 0
        for size in level_sizes:
            stop = start + size
            nodes_per_hop.append(
                list(it.chain.from_iterable(walk[start:stop] for walk in node_samples))
            )
            start = stop

        node_type = sampling_schema[0][0][0]

        # Get features for sampled nodes
        batch_feats = [
            self.graph.get_feature_for_nodes(layer_nodes, node_type)
            for layer_nodes in nodes_per_hop
        ]

        # Resize features to (batch_size, n_neighbours, feature_size)
        batch_feats = [
            np.reshape(a, (len(head_nodes), -1 if np.size(a) > 0 else 0, a.shape[1]))
            for a in batch_feats
        ]
        return batch_feats

    def flow(self, node_ids, targets=None, shuffle=False):
        """
        Creates a generator/sequence object for training or evaluation
        with the supplied node ids and numeric targets.

        The node IDs are the nodes to train or inference on: the embeddings
        calculated for these nodes are passed to the downstream task. These
        are a subset of the nodes in the graph.

        The targets are an array of numeric targets corresponding to the
        supplied node_ids to be used by the downstream task. They should
        be given in the same order as the list of node IDs.
        If they are not specified (for example, for use in prediction),
        the targets will not be available to the downstream task.

        Note that the shuffle argument should be True for training and
        False for prediction.

        Args:
            node_ids: an iterable of node IDs
            targets: a 2D array of numeric targets with shape
                `(len(node_ids), target_size)`
            shuffle (bool): If True the node_ids will be shuffled at each
                epoch, if False the node_ids will be processed in order.

        Returns:
            A NodeSequence object to use with the GraphSAGE model
            in Keras methods ``fit_generator``, ``evaluate_generator``,
            and ``predict_generator``
        """
        return NodeSequence(self, node_ids, targets, shuffle=shuffle)

    def flow_from_dataframe(self, node_targets, shuffle=False):
        """
        Creates a generator/sequence object for training or evaluation
        with the supplied node ids and numeric targets.

        Args:
            node_targets: a Pandas DataFrame of numeric targets indexed
                by the node ID for that target.
            shuffle (bool): If True the node_ids will be shuffled at each
                epoch, if False the node_ids will be processed in order.

        Returns:
            A NodeSequence object to use with the GraphSAGE model
            in Keras methods ``fit_generator``, ``evaluate_generator``,
            and ``predict_generator``
        """
        return NodeSequence(
            self, node_targets.index, node_targets.values, shuffle=shuffle
        )


class HinSAGENodeGenerator:
    """Keras-compatible data mapper for Heterogeneous GraphSAGE (HinSAGE)

    At minimum, supply the StellarGraph, the batch size, and the number of
    node samples for each layer of the HinSAGE model.

    The supplied graph should be a StellarGraph object that is ready for
    machine learning. Currently the model requires node features for all
    nodes in the graph.

    Use the :meth:`flow` method supplying the nodes and (optionally) targets
    to get an object that can be used as a Keras data generator.

    Note that the shuffle argument should be True for training and
    False for prediction.

    Example::

        G_generator = HinSAGENodeGenerator(G, 50, [10,10])
        data_gen = G_generator.flow(node_ids)

    """

    def __init__(self, G, batch_size, num_samples, schema=None, seed=None, name=None):
        """
        Args:
            G (StellarGraph): The machine-learning ready graph
            batch_size (int): Size of batch to return
            num_samples (list): The number of samples per layer (hop) to take
            schema (GraphSchema): [Optional] Graph schema for G.
            seed (int), Optional: Random seed for the node sampler
            name (str), optional: Name of the generator.

        Raises:
            TypeError: if ``G`` is not a StellarGraph object or ``schema`` is
                neither None nor a GraphSchema.
        """
        # We require a StellarGraph; validate before storing anything
        if not isinstance(G, StellarGraphBase):
            raise TypeError("Graph must be a StellarGraph object.")

        self.graph = G
        self.num_samples = num_samples
        self.batch_size = batch_size
        self.name = name

        G.check_graph_for_ml(features=True)

        # Create sampler for HinSAGE
        self.sampler = SampledHeterogeneousBreadthFirstWalk(G, seed=seed)

        # Generate schema
        # We need a schema for compatibility with HinSAGE
        if schema is None:
            self.schema = G.create_graph_schema(create_type_maps=True)
        elif isinstance(schema, GraphSchema):
            self.schema = schema
        else:
            raise TypeError("Schema must be a GraphSchema object")

    def sample_features(self, head_nodes, sampling_schema):
        """
        Sample neighbours recursively from the head nodes, collect the features of the
        sampled nodes, and return these as a list of feature arrays for the GraphSAGE
        algorithm.

        Args:
            head_nodes: A sized iterable (e.g. a list) of head nodes to perform
                sampling on.
            sampling_schema: The node sampling schema for the HinSAGE model,
                this can be generated by the ``GraphSchema`` object. Only
                ``sampling_schema[0]`` is used; it is read as a sequence of
                ``(node_type, indices)`` pairs, where ``indices`` select
                entries of each head node's sample list.

        Returns:
            A list with one feature array per ``(node_type, indices)`` entry
            of ``sampling_schema[0]``, each of shape
            ``(len(head_nodes), num_sampled_for_entry, feature_size)``.
        """
        # Get sampled nodes
        node_samples = self.sampler.run(nodes=head_nodes, n=1, n_size=self.num_samples)

        # Reshape node samples to the required format for the HinSAGE model
        # This requires grouping the sampled nodes by edge type and in order.
        # chain.from_iterable flattens in a single pass (head node by head
        # node, then slot by slot) instead of repeatedly concatenating lists.
        nodes_by_type = [
            (
                node_type,
                list(
                    it.chain.from_iterable(
                        samples[slot] for samples in node_samples for slot in slots
                    )
                ),
            )
            for node_type, slots in sampling_schema[0]
        ]

        # Get features
        batch_feats = [
            self.graph.get_feature_for_nodes(layer_nodes, node_type)
            for node_type, layer_nodes in nodes_by_type
        ]

        # Resize features to (batch_size, n_neighbours, feature_size)
        batch_feats = [
            np.reshape(a, (len(head_nodes), -1 if np.size(a) > 0 else 0, a.shape[1]))
            for a in batch_feats
        ]

        return batch_feats

    def flow(self, node_ids, targets=None, shuffle=False):
        """
        Creates a generator/sequence object for training or evaluation
        with the supplied node ids and numeric targets.

        The node IDs are the nodes to train or inference on: the embeddings
        calculated for these nodes are passed to the downstream task. These
        are a subset of the nodes in the graph.

        The targets are an array of numeric targets corresponding to the
        supplied node_ids to be used by the downstream task. They should
        be given in the same order as the list of node IDs.
        If they are not specified (for example, for use in prediction),
        the targets will not be available to the downstream task.

        Note that the shuffle argument should be True for training and
        False for prediction.

        Args:
            node_ids (iterable): The head node IDs
            targets (Numpy array): a 2D array of numeric targets with shape
                ``(len(node_ids), target_size)``
            shuffle (bool): If True the node_ids will be shuffled at each
                epoch, if False the node_ids will be processed in order.

        Returns:
            A NodeSequence object to use with the GraphSAGE model
            in Keras methods `fit_generator`, `evaluate_generator`,
            and `predict_generator`.
        """
        return NodeSequence(self, node_ids, targets, shuffle=shuffle)

    def flow_from_dataframe(self, node_targets, shuffle=False):
        """
        Creates a generator/sequence object for training or evaluation
        with the supplied node ids and numeric targets.

        Note that the shuffle argument should be True for training and
        False for prediction.

        Args:
            node_targets (DataFrame): Numeric targets indexed
                by the node ID for that target.
            shuffle (bool): If True the node_ids will be shuffled at each
                epoch, if False the node_ids will be processed in order.

        Returns:
            A NodeSequence object to use with the GraphSAGE model
            in Keras methods `fit_generator`, `evaluate_generator`,
            and `predict_generator`.
        """
        return NodeSequence(
            self, node_targets.index, node_targets.values, shuffle=shuffle
        )


class FullBatchNodeSequence(Sequence):
    """
    Keras-compatible data generator to use with the Keras
    methods :meth:`keras.Model.fit_generator`, :meth:`keras.Model.evaluate_generator`,
    and :meth:`keras.Model.predict_generator`,
    for models that require full-batch training (e.g., GCN, GAT).

    This class generates data samples for node inference models
    and should be created using the `.flow(...)` method of
    :class:`FullBatchNodeGenerator`.

    These Generators are classes that capture the graph structure
    and the feature vectors of each node.
    """

    def __init__(self, features, A, targets=None, sample_weight=None):
        """
        Args:
            features: a matrix of node features of size (N x F), where N is the
                number of nodes in the graph, F is the node feature size
            A: an adjacency matrix of the graph
            targets: an optional array of node targets of size (N x C),
                where C is the target size (e.g., number of classes for
                one-hot class targets)
            sample_weight: Optional Numpy array of weights for the node samples,
                used for weighting the loss function during training or
                evaluation. Pass a flat (1D) Numpy array with the same length
                as the input features (1:1 mapping between weights and rows
                in features).
        """
        self.features = features
        self.A = A
        self.targets = targets
        self.sample_weight = sample_weight

    def __len__(self):
        """The whole graph is a single batch, so there is one batch per epoch."""
        return 1

    def __getitem__(self, index):
        """
        Return the single full batch as
        ``([features, A], targets, sample_weight)``; ``index`` is ignored.
        """
        return [self.features, self.A], self.targets, self.sample_weight


class FullBatchNodeGenerator:
    """
    A data generator for node prediction with Homogeneous full-batch models, e.g., GCN, GAT.
    The supplied graph G should be a StellarGraph object that is ready for
    machine learning. Currently the model requires node features to be available for all
    nodes in the graph.
    Use the :meth:`flow` method supplying the nodes and (optionally) targets
    to get an object that can be used as a Keras data generator.

    Example::

        G_generator = FullBatchNodeGenerator(G)
        train_data_gen = G_generator.flow(node_ids, node_targets)

        # Fetch the data from train_data_gen, and feed into a Keras model:
        [X, A], y_train, node_mask_train = train_data_gen.__getitem__(0)
        model.fit(x=[X, A], y=y_train, sample_weight=node_mask_train, ...)

        # Alternatively, use the generator itself with model.fit_generator:
        model.fit_generator(train_data_gen, epochs=num_epochs, ...)

    Args:
        G (StellarGraphBase): a machine-learning StellarGraph-type graph
        name (str): an optional name of the generator
        func_opt: an optional function to apply on features and adjacency matrix;
            it is called as ``func_opt(features=..., A=..., **kwargs)`` and must
            return the pair ``(features, A)`` to use instead.
        kwargs: additional parameters needed when using this generator with GCN model
            with the [func_opt] function. It must be chebyshev or localpool filters
            (e.g. filter="localpool", or filter="chebyshev", max_degree=2).
            The keyword ``sparse`` (default True) is also read by the generator
            itself: when False the adjacency matrix is converted to dense.

    For more information, please read `GCN_Aadj_feats_op <https://github.com/stellargraph/stellargraph/tree/master/stellargraph/core>`_ in the file **utils.py** and
    GCN demo `gcn-cora-example.py <https://github.com/stellargraph/stellargraph/blob/master/demos/node-classification-gcn/gcn-cora-example.py>`_

    Raises:
        TypeError: if ``G`` is not a StellarGraph object, or the graph has
            more than one node type.
        ValueError: if ``func_opt`` is given but is not callable.
    """

    def __init__(self, G, name=None, func_opt=None, **kwargs):
        if not isinstance(G, StellarGraphBase):
            raise TypeError("Graph must be a StellarGraph object.")

        self.graph = G
        self.name = name
        self.kwargs = kwargs

        # Check if the graph has features
        G.check_graph_for_ml()

        # Create sparse adjacency matrix; rows/columns follow self.node_list
        self.node_list = list(G.nodes())
        self.Aadj = nx.adjacency_matrix(G, nodelist=self.node_list)

        # Power-user feature: make the generator yield dense adjacency matrix instead of the default sparse one.
        # this is needed for GAT model to be differentiable through all layers down to the input, e.g., for saliency maps
        self.sparse = kwargs.get("sparse", True)
        if not self.sparse:
            self.Aadj = self.Aadj.todense()

        # We need a schema to check compatibility with GAT, GCN
        self.schema = G.create_graph_schema(create_type_maps=True)

        # Check that there is only a single node type for GAT or GCN
        if len(self.schema.node_types) > 1:
            raise TypeError(
                "{}: node generator requires graph with single node type; "
                "a graph with multiple node types is passed. Stopping.".format(
                    type(self).__name__
                )
            )

        # Get the features for the nodes
        self.features = G.get_feature_for_nodes(self.node_list)

        if func_opt is not None:
            if not callable(func_opt):
                raise ValueError("argument 'func_opt' must be a callable.")
            self.features, self.Aadj = func_opt(
                features=self.features, A=self.Aadj, **kwargs
            )

    def flow(self, node_ids, targets=None):
        """
        Creates a generator/sequence object for training or evaluation
        with the supplied node ids and numeric targets.

        Args:
            node_ids: an iterable of node ids for the nodes of interest
                (e.g., training, validation, or test set nodes)
            targets: a 2D array of numeric node targets with shape `(len(node_ids), target_size)`

        Returns:
            A FullBatchNodeSequence object to use with GCN or GAT models
            in Keras methods :meth:`fit_generator`, :meth:`evaluate_generator`,
            and :meth:`predict_generator`

        Raises:
            TypeError: if ``targets`` is neither None nor an iterable.
            ValueError: if a node id is not in the graph, or ``targets`` and
                ``node_ids`` differ in length.
        """
        # Check targets is an iterable
        if targets is not None and not is_real_iterable(targets):
            raise TypeError("Targets must be an iterable or None")

        node_mask, y = self._mask_and_targets(node_ids, targets)
        return FullBatchNodeSequence(self.features, self.Aadj, y, node_mask)

    def _mask_and_targets(self, node_ids, targets):
        """Build the boolean node mask and full-size target matrix for ``node_ids``.

        Returns:
            ``(node_mask, y)``: ``node_mask`` is a boolean array with one entry
            per node in ``self.node_list``, True for the requested nodes;
            ``y`` is a ``(number of nodes, target_size)`` float array holding
            each target in its node's row and zeros elsewhere, or None when
            ``targets`` is None.
        """
        num_nodes = len(self.node_list)

        # Build the position lookup once rather than scanning node_list for
        # every requested node.
        position_of = {node: pos for pos, node in enumerate(self.node_list)}
        try:
            positions = [position_of[n] for n in node_ids]
        except KeyError as err:
            # Keep the exception type that list.index raised for unknown nodes
            raise ValueError(
                "Node ID {!r} is not in the graph".format(err.args[0])
            )

        # Explicit integer dtype so that an empty selection is still a valid index
        node_indices = np.array(positions, dtype=int)

        # Built directly as booleans; np.ma.make_mask would shrink an all-False
        # mask to a scalar.
        node_mask = np.zeros(num_nodes, dtype=bool)
        node_mask[node_indices] = True

        if targets is None:
            return node_mask, None

        # Reshape targets to (number of nodes in self.graph, number of classes), and store in y
        targets = np.array(targets)
        if len(targets) != len(node_indices):
            raise ValueError(
                "The length of the targets must be the same as the length of the node_ids"
            )
        target_size = 1 if len(targets.shape) == 1 else targets.shape[1]

        y = np.zeros((num_nodes, target_size))
        for row, target in zip(node_indices, targets):
            y[row] = target

        return node_mask, y