from __future__ import annotations
import typing
from itertools import chain
import numpy as np
import pandas as pd
from ..doctools import document
from ..exceptions import PlotnineError
from ..iapi import range_view
from ..utils import alias, array_kind, match
from ._expand import expand_range
from .range import RangeContinuous
from .scale import scale_continuous, scale_datetime, scale_discrete
if typing.TYPE_CHECKING:
from typing import Sequence
from plotnine.typing import (
Trans,
TupleFloat2,
TupleFloat4,
)
# positions scales have a couple of differences (quirks) that
# make necessary to override some of the scale_discrete and
# scale_continuous methods
#
# scale_position_discrete and scale_position_continuous
# are intermediate base classes where the required overriding
# is done
@document
class scale_position_discrete(scale_discrete):
"""
Base class for discrete position scales
Parameters
----------
{superclass_parameters}
limits : array_like, optional
Limits of the scale. For discrete scale, these are
the categories (unique values) of the variable.
For scales that deal with categoricals, these may
be a subset or superset of the categories.
"""
# All positions have no guide
guide = None
# Keeps two ranges, range and range_c
range_c: RangeContinuous
def __init__(self, *args, **kwargs):
self.range_c = RangeContinuous()
scale_discrete.__init__(self, *args, **kwargs)
def reset(self):
# Can't reset discrete scale because
# no way to recover values
self.range_c.reset()
def is_empty(self) -> bool:
return super().is_empty() and self.range_c.is_empty()
def train(self, series):
# The discrete position scale is capable of doing
# training for continuous data.
# This complicates training and mapping, but makes it
# possible to place objects at non-integer positions,
# as is necessary for jittering etc.
if array_kind.continuous(series):
self.range_c.train(series)
else:
self.range.train(series, drop=self.drop)
def map(self, series, limits=None):
# Discrete values are converted into integers starting
# at 1
if limits is None:
limits = self.limits
if array_kind.discrete(series):
# TODO: Rewrite without using numpy
seq = np.arange(1, len(limits) + 1)
idx = np.asarray(match(series, limits, nomatch=len(series)))
if not len(idx):
return np.array([])
try:
seq = seq[idx]
except IndexError:
# Deal with missing data
# - Insert NaN where there is no match
seq = np.hstack((seq.astype(float), np.nan))
idx = np.clip(idx, 0, len(seq) - 1)
seq = seq[idx]
return seq
return series
@property
def limits(self):
if self.is_empty():
return (0, 1)
elif self._limits is not None and not callable(self._limits):
return self._limits
elif self._limits is None:
# discrete range
return self.range.range
elif callable(self._limits):
limits = self._limits(self.range.range)
# Functions that return iterators e.g. reversed
if iter(limits) is limits:
limits = list(limits)
return limits
else:
raise PlotnineError("Lost, do not know what the limits are.")
@limits.setter
def limits(self, value):
if isinstance(value, tuple):
value = list(value)
self._limits = value
def dimension(self, expand=(0, 0, 0, 0), limits=None):
"""
Get the phyical size of the scale
Unlike limits, this always returns a numeric vector of length 2
"""
from mizani.bounds import expand_range_distinct
if limits is None:
limits = self.limits
if self.is_empty():
return (0, 1)
if self.range.is_empty(): # only continuous
return expand_range_distinct(self.range_c.range, expand)
elif self.range_c.is_empty(): # only discrete
# FIXME: I think this branch should not exist
return expand_range_distinct((1, len(self.limits)), expand)
else: # both
# e.g categorical bar plot have discrete items, but
# are plot on a continuous x scale
a = np.hstack(
[
self.range_c.range,
expand_range_distinct((1, len(self.range.range)), expand),
]
)
return a.min(), a.max()
def expand_limits(
self,
limits: Sequence[str],
expand: TupleFloat2 | TupleFloat4,
coord_limits: TupleFloat2,
trans: Trans,
) -> range_view:
# Turn discrete limits into a tuple of continuous limits
if self.is_empty():
climits = (0, 1)
else:
climits = (1, len(limits))
self.range_c.range
if coord_limits is not None:
# - Override None in coord_limits
# - Expand limits in coordinate space
# - Remove any computed infinite values &
c0, c1 = coord_limits
climits = (
climits[0] if c0 is None else c0,
climits[1] if c1 is None else c1,
)
# Expand discrete range
rv_d = expand_range(climits, expand, trans)
if self.range_c.is_empty():
return rv_d
# Expand continuous range
no_expand = self.default_expansion(0, 0)
rv_c = expand_range(self.range_c.range, no_expand, trans)
# Merge the ranges
rv = range_view(
range=(
min(chain(rv_d.range, rv_c.range)),
max(chain(rv_d.range, rv_c.range)),
),
range_coord=(
min(chain(rv_d.range_coord, rv_c.range_coord)),
max(chain(rv_d.range_coord, rv_c.range_coord)),
),
)
rv.range = min(rv.range), max(rv.range)
rv.range_coord = min(rv.range_coord), max(rv.range_coord)
return rv
@document
class scale_position_continuous(scale_continuous):
"""
Base class for continuous position scales
Parameters
----------
{superclass_parameters}
"""
# All positions have no guide
guide = None
def map(self, series, limits=None):
# Position aesthetics don't map, because the coordinate
# system takes care of it.
# But the continuous scale has to deal with out of bound points
if not len(series):
return series
if limits is None:
limits = self.limits
scaled = self.oob(series, limits)
scaled[pd.isnull(scaled)] = self.na_value
return scaled
[docs]
@document
class scale_x_discrete(scale_position_discrete):
"""
Discrete x position
Parameters
----------
{superclass_parameters}
"""
_aesthetics = ["x", "xmin", "xmax", "xend"]
[docs]
@document
class scale_y_discrete(scale_position_discrete):
"""
Discrete y position
Parameters
----------
{superclass_parameters}
"""
_aesthetics = ["y", "ymin", "ymax", "yend"]
# Not part of the user API
alias("scale_x_ordinal", scale_x_discrete)
alias("scale_y_ordinal", scale_y_discrete)
[docs]
@document
class scale_x_continuous(scale_position_continuous):
"""
Continuous x position
Parameters
----------
{superclass_parameters}
"""
_aesthetics = ["x", "xmin", "xmax", "xend", "xintercept"]
[docs]
@document
class scale_y_continuous(scale_position_continuous):
"""
Continuous y position
Parameters
----------
{superclass_parameters}
"""
_aesthetics = [
"y",
"ymin",
"ymax",
"yend",
"yintercept",
"ymin_final",
"ymax_final",
"lower",
"middle",
"upper",
] # pyright: ignore
# Transformed scales
[docs]
@document
class scale_x_datetime(scale_datetime, scale_x_continuous):
"""
Continuous x position for datetime data points
Parameters
----------
{superclass_parameters}
"""
[docs]
@document
class scale_y_datetime(scale_datetime, scale_y_continuous):
"""
Continuous y position for datetime data points
Parameters
----------
{superclass_parameters}
"""
alias("scale_x_date", scale_x_datetime)
alias("scale_y_date", scale_y_datetime)
[docs]
@document
class scale_x_timedelta(scale_x_continuous):
"""
Continuous x position for timedelta data points
Parameters
----------
{superclass_parameters}
"""
_trans = "pd_timedelta"
[docs]
@document
class scale_y_timedelta(scale_y_continuous):
"""
Continuous y position for timedelta data points
Parameters
----------
{superclass_parameters}
"""
_trans = "pd_timedelta"
[docs]
@document
class scale_x_sqrt(scale_x_continuous):
"""
Continuous x position sqrt transformed scale
Parameters
----------
{superclass_parameters}
"""
_trans = "sqrt"
[docs]
@document
class scale_y_sqrt(scale_y_continuous):
"""
Continuous y position sqrt transformed scale
Parameters
----------
{superclass_parameters}
"""
_trans = "sqrt"
[docs]
@document
class scale_x_log10(scale_x_continuous):
"""
Continuous x position log10 transformed scale
Parameters
----------
{superclass_parameters}
"""
_trans = "log10"
[docs]
@document
class scale_y_log10(scale_y_continuous):
"""
Continuous y position log10 transformed scale
Parameters
----------
{superclass_parameters}
"""
_trans = "log10"
[docs]
@document
class scale_x_reverse(scale_x_continuous):
"""
Continuous x position reverse transformed scale
Parameters
----------
{superclass_parameters}
"""
_trans = "reverse"
[docs]
@document
class scale_y_reverse(scale_y_continuous):
"""
Continuous y position reverse transformed scale
Parameters
----------
{superclass_parameters}
"""
_trans = "reverse"