# -*- coding: utf-8 -*-
#
# Copyright 2018-2019 Data61, CSIRO
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mappers to provide input data for the graph models in layers.
"""
__all__ = [
"NodeSequence",
"GraphSAGENodeGenerator",
"HinSAGENodeGenerator",
"Attri2VecNodeGenerator",
"FullBatchNodeGenerator",
"FullBatchNodeSequence",
"SparseFullBatchNodeSequence",
"DirectedGraphSAGENodeGenerator",
]
import warnings
import operator
import random
import numpy as np
import itertools as it
import networkx as nx
import scipy.sparse as sps
from tensorflow.keras import backend as K
from functools import reduce
from tensorflow.keras.utils import Sequence
from ..data.explorer import (
SampledBreadthFirstWalk,
SampledHeterogeneousBreadthFirstWalk,
DirectedBreadthFirstNeighbours,
)
from ..core.graph import StellarGraphBase, GraphSchema, StellarDiGraph
from ..core.utils import is_real_iterable
from ..core.utils import GCN_Aadj_feats_op, PPNP_Aadj_feats_op
class NodeSequence(Sequence):
"""Keras-compatible data generator to use with the Keras
methods :meth:`keras.Model.fit_generator`, :meth:`keras.Model.evaluate_generator`,
and :meth:`keras.Model.predict_generator`.
This class generated data samples for node inference models
and should be created using the `.flow(...)` method of
:class:`GraphSAGENodeGenerator` or :class:`DirectedGraphSAGENodeGenerator`
or :class:`HinSAGENodeGenerator` or :class:`Attri2VecNodeGenerator`.
GraphSAGENodeGenerator, DirectedGraphSAGENodeGenerator,and HinSAGENodeGenerator
are classes that capture the graph structure and the feature vectors of each node.
These generator classes are used within the NodeSequence to generate
samples of k-hop neighbourhoods in the graph and to return to this
class the features from the sampled neighbourhoods.
Attri2VecNodeGenerator is the class that captures node feature vectors
of each node.
Args:
generator: GraphSAGENodeGenerator, DirectedGraphSAGENodeGenerator or
HinSAGENodeGenerator or Attri2VecNodeGenerator. The generator object
containing the graph information.
ids: list
A list of the node_ids to be used as head-nodes in the
downstream task.
targets: list, optional (default=None)
A list of targets or labels to be used in the downstream
class.
shuffle (bool): If True (default) the ids will be randomly shuffled every epoch.
"""
def __init__(self, generator, ids, targets=None, shuffle=True):
# Check that ids is an iterable
if not is_real_iterable(ids):
raise TypeError("IDs must be an iterable or numpy array of graph node IDs")
# Check targets is iterable & has the correct length
if targets is not None:
if not is_real_iterable(targets):
raise TypeError("Targets must be None or an iterable or numpy array ")
if len(ids) != len(targets):
raise ValueError(
"The length of the targets must be the same as the length of the ids"
)
self.targets = np.asanyarray(targets)
else:
self.targets = None
# Check all IDs are actually in the graph
if any(n not in generator.graph for n in ids):
raise KeyError(
"Head nodes supplied to generator contain IDs not found in graph"
)
# Store the generator to draw samples from graph
if (
isinstance(generator, GraphSAGENodeGenerator)
or isinstance(generator, DirectedGraphSAGENodeGenerator)
or isinstance(generator, HinSAGENodeGenerator)
or isinstance(generator, Attri2VecNodeGenerator)
):
self.generator = generator
else:
raise TypeError(
"({}) GraphSAGENodeGenerator, DirectedGraphSAGENodeGenerator, HinSAGENodeGenerator or Attri2VecNodeGenerator is required.".format(
type(self).__name__
)
)
self.ids = list(ids)
self.data_size = len(self.ids)
self.shuffle = shuffle
# Shuffle IDs to start
self.on_epoch_end()
if (
isinstance(self.generator, GraphSAGENodeGenerator)
or isinstance(self.generator, DirectedGraphSAGENodeGenerator)
or isinstance(self.generator, HinSAGENodeGenerator)
):
# Infer head_node_type
if (
generator.schema.node_type_map is None
or generator.schema.edge_type_map is None
):
raise RuntimeError("Schema must have node and edge type maps.")
else:
head_node_types = {generator.schema.get_node_type(n) for n in ids}
if len(head_node_types) > 1:
raise ValueError(
"Only a single head node type is currently supported for HinSAGE models"
)
head_node_type = head_node_types.pop()
# Save head node type and generate sampling schema
self.head_node_types = [head_node_type]
### Experimental; for directed sampling
num_samples = getattr(generator, "num_samples", [])
self._sampling_schema = generator.schema.sampling_layout(
self.head_node_types, num_samples
)
def __len__(self):
"""Denotes the number of batches per epoch"""
return int(np.ceil(self.data_size / self.generator.batch_size))
def __getitem__(self, batch_num):
"""
Generate one batch of data
Args:
batch_num (int): number of a batch
Returns:
batch_feats (list): Node features for nodes and neighbours sampled from a
batch of the supplied IDs
batch_targets (list): Targets/labels for the batch.
"""
start_idx = self.generator.batch_size * batch_num
end_idx = start_idx + self.generator.batch_size
if start_idx >= self.data_size:
raise IndexError("Mapper: batch_num larger than length of data")
# print("Fetching batch {} [{}]".format(batch_num, start_idx))
# The ID indices for this batch
batch_indices = self.indices[start_idx:end_idx]
# Get head (root) nodes
head_ids = [self.ids[ii] for ii in batch_indices]
# Get corresponding targets
batch_targets = None if self.targets is None else self.targets[batch_indices]
if isinstance(self.generator, GraphSAGENodeGenerator) or isinstance(
self.generator, HinSAGENodeGenerator
):
# Get sampled nodes for GraphSAGENodeGenerator and HinSAGENodeGenerator
batch_feats = self.generator.sample_features(
head_ids, self._sampling_schema
)
else:
# Get sampled nodes for Attri2VecNodeGenerator
batch_feats = self.generator.sample_features(head_ids)
return batch_feats, batch_targets
def on_epoch_end(self):
"""
Shuffle all head (root) nodes at the end of each epoch
"""
self.indices = list(range(self.data_size))
if self.shuffle:
random.shuffle(self.indices)
[docs]class GraphSAGENodeGenerator:
"""
A data generator for node prediction with Homogeneous GraphSAGE models
At minimum, supply the StellarGraph, the batch size, and the number of
node samples for each layer of the GraphSAGE model.
The supplied graph should be a StellarGraph object that is ready for
machine learning. Currently the model requires node features for all
nodes in the graph.
Use the :meth:`flow` method supplying the nodes and (optionally) targets
to get an object that can be used as a Keras data generator.
Example::
G_generator = GraphSAGENodeGenerator(G, 50, [10,10])
train_data_gen = G_generator.flow(train_node_ids, train_node_labels)
test_data_gen = G_generator.flow(test_node_ids)
Args:
G (StellarGraph): The machine-learning ready graph.
batch_size (int): Size of batch to return.
num_samples (list): The number of samples per layer (hop) to take.
schema (GraphSchema): [Optional] Graph schema for G.
seed (int): [Optional] Random seed for the node sampler.
name (str or None): Name of the generator (optional)
"""
def __init__(self, G, batch_size, num_samples, schema=None, seed=None, name=None):
if not isinstance(G, StellarGraphBase):
raise TypeError("Graph must be a StellarGraph object.")
self.graph = G
self.num_samples = num_samples
self.batch_size = batch_size
self.name = name
# Check if the graph has features
G.check_graph_for_ml()
# We need a schema for compatibility with HinSAGE
if schema is None:
self.schema = G.create_graph_schema(create_type_maps=True)
elif isinstance(schema, GraphSchema):
self.schema = schema
else:
raise TypeError("Schema must be a GraphSchema object")
# Check that there is only a single node type for GraphSAGE
if len(self.schema.node_types) > 1:
print(
"Warning: running homogeneous GraphSAGE on a graph with multiple node types"
)
# Create sampler for GraphSAGE
self.sampler = SampledBreadthFirstWalk(G, graph_schema=self.schema, seed=seed)
[docs] def sample_features(self, head_nodes, sampling_schema):
"""
Sample neighbours recursively from the head nodes, collect the features of the
sampled nodes, and return these as a list of feature arrays for the GraphSAGE
algorithm.
Args:
head_nodes: An iterable of head nodes to perform sampling on.
sampling_schema: The sampling schema for the model
Returns:
A list of the same length as ``num_samples`` of collected features from
the sampled nodes of shape:
``(len(head_nodes), num_sampled_at_layer, feature_size)``
where num_sampled_at_layer is the cumulative product of `num_samples`
for that layer.
"""
node_samples = self.sampler.run(nodes=head_nodes, n=1, n_size=self.num_samples)
# The number of samples for each head node (not including itself)
num_full_samples = np.sum(np.cumprod(self.num_samples))
# Reshape node samples to sensible format
def get_levels(loc, lsize, samples_per_hop, walks):
end_loc = loc + lsize
walks_at_level = list(it.chain(*[w[loc:end_loc] for w in walks]))
if len(samples_per_hop) < 1:
return [walks_at_level]
return [walks_at_level] + get_levels(
end_loc, lsize * samples_per_hop[0], samples_per_hop[1:], walks
)
nodes_per_hop = get_levels(0, 1, self.num_samples, node_samples)
node_type = sampling_schema[0][0][0]
# Get features for sampled nodes
batch_feats = [
self.graph.get_feature_for_nodes(layer_nodes, node_type)
for layer_nodes in nodes_per_hop
]
# Resize features to (batch_size, n_neighbours, feature_size)
batch_feats = [
np.reshape(a, (len(head_nodes), -1 if np.size(a) > 0 else 0, a.shape[1]))
for a in batch_feats
]
return batch_feats
[docs] def flow(self, node_ids, targets=None, shuffle=False):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
The node IDs are the nodes to train or inference on: the embeddings
calculated for these nodes are passed to the downstream task. These
are a subset of the nodes in the graph.
The targets are an array of numeric targets corresponding to the
supplied node_ids to be used by the downstream task. They should
be given in the same order as the list of node IDs.
If they are not specified (for example, for use in prediction),
the targets will not be available to the downstream task.
Note that the shuffle argument should be True for training and
False for prediction.
Args:
node_ids: an iterable of node IDs
targets: a 2D array of numeric targets with shape
`(len(node_ids), target_size)`
shuffle (bool): If True the node_ids will be shuffled at each
epoch, if False the node_ids will be processed in order.
Returns:
A NodeSequence object to use with the GraphSAGE model
in Keras methods ``fit_generator``, ``evaluate_generator``,
and ``predict_generator``
"""
return NodeSequence(self, node_ids, targets, shuffle=shuffle)
[docs] def flow_from_dataframe(self, node_targets, shuffle=False):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
Args:
node_targets: a Pandas DataFrame of numeric targets indexed
by the node ID for that target.
shuffle (bool): If True the node_ids will be shuffled at each
epoch, if False the node_ids will be processed in order.
Returns:
A NodeSequence object to use with the GraphSAGE model
in Keras methods ``fit_generator``, ``evaluate_generator``,
and ``predict_generator``
"""
return NodeSequence(
self, node_targets.index, node_targets.values, shuffle=shuffle
)
[docs]class HinSAGENodeGenerator:
"""Keras-compatible data mapper for Heterogeneous GraphSAGE (HinSAGE)
At minimum, supply the StellarGraph, the batch size, and the number of
node samples for each layer of the HinSAGE model.
The supplied graph should be a StellarGraph object that is ready for
machine learning. Currently the model requires node features for all
nodes in the graph.
Use the :meth:`flow` method supplying the nodes and (optionally) targets
to get an object that can be used as a Keras data generator.
Note that the shuffle argument should be True for training and
False for prediction.
Example::
G_generator = HinSAGENodeGenerator(G, 50, [10,10])
train_data_gen = G_generator.flow(train_node_ids, train_node_labels)
test_data_gen = G_generator.flow(test_node_ids)
"""
def __init__(self, G, batch_size, num_samples, schema=None, seed=None, name=None):
"""
Args:
G (StellarGraph): The machine-learning ready graph
batch_size (int): Size of batch to return
num_samples (list): The number of samples per layer (hop) to take
schema (GraphSchema): [Optional] Graph schema for G.
seed (int), Optional: Random seed for the node sampler
name (str), optional: Name of the generator.
"""
self.graph = G
self.num_samples = num_samples
self.batch_size = batch_size
self.name = name
# We require a StellarGraph
if not isinstance(G, StellarGraphBase):
raise TypeError("Graph must be a StellarGraph object.")
G.check_graph_for_ml(features=True)
# Generate schema
# We need a schema for compatibility with HinSAGE
if schema is None:
self.schema = G.create_graph_schema(create_type_maps=True)
elif isinstance(schema, GraphSchema):
self.schema = schema
else:
raise TypeError("Schema must be a GraphSchema object")
# Create sampler for HinSAGE
self.sampler = SampledHeterogeneousBreadthFirstWalk(
G, graph_schema=self.schema, seed=seed
)
[docs] def sample_features(self, head_nodes, sampling_schema):
"""
Sample neighbours recursively from the head nodes, collect the features of the
sampled nodes, and return these as a list of feature arrays for the GraphSAGE
algorithm.
Args:
head_nodes: An iterable of head nodes to perform sampling on.
sampling_schema: The node sampling schema for the HinSAGE model,
this is can be generated by the ``GraphSchema`` object.
Returns:
A list of the same length as ``num_samples`` of collected features from
the sampled nodes of shape:
``(len(head_nodes), num_sampled_at_layer, feature_size)``
where num_sampled_at_layer is the cumulative product of `num_samples`
for that layer.
"""
# Get sampled nodes
node_samples = self.sampler.run(nodes=head_nodes, n=1, n_size=self.num_samples)
# Reshape node samples to the required format for the HinSAGE model
# This requires grouping the sampled nodes by edge type and in order
nodes_by_type = [
(
nt,
reduce(
operator.concat,
(samples[ks] for samples in node_samples for ks in indices),
[],
),
)
for nt, indices in sampling_schema[0]
]
# Get features
batch_feats = [
self.graph.get_feature_for_nodes(layer_nodes, nt)
for nt, layer_nodes in nodes_by_type
]
# Resize features to (batch_size, n_neighbours, feature_size)
batch_feats = [
np.reshape(a, (len(head_nodes), -1 if np.size(a) > 0 else 0, a.shape[1]))
for a in batch_feats
]
return batch_feats
[docs] def flow(self, node_ids, targets=None, shuffle=False):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
The node IDs are the nodes to train or inference on: the embeddings
calculated for these nodes are passed to the downstream task. These
are a subset of the nodes in the graph.
The targets are an array of numeric targets corresponding to the
supplied node_ids to be used by the downstream task. They should
be given in the same order as the list of node IDs.
If they are not specified (for example, for use in prediction),
the targets will not be available to the downstream task.
Note that the shuffle argument should be True for training and
False for prediction.
Args:
node_ids (iterable): The head node IDs
targets (Numpy array): a 2D array of numeric targets with shape
``(len(node_ids), target_size)``
shuffle (bool): If True the node_ids will be shuffled at each
epoch, if False the node_ids will be processed in order.
Returns:
A NodeSequence object to use with the GraphSAGE model
in Keras methods `fit_generator`, `evaluate_generator`,
and `predict_generator`.
"""
return NodeSequence(self, node_ids, targets, shuffle=shuffle)
[docs] def flow_from_dataframe(self, node_targets, shuffle=False):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
Note that the shuffle argument should be True for training and
False for prediction.
Args:
node_targets (DataFrame): Numeric targets indexed
by the node ID for that target.
shuffle (bool): If True the node_ids will be shuffled at each
epoch, if False the node_ids will be processed in order.
Returns:
A NodeSequence object to use with the GraphSAGE model
in Keras methods `fit_generator`, `evaluate_generator`,
and `predict_generator`.
"""
return NodeSequence(
self, node_targets.index, node_targets.values, shuffle=shuffle
)
[docs]class Attri2VecNodeGenerator:
"""
A node feature generator for node representation prediction with the
attri2vec model.
At minimum, supply the StellarGraph and the batch size.
The supplied graph should be a StellarGraph object that is ready for
machine learning. Currently the model requires node features for all
nodes in the graph.
Use the :meth:`flow` method supplying the nodes to get an object
that can be used as a Keras data generator.
Example::
G_generator = Attri2VecNodeGenerator(G, 50)
data_gen = G_generator.flow(node_ids)
Args:
G (StellarGraph): The machine-learning ready graph.
batch_size (int): Size of batch to return.
name (str or None): Name of the generator (optional).
"""
def __init__(self, G, batch_size, name=None):
if not isinstance(G, StellarGraphBase):
raise TypeError("Graph must be a StellarGraph object.")
self.graph = G
self.batch_size = batch_size
self.name = name
# Check if the graph has features
G.check_graph_for_ml()
[docs] def sample_features(self, head_nodes):
"""
Sample content features of the head nodes, and return these as a list of feature
arrays for the attri2vec algorithm.
Args:
head_nodes: An iterable of head nodes to perform sampling on.
Returns:
A list of feature arrays, with each element being the feature of a
head node.
"""
batch_feats = self.graph.get_feature_for_nodes(head_nodes)
return batch_feats
[docs] def flow(self, node_ids):
"""
Creates a generator/sequence object for node representation prediction
with the supplied node ids.
The node IDs are the nodes to inference on: the embeddings
calculated for these nodes are passed to the downstream task. These
are a subset/all of the nodes in the graph.
Args:
node_ids: an iterable of node IDs.
Returns:
A NodeSequence object to use with the Attri2Vec model
in the Keras method ``predict_generator``.
"""
return NodeSequence(self, node_ids, shuffle=False)
[docs] def flow_from_dataframe(self, node_ids):
"""
Creates a generator/sequence object for node representation prediction
with the supplied node ids.
Args:
node_ids: a Pandas DataFrame of node_ids.
Returns:
A NodeSequence object to use with the Attri2Vec model
in the Keras method ``predict_generator``.
"""
return NodeSequence(self, node_ids.index)
class SparseFullBatchNodeSequence(Sequence):
"""
Keras-compatible data generator for for node inference models
that require full-batch training (e.g., GCN, GAT).
Use this class with the Keras methods :meth:`keras.Model.fit_generator`,
:meth:`keras.Model.evaluate_generator`, and
:meth:`keras.Model.predict_generator`,
This class uses sparse matrix representations to send data to the models,
and only works with the Keras tensorflow backend. For any other backends,
use the :class:`FullBatchNodeSequence` class.
This class should be created using the `.flow(...)` method of
:class:`FullBatchNodeGenerator`.
Args:
features (np.ndarray): An array of node features of size (N x F),
where N is the number of nodes in the graph, F is the node feature size
A (sparse matrix): An adjacency matrix of the graph of size (N x N).
targets (np.ndarray, optional): An optional array of node targets of size (N x C),
where C is the target size (e.g., number of classes for one-hot class targets)
indices (np.ndarray, optional): Array of indices to the feature and adjacency matrix
of the targets. Required if targets is not None.
"""
use_sparse = True
def __init__(self, features, A, targets=None, indices=None):
if (targets is not None) and (len(indices) != len(targets)):
raise ValueError(
"When passed together targets and indices should be the same length."
)
# Store features and targets as np.ndarray
self.features = np.asanyarray(features)
self.target_indices = np.asanyarray(indices)
# Ensure matrix is in COO format to extract indices
if sps.isspmatrix(A):
A = A.tocoo()
else:
raise ValueError("Adjacency matrix not in expected sparse format")
# Convert matrices to list of indices & values
self.A_indices = np.expand_dims(np.hstack((A.row[:, None], A.col[:, None])), 0)
self.A_values = np.expand_dims(A.data, 0)
# Reshape all inputs to have batch dimension of 1
self.target_indices = np.reshape(
self.target_indices, (1,) + self.target_indices.shape
)
self.features = np.reshape(self.features, (1,) + self.features.shape)
self.inputs = [
self.features,
self.target_indices,
self.A_indices,
self.A_values,
]
if targets is not None:
self.targets = np.asanyarray(targets)
self.targets = np.reshape(self.targets, (1,) + self.targets.shape)
else:
self.targets = None
def __len__(self):
return 1
def __getitem__(self, index):
return self.inputs, self.targets
class FullBatchNodeSequence(Sequence):
"""
Keras-compatible data generator for for node inference models
that require full-batch training (e.g., GCN, GAT).
Use this class with the Keras methods :meth:`keras.Model.fit_generator`,
:meth:`keras.Model.evaluate_generator`, and
:meth:`keras.Model.predict_generator`,
This class should be created using the `.flow(...)` method of
:class:`FullBatchNodeGenerator`.
Args:
features (np.ndarray): An array of node features of size (N x F),
where N is the number of nodes in the graph, F is the node feature size
A (np.ndarray or sparse matrix): An adjacency matrix of the graph of size (N x N).
targets (np.ndarray, optional): An optional array of node targets of size (N x C),
where C is the target size (e.g., number of classes for one-hot class targets)
indices (np.ndarray, optional): Array of indices to the feature and adjacency matrix
of the targets. Required if targets is not None.
"""
use_sparse = False
def __init__(self, features, A, targets=None, indices=None):
if (targets is not None) and (len(indices) != len(targets)):
raise ValueError(
"When passed together targets and indices should be the same length."
)
# Store features and targets as np.ndarray
self.features = np.asanyarray(features)
self.target_indices = np.asanyarray(indices)
# Convert sparse matrix to dense:
if sps.issparse(A) and hasattr(A, "toarray"):
self.A_dense = A.toarray()
elif isinstance(A, (np.ndarray, np.matrix)):
self.A_dense = np.asanyarray(A)
else:
raise TypeError(
"Expected input matrix to be either a Scipy sparse matrix or a Numpy array."
)
# Reshape all inputs to have batch dimension of 1
self.features = np.reshape(self.features, (1,) + self.features.shape)
self.A_dense = self.A_dense.reshape((1,) + self.A_dense.shape)
self.target_indices = np.reshape(
self.target_indices, (1,) + self.target_indices.shape
)
self.inputs = [self.features, self.target_indices, self.A_dense]
if targets is not None:
self.targets = np.asanyarray(targets)
self.targets = np.reshape(self.targets, (1,) + self.targets.shape)
else:
self.targets = None
def __len__(self):
return 1
def __getitem__(self, index):
return self.inputs, self.targets
[docs]class FullBatchNodeGenerator:
"""
A data generator for use with full-batch models on homogeneous graphs,
e.g., GCN, GAT, SGC.
The supplied graph G should be a StellarGraph object that is ready for
machine learning. Currently the model requires node features to be available for all
nodes in the graph.
Use the :meth:`flow` method supplying the nodes and (optionally) targets
to get an object that can be used as a Keras data generator.
This generator will supply the features array and the adjacency matrix to a
full-batch Keras graph ML model. There is a choice to supply either a sparse
adjacency matrix (the default) or a dense adjacency matrix, with the `sparse`
argument.
For these algorithms the adjacency matrix requires pre-processing and the
'method' option should be specified with the correct pre-processing for
each algorithm. The options are as follows:
* ``method='gcn'`` Normalizes the adjacency matrix for the GCN algorithm.
This implements the linearized convolution of Eq. 8 in [1].
* ``method='chebyshev'``: Implements the approximate spectral convolution
operator by implementing the k-th order Chebyshev expansion of Eq. 5 in [1].
* ``method='sgc'``: This replicates the k-th order smoothed adjacency matrix
to implement the Simplified Graph Convolutions of Eq. 8 in [2].
* ``method='self_loops'`` or ``method='gat'``: Simply sets the diagonal elements
of the adjacency matrix to one, effectively adding self-loops to the graph. This is
used by the GAT algorithm of [3].
* ``method='ppnp'`` Calculates the personalized page rank matrix of Eq 2 in [4].
[1] `Kipf and Welling, 2017 <https://arxiv.org/abs/1609.02907>`_.
[2] `Wu et al. 2019 <https://arxiv.org/abs/1902.07153>`_.
[3] `Veličković et al., 2018 <https://arxiv.org/abs/1710.10903>`_
[4] `Klicpera et al., 2018 <https://arxiv.org/abs/1810.05997>`_.
Example::
G_generator = FullBatchNodeGenerator(G)
train_data_gen = G_generator.flow(node_ids, node_targets)
# Fetch the data from train_data_gen, and feed into a Keras model:
x_inputs, y_train = train_data_gen[0]
model.fit(x=x_inputs, y=y_train)
# Alternatively, use the generator itself with model.fit_generator:
model.fit_generator(train_gen, epochs=num_epochs, ...)
For more information, please see the GCN/GAT, PPNP/APPNP and SGC demos:
`<https://github.com/stellargraph/stellargraph/blob/master/demos/>`_
Args:
G (StellarGraphBase): a machine-learning StellarGraph-type graph
name (str): an optional name of the generator
method (str): Method to pre-process adjacency matrix. One of 'gcn' (default),
'chebyshev','sgc', 'self_loops', or 'none'.
k (None or int): This is the smoothing order for the 'sgc' method or the
Chebyshev series order for the 'chebyshev' method. In both cases this
should be positive integer.
transform (callable): an optional function to apply on features and adjacency matrix
the function takes (features, Aadj) as arguments.
sparse (bool): If True (default) a sparse adjacency matrix is used,
if False a dense adjacency matrix is used.
teleport_probability (float): teleport probability between 0.0 and 1.0. "probability" of returning to the
starting node in the propagation step as in [4].
"""
def __init__(
self,
G,
name=None,
method="gcn",
k=1,
sparse=True,
transform=None,
teleport_probability=0.1,
):
if not isinstance(G, StellarGraphBase):
raise TypeError("Graph must be a StellarGraph object.")
self.graph = G
self.name = name
self.k = k
self.teleport_probability = teleport_probability
self.method = method
# Check if the graph has features
G.check_graph_for_ml()
# Create sparse adjacency matrix
self.node_list = list(G.nodes())
self.Aadj = nx.to_scipy_sparse_matrix(
G, nodelist=self.node_list, dtype="float32", weight="weight", format="coo"
)
# Power-user feature: make the generator yield dense adjacency matrix instead
# of the default sparse one.
# If sparse is specified, check that the backend is tensorflow
if sparse and K.backend() != "tensorflow":
warnings.warn(
"Sparse adjacency matrices are only supported in tensorflow."
" Falling back to using a dense adjacency matrix."
)
self.use_sparse = False
else:
self.use_sparse = sparse
# We need a schema to check compatibility with GAT, GCN
self.schema = G.create_graph_schema(create_type_maps=True)
# Check that there is only a single node type for GAT or GCN
if len(self.schema.node_types) > 1:
raise TypeError(
"{}: node generator requires graph with single node type; "
"a graph with multiple node types is passed. Stopping.".format(
type(self).__name__
)
)
# Get the features for the nodes
self.features = G.get_feature_for_nodes(self.node_list)
if transform is not None:
if callable(transform):
self.features, self.Aadj = transform(
features=self.features, A=self.Aadj
)
else:
raise ValueError("argument 'transform' must be a callable.")
elif self.method in ["gcn", "chebyshev", "sgc"]:
self.features, self.Aadj = GCN_Aadj_feats_op(
features=self.features, A=self.Aadj, k=self.k, method=self.method
)
elif self.method in ["gat", "self_loops"]:
self.Aadj = self.Aadj + sps.diags(
np.ones(self.Aadj.shape[0]) - self.Aadj.diagonal()
)
elif self.method in ["ppnp"]:
if self.use_sparse:
raise ValueError(
"use_sparse=true' is incompatible with 'ppnp'."
"Set 'use_sparse=True' or consider using the APPNP model instead."
)
self.features, self.Aadj = PPNP_Aadj_feats_op(
features=self.features,
A=self.Aadj,
teleport_probability=self.teleport_probability,
)
elif self.method in [None, "none"]:
pass
else:
raise ValueError(
"Undefined method for adjacency matrix transformation. "
"Accepted: 'gcn' (default), 'chebyshev','sgc', and 'self_loops'."
)
[docs] def flow(self, node_ids, targets=None):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
Args:
node_ids: and iterable of node ids for the nodes of interest
(e.g., training, validation, or test set nodes)
targets: a 2D array of numeric node targets with shape `(len(node_ids), target_size)`
Returns:
A NodeSequence object to use with GCN or GAT models
in Keras methods :meth:`fit_generator`, :meth:`evaluate_generator`,
and :meth:`predict_generator`
"""
if targets is not None:
# Check targets is an iterable
if not is_real_iterable(targets):
raise TypeError("Targets must be an iterable or None")
# Check targets correct shape
if len(targets) != len(node_ids):
raise TypeError("Targets must be the same length as node_ids")
# The list of indices of the target nodes in self.node_list
node_indices = np.array([self.node_list.index(n) for n in node_ids])
if self.use_sparse:
return SparseFullBatchNodeSequence(
self.features, self.Aadj, targets, node_indices
)
else:
return FullBatchNodeSequence(
self.features, self.Aadj, targets, node_indices
)
class DirectedGraphSAGENodeGenerator:
"""
A data generator for node prediction with homogeneous GraphSAGE models
on directed graphs.
At minimum, supply the StellarDiGraph, the batch size, and the number of
node samples (separately for in-nodes and out-nodes)
for each layer of the GraphSAGE model.
The supplied graph should be a StellarDiGraph object that is ready for
machine learning. Currently the model requires node features for all
nodes in the graph.
Use the :meth:`flow` method supplying the nodes and (optionally) targets
to get an object that can be used as a Keras data generator.
Example::
G_generator = DirectedGraphSAGENodeGenerator(G, 50, [10,5], [5,1])
train_data_gen = G_generator.flow(train_node_ids, train_node_labels)
test_data_gen = G_generator.flow(test_node_ids)
Args:
G (StellarDiGraph): The machine-learning ready graph.
batch_size (int): Size of batch to return.
in_samples (list): The number of in-node samples per layer (hop) to take.
out_samples (list): The number of out-node samples per layer (hop) to take.
schema (GraphSchema): [Optional] Graph schema for G.
seed (int): [Optional] Random seed for the node sampler.
name (str or None): Name of the generator (optional)
"""
def __init__(
self, G, batch_size, in_samples, out_samples, schema=None, seed=None, name=None
):
if not isinstance(G, StellarDiGraph):
raise TypeError("Graph must be a StellarDiGraph object.")
# TODO Add checks for in- and out-nodes sizes
self.graph = G
self.in_samples = in_samples
self.out_samples = out_samples
self.batch_size = batch_size
self.name = name
# Check if the graph has features
G.check_graph_for_ml()
# We need a schema for compatibility with HinSAGE
if schema is None:
self.schema = G.create_graph_schema(create_type_maps=True)
elif isinstance(schema, GraphSchema):
self.schema = schema
else:
raise TypeError("Schema must be a GraphSchema object")
# Check that there is only a single node type for GraphSAGE
if len(self.schema.node_types) > 1:
print(
"Warning: running homogeneous GraphSAGE on a graph with multiple node types"
)
# Create sampler for GraphSAGE
self.sampler = DirectedBreadthFirstNeighbours(
G, graph_schema=self.schema, seed=seed
)
def sample_features(self, head_nodes, sampling_schema):
"""
Sample neighbours recursively from the head nodes, collect the features of the
sampled nodes, and return these as a list of feature arrays for the GraphSAGE
algorithm.
Args:
head_nodes: An iterable of head nodes to perform sampling on.
sampling_schema: The sampling schema for the model
Returns:
A list of feature tensors from the sampled nodes at each layer, each of shape:
``(len(head_nodes), num_sampled_at_layer, feature_size)``
where num_sampled_at_layer is the total number (cumulative product)
of nodes sampled at the given number of hops from each head node,
given the sequence of in/out directions.
"""
node_samples = self.sampler.run(
nodes=head_nodes, n=1, in_size=self.in_samples, out_size=self.out_samples
)
# Reshape node samples to sensible format
# Each 'slot' represents the list of nodes sampled from some neighbourhood, and will have a corresponding
# NN input layer. Every hop potentially generates both in-nodes and out-nodes, held separately,
# and thus the slot (or directed hop sequence) structure forms a binary tree.
node_type = sampling_schema[0][0][0]
max_hops = len(self.in_samples)
max_slots = 2 ** (max_hops + 1) - 1
features = [None] * max_slots # flattened binary tree
for slot in range(max_slots):
nodes_in_slot = list(it.chain(*[sample[slot] for sample in node_samples]))
features_for_slot = self.graph.get_feature_for_nodes(
nodes_in_slot, node_type
)
resize = -1 if np.size(features_for_slot) > 0 else 0
features[slot] = np.reshape(
features_for_slot, (len(head_nodes), resize, features_for_slot.shape[1])
)
return features
def flow(self, node_ids, targets=None, shuffle=False):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
The node IDs are the nodes to train or inference on: the embeddings
calculated for these nodes are passed to the downstream task. These
are a subset of the nodes in the graph.
The targets are an array of numeric targets corresponding to the
supplied node_ids to be used by the downstream task. They should
be given in the same order as the list of node IDs.
If they are not specified (for example, for use in prediction),
the targets will not be available to the downstream task.
Note that the shuffle argument should be True for training and
False for prediction.
Args:
node_ids: an iterable of node IDs
targets: a 2D array of numeric targets with shape
`(len(node_ids), target_size)`
shuffle (bool): If True the node_ids will be shuffled at each
epoch, if False the node_ids will be processed in order.
Returns:
A NodeSequence object to use with the GraphSAGE model
in Keras methods ``fit_generator``, ``evaluate_generator``,
and ``predict_generator``
"""
return NodeSequence(self, node_ids, targets, shuffle=shuffle)
def flow_from_dataframe(self, node_targets, shuffle=False):
"""
Creates a generator/sequence object for training or evaluation
with the supplied node ids and numeric targets.
Args:
node_targets: a Pandas DataFrame of numeric targets indexed
by the node ID for that target.
shuffle (bool): If True the node_ids will be shuffled at each
epoch, if False the node_ids will be processed in order.
Returns:
A NodeSequence object to use with the GraphSAGE model
in Keras methods ``fit_generator``, ``evaluate_generator``,
and ``predict_generator``
"""
return NodeSequence(
self, node_targets.index, node_targets.values, shuffle=shuffle
)