"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT."""
# @generated-id: 6d4f674ce8ef

from .basesdk import BaseSDK
from mistralai.client import errors, models, utils
from mistralai.client._hooks import HookContext
from mistralai.client.types import OptionalNullable, UNSET
from mistralai.client.utils import eventstreaming, get_security_from_env
from mistralai.client.utils.unmarshal_json_response import unmarshal_json_response
from typing import Any, Dict, List, Mapping, Optional


class WorkflowsEvents(BaseSDK):
    def get_stream_events(
        self,
        *,
        scope: Optional[models.Scope] = "*",
        activity_name: Optional[str] = "*",
        activity_id: Optional[str] = "*",
        workflow_name: Optional[str] = "*",
        workflow_exec_id: Optional[str] = "*",
        root_workflow_exec_id: Optional[str] = "*",
        parent_workflow_exec_id: Optional[str] = "*",
        stream: Optional[str] = "*",
        start_seq: Optional[int] = 0,
        metadata_filters: OptionalNullable[Dict[str, Any]] = UNSET,
        workflow_event_types: OptionalNullable[List[models.WorkflowEventType]] = UNSET,
        last_event_id: OptionalNullable[str] = UNSET,
        retries: OptionalNullable[utils.RetryConfig] = UNSET,
        server_url: Optional[str] = None,
        timeout_ms: Optional[int] = None,
        http_headers: Optional[Mapping[str, str]] = None,
    ) -> eventstreaming.EventStream[
        models.GetStreamEventsV1WorkflowsEventsStreamGetResponseBody
    ]:
        r"""Get Stream Events

        :param scope:
        :param activity_name:
        :param activity_id:
        :param workflow_name:
        :param workflow_exec_id:
        :param root_workflow_exec_id:
        :param parent_workflow_exec_id:
        :param stream:
        :param start_seq:
        :param metadata_filters:
        :param workflow_event_types:
        :param last_event_id:
        :param retries: Override the default retry configuration for this method
        :param server_url: Override the default server URL for this method
        :param timeout_ms: Override the default request timeout configuration for this method in milliseconds
        :param http_headers: Additional headers to set or replace on requests.
        """
        base_url = None
        url_variables = None
        if timeout_ms is None:
            timeout_ms = self.sdk_configuration.timeout_ms

        if timeout_ms is None:
            timeout_ms = 30000

        if server_url is not None:
            base_url = server_url
        else:
            base_url = self._get_url(base_url, url_variables)

        request = models.GetStreamEventsV1WorkflowsEventsStreamGetRequest(
            scope=scope,
            activity_name=activity_name,
            activity_id=activity_id,
            workflow_name=workflow_name,
            workflow_exec_id=workflow_exec_id,
            root_workflow_exec_id=root_workflow_exec_id,
            parent_workflow_exec_id=parent_workflow_exec_id,
            stream=stream,
            start_seq=start_seq,
            metadata_filters=metadata_filters,
            workflow_event_types=workflow_event_types,
            last_event_id=last_event_id,
        )

        req = self._build_request(
            method="GET",
            path="/v1/workflows/events/stream",
            base_url=base_url,
            url_variables=url_variables,
            request=request,
            request_body_required=False,
            request_has_path_params=False,
            request_has_query_params=True,
            user_agent_header="user-agent",
            accept_header_value="text/event-stream",
            http_headers=http_headers,
            security=self.sdk_configuration.security,
            allow_empty_value=None,
            timeout_ms=timeout_ms,
        )

        if retries == UNSET:
            if self.sdk_configuration.retry_config is not UNSET:
                retries = self.sdk_configuration.retry_config

        retry_config = None
        if isinstance(retries, utils.RetryConfig):
            retry_config = (retries, ["429", "500", "502", "503", "504"])

        http_res = self.do_request(
            hook_ctx=HookContext(
                config=self.sdk_configuration,
                base_url=base_url or "",
                operation_id="get_stream_events_v1_workflows_events_stream_get",
                oauth2_scopes=None,
                security_source=get_security_from_env(
                    self.sdk_configuration.security, models.Security
                ),
            ),
            request=req,
            error_status_codes=["422", "4XX", "5XX"],
            stream=True,
            retry_config=retry_config,
        )

        response_data: Any = None
        if utils.match_response(http_res, "200", "text/event-stream"):
            return eventstreaming.EventStream(
                http_res,
                lambda raw: utils.unmarshal_json(
                    raw, models.GetStreamEventsV1WorkflowsEventsStreamGetResponseBody
                ),
                client_ref=self,
                data_required=False,
            )
        if utils.match_response(http_res, "422", "application/json"):
            http_res_text = utils.stream_to_text(http_res)
            response_data = unmarshal_json_response(
                errors.HTTPValidationErrorData, http_res, http_res_text
            )
            raise errors.HTTPValidationError(response_data, http_res, http_res_text)
        if utils.match_response(http_res, "4XX", "*"):
            http_res_text = utils.stream_to_text(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)
        if utils.match_response(http_res, "5XX", "*"):
            http_res_text = utils.stream_to_text(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)

        http_res_text = utils.stream_to_text(http_res)
        raise errors.SDKError("Unexpected response received", http_res, http_res_text)

    async def get_stream_events_async(
        self,
        *,
        scope: Optional[models.Scope] = "*",
        activity_name: Optional[str] = "*",
        activity_id: Optional[str] = "*",
        workflow_name: Optional[str] = "*",
        workflow_exec_id: Optional[str] = "*",
        root_workflow_exec_id: Optional[str] = "*",
        parent_workflow_exec_id: Optional[str] = "*",
        stream: Optional[str] = "*",
        start_seq: Optional[int] = 0,
        metadata_filters: OptionalNullable[Dict[str, Any]] = UNSET,
        workflow_event_types: OptionalNullable[List[models.WorkflowEventType]] = UNSET,
        last_event_id: OptionalNullable[str] = UNSET,
        retries: OptionalNullable[utils.RetryConfig] = UNSET,
        server_url: Optional[str] = None,
        timeout_ms: Optional[int] = None,
        http_headers: Optional[Mapping[str, str]] = None,
    ) -> eventstreaming.EventStreamAsync[
        models.GetStreamEventsV1WorkflowsEventsStreamGetResponseBody
    ]:
        r"""Get Stream Events

        :param scope:
        :param activity_name:
        :param activity_id:
        :param workflow_name:
        :param workflow_exec_id:
        :param root_workflow_exec_id:
        :param parent_workflow_exec_id:
        :param stream:
        :param start_seq:
        :param metadata_filters:
        :param workflow_event_types:
        :param last_event_id:
        :param retries: Override the default retry configuration for this method
        :param server_url: Override the default server URL for this method
        :param timeout_ms: Override the default request timeout configuration for this method in milliseconds
        :param http_headers: Additional headers to set or replace on requests.
        """
        base_url = None
        url_variables = None
        if timeout_ms is None:
            timeout_ms = self.sdk_configuration.timeout_ms

        if timeout_ms is None:
            timeout_ms = 30000

        if server_url is not None:
            base_url = server_url
        else:
            base_url = self._get_url(base_url, url_variables)

        request = models.GetStreamEventsV1WorkflowsEventsStreamGetRequest(
            scope=scope,
            activity_name=activity_name,
            activity_id=activity_id,
            workflow_name=workflow_name,
            workflow_exec_id=workflow_exec_id,
            root_workflow_exec_id=root_workflow_exec_id,
            parent_workflow_exec_id=parent_workflow_exec_id,
            stream=stream,
            start_seq=start_seq,
            metadata_filters=metadata_filters,
            workflow_event_types=workflow_event_types,
            last_event_id=last_event_id,
        )

        req = self._build_request_async(
            method="GET",
            path="/v1/workflows/events/stream",
            base_url=base_url,
            url_variables=url_variables,
            request=request,
            request_body_required=False,
            request_has_path_params=False,
            request_has_query_params=True,
            user_agent_header="user-agent",
            accept_header_value="text/event-stream",
            http_headers=http_headers,
            security=self.sdk_configuration.security,
            allow_empty_value=None,
            timeout_ms=timeout_ms,
        )

        if retries == UNSET:
            if self.sdk_configuration.retry_config is not UNSET:
                retries = self.sdk_configuration.retry_config

        retry_config = None
        if isinstance(retries, utils.RetryConfig):
            retry_config = (retries, ["429", "500", "502", "503", "504"])

        http_res = await self.do_request_async(
            hook_ctx=HookContext(
                config=self.sdk_configuration,
                base_url=base_url or "",
                operation_id="get_stream_events_v1_workflows_events_stream_get",
                oauth2_scopes=None,
                security_source=get_security_from_env(
                    self.sdk_configuration.security, models.Security
                ),
            ),
            request=req,
            error_status_codes=["422", "4XX", "5XX"],
            stream=True,
            retry_config=retry_config,
        )

        response_data: Any = None
        if utils.match_response(http_res, "200", "text/event-stream"):
            return eventstreaming.EventStreamAsync(
                http_res,
                lambda raw: utils.unmarshal_json(
                    raw, models.GetStreamEventsV1WorkflowsEventsStreamGetResponseBody
                ),
                client_ref=self,
                data_required=False,
            )
        if utils.match_response(http_res, "422", "application/json"):
            http_res_text = await utils.stream_to_text_async(http_res)
            response_data = unmarshal_json_response(
                errors.HTTPValidationErrorData, http_res, http_res_text
            )
            raise errors.HTTPValidationError(response_data, http_res, http_res_text)
        if utils.match_response(http_res, "4XX", "*"):
            http_res_text = await utils.stream_to_text_async(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)
        if utils.match_response(http_res, "5XX", "*"):
            http_res_text = await utils.stream_to_text_async(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)

        http_res_text = await utils.stream_to_text_async(http_res)
        raise errors.SDKError("Unexpected response received", http_res, http_res_text)

    def get_workflow_events(
        self,
        *,
        root_workflow_exec_id: OptionalNullable[str] = UNSET,
        workflow_exec_id: OptionalNullable[str] = UNSET,
        workflow_run_id: OptionalNullable[str] = UNSET,
        limit: Optional[int] = 100,
        cursor: OptionalNullable[str] = UNSET,
        retries: OptionalNullable[utils.RetryConfig] = UNSET,
        server_url: Optional[str] = None,
        timeout_ms: Optional[int] = None,
        http_headers: Optional[Mapping[str, str]] = None,
    ) -> models.ListWorkflowEventResponse:
        r"""Get Workflow Events

        :param root_workflow_exec_id: Execution ID of the root workflow that initiated this execution chain.
        :param workflow_exec_id: Execution ID of the workflow that emitted this event.
        :param workflow_run_id: Run ID of the workflow that emitted this event.
        :param limit: Maximum number of events to return.
        :param cursor: Cursor for pagination.
        :param retries: Override the default retry configuration for this method
        :param server_url: Override the default server URL for this method
        :param timeout_ms: Override the default request timeout configuration for this method in milliseconds
        :param http_headers: Additional headers to set or replace on requests.
        """
        base_url = None
        url_variables = None
        if timeout_ms is None:
            timeout_ms = self.sdk_configuration.timeout_ms

        if timeout_ms is None:
            timeout_ms = 30000

        if server_url is not None:
            base_url = server_url
        else:
            base_url = self._get_url(base_url, url_variables)

        request = models.GetWorkflowEventsV1WorkflowsEventsListGetRequest(
            root_workflow_exec_id=root_workflow_exec_id,
            workflow_exec_id=workflow_exec_id,
            workflow_run_id=workflow_run_id,
            limit=limit,
            cursor=cursor,
        )

        req = self._build_request(
            method="GET",
            path="/v1/workflows/events/list",
            base_url=base_url,
            url_variables=url_variables,
            request=request,
            request_body_required=False,
            request_has_path_params=False,
            request_has_query_params=True,
            user_agent_header="user-agent",
            accept_header_value="application/json",
            http_headers=http_headers,
            security=self.sdk_configuration.security,
            allow_empty_value=None,
            timeout_ms=timeout_ms,
        )

        if retries == UNSET:
            if self.sdk_configuration.retry_config is not UNSET:
                retries = self.sdk_configuration.retry_config

        retry_config = None
        if isinstance(retries, utils.RetryConfig):
            retry_config = (retries, ["429", "500", "502", "503", "504"])

        http_res = self.do_request(
            hook_ctx=HookContext(
                config=self.sdk_configuration,
                base_url=base_url or "",
                operation_id="get_workflow_events_v1_workflows_events_list_get",
                oauth2_scopes=None,
                security_source=get_security_from_env(
                    self.sdk_configuration.security, models.Security
                ),
            ),
            request=req,
            error_status_codes=["422", "4XX", "5XX"],
            retry_config=retry_config,
        )

        response_data: Any = None
        if utils.match_response(http_res, "200", "application/json"):
            return unmarshal_json_response(models.ListWorkflowEventResponse, http_res)
        if utils.match_response(http_res, "422", "application/json"):
            response_data = unmarshal_json_response(
                errors.HTTPValidationErrorData, http_res
            )
            raise errors.HTTPValidationError(response_data, http_res)
        if utils.match_response(http_res, "4XX", "*"):
            http_res_text = utils.stream_to_text(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)
        if utils.match_response(http_res, "5XX", "*"):
            http_res_text = utils.stream_to_text(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)

        raise errors.SDKError("Unexpected response received", http_res)

    async def get_workflow_events_async(
        self,
        *,
        root_workflow_exec_id: OptionalNullable[str] = UNSET,
        workflow_exec_id: OptionalNullable[str] = UNSET,
        workflow_run_id: OptionalNullable[str] = UNSET,
        limit: Optional[int] = 100,
        cursor: OptionalNullable[str] = UNSET,
        retries: OptionalNullable[utils.RetryConfig] = UNSET,
        server_url: Optional[str] = None,
        timeout_ms: Optional[int] = None,
        http_headers: Optional[Mapping[str, str]] = None,
    ) -> models.ListWorkflowEventResponse:
        r"""Get Workflow Events

        :param root_workflow_exec_id: Execution ID of the root workflow that initiated this execution chain.
        :param workflow_exec_id: Execution ID of the workflow that emitted this event.
        :param workflow_run_id: Run ID of the workflow that emitted this event.
        :param limit: Maximum number of events to return.
        :param cursor: Cursor for pagination.
        :param retries: Override the default retry configuration for this method
        :param server_url: Override the default server URL for this method
        :param timeout_ms: Override the default request timeout configuration for this method in milliseconds
        :param http_headers: Additional headers to set or replace on requests.
        """
        base_url = None
        url_variables = None
        if timeout_ms is None:
            timeout_ms = self.sdk_configuration.timeout_ms

        if timeout_ms is None:
            timeout_ms = 30000

        if server_url is not None:
            base_url = server_url
        else:
            base_url = self._get_url(base_url, url_variables)

        request = models.GetWorkflowEventsV1WorkflowsEventsListGetRequest(
            root_workflow_exec_id=root_workflow_exec_id,
            workflow_exec_id=workflow_exec_id,
            workflow_run_id=workflow_run_id,
            limit=limit,
            cursor=cursor,
        )

        req = self._build_request_async(
            method="GET",
            path="/v1/workflows/events/list",
            base_url=base_url,
            url_variables=url_variables,
            request=request,
            request_body_required=False,
            request_has_path_params=False,
            request_has_query_params=True,
            user_agent_header="user-agent",
            accept_header_value="application/json",
            http_headers=http_headers,
            security=self.sdk_configuration.security,
            allow_empty_value=None,
            timeout_ms=timeout_ms,
        )

        if retries == UNSET:
            if self.sdk_configuration.retry_config is not UNSET:
                retries = self.sdk_configuration.retry_config

        retry_config = None
        if isinstance(retries, utils.RetryConfig):
            retry_config = (retries, ["429", "500", "502", "503", "504"])

        http_res = await self.do_request_async(
            hook_ctx=HookContext(
                config=self.sdk_configuration,
                base_url=base_url or "",
                operation_id="get_workflow_events_v1_workflows_events_list_get",
                oauth2_scopes=None,
                security_source=get_security_from_env(
                    self.sdk_configuration.security, models.Security
                ),
            ),
            request=req,
            error_status_codes=["422", "4XX", "5XX"],
            retry_config=retry_config,
        )

        response_data: Any = None
        if utils.match_response(http_res, "200", "application/json"):
            return unmarshal_json_response(models.ListWorkflowEventResponse, http_res)
        if utils.match_response(http_res, "422", "application/json"):
            response_data = unmarshal_json_response(
                errors.HTTPValidationErrorData, http_res
            )
            raise errors.HTTPValidationError(response_data, http_res)
        if utils.match_response(http_res, "4XX", "*"):
            http_res_text = await utils.stream_to_text_async(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)
        if utils.match_response(http_res, "5XX", "*"):
            http_res_text = await utils.stream_to_text_async(http_res)
            raise errors.SDKError("API error occurred", http_res, http_res_text)

        raise errors.SDKError("Unexpected response received", http_res)
