Files
esenke 01adcfdf60 init
2025-12-08 22:16:31 +08:00

159 lines
4.9 KiB
Python

import xml.dom.minidom
import os
from glob import glob
import zipfile
from shapely import wkt
import geopandas as gpd
from osgeo import gdal
import imageio.v2 as iio
def parse_metadata(meta_xml_file):
"""Parse Sentinel-2 metadata XML file
Args:
meta_xml_file: Path to metadata XML file
Returns:
dict: Metadata information including sensing time, product URI, etc.
"""
record = {}
try:
dom = xml.dom.minidom.parse(meta_xml_file)
# Get sensing start time
sensing_start = dom.getElementsByTagName('DATATAKE_SENSING_START')[0].firstChild.data[0:10]
# Get product URI and image paths
product_uri = dom.getElementsByTagName('PRODUCT_URI')[0].firstChild.data
image_file = dom.getElementsByTagName('IMAGE_FILE')[0].firstChild.data
items = image_file.split('/')
granule_path = items[1]
img_name = items[4].split('_')[0] + '_' + items[4].split('_')[1]
# Get footprint
footprint = dom.getElementsByTagName('EXT_POS_LIST')[0].firstChild.data
geom_wkt = convert_footprint_to_wkt(footprint)
# Get cloud coverage info
cloud_coverage = float(dom.getElementsByTagName('Cloud_Coverage_Assessment')[0].firstChild.data)
cloud_shadow = float(dom.getElementsByTagName('CLOUD_SHADOW_PERCENTAGE')[0].firstChild.data)
medium_clouds = float(dom.getElementsByTagName('MEDIUM_PROBA_CLOUDS_PERCENTAGE')[0].firstChild.data)
high_clouds = float(dom.getElementsByTagName('HIGH_PROBA_CLOUDS_PERCENTAGE')[0].firstChild.data)
record.update({
'product_uri': product_uri,
'sensing_start': sensing_start,
'granule_path': granule_path,
'img_name': img_name,
'cloud_cover': cloud_coverage,
'cloud_shadow': cloud_shadow,
'medium_clouds': medium_clouds,
'high_clouds': high_clouds,
'geom_wkt': geom_wkt
})
except Exception as e:
print(f'Failed to parse XML: {e}')
return record
def convert_footprint_to_wkt(footprint):
"""Convert footprint string to WKT format"""
coords = footprint.strip().split(' ')
wkt_coords = []
for i in range(0, len(coords), 2):
wkt_coords.append(f"{coords[i+1]} {coords[i]}")
return f"MULTIPOLYGON ((({','.join(wkt_coords)})))"
def zip2rec(fn_zip):
id_img = os.path.splitext(os.path.basename(fn_zip))[0]
archive = zipfile.ZipFile(fn_zip, 'r')
fn_xml = archive.open(os.path.join(f'{id_img}.SAFE', 'MTD_MSIL2A.xml'))
rec = parse_metadata(fn_xml)
rec['geometry'] = wkt.loads(rec['geom_wkt'])
thumb = archive.open(os.path.join(f'{id_img}.SAFE', f'{id_img}-ql.jpg'))
thumb = iio.imread(thumb)
rec['thumb'] = thumb
return rec
def build_catalog(path, fn='catalog'):
'''
fn: filename or None
'''
list_fnames = glob(os.path.join(path, 'S2*.zip'))
list_rec = []
for fn_zip in list_fnames:
rec = zip2rec(fn_zip)
list_rec.append(rec)
gdf = gpd.GeoDataFrame(list_rec, crs='EPSG:4326').drop(columns='geom_wkt')
if fn is not None:
fn_geojson = os.path.join(path, f"{fn}.geojson")
gdf.to_file(fn_geojson, driver='GeoJSON')
return fn_geojson
else:
return gdf
def make_full_name(rec, band):
dict_bands = {
'B2': ['B02', '10m'],
'B3': ['B03', '10m'],
'B4': ['B04', '10m'],
'B8': ['B08', '10m'],
'B5': ['B05', '20m'],
'B6': ['B06', '20m'],
'B7': ['B07', '20m'],
'B8A': ['B8A', '20m'],
'B11': ['B11', '20m'],
'B12': ['B12', '20m'],
'SCL': ['SCL', '20m'],
}
fn_template = os.path.join(
'{p0}', 'GRANULE',
'{p1}', 'IMG_DATA', "R{p2}",
'{p3}_{p4}_{p2}.jp2'
)
return fn_template.format(**{
'p0': rec['product_uri'],
'p0b': rec['product_uri'].split('.')[0],
'p1': rec['granule_path'],
'p2': dict_bands[band][1],
'p3': rec['img_name'],
'p4': dict_bands[band][0],
})
def warp(
ds, outputBounds,
outputBoundsSRS='EPSG:4326',
xRes=10, yRes=10, targetAlignedPixels=True,
**kwargs,
):
options_warp = gdal.WarpOptions(
format="MEM",
outputBounds=outputBounds,
outputBoundsSRS=outputBoundsSRS,
xRes=xRes, yRes=yRes, targetAlignedPixels=targetAlignedPixels,
**kwargs,
)
ds_warp = gdal.Warp('', ds, options=options_warp)
return ds_warp
def get_ndarray(
ds, outputBounds,
outputBoundsSRS='EPSG:4326',
xRes=10, yRes=10, targetAlignedPixels=True,
**kwargs,
):
ds_warp = warp(
ds, outputBounds,
outputBoundsSRS='EPSG:4326',
xRes=10, yRes=10, targetAlignedPixels=True,
**kwargs
)
arr = ds_warp.ReadAsArray()
ds_warp = None
return arr