Code source de Grounded.DensityAnalyser

import logging
import os.path
import shutil
import subprocess

from Grounded.Tools.SFM import SFM
from Grounded.Tools.DetecteurMire import DetecteurMire
from Grounded.Tools.PointCloudProcessor import PointCloudProcessor
from Grounded.Tools.SFM.SFM import SFM
from Grounded.DataObject import PointCloud, Mire3D, Mire, Raster, ScaleBar, Mire2D, Image, File
import Grounded.logger as logger
from Grounded.utils import raise_logged

import statistics
import rasterio
import numpy as np
from scipy.ndimage import generic_filter
from shapely.geometry import Polygon
from scipy.ndimage import label
from shapely import buffer
from skimage import measure
from matplotlib import pyplot, patches
from matplotlib.colors import LinearSegmentedColormap
import trimesh

[docs] def calculate_average_mire_3d(mires_3d: list[Mire3D]) -> list[Mire3D]: dictionnaire_coordinates_mires = {} for mire in mires_3d: if mire.identifier not in dictionnaire_coordinates_mires: dictionnaire_coordinates_mires[mire.identifier] = {'x': [], 'y': [], 'z': []} dictionnaire_coordinates_mires[mire.identifier]['x'].append(mire.coordinates[0]) dictionnaire_coordinates_mires[mire.identifier]['y'].append(mire.coordinates[1]) dictionnaire_coordinates_mires[mire.identifier]['z'].append(mire.coordinates[2]) mires_3d_moyens: list[Mire3D] = [] for identifier in dictionnaire_coordinates_mires.keys(): moyenne_x = statistics.median(dictionnaire_coordinates_mires[identifier]['x']) moyenne_y = statistics.median(dictionnaire_coordinates_mires[identifier]['y']) moyenne_z = statistics.median(dictionnaire_coordinates_mires[identifier]['z']) mires_3d_moyens.append(Mire3D(identifier, (moyenne_x, moyenne_y, moyenne_z))) return mires_3d_moyens
[docs] def calculate_standard_deviation_mire_3d(mires_3d: list[Mire3D]): dictionnaire_coordinates_mires = {} for mire in mires_3d: if mire.identifier not in dictionnaire_coordinates_mires: dictionnaire_coordinates_mires[mire.identifier] = {'x': [], 'y': [], 'z': []} dictionnaire_coordinates_mires[mire.identifier]['x'].append(mire.coordinates[0]) dictionnaire_coordinates_mires[mire.identifier]['y'].append(mire.coordinates[1]) dictionnaire_coordinates_mires[mire.identifier]['z'].append(mire.coordinates[2]) ecart_type = {} for identifier in dictionnaire_coordinates_mires.keys(): ecart_type_x = statistics.stdev(dictionnaire_coordinates_mires[identifier]['x']) \ if len(dictionnaire_coordinates_mires[identifier]['x']) > 1 \ else 0 ecart_type_y = statistics.stdev(dictionnaire_coordinates_mires[identifier]['y']) \ if len(dictionnaire_coordinates_mires[identifier]['y']) > 1 \ else 0 ecart_type_z = statistics.stdev(dictionnaire_coordinates_mires[identifier]['z']) \ if len(dictionnaire_coordinates_mires[identifier]['z']) > 1 \ else 0 ecart_type[identifier] = {'x': ecart_type_x, 'y': ecart_type_y, 'z': ecart_type_z} return ecart_type
[docs] def scale_bars_filter_without_pair(mires: list[Mire], scale_bars: list[ScaleBar]) -> list[ScaleBar]: mires.sort(key=lambda x: x.identifier) scale_bars_copy = scale_bars.copy() for scale_bar in scale_bars: try: start_mire = [mire for mire in mires if mire.identifier == scale_bar.start.identifier][0] end_mire = [mire for mire in mires if mire.identifier == scale_bar.end.identifier][0] except IndexError: mires = [mire for mire in mires if mire.identifier != scale_bar.start.identifier and mire.identifier != scale_bar.end.identifier] scale_bars_copy.remove(scale_bar) return scale_bars_copy
[docs] def distance_euclidienne(point1, point2): """ Calcule la distance euclidienne entre deux points dans un espace multidimensionnel. Args: point1 (list, tuple, numpy.ndarray): Coordonnées du premier point. point2 (list, tuple, numpy.ndarray): Coordonnées du deuxième point. Returns: float: La distance euclidienne entre les deux points. """ point1 = np.array(point1) point2 = np.array(point2) return np.sqrt(np.sum((point1 - point2) ** 2))
[docs] def calculate_average_scale_factor(mires: list[Mire3D], scale_bars): scale_factors: list[float] = [] for scale_bar in scale_bars: start_mire = [mire for mire in mires if mire.identifier == scale_bar.start.identifier][0] end_mire = [mire for mire in mires if mire.identifier == scale_bar.end.identifier][0] scale_factors.append(scale_bar.length / distance_euclidienne(start_mire.coordinates, end_mire.coordinates)) return statistics.mean(scale_factors)
[docs] def prospect_zone(raster: Raster): def mean(x): # Exclure les valeurs NA du calcul de la moyenne valid_values = x[~np.isnan(x)] if len(valid_values) > 0: return np.mean(valid_values) else: return np.nan with rasterio.open(raster.path, "r+") as data_set: raster_array = data_set.read(2) return generic_filter(raster_array, mean, size=(25, 25))
[docs] def get_coordinates_mires3d_in_raster(mires3d: list[Mire3D], raster: Raster, scale_factor: float): coords = [] for mire in mires3d: x, y, z = mire.coordinates # Convertir les coordonnées spatiales (x, y) en indices de pixel (row, col) x, y = raster.xy_3d_space_to_xy_raster(x * scale_factor, y * scale_factor) coords.append((x, y)) return coords
[docs] def delimitate_holes(raster_resolution: float, raster_zone, tol_simplify=0.01, width_buffer=0.02, area_hole=0.008, thres_hole=0.00021, k_cox_threshold=0.35, coords_mires_in_raster=[], min_height: float = 0, max_height: float = 0, min_width: float = 0, max_width: float = 0): """ Identifie et délimite les trous (zones non-continues) dans un raster. Args: coords_mires_in_raster: raster (Raster): Dataobject Raster stockant les informations sur le fichier raster_zone (numpy.ndarray): matrice représentant la zone d'intérêt. tol_simplify (float): Tolérance de simplification des polygones. width_buffer (float): Largeur d'agrandissement des polygones. area_hole (float): Surface minimale requise pour un trou. thres_hole (float): Seuil de binarisation du raster. k_cox_threshold (float): Seuil de rondeur (Cox) pour filtrer les polygones. width_padding (float): Pourcentage de padding en largeur appliqué à la détection des trous height_padding (float): Pourcentage de padding en hauteur appliqué à la détection des trous Returns: list: Liste de polygones Shapely représentant les trous délimités, triés de gauche à droite. """ # Fonction pour calculer la rondeur selon la formule de Cox def roundness(polygone: Polygon): return 4 * np.pi * polygone.area / (polygone.length ** 2) def find_polygons(mask): # Trouver les contours dans le masque contours = measure.find_contours(mask, fully_connected='high') all_points = np.concatenate(contours) # Créer un polygone à partir de tous les points des contours polygon = Polygon(all_points) return polygon # Binarisation du raster avec le seuil mask_zone = raster_zone > thres_hole # Identification des groupes de cellules connectées (trous) labeled_image, num_labels = label(mask_zone.astype(int)) # Estimation de la surface des groupes (trous) hole_areas = [] for lab in range(1, num_labels + 1): hole_cells = labeled_image == lab # ici la surface d'un trou est égal à son ratio par rapport à la taille de l'image hole_area = np.sum(hole_cells) * (raster_resolution ** 2) hole_areas.append((lab, hole_area)) # Sélection des groupes correspondant aux trous potentiels (surface >= area_hole) biggers_holes = [lab for lab, area in hole_areas if area >= area_hole] polygons = [] # Process the features for lab in biggers_holes: polygons.append(find_polygons(np.where(labeled_image == lab, 1, 0))) # Ici on vérifie si les polygones sont centrés centred_polygons = [] for poly in polygons: if min_height <= poly.centroid.x <= max_height and min_width <= poly.centroid.y <= max_width: centred_polygons.append(poly) # Simplification des polygones simplified_polygons = [polygon.simplify(tolerance=tol_simplify) for polygon in centred_polygons] # Filtrage des polygones par rondeur (Cox) et surface minimale filtered_polygons = [] for polygon in simplified_polygons: if roundness(polygon) >= k_cox_threshold: filtered_polygons.append(polygon) # Agrandissement des polygones buffered_polygons = [buffer(polygon, distance=width_buffer / raster_resolution) for polygon in filtered_polygons] # Tri des polygones de gauche à droite sorted_polygons = sorted(buffered_polygons, key=lambda poly: poly.centroid.y) return sorted_polygons
[docs] def polygon_coordinate_conversion(raster: Raster, polygon: Polygon) -> list[tuple[float, float]]: coordinates = [] for point in polygon.exterior.coords: # On convertit les coordonnées dans la matrice en coordonnées dans le raster x, y = raster.xy(point[0], point[1]) coordinates.append((x, y)) return coordinates
[docs] def save_plot_result(raster_array, holes_polygons, list_volumes, output_name, display_padding, mires_coords, min_height, max_height, min_width, max_width): # On récupère les valeurs maximales et minimales mini = np.nanmin(raster_array) maxi = np.nanmax(raster_array) # On définit l'échelle de couleur qui sera utilisé pour l'affichage des du plot colors = [(0, '#aaffff'), (0.04, '#001f6f'), (0.1, '#00f600'), (0.15, '#035700'), (0.25, '#fcfd00'), (0.6, '#ef0000'), (0.8, '#921a1a'), (1, '#e7e6e6')] high_contrast = LinearSegmentedColormap.from_list('high_contrast', colors) # Ajout du plot du raster ainsi que de la barre de couleur pyplot.imshow(raster_array, cmap=high_contrast, vmin=mini, vmax=maxi) pyplot.colorbar() # Ajout du rectangle correspondant à la zone de détection des trous if display_padding: x = min_height y = min_width width = max_width - min_width height = max_height - min_height rect = patches.Rectangle((y, x), height=height, width=width, linewidth=1, edgecolor='r', facecolor='none') pyplot.gca().add_patch(rect) x = [coord[0] for coord in mires_coords] y = [coord[1] for coord in mires_coords] pyplot.scatter(x, y, color='blue', s=10, marker='o') # Ajout des informations concernant les trous for i in range(len(holes_polygons)): poly = holes_polygons[i] # Affichage des polygons sur l'image x_coords, y_coords = poly.exterior.xy # Extraire les coordonnées x et y du polygone x_coords = list(x_coords) y_coords = list(y_coords) pyplot.plot(y_coords, x_coords, color='black') # Ajout des volumes pyplot.text(poly.centroid.y, poly.centroid.x, format(list_volumes[i], '.6f'), fontsize=5, ha='center', va='center', color='black') # Enregistrement au format .pdf du fichier pyplot.savefig(output_name, format='pdf')
[docs] class BadModuleError(Exception): def __init__(self, module_name): self.module_name = module_name self.message = f"Le module de type '{self.module_name}' est incorrect" super().__init__(self.message)
[docs] class DensityAnalyser: """ Classe permettant l'analyse de la densité apparente du sol en utilisant la photogrammétrie. Une implémentation des techniques démontrés dans ces articles: "Joining multi-epoch archival aerial images in a single SfM block allows 3-D change detection with almost exclusively image information." - D. Feurer, F. Vinatier "Assessing new sensor-based volume measurement methods for high-throughput bulk density estimation in the field under various soil conditions." - Guillaume Coulouma, Denis Feurer, Fabrice Vinatier, Olivier Huttel """ def __init__(self, sfm: SFM, detecteur_mire: DetecteurMire, point_cloud_processor: PointCloudProcessor, output_dir: str = os.curdir, verbosity: int = 1): """ Initialise une instance de la classe DensityAnalyser Args: sfm: un objet implémentant l'interface SFM detecteur_mire: un objet implémentant l'interface DetecteurMire point_cloud_processor: un objet implémentant l'interface PointCloudProcessor output_dir (str): dossier de sortie verbosity (int): un entier compris dans l'intervalle [0;2] """ # Configuration des logs logger.config_logger(verbosity, os.path.join(output_dir, "grounded.log")) log = logger.get_logger() if not isinstance(sfm, SFM): raise_logged(log.critical, BadModuleError("sfm")) if not isinstance(detecteur_mire, DetecteurMire): raise_logged(log.critical, BadModuleError("detecteur de mire")) if not isinstance(point_cloud_processor, PointCloudProcessor): raise_logged(log.critical, BadModuleError("point cloud processor")) self.sfm = sfm self.detecteur_mire = detecteur_mire self.point_cloud_processor = point_cloud_processor self.output_dir = output_dir if shutil.which("git"): self.git_revision = subprocess.run(['git', '-C', File(__file__).get_path_directory(), 'rev-parse', 'HEAD'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True).stdout.strip() else: self.git_revision = None
[docs] def analyse(self, photo_path_before_excavation: str, photo_path_after_excavation: str, scale_bars: list[ScaleBar], display_padding: bool = False) -> list[float]: """ Args: photo_path_before_excavation (str): chemin vers le fichier avant excavation photo_path_after_excavation (str): chemin vers le fichier après excavation scale_bars (list[ScaleBar]): réglets utilisés display_padding (bool): Option d'affiche de la zone de détection sur la sortie graphique Returns: list[float]: volumes des trous trouvés """ photo_path_before_excavation = os.path.abspath(photo_path_before_excavation) photo_path_after_excavation = os.path.abspath(photo_path_after_excavation) # Détection des mires présentes sur les images images = self._mire_detection(photo_path_before_excavation, photo_path_after_excavation) # Vérification de la configuration des scalebars afin de détecter une potentielle erreur de l'utilisateur detected_target_are_in_loaded_scalebar = self._check_scalebars_settings(images, scale_bars) # Génération des nuages de points à l'aide du module sfm point_cloud_before_excavation, point_cloud_after_excavation = self._generate_point_cloud( photo_path_before_excavation, photo_path_after_excavation) # Calcul de la position des mires dans l'espace 3D mires_3d, ecart_type = self._calculate_mire3d(images) # Vérification de la valeur de l'écart des écarts types self._check_ecart_type(ecart_type, 0.1) # Suppression des ScaleBars dont au moins l'une des extrémités est manquante scale_bars = scale_bars_filter_without_pair(mires_3d, scale_bars) # on calcule le facteur de redimensionnement pour mettre à l'echelle les nuages de points scale_factor = calculate_average_scale_factor(mires_3d, scale_bars) # Rotation des nuages de points pour être perpendiculaire au plan moyen des mires mires_3d, point_cloud_before_excavation, point_cloud_after_excavation = self._rotate_2point_clouds_and_3Dtargets( point_cloud_before_excavation, point_cloud_after_excavation, mires_3d ) with open(os.path.join(self.output_dir, "02.2_SfM_scaleFactor.txt"), 'w') as file: file.write(f"facteur d'échelle : {scale_factor}") print("Redimensionnement des nuages de points en cours...") point_cloud_before_excavation = self._resize_point_clouds(point_cloud_before_excavation, scale_factor) point_cloud_after_excavation = self._resize_point_clouds(point_cloud_after_excavation, scale_factor) # -------------------------------------- Calcul du volume des trous ------------------------------------------- print("Calcul du volume en cours...") # on calcule le raster de la distance entre les deux nuages de points raster = self._cloud_to_cloud_distance(point_cloud_before_excavation, point_cloud_after_excavation) # on isole et on homogénéise la bande que nous allons étudier zone_tot = prospect_zone(raster) resolution = raster.resolution coords_mires_in_raster = get_coordinates_mires3d_in_raster(mires_3d, raster, scale_factor) min_width, max_width, min_height, max_height = self._calculate_detection_zone(resolution, coords_mires_in_raster) # on récupère les polygones détourant les trous holes_polygons = delimitate_holes(resolution, zone_tot, 0.01, 0.02, 0.007, 0.005, 0.4, coords_mires_in_raster, min_height, max_height, min_width, max_width) # on récupère les coordonnées des points des polygones dans l'espace du raster list_holes_coordinates = [polygon_coordinate_conversion(raster, hole) for hole in holes_polygons] # on récupère les trous détourés holes_cropped: list[tuple[PointCloud, PointCloud]] = [] for hole_coordinates in list_holes_coordinates: before = self.point_cloud_processor.crop_point_cloud(point_cloud_before_excavation, hole_coordinates) after = self.point_cloud_processor.crop_point_cloud(point_cloud_after_excavation, hole_coordinates) holes_cropped.append((before, after)) # on récupère les volumes des différents trous triés holes_volumes = [self.point_cloud_processor.volume_between_clouds(hole[0], hole[1]) for hole in holes_cropped] # ############################################################################################################## # ###################################### Enregistrement des résultats ########################################## # ############################################################################################################## # on enregistre au format txt les résultats with open(os.path.join(self.output_dir, "results.txt"), 'w') as file: file.write(f"nombre de trous détectés : {len(holes_volumes)}\n" "------------Trous triés de gauche à droite------------\n") for i in range(len(holes_volumes)): file.write(f"volume du trou n°{i + 1} : {holes_volumes[i]}\n") # on enregistre au format pdf les résultats save_plot_result(zone_tot, holes_polygons, holes_volumes, os.path.join(self.output_dir, "results.pdf"), display_padding, coords_mires_in_raster, min_height, max_height, min_width, max_width) with open(os.path.join(self.output_dir, "config.txt"), 'w') as file: file.write(self.get_config()) if logger.get_verbosity() >= logging.WARN: self._clean() return holes_volumes
[docs] def get_config(self): result = f"git-revision : {self.git_revision}\n" \ f"SFM : {self.sfm.get_config()}\n" \ f"PointCloudProcessor : {self.point_cloud_processor.get_config()}\n" \ f"DetecteurMire : {self.detecteur_mire.get_config()}\n" return result
def _mire_detection(self, photo_path_before_excavation: str, photo_path_after_excavation: str) -> list[Image]: print("Détection des mires présentes sur les images...") # on récupère les images ainsi que les coordonnées 2D de leurs mires images = self.detecteur_mire.detection_mires(photo_path_before_excavation) images += self.detecteur_mire.detection_mires(photo_path_after_excavation) return images @staticmethod def _check_scalebars_settings(images: list[Image], scale_bars: list[ScaleBar]): # vérification du paramétrage des scalebar id_scalebars = [] for scalebar in scale_bars: id_scalebars.append(scalebar.end.identifier) id_scalebars.append(scalebar.start.identifier) detected_target_are_in_loaded_scalebar = True # on boucle sur les mires détectées, chaque mire détectée doit etre dans les scalebar for im in images: for mir in im.mires_visibles: if mir.identifier not in id_scalebars: detected_target_are_in_loaded_scalebar = False logger.get_logger().warn(f"/!\\ La mire {mir.identifier} détectée dans l'image " f"{im.name} n'est pas dans les scalebars. Vérifier le fichier chargé.") return detected_target_are_in_loaded_scalebar def _generate_point_cloud(self, photo_path_before_excavation, photo_path_after_excavation): print("Début du calcul des nuages de points.") return self.sfm.generer_nuages_de_points(photo_path_before_excavation, photo_path_after_excavation) def _calculate_mire3d(self, images: list[Image]) -> (list[Mire3D], dict[int, float]): #on crée une liste vide de mires 3D que l'on va utiliser pour calculer la valeur moyenne et l'écart-type mires_3d: list[Mire3D] = [] # on crée aussi une liste qui contiendra pour chaque élément le nom de l'image, l'identifiant de la mire, ses coordonnées 2D et 3D images_et_mires_2d_et_3d: list = [] # pour calculer la bonne valeur de référence de coordonnées 3D de la mire on a besoin de d'abord calculer sur les mires non dupliquées index_mires_non_dupliquees: list[int] = [] # pour filtrer la bonne mire on a besoin de savoir pour chaque image quels sont les index de ligne avec mire dupliquée (traitement par image) dict_images_avec_mires_dupliquées = {} # lors de chaque traitement par image, pour savoir à quel élément du tableau global on est, on a besoin de savoir combien de lignes on a traité avant n_lines_processed_before_this_image = 0 # Boucle sur les images pour créer lignes du tableau global et détecter les lignes et les images correspondant à des mires détectées plusieurs fois dans une seule image for image in images: # sur chaque image on calcule les coordonnées 3D des mires mires_2d_in_this_image,mires_3d_in_this_image = self.sfm.calculer_coordinates_3d_mires(image) if len(mires_2d_in_this_image)!=len(mires_3d_in_this_image): print(f"WARNING on n'a pas le même nombre de mires 2D et 3D dans l'image {image.name} WARNING") # dans certains cas l'algo SfM ne trouve pas de point 3D, on se restreint donc au cas où il y en a if len(mires_3d_in_this_image)>0: # on cherche les éventuelles mires doublons ; on aura besoin de savoir si une mire est dupliquée pour faire un traitemetn particulier identifiers = [mir.identifier for mir in mires_2d_in_this_image] duplicate_in_image=[identifiers.count(a)>1 for a in identifiers] # pour chaque mire de l'image on ajoute la ligne dans le tableau for index, (dupl,mir2d, mir3d) in enumerate(zip(duplicate_in_image,mires_2d_in_this_image, mires_3d_in_this_image)): images_et_mires_2d_et_3d.append([image.name,mir2d.identifier,mir2d.coordinates,mir3d.coordinates]) # si la ligne correspond à une mire dupliquée on l'ajoute dans le dictionnaire des mires dupliquées par image if dupl: # l'ajout ne se fait pas de la même manière selon qu'on a déjà l'entrée dans le dictionnaire ou pas (à vérifier) if image not in dict_images_avec_mires_dupliquées: dict_images_avec_mires_dupliquées[image]=list([index + n_lines_processed_before_this_image]) else: dict_images_avec_mires_dupliquées[image].append(index + n_lines_processed_before_this_image) else: # on garde les index de lignes de mires non dupliquée pour le calcul de la coordonée 3D de référence index_mires_non_dupliquees.append(index + n_lines_processed_before_this_image) # on met à jour le nombre de lignes traitées n_lines_processed_before_this_image += index + 1 # on ajoute ces nouvelles mires 3D au vecteur sur lequel on fera nos stats en suite (moyenne, écart-type) mires_3d += mires_3d_in_this_image # Traitement des mires dupliquées # on calcule les mires_3d_moyennes à partir des images où les mires ne sont pas dupliquées mires_3d_moyens_tmp = calculate_average_mire_3d([mires_3d[i] for i in index_mires_non_dupliquees]) # on boucle sur les images qui ont des mires dupliquées # on prépare une liste des index de mauvaises mires qu'il faudra sortir du tableau index_bad_mires=[] for im in dict_images_avec_mires_dupliquées.keys(): # pour chaque image qui a une mire dupliquée on récupère les index de lignes de ces mires dans le tableau global idxs_lines_duplicate_mires_this_im = dict_images_avec_mires_dupliquées[im] # on crée une liste qui contiendra pour chaque mire la distance de la mire3D courante avec la mire3d moyenne dist_mir3D_mir3Dmoyenne = [] # on crée une liste des id de mire dupliquées de cette image pour vérifier qu'il n'y en a qu'une, seul cas traité pour l'instant. Tous les éléments de cette liste devraient être identiques ids_de_mires_dupliquees = [] # on traite donc les mires dupliquées de cette image for idx in idxs_lines_duplicate_mires_this_im: # on récupère les coordonnées 3D de cette mire2D x = images_et_mires_2d_et_3d[idx][3][0] y = images_et_mires_2d_et_3d[idx][3][1] z = images_et_mires_2d_et_3d[idx][3][2] # ainsi que son identifiant id_mire = images_et_mires_2d_et_3d[idx][1] # que l'on sauvegarde dans le liste des identifiants de mires dupliquées de cette image ids_de_mires_dupliquees.append(id_mire) # on récupère la coordonnée 3D moyenne temporaire de cette mire via son identifiant (xm, ym, zm) = mires_3d_moyens_tmp[[mir.identifier for mir in mires_3d_moyens_tmp].index(id_mire)].coordinates # on calcule la distance 3D dist=((x-xm)**2+(y-ym)**2+(z-zm)**2)**.5 # que l'on sauvegarde dans la liste dist_mir3D_mir3Dmoyenne.append(dist) if len(set(ids_de_mires_dupliquees))>1 : logger.get_logger().warn(f"/!\\ Plusieurs mires dupliquées ({set(ids_de_mires_dupliquees)}) " f"dans une seule image (image {im.name}). Cas non traité, bug potentiel") # à la fin de cette boucle la liste dist_mir3D_mir3Dmoyenne a la même longeur que idxs_lines_duplicate_mires_in_this_im # on récupère l'index de dist_mir3D_mir3Dmoyenne qui a la valeur minimale index_good_mire_this_image=dist_mir3D_mir3Dmoyenne.index(min(dist_mir3D_mir3Dmoyenne)) # on en déduit l'index de la bonne mire dans le tableau général index_good_mire=idxs_lines_duplicate_mires_this_im[index_good_mire_this_image] # on ajoute l'index de ligne du grand tableau correspondant à cette mire index_mires_non_dupliquees.append(index_good_mire) # on sauve aussi les index de lignes des mauvaises mires dans le tableau général idxs_lines_duplicate_mires_this_im.remove(index_good_mire) index_bad_mires+=idxs_lines_duplicate_mires_this_im # boucle sur toutes les images terminée, on nettoie le grand tableau en supprimant les lignes qui n'ont pas cet index et idem pour la liste de mires3D images_et_mires_2d_et_3d = [line for index, line in enumerate(images_et_mires_2d_et_3d) if index not in index_bad_mires] mires_3d = [mire for index, mire in enumerate(mires_3d) if index not in index_bad_mires] # ATTENTION arrivé ici tous les index sont devenus caducs car on a extrait les mauvaises mires des listes # sauvegarde : le "3Dorig" dans le nom du fichier est lié au fait que les coordonnées 3D des mires seront changées après rotation et mise à l'échelle with open(os.path.join(self.output_dir, "02.1_Sfm_toutes_mires_2D_et_3Dorig.txt"), 'w') as f: for line in images_et_mires_2d_et_3d: f.write(f"{line[0]},{line[1]}," f"{line[2][0]:.3f},{line[2][1]:.3f}," f"{line[3][0]:.3f},{line[3][1]:.3f},{line[3][2]:.3f}") f.write("\n") # on calcule les coordonnées moyennes de chaque mire 3d ainsi que l'écart type mires_3d_moyens = calculate_average_mire_3d(mires_3d) ecart_type = calculate_standard_deviation_mire_3d(mires_3d) return mires_3d_moyens, ecart_type def _fit_plane(self, x, y, z): # Solution de Ben trouvée sur StackOverFlow https://stackoverflow.com/a/44315488 tmp_A = [] tmp_b = [] for i in range(len(x)): tmp_A.append([x[i], y[i], 1]) tmp_b.append(z[i]) b = np.matrix(tmp_b).T A = np.matrix(tmp_A) fit = (A.T * A).I * A.T * b errors = b - A * fit residual = np.linalg.norm(errors) return fit[0, 0], fit[1, 0], fit[2, 0], errors, residual def _rotate_points_to_abc_plane(self, points, a, b, c): # Ensure points is a numpy array points = np.array(points) # Normal vector of the plane normal = np.array([-a, -b, 1]) normal = normal / np.linalg.norm(normal) # Calculate rotation axis (cross product of normal and [0, 0, 1]) rotation_axis = np.cross(normal, [0, 0, 1]) rotation_axis = rotation_axis / np.linalg.norm(rotation_axis) # Calculate rotation angle cos_theta = np.dot(normal, [0, 0, 1]) # sin_theta = np.linalg.norm(rotation_axis) sin_theta = np.sqrt(1 - cos_theta ** 2) # Construct rotation matrix using Rodriguez rotation formula K = np.array([[0, -rotation_axis[2], rotation_axis[1]], [rotation_axis[2], 0, -rotation_axis[0]], [-rotation_axis[1], rotation_axis[0], 0]]) R = np.eye(3) + sin_theta * K + (1 - cos_theta) * np.dot(K, K) # Apply rotation to all points rotated_points = np.dot(points, R.T) return rotated_points def _apply_two_rotations_to_one_point_cloud(self, point_cloud: PointCloud, a, b, c, rotmat): # on charge le premier nuage PTS = trimesh.load(point_cloud.path) # on le clone PTS_rotated = PTS # on change les points du nuage de point par la rotation des points du nuage initial pour que le Z fasse face au plan d'équation ax + by + c = z PTS_rotated.vertices = self._rotate_points_to_abc_plane(PTS.vertices, a, b, c) # on fait la rotation autour de Z pour avoir l'endroit où il y a le moins de mires en bas (= les réglets du haut en haut) x1, y1 = np.dot(rotmat,PTS_rotated.vertices[:,0:2].T) #on met à jour les coordonnées x, y du nuage de point PTS_rotated.vertices[:,0:2] = np.array([x1,y1]).T # on crée un nouveau nom de fichier en ajoutant _Rotated au nuage de base path_point_cloud = os.sep.join( [point_cloud.get_path_directory(), point_cloud.get_name_without_extension() + '_Rotated.ply']) out_file = open(path_point_cloud, 'wb') out_file.write(trimesh.exchange.ply.export_ply(PTS_rotated)) out_file.close() return PointCloud(path_point_cloud) def _rotate_2point_clouds_and_3Dtargets(self, point_cloud1: PointCloud, point_cloud2: PointCloud, mires_3d: Mire3D): # Get 3D coordinates of mires_3d x = [mire.coordinates[0] for mire in mires_3d] y = [mire.coordinates[1] for mire in mires_3d] z = [mire.coordinates[2] for mire in mires_3d] # fit a plane to these 3D points a, b, c, errors, residual = self._fit_plane(x, y, z) # compute the result of the rotation of the mires_3d (x,y,z) res = self._rotate_points_to_abc_plane(np.array([x, y, z]).T, a, b, c) # mise à jour des coordonnées des mires rotated_mires_3d = mires_3d xr = x yr = y for i in np.arange(len(mires_3d)): rotated_mires_3d[i].coordinates = res[i,:] xr[i] = rotated_mires_3d[i].coordinates[0] yr[i] = rotated_mires_3d[i].coordinates[1] # at this point we get a,b,c the plane fitted to initial position of the mires_3d and rotated_mires_3d # we need to rotate point clouds around the Z axis so that les mires horizontales soient en haut et les mires verticales soient verticales # # on postule que le "bas" est défini par l'endroit où il n'y a pas de réglet # # => c'est raccord avec la config en vertical/fosse = le bas est le côté où est le bac de prélévèement # # => c'est raccord avec la config en horizontal/par-dessus = le "bas" est le côté où se situe l'opérateur qui prélève, on laisse un côté sans mire pour simplifier # # dans ce cas de figure le barycentre des mires sera tiré du côté opposé où il n'y a pas de mire. # # on calcule le barycentre des mires avec la médiane (moins sensible aux extrèmes) => point H (np.median(xr) , np.median(yr)) # # on calcule le point milieu du rectangle englobant => point O, box_centre # # => la verticale orientée vers le haut est définie par le vecteur OH box_centre_xr, box_centre_yr = (np.min(xr) + np.max(xr)) / 2, (np.min(yr) + np.max(yr)) / 2 # l'angle de la rotation pour passer d'un vecteur (xr,yr) à un vecteur (0,1) c'est arctan (xr/yr) on fait donc arctan2(xr,yr) # voir arctan2 https://numpy.org/doc/2.1/reference/generated/numpy.arctan2.html#numpy.arctan2 angle = np.arctan2 ( (np.median(xr)-box_centre_xr), (np.median(yr)-box_centre_yr) ) rotmat = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) ## APPLICATION # MIRES 3D #on calcule les nouvelles coordonnées x,y xr2,yr2 = np.dot(rotmat, [xr, yr]) # on met à jour les mires déjà tournées dans le plan for i in np.arange(len(rotated_mires_3d)): rotated_mires_3d[i].coordinates[0] = xr2[i] rotated_mires_3d[i].coordinates[1] = yr2[i] # NUAGES DE POINTS #fonction qui fait les deux rotation sur un fichier ply point_cloud1 = self._apply_two_rotations_to_one_point_cloud(point_cloud1, a, b, c, rotmat) point_cloud2 = self._apply_two_rotations_to_one_point_cloud(point_cloud2, a, b, c, rotmat) # PTS_1 = trimesh.load(point_cloud2.path) # PTS_1_rotated = PTS_1 # PTS_1_rotated.vertices = self._rotate_points_to_abc_plane(PTS_1.vertices, a, b, c) # x2, y2 = np.dot(rotmat, PTS_1_rotated.vertices[0:2]) # PTS_1_rotated.vertices[0:2] = [x2, y2] # out_file = open(point_cloud2.path, 'wb') # out_file.write(trimesh.exchange.ply.export_ply(PTS_1_rotated)) # out_file.close() return rotated_mires_3d, point_cloud1, point_cloud2 def _resize_point_clouds(self, point_cloud: PointCloud, scale_factor: float): # on redimensionne les nuages de points à l'aide de ce facteur d'échelle resize_point_cloud = self.point_cloud_processor.mise_a_echelle(point_cloud, scale_factor) return resize_point_cloud def _cloud_to_cloud_distance(self, point_cloud_ref: PointCloud, point_cloud_compared: PointCloud) -> Raster: return self.point_cloud_processor.cloud_to_cloud_distance(point_cloud_ref, point_cloud_compared) @staticmethod def _calculate_detection_zone(resolution, coords_mires_in_raster): min_width = min(x for x, y in coords_mires_in_raster) max_width = max(x for x, y in coords_mires_in_raster) min_height = min(y for x, y in coords_mires_in_raster) max_height = max(y for x, y in coords_mires_in_raster) y_mean = statistics.mean([y for x, y in coords_mires_in_raster]) # Ajout de la marge en bas pour détecter les trous qui dépassent if abs(min_height - y_mean) < abs(max_height - y_mean): max_height += (0.1 / resolution) else: min_height += (0.1 / resolution) return min_width, max_width, min_height, max_height @staticmethod def _check_ecart_type(ecart_type: dict, threshold=0.1): for key, values in ecart_type.items(): if values.get('x') >= threshold or values.get('y') >= threshold or values.get('z') >= threshold: logger.get_logger().warn( f"L'écart type des coordonnées de la mire {key} est anormalement élevé {values}") def _clean(self): self.sfm.clean() self.detecteur_mire.clean() self.point_cloud_processor.clean()