"""Visibility Graph Community detection functions.
"""
from .globalVariables import *
import numpy as np
import networkx as nx
[docs]def createVisibilityGraph(data, times, graph_type='natural', weight=None, withsign=False):
"""Calculate adjacency matrix of visibility graph, create the networkx.Graph network.
Parameters:
data: 2d numpy.array
Data to process
times: 1d numpy.array
Times corresponding to provided data points
weight: str, Default None
Type of normalization:
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
graph_type: str, Default 'natural'
Type of the visibility graph:
"horizontal", Horizontal Visibility Graph
"natural", Natural Visibility Graph
"dual_horizontal", Dual Perspective Horizontal Visibility Graph
"dual_natural", Dual Perspective Natural Visibility Graph
withsign: boolean, Default False
Whether to return the sign of adjacency matrix,
the link from normal perspective VG is positive,
the link from reflected perspective VG is negative
Returns: tuple
Tuple of two objects:
networkx.Graph
Graph of networkx type
2d numpy.array
Adjacency matrix
Usage:
A = createVisibilityGraph(data, times)
"""
idx_nan = np.argwhere(np.isnan(data))
ndata = np.delete(data, idx_nan)
ntimes = np.delete(times, idx_nan)
if graph_type not in ['horizontal','dual_horizontal','dual_natural','natural']:
print('Unknown graph type: %s, adjust graph type to natural'%(graph_type))
else:
print('graph type is: %s' %(graph_type))
if weight not in ['time', 'tan', 'distance', None]:
print('Unknown weight type: %s, adjust weight to None'%(weight))
else:
print('weight is: %s' %(weight))
if graph_type == "horizontal":
AdMatrixOfVisibilityGraph = np.asmatrix(__getAdjacencyMatrixOfHorizontalVisibilityGraph(ndata, ntimes, weight=weight))
elif graph_type == "dual_horizontal":
AdMatrixOfVisibilityGraph = np.asmatrix(__getAdjacencyMatrixOfHorizontalVisibilityGraph_dual(ndata, ntimes, weight=weight, withsign=withsign))
elif graph_type == "dual_natural":
AdMatrixOfVisibilityGraph = np.asmatrix(__getAdjacencyMatrixOfVisibilityGraph_dual(ndata, ntimes, weight=weight, withsign=withsign))
else:
AdMatrixOfVisibilityGraph = np.asmatrix(__getAdjacencyMatrixOfVisibilityGraph(ndata, ntimes, weight=weight))
G = nx.convert_matrix.from_numpy_matrix(abs(AdMatrixOfVisibilityGraph))
labels = {}
intensity = {}
for idx, node in enumerate(G.nodes()):
labels[node] = str(np.round(ntimes[idx],2))
intensity[node] = ndata[idx]
nx.set_node_attributes(G, labels, 'timepoint')
nx.set_node_attributes(G, intensity, 'intensity')
return G,AdMatrixOfVisibilityGraph
def __getAdjacencyMatrixOfHorizontalVisibilityGraph(data, times, weight=None):
"""Calculate adjacency matrix of horizontal visibility graph.
Parameters:
data: 2d numpy.array
Data to process
times: 1d numpy.array
Times corresponding to provided data points
weight: str, Default None
Type of normalization:
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
Returns:
2d numpy.array
Adjacency matrix
Usage:
A = __getAdjacencyMatrixOfHorizontalVisibilityGraph(data, times)
"""
dimension = len(data)
A = np.zeros((dimension,dimension))
for i in range(dimension):
if i<dimension-1:
if weight == 'time':
A[i, i+1] = A[i+1, i] = abs(times[i] - times[i+1])
elif weight == 'tan':
A[i, i+1] = A[i+1, i] = abs((data[i] - data[i+1])/(times[i] - times[i+1])) + 10**(-8)
elif weight == 'distance':
A[i, i+1] = A[i+1, i] = ((data[i] - data[i+1])**2 + (times[i] - times[i+1])**2)**0.5
else:
A[i, i+1] = A[i+1, i] = 1
for j in range(i + 2, dimension):
if np.max(data[i+1:j]) < min(data[i], data[j]):
if weight == 'time':
A[i,j] = A[j,i] = abs(times[i] - times[j])
elif weight == 'tan':
A[i, j] = A[j, i] = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
elif weight == 'distance':
A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
else:
A[i, j] = A[j, i] = 1
return A
def __getAdjacencyMatrixOfHorizontalVisibilityGraph_dual(data, times, weight=None, withsign=False):
"""Calculate adjacency matrix of dual perspective horizontal visibility graph.
Parameters:
data: 2d numpy.array
Data to process
times: 1d numpy.array
Times corresponding to provided data points
weight: str, Default None
Type of normalization:
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
withsign: boolean, Default False
Whether to return the sign of adjacency matrix, the link from normal perspective VG is positive,
the link from reflected perspective VG is negative
Returns:
2d numpy.array
Adjacency matrix
Usage:
A = __getAdjacencyMatrixOfHorizontalVisibilityGraph_dual(data, times)
"""
A_posi = np.asmatrix(__getAdjacencyMatrixOfHorizontalVisibilityGraph(data, times, weight=weight))
A_nega = np.asmatrix(__getAdjacencyMatrixOfHorizontalVisibilityGraph(-data, times, weight=weight))
A_dual = np.where(A_posi >= A_nega, A_posi, -A_nega)
if withsign == True:
A_dual = A_dual
else:
A_dual = abs(A_dual)
return A_dual
def __getAdjacencyMatrixOfVisibilityGraph(data, times, weight=None):
"""Calculate adjacency matrix of natural visibility graph.
Parameters:
data: 2d numpy.array
Data to process
times: 1d numpy.array
Times corresponding to provided data points
weight: str, Default None
Type of normalization:
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
Returns:
2d numpy.array
Adjacency matrix
Usage:
A = __getAdjacencyMatrixOfVisibilityGraph(data, times)
"""
dimension = len(data)
V = (np.subtract.outer(data, data))/(np.subtract.outer(times, times) + np.identity(dimension))
A = np.zeros((dimension,dimension))
for i in range(dimension):
if i<dimension-1:
if weight == 'time':
A[i, i+1] = A[i+1, i] = abs(times[i] - times[i+1])
elif weight == 'tan':
A[i, i+1] = A[i+1, i] = abs((data[i] - data[i+1])/(times[i] - times[i+1])) + 10**(-8)
elif weight == 'distance':
A[i, i+1] = A[i+1, i] = ((data[i] - data[i+1])**2 + (times[i] - times[i+1])**2)**0.5
else:
A[i, i+1] = A[i+1, i] = 1
for j in range(i + 2, dimension):
if np.max(V[i+1:j,i]) < V[j,i]:
if weight == 'time':
A[i,j] = A[j,i] = abs(times[i] - times[j])
elif weight == 'tan':
A[i, j] = A[j, i] = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
elif weight == 'distance':
A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
else:
A[i, j] = A[j, i] = 1
return A
def __getAdjacencyMatrixOfVisibilityGraph_dual(data, times, weight=None, withsign=False):
"""Calculate adjacency matrix of dual perspective natural visibility graph.
Parameters:
data: 2d numpy.array
Data to process
times: 1d numpy.array
Times corresponding to provided data points
weight: str, Default None
Type of normalization:
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
withsign: boolean, Default False
Whether to return the sign of adjacency matrix, the link from normal perspective VG is positive,
the link from reflected perspective VG is negative
Returns:
2d numpy.array
Adjacency matrix
Usage:
A = __getAdjacencyMatrixOfVisibilityGraph_dual(data, times)
"""
A_posi = np.asmatrix(__getAdjacencyMatrixOfVisibilityGraph(data, times, weight=weight))
A_nega = np.asmatrix(__getAdjacencyMatrixOfVisibilityGraph(-data, times, weight=weight))
if min(data)<0:
A_dual = np.where(A_posi >= A_nega, A_posi, -A_nega)
else:
A_dual = A_posi
if withsign == True:
A_dual = A_dual
else:
A_dual = abs(A_dual)
return A_dual
def __optimize_cutoff(PL_dict, value_PL_list, sort_spl):
"""Calculate the optimized cutoff, which will be used to combine initial communities. This function is used internally only.
"""
L = len(sort_spl)
plvalue = [min(value_PL_list)]
for i in range (0,L-1):
n1 = sort_spl[i]
n2 = sort_spl[i+1]
plvalue.append(PL_dict[n1][n2])
plvalue.append(max(value_PL_list))
plvalue.sort()
plv_diff = [j-i for i, j in zip(plvalue[:-1], plvalue[1:])]
index = plv_diff.index(max(plv_diff))
plv_diff_copy = plv_diff[:]
tcut = plvalue[index]
while tcut > np.percentile(value_PL_list,50) and len(plv_diff)>1:
plv_diff.pop()
index = plv_diff_copy.index(max(plv_diff))
tcut = plvalue[index]
if tcut <= np.percentile(value_PL_list,50):
cut = tcut
else:
cut = None
return cut