Source code for osm_flex.clip

"""
This file is part of OSM-flex.
Copyright (C) 2023 OSM-flex contributors listed in AUTHORS.
OSM-flex is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
OSM-flex is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.
-----
clipping functions
"""

import logging
import numpy as np
import pathlib
import shapely
import subprocess
from cartopy.io import shapereader

from osm_flex.config import POLY_DIR, OSMCONVERT_PATH
[docs] LOGGER = logging.getLogger(__name__)
[docs] def get_admin1_shapes(country): """Provide Natural Earth registry info and shape files for countries Parameters ---------- country : str string of ISO 3166 code Returns ------- country_shapes : dict Shapes (according to Natural Earth) of admin1 regions of country with name as keys """ if not isinstance(country, str): LOGGER.error("country needs to be of type str") raise TypeError("Invalid type for input parameter 'country'") admin1_file = shapereader.natural_earth(resolution='10m', category='cultural', name='admin_1_states_provinces') admin1_recs = shapereader.Reader(admin1_file) admin1_shapes = {} for rec in admin1_recs.records(): if rec.attributes['adm0_a3'] == country: name = rec.attributes['name_en'] admin1_shapes[name] = rec.geometry if not admin1_shapes: raise LookupError(f'natural_earth records are empty for country {country}') return admin1_shapes
[docs] def get_country_shape(country): """Provide Natural Earth registry info and shape files for admin1 regions of chosen country Parameters ---------- country : str string of ISO 3166 code Returns ------- country_shape : (multi-)polygon Shape of the country according to Natural Earth. """ if not isinstance(country, str): LOGGER.error("country needs to be of type str") raise TypeError("Invalid type for input parameter 'country'") admin0_file = shapereader.natural_earth(resolution='10m', category='cultural', name='admin_0_countries') admin0_recs = shapereader.Reader(admin0_file) for rec in admin0_recs.records(): if rec.attributes['ADM0_A3'] == country: return rec.geometry raise LookupError(f'natural_earth records are empty for country {country}')
[docs] def _simplify_shapelist(geom_list, thres=None): """ remove tiny shapes and simplify outlines to save on file size for .poly files """ if thres is None: thresh = 0.1 if shapely.unary_union(geom_list).area > 1 else 0.01 geom_list = [geom for geom in geom_list if geom.area>thresh] return [geom.simplify(tolerance=0.01, preserve_topology=True) for geom in geom_list]
[docs] def _shapely2poly(geom_list, filename): """ Convert list of shapely (multi)polygon(s) into .poly files needed for osmosis to generate cut-outs from bigger osm.pbf files. Parameters --------- geom_list : list list of polygon, polygons or multipolygons containing a (complex) shape to be cut out of a bigger file filename : pathlib.Path or str output filename including directory path. Returns ------- None Note ---- For more info on what .poly files are (incl. several tools for creating them), see https://wiki.openstreetmap.org/wiki/Osmosis/Polygon_Filter_File_Format """ filename = pathlib.Path(filename).with_suffix('.poly') if filename.exists(): raise ValueError(f'File {filename} already exists, aborting.') # start writing the .poly file file = open(filename, 'w') file.write('Polygons' + "\n") # loop over the different polygons, get their exterior and write the # coordinates of the ring to the .poly file i = 0 for shape in geom_list: if shape.geom_type == 'MultiPolygon': polygons = shape.geoms elif shape.geom_type == 'Polygon': polygons = [shape] for polygon in polygons: polygon = np.array(polygon.exterior.coords) file.write(str(i) + "\n") i += 1 for ring in polygon: file.write(" " + str(ring[0]) + " " + str(ring[1]) +"\n") # close the ring of one subpolygon if done file.write("END" +"\n") # close the file when done file.write("END" +"\n") file.close()
[docs] def _build_osmconvert_cmd(shape, osmpbf_clip_from, osmpbf_output): """ builds osmconvert command for clipping Parameters ----------- shape : list or str or pathlib.Path list containing [xmin, ymin, xmax, ymax] for a bounding box or a string/Path to the .poly file path delimiting the bounds. osmpbf_clip_from: str or pathlib.Path file path to planet.osm.pbf or other osm.pbf file to clip osmpbf_output : str or pathlib.Path file path (incl. name & ending) under which extract will be stored Note ----- If cross-border multi-polygons or lines should be retained, comment out below options '--complete-ways' and '--complete-multipolygons'. This only works with parent files < 2GB. """ if isinstance(shape, (pathlib.Path, str)): return [str(OSMCONVERT_PATH), str(osmpbf_clip_from), f'-B={str(shape)}', f'-o={osmpbf_output}' ] #'--complete-ways', '--complete-multipolygons' # add option if isinstance(shape[0], (float, int)): return [str(OSMCONVERT_PATH), str(osmpbf_clip_from), '-b='+str(shape[0])+','+str(shape[1])+','+str(shape[2])+','+str(shape[3]), f'-o={osmpbf_output}' ] # '--complete-ways', '--complete-multipolygons' # add option
[docs] def _osmconvert_clip(shape, osmpbf_clip_from, osmpbf_output, overwrite=False): osmpbf_clip_from = pathlib.Path(osmpbf_clip_from) if not osmpbf_clip_from.suffix: osmpbf_clip_from = osmpbf_clip_from.with_suffix('.osm.pbf') if not osmpbf_clip_from.is_file(): raise ValueError(f"OSM file {osmpbf_clip_from} to clip from not found.") if ((not pathlib.Path(osmpbf_output).is_file()) or (pathlib.Path(osmpbf_output).is_file() and overwrite)): LOGGER.info("""File doesn`t yet exist or overwriting old one. Assembling osmosis command.""") cmd = _build_osmconvert_cmd(shape, osmpbf_clip_from, osmpbf_output) LOGGER.info('''Extracting from larger file... This will take a while''') return subprocess.run(cmd, stdout=subprocess.PIPE, universal_newlines=True)
[docs] def _build_osmosis_cmd(shape, osmpbf_clip_from, osmpbf_output): """ builds osmosis command for clipping Parameters ----------- shape : list or str or pathlib.Path list containing [xmin, ymin, xmax, ymax] for a bounding box or a string/Path to the .poly file path delimiting the bounds. osmpbf_clip_from: str or pathlib.Path file path to planet.osm.pbf or other osm.pbf file to clip osmpbf_output : str or pathlib.Path file path (incl. name & ending) under which extract will be stored """ if isinstance(shape, (pathlib.Path, str)): return ['osmosis', '--read-pbf', 'file='+str(osmpbf_clip_from), '--bounding-polygon', 'file='+str(shape), '--write-pbf', 'file='+str(osmpbf_output)] if isinstance(shape[0], (float, int)): return['osmosis', '--read-pbf', 'file='+str(osmpbf_clip_from), '--bounding-box', f'top={shape[3]}', f'left={shape[0]}', f'bottom={shape[1]}', f'right={shape[2]}', '--write-pbf', 'file='+str(osmpbf_output)] raise ValueError('''shape does not have the correct format. Only bounding boxes, shapely (multi-)polygons or filepaths to .poly files are allowed''')
[docs] def _osmosis_clip(shape, osmpbf_clip_from, osmpbf_output, overwrite=False): """ Runs the command line tool osmosis to cut out all map info within shape (bounding box or poygon(s)), from a bigger parent file. If your device doesn't have osmosis yet, see installation instructions: https://wiki.openstreetmap.org/wiki/Osmosis/Installation Parameters ----------- shape : list or str or pathlib.Path list containing [xmin, ymin, xmax, ymax] for a bounding box or a string/Path to the .poly file path delimiting the bounds. osmpbf_clip_from: str or pathlib.Path file path to planet.osm.pbf or other osm.pbf file to clip osmpbf_output : str or pathlib.Path file path (incl. name & ending) under which extract will be stored overwrite : bool default is False. Whether to overwrite files if they already exist. Returns ------- None or subprocess """ osmpbf_clip_from = pathlib.Path(osmpbf_clip_from) if not osmpbf_clip_from.suffix: osmpbf_clip_from = osmpbf_clip_from.with_suffix('.osm.pbf') if not osmpbf_clip_from.is_file(): raise ValueError(f"OSM file {osmpbf_clip_from} to clip from not found.") if ((not pathlib.Path(osmpbf_output).is_file()) or (pathlib.Path(osmpbf_output).is_file() and overwrite)): LOGGER.info("""File doesn`t yet exist or overwriting old one. Assembling osmosis command.""") cmd = _build_osmosis_cmd(shape, osmpbf_clip_from, osmpbf_output) LOGGER.info('''Extracting from larger file... This will take a while''') return subprocess.run(cmd, stdout=subprocess.PIPE, universal_newlines=True) raise ValueError(f"File {osmpbf_output} already exists. Abort.")
[docs] def clip_from_bbox(bbox, osmpbf_clip_from, osmpbf_output, overwrite=False, kernel='osmosis'): """ get OSM raw data from abounding-box, which is extracted from a bigger (e.g. the planet) file. Parameters ---------- bbox : list bounding box [xmin, ymin, xmax, ymax] osmpbf_clip_from: str or pathlib.Path file path to planet.osm.pbf or other osm.pbf file to clip osmpbf_output : str or pathlib.Path file path (incl. name & ending) under which extract will be stored overwrite : bool default is False. Whether to overwrite files if they already exist. kernel : str name of the clipping kernel: 'osmconvert' or 'osmosis' Default is 'osmosis'. Note ---- This function uses the command line tool osmosis to cut out new osm.pbf files from the original ones. Installation instructions (windows, linux, apple) - see https://wiki.openstreetmap.org/wiki/Osmosis/Installation """ # TODO: allow for osmpbf_output to be only file name & save in default DIR if kernel == 'osmosis': _osmosis_clip(bbox, osmpbf_clip_from, osmpbf_output, overwrite) return if kernel == 'osmconvert': _osmconvert_clip(bbox, osmpbf_clip_from, osmpbf_output, overwrite) return raise ValueError(f"Kernel '{kernel}' is not valid. Abort.")
[docs] def clip_from_poly(poly_file, osmpbf_clip_from, osmpbf_output, overwrite=False, kernel='osmosis'): """ get OSM raw data from a custom shape defined in .poly file which is clipped from a .osm.pbf file. Parameters ---------- poly_file : str file path to a .poly file osmpbf_output : str or pathlib.Path file path (incl. name & ending) under which extract will be stored clip_from_file : str or pathlib.Path file path to planet-latest.osm.pbf. Will download & store it as indicated, if doesn`t yet exist. Default is DATA_DIR/planet-latest.osm.pbf overwrite : bool default is False. Whether to overwrite files if they already exist. kernel : str name of the clipping kernel: 'osmconvert' or 'osmosis' Default is 'osmosis'. Note ---- This function uses the command line tool osmosis or osmconvert to clip new osm.pbf files from the original ones. Installation instructions (windows, linux, apple) - see https://wiki.openstreetmap.org/wiki/Osmosis/Installation or https://wiki.openstreetmap.org/wiki/Osmconvert """ if kernel == 'osmosis': _osmosis_clip(poly_file, osmpbf_clip_from, osmpbf_output, overwrite) return if kernel == 'osmconvert': _osmconvert_clip(poly_file, osmpbf_clip_from, osmpbf_output, overwrite) return raise ValueError(f"Kernel '{kernel}' is not valid. Abort.")
[docs] def clip_from_shapes(shape_list, osmpbf_clip_from, osmpbf_output, overwrite=False, kernel='osmosis'): """ get OSM raw data from a custom shape defined by a list of polygons which is extracted from the entire OSM planet file. The list of shapes first needs to be converted to a .poly file and then passed back to the function (under the hood, a temporary file is created and deleted upon completion again). Parameters ---------- shape_list : list list of (Multi-)Polygon(s) that define the shape which should be cut, as e.g. obtained osmpbf_output : str Full file path under which the clipped data will be stored. osmpbf_clip_from : str or pathlib.Path file path (including filename) to the *.osm.pbf file to clip from. overwrite : bool default is False. Whether to overwrite files if they already exist. kernel : str name of the clipping kernel: 'osmconvert' or 'osmosis' Default is 'osmosis'. Note ---- This function uses the command line tool osmosis or osmconvert to clip new osm.pbf files from the original ones. Installation instructions (windows, linux, apple) - see https://wiki.openstreetmap.org/wiki/Osmosis/Installation or https://wiki.openstreetmap.org/wiki/Osmconvert """ shape_list = _simplify_shapelist(shape_list) poly_file = POLY_DIR / 'temp_shp.poly' _shapely2poly(shape_list, poly_file) if kernel == 'osmosis': _osmosis_clip(poly_file, osmpbf_clip_from, osmpbf_output, overwrite) poly_file.unlink() return if kernel == 'osmconvert': _osmconvert_clip(poly_file, osmpbf_clip_from, osmpbf_output, overwrite) poly_file.unlink() return raise ValueError(f"Kernel '{kernel}' is not valid. Abort.")