from copy import deepcopy
from xml.dom import NotSupportedErr
from numpy import (
arctan2,
asarray,
concatenate,
cos,
diag,
dot,
max,
mean,
min,
sin,
sqrt,
square,
)
from numpy import sum as nsum
from numpy import tile, zeros
from numpy.linalg import inv, pinv
from pydistsim.logging import logger
from pydistsim.utils.localization.aoastitcher import AoAStitcher
from pydistsim.utils.localization.diststitcher import DistStitcher
from pydistsim.utils.memory.positions import Positions
[docs]
def align_clusters(dst, src, scale):
"""Scale, rotate, translate srcLoc w.r.t. dst."""
assert isinstance(src, Positions)
assert isinstance(dst, Positions)
if scale:
stitcher = AoAStitcher(reflectable=True)
else:
stitcher = DistStitcher()
stitcher.align(dst, src)
[docs]
def get_rms(truePos, estimated, align=False, scale=False, norm=False):
"""
Returns root mean square error of estimated positions.
Set align if estimated positions need to be transformed (rotated and
translated) before calculating rms error of localization. Set scale=True
if they needs to be scaled also.
If norm is True then rms is divided with norm of the true formation
translated to centroid.
"""
truePos = Positions.create(truePos)
assert len(truePos) == 1
estimated = Positions.create(estimated)
if align:
estimated = deepcopy(estimated)
align_clusters(truePos, estimated, scale)
suma = 0
node_count = 0
for estimated_subcluster in estimated:
for n in estimated_subcluster:
suma += (estimated_subcluster[n][0] - truePos[0][n][0]) ** 2 + (
estimated_subcluster[n][1] - truePos[0][n][1]
) ** 2
node_count += 1
rms = sqrt(suma / node_count)
if norm:
sc = truePos.subclusters[0]
truePos.subclusters[0] = {n: p for n, p in list(sc.items()) if n in estimated.subclusters[0]}
rms = rms / get_pos_norm(truePos)
return rms
[docs]
def construct_G(pos, edges, u, sensor):
"""
Construct Jacobian of vector mu(pos) that defines expectation of
measurement given positions.
u - number of unknowns i.e. 3 - (x, y, theta)
edges - list of edges as tuples, if there is (i, j) then there should not
be (j, i)
sensor - 'DistSensor' or 'AoASensor'
For AoA and measurement between nodes i and j:
mu(pos) = arctan((pos_y_j - pos_y_i)/(pos_x_j - pos_x_i) - alpha_i)
"""
nodes = list(pos.keys())
neighbors = {n: [] for n in nodes}
for n1, n2 in edges:
neighbors[n1].append(n2)
neighbors[n2].append(n1)
# number of measurements x number of unknowns
G = zeros((2 * len(edges), u * len(nodes)))
m = 0
for r, node in enumerate(nodes):
(x_r, y_r) = pos[node][:2]
for neighbor in neighbors[node]:
t = nodes.index(neighbor)
(x_t, y_t) = pos[neighbor][:2]
d = sqrt((x_r - x_t) ** 2 + (y_r - y_t) ** 2)
if sensor == "DistSensor":
G[m, r * u] = (x_r - x_t) / d
G[m, r * u + 1] = (y_r - y_t) / d
G[m, t * u] = (x_t - x_r) / d
G[m, t * u + 1] = (y_t - y_r) / d
elif sensor == "AoASensor":
G[m, r * u] = (y_t - y_r) / d**2
G[m, r * u + 1] = (x_r - x_t) / d**2
G[m, t * u] = (y_r - y_t) / d**2
G[m, t * u + 1] = (x_t - x_r) / d**2
if u == 3:
G[m, t * u + 2] = -1.0
m += 1 # next measurement
return G
# TODO: only one per row -1 element in G?
[docs]
def get_crb(net, sensor, compass="off", loc_type="anchor-free", anchors=[]):
"""
Calculates Cramer-Rao lower bound on covariance matrix of unbiased multihop
location estimator based on sensor (distance/angle) measurement between
in-commrange neighbors.
POS is n by 2 matrix of X and Y coordinates of n nodes
SIGMA is standard deviation on measurement vector.
Arguments:
`compass` -- 'off' (default) or 'on' which means all nodes have compass
`loc_type` -- 'anchor-free' (default) or 'anchored'
`anchors` -- list of anchor nodes, used only when loc_type is 'anchored'
References:
Savvides2003 - Savvides et al,
On the Error Characteristics of Multihop Node
Localization in Ad-Hoc Sensor Networks, 2003
Patwari2005 - Patwari et al,
Locating the nodes: cooperative localization in
wireless sensor networks
Chang2006 - Chen Chang et al,
Cramer-Rao-Type Bounds for Localization, 2006
"""
if loc_type == "anchored" and not anchors:
raise Exception("Anchors not defined.")
if loc_type == "anchor-free" and anchors:
logger.warning("Anchors are ignored")
anchors = []
nodes = [node for node in net.nodes() if node not in anchors]
if sensor.name() == "AoASensor" and compass == "off":
u = 3 # x, y, theta
else: # DistSensor or AoASensor with compass on
u = 2 # x, y
G = construct_G(net.pos, net.edges(), u, sensor.name())
for s in nodes[0].compositeSensor.sensors:
if s.name() == sensor.name():
sigma = s.probabilityFunction.scale
break
else:
raise Exception("Sensor not found in nodes")
J = (dot(G.T, G)) / sigma**2
if loc_type == "anchor-free":
cov = pinv(J)
elif loc_type == "anchored":
cov = inv(J)
# Chang2006 (28)
di = diag(cov)
# extract only position from cov
di = concatenate((di[::3], di[1::3]))
# return is lower bound on rms -> sqrt(1/n*sum_i^n((x_i'-x_i)^2+(y_i'-y_i)^2))
return sqrt(2 * mean(di))
[docs]
def get_crb_norm(*args, **kwargs):
"""
Calculates crb that is not dependent on scale i.e. to check correlation
with gdop.
"""
net = args[0]
norm = get_pos_norm(net.pos)
return get_crb(*args, **kwargs) / norm
[docs]
def get_pos_norm(pos):
"""Translate positions so that centroid is in the origin and return mean
norm of the translated positions."""
pos = Positions.create(pos)
assert len(pos) == 1
n = len(pos[0])
p = zeros((n, 2))
for i, node in enumerate(pos[0]):
p[i, :] = pos[0][node][:2]
centroid = p.sum(axis=0) / n
p -= tile(centroid, (n, 1))
p_norm = nsum(sqrt(nsum(square(p), axis=1))) / n
return p_norm
[docs]
def get_aoa_gdop_rel(estimated):
"""
Calculation of relative GDOP is based on CRB.
GDOP_rel = sigma_CRB/sigma_d = sqrt(tr((G^TG)^-1)/(sum(Di^2))/M)
As regular GDOP, it is not dependent on scale nor on sigma_AoA.
"""
estimated = Positions.create(estimated)
assert len(estimated.subclusters) == 1
pos = estimated.subclusters[0]
nodes = list(pos.keys())
edges = [e for e in list(pos.keys())[0].network.edges() if e[0] in nodes and e[1] in nodes]
G = construct_G(pos, edges, 3, "AoASensor")
J = dot(G.T, G)
cov = pinv(J)
di = diag(cov)
di = concatenate((di[::3], di[1::3]))
var_p = sum(di) / len(nodes)
distances = [sqrt(dot(pos[n1][:2] - pos[n2][:2], pos[n1][:2] - pos[n2][:2])) for n1, n2 in edges]
var_d = sum(square(distances)) / len(edges)
return sqrt(var_p / var_d)
[docs]
def get_aoa_gdop_node(estimated, node):
"""
Calculate geometric dilution of precision for node in estimated formation.
Node is in the formation and all other nodes should be neighbors of
node. Node should have Neighbors sensorReadings in it memory.
"""
if "AoASensor" not in [sensor.name() for sensor in node.sensors]:
raise NotSupportedErr("Only angle of arrival based gdop is supported")
sensor = node.compositeSensor.get_sensor("AoASensor")
sigma = sensor.probabilityFunction.scale
for n in estimated:
sensor = n.compositeSensor.get_sensor("AoASensor")
if sensor.probabilityFunction.scale != sigma:
raise NotSupportedErr("All nodes' AoA sensors should have same scale")
if len(estimated) <= 2:
return 0.0
# Note from Torrieri, Statistical Theory of Passive Location Systems
# if measurement sigmas are all equal gdop doesn't depend on sigma.
sigma = 1
x, y = estimated[node][:2]
fi = []
d = []
for n in estimated:
xi, yi = estimated[n][:2]
if n != node and n in node.memory["sensorReadings"]["Neighbors"]:
fi.append(arctan2(y - yi, x - xi))
d.append(sqrt((x - xi) ** 2 + (y - yi) ** 2))
mi = ni = l = 0
for fii, di in zip(fi, d):
mi += (cos(fii) / di / sigma) ** 2 # (129)
l += (sin(fii) / di / sigma) ** 2 # (130)
ni += sin(fii) * cos(fii) / (di * sigma) ** 2 # (131)
sigma1 = sqrt(mi / (mi * l - ni**2)) # (126)
sigma2 = sqrt(l / (mi * l - ni**2)) # (127)
# sigma12 = sqrt(ni/(mi*l-ni**2))
sigmad = sqrt(sum((di * sigma) ** 2 for di in d) / len(d)) # (138)
return sqrt(sigma1**2 + sigma2**2) / sigmad # (139)
[docs]
def get_aoa_gdops(estimated):
estimated = Positions.create(estimated)
assert len(estimated.subclusters) == 1
estimated = estimated.subclusters[0]
return [get_aoa_gdop_node(estimated, node) for node in estimated]
[docs]
def get_aoa_gdop(estimated):
return sum(get_aoa_gdops(estimated))
[docs]
def show_subclusters(net, subclusters):
"""Show colored subclusters."""
colors = [node in subclusters[0] and "g" or "r" for node in net.nodes()]
net.show(nodeColor=colors)
[docs]
def show_localized(net, estimated, scale=False, align=True, display_loc_err=True, show_labels=True):
"""
Display estimated positions.
estimated should be a list of dictionaries.
"""
from matplotlib.collections import LineCollection
from matplotlib.pylab import gca
truePos = Positions.create(net.pos)
estimated = Positions.create(estimated)
# copy estimated so that passed estimated remains unchanged
estimated = deepcopy(estimated)
if align:
# rotate, translate and if needed scale estimated w.r.t. true positions
align_clusters(truePos, estimated, scale)
# TODO: implement display of all subclusters
if len(estimated) > 1:
raise (NotImplementedError)
else:
estimated_sc = estimated[0]
# net.show(positions=estimated_sc, show_labels=show_labels)
fig = net.get_fig(positions=estimated_sc, show_labels=show_labels)
ax = fig.gca()
minpos = min(list(estimated_sc.values()), axis=0)
maxpos = max(list(estimated_sc.values()), axis=0)
minpos -= (maxpos - minpos) * 0.1
maxpos += (maxpos - minpos) * 0.1
ax.set_xlim(minpos[0], maxpos[0])
ax.set_ylim(minpos[1], maxpos[1])
# fig.show()
if display_loc_err:
# TODO: not working in ipython notepad
ax = gca()
ax.set_title("Localized positions")
ax.set_title("Localization error display")
edge_pos = asarray([(net.pos[n], estimated_sc[n]) for n in list(estimated_sc.keys())])
errorCollection = LineCollection(edge_pos, colors="r", transOffset=ax.transData)
errorCollection.set_zorder(1) # errors go behind nodes
ax.add_collection(errorCollection)
ax.figure.canvas.draw()