import io
import re
import sys
import numpy as np
import json
import gzip
import os.path
import warnings
from collections import OrderedDict
import numpy as np
[docs]class NumpyEncoder(json.JSONEncoder):
""" Custom encoder for numpy data types """
[docs] def default(self, obj):
if isinstance(obj, (np.int_, np.intc, np.intp, np.int8,
np.int16, np.int32, np.int64, np.uint8,
np.uint16, np.uint32, np.uint64)):
return int(obj)
elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
return float(obj)
elif isinstance(obj, (np.complex_, np.complex64, np.complex128)):
return {'real': obj.real, 'imag': obj.imag}
elif isinstance(obj, (np.ndarray,)):
return obj.tolist()
elif isinstance(obj, (np.bool_)):
return bool(obj)
elif isinstance(obj, (np.void)):
return None
return json.JSONEncoder.default(self, obj)
[docs]class JGFParseError(Exception):
pass
[docs]class JGFParseWarning(UserWarning):
pass
_nonMetaNodeAttributes = set(["type","label","metadata"])
_nonMetaEdgeAttributes = set(["type","label","directed","relation","metadata"])
_nonMetaGraphAttributes = set(["id","type","label","metadata"])
def _JGFAddNodeAttribute(graph,key,values):
isDictionary = isinstance(values,dict) ;
if("nodes" in graph and (isDictionary or len(values)==len(graph["nodes"]))):
if(isDictionary):
indices = values.keys()
else:
indices = range(len(values))
jsonNodes = graph["nodes"]
if(key in _nonMetaNodeAttributes):
for nodeIndex in indices:
jsonNodes[str(nodeIndex)][key] = values[nodeIndex]
else:
for nodeIndex in indices:
nodeEntry = jsonNodes[str(nodeIndex)]
if ("metadata" not in nodeEntry):
nodeEntry["metadata"] = OrderedDict()
nodeEntry["metadata"][key] = values[nodeIndex]
else:
raise ValueError("Node properties must have same length as node-count.")
def _JGFAddEdgeAttribute(graph,key,values):
isDictionary = isinstance(values,dict) ;
if("edges" in graph and (isDictionary or len(values)==len(graph["edges"]))):
if(isinstance(values,dict)):
indices = values.keys()
else:
indices = range(len(values))
jsonEdges = graph["edges"]
if(key in _nonMetaEdgeAttributes):
for edgeIndex in indices:
jsonEdges[str(edgeIndex)][key] = values[edgeIndex]
else:
for edgeIndex in range(len(values)):
edgeEntry = jsonEdges[edgeIndex]
if ("metadata" not in edgeEntry):
edgeEntry["metadata"] = OrderedDict()
edgeEntry["metadata"][key] = values[edgeIndex]
else:
raise ValueError("Edge properties must have same length as edges.")
def _JGFAddGraphAttribute(graph,key,value):
if(key in _nonMetaGraphAttributes):
graph[key] = value
else:
if ("metadata" not in graph):
graph["metadata"] = OrderedDict()
graph["metadata"][key] = value
def _convertToJGFEntry(graph):
graphJSON = OrderedDict()
if("label" in graph):
_JGFAddGraphAttribute(graphJSON,"label",graph["label"]);
if("directed" in graph):
graphJSON["directed"] = graph["directed"]
if("network-properties" in graph):
networkProperties = graph["network-properties"]
for key,value in networkProperties.items():
_JGFAddGraphAttribute(graphJSON,key,value)
if("node-count" in graph and graph["node-count"]>0):
nodeCount = graph["node-count"]
graphJSON["nodes"] = OrderedDict((str(nodeIndex),OrderedDict()) for nodeIndex in range(nodeCount))
else:
raise ValueError("graph must contain node-count property")
if("edges" in graph and len(graph["edges"])>0):
graphJSON["edges"] = [{"source":str(fromNodeIndex),"target":str(toNodeIndex)} for edgeIndex,(fromNodeIndex,toNodeIndex) in enumerate(graph["edges"])]
if("node-properties" in graph):
nodeProperties = graph["node-properties"]
for key,values in nodeProperties.items():
_JGFAddNodeAttribute(graphJSON,key,values)
if("edge-properties" in graph):
edgeProperties = graph["edge-properties"]
for key,values in edgeProperties.items():
_JGFAddEdgeAttribute(graphJSON,key,values)
return graphJSON
def _convertFromJGFEntry(graphJSON):
graph = OrderedDict()
hasNodes = "nodes" in graphJSON
hasEdges = "edges" in graphJSON
if(not hasNodes and hasEdges):
raise JGFParseError("JGF files must contain 'nodes' if 'edges' is defined.")
directed = False
if("directed" in graphJSON):
directed = graphJSON["directed"]
else:
warnings.warn("'directed' was not found in the JGF file. This parser only works for fully directed/undirected networks. The whole network is now considered undirected.", JGFParseWarning)
graphProperties = OrderedDict()
if("label" in graphJSON): # root graph property
graph["label"] = graphJSON["label"]
if("id" in graphJSON):
graphProperties["id"] = graphJSON["id"]
if("type" in graphJSON):
graphProperties["type"] = graphJSON["type"]
graph["directed"] = directed
if(hasNodes):
graph["node-count"] = len(graphJSON["nodes"])
if("metadata" in graphJSON):
for key, value in graphJSON["metadata"].items():
graphProperties[key] = value
if(len(graphProperties)>0):
graph["network-properties"] = graphProperties
nodeProperties = OrderedDict()
nodeIDToIndex = OrderedDict()
nodeIndexToID = []
digitIndices = True
if(hasNodes):
for nodeID, nodeData in graphJSON["nodes"].items():
if(nodeID not in nodeIDToIndex):
nodeIDToIndex[nodeID] = len(nodeIndexToID)
nodeIndexToID.append(nodeID)
if(not nodeID.isdigit()):
digitIndices = False
for key,value in nodeData.items():
if(key!="metadata"):
if(key not in nodeProperties):
nodeProperties[key] = OrderedDict()
nodeProperties[key][nodeID] = value
if("metadata" in nodeData):
for key,value in nodeData["metadata"].items():
if(key not in nodeProperties):
nodeProperties[key] = OrderedDict()
nodeProperties[key][nodeID] = value
contiguosIndices = False
if(digitIndices):
IDs = [int(ID) for ID in nodeIndexToID]
if(max(IDs)==len(IDs) and min(IDs) == 0):
contiguosIndices = True #No need for IDs
nodeIDToIndex = [str(ID) for ID in range(len(IDs))]
nodeIndexToID = OrderedDict((str(ID),ID) for ID in range(len(IDs)))
for key in nodeProperties:
if(len(nodeProperties[key]) == len(nodeIDToIndex)):
#all nodes have property so convert it to an array
nodeProperties[key] = [nodeProperties[key][ID] for ID in nodeIDToIndex]
else:
#some nodes do not have this property, so reindex the dictionary
nodeProperties[key] = OrderedDict((nodeIDToIndex[ID],value) for ID,value in nodeProperties[key].items())
if(contiguosIndices):
nodeProperties["ID"] = nodeIndexToID
edges = []
edgeProperties = OrderedDict()
if(hasEdges):
for edgeIndex, edgeData in enumerate(graphJSON["edges"]):
hasSource = "source" in edgeData
hasTarget = "target" in edgeData
if(not hasSource or not hasTarget):
raise JGFParseError("Edges must contain both source and target attributes.")
sourceID = edgeData["source"]
targetID = edgeData["target"]
if(sourceID not in nodeIDToIndex or targetID not in nodeIDToIndex):
raise JGFParseError("Node %s is not present in the list of nodes. The file was not loaded."%sourceID)
sourceIndex = nodeIDToIndex[sourceID]
targetIndex = nodeIDToIndex[targetID]
edges.append((sourceIndex,targetIndex))
for key,value in edgeData.items():
if(key!="metadata" and key!="source" and key!="target"):
if(key not in edgeProperties):
edgeProperties[key] = OrderedDict()
edgeProperties[key][edgeIndex] = value
if("metadata" in edgeData):
for key,value in edgeData["metadata"].items():
if(key not in edgeProperties):
edgeProperties[key] = OrderedDict()
edgeProperties[key][edgeIndex] = value
for key in edgeProperties:
if(len(edgeProperties[key]) == len(edges)):
#all nodes have property so convert it to an array
edgeProperties[key] = [edgeProperties[key][ID] for ID in edgeProperties[key]]
else:
#some nodes do not have this property, so reindex it
edgeProperties[key] = OrderedDict((ID,value) for ID,value in edgeProperties[key].items())
if(len(edges)>0):
graph["edges"] = edges
if(len(nodeProperties)>0):
graph["node-properties"] = nodeProperties
if(len(edgeProperties)>0):
graph["edge-properties"] = edgeProperties
return graph
[docs]def load(filename='',compressed=None):
"""
Loads a JGF(Z) – Json Graph Format (gZipped) – file and converts it
to a list of JXNF – Json compleX Network Format – dictionaries.
Parameters
----------
filename : str or file handle
Path to the file or a file handle to be used as input.
compressed : bool
If true, the input file will be interpreted as being compressed.
If not provided, this will be guessed from the file extension.
Use '.jgfz' for compressed files.
Returns
-------
out : List of OrderedDict
Data readed from the JGF(Z) file formatted as a list of JXNF
dictionaries.
"""
shallCleanupHandler = False;
if(isinstance(filename, str)):
shallCleanupHandler = True
if(compressed is None):
fileExtension = os.path.splitext(filename)[1]
if(fileExtension==".jgfz"):
compressed = True
else:
compressed = False
if(compressed):
filehandler = gzip.open(filename,"rt")
else:
filehandler = open(filename,"rt")
else:
shallCleanupHandler=False
if(compressed is None):
compressed = False
filehandler = filename
importJSON = json.load(filehandler,object_pairs_hook=OrderedDict)
if("graph" in importJSON):
graphJSONArray = [importJSON["graph"]]
elif("graphs" in importJSON):
graphJSONArray = importJSON["graphs"]
else:
graphJSONArray = []
graphs = [_convertFromJGFEntry(graphJSON) for graphJSON in graphJSONArray]
if(shallCleanupHandler):
filehandler.close()
return graphs
[docs]def save(graphs,filename="",compressed=None):
"""
Writes a list of JXNF – Json compleX Network Format – dictionaries to
a JGF(Z) – Json Graph Format (gZipped) – file.
Parameters
----------
graphs : list of dict
List of dictionaries in JXNF.
filename : str or file handle
Path to the file or a file handle to be used as output.
compressed : bool
If true, the input file will be interpreted as being compressed.
If not provided, this will be guessed from the file extension.
Use '.jgfz' for compressed files.
"""
shallCleanupHandler = False;
if(isinstance(filename, str)):
shallCleanupHandler = True
if(compressed is None):
fileExtension = os.path.splitext(filename)[1]
if(fileExtension==".jgfz"):
compressed = True
else:
compressed = False
if(compressed):
filehandler = gzip.open(filename,"wt")
else:
filehandler = open(filename,"wt")
else:
shallCleanupHandler=False
if(compressed is None):
compressed = False
filehandler = filename
if(not isinstance(graphs, list)):
if(isinstance(graphs, dict)):
graphs = [graphs]
else:
raise TypeError(f"Argument graphs must be of type dict or a list of dicts, not {type(graphs)}")
exportGraphs = []
for graph in graphs:
exportGraphs.append(_convertToJGFEntry(graph))
exportJSON={}
if(len(graphs)==1):
exportJSON["graph"] = exportGraphs[0]
else:
exportJSON["graphs"] = exportGraphs
json.dump(exportJSON,filehandler,cls=NumpyEncoder)
if(shallCleanupHandler):
filehandler.close()