# Listing metadata (from the code viewer): 164 lines, 5.8 KiB, Python.
import os

from qgis.core import QgsApplication, QgsField, QgsProject, \
    QgsProcessingFeedback, QgsVectorLayer, QgsVectorDataProvider, \
    QgsExpressionContext, QgsExpressionContextUtils, edit
from qgis.PyQt.QtCore import QVariant
from qgis.analysis import QgsNativeAlgorithms
import processing

from basic_functions import *


class ScrubLayer:
    """Load a vector layer through OGR and run clean-up operations on it.

    Most methods are thin wrappers around ``processing.run`` with a fixed
    parameter set; the remaining ones edit the layer's attribute table
    directly.

    Attributes:
        layer_path: Path (or OGR data source string) the layer was loaded from.
        layer_name: Display name given to the layer.
        layer: The loaded ``QgsVectorLayer``.
        data_count: Feature count captured once, when the layer was loaded.
    """

    # Handle to the QgsApplication created by this class (None until one is
    # needed). Kept at class level so the application object is not dropped
    # when ``__init__`` returns, and so that helper instances created by
    # ``split_layer`` reuse it instead of constructing another application.
    _qgs_app = None

    def __init__(self, qgis_path, layer_path, layer_name):
        """Start QGIS if necessary and load the layer.

        Args:
            qgis_path: QGIS installation prefix passed to
                ``QgsApplication.setPrefixPath``.
            layer_path: Data source handed to ``QgsVectorLayer``.
            layer_name: Name to give the loaded layer.

        Raises:
            ValueError: If the layer cannot be loaded (see ``load_layer``).
        """
        self._ensure_qgis(qgis_path)

        self.layer_path = layer_path
        self.layer_name = layer_name
        self.layer = self.load_layer()
        self.data_count = self.layer.featureCount()

    @staticmethod
    def _ensure_qgis(qgis_path):
        """Create and initialise the QgsApplication unless one already exists."""
        if QgsApplication.instance() is not None:
            return

        # Set the path to the QGIS installation, then initialise without GUI.
        QgsApplication.setPrefixPath(qgis_path, True)
        app = QgsApplication([], False)
        app.initQgis()
        ScrubLayer._qgs_app = app

    @staticmethod
    def _run_native(algorithm_id, params, **run_kwargs):
        """Run a processing algorithm, registering the native provider once.

        Args:
            algorithm_id: Algorithm identifier such as ``"native:clip"``.
            params: Parameter dictionary forwarded to ``processing.run``.
            **run_kwargs: Extra keyword arguments for ``processing.run``
                (for example ``feedback``).
        """
        registry = QgsApplication.processingRegistry()
        # Register only when missing so repeated calls do not try to add a
        # second provider with the same id.
        if registry.providerById('native') is None:
            registry.addProvider(QgsNativeAlgorithms())
        processing.run(algorithm_id, params, **run_kwargs)

    @staticmethod
    def _split_expressions(feature_count, number_of_layers):
        """Build the ``$id`` filter expression for every output of a split.

        The first ``number_of_layers - 1`` expressions each select
        ``feature_count // (number_of_layers - 1)`` consecutive ids; the
        final expression selects every id at or above the last boundary
        (which can match nothing when the count divides evenly).

        Args:
            feature_count: Number of features in the layer being split.
            number_of_layers: Total number of output layers requested.

        Returns:
            A list of ``number_of_layers`` expression strings.

        Raises:
            ValueError: If ``number_of_layers`` is smaller than 1.
        """
        if number_of_layers < 1:
            raise ValueError(
                f'number_of_layers must be at least 1, got {number_of_layers}')

        bounded_parts = number_of_layers - 1
        # With a single output there are no bounded parts; avoid dividing by
        # zero and let the open-ended expression select everything.
        interval = feature_count // bounded_parts if bounded_parts else 0

        expressions = [
            f'$id >= {part * interval} '
            f'AND $id < {(part + 1) * interval}\r\n'
            for part in range(bounded_parts)
        ]
        expressions.append(f'$id >= {bounded_parts * interval}\r\n')
        return expressions

    def load_layer(self):
        """Load the layer with the ``ogr`` provider and add it to the project.

        Returns:
            The valid ``QgsVectorLayer``.

        Raises:
            ValueError: If the resulting layer is not valid.
        """
        the_layer = QgsVectorLayer(self.layer_path, self.layer_name, 'ogr')
        if not the_layer.isValid():
            raise ValueError(
                f'Failed to load layer {self.layer_name} from {self.layer_path}')

        QgsProject.instance().addMapLayer(the_layer)
        return the_layer

    def fix_geometries(self, fixed_layer):
        """Run ``native:fixgeometries`` on the layer.

        Args:
            fixed_layer: Value passed as the algorithm's ``OUTPUT``.
        """
        fix_geometries_params = {
            'INPUT': self.layer,
            'METHOD': 0,
            'OUTPUT': fixed_layer,
        }
        self._run_native("native:fixgeometries", fix_geometries_params)

    def create_spatial_index(self):
        """Run ``native:createspatialindex`` on the layer and report completion."""
        create_spatial_index_params = {
            'INPUT': self.layer,
            'OUTPUT': 'Output',
        }
        self._run_native("native:createspatialindex",
                         create_spatial_index_params)
        print(f'Creating Spatial index for {self.layer_name} is completed.')

    def spatial_join(self, joining_layer_path, joined_layer_path):
        """Run ``native:joinattributesbylocation`` with this layer as input.

        In QGIS, it is called 'Join attributes by Location'.

        Args:
            joining_layer_path: Value passed as the algorithm's ``JOIN`` layer.
            joined_layer_path: Value passed as the algorithm's ``OUTPUT``.
        """
        params = {
            'INPUT': self.layer,
            'PREDICATE': [0],
            'JOIN': joining_layer_path,
            'JOIN_FIELDS': [],
            'METHOD': 0,
            'DISCARD_NONMATCHING': False,
            'PREFIX': '',
            'OUTPUT': joined_layer_path,
        }

        feedback = QgsProcessingFeedback()
        self._run_native('native:joinattributesbylocation', params,
                         feedback=feedback)
        print(f'Spatial Join with input layer {self.layer_name} is completed.')

    def clip_layer(self, overlay_layer, clipped_layer):
        """Run ``native:clip`` using the layer's path as input.

        NOTE: flagged by the original author as still needing to be tested.

        Args:
            overlay_layer: Value passed as the algorithm's ``OVERLAY``.
            clipped_layer: Value passed as the algorithm's ``OUTPUT``.
        """
        clip_layer_params = {
            'INPUT': self.layer_path,
            'OVERLAY': overlay_layer,
            'FILTER_EXPRESSION': '',
            'FILTER_EXTENT': None,
            'OUTPUT': clipped_layer,
        }
        self._run_native("native:clip", clip_layer_params)
        print(f'Clipping of {self.layer_name} is completed.')

    def multipart_to_singleparts(self, singleparts_layer_path):
        """Run ``native:multiparttosingleparts`` on the layer.

        Args:
            singleparts_layer_path: Value passed as the algorithm's ``OUTPUT``.
        """
        params = {
            'INPUT': self.layer,
            'OUTPUT': singleparts_layer_path,
        }
        self._run_native("native:multiparttosingleparts", params)

    def split_layer(self, number_of_layers, splitted_layers_dir, app_path):
        """Split the layer into ``number_of_layers`` shapefiles by feature id.

        Output ``part`` is written to
        ``<splitted_layers_dir>/layer_<part>/layer_<part>.shp`` and a spatial
        index is created for each one. See ``_split_expressions`` for how
        ids are distributed.

        Args:
            number_of_layers: Total number of output layers (at least 1).
            splitted_layers_dir: Directory that receives the ``layer_<n>``
                folders.
            app_path: QGIS prefix path used when loading each output layer.

        Raises:
            ValueError: If ``number_of_layers`` is smaller than 1.
        """
        # Validate and compute the filters before touching the file system.
        expressions = self._split_expressions(self.data_count,
                                              number_of_layers)
        last_part = number_of_layers - 1

        # create_folders comes from basic_functions; presumably it prepares
        # the folders for the first ``last_part`` outputs -- TODO confirm.
        create_folders(splitted_layers_dir, last_part)
        # The folder of the final output is created here; exist_ok lets a
        # re-run into the same directory proceed.
        os.makedirs(splitted_layers_dir + f'/layer_{last_part}',
                    exist_ok=True)

        for part, expression in enumerate(expressions):
            output_layer_path = \
                splitted_layers_dir + f'/layer_{part}/layer_{part}.shp'
            params = {
                'INPUT': self.layer,
                'EXPRESSION': expression,
                'OUTPUT': output_layer_path,
            }
            self._run_native("native:extractbyexpression", params)

            new_layer = ScrubLayer(app_path, output_layer_path, 'Temp Layer')
            new_layer.create_spatial_index()

    def delete_duplicates(self, deleted_duplicates_layer):
        """Run ``native:deleteduplicategeometries`` using the layer's path.

        Args:
            deleted_duplicates_layer: Value passed as the algorithm's
                ``OUTPUT``.
        """
        params = {
            'INPUT': self.layer_path,
            'OUTPUT': deleted_duplicates_layer,
        }
        self._run_native("native:deleteduplicategeometries", params)

    def delete_field(self, field_name):
        """Delete the attribute named ``field_name`` from the layer.

        Nothing is changed when the layer has no field with that name.

        Args:
            field_name: Name of the field to remove.
        """
        # Get the index of the column to delete
        idx = self.layer.fields().indexFromName(field_name)
        if idx == -1:
            # Unknown field: there is nothing to delete.
            return

        with edit(self.layer):
            # Delete the field
            self.layer.deleteAttribute(idx)

            # Update layer fields
            self.layer.updateFields()

    def add_field(self, new_field_name):
        """Add a ``QVariant.Double`` field if the provider supports it.

        Nothing happens when the data provider does not report the
        ``AddAttributes`` capability.

        Args:
            new_field_name: Name of the field to create.
        """
        provider = self.layer.dataProvider()
        functionalities = provider.capabilities()

        if functionalities & QgsVectorDataProvider.AddAttributes:
            new_field = QgsField(new_field_name, QVariant.Double)
            provider.addAttributes([new_field])
            self.layer.updateFields()

    def assign_area(self, field_name):
        """Store each feature's geometry area in the field ``field_name``.

        The edit session is managed by ``edit`` so it is closed again if an
        error occurs part-way through.

        Args:
            field_name: Name of the field that receives the area value.
        """
        idx = self.layer.fields().indexFromName(field_name)

        with edit(self.layer):
            for feature in self.layer.getFeatures():
                feature[idx] = feature.geometry().area()
                self.layer.updateFeature(feature)

    def __str__(self):
        """Return a one-line summary using the count captured at load time."""
        return f'The {self.layer_name} has {self.data_count} records.'

    @staticmethod
    def cleanup():
        """Call ``QgsApplication.exitQgis()``."""
        QgsApplication.exitQgis()