from __future__ import annotations
import warnings
from pathlib import Path
import nested_pandas as npd
import pandas as pd
import pyarrow as pa
from deprecated import deprecated # type: ignore
from mocpy import MOC
from typing_extensions import Self
from upath import UPath
from hats.catalog.catalog_snapshot import CatalogSnapshot
from hats.catalog.dataset import Dataset
from hats.catalog.dataset.table_properties import TableProperties
from hats.catalog.partition_info import PartitionInfo
from hats.inspection import plot_pixels
from hats.inspection.visualize_catalog import plot_moc
from hats.io import file_io, paths
from hats.io.parquet_metadata import (
aggregate_column_statistics,
aggregate_column_statistics_from_cache,
per_partition_statistics,
per_partition_statistics_from_cache,
)
from hats.pixel_math import HealpixPixel
from hats.pixel_math.region_to_moc import box_to_moc, cone_to_moc, pixel_list_to_moc, polygon_to_moc
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN, SPATIAL_INDEX_ORDER
from hats.pixel_tree import PixelAlignment, PixelAlignmentType
from hats.pixel_tree.moc_filter import filter_by_moc
from hats.pixel_tree.pixel_alignment import align_with_mocs
from hats.pixel_tree.pixel_tree import PixelTree
[docs]
class HealpixDataset(Dataset):
    """A HATS dataset partitioned with a HEALPix partitioning structure.

    Catalogs of this type are partitioned based on the ra and dec of the points, with each
    partition containing the points that fall within a given HEALPix pixel. The files are
    in the form::

        Norder=/Dir=/Npix=.parquet
    """
def __init__(
    self,
    catalog_info: TableProperties,
    pixels: PartitionInfo | PixelTree | list[HealpixPixel],
    catalog_path: str | Path | UPath | None = None,
    moc: MOC | None = None,
    schema: pa.Schema | None = None,
    snapshot: CatalogSnapshot | None = None,
    generate_snapshot: bool = False,
) -> None:
    """Initializes a Catalog

    Parameters
    ----------
    catalog_info: TableProperties
        A TableProperties object with the catalog metadata
    pixels: PartitionInfo | PixelTree | list[HealpixPixel]
        Specifies the pixels contained in the catalog. Can be either a
        list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object
    catalog_path: str | Path | UPath | None
        If the catalog is stored on disk, specify the location of the catalog
        Does not load the catalog from this path, only store as metadata
    moc : mocpy.MOC
        MOC object representing the coverage of the catalog. Stored as ``self.moc``
        (``None`` when no coverage map is available).
    schema : pa.Schema
        The pyarrow schema for the catalog. May be modified e.g. based on loaded columns
    snapshot : CatalogSnapshot
        Immutable snapshot of the catalog's original on-disk schema and partition info.
        May NOT be modified e.g. based on loaded columns or filtered pixels.
    generate_snapshot : bool
        If True and no snapshot is provided, automatically generate one from the current
        schema, catalog_info, and partition_info. Default False.
    """
    super().__init__(catalog_info, catalog_path=catalog_path, schema=schema, snapshot=snapshot)

    # Keep both views of the pixel set, whichever form the caller supplied.
    self.partition_info = self._get_partition_info_from_pixels(pixels)
    self.pixel_tree = self._get_pixel_tree_from_pixels(pixels)

    # The coverage map is read by get_max_coverage_order, filter_by_moc, align and
    # plot_moc, so it must be stored on the instance rather than silently dropped.
    self.moc = moc

    if snapshot is None and generate_snapshot:
        self.snapshot = CatalogSnapshot(
            schema=schema, catalog_info=catalog_info, partition_info=self.partition_info
        )
[docs]
def get_healpix_pixels(self) -> list[HealpixPixel]:
"""Get healpix pixel objects for all pixels contained in the catalog.
Returns
-------
list[HealpixPixel]
List of HealpixPixel
"""
return self.partition_info.get_healpix_pixels()
@staticmethod
def _get_partition_info_from_pixels(
    pixels: PartitionInfo | PixelTree | list[HealpixPixel],
) -> PartitionInfo:
    """Normalize any accepted pixel specification into a ``PartitionInfo``.

    A ``PartitionInfo`` is returned as-is; a ``PixelTree`` or a list-like of pixels
    is converted through ``PartitionInfo.from_healpix``.

    Raises
    ------
    TypeError
        If ``pixels`` is none of the accepted types.
    """
    if isinstance(pixels, PartitionInfo):
        return pixels

    if isinstance(pixels, PixelTree):
        pixel_list = pixels.get_healpix_pixels()
    elif pd.api.types.is_list_like(pixels):
        pixel_list = pixels
    else:
        raise TypeError("Pixels must be of type PartitionInfo, PixelTree, or List[HealpixPixel]")
    return PartitionInfo.from_healpix(pixel_list)
@staticmethod
def _get_pixel_tree_from_pixels(pixels: PartitionInfo | PixelTree | list[HealpixPixel]) -> PixelTree:
    """Normalize any accepted pixel specification into a ``PixelTree``.

    A ``PixelTree`` is returned as-is; a ``PartitionInfo`` or a list-like of pixels
    is converted through ``PixelTree.from_healpix``.

    Raises
    ------
    TypeError
        If ``pixels`` is none of the accepted types.
    """
    if isinstance(pixels, PartitionInfo):
        pixel_list = pixels.get_healpix_pixels()
    elif isinstance(pixels, PixelTree):
        return pixels
    elif pd.api.types.is_list_like(pixels):
        pixel_list = pixels
    else:
        raise TypeError("Pixels must be of type PartitionInfo, PixelTree, or List[HealpixPixel]")
    return PixelTree.from_healpix(pixel_list)
[docs]
def __len__(self):
"""The number of rows in the catalog.
Returns
-------
int
The number of rows in the catalog, as specified in its metadata.
This value is undetermined when the catalog is modified, and
therefore an error is raised.
"""
if self.catalog_info.total_rows is None:
raise ValueError("The number of rows is undetermined because the catalog was modified.")
return self.catalog_info.total_rows
[docs]
def get_max_coverage_order(self, default_order: int = 3) -> int:
"""Gets the maximum HEALPix order for which the coverage of the catalog is known from the pixel
tree and moc if it exists
Parameters
----------
default_order : int
The order to return if the dataset has no pixels.
(Default value = 3)
Returns
-------
int
maximum HEALPix order
"""
if len(self.pixel_tree) == 0:
return default_order
max_order = (
max(self.moc.max_order, self.pixel_tree.get_max_depth())
if self.moc is not None
else self.pixel_tree.get_max_depth()
)
return max_order
[docs]
def filter_from_pixel_list(self, pixels: list[HealpixPixel]) -> Self:
    """Filter the pixels in the catalog to only include any that overlap with the requested pixels.

    The pixel list is converted to a MOC and the work is delegated to ``filter_by_moc``.

    Parameters
    ----------
    pixels : list[HealpixPixel]
        the pixels to include

    Returns
    -------
    HealpixDataset
        A new catalog with only the pixels that overlap with the given pixels. Note that we reset the
        total_rows to None, as updating would require a scan over the new pixel sizes.
    """
    requested_moc = pixel_list_to_moc(pixels)
    return self.filter_by_moc(requested_moc)
[docs]
def filter_by_cone(self, ra: float, dec: float, radius_arcsec: float) -> Self:
    """Filter the pixels in the catalog to only include the pixels that overlap with a cone

    The cone is converted to a MOC at the catalog's maximum coverage order and the
    work is delegated to ``filter_by_moc``.

    Parameters
    ----------
    ra : float
        Right ascension of the center of the cone, in degrees
    dec : float
        Declination of the center of the cone, in degrees
    radius_arcsec : float
        Radius of the cone, in arcseconds

    Returns
    -------
    HealpixDataset
        A new catalog with only the pixels that overlap with the specified cone
    """
    coverage_order = self.get_max_coverage_order()
    cone_moc = cone_to_moc(ra, dec, radius_arcsec, coverage_order)
    return self.filter_by_moc(cone_moc)
[docs]
def filter_by_box(self, ra: tuple[float, float], dec: tuple[float, float]) -> Self:
    """Filter the pixels in the catalog to only include the pixels that overlap with a
    zone, defined by right ascension and declination ranges. The right ascension edges follow
    great arc circles and the declination edges follow small arc circles.

    The box is converted to a MOC at the catalog's maximum coverage order and the
    work is delegated to ``filter_by_moc``.

    Parameters
    ----------
    ra : tuple[float, float]
        Right ascension range, in degrees
    dec : tuple[float, float]
        Declination range, in degrees

    Returns
    -------
    HealpixDataset
        A new catalog with only the pixels that overlap with the specified region
    """
    coverage_order = self.get_max_coverage_order()
    box_moc = box_to_moc(ra, dec, coverage_order)
    return self.filter_by_moc(box_moc)
[docs]
def filter_by_polygon(self, vertices: list[tuple[float, float]]) -> Self:
    """Filter the pixels in the catalog to only include the pixels that overlap
    with a polygonal sky region.

    The polygon is converted to a MOC at the catalog's maximum coverage order and the
    work is delegated to ``filter_by_moc``.

    Parameters
    ----------
    vertices : list[tuple[float, float]]
        The list of vertex coordinates for the polygon, (ra, dec), in degrees.

    Returns
    -------
    HealpixDataset
        A new catalog with only the pixels that overlap with the specified polygon.
    """
    coverage_order = self.get_max_coverage_order()
    polygon_moc = polygon_to_moc(vertices, coverage_order)
    return self.filter_by_moc(polygon_moc)
[docs]
def filter_by_moc(self, moc: MOC) -> Self:
    """Filter the pixels in the catalog to only include the pixels that overlap with the moc provided.

    Parameters
    ----------
    moc : mocpy.MOC
        the moc to filter by

    Returns
    -------
    HealpixDataset
        A new catalog with only the pixels that overlap with the moc. Note that we reset the
        total_rows to None, as updating would require a scan over the new pixel sizes.
    """
    remaining_pixels = filter_by_moc(self.pixel_tree, moc)

    # Without a coverage map of our own there is nothing to intersect.
    if self.moc is None:
        remaining_coverage = None
    else:
        remaining_coverage = self.moc.intersection(moc)

    # The row count of the surviving partitions is unknown without a scan.
    info_without_rows = self.catalog_info.copy_and_update(total_rows=None)

    dataset_class = self.__class__
    return dataset_class(
        info_without_rows,
        pixels=remaining_pixels,
        catalog_path=self.catalog_path,
        moc=remaining_coverage,
        schema=self.schema,
        snapshot=self.snapshot,
    )
[docs]
def align(
    self, other_cat: Self, alignment_type: PixelAlignmentType = PixelAlignmentType.INNER
) -> PixelAlignment:
    """Performs an alignment to another catalog, using the pixel tree and mocs if available

    An alignment compares the pixel structures of the two catalogs, checking which pixels overlap.
    The alignment includes the mapping of all pairs of pixels in each tree that overlap with each other,
    and the aligned tree which consists of the overlapping pixels in the two input catalogs, using the
    higher order pixels where there is overlap with differing orders.

    For more information, see this document:
    https://docs.google.com/document/d/1gqb8qb3HiEhLGNav55LKKFlNjuusBIsDW7FdTkc5mJU/edit?usp=sharing

    Parameters
    ----------
    other_cat : Catalog
        The catalog to align to
    alignment_type : PixelAlignmentType
        The type of alignment describing how to handle nodes which exist in one tree but not the other.
        Mirrors the 'how' argument of a pandas/sql join. Options are:

        - "inner" - only use pixels that appear in both catalogs
        - "left" - use all pixels that appear in the left catalog and any overlapping from the right
        - "right" - use all pixels that appear in the right catalog and any overlapping from the left
        - "outer" - use all pixels from both catalogs

    Returns
    -------
    PixelAlignment
        A `PixelAlignment` object with the alignment from the two catalogs
    """
    left_tree, right_tree = self.pixel_tree, other_cat.pixel_tree
    left_moc, right_moc = self.moc, other_cat.moc
    return align_with_mocs(left_tree, right_tree, left_moc, right_moc, alignment_type=alignment_type)
[docs]
def plot_pixels(self, **kwargs):
    """Create a visual map of the pixel density of the catalog.

    Parameters
    ----------
    **kwargs
        Additional args forwarded unchanged to `hats.inspection.plot_pixels`

    Returns
    -------
    Whatever `hats.inspection.plot_pixels` returns for this catalog.
    """
    # Inside the method body this name resolves to the module-level plotting function.
    plot_result = plot_pixels(self, **kwargs)
    return plot_result
[docs]
def plot_moc(self, **kwargs):
    """Create a visual map of the coverage of the catalog.

    Parameters
    ----------
    **kwargs
        Additional args to pass to `hats.inspection.visualize_catalog.plot_moc`.
        A ``title`` supplied here replaces the default "Coverage of <catalog name>".

    Returns
    -------
    Whatever `hats.inspection.visualize_catalog.plot_moc` returns.
    """
    # Caller-supplied keywords come last so they win over the default title.
    plot_args = {"title": f"Coverage of {self.catalog_name}", **kwargs}
    return plot_moc(self.moc, **plot_args)
[docs]
def aggregate_column_statistics(
self,
exclude_hats_columns: bool = True,
exclude_columns: list[str] | None = None,
include_columns: list[str] | None = None,
only_numeric_columns: bool = False,
include_pixels: list[HealpixPixel] | None = None,
):
"""Read footer statistics in parquet metadata, and report on global min/max values.
Parameters
----------
exclude_hats_columns : bool
exclude HATS spatial and partitioning fields
from the statistics. Defaults to True.
exclude_columns : list[str] | None
additional columns to exclude from the statistics.
include_columns : list[str] | None
if specified, only return statistics for the column
names provided. Defaults to None, and returns all non-hats columns.
only_numeric_columns : bool
only include columns that are numeric (integer or floating point) in the
statistics. If True, the entire frame should be numeric.
(Default value = False)
include_pixels: list[HealpixPixel] | None
(Default value = None)
Returns
-------
Dataframe
aggregated statistics
"""
if not self.on_disk:
warnings.warn("Calling aggregate_column_statistics on an in-memory catalog. No results.")
return pd.DataFrame()
if not self.unmodified:
warnings.warn(
"Calling aggregate_column_statistics on a modified catalog. Results may be inaccurate."
)
if include_pixels is None:
include_pixels = self.get_healpix_pixels()
if (self.catalog_base_dir / "per_partition_statistics.parquet").exists():
return aggregate_column_statistics_from_cache(
self.catalog_base_dir / "per_partition_statistics.parquet",
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
only_numeric_columns=only_numeric_columns,
include_pixels=include_pixels,
)
return aggregate_column_statistics(
self.catalog_base_dir / "dataset" / "_metadata",
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
only_numeric_columns=only_numeric_columns,
include_pixels=include_pixels,
)
@deprecated(
    version="0.9.1",
    reason="`per_pixel_statistics` will be removed in the future, "
    "use `per_partition_statistics` instead.",
)
def per_pixel_statistics(
    self,
    *,
    exclude_hats_columns: bool = True,
    exclude_columns: list[str] | None = None,
    include_columns: list[str] | None = None,
    only_numeric_columns: bool = False,
    include_stats: list[str] | None = None,
    multi_index=False,
    include_pixels: list[HealpixPixel] | None = None,
    per_row_group: bool = False,
):
    """Deprecated alias of `per_partition_statistics`.

    Every keyword argument is forwarded unchanged to `per_partition_statistics`,
    and its result is returned as-is.
    """
    forwarded_options = {
        "exclude_hats_columns": exclude_hats_columns,
        "exclude_columns": exclude_columns,
        "include_columns": include_columns,
        "only_numeric_columns": only_numeric_columns,
        "include_stats": include_stats,
        "multi_index": multi_index,
        "include_pixels": include_pixels,
        "per_row_group": per_row_group,
    }
    return self.per_partition_statistics(**forwarded_options)
[docs]
def per_partition_statistics(
self,
*,
exclude_hats_columns: bool = True,
exclude_columns: list[str] | None = None,
include_columns: list[str] | None = None,
only_numeric_columns: bool = False,
include_stats: list[str] | None = None,
multi_index=False,
include_pixels: list[HealpixPixel] | None = None,
per_row_group: bool = False,
):
"""Read footer statistics in parquet metadata, and report on statistics about
each pixel partition.
Parameters
----------
exclude_hats_columns : bool
exclude HATS spatial and partitioning fields
from the statistics. Defaults to True.
exclude_columns : list[str] | None
additional columns to exclude from the statistics.
include_columns : list[str] | None
if specified, only return statistics for the column
names provided. Defaults to None, and returns all non-hats columns.
include_stats : list[str] | None
if specified, only return the kinds of values from list
(min_value, max_value, null_count, row_count). Defaults to None, and returns all values.
multi_index : bool
should the returned frame be created with a multi-index, first on
pixel, then on column name? (Default value = False)
include_pixels : list[HealpixPixel] | None
if specified, only return statistics
for the pixels indicated. Defaults to none, and returns all pixels.
Returns
-------
Dataframe
granular statistics
"""
if not self.on_disk:
warnings.warn("Calling per_partition_statistics on an in-memory catalog. No results.")
return pd.DataFrame()
if not self.unmodified:
warnings.warn(
"Calling per_partition_statistics on a modified catalog. Results may be inaccurate."
)
if include_pixels is None:
include_pixels = self.get_healpix_pixels()
if (self.catalog_base_dir / "per_partition_statistics.parquet").exists():
return per_partition_statistics_from_cache(
self.catalog_base_dir / "per_partition_statistics.parquet",
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
only_numeric_columns=only_numeric_columns,
include_stats=include_stats,
multi_index=multi_index,
include_pixels=include_pixels,
per_row_group=per_row_group,
)
return per_partition_statistics(
self.catalog_base_dir / "dataset" / "_metadata",
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
only_numeric_columns=only_numeric_columns,
include_stats=include_stats,
multi_index=multi_index,
include_pixels=include_pixels,
per_row_group=per_row_group,
)
[docs]
def has_healpix_column(self):
"""Does this catalog's schema contain a healpix spatial index column?
This is True if either:
- there is a value for the ``hats_col_healpix`` property, and that string
exists as a column name in the pyarrow schema
- there is a ``_healpix_29`` column in the pyarrow schema
Returns
-------
bool
if the dataset has a healpix column in the properties
"""
property_column = self.catalog_info.healpix_column
if property_column:
return not self.schema or property_column in self.schema.names
if self.schema:
if SPATIAL_INDEX_COLUMN in self.schema.names:
self.catalog_info.healpix_column = SPATIAL_INDEX_COLUMN
self.catalog_info.healpix_order = SPATIAL_INDEX_ORDER
return True
return False
[docs]
def get_pixel_paths(self):
"""Generate paths to all pixel files.
Pixels will be traversed in "breadth-first" healpix order. If any spatial filters
have been applied to this catalog, only those pixels that remain will be included.
Yields
------
UPath
Universal Pathlib pointing to either an npix directory, or to a single
pixel partition data file.
"""
if not self.on_disk:
warnings.warn("Calling read_pixel_to_pandas on an in-memory catalog. No results.")
return
for pixel in self.get_healpix_pixels():
yield paths.pixel_catalog_file(
self.catalog_base_dir, pixel, npix_suffix=self.catalog_info.npix_suffix
)
[docs]
def read_pixel_to_pandas(self, pixel: HealpixPixel, **kwargs) -> npd.NestedFrame:
    """Read the parquet file(s) for this pixel into a pandas dataframe.

    Parameters
    ----------
    pixel : HealpixPixel
        desired data partition, by healpix pixel
    **kwargs
        Additional arguments forwarded to ``file_io.read_parquet_file_to_pandas``

    Returns
    -------
    NestedFrame
        Pandas DataFrame with the data from the parquet file(s). An empty NestedFrame
        (with a warning) for an in-memory catalog.
    """
    if not self.on_disk:
        warnings.warn("Calling read_pixel_to_pandas on an in-memory catalog. No results.")
        # Honour the declared return type; NestedFrame is a pandas DataFrame subclass,
        # so callers that treated the result as a DataFrame keep working.
        return npd.NestedFrame()

    npix_suffix = self.catalog_info.npix_suffix
    object_path = paths.pixel_catalog_file(self.catalog_base_dir, pixel, npix_suffix=npix_suffix)
    # A "/" suffix means the partition is stored as a directory of files.
    return file_io.read_parquet_file_to_pandas(
        object_path, schema=self.schema, is_dir=(npix_suffix == "/"), **kwargs
    )