'''Visualization functions'''
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.patheffects as path_effects
import matplotlib.patches
import matplotlib.collections
from matplotlib import cm
import numpy as np
from .globalVariables import *
from . import (visibilityGraphAuxiliaryFunctions,
clusteringFunctions,
coreFunctions,
extendedDataFrame,
visibilityGraphCommunityDetection,
utilityFunctions)
[docs]def makeDataHistograms(df, saveDir, dataName, figsize=(8,8), range_min=np.min, range_max=np.max, includeTitle=True, title='Data @ timePoint:', fontsize=8, fontcolor='b', N_bins=100, color='b', extension='.png', dpi=300):
"""Make a histogram for each Series (time point) in a Dataframe.
Parameters:
df: pandas.DataFrame
Data to visualize
saveDir: str
Path of directories to save the object to
dataName: str
Label to include in the file name
figsize: tuple, Default (8,8)
Size of the figure in inches
range_min: str, Default
How to determine data minimum
range_max: int, float or function, Default
How to determine data maximum
includeTitle: boolean, Default True
Path of directories to save the object to
title: str, Default 'Data @ timePoint:'
Text of the title
fontsize: str, Default 8
Fontsize of the labels
fontcolor: str, Default 'b'
Color of the title font
N_bins: int, Default 100
Number of bins in the histogram
color: str, Default 'b'
Color of the bars
extension: str, Default '.png'
Path of directories to save the object to
dpi: int, Default 300
Figure resolution if rasterized
Returns:
None
Usage:
makeDataHistograms(df, '/dir1', 'myData')
"""
if not os.path.exists(saveDir):
os.makedirs(saveDir)
for timePoint in df.columns[:]:
label = str(timePoint)
subset = df[timePoint]
subset = subset[~np.isnan(subset.values)].values
if type(range_min) is types.FunctionType:
range_min = range_min(subset)
if type(range_max) is types.FunctionType:
range_max = range_max(subset)
hist_of_subset = scipy.stats.rv_histogram(np.histogram(subset, bins=N_bins, range=(range_min,range_max)))
hist_data = hist_of_subset._hpdf / N_bins
hist_bins = hist_of_subset._hbins
fig, ax = plt.subplots(figsize=figsize)
bar_bin_width = range_max / N_bins
ax.bar(hist_bins, hist_data[:-1], width=0.9 * bar_bin_width, color=color, align='center')
if includeTitle:
ax.set_title(title + label, fontdict={'color': fontcolor})
ax.set_xlabel('Values', fontsize=fontsize)
ax.set_ylabel('Density', fontsize=fontsize)
ax.set_xlim(range_min - 0.5 * bar_bin_width, range_max + 0.5 * bar_bin_width)
fig.tight_layout()
saveFigure(fig, saveDir, dataName + '_' + label + '_histogram', extension, dpi)
return None
[docs]def makeLombScarglePeriodograms(df, saveDir, dataName, minNumberOfNonzeroPoints=5, oversamplingRate=100, figsize=(5,5), title1='TimeSeries Data', title2='Lomb-Scargle periodogram' , extension='.png', dpi=300):
"""Make a combined plot of the signal and its Lomb-Scargle periodogram
for each pandas Series (time point) in a Dataframe.
Parameters:
df: pandas.DataFrame
Data to visualize
saveDir: str
Path of directories to save the object to
dataName: str
Label to include in the file name
minNumberOfNonzeroPoints: int, Default 5
Minimum number of non-zero points in signal to use it
oversamplingRate: int, Default 100
Periodogram oversampling rate
figsize: tuple, Default (8,8)
Size of the figure in inches
title1: str, Default 'TimeSeries Data'
Text of the upper title
title2: str, Default 'Lomb-Scargle periodogram'
Text of the lower title
extension: str, Default '.png'
Path of directories to save the object to
dpi: int, Default 300
Figure resolution if rasterized
Returns:
None
Usage:
makeLombScarglePeriodograms(df, '/dir1', 'myData')
"""
if not os.path.exists(saveDir):
os.makedirs(saveDir)
for geneIndex in range(len(df.index[:])):
geneName = df.index[geneIndex].replace(':', '_')
subset = df.iloc[geneIndex]
subset = subset[subset > 0.]
setTimes, setValues, inputSetTimes = subset.index.values, subset.values, df.columns.values
if len(subset) < minNumberOfNonzeroPoints:
print(geneName, ' skipped (only %s non-zero point%s), ' % (len(subset), 's' if len(subset) != 1 else ''), end=' ', flush=True)
continue
pgram = coreFunctions.LombScargle(setTimes, setValues, inputSetTimes, OversamplingRate=oversamplingRate)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize)
ax1.plot(setTimes, setValues, 'bo', linewidth=3, markersize=5, markeredgecolor='k', markeredgewidth=2)
zero_points = np.array(list(set(inputSetTimes) - set(setTimes)))
ax1.plot(zero_points, np.zeros((len(zero_points),)), 'ro', linewidth=3, markersize=3, markeredgecolor='k', markeredgewidth=0)
ax1.set_aspect('auto')
minTime = np.min(inputSetTimes)
maxTime = np.max(inputSetTimes)
extraTime = (maxTime - minTime) / 10
ax1.set_xlim(minTime - extraTime, maxTime + extraTime)
ax1.set_title(title1)
ax2.plot(2 * np.pi * pgram[0], pgram[1], 'r-', linewidth=1)
ax2.set_aspect('auto')
ax2.set_title(title2)
fig.tight_layout()
saveFigure(fig, saveDir, dataName + '_' + geneName + '_Lomb_Scargle_periodogram', extension, dpi)
return None
[docs]def addVisibilityGraph(data, times, dataName='G1S1', coords=[0.05,0.95,0.05,0.95],
numberOfVGs=1, groups_ac_colors=['b'], fig=None, numberOfCommunities=6, printCommunities=False,
fontsize=None, nodesize=None, level=0.55, commLineWidth=0.5, lineWidth=1.0,
withLabel=True, withTitle=False, layout='circle', radius=0.07, noplot=False, horizontal=False, communities=None, minNumberOfCommunities=2, communitiesMethod='betweenness_centrality', direction='left', weight='distance'):
"""Draw a Visibility graph of data on a provided Matplotlib figure.
We represent each timepoint in a series as a node.
Temporal events are detected and indicated with solid blue
lines encompassing groups of points, or communities.
The shortest path identifies nodes (i.e. timepoints) that display high
intensity, and thus dominate the global signal profile, are robust
to noise, and are likely drivers of the global temporal behavior.
Parameters:
data: 2d numpy.array
Array of data to visualize
times: 1d numpy.array
Times corresponding to each data point, used for labels
dataName: str, Default 'G1S1'
label to include in file name
coords: list, Default [0.05,0.95,0.05,0.95]
Coordinates of location of the plot on the figure
numberOfVGs: int, Default 1
Number of plots to add to this figure
groups_ac_colors: list, Default ['b']
Colors corresponding to different groups of graphs
fig: matplotlib.figure, Default None
Figure object
numberOfCommunities: int, Default 6
Number of communities
printCommunities: boolean, Default False
Whether to print communities details to screen
fontsize: float, Default None
Size of labels
nodesize: float, Default None
Size of nodes
level: float, Default 0.55
Distance of the community lines to nodes
commLineWidth: float, Default 0.5
Width of the community lines
lineWidth: float, Default 1.0
Width of the edges between nodes
withLabel: boolean, Default True
Whether to include label on plot
withTitle: boolean, Default False
Whether to include title on plot
layout: str, Default 'circle'
Type of the layout. Other option is 'line'
radius: float, Default 0.07
Radius of the circle
noplot: boolean, Default False
Whether to make a plot or only calculate communities
horizontal: boolean, Default False
Whether to use horizontal or natural visibility graph.
communities: tuple, Default None
A tuple containing communities sturcture of network, and networkx Graph:
List of list, e.g. [[],[],...]
networkx.Graph
minNumberOfCommunities: int, Default 2
Number of communities to find depends on the number of splits.
This parameter is ignored in methods that automatically
estimate optimal number of communities.
communitiesMethod: str, Default 'betweenness_centrality'
String defining the method to use for communitiy detection:
'Girvan_Newman': edge betweenness centrality based approach
'betweenness_centrality': reflected graph node betweenness centrality based approach
'WDPVG': weighted dual perspective visibility graph method (note to also set weight variable)
direction:str, default 'left'
The direction that nodes aggregate to communities:
None: no specific direction, e.g. both sides.
'left': nodes can only aggregate to the left side hubs, e.g. early hubs
'right': nodes can only aggregate to the right side hubs, e.g. later hubs
weight: str, Default 'distance'
Type of weight for communitiesMethod='WDPVG':
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
Returns:
tuple
(graph_nx, data, communities)
Usage:
addVisibilityGraph(exampleData, exampleTimes, fig=fig, fontsize=16, nodesize=700,
level=0.85, commLineWidth=3.0, lineWidth=2.0, withLabel=False)
"""
if len(data.shape)>1:
data = extendedDataFrame.DataFrame(data=data).imputeMissingWithMedian().apply(lambda data: np.sum(data[data > 0.0]) / len(data), axis=0).values
if communities is None:
communities, graph_nx = clusteringFunctions.getCommunitiesOfTimeSeries(data, times, minNumberOfCommunities=minNumberOfCommunities, horizontal=horizontal, method=communitiesMethod, direction=direction, weight=weight)
else:
communities, graph_nx = communities
if printCommunities:
print('Communities:')
[print(community) for community in communities]
print('\n')
if noplot:
return graph_nx, data, communities
group = int(dataName[:dataName.find('S')].strip('G'))
if fontsize is None:
fontsize = 4. * (8. + 5.) / (numberOfVGs + 5.)
if nodesize is None:
nodesize = 30. * (8. + 5.) / (numberOfVGs + 5.)
(x1,x2,y1,y2) = coords
axisVG = fig.add_axes([x1,y1,x2 - x1,y2 - y1])
cmap = matplotlib.colors.LinearSegmentedColormap.from_list('GR', [(0, 1, 0), (1, 0, 0)], N=1000)
if layout=='line':
pos = {i:[float(i)/float(len(graph_nx)), 0.5] for i in range(len(graph_nx))}
else:
pos = nx.circular_layout(graph_nx)
keys = np.array(list(pos.keys())[::-1])
values = np.array(list(pos.values()))
values = (values - np.min(values, axis=0))/(np.max(values, axis=0)-np.min(values, axis=0))
keys = np.roll(keys, np.argmax(values.T[1]) - np.argmin(keys))
pos = dict(zip(keys, values))
keys = np.array(list(pos.keys()))
values = np.array(list(pos.values()))
shortest_path = nx.shortest_path(graph_nx, source=min(keys), target=max(keys))
shortest_path_edges = [(shortest_path[i],shortest_path[i + 1]) for i in range(len(shortest_path) - 1)]
if layout=='line':
for edge in graph_nx.edges:
l = np.array(pos[edge[0]])
r = np.array(pos[edge[1]])
if edge in shortest_path_edges:
axisVG.add_artist(matplotlib.patches.Wedge((l+r)/2., 0.5*np.sqrt((l-r)[0]*(l-r)[0]+(l-r)[1]*(l-r)[1]), 0, 180, fill=False, edgecolor='y', linewidth=0.5*3.*lineWidth, alpha=0.7, width=0.001))
axisVG.add_artist(matplotlib.patches.Wedge((l+r)/2., 0.5*np.sqrt((l-r)[0]*(l-r)[0]+(l-r)[1]*(l-r)[1]), 0, 180, fill=False, edgecolor='k', linewidth=0.5*lineWidth, alpha=0.7, width=0.001))
nx.draw_networkx(graph_nx, pos=pos, ax=axisVG, node_color='y', edge_color='y', node_size=nodesize * 1.7, width=0., nodelist=shortest_path, edgelist=shortest_path_edges, with_labels=False)
nx.draw_networkx(graph_nx, pos=pos, ax=axisVG, node_color=data, cmap=cmap, alpha=1.0, font_size=fontsize, width=0., font_color='k', node_size=nodesize)
else:
nx.draw_networkx(graph_nx, pos=pos, ax=axisVG, node_color='y', edge_color='y', node_size=nodesize * 1.7, width=3.0*lineWidth, nodelist=shortest_path, edgelist=shortest_path_edges, with_labels=False)
nx.draw_networkx(graph_nx, pos=pos, ax=axisVG, node_color=data, cmap=cmap, alpha=1.0, font_size=fontsize, width=lineWidth, font_color='k', node_size=nodesize)
if layout=='line':
xmin, xmax = (-1.,1.)
ymin, ymax = (-1.,1.)
else:
xmin, xmax = axisVG.get_xlim()
ymin, ymax = axisVG.get_ylim()
X, Y = np.meshgrid(np.arange(xmin, xmax, (xmax - xmin) / 300.), np.arange(ymin, ymax, (ymax - ymin) / 300.))
def smooth(Z, N=7.):
for ix in range(1,Z.shape[0]-1,1):
Z[ix] = ((N-1.)*Z[ix] + (Z[ix-1] + Z[ix+1])/2.)/N
return Z
for icommunity, community in enumerate(communities):
Z = np.exp(X ** 2 - Y ** 2) * 0.
nX, nY = tuple(np.array([pos[node] for node in community]).T)
for i in range(len(community)-1):
p1, p2 = np.array([nX[i], nY[i]]), np.array([nX[i+1], nY[i+1]])
for j in range(-2, 32):
pm = p1 + (p2-p1)*float(j)/30.
Z[np.where((X-pm[0])**2+(Y-pm[1])**2<=radius**2)] = 1.
for _ in range(20):
Z = smooth(smooth(Z).T).T
CS = axisVG.contour(X, Y, Z, [level], linewidths=commLineWidth, alpha=0.8, colors=groups_ac_colors[group - 1])
#axisVG.clabel(CS, inline=True,fontsize=4,colors=group_colors[group-1], fmt ={level:'C%s'%icommunity})
if layout=='line':
axisVG.set_xlim(-0.1,1.)
axisVG.set_ylim(-0.1,1.)
axisVG.spines['left'].set_visible(False)
axisVG.spines['right'].set_visible(False)
axisVG.spines['top'].set_visible(False)
axisVG.spines['bottom'].set_visible(False)
axisVG.set_xticklabels([])
axisVG.set_yticklabels([])
axisVG.set_xticks([])
axisVG.set_yticks([])
if withLabel:
axisVG.text(axisVG.get_xlim()[1], (axisVG.get_ylim()[1] + axisVG.get_ylim()[0]) * 0.5, dataName, ha='left', va='center',
fontsize=8).set_path_effects([path_effects.Stroke(linewidth=0.4, foreground=groups_ac_colors[group - 1]),path_effects.Normal()])
if withTitle:
titleText = dataName + ' (size: ' + str(data.shape[0]) + ')' + ' min=%s max=%s' % (np.round(min(data),2), np.round(max(data),2))
axisVG.set_title(titleText, fontsize=10)
return graph_nx, data, communities
[docs]def makeVisibilityGraph(intensities, positions, saveDir, fileName, communities=None, minNumberOfCommunities=2, communitiesMethod='betweenness_centrality', direction='left', weight='distance', printCommunities=False,fontsize=16, nodesize=500, level=0.5, commLineWidth=3.0, lineWidth=2.0, layout='circle', horizontal=False, radius=0.03, figsize=(10,10), addColorbar=True, colorbarAxisCoordinates=[0.90,0.7,0.02,0.2], colorbarLabelsize=12, colorbarPrecision=2, extension='png', dpi=300):
'''Make either horizontal or normal visibility graph of a time series using function addVisibilityGraph.
We represent each timepoint in a series as a node.
Temporal events are detected and indicated with solid blue
lines encompassing groups of points, or communities.
The shortest path identifies nodes (i.e. timepoints) that display high
intensity, and thus dominate the global signal profile, are robust
to noise, and are likely drivers of the global temporal behavior.
Parameters:
intensities:
Data to plot
positions:
Time points corresponding to data
saveDir: str
Path of directories to save the object to
fileName: str
Label to include in the file name
horizontal: boolean, Default False
Whether to use horizontal or natural visibility graph. Note that if communitiesMethod 'WDPVG' is set, this setting has no effect.
communities: tuple, Default None
A tuple containing communities sturcture of network, and networkx Graph:
List of list, e.g. [[],[],...]
networkx.Graph
minNumberOfCommunities: int, Default 2
Number of communities to find depends on the number of splits.
This parameter is ignored in methods that automatically
estimate optimal number of communities.
communitiesMethod: str, Default 'betweenness_centrality'
String defining the method to use for communitiy detection:
'Girvan_Newman': edge betweenness centrality based approach
'betweenness_centrality': reflected graph node betweenness centrality based approach
'WDPVG': weighted dual perspective visibility graph method (note to also set weight variable)
direction:str, default 'left'
The direction that nodes aggregate to communities:
None: no specific direction, e.g. both sides.
'left': nodes can only aggregate to the left side hubs, e.g. early hubs
'right': nodes can only aggregate to the right side hubs, e.g. later hubs
weight: str, Default 'distance'
Type of weight for communitiesMethod='WDPVG':
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
printCommunities: boolean, Default False
Whether to print communities details to screen
fontsize: float, Default 16
Labels fontsize
nodesize: int, Default 500
Node size
level: float, Default 0.5
Level
commLineWidth: float, Default 3.0
Communities lines width
lineWidth: float, Default 2.0
Edge lines width
layout: str, Default 'circle'
Type of layout, 'circle' or 'line'
horizontal: boolean, Default False
Whether to make horizontal of normal visibility graph
radius: float, Default 0.03
Rounding of the lines
figsize: tuple, Default (10,10)
Figure size in inches
addColorbar: boolean, Default True
Whether to add colorbar
colorbarAxisCoordinates: list, Default [0.90,0.7,0.02,0.2]
colorbar axis coordinates
colorbarLabelsize: float, Default 12
Colorbar labels size
colorbarPrecision: int, Default 2
colorbar labels rounding
extension: str, Default '.png'
Figure extension
dpi: int, Default 300
Figure resolution
Returns:
None
Usage:
makeVisibilityGraph(data, times, 'dir1/', 'myData')
'''
fig = plt.figure(figsize=figsize)
addVisibilityGraph(intensities, positions, fig=fig, fontsize=fontsize, nodesize=nodesize, level=level,
commLineWidth=commLineWidth, lineWidth=lineWidth, withLabel=False, layout=layout,
printCommunities=printCommunities, radius=radius, horizontal=horizontal,communities=communities, minNumberOfCommunities=minNumberOfCommunities, communitiesMethod=communitiesMethod, direction=direction, weight=weight)
addColorbarToFigure(fig, intensities, axisCoordinates=colorbarAxisCoordinates, labelsize=colorbarLabelsize, precision=colorbarPrecision)
saveFigure(fig, saveDir, ('horizontal_' if horizontal else 'normal_') + fileName, extension, dpi)
return
[docs]def makeVisibilityBarGraph(data, times, saveDir, fileName, AdjacencyMatrix=None, horizontal=False, barWidth=0.2, dotColor='b', barColor='r', arrowColor='k', id='', extension='.png', figsize=(8,4), dpi=300):
"""Bar-plot style visibility graph.
Representing the intensities as bars, this is equivalent to connecting the top
of each bar to another top if there is a direct line-of-sight to that top.
The resulting visibility graph has characteristics that reflect the equivalent
time series temporal structure and can be used to identify trends.
Parameters:
data: 2d numpy.array
Numpy array of floats
times: 2d numpy.array
Numpy array of floats
fileName: str
Path where to save the figure file
fileName: str
Name of the figure file to save
AdjacencyMatrix: 2d numpy.array, Default None
Adjacency matrix of network
horizontal: boolean, default False
Horizontal or normal visibility graph
barWidth: float, default 0.2
Horizontal or normal visibility graph
dotColor: str, default 'b'
Color of the data points
barColor: str, default 'r'
Color of the bars
arrowColor: str, default 'k'
Color of lines
id: str or int, default ''
Label to add to the figure title
extension: str, Default '.png'
Figure format
figsize: tuple of int, Default (8,4)
Figure size in inches
dpi: int, Default 300
Figure resolution
Returns:
None
Usage:
makeVisibilityBarGraph(A, data, times, 'my_figure')
"""
fig, ax = plt.subplots(figsize=figsize)
if AdjacencyMatrix is None:
if horizontal:
A = visibilityGraphAuxiliaryFunctions.getAdjacencyMatrixOfHVG(data)
else:
A = visibilityGraphAuxiliaryFunctions.getAdjacencyMatrixOfNVG(data, times)
else:
A = AdjacencyMatrix
ax.bar(times, data, width=barWidth, color=barColor, align='center', zorder=-np.inf)
ax.scatter(times, data, color=dotColor)
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if i>j and A[i,j] == 1:
if horizontal:
level1 = level2 = np.min([data[i],data[j]])
else:
level1 = data[i]
level2 = data[j]
ax.annotate(text='', xy=(times[i],level1), xytext=(times[j],level2),
arrowprops=dict(arrowstyle='<->', shrinkA=0, shrinkB=0, linestyle='--'))
ax.set_title('%s Time Series'%(id), fontdict={'color': arrowColor})
ax.set_xlabel('Times', fontsize=8)
ax.set_ylabel('Signal intensity', fontsize=8)
ax.set_xticks(times)
ax.set_xticklabels(times, fontsize=10, rotation=90)
ax.set_yticks([])
ax.spines['left'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.spines['bottom'].set_visible(True)
fig.tight_layout()
saveFigure(fig, saveDir, ('horizontal_' if horizontal else 'normal_') + fileName, extension, dpi)
return None
[docs]def makePlotOfPeak(data_all, scores, selected_peak, selected_peak_value, plotID):
'''Plot peaks. Function used internally during certain data processing steps'''
fig, ax = plt.subplots()
ax.plot(data_all.T[0], data_all.T[1], 'g', lw=3)
ax.plot(scores.T[0], scores.T[1], 'ro', ms=5)
ax.plot(selected_peak, selected_peak_value, 'bo', alpha=0.5, ms=10)
# saveFigure(fig, saveDir, 'spline_%s' % ('' if plotID == None else str(plotID)), extension, dpi)
return
[docs]def makeDendrogramHeatmapOfClusteringObject(ClusteringObject, saveDir, dataName, AutocorrNotPeriodogr=True, textScale=1.0, figsize=(12,8), extension='.png', dpi=300, xLabel='Time', plotLabel='Transformed Expression',horizontal=False, minNumberOfCommunities=2, communitiesMethod='WDPVG', direction='left', weight='distance'):
"""Make Dendrogram-Heatmap plot along with Visibility graphs.
Parameters:
ClusteringObject:
Clustering object
saveDir: str
Path of directories to save the object to
dataName: str
Label to include in the file name
AutocorrNotPeriodogr: boolean, Default True
Whether to use autocorrelation method instead of periodograms
textScale: float, Default 1.0
scaling of text size
figsize: tuple, Default (12,8)
Figure size in inches
extension: str, Default '.png'
Figure format extension
dpi: int, Default 300
Figure resolution
xLabel: str, Default 'Time'
Label for the x axis in the heatmap
plotLabel: str, Default 'Transformed Expression'
Label for the heatmap plot
horizontal: boolean, Default False
Whether to use horizontal or natural visibility graph. Note that if communitiesMethod 'WDPVG' is set, this setting has no effect.
minNumberOfCommunities: int, Default 2
Number of communities to find depends on the number of splits.
This parameter is ignored in methods that automatically
estimate optimal number of communities.
communitiesMethod: str, Default 'WDPVG'
String defining the method to use for communitiy detection:
'Girvan_Newman': edge betweenness centrality based approach
'betweenness_centrality': reflected graph node betweenness centrality based approach
'WDPVG': weighted dual perspective visibility graph method (note to also set weight variable)
direction:str, default 'left'
The direction that nodes aggregate to communities:
None: no specific direction, e.g. both sides.
'left': nodes can only aggregate to the left side hubs, e.g. early hubs
'right': nodes can only aggregate to the right side hubs, e.g. later hubs
weight: str, Default 'distance'
Type of weight for communitiesMethod='WDPVG':
None: no weighted
'time': weight = abs(times[i] - times[j])
'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)
'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5
Returns:
None
Usage:
makeDendrogramHeatmap(myObj, '/dir1', 'myData', AutocorrNotPeriodogr=True)
"""
def addAutocorrelationDendrogramAndHeatmap(ClusteringObject, groupColors, fig, AutocorrNotPeriodogr=AutocorrNotPeriodogr):
axisDendro = fig.add_axes([0.68,0.1,0.17,0.8], frame_on=False)
n_clusters = len(ClusteringObject.keys()) - 1
hierarchy.set_link_color_palette(groupColors[:n_clusters]) #gist_ncar #nipy_spectral #hsv
origLineWidth = matplotlib.rcParams['lines.linewidth']
matplotlib.rcParams['lines.linewidth'] = 0.5
Y_ac = ClusteringObject['linkage']
Z_ac = hierarchy.dendrogram(Y_ac, orientation='left',color_threshold=Y_ac[-n_clusters + 1][2])
hierarchy.set_link_color_palette(None)
matplotlib.rcParams['lines.linewidth'] = origLineWidth
axisDendro.set_xticks([])
axisDendro.set_yticks([])
posA = ((axisDendro.get_xlim()[0] if n_clusters == 1 else Y_ac[-n_clusters + 1][2]) + Y_ac[-n_clusters][2]) / 2
axisDendro.plot([posA, posA], [axisDendro.get_ylim()[0], axisDendro.get_ylim()[1]], 'k--', linewidth = 1)
clusters = []
order = []
tempData = None
for group in sorted([item for item in list(ClusteringObject.keys()) if not item=='linkage']):
for subgroup in sorted([item for item in list(ClusteringObject[group].keys()) if not item=='linkage']):
subgroupData = ClusteringObject[group][subgroup]['dataAutocorr'].values
tempData = subgroupData if tempData is None else np.vstack((tempData, subgroupData))
clusters.extend([group for _ in range(subgroupData.shape[0])])
order.extend(ClusteringObject[group][subgroup]['order'])
tempData = tempData[np.argsort(order),:][Z_ac['leaves'],:].T[1:].T
clusters = np.array(clusters)
cluster_line_positions = np.where(clusters - np.roll(clusters,1) != 0)[0]
for i in range(n_clusters - 1):
posB = cluster_line_positions[i + 1] * (axisDendro.get_ylim()[1] / len(Z_ac['leaves'])) - 5. * 0
axisDendro.plot([posA, axisDendro.get_xlim()[1]], [posB, posB], '--', color='black', linewidth = 1.0)
axisDendro.plot([posA, axisDendro.get_xlim()[1]], [0., 0.], '--', color='k', linewidth = 1.0)
axisDendro.plot([posA, axisDendro.get_xlim()[1]], [-0. + axisDendro.get_ylim()[1], -0. + axisDendro.get_ylim()[1]], '--', color='k', linewidth = 1.0)
axisMatrixAC = fig.add_axes([0.78 + 0.07,0.1,0.18 - 0.075,0.8])
imAC = axisMatrixAC.imshow(tempData, aspect='auto', vmin=np.min(tempData), vmax=np.max(tempData), origin='lower', cmap=plt.cm.bwr)
for i in range(n_clusters - 1):
axisMatrixAC.plot([-0.5, tempData.shape[1] - 0.5], [cluster_line_positions[i + 1] - 0.5, cluster_line_positions[i + 1] - 0.5], '--', color='black', linewidth = 1.0)
axisMatrixAC.set_xticks([i for i in range(tempData.shape[1] - 1)])
axisMatrixAC.set_xticklabels([i + 1 for i in range(tempData.shape[1] - 1)], fontsize=6*textScale)
axisMatrixAC.set_yticks([])
axisMatrixAC.set_xlabel('Lag' if AutocorrNotPeriodogr else 'Frequency', fontsize=axisMatrixAC.xaxis.label._fontproperties._size*textScale)
axisMatrixAC.set_title('Autocorrelation' if AutocorrNotPeriodogr else 'Periodogram', fontsize=axisMatrixAC.title._fontproperties._size*textScale)
axisColorAC = fig.add_axes([0.9 + 0.065,0.55,0.01,0.35])
axisColorAC.tick_params(labelsize=6*textScale)
plt.colorbar(imAC, cax=axisColorAC, ticks=[np.round(np.min(tempData),2),np.round(np.max(tempData),2)])
return
def addGroupDendrogramAndShowSubgroups(ClusteringObject, groupSize, bottom, top, group, groupColors, fig):
Y = ClusteringObject[group]['linkage']
n_clusters = len(sorted([item for item in list(ClusteringObject[group].keys()) if not item=='linkage']))
print("Number of subgroups:", n_clusters)
axisDendro = fig.add_axes([left, bottom, dx + 0.005, top - bottom], frame_on=False)
hierarchy.set_link_color_palette([matplotlib.colors.rgb2hex(rgb[:3]) for rgb in cm.nipy_spectral(np.linspace(0, 0.5, n_clusters + 1))]) #gist_ncar #nipy_spectral #hsv
origLineWidth = matplotlib.rcParams['lines.linewidth']
matplotlib.rcParams['lines.linewidth'] = 0.5
Z = hierarchy.dendrogram(Y, orientation='left',color_threshold=Y[-n_clusters + 1][2])
hierarchy.set_link_color_palette(None)
matplotlib.rcParams['lines.linewidth'] = origLineWidth
axisDendro.set_xticks([])
axisDendro.set_yticks([])
posA = axisDendro.get_xlim()[0]/2 if n_clusters == groupSize else (Y[-n_clusters + 1][2] + Y[-n_clusters][2])/2
axisDendro.plot([posA, posA], [axisDendro.get_ylim()[0], axisDendro.get_ylim()[1]], 'k--', linewidth = 1)
clusters = []
for subgroup in sorted([item for item in list(ClusteringObject[group].keys()) if not item=='linkage']):
clusters.extend([subgroup for _ in range(ClusteringObject[group][subgroup]['data'].values.shape[0])])
clusters = np.array(clusters)
cluster_line_positions = np.where(clusters - np.roll(clusters,1) != 0)[0]
for i in range(n_clusters - 1):
posB = cluster_line_positions[i + 1] * (axisDendro.get_ylim()[1] / len(Z['leaves'])) - 5. * 0
axisDendro.plot([posA, axisDendro.get_xlim()[1]], [posB, posB], '--', color='black', linewidth = 1.0)
axisDendro.plot([posA, axisDendro.get_xlim()[1]], [0., 0.], '--', color='k', linewidth = 1.0)
axisDendro.plot([posA, axisDendro.get_xlim()[1]], [-0. + axisDendro.get_ylim()[1], -0. + axisDendro.get_ylim()[1]], '--', color='k', linewidth = 1.0)
axisDendro.text(axisDendro.get_xlim()[0], 0.5 * axisDendro.get_ylim()[1],
'G%s:' % group + str(groupSize), fontsize=14*textScale).set_path_effects([path_effects.Stroke(linewidth=1, foreground=groupColors[group - 1]),path_effects.Normal()])
return n_clusters, clusters, cluster_line_positions
def addGroupHeatmapAndColorbar(data_loc, n_clusters, clusters, cluster_line_positions, bottom, top, group, groupColors, fig,xLabel = 'Time',plotLabel = 'Transformed Expression'):
axisMatrix = fig.add_axes([left + 0.205, bottom, dx + 0.025 + 0.075, top - bottom])
masked_array = np.ma.array(data_loc, mask=np.isnan(data_loc))
cmap = matplotlib.colors.LinearSegmentedColormap.from_list('GR', [(0, 1, 0), (1, 0, 0)], N=1000) #plt.cm.prism #plt.cm.hsv_r #plt.cm.RdYlGn_r
cmap.set_bad('grey')
im = axisMatrix.imshow(masked_array, aspect='auto', origin='lower', vmin=np.min(data_loc[np.isnan(data_loc) == False]), vmax=np.max(data_loc[np.isnan(data_loc) == False]), cmap=cmap)
for i in range(n_clusters - 1):
posB = cluster_line_positions[i + 1]
posBm = cluster_line_positions[i]
axisMatrix.plot([-0.5, data_loc.shape[1] - 0.5], [posB - 0.5, posB - 0.5], '--', color='black', linewidth = 1.0)
def add_label(pos, labelText):
return axisMatrix.text(-1., pos, labelText, ha='right', va='center', fontsize=12.*textScale).set_path_effects([path_effects.Stroke(linewidth=0.4, foreground=groupColors[group - 1]),path_effects.Normal()])
order = clusters[np.sort(np.unique(clusters,return_index=True)[1])] - 1
for i in range(n_clusters - 1):
if len(data_loc[clusters == i + 1]) >= 5.:
try:
add_label((cluster_line_positions[np.where(order == i)[0][0]] + cluster_line_positions[np.where(order == i)[0][0] + 1]) * 0.5, 'G%sS%s:%s' % (group,i + 1,len(data_loc[clusters == i + 1])))
except Exception as exception:
print(exception)
print('Label printing error!')
if len(data_loc[clusters == n_clusters]) >= 5.:
posC = axisMatrix.get_ylim()[0] if n_clusters == 1 else cluster_line_positions[n_clusters - 1]
add_label((posC + axisMatrix.get_ylim()[1]) * 0.5, 'G%sS%s:%s' % (group,n_clusters,len(data_loc[clusters == n_clusters])))
axisMatrix.set_xticks([])
axisMatrix.set_yticks([])
times = ClusteringObject[group][subgroup]['data'].columns.values
if group == 1:
axisMatrix.set_xticks(range(data_loc.shape[1]))
axisMatrix.set_xticklabels([('' if (i%2==1 and textScale>1.3) else np.int(time)) for i, time in enumerate(np.round(times,1))], rotation=0, fontsize=6*textScale)
axisMatrix.set_xlabel(xLabel, fontsize=axisMatrix.xaxis.label._fontproperties._size*textScale)
if group == sorted([item for item in list(ClusteringObject.keys()) if not item=='linkage'])[-1]:
axisMatrix.set_title(plotLabel, fontsize=axisMatrix.title._fontproperties._size*textScale)
axisColor = fig.add_axes([0.635 - 0.075 - 0.1 + 0.075,current_bottom + 0.01,0.01, max(0.01,(current_top - current_bottom) - 0.02)])
plt.colorbar(im, cax=axisColor, ticks=[np.max(im._A),np.min(im._A)])
axisColor.tick_params(labelsize=6*textScale)
axisColor.set_yticklabels([np.round(np.max(im._A),2),np.round(np.min(im._A),2)])
return
signalsInClusteringObject = 0
for group in sorted([item for item in list(ClusteringObject.keys()) if not item=='linkage']):
for subgroup in sorted([item for item in list(ClusteringObject[group].keys()) if not item=='linkage']):
signalsInClusteringObject += ClusteringObject[group][subgroup]['data'].shape[0]
fig = plt.figure(figsize=(12,8))
left = 0.02
bottom = 0.1
current_top = bottom
dx = 0.2
dy = 0.8
groupColors = ['b','g','r','c','m','y','k']
[groupColors.extend(groupColors) for _ in range(10)]
addAutocorrelationDendrogramAndHeatmap(ClusteringObject, groupColors, fig)
for group in sorted([item for item in list(ClusteringObject.keys()) if not item=='linkage']):
tempData = None
for subgroup in sorted([item for item in list(ClusteringObject[group].keys()) if not item=='linkage']):
subgroupData = ClusteringObject[group][subgroup]['data'].values
tempData = subgroupData if tempData is None else np.vstack((tempData, subgroupData))
current_bottom = current_top
current_top += dy * float(len(tempData)) / float(signalsInClusteringObject)
if len(tempData)==1:
n_clusters, clusters, cluster_line_positions = 1, np.array([]), np.array([])
else:
n_clusters, clusters, cluster_line_positions = addGroupDendrogramAndShowSubgroups(ClusteringObject, len(tempData), current_bottom, current_top, group, groupColors, fig)
addGroupHeatmapAndColorbar(tempData, n_clusters, clusters, cluster_line_positions, current_bottom, current_top, group, groupColors, fig,xLabel=xLabel,plotLabel=plotLabel)
data_list = []
data_names_list = []
for group in sorted([item for item in list(ClusteringObject.keys()) if not item=='linkage']):
for subgroup in sorted([item for item in list(ClusteringObject[group].keys()) if not item=='linkage']):
if ClusteringObject[group][subgroup]['data'].shape[0] >= 5:
data_list.append(ClusteringObject[group][subgroup]['data'].values)
data_names_list.append('G%sS%s' % (group, subgroup))
times = ClusteringObject[group][subgroup]['data'].columns.values
for indVG, (dataVG, dataNameVG) in enumerate(zip(data_list, data_names_list)):
x_min = 0.57
x_max = 0.66
y_min = 0.1
y_max = 0.9
numberOfVGs = len(data_list)
height = min((y_max - y_min) / numberOfVGs, (x_max - x_min) * (12. / 8.))
x_displacement = (x_max - x_min - height / 1.5) * 0.5
y_displacement = (y_max - y_min - numberOfVGs * height) / numberOfVGs
coords = [x_min + x_displacement, x_min + x_displacement + height / (12. / 8.), y_min + indVG * height + (0.5 + indVG) * y_displacement, y_min + (indVG + 1) * height + (0.5 + indVG) * y_displacement]
addVisibilityGraph(dataVG, times, dataNameVG, coords, numberOfVGs, groupColors, fig,horizontal=horizontal, minNumberOfCommunities=minNumberOfCommunities, communitiesMethod=communitiesMethod, direction=direction, weight=weight)
saveFigure(fig, saveDir, dataName + '_DendrogramHeatmap', extension, dpi)
return None
[docs]def plotHVGBarGraphDual(A, data, times, fileName, title='', fontsize=8, barwidth=0.05, figsize=(8,4), dpi=600):
"""Bar-plot style horizontal visibility graph with different link colors for different perspectives
Parameters:
A: 2d numpy.array
Adjacency matrix
data: 2d numpy.array
Data used to make the visibility graph
times: 1d numpy.array
Times corresponding to data points
fileName: str
Name of the figure file to save
title: str, Default ''
Label to add to the figure title
figsize: tuple of int, Default (8,4)
Figure size in inches
dpi: int, 600
Resolution of the image
barwidth: float, Default 0.05
The bar width
fontsize:int, Default 8
The text font size
Returns:
None
Usage:
PlotHorizontalVisibilityGraph(A, data, times, 'Figure.png', 'Test Data')
"""
fig, ax = plt.subplots(figsize=figsize)
ax.bar(times, data, width = barwidth, color='k', align='center', zorder=-np.inf)
ax.scatter(times, data, color='k')
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if i>j and A[i,j] > 0:
if data[i] > 0 and data[j] >0:
level = np.min([data[i],data[j]])
elif data[i] < 0 and data[j] < 0:
level = np.max([data[i],data[j]])
else:
level = 0
ax.annotate(text='', xy=(times[i],level), xytext=(times[j],level),
arrowprops=dict(arrowstyle='->', shrinkA=0, shrinkB=0,linestyle='--', color='r'))
if i>j and A[i,j] < 0:
if data[i] > 0 and data[j] >0:
level = np.min([data[i],data[j]])
elif data[i] < 0 and data[j] < 0:
level = np.max([data[i],data[j]])
else:
level = 0
ax.annotate(text='', xy=(times[i],level), xytext=(times[j],level),
arrowprops=dict(arrowstyle='->', shrinkA=0, shrinkB=0,linestyle='--', color='b'))
ax.set_title('%s'%(id), fontdict={'color': 'k'},fontsize=fontsize)
ax.set_xlabel('Times', fontsize=fontsize)
ax.set_ylabel('Signal intensity', fontsize=fontsize)
ax.set_xticks(times)
ax.set_xticklabels([str(item) for item in np.round(times,2)],fontsize=fontsize, rotation=0)
ax.set_yticks([])
ax.axhline(y=0, color='k')
ax.spines['left'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.spines['bottom'].set_visible(True)
fig.tight_layout()
fig.savefig(fileName, dpi=dpi)
plt.close(fig)
return None
[docs]def plotNVGBarGraphDual(A, data, times, fileName, title='', fontsize=8, barwidth=0.05, figsize=(8,4), dpi=600):
"""Bar-plot style natural visibility graph with different link colors for different perspectives
Parameters:
A: 2d numpy.array
Adjacency matrix
data: 2d numpy.array
Data used to make the visibility graph
times: 1d numpy.array
Times corresponding to data points
fileName: str
Name of the figure file to save
title: str, Default ''
Label to add to the figure title
figsize: tuple of int, Default (8,4)
Figure size in inches
dpi: int, 600
Resolution of the image
barwidth: float, Default 0.05
The bar width
fontsize:int, Default 8
The text font size
Returns:
None
Usage:
PlotVisibilityGraph(A, data, times, 'FIgure.png', 'Test Data')
"""
fig, ax = plt.subplots(figsize=figsize)
ax.bar(times, data, width = barwidth, color='k', align='center', zorder=-np.inf)
ax.scatter(times, data, color='k')
for i in range(A.shape[0]):
for j in range(A.shape[1]):
if i>j and A[i,j] > 0:
ax.annotate(text='', xy=(times[i],data[i]), xytext=(times[j],data[j]),
arrowprops=dict(arrowstyle='->', shrinkA=0, shrinkB=0,linestyle='--',color='r'))
if i>j and A[i,j] < 0:
ax.annotate(text='', xy=(times[i],data[i]), xytext=(times[j],data[j]),
arrowprops=dict(arrowstyle='->', shrinkA=0, shrinkB=0,linestyle='--',color='b'))
ax.set_title('%s'%(title), fontdict={'color': 'k'},fontsize=fontsize)
ax.set_xlabel('Times', fontsize=fontsize)
ax.set_ylabel('Signal intensity', fontsize=fontsize)
ax.set_xticks(times)
ax.set_xticklabels([str(item) for item in np.round(times,2)],fontsize=fontsize, rotation=0)
ax.set_yticks([])
ax.axhline(y=0, color='k')
ax.spines['left'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.spines['bottom'].set_visible(True)
fig.tight_layout()
fig.savefig(fileName, dpi=dpi)
plt.close(fig)
return None