class Graph

Maturity: 48

A Graph class that provides an interface for interacting with a Neo4j graph database, supporting CRUD operations on nodes and relationships through Cypher queries.

File: /tf/active/vicechatdev/neo4j_driver/neo4j_driver.py
Lines: 92 - 328
Complexity: complex

Purpose

This class wraps the Neo4j Python driver and provides methods for connecting to a Neo4j database and performing common operations: matching nodes by ID, UID, or name; creating, updating, and deleting nodes and relationships; and executing custom Cypher queries. It manages the driver connection and, when the driver raises ServiceUnavailable, reopens the connection and retries the call once.

Source Code

class Graph():
    """
    A Graph class for interacting with a Neo4j graph database. The Graph class has methods for running Cypher queries, matching nodes by ID, UID, and name, and matching relationships by ID.
    """
    def __init__(self, URI, auth, database=None, name=None):
        self.driver = GraphDatabase.driver(URI, auth=auth)
        self.database = database or name
        
    def close(self):
        self.driver.close()
        
    def open(self):
        self.driver.open()
        
    def __repr__(self):
        return "Graph interface bound to host %s to database '%s'" % (self.driver.initial_addresses[0], self.database)
    
    __str__ = __repr__
    
    @staticmethod
    def _get_label_strings(labels):
        if None in labels:
            return ''
        return ''.join(":" + i for i in labels)
    
    def catch_service_unavailable(func):
        @wraps(func)
        def wrapper(self,*args, **kwargs):
            try:
                return func(self,*args, **kwargs)
            except ServiceUnavailable:
                self.open()
                return func(self,*args, **kwargs)
        return wrapper
            
    @staticmethod
    def _run(tx, query, **kwargs):
        result = tx.run(query, **kwargs)
        records = list(result)
        summary = result.consume()
        return records
    
    @catch_service_unavailable
    def run(self, query, **kwargs):
        with self.driver.session(database=self.database) as session:
            result = self._run(session, query, **kwargs)
        return ResultWrapper(list(result), graph=self)
    
    @staticmethod
    def _match_by_id(tx, x, label):
        result = tx.run("MATCH (o%s) WHERE id(o) = $x RETURN o" % Graph._get_label_strings(label), x=x)
        return result.single()[0]
    
    @catch_service_unavailable
    def match_by_id(self, x, label=None):
        if not isinstance(x, int):
            try:
                x=int(x)
            except (TypeError, ValueError):
                raise ValueError("Failed to coerce id to type int. Element id must be of type int, passed '%s' of type %s" % (x, type(x)))
        if not isinstance(label, list):
            label=[label]
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_by_id, x, label)
        return Node._from_neo4j_node(result, graph = self)
    
    @staticmethod
    def _match_by_uid(tx, uid, label):
        result = tx.run("MATCH (o%s {UID:$uid}) RETURN o" % Graph._get_label_strings(label), uid=uid)
        record = result.single()[0]
        summary = result.consume()
        return record
    
    @catch_service_unavailable
    def match_by_uid(self, uid, label=None):
        if not isinstance(label, list):
            label=[label]
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_by_uid, uid, label)
        return Node._from_neo4j_node(result, graph=self)
    
    @staticmethod
    def _match_by_name(tx, name, label):
        result = tx.run("MATCH (o%s {N:$name}) RETURN o" % Graph._get_label_strings(label), name=name)
        return result.single()[0]
    
    @catch_service_unavailable
    def match_by_name(self, name, label=None):
        if not isinstance(label, list):
            label=[label]
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_by_name, name, label)
        return Node._from_neo4j_node(result, graph=self)
    
    @staticmethod
    def _match_relationship_by_id(tx, x):
        result = tx.run("MATCH ()-[_]->() WHERE id(_) = $x RETURN _", x=x)
        return result.single()[0]
    
    @catch_service_unavailable
    def match_relationship_by_id(self, x):
        if not isinstance(x, int):
            try:
                x=int(x)
            except (TypeError, ValueError):
                raise ValueError("Failed to coerce id to type int. Element id must be of type int, passed '%s' of type %s" % (x, type(x)))
        with self.driver.session(database=self.database) as session:
            result = session.execute_read(self._match_relationship_by_id, x)
        return Relationship._from_neo4j_node(result, graph=self)
    
    @staticmethod
    def _push(tx, element_id, properties):
        result = tx.run("MATCH (o) WHERE id(o) = $x SET o = $properties", x=int(element_id), properties=properties)
        records = list(result)
        summary = result.consume()
        return records
    
    @catch_service_unavailable
    def push(self, node):
        assert node.graph, "Node is not associated with any database. Please use graph.create for new nodes, or retrieve the node from the database first."
        assert node.graph == self, "Entity bound to different database."
        assert node.element_id, "Please run graph.create when creating a node for the first time."
        items = dict(node)
        with self.driver.session(database=self.database) as session:
            session.execute_write(self._push, node.element_id, items)
        return
        
    @staticmethod
    def _node_pull(tx, ids):
        query = tx.run("MATCH (_) WHERE id(_) in $x "
                       "RETURN id(_), labels(_), properties(_)", x=ids)
        return list(query)
    
    @staticmethod
    def _relationship_pull(tx, ids):
        result = tx.run("MATCH ()-[_]->() WHERE id(_) in $x "
                       "RETURN id(_), properties(_)", x=ids)
        return list(result)
        
    @catch_service_unavailable
    def pull(self, entity):
        nodes = {}
        for node in entity.nodes:
            if node.graph == self:
                if not isinstance(node.element_id, int):
                    try:
                        node.element_id = int(node.element_id)
                    except (TypeError, ValueError):
                        warnings.warn("Could not coerce element id to int, skipped node %s" % node.element_id, stacklevel=5)
                        continue
                nodes[node.element_id] = node
                node._lock = True
        with self.driver.session(database=self.database) as session:
            query = session.execute_read(self._node_pull, list(nodes.keys()))
        for element_id, new_labels, new_properties in query:
            node = nodes[element_id]
            node.clear_labels()
            node.update_labels(new_labels)
            node.clear()
            node.update(new_properties)
            node._lock = False
        relationships = {}
        for relationship in entity.relationships:
            if relationship.graph == self:
                relationships[relationship.element_id] = relationship
        with self.driver.session(database=self.database) as session:
            query = session.execute_read(self._relationship_pull, list(relationships.keys()))
        for element_id, new_properties in query:
            relationship = relationships[element_id]
            relationship.clear()
            relationship.update(new_properties)
            
    
    @staticmethod
    def _create(tx, query, data):
        result = tx.run(query, data=data)
        return list(result)
            
    @catch_service_unavailable
    def create(self, entity):
        entity.graph=self #mostly to bind subgraphs
        node_dict={}
        for node in entity.nodes:
            if node:
                if not node.element_id:
                    key = frozenset(node.labels)
                    node_dict.setdefault(key, []).append(node)
        rel_dict = {}
        for relationship in entity.relationships:
            key = frozenset(relationship.labels)
            rel_dict.setdefault(key, []).append(relationship)
        for labels, nodes in node_dict.items():
            query = """
            UNWIND $data AS d
            MERGE (_%s {UID:d.UID})
            ON CREATE
                SET _ += d
            RETURN id(_)
            """ % self._get_label_strings(labels)
            with self.driver.session(database=self.database) as session:
                result = session.execute_write(self._create, query, list(map(dict, nodes)))
                for i, return_id in enumerate(result):
                    node = nodes[i]
                    node.graph = self
                    node.element_id = return_id.value()
        for labels, relationships in rel_dict.items():
            data = map(lambda r: [r.start_node.element_id, dict(r.relationship), r.end_node.element_id],
                               relationships)
            # print(list(data)) #calling prematurely exhausts the generator
            query = """
            UNWIND $data as d
            MATCH (a) WHERE id(a) = d[0]
            MATCH (b) WHERE id(b) = d[2]
            MERGE (a)-[_%s]->(b) SET _ = d[1]
            RETURN id(_)
            """ % self._get_label_strings(labels)
            with self.driver.session(database=self.database) as session:
                result = session.execute_write(self._create, query, list(data))
                for i, return_id in enumerate(result):
                    rel = relationships[i]
                    rel.graph=self
                    rel.element_id = return_id.value()

    @staticmethod
    def _delete(tx, identities):
        result = tx.run("MATCH (_) WHERE id(_) IN $x DETACH DELETE _", x=identities)
        return list(result)
               
    @catch_service_unavailable
    def delete(self, entity):
        identities = []
        for rel in entity.relationships:
            identities.append(rel.element_id)
        for node in entity.nodes:
            if node.element_id:
                identities.append(node.element_id)
        with self.driver.session(database=self.database) as session:
            session.execute_write(self._delete, identities)

Parameters

Name Type Default Kind
bases - -

Parameter Details

URI: The connection URI for the Neo4j database (e.g., 'bolt://localhost:7687' or 'neo4j://localhost:7687'). This specifies the protocol, host, and port for the database connection.

auth: Authentication credentials for the Neo4j database, typically a tuple of (username, password) or an auth object created by neo4j.basic_auth(). Required for database access.

database: Optional name of the specific database to connect to within the Neo4j instance. If omitted, the value of 'name' is used; if both are omitted, the driver's default database is used.

name: Alternative keyword for the database name. It is used only when 'database' is not provided (the constructor sets self.database = database or name).
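
The fallback between the two keyword arguments comes down to a single `or` expression in the constructor. The helper below is a standalone sketch of that rule; `resolve_database` is a hypothetical name used only for illustration and is not part of the class.

```python
def resolve_database(database=None, name=None):
    """Mirror of the constructor's `self.database = database or name`."""
    # `or` returns the first truthy operand, so `database` wins when both are
    # given, and an empty string is treated the same as None.
    return database or name


print(resolve_database(database="analytics", name="legacy"))  # analytics
print(resolve_database(name="legacy"))                        # legacy
print(resolve_database())                                     # None
```

One consequence of using `or` is that passing `database=""` silently falls through to `name`.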

Return Value

Instantiation returns a Graph object that holds a Neo4j driver for the given URI. run() returns a ResultWrapper object; the match methods return a Node or Relationship object; create(), push(), pull(), and delete() return None, with create() and pull() updating the passed entity in place.

Class Interface

Methods

__init__(self, URI, auth, database=None, name=None)

Purpose: Initialize a Graph instance with connection parameters to a Neo4j database

Parameters:

  • URI: Connection URI for Neo4j database
  • auth: Authentication credentials (username, password tuple or auth object)
  • database: Optional database name to connect to
  • name: Alternative keyword for the database name (used only when database is not given)

Returns: None (constructor)

close(self)

Purpose: Close the Neo4j driver connection

Returns: None

open(self)

Purpose: Reopen the driver connection by delegating to self.driver.open(); the catch_service_unavailable decorator calls this before retrying

Returns: None

__repr__(self) -> str

Purpose: Return a string representation of the Graph instance showing host and database

Returns: String describing the graph interface connection

_get_label_strings(labels) -> str static

Purpose: Convert a list of labels into a Cypher-compatible label string (e.g., ':Person:Employee')

Parameters:

  • labels: List of label strings or None

Returns: Formatted label string for Cypher queries, empty string if None in labels
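
As a quick illustration of the formatting rule, the standalone function below mirrors the logic of _get_label_strings outside the class. The name `label_string` is hypothetical and exists only for this sketch.

```python
def label_string(labels):
    """Standalone copy of the logic in Graph._get_label_strings."""
    # A None entry means "no label filter": the whole result is empty.
    if None in labels:
        return ''
    # Each label is prefixed with ':' so the result can follow a Cypher variable.
    return ''.join(':' + label for label in labels)


# The result is spliced directly after the variable name in a pattern.
pattern = "MATCH (o%s) RETURN o" % label_string(['Person', 'Employee'])
print(pattern)                     # MATCH (o:Person:Employee) RETURN o
print(repr(label_string([None])))  # ''
```

Because labels are interpolated into the query text rather than passed as parameters, they should come from trusted input only.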

catch_service_unavailable(func)

Purpose: Decorator that catches ServiceUnavailable, calls self.open() to reconnect, and retries the wrapped call once; a second failure propagates to the caller

Parameters:

  • func: Function to wrap with exception handling

Returns: Wrapped function with automatic reconnection on service unavailability
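
The retry-once behavior can be seen in isolation with the sketch below. To keep it runnable without a database, `ServiceUnavailable` is redefined locally as a stand-in for neo4j.exceptions.ServiceUnavailable, and `FlakyClient` is a hypothetical class whose first query fails; neither is part of the documented module.

```python
from functools import wraps


class ServiceUnavailable(Exception):
    """Local stand-in for neo4j.exceptions.ServiceUnavailable (assumption)."""


def catch_service_unavailable(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except ServiceUnavailable:
            # Reconnect, then retry exactly once; a second failure propagates.
            self.open()
            return func(self, *args, **kwargs)
    return wrapper


class FlakyClient:
    """Hypothetical client whose first query raises ServiceUnavailable."""

    def __init__(self):
        self.calls = 0
        self.reopened = 0

    def open(self):
        self.reopened += 1

    @catch_service_unavailable
    def query(self):
        self.calls += 1
        if self.calls == 1:
            raise ServiceUnavailable("connection lost")
        return "ok"


client = FlakyClient()
outcome = client.query()
print(outcome, client.calls, client.reopened)  # ok 2 1
```

Note that the wrapped function runs twice on failure, so any side effects before the exception are repeated.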

_run(tx, query, **kwargs) -> list static

Purpose: Execute a Cypher query within a transaction and return all records

Parameters:

  • tx: Transaction or session object exposing run() (the public run() method passes the session itself)
  • query: Cypher query string to execute
  • kwargs: Parameters to pass to the query

Returns: List of result records from the query

run(self, query, **kwargs) -> ResultWrapper

Purpose: Execute a Cypher query and return results wrapped in a ResultWrapper object

Parameters:

  • query: Cypher query string to execute
  • kwargs: Parameters to pass to the query

Returns: ResultWrapper object containing query results

_match_by_id(tx, x, label) static

Purpose: Internal method to match a node by its Neo4j internal ID within a transaction

Parameters:

  • tx: Neo4j transaction object
  • x: Integer ID of the node
  • label: List of labels to filter by

Returns: Neo4j node object

match_by_id(self, x, label=None) -> Node

Purpose: Match and return a node by its Neo4j internal ID

Parameters:

  • x: Integer or string ID of the node (will be coerced to int)
  • label: Optional label or list of labels to filter by

Returns: Node object representing the matched node
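
The ID coercion performed by match_by_id (and by match_relationship_by_id) can be sketched as a small helper. `coerce_element_id` is a hypothetical name; the error message is the one used in the source.

```python
def coerce_element_id(x):
    """Return x as an int, or raise ValueError with a descriptive message."""
    if isinstance(x, int):
        return x
    try:
        return int(x)
    except (TypeError, ValueError):
        raise ValueError(
            "Failed to coerce id to type int. Element id must be of type int, "
            "passed '%s' of type %s" % (x, type(x))
        )


print(coerce_element_id("123"))  # 123

try:
    coerce_element_id("not-a-number")
except ValueError as err:
    print("rejected:", err)
```

Numeric strings are accepted, while anything int() cannot parse is rejected before a query is sent.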

_match_by_uid(tx, uid, label) static

Purpose: Internal method to match a node by its UID property within a transaction

Parameters:

  • tx: Neo4j transaction object
  • uid: UID property value to match
  • label: List of labels to filter by

Returns: Neo4j node object

match_by_uid(self, uid, label=None) -> Node

Purpose: Match and return a node by its UID property

Parameters:

  • uid: UID property value to match
  • label: Optional label or list of labels to filter by

Returns: Node object representing the matched node

_match_by_name(tx, name, label) static

Purpose: Internal method to match a node by its N (name) property within a transaction

Parameters:

  • tx: Neo4j transaction object
  • name: Name property value to match
  • label: List of labels to filter by

Returns: Neo4j node object

match_by_name(self, name, label=None) -> Node

Purpose: Match and return a node by its N (name) property

Parameters:

  • name: Name property value to match
  • label: Optional label or list of labels to filter by

Returns: Node object representing the matched node

_match_relationship_by_id(tx, x) static

Purpose: Internal method to match a relationship by its Neo4j internal ID within a transaction

Parameters:

  • tx: Neo4j transaction object
  • x: Integer ID of the relationship

Returns: Neo4j relationship object

match_relationship_by_id(self, x) -> Relationship

Purpose: Match and return a relationship by its Neo4j internal ID

Parameters:

  • x: Integer or string ID of the relationship (will be coerced to int)

Returns: Relationship object representing the matched relationship

_push(tx, element_id, properties) static

Purpose: Internal method to update a node's properties in the database within a transaction

Parameters:

  • tx: Neo4j transaction object
  • element_id: Integer ID of the node to update
  • properties: Dictionary of properties to set on the node

Returns: List of result records

push(self, node) -> None

Purpose: Update an existing node's properties in the database

Parameters:

  • node: Node object to push to the database (must have element_id and be bound to this graph)

Returns: None

_node_pull(tx, ids) -> list static

Purpose: Internal method to retrieve node data by IDs within a transaction

Parameters:

  • tx: Neo4j transaction object
  • ids: List of node IDs to retrieve

Returns: List of records, each containing (id, labels, properties) for one node

_relationship_pull(tx, ids) -> list static

Purpose: Internal method to retrieve relationship data by IDs within a transaction

Parameters:

  • tx: Neo4j transaction object
  • ids: List of relationship IDs to retrieve

Returns: List of records, each containing (id, properties) for one relationship

pull(self, entity) -> None

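Purpose: Refresh the nodes and relationships of an entity with their current state in the database; node labels and properties (and relationship properties) are cleared and replaced, so unsaved local changes are discarded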

Parameters:

  • entity: Entity object (Node, Relationship, or Subgraph) to refresh from database

Returns: None (updates entity in place)
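
The clear-then-update pattern used by pull() is sketched below with plain dictionaries standing in for Node objects. The `refresh` helper and the sample data are hypothetical; only the replace-everything behavior is taken from the source.

```python
def refresh(nodes_by_id, rows):
    """Overwrite local labels and properties with the rows from the database."""
    for element_id, new_labels, new_properties in rows:
        node = nodes_by_id[element_id]
        # Labels are replaced wholesale, not merged.
        node["labels"] = set(new_labels)
        # Properties are cleared first, so local-only keys disappear.
        node["properties"].clear()
        node["properties"].update(new_properties)


local = {7: {"labels": {"Person"}, "properties": {"N": "Ada", "draft": True}}}
rows = [(7, ["Person", "Employee"], {"N": "Ada Lovelace"})]
refresh(local, rows)
print(local[7]["properties"])  # {'N': 'Ada Lovelace'}
```

The local-only `draft` key is gone after the refresh, which is why pending changes should be pushed before pulling.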

_create(tx, query, data) -> list static

Purpose: Internal method to execute a create/merge query within a transaction

Parameters:

  • tx: Neo4j transaction object
  • query: Cypher query string for creating entities
  • data: List of data dictionaries to create

Returns: List of result records

create(self, entity) -> None

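Purpose: Create nodes and relationships in the database using MERGE. Nodes without an element_id are grouped by label set and merged on their labels and UID, with properties written only when the node is newly created (ON CREATE SET). Relationships are merged on their type between the start and end nodes, and their properties are overwritten on every call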

Parameters:

  • entity: Entity object (Node, Relationship, or Subgraph) to create in database

Returns: None (updates entity element_ids in place)
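
The grouping step that precedes the node MERGE can be sketched without a database. Plain dictionaries stand in for Node objects here, and the helper names `group_by_labels` and `merge_query` are hypothetical. The sorted() call is an addition in this sketch to make the label order deterministic; the original iterates the frozenset directly.

```python
def group_by_labels(nodes):
    """Group unsaved nodes by label set, one batch per distinct set."""
    groups = {}
    for node in nodes:
        # Nodes that already have an element_id are treated as stored.
        if node.get("element_id"):
            continue
        key = frozenset(node["labels"])
        groups.setdefault(key, []).append(node["properties"])
    return groups


def merge_query(labels):
    """Build the MERGE statement for one label set."""
    label_string = "".join(":" + label for label in sorted(labels))
    return (
        "UNWIND $data AS d\n"
        "MERGE (_%s {UID: d.UID})\n"
        "ON CREATE SET _ += d\n"
        "RETURN id(_)" % label_string
    )


nodes = [
    {"labels": ["Person"], "properties": {"UID": "p-1", "N": "Ada"}},
    {"labels": ["Person"], "properties": {"UID": "p-2", "N": "Grace"}},
    {"labels": ["Person", "Employee"], "properties": {"UID": "p-3"}},
    {"labels": ["Person"], "properties": {"UID": "p-4"}, "element_id": 17},
]
groups = group_by_labels(nodes)
for labels, batch in groups.items():
    print(merge_query(labels).splitlines()[1], "<-", len(batch), "node(s)")
```

Batching by label set means one UNWIND query per distinct combination of labels rather than one query per node.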

_delete(tx, identities) -> list static

Purpose: Internal method to delete nodes and relationships by ID within a transaction

Parameters:

  • tx: Neo4j transaction object
  • identities: List of element IDs to delete

Returns: List of result records

delete(self, entity) -> None

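Purpose: Delete the entity's nodes from the database with DETACH DELETE, which also removes any relationships attached to those nodes. Relationship IDs from the entity are added to the same ID list, but the query matches nodes only (MATCH (_) WHERE id(_) IN $x), so those IDs are compared against node IDs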

Parameters:

  • entity: Entity object (Node, Relationship, or Subgraph) to delete from database

Returns: None

Attributes

Name | Type | Description | Scope
driver | neo4j.Driver (returned by GraphDatabase.driver) | Neo4j driver instance used for all database connections and operations | instance
database | str or None | Name of the specific Neo4j database to connect to, or None for the default database | instance

Dependencies

  • neo4j
  • functools
  • warnings
  • pandas

Required Imports

from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable
from functools import wraps
import warnings

Conditional/Optional Imports

These imports are only needed under specific conditions:

from neo4j_objects import *

Condition: Required (conditional). Provides the Node, Relationship, and ResultWrapper classes that are used throughout the Graph class methods.

import pandas as pd

Condition: Optional. Imported in the source file but not used directly in this class; may be used by related classes or for data processing.

Usage Example

from neo4j import basic_auth
# Assumption: the module directory is on sys.path, so the class imports as below
from neo4j_driver import Graph

# Create a Graph instance
graph = Graph(
    URI='bolt://localhost:7687',
    auth=basic_auth('neo4j', 'password'),
    database='neo4j'
)

# Run a Cypher query
result = graph.run('MATCH (n:Person) RETURN n LIMIT 10')

# Match a node by ID
node = graph.match_by_id(123, label=['Person'])

# Match a node by UID property
node = graph.match_by_uid('unique-id-123', label=['Person'])

# Create a new node (assuming Node class is available)
from neo4j_objects import Node
new_node = Node(labels=['Person'], properties={'name': 'John', 'UID': 'john-123'})
graph.create(new_node)

# Update an existing node
node['age'] = 30
graph.push(node)

# Pull latest data from database
graph.pull(node)

# Delete a node
graph.delete(node)

# Close the connection when done
graph.close()

Best Practices

  • Always call close() when done with the Graph instance to properly release database connections
  • Use a try-finally block (or contextlib.closing) to ensure close() is called even if errors occur; the class does not define __enter__/__exit__, so it cannot be used directly in a with statement
  • The class automatically handles ServiceUnavailable exceptions and attempts to reconnect, but persistent connection issues should be investigated
  • Nodes must have a UID property, because create() merges nodes on their labels and UID; relationships are merged on their type between the two endpoint nodes and need no UID
  • Before calling push(), ensure the node has been created in the database (has an element_id)
  • The pull() method sets node._lock to True while it refreshes a node and resets it to False afterwards; what the flag guards against depends on the Node class
  • Label parameters can be passed as a single string or list of strings; they are automatically converted to lists internally
  • Element IDs must be integers; the class attempts to coerce string IDs to integers but will raise ValueError if coercion fails
  • When creating entities, the graph property is automatically set on nodes and relationships
  • The delete() method performs DETACH DELETE, which removes all relationships before deleting nodes
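
The advice about always closing the connection can be sketched with contextlib.closing from the standard library, which calls close() on exit even when the body raises. `StubGraph` below is a hypothetical stand-in that exposes only close(), so the example runs without a database; a real Graph instance would be used the same way.

```python
from contextlib import closing


class StubGraph:
    """Hypothetical stand-in exposing only close(), like Graph.close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


graph = StubGraph()
try:
    # closing() guarantees graph.close() runs when the block exits.
    with closing(graph):
        raise RuntimeError("query failed")
except RuntimeError:
    pass

print(graph.closed)  # True
```

A plain try/finally around the calls achieves the same result; closing() simply keeps the cleanup next to the acquisition.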

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class Neo4jManager 74.9% similar

    A manager class that provides a high-level interface for interacting with Neo4j graph databases, handling connections, queries, node creation, and relationship management.

    From: /tf/active/vicechatdev/QA_updater/knowledge_store/neo4j_manager.py
  • class Node 66.8% similar

    A Node class representing a graph node with labels and properties, designed to work with Neo4j graph databases. It extends PropertyDict to manage node properties and provides methods for label management and graph synchronization.

    From: /tf/active/vicechatdev/neo4j_driver/neo4j_objects.py
  • class Relationship 64.3% similar

    A class representing a graph relationship between two nodes, wrapping a _Relationship object with start and end Node objects.

    From: /tf/active/vicechatdev/neo4j_driver/neo4j_objects.py
  • class Subgraph 59.5% similar

    A class representing a graph subgraph containing nodes and relationships, with support for set operations and graph binding/unbinding.

    From: /tf/active/vicechatdev/neo4j_driver/neo4j_objects.py
  • function run_query_v2 57.8% similar

    Executes a Cypher query against a Neo4j graph database and returns the results as a list of dictionaries.

    From: /tf/active/vicechatdev/neo4j_schema/neo4j_python_snippets.py