
Resource usage of the StellarGraph class

This notebook records the time and memory (both peak and long-term) required to construct a StellarGraph object for several datasets.

This notebook is aimed at helping contributors to the StellarGraph library itself understand how their changes affect the resource usage of the StellarGraph object.

Several measures of resource usage are recorded for “real world” graphs of various sizes:

  • time for construction

  • memory usage of the final StellarGraph object

  • peak memory usage during StellarGraph construction (both absolute, and additional compared to the raw input data)

These are recorded both with explicit nodes (and node features, if they exist) and with implicit/inferred nodes.

The memory usage is recorded end-to-end: the recording starts from data on disk and continues until the StellarGraph object has been constructed and all other data has been cleaned up. This is important for measuring the total memory usage accurately, because NumPy arrays can often share data with existing arrays in memory, so a retroactive or partial analysis (one that starts from data already in memory) can miss significant amounts of data. The parsing code in stellargraph.datasets doesn’t allow determining the memory usage of the intermediate nodes and edges passed to the StellarGraph constructor, and so cannot be used here.
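
As a small illustration of the sharing issue (a hypothetical sketch, separate from the measurement code below):

import numpy as np
import pandas as pd

features = np.zeros((1000, 100), dtype=np.float32)
frame = pd.DataFrame(features)

# depending on the pandas version, the DataFrame typically wraps the existing
# buffer rather than copying it, so inspecting only `frame` after the fact
# would misattribute (or miss) the memory shared between the two objects
print(np.shares_memory(features, frame.to_numpy()))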

[3]:
import stellargraph as sg
import pandas as pd
import numpy as np

import gc
import json
import os
import timeit
import tempfile
import tracemalloc

Optional reddit data

The original GraphSAGE paper evaluated on a reddit dataset, available at http://snap.stanford.edu/graphsage/#datasets. This dataset is large (1.3GB compressed), so there is no automatic download support for it. The following reddit_path variable controls whether and how the reddit dataset is included:

  • to ignore the dataset: set the variable to None

  • to include the dataset: download the dataset zip, decompress it, and set the variable to the decompressed directory

[4]:
reddit_path = os.path.expanduser("~/data/reddit")

Datasets

Cora

[5]:
cora = sg.datasets.Cora()
cora.download()

cora_cites_path = os.path.join(cora.data_directory, "cora.cites")
cora_content_path = os.path.join(cora.data_directory, "cora.content")
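# column 0 is the paper ID; columns 1 to 1433 are the binary word features, read
# as float32 to halve their in-memory size (the class-label column is excluded
# via `usecols` below)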
cora_dtypes = {0: int, **{i: np.float32 for i in range(1, 1433 + 1)}}


def cora_pandas_parts(include_nodes):
    if include_nodes:
        nodes = pd.read_csv(
            cora_content_path,
            header=None,
            sep="\t",
            index_col=0,
            usecols=range(0, 1433 + 1),
            dtype=cora_dtypes,
            na_filter=False,
        )
    else:
        nodes = None
    edges = pd.read_csv(
        cora_cites_path,
        header=None,
        sep="\t",
        names=["source", "target"],
        dtype=int,
        na_filter=False,
    )
    return nodes, edges, {}


def cora_indexed_array_parts(include_nodes):
    nodes, edges, args = cora_pandas_parts(include_nodes)
    if nodes is not None:
        nodes = sg.IndexedArray(nodes.to_numpy(), index=nodes.index)
    return nodes, edges, args

BlogCatalog3

[6]:
blogcatalog3 = sg.datasets.BlogCatalog3()
blogcatalog3.download()

blogcatalog3_edges = os.path.join(blogcatalog3.data_directory, "edges.csv")
blogcatalog3_group_edges = os.path.join(blogcatalog3.data_directory, "group-edges.csv")
blogcatalog3_groups = os.path.join(blogcatalog3.data_directory, "groups.csv")
blogcatalog3_nodes = os.path.join(blogcatalog3.data_directory, "nodes.csv")


def blogcatalog3_parts(include_nodes):
    if include_nodes:
        raw_nodes = pd.read_csv(blogcatalog3_nodes, header=None)[0]
        raw_groups = pd.read_csv(blogcatalog3_groups, header=None)[0]
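        # the group IDs are negated so they cannot collide with the user IDs:
        # node IDs must be unique across all node types in a StellarGraph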
        nodes = {
            "user": pd.DataFrame(index=raw_nodes),
            "group": pd.DataFrame(index=-raw_groups),
        }
    else:
        nodes = None

    edges = pd.read_csv(blogcatalog3_edges, header=None, names=["source", "target"])

    group_edges = pd.read_csv(
        blogcatalog3_group_edges, header=None, names=["source", "target"]
    )
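    # negate the targets to match the negated group node IDs, and shift this
    # DataFrame's index past the friend edges so the two edge types don't
    # reuse the same edge IDs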
    group_edges["target"] *= -1
    start = len(edges)
    group_edges.index = range(start, start + len(group_edges))

    edges = {"friend": edges, "belongs": group_edges}
    return nodes, edges, {}

FB15k

[7]:
fb15k = sg.datasets.FB15k()
fb15k.download()
fb15k_files = [
    os.path.join(fb15k.data_directory, f"freebase_mtr100_mte100-{x}.txt")
    for x in ["train", "test", "valid"]
]


def fb15k_parts(include_nodes, usecols=None):
    loaded = [
        pd.read_csv(
            name,
            header=None,
            names=["source", "label", "target"],
            sep="\t",
            dtype=str,
            na_filter=False,
            usecols=usecols,
        )
        for name in fb15k_files
    ]
    edges = pd.concat(loaded, ignore_index=True)

    if include_nodes:
        # infer the set of nodes manually, in a memory-minimal way
        raw_nodes = set(edges.source)
        raw_nodes.update(edges.target)
        nodes = pd.DataFrame(index=raw_nodes)
    else:
        nodes = None

    return nodes, edges, {"edge_type_column": "label"}


def fb15k_no_edge_types_parts(include_nodes):
    nodes, edges, _ = fb15k_parts(include_nodes, usecols=["source", "target"])
    return nodes, edges, {}

reddit

As discussed above, the reddit dataset is large and optional. It is also slow to parse, as the graph structure is stored in a huge JSON file. Thus, we prepare the dataset by converting that JSON file into a NumPy edge-list array of shape (num_edges, 2), which is significantly faster to load from disk.

[8]:
%%time

# if requested, prepare the reddit dataset by saving the slow-to-read JSON to a temporary .npy file
if reddit_path is not None:
    reddit_graph_path = os.path.join(reddit_path, "reddit-G.json")
    reddit_feats_path = os.path.join(reddit_path, "reddit-feats.npy")

    with open(reddit_graph_path) as f:
        reddit_g = json.load(f)
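    # reddit-G.json stores the graph in node-link format: each entry of "links"
    # is one edge, with "source" and "target" giving the endpoint node IDs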
    reddit_numpy_edges = np.array([[x["source"], x["target"]] for x in reddit_g["links"]])

    reddit_edges_file = tempfile.NamedTemporaryFile(suffix=".npy")
    np.save(reddit_edges_file, reddit_numpy_edges)
CPU times: user 15.9 s, sys: 1.97 s, total: 17.8 s
Wall time: 17.9 s
[9]:
def reddit_numpy_parts(include_nodes):
    if include_nodes:
        nodes = np.load(reddit_feats_path).astype(np.float32)
    else:
        nodes = None

    raw_edges = np.load(reddit_edges_file.name)
    edges = pd.DataFrame(raw_edges, columns=["source", "target"])
    return nodes, edges, {}


def reddit_pandas_parts(include_nodes):
    nodes, edges, args = reddit_numpy_parts(include_nodes)
    if nodes is not None:
        nodes = pd.DataFrame(nodes)

    return nodes, edges, args

Collected

[10]:
datasets = {
    "Cora (Pandas)": cora_pandas_parts,
    "Cora (IndexedArray)": cora_indexed_array_parts,
    "BlogCatalog3": blogcatalog3_parts,
    "FB15k (no edge types)": fb15k_no_edge_types_parts,
    "FB15k": fb15k_parts,
}
if reddit_path is not None:
    datasets["reddit (Pandas)"] = reddit_pandas_parts
    datasets["reddit (NumPy)"] = reddit_numpy_parts

Measurement

[11]:
def mem_snapshot_diff(after, before):
    """Total memory difference between two tracemalloc.snapshot objects"""
    return sum(elem.size_diff for elem in after.compare_to(before, "lineno"))
[12]:
# names of columns computed by the measurement code
def measurement_columns(title):
    names = [
        "time",
        "memory (graph)",
        "memory (graph, not shared with data)",
        "peak memory (graph)",
        "peak memory (graph, ignoring data)",
        "memory (data)",
        "peak memory (data)",
    ]
    return [(title, x) for x in names]


columns = pd.MultiIndex.from_tuples(
    [
        ("graph", "nodes"),
        ("graph", "node feat size"),
        ("graph", "edges"),
        *measurement_columns("explicit nodes"),
        *measurement_columns("inferred nodes (no features)"),
    ]
)
[13]:
def measure_time(f, include_nodes):
    nodes, edges, args = f(include_nodes)
    start = timeit.default_timer()
    sg.StellarGraph(nodes, edges, **args)
    end = timeit.default_timer()
    return end - start
[14]:
def measure_memory(f, include_nodes):
    """
    Measure exactly what it takes to load the data.

    - the size of the original edge data (as a baseline)
    - the size of the final graph
    - the peak memory use of both

    This uses a similar technique to the 'allocation_benchmark' fixture in tests/test_utils/alloc.py.
    """
    gc.collect()
    # ensure we're measuring the worst-case peak, when no GC happens
    gc.disable()

    tracemalloc.start()
    snapshot_start = tracemalloc.take_snapshot()

    nodes, edges, args = f(include_nodes)

    gc.collect()
    _, data_memory_peak = tracemalloc.get_traced_memory()
    snapshot_data = tracemalloc.take_snapshot()

    if include_nodes:
        assert nodes is not None, f
        sg_g = sg.StellarGraph(nodes, edges, **args)
    else:
        assert nodes is None, f
        sg_g = sg.StellarGraph(edges=edges, **args)

    gc.collect()
    snapshot_graph = tracemalloc.take_snapshot()

    # clean up the input data and anything else leftover, so that the snapshot
    # includes only the long-lasting data: the StellarGraph.
    del edges
    del nodes
    del args
    gc.collect()

    _, graph_memory_peak = tracemalloc.get_traced_memory()
    snapshot_end = tracemalloc.take_snapshot()
    tracemalloc.stop()

    gc.enable()

    data_memory = mem_snapshot_diff(snapshot_data, snapshot_start)
    graph_memory = mem_snapshot_diff(snapshot_end, snapshot_start)
    graph_over_data_memory = mem_snapshot_diff(snapshot_graph, snapshot_data)

    return (
        sg_g,
        graph_memory,
        graph_over_data_memory,
        graph_memory_peak,
        graph_memory_peak - data_memory,
        data_memory,
        data_memory_peak,
    )
[15]:
def measure(f):
    time_nodes = measure_time(f, include_nodes=True)
    time_no_nodes = measure_time(f, include_nodes=False)

    sg_g, *mem_nodes = measure_memory(f, include_nodes=True)
    _, *mem_no_nodes = measure_memory(f, include_nodes=False)

    feat_sizes = sg_g.node_feature_sizes()
    try:
        feat_sizes = feat_sizes[sg_g.unique_node_type()]
    except ValueError:
        pass

    return [
        sg_g.number_of_nodes(),
        feat_sizes,
        sg_g.number_of_edges(),
        time_nodes,
        *mem_nodes,
        time_no_nodes,
        *mem_no_nodes,
    ]
[16]:
%%time
recorded = [measure(f) for f in datasets.values()]
CPU times: user 28 s, sys: 7.04 s, total: 35 s
Wall time: 35.1 s
[17]:
raw = pd.DataFrame(recorded, columns=columns, index=datasets.keys())
raw
[17]:
columns: "graph" (nodes, node feat size, edges), followed by the seven measurement columns under "explicit nodes" and the same seven under "inferred nodes (no features)"; memory values are in bytes, times in seconds:
time, memory (graph), memory (graph, not shared with data), peak memory (graph), peak memory (graph, ignoring data), memory (data), peak memory (data)
Cora (Pandas) 2708 1433 5429 0.025028 15586530 15564897 46764625 31079400 15685225 31995857 0.002037 60994 63025 251118 160985 90133 197529
Cora (IndexedArray) 2708 1433 5429 0.001163 15585170 40633 31993945 16356516 15637429 31993945 0.001545 61018 63049 251118 160985 90133 197529
BlogCatalog3 10351 {'group': 0, 'user': 0} 348459 0.020382 4635099 7428226 14146092 8477331 5668761 10805413 0.027501 4633843 7427186 14061652 8479763 5581889 10711633
FB15k (no edge types) 14951 0 592213 0.098957 3970442 2985020 25830730 10151739 15678991 25830730 0.184846 3969362 3107220 34644016 19090289 15553727 25049683
FB15k 14951 0 592213 0.610353 9793950 13398273 57650243 36747424 20902819 35792614 0.700297 9794126 13521649 57650811 36873168 20777643 35011663
reddit (Pandas) 232965 602 11606919 3.130784 665684661 665691152 1868696353 1121990320 746706033 1682947017 0.483119 106555123 106556406 375628530 189913865 185714665 185723196
reddit (NumPy) 232965 602 11606919 0.545932 665684061 104705536 1682947017 936252548 746694469 1682947017 0.475468 106555123 106556406 375628530 189913865 185714665 185723196

Pretty results

This shows the results in a prettier way, with memory in MB instead of bytes.

[18]:
mem_columns = raw.columns[["memory" in x[1] for x in raw.columns]]

memory_mb = raw.copy()
memory_mb[mem_columns] = (memory_mb[mem_columns] / 10 ** 6).round(3)
memory_mb
[18]:
columns: "graph" (nodes, node feat size, edges), followed by the seven measurement columns under "explicit nodes" and the same seven under "inferred nodes (no features)"; memory values are in MB, times in seconds:
time, memory (graph), memory (graph, not shared with data), peak memory (graph), peak memory (graph, ignoring data), memory (data), peak memory (data)
Cora (Pandas) 2708 1433 5429 0.025028 15.587 15.565 46.765 31.079 15.685 31.996 0.002037 0.061 0.063 0.251 0.161 0.090 0.198
Cora (IndexedArray) 2708 1433 5429 0.001163 15.585 0.041 31.994 16.357 15.637 31.994 0.001545 0.061 0.063 0.251 0.161 0.090 0.198
BlogCatalog3 10351 {'group': 0, 'user': 0} 348459 0.020382 4.635 7.428 14.146 8.477 5.669 10.805 0.027501 4.634 7.427 14.062 8.480 5.582 10.712
FB15k (no edge types) 14951 0 592213 0.098957 3.970 2.985 25.831 10.152 15.679 25.831 0.184846 3.969 3.107 34.644 19.090 15.554 25.050
FB15k 14951 0 592213 0.610353 9.794 13.398 57.650 36.747 20.903 35.793 0.700297 9.794 13.522 57.651 36.873 20.778 35.012
reddit (Pandas) 232965 602 11606919 3.130784 665.685 665.691 1868.696 1121.990 746.706 1682.947 0.483119 106.555 106.556 375.629 189.914 185.715 185.723
reddit (NumPy) 232965 602 11606919 0.545932 665.684 104.706 1682.947 936.253 746.694 1682.947 0.475468 106.555 106.556 375.629 189.914 185.715 185.723
