from qgis.core import QgsApplication, QgsField, QgsProject, \
    QgsProcessingFeedback, QgsVectorLayer, QgsVectorDataProvider, \
    QgsExpressionContext, QgsExpressionContextUtils, edit
from qgis.PyQt.QtCore import QVariant
from qgis.analysis import QgsNativeAlgorithms
from basic_functions import *
import processing


class ScrubLayer:
    """Wrap a single OGR vector layer and expose common clean-up operations.

    Constructing an instance makes sure a QGIS application is running, loads
    the layer from ``layer_path`` and adds it to the current project. Most
    methods are thin wrappers around ``native:*`` processing algorithms; the
    remaining ones edit the loaded layer's fields directly.
    """

    def __init__(self, qgis_path, layer_path, layer_name):
        """Start (or reuse) the QGIS application and load the layer.

        Args:
            qgis_path: Prefix path of the QGIS installation.
            layer_path: Data source passed to the 'ogr' provider.
            layer_name: Display name given to the loaded layer.

        Raises:
            ValueError: If the layer cannot be loaded (see ``load_layer``).
        """
        # Set the path to QGIS installation
        QgsApplication.setPrefixPath(qgis_path, True)

        # Initialize QGIS application. Only one application object may exist
        # per process, so reuse a running one instead of building a second.
        app = QgsApplication.instance()
        if app is None:
            app = QgsApplication([], False)
            app.initQgis()
        # Keep a reference on the instance: a function-local would be released
        # as soon as __init__ returns.
        self.qgs = app

        self.layer_path = layer_path
        self.layer_name = layer_name
        self.layer = self.load_layer()
        # Snapshot taken at load time; it is not refreshed by later edits.
        self.data_count = self.layer.featureCount()

    @staticmethod
    def _ensure_native_provider():
        """Register the native processing provider if it is not yet present."""
        registry = QgsApplication.processingRegistry()
        if registry.providerById('native') is None:
            registry.addProvider(QgsNativeAlgorithms())

    def load_layer(self):
        """Load the layer from ``self.layer_path`` and add it to the project.

        Returns:
            The loaded ``QgsVectorLayer``.

        Raises:
            ValueError: If the resulting layer is not valid.
        """
        the_layer = QgsVectorLayer(self.layer_path, self.layer_name, 'ogr')
        if not the_layer.isValid():
            raise ValueError(f'Failed to load layer {self.layer_name} from {self.layer_path}')
        QgsProject.instance().addMapLayer(the_layer)
        return the_layer

    def fix_geometries(self, fixed_layer):
        """Run ``native:fixgeometries`` on the layer.

        Args:
            fixed_layer: Value passed as the algorithm's 'OUTPUT'.
        """
        self._ensure_native_provider()
        fix_geometries_params = {
            'INPUT': self.layer,
            'METHOD': 0,
            'OUTPUT': fixed_layer
        }
        processing.run("native:fixgeometries", fix_geometries_params)

    def create_spatial_index(self):
        """Run ``native:createspatialindex`` on the layer and report completion."""
        self._ensure_native_provider()
        create_spatial_index_params = {
            'INPUT': self.layer,
            'OUTPUT': 'Output'
        }
        processing.run("native:createspatialindex", create_spatial_index_params)
        print(f'Creating Spatial index for {self.layer_name} is completed.')

    def spatial_join(self, joining_layer_path, joined_layer_path):
        """Run ``native:joinattributesbylocation`` with this layer as input.

        In QGIS, it is called 'Join attributes by Location'.

        Args:
            joining_layer_path: Value passed as the algorithm's 'JOIN' layer.
            joined_layer_path: Value passed as the algorithm's 'OUTPUT'.
        """
        # Registered here as well so this method works even when it is the
        # first processing call made on the instance.
        self._ensure_native_provider()
        params = {'INPUT': self.layer,
                  'PREDICATE': [0],
                  'JOIN': joining_layer_path,
                  'JOIN_FIELDS': [],
                  'METHOD': 0,
                  'DISCARD_NONMATCHING': False,
                  'PREFIX': '',
                  'OUTPUT': joined_layer_path}

        feedback = QgsProcessingFeedback()
        processing.run('native:joinattributesbylocation', params, feedback=feedback)
        print(f'Spatial Join with input layer {self.layer_name} is completed.')

    def clip_layer(self, overlay_layer, clipped_layer):
        """Run ``native:clip`` using the layer's source path as input.

        NOTE: This must be tested.

        Args:
            overlay_layer: Value passed as the algorithm's 'OVERLAY'.
            clipped_layer: Value passed as the algorithm's 'OUTPUT'.
        """
        self._ensure_native_provider()
        clip_layer_params = {
            'INPUT': self.layer_path,
            'OVERLAY': overlay_layer,
            'FILTER_EXPRESSION': '',
            'FILTER_EXTENT': None,
            'OUTPUT': clipped_layer
        }
        processing.run("native:clip", clip_layer_params)
        print(f'Clipping of {self.layer_name} is completed.')

    def multipart_to_singleparts(self, singleparts_layer_path):
        """Run ``native:multiparttosingleparts`` on the layer.

        Args:
            singleparts_layer_path: Value passed as the algorithm's 'OUTPUT'.
        """
        self._ensure_native_provider()
        params = {'INPUT': self.layer,
                  'OUTPUT': singleparts_layer_path}
        processing.run("native:multiparttosingleparts", params)

    def split_layer(self):
        """Placeholder; not implemented yet."""
        pass

    def delete_duplicates(self, deleted_duplicates_layer):
        """Run ``native:deleteduplicategeometries`` on the layer's source path.

        Args:
            deleted_duplicates_layer: Value passed as the algorithm's 'OUTPUT'.
        """
        self._ensure_native_provider()
        params = {'INPUT': self.layer_path,
                  'OUTPUT': deleted_duplicates_layer}
        processing.run("native:deleteduplicategeometries", params)

    def delete_field(self, field_name):
        """Delete the field called ``field_name`` from the loaded layer.

        The change is made inside an ``edit`` session on ``self.layer``.
        An unknown field name leaves the layer's fields untouched.

        Args:
            field_name: Name of the field to remove.
        """
        with edit(self.layer):
            # Get the index of the column to delete
            idx = self.layer.fields().indexFromName(field_name)
            # indexFromName reports -1 for an unknown name; nothing to delete.
            if idx != -1:
                # Delete the field
                self.layer.deleteAttribute(idx)
            # Update layer fields
            self.layer.updateFields()

    def add_field(self, new_field_name):
        """Add a ``QVariant.Double`` field to the layer through its provider.

        Nothing is added when the data provider does not report the
        ``AddAttributes`` capability.

        Args:
            new_field_name: Name of the field to create.
        """
        provider = self.layer.dataProvider()
        if provider.capabilities() & QgsVectorDataProvider.AddAttributes:
            new_field = QgsField(new_field_name, QVariant.Double)
            provider.addAttributes([new_field])
            self.layer.updateFields()

    def assign_area(self, field_name):
        """Store each feature's geometry area in the field ``field_name``.

        The value written is ``feature.geometry().area()``. The updates run
        inside an ``edit`` session, so they are committed together when the
        loop finishes and rolled back if it raises.

        Args:
            field_name: Name of the existing field that receives the area.
        """
        idx = self.layer.fields().indexFromName(field_name)
        with edit(self.layer):
            for feature in self.layer.getFeatures():
                feature[idx] = feature.geometry().area()
                self.layer.updateFeature(feature)

    def __str__(self):
        """Return a one-line summary using the feature count taken at load."""
        return f'The {self.layer_name} has {self.data_count} records.'

    @staticmethod
    def cleanup():
        """Shut QGIS down by calling ``QgsApplication.exitQgis()``."""
        QgsApplication.exitQgis()