import os

from qgis.core import QgsApplication, QgsField, QgsProject, \
    QgsProcessingFeedback, QgsVectorLayer, QgsVectorDataProvider, \
    QgsExpressionContext, QgsExpressionContextUtils, edit
from qgis.PyQt.QtCore import QVariant
from qgis.analysis import QgsNativeAlgorithms
from basic_functions import *
import processing


class ScrubLayer:
    """Wrap a vector layer loaded through OGR and expose clean-up operations.

    Most methods are thin wrappers around ``processing.run`` calls on
    ``native:*`` algorithms; a few edit the layer's attribute table directly.

    NOTE: the constructor only sets the QGIS prefix path. It does not create
    or initialise a ``QgsApplication`` instance, so the caller is expected to
    have done that before using this class.
    """

    def __init__(self, qgis_path, layer_path, layer_name):
        """Load the layer found at ``layer_path`` and cache its feature count.

        Args:
            qgis_path: Path to the QGIS installation, used as prefix path.
            layer_path: Data source of the vector layer (opened with 'ogr').
            layer_name: Display name given to the loaded layer.

        Raises:
            ValueError: If the layer cannot be loaded.
        """
        self.qgis_path = qgis_path
        # Set the path to QGIS installation
        QgsApplication.setPrefixPath(self.qgis_path, True)

        self.layer_path = layer_path
        self.layer_name = layer_name
        self.layer = self.load_layer()
        # Feature count at load time; it is not refreshed afterwards.
        self.data_count = self.layer.featureCount()

    @staticmethod
    def _ensure_native_provider():
        """Register the native algorithm provider once per registry.

        Creating and adding a fresh ``QgsNativeAlgorithms`` before every
        processing call is redundant after the first registration, so the
        registry is consulted first.
        """
        registry = QgsApplication.processingRegistry()
        if registry.providerById('native') is None:
            registry.addProvider(QgsNativeAlgorithms())

    @staticmethod
    def _partition_path(base_dir, index):
        """Return the shapefile path used for partition ``index``."""
        return f'{base_dir}/layer_{index}/layer_{index}.shp'

    def _extract_partition(self, expression, output_layer_path):
        """Extract features matching ``expression`` and index the result."""
        params = {'INPUT': self.layer,
                  'EXPRESSION': expression,
                  'OUTPUT': output_layer_path}
        processing.run("native:extractbyexpression", params)
        new_layer = ScrubLayer(self.qgis_path, output_layer_path, 'Temp Layer')
        new_layer.create_spatial_index()

    def load_layer(self):
        """Open ``self.layer_path`` with OGR and add it to the project.

        Returns:
            The loaded ``QgsVectorLayer``.

        Raises:
            ValueError: If the resulting layer is not valid.
        """
        the_layer = QgsVectorLayer(self.layer_path, self.layer_name, 'ogr')
        if not the_layer.isValid():
            raise ValueError(f'Failed to load layer {self.layer_name} from {self.layer_path}')
        QgsProject.instance().addMapLayer(the_layer)
        return the_layer

    def fix_geometries(self, fixed_layer):
        """Run ``native:fixgeometries`` on the layer, writing to ``fixed_layer``."""
        self._ensure_native_provider()
        fix_geometries_params = {
            'INPUT': self.layer,
            'METHOD': 0,
            'OUTPUT': fixed_layer
        }
        processing.run("native:fixgeometries", fix_geometries_params)

    def create_spatial_index(self):
        """Run ``native:createspatialindex`` on the layer."""
        self._ensure_native_provider()
        create_spatial_index_params = {
            'INPUT': self.layer,
            'OUTPUT': 'Output'
        }
        processing.run("native:createspatialindex", create_spatial_index_params)
        print(f'Creating Spatial index for {self.layer_name} is completed.')

    def spatial_join(self, joining_layer_path, joined_layer_path):
        """In QGIS, it is called 'Join attributes by Location'.

        Args:
            joining_layer_path: Layer whose attributes are joined in.
            joined_layer_path: Destination of the joined layer.
        """
        # Registered here as well so this method does not depend on another
        # method having been called first.
        self._ensure_native_provider()
        params = {'INPUT': self.layer,
                  'PREDICATE': [0],
                  'JOIN': joining_layer_path,
                  'JOIN_FIELDS': [],
                  'METHOD': 0,
                  'DISCARD_NONMATCHING': False,
                  'PREFIX': '',
                  'OUTPUT': joined_layer_path}
        feedback = QgsProcessingFeedback()
        processing.run('native:joinattributesbylocation', params, feedback=feedback)
        print(f'Spatial Join with input layer {self.layer_name} is completed.')

    def clip_layer(self, overlay_layer, clipped_layer):
        """Clip the layer's data source by ``overlay_layer``.

        This must be tested.

        Args:
            overlay_layer: Overlay passed to ``native:clip``.
            clipped_layer: Destination of the clipped layer.
        """
        self._ensure_native_provider()
        clip_layer_params = {
            # The source path is passed here, not the loaded layer object.
            'INPUT': self.layer_path,
            'OVERLAY': overlay_layer,
            'FILTER_EXPRESSION': '',
            'FILTER_EXTENT': None,
            'OUTPUT': clipped_layer
        }
        processing.run("native:clip", clip_layer_params)
        print(f'Clipping of {self.layer_name} is completed.')

    def clip_by_multiple(self, number_of_partitions, overlay_layers_dir, clipped_layers_dir):
        """Clip the layer once per overlay partition and index each result.

        Overlays are read from ``<overlay_layers_dir>/layer_<i>/layer_<i>.shp``
        and results are written to the same relative path under
        ``clipped_layers_dir``.

        Args:
            number_of_partitions: How many ``layer_<i>`` partitions to process.
            overlay_layers_dir: Directory containing the overlay partitions.
            clipped_layers_dir: Directory receiving the clipped partitions.
        """
        create_folders(clipped_layers_dir, number_of_partitions)
        for index in range(number_of_partitions):
            overlay = self._partition_path(overlay_layers_dir, index)
            clipped = self._partition_path(clipped_layers_dir, index)
            self.clip_layer(overlay, clipped)
            clipped_layer = ScrubLayer(self.qgis_path, clipped, 'Temp Layer')
            clipped_layer.create_spatial_index()

    def split_layer(self, number_of_layers, splitted_layers_dir):
        """Split the layer by feature id into ``number_of_layers`` shapefiles.

        The first ``number_of_layers - 1`` outputs each cover an equally sized
        ``$id`` range; the last output receives every remaining feature. With
        ``number_of_layers == 1`` the single output holds all features.

        Args:
            number_of_layers: Total number of output layers (at least 1).
            splitted_layers_dir: Directory receiving ``layer_<i>/layer_<i>.shp``.

        Raises:
            ValueError: If ``number_of_layers`` is smaller than 1.
        """
        if number_of_layers < 1:
            raise ValueError(
                f'number_of_layers must be at least 1, got {number_of_layers}')

        equal_parts = number_of_layers - 1
        self._ensure_native_provider()
        create_folders(splitted_layers_dir, equal_parts)

        # Guard the division: with a single output there are no equal parts
        # and the remainder expression below selects everything ($id >= 0).
        intervals = self.data_count // equal_parts if equal_parts else 0

        for part in range(equal_parts):
            expression = (f'$id >= {part * intervals} '
                          f'AND $id < {(part + 1) * intervals}\r\n')
            self._extract_partition(
                expression, self._partition_path(splitted_layers_dir, part))

        # Adding a folder for the remaining features
        os.makedirs(f'{splitted_layers_dir}/layer_{equal_parts}', exist_ok=True)
        self._extract_partition(
            f'$id >= {equal_parts * intervals}\r\n',
            self._partition_path(splitted_layers_dir, equal_parts))

    @staticmethod
    def merge_layers(layers_path, mergeded_layer_path):
        """Merge the shapefiles found under ``layers_path`` into one layer.

        Args:
            layers_path: Location handed to ``find_shp_files``.
            mergeded_layer_path: Destination of the merged layer.
        """
        merging_layers = find_shp_files(layers_path)
        ScrubLayer._ensure_native_provider()
        params = {'LAYERS': merging_layers,
                  'CRS': None,
                  'OUTPUT': mergeded_layer_path}
        processing.run("native:mergevectorlayers", params)

    def multipart_to_singleparts(self, singleparts_layer_path):
        """Run ``native:multiparttosingleparts`` on the layer."""
        self._ensure_native_provider()
        params = {'INPUT': self.layer,
                  'OUTPUT': singleparts_layer_path}
        processing.run("native:multiparttosingleparts", params)

    def delete_duplicates(self, deleted_duplicates_layer):
        """Run ``native:deleteduplicategeometries`` on the layer's data source."""
        self._ensure_native_provider()
        params = {'INPUT': self.layer_path,
                  'OUTPUT': deleted_duplicates_layer}
        processing.run("native:deleteduplicategeometries", params)

    def delete_field(self, field_name):
        """Delete the attribute called ``field_name`` from the layer.

        Args:
            field_name: Name of the field to remove.

        Returns:
            True if the field existed and a delete was issued, False if the
            layer has no field with that name (nothing is changed then).
        """
        # Not needed for the edit itself; kept because earlier versions
        # registered the provider here and later calls may rely on it.
        self._ensure_native_provider()

        # Get the index of the column to delete
        idx = self.layer.fields().indexFromName(field_name)
        if idx == -1:
            # Avoid opening an edit session just to attempt an invalid delete.
            print(f'Field {field_name} not found in {self.layer_name}.')
            return False

        with edit(self.layer):
            # Delete the field
            self.layer.deleteAttribute(idx)
            # Update layer fields
            self.layer.updateFields()
        return True

    def add_field(self, new_field_name):
        """Add a double-typed field if the provider allows adding attributes.

        Nothing happens when the data provider lacks the ``AddAttributes``
        capability.

        Args:
            new_field_name: Name of the field to create.
        """
        provider = self.layer.dataProvider()
        if not provider.capabilities() & QgsVectorDataProvider.AddAttributes:
            return
        provider.addAttributes([QgsField(new_field_name, QVariant.Double)])
        self.layer.updateFields()

    def assign_area(self, field_name):
        """Write each feature's geometry area into ``field_name``.

        Args:
            field_name: Existing field that receives the area values.

        Raises:
            KeyError: If the layer has no field called ``field_name``.
        """
        idx = self.layer.fields().indexFromName(field_name)
        if idx == -1:
            # Fail before editing starts so the layer is not left in edit mode.
            raise KeyError(field_name)

        # ``edit`` commits on success and rolls back if the body raises.
        with edit(self.layer):
            for feature in self.layer.getFeatures():
                area = feature.geometry().area()
                self.layer.changeAttributeValue(feature.id(), idx, area)

    def __str__(self):
        """Return a one-line summary with the layer name and feature count."""
        return f'The {self.layer_name} has {self.data_count} records.'

    @staticmethod
    def cleanup():
        """Shut down QGIS via ``QgsApplication.exitQgis()``."""
        QgsApplication.exitQgis()