# Source code for stellargraph.mapper.sliding

# -*- coding: utf-8 -*-
#
# Copyright 2020 Data61, CSIRO
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = [
    "SlidingFeaturesNodeGenerator",
    "SlidingFeaturesNodeSequence",
]

import numpy as np
from . import Generator
from tensorflow.keras.utils import Sequence

from ..core.validation import require_integer_in_range


class SlidingFeaturesNodeGenerator(Generator):
    """
    A data generator for a graph containing sequence data, created by sliding windows across the
    features of each node in a graph.

    .. seealso:: Model using this generator: :class:`.GCN_LSTM`.

    Args:
        G (StellarGraph): a graph instance where the node features are ordered sequence data
        window_size (int): the number of sequence points included in the sliding window.
        batch_size (int, optional): the number of sliding windows to include in each batch.
    """

    def __init__(self, G, window_size, batch_size=1):
        require_integer_in_range(window_size, "window_size", min_val=1)
        require_integer_in_range(batch_size, "batch_size", min_val=1)

        self.graph = G

        # sliding windows only make sense when all nodes share one feature layout
        node_type = G.unique_node_type(
            "G: expected a graph with a single node type, found a graph with node types: %(found)s"
        )
        self._features = G.node_features(node_type=node_type)

        # a 3D feature array means multivariate sequences (nodes x time x variates);
        # otherwise each node has a single univariate sequence
        if len(self._features.shape) == 3:
            self.variates = self._features.shape[2]
        else:
            self.variates = None

        self.window_size = window_size
        self._batch_size = batch_size

    def num_batch_dims(self):
        # each batch has a single leading batch dimension (the window axis)
        return 1

    def flow(self, sequence_iloc_slice, target_distance=None):
        """
        Create a sequence object for time series prediction within the given section of the node
        features.

        This handles both univariate data (each node has a single associated feature vector) and
        multivariate data (each node has an associated feature tensor). The features are always
        sliced and indexed along the first feature axis.

        Args:
            sequence_iloc_slice (slice):
                A slice object of the range of features from which to select windows. A slice
                object is the object form of ``:`` within ``[...]``, e.g. ``slice(a, b)`` is
                equivalent to the ``a:b`` in ``v[a:b]``, and ``slice(None, b)`` is equivalent to
                ``v[:b]``. As with that slicing, this parameter is inclusive in the start and
                exclusive in the end.

                For example, suppose the graph has feature vectors of length 10 and
                ``window_size = 3``:

                * passing in ``slice(None, None)`` will create 7 windows across all 10 features
                  starting with the features slice ``0:3``, then ``1:4``, and so on.

                * passing in ``slice(4, 7)`` will create just one window, slicing the three
                  elements ``4:7``.

                For training, one might do a train-test split by choosing a boundary and
                considering everything before that as training data, and everything after, e.g.
                80% of the features::

                    train_end = int(0.8 * sequence_length)
                    train_gen = sliding_generator.flow(slice(None, train_end))
                    test_gen = sliding_generator.flow(slice(train_end, None))

            target_distance (int, optional):
                The distance from the last element of each window to select an element to include
                as a supervised training target. Note: this always stays within the slice defined
                by ``sequence_iloc_slice``.

                Continuing the example above: a call like
                ``sliding_generator.flow(slice(4, 9), target_distance=1)`` will yield two pairs of
                window and target:

                * a feature window slicing ``4:7`` which includes the features at indices 4, 5, 6,
                  and then a target feature at index 7 (distance 1 from the last element of the
                  feature window)

                * a feature window slicing ``5:8`` and a target feature from index 8.

        Returns:
            A Keras sequence that yields batches of sliced windows of features, and, optionally,
            selected target values.
        """
        return SlidingFeaturesNodeSequence(
            self._features,
            self.window_size,
            self._batch_size,
            sequence_iloc_slice,
            target_distance,
        )
class SlidingFeaturesNodeSequence(Sequence):
    """
    A Keras sequence that yields batches of sliding windows sliced from a graph's node features,
    and, optionally, a supervised target value per window.

    Args:
        features: node feature array of shape (nodes, sequence length) or
            (nodes, sequence length, variates)
        window_size (int): the number of sequence points included in each window
        batch_size (int): the number of windows per batch
        sequence_iloc_slice (slice): the range of the sequence axis to select windows from;
            must have a step of 1 (or None)
        target_distance (int, optional): distance from the end of each window to the target sample
    """

    def __init__(
        self, features, window_size, batch_size, sequence_iloc_slice, target_distance
    ):
        if target_distance is not None:
            require_integer_in_range(target_distance, "target_distance", min_val=1)

        if not isinstance(sequence_iloc_slice, slice):
            raise TypeError(
                f"sequence_iloc_slice: expected a slice(...) object, found {type(sequence_iloc_slice).__name__}"
            )

        # non-unit steps would make window/target indexing ambiguous, so reject them
        if sequence_iloc_slice.step not in (None, 1):
            raise TypeError(
                f"sequence_iloc_slice: expected a slice object with a step = 1, found step = {sequence_iloc_slice.step}"
            )

        # slice along the sequence (second) axis; `...` keeps any variate axes intact
        self._features = features[:, sequence_iloc_slice, ...]

        shape = self._features.shape
        self._num_nodes = shape[0]
        self._num_sequence_samples = shape[1]
        # empty tuple for univariate data, (variates,) for multivariate
        self._num_sequence_variates = shape[2:]

        self._window_size = window_size
        self._target_distance = target_distance
        self._batch_size = batch_size

        # each window needs `window_size` samples plus, when predicting, `target_distance` more
        query_length = window_size + (0 if target_distance is None else target_distance)
        self._num_windows = self._num_sequence_samples - query_length + 1

        # if there's not enough data to fill one window, there's a problem!
        if self._num_windows <= 0:
            if target_distance is None:
                target_str = ""
            else:
                target_str = f" + target_distance={target_distance}"

            total_sequence_samples = features.shape[1]
            start, stop, step = sequence_iloc_slice.indices(total_sequence_samples)

            # non-trivial steps aren't supported at the moment, so this doesn't need to be included
            # in the message
            assert step == 1

            raise ValueError(
                f"expected at least one sliding window of features, found a total window of size "
                f"{query_length} (window_size={window_size}{target_str}) which is larger than the "
                f"{self._num_sequence_samples} selected feature sample(s) (sequence_iloc_slice "
                f"selected from {start} to {stop} in the sequence axis of length "
                f"{total_sequence_samples})"
            )

    def __len__(self):
        # number of batches: windows divided by batch size, rounded up so a partial
        # final batch is still yielded
        return int(np.ceil(self._num_windows / self._batch_size))

    def __getitem__(self, batch_num):
        first_start = batch_num * self._batch_size
        # the last batch may be smaller than self._batch_size
        last_start = min((batch_num + 1) * self._batch_size, self._num_windows)

        has_targets = self._target_distance is not None

        arrays = []
        targets = [] if has_targets else None
        for start in range(first_start, last_start):
            end = start + self._window_size
            arrays.append(self._features[:, start:end, ...])
            if has_targets:
                # target_distance=1 selects the sample immediately after the window
                target_idx = end + self._target_distance - 1
                targets.append(self._features[:, target_idx, ...])

        this_batch_size = last_start - first_start

        batch_feats = np.stack(arrays)
        assert (
            batch_feats.shape
            == (this_batch_size, self._num_nodes, self._window_size)
            + self._num_sequence_variates
        )

        if has_targets:
            batch_targets = np.stack(targets)
            assert (
                batch_targets.shape
                == (this_batch_size, self._num_nodes) + self._num_sequence_variates
            )
        else:
            batch_targets = None

        return [batch_feats], batch_targets