Source code for kfinance.domains.estimates.estimates_tools

from abc import ABC, abstractmethod
from textwrap import dedent
from typing import Literal, Type

import httpx
from pydantic import BaseModel, Field

from kfinance.async_batch_execution import AsyncTask, batch_execute_async_tasks
from kfinance.client.id_resolution import unified_fetch_id_triples
from kfinance.client.models.date_and_period_models import (
    EstimatePeriodType,
    EstimateType,
    NumPeriodsBackward,
    NumPeriodsForward,
)
from kfinance.client.models.response_models import SingleResultResp
from kfinance.client.permission_models import Permission
from kfinance.domains.estimates.estimates_models import (
    AnalystRecommendations,
    ConsensusTargetPrice,
    Estimates,
)
from kfinance.domains.line_items.line_item_models import CalendarType
from kfinance.domains.line_items.response_notes import (
    insert_fiscal_period_notes,
)
from kfinance.integrations.tool_calling.tool_calling_models import (
    KfinanceTool,
    ToolArgsWithIdentifiers,
    ToolRespWithIdInfoAndErrors,
    ValidQuarter,
)


class GetEstimatesFromIdentifiersArgs(ToolArgsWithIdentifiers):
    period_type: EstimatePeriodType | None = Field(
        default=None, description="The period type (annual, semi-annual, or quarterly)."
    )
    fiscal_start_year: int | None = Field(
        default=None,
        description="The starting year for the data range. Use null for the most recent data.",
    )
    fiscal_end_year: int | None = Field(
        default=None,
        description="The ending year for the data range. Use null for the most recent data.",
    )
    fiscal_start_quarter: ValidQuarter | None = Field(
        default=None,
        description="Starting quarter (1-4). Used when period_type is semi-annual or quarterly.",
    )
    fiscal_end_quarter: ValidQuarter | None = Field(
        default=None,
        description="Ending quarter (1-4). Used when period_type is semi-annual or quarterly.",
    )
    num_periods_forward: NumPeriodsForward | None = Field(
        default=None, description="The number of periods forward from today (1-99)."
    )
    num_periods_backward: NumPeriodsBackward | None = Field(
        default=None,
        description="The number of periods to look back from today (1-99).",
    )


class GetEstimatesFromIdentifiersResp(ToolRespWithIdInfoAndErrors[Estimates]):
    notes: list[str] = Field(default_factory=list)


class GetEstimatesFromIdentifiers(KfinanceTool, ABC):
    args_schema: Type[BaseModel] = GetEstimatesFromIdentifiersArgs
    accepted_permissions: set[Permission] | None = {Permission.EstimatesPermission}

    @property
    @abstractmethod
    def estimate_type(self) -> EstimateType:
        """The estimate type property."""
        pass

    async def _arun(
        self,
        identifiers: list[str],
        period_type: EstimatePeriodType | None = None,
        fiscal_start_year: int | None = None,
        fiscal_end_year: int | None = None,
        fiscal_start_quarter: Literal[1, 2, 3, 4] | None = None,
        fiscal_end_quarter: Literal[1, 2, 3, 4] | None = None,
        num_periods_forward: int | None = None,
        num_periods_backward: int | None = None,
    ) -> GetEstimatesFromIdentifiersResp:
        """"""
        return await get_estimates_from_identifiers(
            identifiers=identifiers,
            estimate_type=self.estimate_type,
            httpx_client=self.kfinance_client.httpx_client,
            period_type=period_type,
            fiscal_start_year=fiscal_start_year,
            fiscal_end_year=fiscal_end_year,
            fiscal_start_quarter=fiscal_start_quarter,
            fiscal_end_quarter=fiscal_end_quarter,
            num_periods_forward=num_periods_forward,
            num_periods_backward=num_periods_backward,
        )


class GetConsensusEstimatesFromIdentifiers(GetEstimatesFromIdentifiers):
    name: str = "get_consensus_estimates_from_identifiers"
    description: str = dedent("""
        Get consensus analyst estimates (EPS, Revenue, EBITDA, etc.) for a given identifier.

        Returns statistical aggregates including high, low, mean, median, and number of estimates.
        When periods have ended, actual reported values are also returned.
    """).strip()

    @property
    def estimate_type(self) -> EstimateType:
        """The estimate type is consensus."""
        return EstimateType.consensus


class GetGuidanceFromIdentifiers(GetEstimatesFromIdentifiers):
    name: str = "get_guidance_from_identifiers"
    description: str = dedent("""
        Get company-issued financial guidance for a given identifier.

        Returns the most recent guidance provided by the company for future periods, or the final guidance issued before results were reported for past periods.
    """).strip()

    @property
    def estimate_type(self) -> EstimateType:
        """The estimate type is guidance."""
        return EstimateType.guidance


class GetConsensusTargetPriceFromIdentifiersResp(ToolRespWithIdInfoAndErrors[ConsensusTargetPrice]):
    pass


class GetConsensusTargetPriceFromIdentifiers(KfinanceTool):
    name: str = "get_consensus_target_price_from_identifiers"
    description: str = dedent("""
        Get consensus target price estimates for a given company. Returns the current consensus analyst target price including high, low, mean, and median values.
    """).strip()
    args_schema: Type[BaseModel] = ToolArgsWithIdentifiers
    accepted_permissions: set[Permission] | None = {Permission.EstimatesPermission}

    async def _arun(
        self,
        identifiers: list[str],
    ) -> GetConsensusTargetPriceFromIdentifiersResp:
        return await get_consensus_target_price_from_identifiers(
            identifiers=identifiers,
            httpx_client=self.kfinance_client.httpx_client,
        )


class GetAnalystRecommendationsFromIdentifiersResp(
    ToolRespWithIdInfoAndErrors[AnalystRecommendations]
):
    pass


class GetAnalystRecommendationsFromIdentifiers(KfinanceTool):
    name: str = "get_analyst_recommendations_from_identifiers"
    description: str = dedent("""
        Get analyst recommendations for a given company. Returns the current consensus analyst recommendation breakdown including buy, hold, sell counts and overall rating.
    """).strip()
    args_schema: Type[BaseModel] = ToolArgsWithIdentifiers
    accepted_permissions: set[Permission] | None = {Permission.EstimatesPermission}

    async def _arun(
        self,
        identifiers: list[str],
    ) -> GetAnalystRecommendationsFromIdentifiersResp:
        return await get_analyst_recommendations_from_identifiers(
            identifiers=identifiers,
            httpx_client=self.kfinance_client.httpx_client,
        )


async def get_estimates_from_identifiers(
    identifiers: list[str],
    estimate_type: EstimateType,
    httpx_client: httpx.AsyncClient,
    period_type: EstimatePeriodType | None = None,
    fiscal_start_year: int | None = None,
    fiscal_end_year: int | None = None,
    fiscal_start_quarter: Literal[1, 2, 3, 4] | None = None,
    fiscal_end_quarter: Literal[1, 2, 3, 4] | None = None,
    num_periods_forward: int | None = None,
    num_periods_backward: int | None = None,
) -> GetEstimatesFromIdentifiersResp:
    """Fetch estimates for all identifiers."""

    id_triple_resp = await unified_fetch_id_triples(
        identifiers=identifiers, httpx_client=httpx_client
    )
    errors: list[str] = list(id_triple_resp.errors.values())

    tasks = [
        AsyncTask(
            func=fetch_estimates_from_company_id,
            kwargs=dict(
                company_id=id_triple.company_id,
                estimate_type=estimate_type,
                httpx_client=httpx_client,
                period_type=period_type,
                fiscal_start_year=fiscal_start_year,
                fiscal_end_year=fiscal_end_year,
                fiscal_start_quarter=fiscal_start_quarter,
                fiscal_end_quarter=fiscal_end_quarter,
                num_periods_forward=num_periods_forward,
                num_periods_backward=num_periods_backward,
            ),
            result_key=identifier,
        )
        for identifier, id_triple in id_triple_resp.identifiers_to_id_triples.items()
    ]

    await batch_execute_async_tasks(tasks=tasks)

    results: dict[str, Estimates] = dict()
    for task in tasks:
        if task.error:
            errors.append(task.error)
        else:
            resp: SingleResultResp[Estimates] = task.result
            if resp.result is not None:
                results[task.result_key] = resp.result
            if resp.error is not None:
                error_msg = f"{task.result_key}: {resp.error}"
                errors.append(error_msg)

    resp_model = GetEstimatesFromIdentifiersResp(
        identifier_results=results,
        identifier_info=id_triple_resp.identifiers_to_id_triples,
        errors=errors,
    )

    # Add explanatory notes
    insert_fiscal_period_notes(
        calendar_type=CalendarType.fiscal,  # Estimates are always fiscal
        period_type=period_type,
        resp_model=resp_model,
    )

    return resp_model


async def fetch_estimates_from_company_id(
    company_id: int,
    estimate_type: EstimateType,
    httpx_client: httpx.AsyncClient,
    period_type: EstimatePeriodType | None = None,
    fiscal_start_year: int | None = None,
    fiscal_end_year: int | None = None,
    fiscal_start_quarter: Literal[1, 2, 3, 4] | None = None,
    fiscal_end_quarter: Literal[1, 2, 3, 4] | None = None,
    num_periods_forward: int | None = None,
    num_periods_backward: int | None = None,
) -> SingleResultResp[Estimates]:
    """Fetch estimates for one company_id."""
    # Build query parameters
    params = {
        "company_id": company_id,
        "estimate_type": estimate_type.value,
    }

    if period_type is not None:
        params["period_type"] = period_type.value
    if fiscal_start_year is not None:
        params["start_year"] = fiscal_start_year
    if fiscal_end_year is not None:
        params["end_year"] = fiscal_end_year
    if fiscal_start_quarter is not None:
        params["start_quarter"] = fiscal_start_quarter
    if fiscal_end_quarter is not None:
        params["end_quarter"] = fiscal_end_quarter
    if num_periods_forward is not None:
        params["num_periods_forward"] = num_periods_forward
    if num_periods_backward is not None:
        params["num_periods_backward"] = num_periods_backward

    resp = await httpx_client.post(url="/estimates/", json=params)
    resp.raise_for_status()
    return SingleResultResp[Estimates].model_validate(resp.json())


async def get_consensus_target_price_from_identifiers(
    identifiers: list[str],
    httpx_client: httpx.AsyncClient,
) -> GetConsensusTargetPriceFromIdentifiersResp:
    """Fetch consensus target price for all identifiers."""

    id_triple_resp = await unified_fetch_id_triples(
        identifiers=identifiers, httpx_client=httpx_client
    )
    errors: list[str] = list(id_triple_resp.errors.values())

    tasks = [
        AsyncTask(
            func=fetch_consensus_target_price_from_company_id,
            kwargs=dict(
                company_id=id_triple.company_id,
                httpx_client=httpx_client,
            ),
            result_key=identifier,
        )
        for identifier, id_triple in id_triple_resp.identifiers_to_id_triples.items()
    ]

    await batch_execute_async_tasks(tasks=tasks)

    results: dict[str, ConsensusTargetPrice] = dict()
    for task in tasks:
        if task.error:
            errors.append(task.error)
        else:
            resp: SingleResultResp[ConsensusTargetPrice] = task.result
            if resp.result is not None:
                results[task.result_key] = resp.result
            if resp.error is not None:
                error_msg = f"{task.result_key}: {resp.error}"
                errors.append(error_msg)

    return GetConsensusTargetPriceFromIdentifiersResp(
        identifier_results=results,
        identifier_info=id_triple_resp.identifiers_to_id_triples,
        errors=errors,
    )


async def fetch_consensus_target_price_from_company_id(
    company_id: int,
    httpx_client: httpx.AsyncClient,
) -> SingleResultResp[ConsensusTargetPrice]:
    """Fetch consensus target price for one company_id."""
    resp = await httpx_client.get(url=f"/estimates/consensus_target_price/{company_id}")
    resp.raise_for_status()
    return SingleResultResp[ConsensusTargetPrice].model_validate(resp.json())


async def get_analyst_recommendations_from_identifiers(
    identifiers: list[str],
    httpx_client: httpx.AsyncClient,
) -> GetAnalystRecommendationsFromIdentifiersResp:
    """Fetch analyst recommendations for all identifiers."""

    id_triple_resp = await unified_fetch_id_triples(
        identifiers=identifiers, httpx_client=httpx_client
    )
    errors: list[str] = list(id_triple_resp.errors.values())

    tasks = [
        AsyncTask(
            func=fetch_analyst_recommendations_from_company_id,
            kwargs=dict(
                company_id=id_triple.company_id,
                httpx_client=httpx_client,
            ),
            result_key=identifier,
        )
        for identifier, id_triple in id_triple_resp.identifiers_to_id_triples.items()
    ]

    await batch_execute_async_tasks(tasks=tasks)

    results: dict[str, AnalystRecommendations] = dict()
    for task in tasks:
        if task.error:
            errors.append(task.error)
        else:
            resp: SingleResultResp[AnalystRecommendations] = task.result
            if resp.result is not None:
                results[task.result_key] = resp.result
            if resp.error is not None:
                error_msg = f"{task.result_key}: {resp.error}"
                errors.append(error_msg)

    return GetAnalystRecommendationsFromIdentifiersResp(
        identifier_results=results,
        identifier_info=id_triple_resp.identifiers_to_id_triples,
        errors=errors,
    )


async def fetch_analyst_recommendations_from_company_id(
    company_id: int,
    httpx_client: httpx.AsyncClient,
) -> SingleResultResp[AnalystRecommendations]:
    """Fetch analyst recommendations for one company_id."""
    resp = await httpx_client.get(url=f"/estimates/analyst_recommendations/{company_id}")
    resp.raise_for_status()
    return SingleResultResp[AnalystRecommendations].model_validate(resp.json())

import kfinance
import datetime
from typing import Optional
[docs] def get_consensus_estimates_from_identifiers(identifiers: list[str], period_type: kfinance.client.models.date_and_period_models.EstimatePeriodType | None = None, fiscal_start_year: int | None = None, fiscal_end_year: int | None = None, fiscal_start_quarter: Optional[Literal[1, 2, 3, 4]] = None, fiscal_end_quarter: Optional[Literal[1, 2, 3, 4]] = None, num_periods_forward: int | None = None, num_periods_backward: int | None = None) -> 'GetEstimatesFromIdentifiersResp': """Get consensus analyst estimates (EPS, Revenue, EBITDA, etc.) for a given identifier. Returns statistical aggregates including high, low, mean, median, and number of estimates. When periods have ended, actual reported values are also returned. :param identifiers: The identifiers, which can be a list of ticker symbols, ISINs, or CUSIPs, or company_ids :type identifiers: list[str] :param period_type: The period type (annual, semi-annual, or quarterly). :type period_type: Union[EstimatePeriodType, NoneType] :param fiscal_start_year: The starting year for the data range. Use null for the most recent data. :type fiscal_start_year: Union[int, NoneType] :param fiscal_end_year: The ending year for the data range. Use null for the most recent data. :type fiscal_end_year: Union[int, NoneType] :param fiscal_start_quarter: Starting quarter (1-4). Used when period_type is semi-annual or quarterly. :type fiscal_start_quarter: Union[Annotated[Literal[1, 2, 3, 4], BeforeValidator], NoneType] :param fiscal_end_quarter: Ending quarter (1-4). Used when period_type is semi-annual or quarterly. :type fiscal_end_quarter: Union[Annotated[Literal[1, 2, 3, 4], BeforeValidator], NoneType] :param num_periods_forward: The number of periods forward from today (1-99). :type num_periods_forward: Union[Annotated[int, FieldInfo(annotation=NoneType, required=True, description='The number of periods in the future to retrieve estimate data.', metadata=[Ge(ge=0), Le(le=99)])], NoneType] :param num_periods_backward: The number of periods to look back from today (1-99). :type num_periods_backward: Union[Annotated[int, FieldInfo(annotation=NoneType, required=True, description='The number of periods in the past to retrieve estimate data.', metadata=[Ge(ge=0), Le(le=99)])], NoneType] :rtype: GetEstimatesFromIdentifiersResp"""
import kfinance import datetime from typing import Optional
[docs] def get_guidance_from_identifiers(identifiers: list[str], period_type: kfinance.client.models.date_and_period_models.EstimatePeriodType | None = None, fiscal_start_year: int | None = None, fiscal_end_year: int | None = None, fiscal_start_quarter: Optional[Literal[1, 2, 3, 4]] = None, fiscal_end_quarter: Optional[Literal[1, 2, 3, 4]] = None, num_periods_forward: int | None = None, num_periods_backward: int | None = None) -> 'GetEstimatesFromIdentifiersResp': """Get company-issued financial guidance for a given identifier. Returns the most recent guidance provided by the company for future periods, or the final guidance issued before results were reported for past periods. :param identifiers: The identifiers, which can be a list of ticker symbols, ISINs, or CUSIPs, or company_ids :type identifiers: list[str] :param period_type: The period type (annual, semi-annual, or quarterly). :type period_type: Union[EstimatePeriodType, NoneType] :param fiscal_start_year: The starting year for the data range. Use null for the most recent data. :type fiscal_start_year: Union[int, NoneType] :param fiscal_end_year: The ending year for the data range. Use null for the most recent data. :type fiscal_end_year: Union[int, NoneType] :param fiscal_start_quarter: Starting quarter (1-4). Used when period_type is semi-annual or quarterly. :type fiscal_start_quarter: Union[Annotated[Literal[1, 2, 3, 4], BeforeValidator], NoneType] :param fiscal_end_quarter: Ending quarter (1-4). Used when period_type is semi-annual or quarterly. :type fiscal_end_quarter: Union[Annotated[Literal[1, 2, 3, 4], BeforeValidator], NoneType] :param num_periods_forward: The number of periods forward from today (1-99). :type num_periods_forward: Union[Annotated[int, FieldInfo(annotation=NoneType, required=True, description='The number of periods in the future to retrieve estimate data.', metadata=[Ge(ge=0), Le(le=99)])], NoneType] :param num_periods_backward: The number of periods to look back from today (1-99). :type num_periods_backward: Union[Annotated[int, FieldInfo(annotation=NoneType, required=True, description='The number of periods in the past to retrieve estimate data.', metadata=[Ge(ge=0), Le(le=99)])], NoneType] :rtype: GetEstimatesFromIdentifiersResp"""
import kfinance import datetime from typing import Optional
[docs] def get_consensus_target_price_from_identifiers(identifiers: list[str]) -> 'GetConsensusTargetPriceFromIdentifiersResp': """Get consensus target price estimates for a given company. Returns the current consensus analyst target price including high, low, mean, and median values. :param identifiers: The identifiers, which can be a list of ticker symbols, ISINs, or CUSIPs, or company_ids :type identifiers: list[str] :rtype: GetConsensusTargetPriceFromIdentifiersResp"""
import kfinance import datetime from typing import Optional
[docs] def get_analyst_recommendations_from_identifiers(identifiers: list[str]) -> 'GetAnalystRecommendationsFromIdentifiersResp': """Get analyst recommendations for a given company. Returns the current consensus analyst recommendation breakdown including buy, hold, sell counts and overall rating. :param identifiers: The identifiers, which can be a list of ticker symbols, ISINs, or CUSIPs, or company_ids :type identifiers: list[str] :rtype: GetAnalystRecommendationsFromIdentifiersResp"""