Source code for stellargraph.mapper.sliding

# -*- coding: utf-8 -*-
#
#
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# Unless required by applicable law or agreed to in writing, software
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

# Public API of this module: the names exported by a star-import.
__all__ = [
"SlidingFeaturesNodeGenerator",
"SlidingFeaturesNodeSequence",
]

import numpy as np
from . import Generator
from tensorflow.keras.utils import Sequence

from ..core.validation import require_integer_in_range

class SlidingFeaturesNodeGenerator(Generator):
    """
    A data generator for a graph containing sequence data, created by sliding windows across the
    features of each node in a graph.

    .. seealso:: Model using this generator: :class:`.GCN_LSTM`.

    Args:
        G (StellarGraph): a graph instance where the node features are ordered sequence data
        window_size (int): the number of sequence points included in the sliding window.
        batch_size (int, optional): the number of sliding windows to include in each batch.
    """

    def __init__(self, G, window_size, batch_size=1):
        require_integer_in_range(window_size, "window_size", min_val=1)
        require_integer_in_range(batch_size, "batch_size", min_val=1)

        self.graph = G

        # The string is an error-message template handed to the graph; presumably it is used
        # when the graph holds more than one node type (verify against ``unique_node_type``).
        node_type = G.unique_node_type(
            "G: expected a graph with a single node type, found a graph with node types: %(found)s"
        )
        self._features = G.node_features(node_type=node_type)

        # A rank-3 feature array carries an extra trailing axis of variates per sequence point;
        # any other rank is treated as univariate and recorded as ``None``.
        feature_shape = self._features.shape
        self.variates = feature_shape[2] if len(feature_shape) == 3 else None

        self.window_size = window_size
        self._batch_size = batch_size

    def num_batch_dims(self):
        """
        Returns:
            int: the number of batch dimensions, which is always 1 for this generator.
        """
        return 1

    def flow(self, sequence_iloc_slice, target_distance=None):
        """
        Create a sequence object for time series prediction within the given section of the node
        features.

        This handles both univariate data (each node has a single associated feature vector) and
        multivariate data (each node has an associated feature tensor). The features are always
        sliced and indexed along the first feature axis.

        Args:
            sequence_iloc_slice (slice):
                A slice object of the range of features from which to select windows. A slice
                object is the object form of ``:`` within ``[...]``, e.g. ``slice(a, b)`` is
                equivalent to the ``a:b`` in ``v[a:b]``, and ``slice(None, b)`` is equivalent to
                ``v[:b]``. As with that slicing, this parameter is inclusive in the start and
                exclusive in the end. Only a step of ``None`` or ``1`` is accepted.

                For example, suppose the graph has feature vectors of length 10 and
                ``window_size = 3``:

                * passing in ``slice(None, None)`` will create 8 windows across all 10 features
                  starting with the features slice ``0:3``, then ``1:4``, and so on up to
                  ``7:10``.

                * passing in ``slice(4, 7)`` will create just one window, slicing the three
                  elements ``4:7``.

                For training, one might do a train-test split by choosing a boundary and
                considering everything before that as training data, and everything after,
                e.g. 80% of the features::

                    train_end = int(0.8 * sequence_length)
                    train_gen = sliding_generator.flow(slice(None, train_end))
                    test_gen = sliding_generator.flow(slice(train_end, None))

            target_distance (int, optional):
                The distance from the last element of each window to select an element to
                include as a supervised training target. Note: this always stays within the
                slice defined by ``sequence_iloc_slice``.

                Continuing the example above: a call like
                ``sliding_generator.flow(slice(4, 9), target_distance=1)`` will yield two pairs
                of window and target:

                * a feature window slicing ``4:7`` which includes the features at indices 4, 5,
                  6, and then a target feature at index 7 (distance 1 from the last element of
                  the feature window)

                * a feature window slicing ``5:8`` and a target feature from index 8.

        Returns:
            A Keras sequence that yields batches of sliced windows of features, and, optionally,
            selected target values.
        """
        return SlidingFeaturesNodeSequence(
            self._features,
            self.window_size,
            self._batch_size,
            sequence_iloc_slice,
            target_distance,
        )

class SlidingFeaturesNodeSequence(Sequence):
    """
    A Keras sequence that yields batches of sliding windows over node features.

    The feature array is indexed as ``features[node, sequence position, ...]``: the second axis
    is restricted to ``sequence_iloc_slice`` and windows of ``window_size`` consecutive positions
    are then taken along it. Any trailing axes are carried through unchanged.

    Args:
        features: array of node features with at least two axes (nodes, then sequence positions).
        window_size (int): number of consecutive sequence positions in each window.
        batch_size (int): maximum number of windows in each batch.
        sequence_iloc_slice (slice): the part of the sequence axis to draw windows from; its step
            must be ``None`` or ``1``.
        target_distance (int, optional): if not ``None``, the offset (at least 1) from the last
            element of each window to the element used as that window's target.

    Raises:
        TypeError: if ``sequence_iloc_slice`` is not a ``slice`` or has a step other than 1.
        ValueError: if the selected section is too short to hold a single window (plus target).
    """

    def __init__(
        self, features, window_size, batch_size, sequence_iloc_slice, target_distance
    ):
        # Newer Keras versions keep base-class state for sequences, so always initialise it;
        # on older versions this is a no-op.
        super().__init__()

        if target_distance is not None:
            require_integer_in_range(target_distance, "target_distance", min_val=1)

        if not isinstance(sequence_iloc_slice, slice):
            raise TypeError(
                f"sequence_iloc_slice: expected a slice(...) object, found {type(sequence_iloc_slice).__name__}"
            )

        if sequence_iloc_slice.step not in (None, 1):
            raise TypeError(
                f"sequence_iloc_slice: expected a slice object with a step = 1, found step = {sequence_iloc_slice.step}"
            )

        self._features = features[:, sequence_iloc_slice, ...]
        shape = self._features.shape
        self._num_nodes = shape[0]
        self._num_sequence_samples = shape[1]
        # tuple of any trailing axes; empty for univariate data
        self._num_sequence_variates = shape[2:]

        self._window_size = window_size
        self._target_distance = target_distance
        self._batch_size = batch_size

        # each window needs room for its features and, if requested, the target beyond them
        query_length = window_size + (0 if target_distance is None else target_distance)
        self._num_windows = self._num_sequence_samples - query_length + 1

        # if there's not enough data to fill one window, there's a problem!
        if self._num_windows <= 0:
            if target_distance is None:
                target_str = ""
            else:
                target_str = f" + target_distance={target_distance}"

            total_sequence_samples = features.shape[1]
            start, stop, step = sequence_iloc_slice.indices(total_sequence_samples)
            # non-trivial steps aren't supported at the moment, so this doesn't need to be included
            # in the message
            assert step == 1

            raise ValueError(
                f"expected at least one sliding window of features, found a total window of size {query_length} (window_size={window_size}{target_str}) which is larger than the {self._num_sequence_samples} selected feature sample(s) (sequence_iloc_slice selected from {start} to {stop} in the sequence axis of length {total_sequence_samples})"
            )

    def __len__(self):
        """Return the number of batches; the final batch may hold fewer windows than the rest."""
        return int(np.ceil(self._num_windows / self._batch_size))

    def __getitem__(self, batch_num):
        """
        Build the batch at position ``batch_num``.

        Args:
            batch_num (int): index of the batch, from 0 up to ``len(self) - 1``.

        Returns:
            A pair ``([batch_feats], batch_targets)``. ``batch_feats`` has shape
            ``(windows in batch, nodes, window_size) + variates``. ``batch_targets`` has shape
            ``(windows in batch, nodes) + variates``, or is ``None`` when no ``target_distance``
            was given.

        Raises:
            IndexError: if ``batch_num`` is outside ``[0, len(self))``.
        """
        num_batches = len(self)
        # Reject out-of-range indices explicitly: a negative index would otherwise slice the
        # wrong windows, and one past the end would fail inside ``np.stack`` on an empty list.
        if not 0 <= batch_num < num_batches:
            raise IndexError(
                f"batch_num: expected an index in the range [0, {num_batches}), found {batch_num}"
            )

        first_start = batch_num * self._batch_size
        # the last batch is truncated to the windows that actually exist
        last_start = min(first_start + self._batch_size, self._num_windows)
        this_batch_size = last_start - first_start
        window_starts = range(first_start, last_start)

        batch_feats = np.stack(
            [
                self._features[:, start : start + self._window_size, ...]
                for start in window_starts
            ]
        )
        assert (
            batch_feats.shape
            == (this_batch_size, self._num_nodes, self._window_size)
            + self._num_sequence_variates
        )

        if self._target_distance is not None:
            # offset from a window's start to its target: one past the window end is distance 1
            target_offset = self._window_size + self._target_distance - 1
            batch_targets = np.stack(
                [self._features[:, start + target_offset, ...] for start in window_starts]
            )
            assert (
                batch_targets.shape
                == (this_batch_size, self._num_nodes) + self._num_sequence_variates
            )
        else:
            batch_targets = None

        return [batch_feats], batch_targets