Spaces:
Runtime error
Runtime error
| import Muscat.Containers.ElementsDescription as ED | |
| from Muscat.FE.FETools import PrepareFEComputation | |
| from Muscat.FE.Fields.FEField import FEField | |
| import Muscat.Containers.MeshModificationTools as MMT | |
| from Muscat.Containers import MeshGraphTools as MGT | |
| from Muscat.Containers.Filters import FilterObjects as FO | |
| from Muscat.Containers.Filters import FilterOperators as FOp | |
| import Muscat.Containers.MeshInspectionTools as MIT | |
| out_fields_names = ['Ux', 'Uy', 'p', 'nut'] | |
| in_scalars_names = ['angle_of_attack', 'inlet_velocity'] | |
| from Muscat.Containers.NativeTransfer import NativeTransfer | |
| import copy | |
| import bisect | |
| import numpy as np | |
| from scipy import spatial | |
| import pickle | |
| from Muscat.Bridges.CGNSBridge import CGNSToMesh | |
| import torch | |
| device = torch.device("cpu") | |
| num_steps = [2_000] | |
| from plaid.containers.dataset import Sample | |
| def pretreat_sample(mesh): | |
| ################### | |
| # Compute the skin of the mesh (containing the external boundary and the airfoil boundary) | |
| ################### | |
| MMT.ComputeSkin(mesh, md = 2, inPlace = True) | |
| ################### | |
| # Extract ids of the bar elements corresponding to the airfoil | |
| ################### | |
| ff1 = FO.ElementFilter(zone = lambda p: (-p[:,0]-1.99)) | |
| ff2 = FO.ElementFilter(zone = lambda p: (p[:,0]-3.99)) | |
| ff3 = FO.ElementFilter(zone = lambda p: (-p[:,1]-1.49)) | |
| ff4 = FO.ElementFilter(zone = lambda p: (p[:,1]-1.49)) | |
| efAirfoil = FOp.IntersectionFilter(filters=[ff1, ff2, ff3, ff4]) | |
| airfoil_ids = efAirfoil.GetIdsToTreat(mesh, "bar2") | |
| ################### | |
| # Preparations | |
| ################### | |
| ext_bound = np.setdiff1d(mesh.elements["bar2"].GetTag("Skin").GetIds(), airfoil_ids) | |
| mesh.elements["bar2"].GetTag("External_boundary").SetIds(ext_bound) | |
| mesh.elements["bar2"].GetTag("Airfoil").SetIds(airfoil_ids) | |
| nfExtBoundary = FO.NodeFilter(eTag = "External_boundary") | |
| nodeIndexExtBoundary = nfExtBoundary.GetNodesIndices(mesh) | |
| mesh.GetNodalTag("External_boundary").AddToTag(nodeIndexExtBoundary) | |
| nfAirfoil = FO.NodeFilter(eTag = "Airfoil") | |
| nodeIndexAirfoil = nfAirfoil.GetNodesIndices(mesh) | |
| mesh.GetNodalTag("Airfoil").AddToTag(nodeIndexAirfoil) | |
| ################### | |
| # Add node tag for the intrado and extrado | |
| ################### | |
| nfAirfoil = FO.NodeFilter(eTag = "Airfoil") | |
| nodeIndexAirfoil = nfAirfoil.GetNodesIndices(mesh) | |
| mesh.GetNodalTag("Airfoil").AddToTag(nodeIndexAirfoil) | |
| airfoil = ExtractAirfoil(mesh) | |
| indices_extrado = airfoil[0][0] | |
| indices_intrado = airfoil[0][1] | |
| mesh.GetNodalTag("Extrado").AddToTag(indices_extrado) | |
| mesh.GetNodalTag("Intrado").AddToTag(indices_intrado) | |
| efExtrado = FO.ElementFilter(nTag = "Extrado") | |
| efIntrado = FO.ElementFilter(nTag = "Intrado") | |
| mesh.elements["bar2"].GetTag("Extrado").SetIds(efExtrado.GetIdsToTreat(mesh, "bar2")) | |
| mesh.elements["bar2"].GetTag("Intrado").SetIds(efIntrado.GetIdsToTreat(mesh, "bar2")) | |
| def ExtractPathFromMeshOfBars(mesh, startingClosestToPoint, trigo_dir = True): | |
| nodeGraph0Airfoild = MGT.ComputeNodeToNodeGraph(mesh, dimensionality=1) | |
| nodeGraphAirfoild = [list(nodeGraph0Airfoild[i].keys()) for i in range(nodeGraph0Airfoild.number_of_nodes())] | |
| tree = spatial.KDTree(mesh.nodes) | |
| _, indicesTrailEdge = tree.query([startingClosestToPoint], k=1) | |
| p1init = indicesTrailEdge[0] | |
| temp1=mesh.nodes[nodeGraphAirfoild[p1init][0]][1] | |
| temp2=mesh.nodes[nodeGraphAirfoild[p1init][1]][1] | |
| if trigo_dir: | |
| condition = temp1 > temp2 | |
| else: | |
| condition = temp1 < temp2 | |
| if condition: | |
| p2 = nodeGraphAirfoild[p1init][0] | |
| else: | |
| p2 = nodeGraphAirfoild[p1init][1] | |
| p1 = p1init | |
| path = [p1, p2] | |
| while p2 != p1init: | |
| p2save = p2 | |
| tempArray = np.asarray(nodeGraphAirfoild[p2]) | |
| p2 = tempArray[tempArray!=p1][0] | |
| p1 = p2save | |
| path.append(p2) | |
| return path | |
| def ExtractAirfoil(mesh): | |
| efAirfoil = FO.ElementFilter(elementType=ED.Bar_2, eTag=["Airfoil"]) | |
| airfoilMesh = MIT.ExtractElementsByElementFilter(mesh, efAirfoil) | |
| path = ExtractPathFromMeshOfBars(airfoilMesh, np.array([1.,0.])) | |
| tree = spatial.KDTree(airfoilMesh.nodes[path]) | |
| _, indicesLeadEdge = tree.query([[0.,0.]], k=1) | |
| indices_extrado = path[:indicesLeadEdge[0]+1] | |
| indices_intrado = path[indicesLeadEdge[0]:] | |
| indices_airfoil = [indices_extrado, indices_intrado] | |
| nodes_extrado = mesh.nodes[indices_extrado] | |
| nodes_intrado = mesh.nodes[indices_intrado] | |
| nodes_airfoil = [nodes_extrado, nodes_intrado] | |
| return indices_airfoil, nodes_airfoil | |
| def computeAirfoilCurvAbscissa(airfoil): | |
| indices_airfoil = airfoil[0] | |
| nodes_airfoil = airfoil[1] | |
| curv_abscissa = [] | |
| for i in range(2): | |
| local_curv_abscissa = np.zeros(len(indices_airfoil[i])) | |
| for j in range(1,len(local_curv_abscissa)): | |
| local_curv_abscissa[j] = local_curv_abscissa[j-1] + np.linalg.norm(nodes_airfoil[i][j]-nodes_airfoil[i][j-1]) | |
| local_curv_abscissa /= local_curv_abscissa[-1] | |
| curv_abscissa.append(local_curv_abscissa) | |
| return curv_abscissa | |
| def MapAirfoil(airfoil_ref, curv_abscissa_ref, curv_abscissa): | |
| nodes_airfoil_ref = airfoil_ref[1] | |
| dim_nodes = nodes_airfoil_ref[0][0].shape[0] | |
| mapped_airfoil = [] | |
| for i in range(2): | |
| local_mapped_airfoil = np.zeros((len(curv_abscissa[i])-1, dim_nodes)) | |
| for j in range(len(curv_abscissa[i])-1): | |
| index = max(bisect.bisect_right(curv_abscissa_ref[i], curv_abscissa[i][j]) - 1, 0) | |
| a = nodes_airfoil_ref[i][index] | |
| b = nodes_airfoil_ref[i][index+1] | |
| dl = curv_abscissa[i][j] - curv_abscissa_ref[i][index] | |
| dir = (b-a)/np.linalg.norm(b-a) | |
| local_mapped_airfoil[j] = a + dl * dir | |
| mapped_airfoil.append(local_mapped_airfoil) | |
| return mapped_airfoil | |
| def GetFieldTransferOpCppStep1(inputField, nbThreads = None): | |
| method="Interp/Clamp" | |
| nt = NativeTransfer() | |
| if nbThreads is not None: | |
| nt.SetMaxConcurrency(nbThreads) | |
| nt.SetTransferMethod(method) | |
| defaultOptions = {"usePointSearch": True, | |
| "useElementSearch": False, | |
| "useElementSearchFast": False, | |
| "useEdgeSearch": True, | |
| } | |
| options = {} | |
| defaultOptions.update(options) | |
| dispatch = {"usePointSearch": nt.SetUsePointSearch, | |
| "useElementSearch": nt.SetUseElementSearch, | |
| "useElementSearchFast": nt.SetUseElementSearchFast, | |
| "useEdgeSearch": nt.SetUseEdgeSearch, | |
| "DifferentialOperator": nt.SetDifferentialOperator, | |
| } | |
| for k, v in defaultOptions.items(): | |
| if k in dispatch.keys(): | |
| dispatch[k](v) | |
| else: | |
| raise RuntimeError(f"Option {k} not valid") | |
| nt.SetSourceFEField(inputField, None) | |
| return nt | |
| def GetFieldTransferOpCppStep2(nt, targetPoints): | |
| nt.SetTargetPoints(targetPoints) | |
| nt.Compute() | |
| op = nt.GetOperator() | |
| status = nt.GetStatus() | |
| return op, status | |
| def morph_sample(mesh, airfoil_0, curv_abscissa_0): | |
| airfoil_1 = ExtractAirfoil(mesh) | |
| curv_abscissa_1 = computeAirfoilCurvAbscissa(airfoil_1) | |
| mapped_airfoil = MapAirfoil(airfoil_0, curv_abscissa_0, curv_abscissa_1) | |
| ############################################################## | |
| # Compute global target displacement and masks for RBF field morphing | |
| ############################################################## | |
| indices_extrado_to_morph_1 = airfoil_1[0][0][:-1] | |
| indices_intrado_to_morph_1 = airfoil_1[0][1][:-1] | |
| other_boundary_ids_1 = mesh.GetNodalTag("External_boundary").GetIds() | |
| l1 = len(indices_extrado_to_morph_1) | |
| l2 = len(indices_intrado_to_morph_1) | |
| l3 = len(other_boundary_ids_1) | |
| targetDisplacement = np.zeros((l1 + l2 + l3, 2)) | |
| targetDisplacementMask = np.zeros((l1 + l2 + l3), dtype = int) | |
| targetDisplacement[:l1] = mapped_airfoil[0] - mesh.nodes[indices_extrado_to_morph_1] | |
| targetDisplacement[l1:l1+l2] = mapped_airfoil[1] - mesh.nodes[indices_intrado_to_morph_1] | |
| targetDisplacement[l1+l2:l1+l2+l3] = np.zeros((l3,2)) | |
| targetDisplacementMask[:l1] = indices_extrado_to_morph_1 | |
| targetDisplacementMask[l1:l1+l2] = indices_intrado_to_morph_1 | |
| targetDisplacementMask[l1+l2:l1+l2+l3] = other_boundary_ids_1 | |
| ############################################################## | |
| # Compute the morphing | |
| ############################################################## | |
| # RBF morphing | |
| mesh_nodes = mesh.nodes.copy() | |
| morphed_nodes = MMT.Morphing(mesh, targetDisplacement, targetDisplacementMask, radius=None) | |
| mesh.nodes = morphed_nodes | |
| mesh.nodeFields['X'] = mesh_nodes[:,0] | |
| mesh.nodeFields['Y'] = mesh_nodes[:,1] | |
| return mesh | |
| def project_sample(morphed_mesh, morphed_mesh_0): | |
| projected_mesh = copy.deepcopy(morphed_mesh_0) | |
| space_, numberings_, _, _ = PrepareFEComputation(morphed_mesh) | |
| inputFEField = FEField(name="dummy", mesh=morphed_mesh, space=space_, numbering=numberings_[0]) | |
| nt = GetFieldTransferOpCppStep1(inputFEField, 1) | |
| FE_interpolation_op, _ = GetFieldTransferOpCppStep2(nt, morphed_mesh_0.nodes) | |
| for pfn in out_fields_names + ['X', 'Y']: | |
| projected_mesh.nodeFields[pfn] = FE_interpolation_op.dot(morphed_mesh.nodeFields[pfn]) | |
| return projected_mesh | |
| def pretreat_morph_and_project_mesh(i_sample, dataset, indices, morphed_mesh_0, airfoil_0, curv_abscissa_0): | |
| ################### | |
| ## 1) Pretreat data | |
| ################### | |
| sample = Sample.model_validate(pickle.loads(dataset[int(indices[i_sample])]["sample"])) | |
| mesh = CGNSToMesh(sample.get_mesh()) | |
| pretreat_sample(mesh) | |
| ################### | |
| ## 2) Morph data | |
| ################### | |
| morphed_mesh = morph_sample(mesh, airfoil_0, curv_abscissa_0) | |
| # print("morphed_mesh =", morphed_mesh) | |
| # from Muscat.IO import XdmfWriter as XW | |
| # XW.WriteMeshToXdmf("morphed_mesh.xdmf", morphed_mesh) | |
| ################### | |
| ## 3) Project data | |
| ################### | |
| projected_mesh = project_sample(morphed_mesh, morphed_mesh_0) | |
| # print("projected_mesh =", projected_mesh) | |
| # from Muscat.IO import XdmfWriter as XW | |
| # XW.WriteMeshToXdmf("projected_mesh.xdmf", projected_mesh) | |
| in_scalars = [sample.get_scalar(sn) for sn in in_scalars_names] | |
| return [projected_mesh, in_scalars, morphed_mesh.nodes] | |
| def infer(dataset, indice, training_data): | |
| common_mesh, correlationOperator2c, airfoil_0, curv_abscissa_0, scalar_scalers, scalerX, pca_clouds, pca_fields, y_scalers, kmodels, X_train, y_train = training_data | |
| projected_mesh, in_scalars, morphed_mesh_nodes = pretreat_morph_and_project_mesh(indice, dataset = dataset, indices = np.arange(len(dataset)), morphed_mesh_0 = common_mesh, airfoil_0 = airfoil_0, curv_abscissa_0 = curv_abscissa_0) | |
| space_, numberings_, _, _ = PrepareFEComputation(common_mesh) | |
| inputFEField_0 = FEField(name="dummy", mesh=common_mesh, space=space_, numbering=numberings_[0]) | |
| nt_0 = GetFieldTransferOpCppStep1(inputFEField_0, nbThreads = 2) | |
| iffo = GetFieldTransferOpCppStep2(nt_0, morphed_mesh_nodes)[0] | |
| clouds = np.stack([projected_mesh.nodeFields["X"], projected_mesh.nodeFields["Y"]], axis=1) | |
| scalars = np.array(in_scalars) | |
| clouds = clouds.reshape(-1, 1) | |
| X_pca = np.dot(pca_clouds, correlationOperator2c.dot(clouds)).T | |
| X_scalars = scalar_scalers.transform(scalars.reshape(1, -1)) | |
| unscaled_X = np.concatenate([X_pca, X_scalars], axis=-1) | |
| X = scalerX.transform(unscaled_X) | |
| predictions = {} | |
| y = [] | |
| for i, fn in enumerate(out_fields_names): | |
| X_ = torch.tensor(X, dtype=torch.float64).to(device) | |
| output_dim = pca_fields[0].shape[0] | |
| n_samples = 1 | |
| y_pred_i = np.empty((n_samples, output_dim, len(num_steps))) | |
| for j in range(output_dim): | |
| for k in range(len(num_steps)): | |
| y_pred_i[:,j,k] = kmodels[i][j][k](X_).detach().cpu().numpy().squeeze() | |
| y.append(y_pred_i) | |
| y_pred_i = np.mean(y_pred_i, axis = 2) | |
| y_pred_i = y_scalers[i].inverse_transform(y_pred_i) | |
| y_pred_common_i = np.dot(y_pred_i, pca_fields[i]).flatten() | |
| predictions[fn] = iffo.dot(y_pred_common_i) | |
| return predictions | |