Skip to content

Reference

Bases: ABC

Abstract class representing a messagable target.

Source code in hoist/_messages.py
class BaseMessagable(ABC):
    """Abstract base for objects that messages can be sent to."""

    async def pend_message(
        self,
        msg: Optional[str] = None,
        data: Optional[Payload] = None,
        replying: Optional["Message"] = None,
    ) -> "PendingMessage":
        """Create a pending message without sending it.

        Args:
            msg: Message content.
            data: Payload to include with the message.
            replying: Message to reply to.

        Returns:
            Pending message that targets this object.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import - confirm).
        from .message import PendingMessage

        pending = PendingMessage(self, msg, data=data, replying=replying)
        return pending

    @asynccontextmanager
    async def message_later(
        self,
        msg: Optional[str] = None,
        data: Optional[Payload] = None,
        replying: Optional["Message"] = None,
    ) -> AsyncIterator["PendingMessage"]:
        """Yield a pending message and send it when the context exits.

        The send lives in a ``finally`` clause, so it is attempted even
        when the body of the ``async with`` block raises.

        Args:
            msg: Message content.
            data: Payload to include with the message.
            replying: Message to reply to.
        """
        from .message import PendingMessage

        pending = PendingMessage(self, msg, data=data, replying=replying)
        try:
            yield pending
        finally:
            await pending.send()

    @abstractmethod
    async def message(
        self,
        msg: str,
        data: Optional[Payload] = None,
        replying: Optional["Message"] = None,
        listeners: Optional[MessageListeners] = None,
    ) -> "Message":
        """Send a message to this target.

        Subclasses must provide the implementation.

        Args:
            msg: Message content.
            data: Payload to include with the message.
            replying: Message to reply to.
            listeners: Extra listeners to include.

        Returns:
            Created message.
        """
        ...

message(msg, data=None, replying=None, listeners=None) async abstractmethod

Send a message.

Parameters:

Name Type Description Default
msg str

Message content.

required
data Optional[Payload]

Payload to include with the message.

None
replying Optional[Message]

Message to reply to.

None
listeners Optional[MessageListeners]

Extra listeners to include.

None

Returns:

Type Description
Message

Created message.

Source code in hoist/_messages.py
@abstractmethod
async def message(
    self,
    msg: str,
    data: Optional[Payload] = None,
    replying: Optional["Message"] = None,
    listeners: Optional[MessageListeners] = None,
) -> "Message":
    """Send a message to this target.

    Abstract: the body is intentionally empty and must be supplied by a
    concrete subclass.

    Args:
        msg: Message content.
        data: Payload to include with the message.
        replying: Message to reply to.
        listeners: Extra listeners to include.

    Returns:
        Created message.
    """
    ...

message_later(msg=None, data=None, replying=None) async

Send a message after the context has finished.

Parameters:

Name Type Description Default
msg Optional[str]

Message content.

None
data Optional[Payload]

Payload to include with the message.

None
replying Optional[Message]

Message to reply to.

None
Source code in hoist/_messages.py
@asynccontextmanager
async def message_later(
    self,
    msg: Optional[str] = None,
    data: Optional[Payload] = None,
    replying: Optional["Message"] = None,
) -> AsyncIterator["PendingMessage"]:
    """Yield a pending message and send it when the context exits.

    The send lives in a ``finally`` clause, so it is attempted even when
    the body of the ``async with`` block raises.

    Args:
        msg: Message content.
        data: Payload to include with the message.
        replying: Message to reply to.
    """
    from .message import PendingMessage

    pending = PendingMessage(self, msg, data=data, replying=replying)
    try:
        yield pending
    finally:
        await pending.send()

pend_message(msg=None, data=None, replying=None) async

Get a message to be sent later.

Parameters:

Name Type Description Default
msg Optional[str]

Message content.

None
data Optional[Payload]

Payload to include with the message.

None
replying Optional[Message]

Message to reply to.

None
Source code in hoist/_messages.py
async def pend_message(
    self,
    msg: Optional[str] = None,
    data: Optional[Payload] = None,
    replying: Optional["Message"] = None,
) -> "PendingMessage":
    """Create a pending message without sending it.

    Args:
        msg: Message content.
        data: Payload to include with the message.
        replying: Message to reply to.

    Returns:
        Pending message that targets this object.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import - confirm).
    from .message import PendingMessage

    pending = PendingMessage(self, msg, data=data, replying=replying)
    return pending

Base class for handling message listening.

Source code in hoist/_messages.py
class MessageListener:
    """Base class for handling message listening.

    Holds a registry of listeners keyed by message name (the ``None`` key
    holds listeners that are run for every message) and a store of every
    message created through this instance, keyed by message ID.
    """

    __slots__ = (
        "_message_listeners",
        "_current_id",
        "_all_messages",
    )

    def __init__(
        self,
        extra_listeners: Optional[MessageListeners] = None,
    ):
        """Set up the listener registry and the message store.

        Args:
            extra_listeners: Listeners to register from the start. The
                mapping is shallow-copied into a new dict.
        """
        self._message_listeners: MessageListeners = {
            **(extra_listeners or {}),
        }
        self._current_id = 0
        self._all_messages: Dict[int, "Message"] = {}

    @property
    def message_listeners(self) -> MessageListeners:
        """Listener function for messages."""
        return self._message_listeners

    @message_listeners.setter
    def message_listeners(self, value: MessageListeners) -> None:
        self._message_listeners = value

    async def _call_listeners(
        self,
        ws: "BaseMessagable",
        message: str,
        payload: Payload,
        replying: Optional[dict],
        id: int,
    ) -> None:
        """Resolve the message object for an incoming message and run listeners.

        Listeners run in this order: those attached to the message being
        replied to (if any), those registered for `message`, and finally
        those registered under the `None` key.

        Args:
            ws: Messagable connection target.
            message: Message content, also used as the listener key.
            payload: Payload sent with the message.
            replying: Raw data of the message being replied to, if any.
            id: ID of the message.
        """
        ml = self.message_listeners
        listeners = ml.get(message)
        obj = await self.create_or_lookup(
            ws,
            message,
            payload,
            id,
            replying,
        )
        reply = obj.replying

        if reply:
            # Listeners attached directly to the message being replied to.
            await _process_listeners(
                reply.message_listeners.get(message),
                reply,
                hide_warning=True,
            )

        await _process_listeners(listeners, obj)

        # Listeners registered without a message name.
        glbl = ml.get(None)
        await _process_listeners(
            glbl,
            obj,
            hide_warning=True,
        )

    def receive(
        self,
        message: Optional[Union[str, Tuple[str, ...]]] = None,
        parameter: Optional[Union[Schema, T]] = None,
    ):
        """Add a listener for message receiving.

        Args:
            message: Message to listen for. A tuple registers the listener
                under each name it contains; `None` registers it under the
                `None` key.
            parameter: Parameter type to use. When omitted, the type hint
                of the listener's second parameter is used, falling back
                to `dict`.

        Returns:
            Decorator that registers the function and returns it unchanged.
        """

        def decorator(func: Listener):
            listeners = self.message_listeners
            arg_names = tuple(inspect.signature(func).parameters)
            arg_count = len(arg_names)

            param: Optional[Union[DataclassLike, Schema]] = parameter

            if not param and arg_count > 1:
                # Look the payload type up by the second parameter's name.
                # Indexing the hints positionally skips unannotated
                # parameters and can pick up the "return" annotation.
                param = get_type_hints(func).get(arg_names[1])

            if not arg_count:
                kind = ListenerParam.NONE
            elif arg_count == 1:
                kind = ListenerParam.MESSAGE_ONLY
            else:
                kind = ListenerParam.MESSAGE_AND_PAYLOAD

            value = ListenerData(func, param or dict, kind)

            names = message if isinstance(message, tuple) else (message,)
            for name in names:
                listeners.setdefault(name, []).append(value)

            # Hand the function back so the decorated name stays usable.
            return func

        return decorator

    @property
    def current_id(self) -> int:
        """Current message ID."""
        return self._current_id

    async def create_message(
        self,
        conn: "BaseMessagable",
        data: Payload,
    ) -> "Message":
        """Build a message from a payload.

        Args:
            conn: Messagable connection target.
            data: Payload to create the message from. Its `"id"` entry is
                the key the message is stored under.

        Returns:
            Message built from the payload.
        """
        mid: int = data["id"]
        self._all_messages[mid] = await self._build_message(conn, data)
        return self._all_messages[mid]

    async def create_or_lookup(
        self,
        conn: "BaseMessagable",
        content: str,
        message_data: Payload,
        id: int,
        replying: Optional[Union["Message", dict]],
        *,
        listeners: Optional[MessageListeners] = None,
    ) -> "Message":
        """Create a new message with the given ID, or look it up if it exists.

        Args:
            conn: Messagable connection target.
            content: Message content.
            message_data: Payload to include with the message.
            id: ID of the message.
            replying: Message that the target is replying to.
            listeners: Extra message listeners.

        Returns:
            Existing message stored under the ID, otherwise the newly
            created one.
        """
        mid: int = id
        obj = self._all_messages.get(mid)

        if obj:
            hlog(
                "create or lookup",
                f"using existing {obj}",
                level=logging.DEBUG,
            )
            return obj

        log(
            "create or lookup",
            f"id {mid} does not exist, creating it",
            level=logging.DEBUG,
        )
        obj = await self.new_message(
            conn,
            content,
            message_data,
            replying,
            id=id,
            listeners=listeners,
        )

        if listeners:
            obj.message_listeners = listeners

        return obj

    async def new_message(
        self,
        conn: "BaseMessagable",
        content: str,
        message_data: Payload,
        replying: Optional[Union["Message", dict]],
        *,
        id: Optional[int] = None,
        listeners: Optional[MessageListeners] = None,
    ) -> "Message":
        """Create a new message.

        Args:
            conn: Messagable connection target.
            content: Message content.
            message_data: Payload to include with the message.
            replying: Message that the target is replying to. A dict is
                first turned into a message object.
            id: ID of the message. When omitted, the internal counter
                value is used.
            listeners: Extra message listeners.

        Returns:
            Created message.

        Raises:
            ValueError: Message ID is already in use.
        """
        # The counter advances on every call, even when an ID is supplied.
        self._current_id += 1
        # Test against None explicitly: `id or ...` would also discard an
        # explicitly requested ID of 0.
        mid = self._current_id if id is None else id

        if mid in self._all_messages:
            # Report the ID that actually collided; `id` may be None here.
            raise ValueError(
                f"message {mid} has already been created",
            )

        from .message import Message

        if isinstance(replying, dict):
            replying = await self._build_message(conn, replying)

        obj = Message(
            conn,
            content,
            mid,
            data=message_data,
            replying=replying,
        )

        if listeners:
            obj.message_listeners = listeners

        self._all_messages[mid] = obj
        hlog(
            "message create",
            f"constructed new message {obj}",
            level=logging.DEBUG,
        )
        return obj

    async def lookup(self, id: int) -> "Message":
        """Lookup a message by its ID.

        Args:
            id: ID of the message to lookup.

        Returns:
            Message stored under the given ID.

        Raises:
            ValueError: Message ID does not exist.
        """
        try:
            obj = self._all_messages[id]
        except KeyError as e:
            raise ValueError(
                f"(internal error) message {id} does not exist",
            ) from e
        hlog(
            "message lookup",
            f"looked up {obj}",
            level=logging.DEBUG,
        )
        return obj

    async def _build_message(
        self,
        conn: "BaseMessagable",
        data: Payload,
    ) -> "Message":
        """Generate a message object from a payload.

        Reads the `"message"`, `"data"`, `"id"` and `"replying"` entries
        of the payload and passes them to `create_or_lookup`.
        """
        return await self.create_or_lookup(
            conn,
            data["message"],
            data["data"],
            data["id"],
            data["replying"],
        )

create_message(conn, data) async

Build a message from a payload.

Parameters:

Name Type Description Default
conn BaseMessagable

Messagable connection target.

required
data Payload

Payload to create the message from.

required
Source code in hoist/_messages.py
async def create_message(
    self,
    conn: "BaseMessagable",
    data: Payload,
) -> "Message":
    """Turn a payload into a message and store it under the payload's ID.

    Args:
        conn: Messagable connection target.
        data: Payload to create the message from. Its `"id"` entry is the
            key the message is stored under.

    Returns:
        Message built from the payload.
    """
    # Read the ID before building, as a missing "id" should fail first.
    message_id: int = data["id"]
    built = await self._build_message(conn, data)
    self._all_messages[message_id] = built
    return built

create_or_lookup(conn, content, message_data, id, replying, *, listeners=None) async

Create a new message with the specified ID, or look it up if it already exists.

Parameters:

Name Type Description Default
conn BaseMessagable

Messagable connection target.

required
content str

Message content.

required
message_data Payload

Payload to include with the message.

required
id int

ID of the message.

required
replying Optional[Union[Message, dict]]

Message that the target is replying to.

required
listeners Optional[MessageListeners]

Extra message listeners.

None

Returns:

Type Description
Message

Created message.

Source code in hoist/_messages.py
async def create_or_lookup(
    self,
    conn: "BaseMessagable",
    content: str,
    message_data: Payload,
    id: int,
    replying: Optional[Union["Message", dict]],
    *,
    listeners: Optional[MessageListeners] = None,
) -> "Message":
    """Return the message stored under an ID, creating it when missing.

    Args:
        conn: Messagable connection target.
        content: Message content.
        message_data: Payload to include with the message.
        id: ID of the message.
        replying: Message that the target is replying to.
        listeners: Extra message listeners.

    Returns:
        Existing message stored under the ID, otherwise the newly
        created one.
    """
    existing = self._all_messages.get(id)

    if not existing:
        log(
            "create or lookup",
            f"id {id} does not exist, creating it",
            level=logging.DEBUG,
        )
        created = await self.new_message(
            conn,
            content,
            message_data,
            replying,
            id=id,
            listeners=listeners,
        )

        if listeners:
            created.message_listeners = listeners

        return created

    hlog(
        "create or lookup",
        f"using existing {existing}",
        level=logging.DEBUG,
    )
    return existing

current_id() property

Current message ID.

Source code in hoist/_messages.py
@property
def current_id(self) -> int:
    """Value of the internal message ID counter."""
    return self._current_id

lookup(id) async

Lookup a message by its ID.

Parameters:

Name Type Description Default
id int

ID of the message to lookup.

required

Raises:

Type Description
ValueError

Message ID does not exist.

Source code in hoist/_messages.py
async def lookup(self, id: int) -> "Message":
    """Fetch a previously stored message by its ID.

    Args:
        id: ID of the message to lookup.

    Returns:
        Message stored under the given ID.

    Raises:
        ValueError: Message ID does not exist.
    """
    try:
        found = self._all_messages[id]
    except KeyError as missing:
        # Surface the miss as a ValueError, keeping the KeyError as cause.
        raise ValueError(
            f"(internal error) message {id} does not exist",
        ) from missing
    else:
        hlog(
            "message lookup",
            f"looked up {found}",
            level=logging.DEBUG,
        )
        return found

message_listeners() writable property

Listener function for messages.

Source code in hoist/_messages.py
@property
def message_listeners(self) -> MessageListeners:
    """Mapping of registered message listeners held by this instance."""
    return self._message_listeners

new_message(conn, content, message_data, replying, *, id=None, listeners=None) async

Create a new message.

Parameters:

Name Type Description Default
conn BaseMessagable

Messagable connection target.

required
content str

Message content.

required
message_data Payload

Payload to include with the message.

required
id Optional[int]

ID of the message.

None
replying Optional[Union[Message, dict]]

Message that the target is replying to.

required
listeners Optional[MessageListeners]

Extra message listeners.

None

Returns:

Type Description
Message

Created message.

Raises:

Type Description
ValueError

Message ID is already in use.

Source code in hoist/_messages.py
async def new_message(
    self,
    conn: "BaseMessagable",
    content: str,
    message_data: Payload,
    replying: Optional[Union["Message", dict]],
    *,
    id: Optional[int] = None,
    listeners: Optional[MessageListeners] = None,
) -> "Message":
    """Create a new message.

    Args:
        conn: Messagable connection target.
        content: Message content.
        message_data: Payload to include with the message.
        replying: Message that the target is replying to. A dict is first
            turned into a message object.
        id: ID of the message. When omitted, the internal counter value
            is used.
        listeners: Extra message listeners.

    Returns:
        Created message.

    Raises:
        ValueError: Message ID is already in use.
    """
    # The counter advances on every call, even when an ID is supplied.
    self._current_id += 1
    # Test against None explicitly: `id or ...` would also discard an
    # explicitly requested ID of 0.
    mid = self._current_id if id is None else id

    if mid in self._all_messages:
        # Report the ID that actually collided; `id` may be None here.
        raise ValueError(
            f"message {mid} has already been created",
        )

    from .message import Message

    if isinstance(replying, dict):
        replying = await self._build_message(conn, replying)

    obj = Message(
        conn,
        content,
        mid,
        data=message_data,
        replying=replying,
    )

    if listeners:
        obj.message_listeners = listeners

    self._all_messages[mid] = obj
    hlog(
        "message create",
        f"constructed new message {obj}",
        level=logging.DEBUG,
    )
    return obj

receive(message=None, parameter=None)

Add a listener for message receiving.

Parameters:

Name Type Description Default
message Optional[Union[str, Tuple[str, ...]]]

Message to listen for.

None
parameter Optional[Union[Schema, T]]

Parameter type to use.

None
Source code in hoist/_messages.py
def receive(
    self,
    message: Optional[Union[str, Tuple[str, ...]]] = None,
    parameter: Optional[Union[Schema, T]] = None,
):
    """Add a listener for message receiving.

    Args:
        message: Message to listen for. A tuple registers the listener
            under each name it contains; `None` registers it under the
            `None` key.
        parameter: Parameter type to use. When omitted, the type hint of
            the listener's second parameter is used, falling back to
            `dict`.

    Returns:
        Decorator that registers the function and returns it unchanged.
    """

    def decorator(func: Listener):
        listeners = self.message_listeners
        arg_names = tuple(inspect.signature(func).parameters)
        arg_count = len(arg_names)

        param: Optional[Union[DataclassLike, Schema]] = parameter

        if not param and arg_count > 1:
            # Look the payload type up by the second parameter's name.
            # Indexing the hints positionally skips unannotated
            # parameters and can pick up the "return" annotation.
            param = get_type_hints(func).get(arg_names[1])

        if not arg_count:
            kind = ListenerParam.NONE
        elif arg_count == 1:
            kind = ListenerParam.MESSAGE_ONLY
        else:
            kind = ListenerParam.MESSAGE_AND_PAYLOAD

        value = ListenerData(func, param or dict, kind)

        names = message if isinstance(message, tuple) else (message,)
        for name in names:
            listeners.setdefault(name, []).append(value)

        # Hand the function back so the decorated name stays usable.
        return func

    return decorator

Server

Bases: MessageListener

Class for handling a server.

Source code in hoist/server.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
class Server(MessageListener):
    """Class for handling a server."""

    __slots__ = (
        "_token",
        "_hide_token",
        "_login_func",
        "_minimum_version",
        "_operations",
        "_supported_operations",
        "_unsupported_operations",
        "_clients",
        "_server",
        "_start_called",
        "_all_connections",
        "_is_fancy",
        "_exceptions",
    )

    def __init__(
        self,
        token: Optional[str] = None,
        *,
        default_token_len: int = 25,
        default_token_choices: "SupportsLenAndGetItem[str]" = ascii_letters,
        hide_token: bool = False,
        login_func: LoginFunc = _base_login,
        log_level: Optional[int] = None,
        minimum_version: Optional[VersionLike] = None,
        extra_operations: Optional[Operations] = None,
        unsupported_operations: Optional[Sequence[str]] = None,
        supported_operations: Optional[Sequence[str]] = None,
        extra_listeners: Optional[MessageListeners] = None,
        fancy: Optional[bool] = None,
        disable_builtins: bool = False,
    ) -> None:
        """Constructor for `Server`.

        Args:
            token: Token to use on the server. When empty, one is generated.
            default_token_len: Length of autogenerated token.
            default_token_choices: Choices to use for the autogenerated token.
            hide_token: Should the token be hidden from the startup message.
            login_func: Function used to handle logins to the server.
            log_level: Level to set on the `hoist` logger. The logger is left
                untouched when `None`.
            minimum_version: Minimum version of Hoist required to connect.
            extra_operations: Extra operations to include on the server.
            unsupported_operations: Operations not supported by the server.
            supported_operations: Operations supported by the server.
                Defaults to `["*"]`.
            extra_listeners: Extra message listeners.
            fancy: Should the server run in fancy mode.
            disable_builtins: Should all builtin operations be disabled.
        """
        # Function-scope import keeps this block self-contained.
        from secrets import choice as secure_choice

        # The token authenticates clients, so it is drawn from a
        # cryptographically secure source.
        self._token: str = token or "".join(
            secure_choice(default_token_choices)
            for _ in range(default_token_len)
        )
        self._hide_token = hide_token
        self._login_func = login_func

        # `is not None` so that an explicit level of 0 is applied too.
        if log_level is not None:
            logging.getLogger("hoist").setLevel(log_level)

        self._minimum_version = minimum_version
        self._operations = {**BASE_OPERATIONS, **(extra_operations or {})}
        self._supported_operations = supported_operations or ["*"]

        blocked: Sequence[str] = unsupported_operations or []
        if disable_builtins:
            # Block every builtin while still honouring the operations the
            # caller blacklisted explicitly.
            blocked = (*BASE_OPERATIONS.keys(), *blocked)
        self._unsupported_operations = blocked

        self._clients: List[Socket] = []
        self._server: Optional[UvicornServer] = None
        self._start_called: bool = False
        self._all_connections: List[Socket] = []
        self._is_fancy: Optional[bool] = fancy
        self._exceptions: List[Exception] = []
        super().__init__(extra_listeners)

    async def _call_operation(
        self,
        op: OperationData,
        payload: Payload,
    ) -> JSONLike:
        """Call an operation.

        Args:
            op: Operation entry, indexed as handler function, parameter
                kind (`OperatorParam`) and custom-payload flag.
            payload: Data received for the operation.

        Returns:
            The handler's result when it is falsy or its type is in
            `_PY_BUILTINS`, otherwise the result's `__dict__`.

        Raises:
            TypeError: The handler returned a truthy value that is neither
                a builtin type nor an object with a non-empty `__dict__`.
        """
        func = op[0]
        typ = op[1]
        custom_payload = op[2]

        if typ is OperatorParam.NONE:
            result = await func()  # type: ignore
        elif typ is OperatorParam.SERVER_ONLY:
            result = await func(self)  # type: ignore
        elif typ in {
            OperatorParam.PAYLOAD_ONLY,
            OperatorParam.SERVER_AND_PAYLOAD,
        }:
            with_server = typ is OperatorParam.SERVER_AND_PAYLOAD

            if custom_payload:
                hints = get_type_hints(func)
                # The payload is the second positional parameter when the
                # server is passed first. Looking the annotation up by
                # parameter name avoids building the payload from the
                # server parameter's annotation.
                param_names = tuple(inspect.signature(func).parameters)
                cl: Type = hints[param_names[1 if with_server else 0]]

                verify_schema(get_type_hints(cl), payload)
                data = cl(**payload)
            else:
                data = payload  # type: ignore

            args = (self, data) if with_server else (data,)
            result = await func(*args)  # type: ignore
        else:
            # Remaining kinds receive the payload's keys as keyword
            # arguments, validated against the handler's annotations.
            hints = get_type_hints(func)
            verify_schema(hints, payload)
            result = await func(**payload)  # type: ignore

        result_type = type(result)

        if (result_type in _PY_BUILTINS) or (not result):
            return result

        dct: Optional[dict] = getattr(result, "__dict__", None)

        if not dct:
            raise TypeError(
                f"operation handler {func.__name__} returned non-json type: {result_type}",  # noqa
            )

        return dct

    @property
    def fancy(self) -> bool:
        """Tell whether fancy output is enabled (`False` when unset)."""
        enabled = self._is_fancy
        return enabled if enabled else False

    @property
    def supported_operations(self) -> Sequence[str]:
        """Names of the operations this server allows."""
        allowed = self._supported_operations
        return allowed

    @property
    def unsupported_operations(self) -> Sequence[str]:
        """Names of the operations this server refuses to run."""
        blocked = self._unsupported_operations
        return blocked

    def _verify_operations(self) -> None:
        """Verify current operations lists.

        Raises:
            ValueError: A list mixes `"*"` with other operations, or an
                operation is listed as both supported and unsupported.
        """
        so = self.supported_operations
        uo = self.unsupported_operations
        has_wildcard = False

        # A tuple rather than a set: lists are unhashable, and both
        # sequences must be inspected even when their contents are equal.
        for ops in (so, uo):
            if "*" in ops:
                if len(ops) > 1:
                    raise ValueError(
                        '"*" should be the only operation',
                    )
                has_wildcard = True

        if has_wildcard:
            # A wildcard list names no concrete operation, so there is
            # nothing to cross-check between the two lists.
            return

        for name in so:
            if name in uo:
                raise ValueError(
                    f'operation "{name}" is both supported and unsupported',
                )

    async def _verify_operation(self, operation: str) -> bool:
        """Decide whether the server accepts the given operation name.

        An operation is rejected when it is blacklisted, or when the
        blacklist is the wildcard and the operation is not explicitly
        supported. Otherwise it is accepted when the supported list is the
        wildcard or names the operation.
        """
        allowed = self.supported_operations
        blocked = self.unsupported_operations

        if operation in blocked:
            return False

        if "*" in blocked and operation not in allowed:
            return False

        # The operation is known not to be blacklisted at this point.
        return ("*" in allowed) or (operation in allowed)

    @property
    def token(self) -> str:
        """Token clients must present to authenticate."""
        current = self._token
        return current

    @staticmethod
    async def _handle_schema(
        ws: Socket,
        payload: Payload,
        schema: Schema,
    ) -> List[Any]:
        """Validate a client payload and pull out the schema's fields.

        A validation failure is reported to the client through `ws.error`.
        NOTE(review): `ws.error` appears to raise `ClientError`
        (`_internal_error` suppresses it) - confirm.

        Returns:
            The payload values, in the order of the schema's keys.
        """
        try:
            verify_schema(schema, payload)
        except SchemaValidationError as exc:
            await ws.error(INVALID_CONTENT, payload=invalid_payload(exc))

        values: List[Any] = []
        for key in schema:
            values.append(payload[key])

        return values

    async def _process_operation(
        self,
        ws: Socket,
        payload: Payload,
        id: int,
    ) -> None:
        """Run the operation requested by a client and reply with its result.

        Unknown operations, unsupported operations and schema violations
        raised by the handler call are reported through `ws.error`.
        """
        expected = {
            "operation": str,
            "data": dict,
        }
        name, data = await self._handle_schema(ws, payload, expected)

        handler = self._operations.get(name)
        if not handler:
            await ws.error(UNKNOWN_OPERATION)

        supported = await self._verify_operation(name)
        if not supported:
            await ws.error(UNSUPPORTED_OPERATION)

        try:
            result = await self._call_operation(handler, data)
        except SchemaValidationError as exc:
            await ws.error(INVALID_CONTENT, payload=invalid_payload(exc))

        await ws.success(id, payload={"result": result})

    async def _process_message(
        self,
        ws: Socket,
        payload: Payload,
        id: int,
    ) -> None:
        """Hand an incoming message to the message listeners.

        The client is told when listener processing opens (with the new
        message's id) and when it closes. Exceptions raised by listeners
        are reported through `ws.error` or `_internal_error`.
        """
        expected = {
            "message": str,
            "data": dict,
            "replying": (dict, None),
        }
        message, data, replying = await self._handle_schema(
            ws,
            payload,
            expected,
        )

        transport = _SocketMessageTransport(ws, self, id)
        created = await self.new_message(transport, message, data, replying)
        message_id = created.id

        await ws.success(
            id,
            payload={"id": message_id},
            message=LISTENER_OPEN,
        )

        try:
            await self._call_listeners(
                transport,
                message,
                data,
                replying,
                message_id,
            )
        except Exception as exc:
            if isinstance(exc, SchemaValidationError):
                await ws.error(INVALID_CONTENT, payload=invalid_payload(exc))

            await self._internal_error(ws, exc)

        await ws.success(id, message=LISTENER_CLOSE)

    async def _ws_wrapper(self, ws: Socket) -> None:
        """Authenticate a client, then serve its requests in a loop.

        The first frame must carry the client's version, token and request
        id. After a successful login every further frame is dispatched by
        its `action` to the operation or message handler; other actions
        are reported through `ws.error`.
        """
        version, token, login_id = await ws.recv(
            {
                "version": str,
                "token": str,
                "id": int,
            }
        )

        required = self._minimum_version

        if required:
            if isinstance(required, Version):
                needed = required
            else:
                needed = parse_version(required)

            if not (parse_version(version) >= needed):
                await ws.error(
                    BAD_VERSION,
                    payload={"needed": needed.to_string()},
                )

        authenticated = await self._login_func(self, token)
        if not authenticated:
            await ws.error(LOGIN_FAILED)

        await ws.success(login_id)

        log(
            "login",
            f"{make_client(ws.address)} has successfully authenticated",  # noqa
        )

        while True:
            data: Payload
            action, data, request_id = await ws.recv(
                {
                    "action": str,
                    "data": dict,
                    "id": int,
                }
            )

            if action == "message":
                await self._process_message(ws, data, request_id)
            elif action == "operation":
                await self._process_operation(ws, data, request_id)
            else:
                await ws.error(INVALID_ACTION)

    async def _ws(self, ws: Socket) -> None:
        """WebSocket entry point for Starlette.

        Registers the socket, runs `_ws_wrapper` and translates whatever
        exception ends the session into a log entry, a socket status and,
        where applicable, a close code. The socket is always removed from
        the active client list afterwards.
        """  # noqa
        self._clients.append(ws)
        self._all_connections.append(ws)
        ws.status = Status.CONNECTED

        try:
            await self._ws_wrapper(ws)
        except Exception as e:
            ws.status = Status.KILLED
            log(
                "exc",
                f"{e.__class__.__name__}: {str(e) or '<no message>'}",
                level=logging.DEBUG,
            )
            addr = ws.address
            if isinstance(e, WebSocketDisconnect):
                log(
                    "disconnect",
                    f"unexpected disconnect{make_client_msg(addr)}",
                    level=logging.WARNING,
                )
            elif isinstance(e, ClientError):
                log(
                    "error",
                    f"connection from {make_client(addr)} encountered error {e.code} ([bold red]{e.error}[/]): [bold white]{e.message}",  # noqa
                    level=logging.ERROR,
                )
                await ws.close(1003)
            elif isinstance(e, CloseSocket):
                ws.status = Status.CLOSED
                await ws.close(1000)
            else:
                log(
                    "exception",
                    f"exception occured while receiving{make_client_msg(addr)}",  # noqa
                    level=logging.CRITICAL,
                )

                await self._internal_error(ws, e)
                await ws.close(1003)
        finally:
            # Runs even when a handler above raises, or when the session
            # ends with an exception not derived from `Exception`, so a
            # dead socket never lingers in the list `broadcast` sends to.
            self._clients.remove(ws)

    async def _internal_error(
        self,
        ws: Socket,
        e: Exception,
    ) -> None:
        """Record an unexpected exception and report it to the client.

        The exception is stored in `_exceptions`, printed unless the
        server is in fancy mode, and sent to the client as a server error.
        A `ClientError` raised while reporting is suppressed.
        """
        self._exceptions.append(e)

        fancy_mode = self.fancy
        if not fancy_mode:
            print_exc(show_locals=True)

        details = {"message": str(e), "exc": e.__class__.__name__}

        with suppress(ClientError):
            await ws.error(SERVER_ERROR, payload=details)

    async def _app(
        self,
        scope: Scope,
        receive: Receive,
        send: Send,
    ) -> None:
        """Main Starlette app implementation.

        HTTP requests to `/hoist/ack` are answered with a JSON body holding
        the server version; every other HTTP path gets the HTML page.
        WebSocket connections to `/hoist` are handed to `_ws`; websocket
        handshakes on any other path are rejected. `lifespan` scopes are
        ignored.
        """
        typ: str = scope["type"]

        if typ == "lifespan":
            return

        path: str = scope["path"]
        hlog(
            "request" if typ == "http" else "websocket",
            f"[bold white]{path}[/] {scope}",
            level=logging.DEBUG,
        )

        if typ == "http":
            response: Response
            if path == "/hoist/ack":
                msg = {"version": __version__}
                hlog("ack", msg, level=logging.DEBUG)
                response = JSONResponse(msg)
            else:
                response = HTMLResponse(
                    HTML,
                )
            await response(scope, receive, send)
        elif typ == "websocket":
            websocket = WebSocket(scope, receive, send)

            if path == "/hoist":
                obj = Socket(websocket)
                await obj.connect()

                return await self._ws(obj)

            # An HTTP response cannot be sent on a websocket scope (it
            # emits `http.response.*` events, which are not valid there),
            # so unknown paths are refused by closing the handshake.
            await websocket.close()

    @staticmethod
    def _ensure_none(url: URL) -> None:
        """Ensure that the target URL is not already being used.

        Args:
            url: URL whose host and port are probed with a TCP connection.

        Raises:
            AlreadyInUseError: Something accepted the connection.
        """
        # The context manager closes the probe socket on every path;
        # previously the descriptor was leaked on each call.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            result = sock.connect_ex((url.host, url.port))

        # `connect_ex` returns 0 when the connection succeeded.
        if not result:
            raise AlreadyInUseError(
                f"{url} is an existing server (did you forget to close it?)",  # noqa
            )

    def _fancy(self) -> None:
        """Render the fancy-mode dashboard until the server stops.

        Each pass rebuilds a table with one row per entry in
        `_all_connections`. Once exceptions have been recorded, a row of
        traceback panels is shown beneath the table.
        """
        # Function-scope import keeps this block self-contained.
        from time import sleep

        with Live(screen=True) as live:
            while self.running:
                table = Table(
                    box=box.ROUNDED,
                    title=f"[bold cyan]Hoist {__version__}",
                )
                table.add_column("Address")
                table.add_column("Status")
                excs = self._exceptions
                rendered_table = Padding(Align.center(table), 1)

                layout = Layout(
                    rendered_table if not excs else None,
                )

                if excs:
                    lay = Layout()
                    lay.split_row(
                        *[
                            Panel(
                                "".join(
                                    format_exception(
                                        type(e),
                                        e,
                                        e.__traceback__,
                                    )
                                ),
                                title=f"[bold red]{e.__class__.__name__}",
                            )
                            for e in excs
                        ]
                    )

                    layout.split_column(
                        Layout(rendered_table, ratio=2),
                        lay,
                    )

                for i in self._all_connections:
                    table.add_row(
                        f"[bold cyan]{i.make_address()}",
                        i.rich_status,
                    )

                live.update(layout)
                # Without a pause this loop rebuilds the layout as fast as
                # the CPU allows, pinning a core for the server's lifetime.
                sleep(0.25)

    def start(  # type: ignore
        self,
        *,
        host: str = "0.0.0.0",
        port: int = 5000,
        fancy: Optional[bool] = None,
    ) -> None:
        """Start the server.

        Args:
            host: Where to host the server.
            port: Port to put the server on.
            fancy: Should the server run in fancy mode. When `None`, uses the current `fancy` property.

        Example:
            ```py
            server = hoist.Server(...)
            server.start()
            ```

        Raises:
            AlreadyInUseError: Server is already running, or the target address is already in use.
            RuntimeError: Attempted to start the server from a running event loop.
        """  # noqa
        if self.running:
            raise AlreadyInUseError(
                "server is already running (did you forget to call close?)",
            )

        self._is_fancy = fancy if fancy is not None else self._is_fancy
        fancy = self._is_fancy

        async def _app(
            scope: Scope,
            receive: Receive,
            send: Send,
        ) -> None:
            await self._app(scope, receive, send)

        self._ensure_none(
            URL.build(
                host=host,
                port=port,
                scheme="http",
            )
        )

        config = uvicorn.Config(_app, host=host, port=port, lifespan="on")
        server = UvicornServer(config)
        self._server = server

        if fancy:
            # Keep regular log output from interfering with the dashboard.
            logging.getLogger("hoist").setLevel(logging.ERROR)

        args = (
            self._hide_token,
            self.token,
        )

        try:
            if not fancy:
                server.run_in_thread(*args)
            else:
                with _CONSOLE.status("Starting...", spinner="bouncingBar"):
                    server.run_in_thread(*args)

                Thread(target=self._fancy).start()
        except RuntimeError as e:
            # Drop the server that never started so `running` stays False
            # and a later `start()` is not rejected as already running.
            self._server = None
            raise RuntimeError(
                "server cannot start from a running event loop",
            ) from e

        self._start_called = True

    async def broadcast(
        self,
        message: str,
        payload: Optional[Payload] = None,
    ) -> None:
        """Send a message to all connections.

        A failure to send to one client is logged as a warning and does
        not stop delivery to the remaining clients.

        Args:
            message: Message to send.
            payload: Payload to include.

        Example:
            ```py
            @server.operation("broadcast")
            async def bc():
                await server.broadcast("this is a broadcast")
            ```

        Raises:
            ServerNotStartedError: Server is not started.
        """
        if not self._server:
            raise ServerNotStartedError(
                "server is not started (did you forget to call start?)"
            )

        if not self._clients:
            log(
                "broadcast",
                "no clients are connected",
                level=logging.WARNING,
            )
            return

        # Iterate over a snapshot: while a send is awaited, `_ws` may
        # remove a disconnected client from the live list, which would
        # make a direct iteration skip entries.
        for client in tuple(self._clients):
            transport = _SocketMessageTransport(
                client,
                self,
                None,
                SINGLE_NEW_MESSAGE,
            )
            try:
                await transport.message(message, payload)
            except Exception as e:
                hlog(
                    "broadcast",
                    f"exception occured while sending: {type(e).__name__} - {e}",  # noqa
                    level=logging.WARNING,
                )

    def close(self) -> None:
        """Close the server.

        Example:
            ```py
            server = hoist.start(...)
            server.close()
            ```

        Raises:
            ServerNotStartedError: Server is not started.
        """
        if not self._server:
            hlp = (
                "call close twice"
                if self._start_called
                else "forget to call start"  # fmt: off
            )
            raise ServerNotStartedError(
                f"server is not started (did you {hlp}?)",
            )
        self._server.close_thread()
        self._server = None
        # `_start_called` is deliberately left set: it is what lets a
        # second `close()` report "call close twice" instead of "forget to
        # call start".

    @property
    def running(self) -> bool:
        """Tell whether a server object is currently attached."""
        return True if self._server else False

    def stop(self) -> None:
        """Close the server; identical to calling `close`."""
        shutdown = self.close
        shutdown()

    def operation(self, name: str):
        """Add a function for an operation.

        The handler's signature decides how it is called: with nothing,
        with the server, with the payload, with both, or with the payload
        keys as keyword arguments.

        Args:
            name: Name of the operation.

        Returns:
            A decorator that registers the function under `name` and hands
            the function back unchanged.

        Example:
            ```py
            @server.operation("my operation name")
            async def something():
                ...
            ```
        """

        def decorator(func: Operator):
            pl_type: bool = False
            op_type = OperatorParam.NONE
            params = inspect.signature(func).parameters

            if params:
                annotations = [i.annotation for i in params.values()]
                plen: int = len(params)
                first = annotations[0]

                if annotations == [Server]:
                    op_type = OperatorParam.SERVER_ONLY
                elif first == Server:
                    op_type = OperatorParam.SERVER_AND_PAYLOAD
                elif (first in _PY_BUILTINS) and (first is not dict):
                    op_type = OperatorParam.DYNAMIC
                elif plen == 2:
                    op_type = OperatorParam.SERVER_AND_PAYLOAD
                elif plen == 1:
                    op_type = OperatorParam.PAYLOAD_ONLY
                else:
                    op_type = OperatorParam.DYNAMIC

            self._operations[name] = (
                func,
                op_type,
                pl_type,
            )

            # Hand the function back so the decorated name stays bound to
            # it instead of being replaced by `None`.
            return func

        return decorator

__init__(token=None, *, default_token_len=25, default_token_choices=ascii_letters, hide_token=False, login_func=_base_login, log_level=None, minimum_version=None, extra_operations=None, unsupported_operations=None, supported_operations=None, extra_listeners=None, fancy=None, disable_builtins=False)

Constructor for Server.

Parameters:

Name Type Description Default
token Optional[str]

Token to use on the server.

None
default_token_len int

Length of autogenerated token.

25
default_token_choices SupportsLenAndGetItem[str]

Choices to use for the autogenerated token.

ascii_letters
hide_token bool

Should the token be hidden from the startup message.

False
login_func LoginFunc

Function used to handle logins to the server.

_base_login
minimum_version Optional[VersionLike]

Minimum version of Hoist required to connect.

None
extra_operations Optional[Operations]

Extra operations to include on the server.

None
unsupported_operations Optional[Sequence[str]]

Operations not supported by the server.

None
supported_operations Optional[Sequence[str]]

Operations supported by the server.

None
extra_listeners Optional[MessageListeners]

Extra message listeners.

None
fancy Optional[bool]

Should the server run in fancy mode.

None
disable_builtins bool

Should all builtin operations be disabled.

False
Source code in hoist/server.py
def __init__(
    self,
    token: Optional[str] = None,
    *,
    default_token_len: int = 25,
    default_token_choices: "SupportsLenAndGetItem[str]" = ascii_letters,
    hide_token: bool = False,
    login_func: LoginFunc = _base_login,
    log_level: Optional[int] = None,
    minimum_version: Optional[VersionLike] = None,
    extra_operations: Optional[Operations] = None,
    unsupported_operations: Optional[Sequence[str]] = None,
    supported_operations: Optional[Sequence[str]] = None,
    extra_listeners: Optional[MessageListeners] = None,
    fancy: Optional[bool] = None,
    disable_builtins: bool = False,
) -> None:
    """Constructor for `Server`.

    Args:
        token: Token to use on the server. When empty, one is generated.
        default_token_len: Length of autogenerated token.
        default_token_choices: Choices to use for the autogenerated token.
        hide_token: Should the token be hidden from the startup message.
        login_func: Function used to handle logins to the server.
        log_level: Level to set on the `hoist` logger. The logger is left
            untouched when `None`.
        minimum_version: Minimum version of Hoist required to connect.
        extra_operations: Extra operations to include on the server.
        unsupported_operations: Operations not supported by the server.
        supported_operations: Operations supported by the server.
            Defaults to `["*"]`.
        extra_listeners: Extra message listeners.
        fancy: Should the server run in fancy mode.
        disable_builtins: Should all builtin operations be disabled.
    """
    # Function-scope import keeps this block self-contained.
    from secrets import choice as secure_choice

    # The token authenticates clients, so it is drawn from a
    # cryptographically secure source.
    self._token: str = token or "".join(
        secure_choice(default_token_choices)
        for _ in range(default_token_len)
    )
    self._hide_token = hide_token
    self._login_func = login_func

    # `is not None` so that an explicit level of 0 is applied too.
    if log_level is not None:
        logging.getLogger("hoist").setLevel(log_level)

    self._minimum_version = minimum_version
    self._operations = {**BASE_OPERATIONS, **(extra_operations or {})}
    self._supported_operations = supported_operations or ["*"]

    blocked: Sequence[str] = unsupported_operations or []
    if disable_builtins:
        # Block every builtin while still honouring the operations the
        # caller blacklisted explicitly.
        blocked = (*BASE_OPERATIONS.keys(), *blocked)
    self._unsupported_operations = blocked

    self._clients: List[Socket] = []
    self._server: Optional[UvicornServer] = None
    self._start_called: bool = False
    self._all_connections: List[Socket] = []
    self._is_fancy: Optional[bool] = fancy
    self._exceptions: List[Exception] = []
    super().__init__(extra_listeners)

broadcast(message, payload=None) async

Send a message to all connections.

Parameters:

Name Type Description Default
message str

Message to send.

required
payload Optional[Payload]

Payload to include.

None
Example
@server.operation("broadcast")
async def bc():
    await server.broadcast("this is a broadcast")
Source code in hoist/server.py
async def broadcast(
    self,
    message: str,
    payload: Optional[Payload] = None,
) -> None:
    """Send a message to all connections.

    A failure to send to one client is logged as a warning and does not
    stop delivery to the remaining clients.

    Args:
        message: Message to send.
        payload: Payload to include.

    Example:
        ```py
        @server.operation("broadcast")
        async def bc():
            await server.broadcast("this is a broadcast")
        ```

    Raises:
        ServerNotStartedError: Server is not started.
    """
    if not self._server:
        raise ServerNotStartedError(
            "server is not started (did you forget to call start?)"
        )

    if not self._clients:
        log(
            "broadcast",
            "no clients are connected",
            level=logging.WARNING,
        )
        return

    # Iterate over a snapshot: the client list can be modified by other
    # tasks while a send is awaited, which would make a direct iteration
    # skip entries.
    for client in tuple(self._clients):
        transport = _SocketMessageTransport(
            client,
            self,
            None,
            SINGLE_NEW_MESSAGE,
        )
        try:
            await transport.message(message, payload)
        except Exception as e:
            hlog(
                "broadcast",
                f"exception occured while sending: {type(e).__name__} - {e}",  # noqa
                level=logging.WARNING,
            )

close()

Close the server.

Example
server = hoist.start(...)
server.close()

Raises:

Type Description
ServerNotStartedError

Server is not started.

Source code in hoist/server.py
def close(self) -> None:
    """Close the server.

    Example:
        ```py
        server = hoist.start(...)
        server.close()
        ```

    Raises:
        ServerNotStartedError: Server is not started.
    """
    if not self._server:
        hlp = (
            "call close twice"
            if self._start_called
            else "forget to call start"  # fmt: off
        )
        raise ServerNotStartedError(
            f"server is not started (did you {hlp}?)",
        )
    self._server.close_thread()
    self._server = None
    # `_start_called` is deliberately left set: it is what lets a second
    # `close()` report "call close twice" instead of "forget to call
    # start".

fancy() property

Whether the server is running with fancy output.

Source code in hoist/server.py
@property
def fancy(self) -> bool:
    """Tell whether fancy output is enabled (`False` when unset)."""
    enabled = self._is_fancy
    return enabled if enabled else False

operation(name)

Add a function for an operation.

Parameters:

Name Type Description Default
name str

Name of the operation.

required
Example
@server.operation("my operation name")
async def something():
    ...
Source code in hoist/server.py
def operation(self, name: str):
    """Add a function for an operation.

    The handler's signature decides how it is called: with nothing, with
    the server, with the payload, with both, or with the payload keys as
    keyword arguments.

    Args:
        name: Name of the operation.

    Returns:
        A decorator that registers the function under `name` and hands the
        function back unchanged.

    Example:
        ```py
        @server.operation("my operation name")
        async def something():
            ...
        ```
    """

    def decorator(func: Operator):
        pl_type: bool = False
        op_type = OperatorParam.NONE
        params = inspect.signature(func).parameters

        if params:
            annotations = [i.annotation for i in params.values()]
            plen: int = len(params)
            first = annotations[0]

            if annotations == [Server]:
                op_type = OperatorParam.SERVER_ONLY
            elif first == Server:
                op_type = OperatorParam.SERVER_AND_PAYLOAD
            elif (first in _PY_BUILTINS) and (first is not dict):
                op_type = OperatorParam.DYNAMIC
            elif plen == 2:
                op_type = OperatorParam.SERVER_AND_PAYLOAD
            elif plen == 1:
                op_type = OperatorParam.PAYLOAD_ONLY
            else:
                op_type = OperatorParam.DYNAMIC

        self._operations[name] = (
            func,
            op_type,
            pl_type,
        )

        # Hand the function back so the decorated name stays bound to it
        # instead of being replaced by `None`.
        return func

    return decorator

running() property

Whether the server is running.

Source code in hoist/server.py
@property
def running(self) -> bool:
    """Tell whether a server object is currently attached."""
    return True if self._server else False

start(*, host='0.0.0.0', port=5000, fancy=None)

Start the server.

Parameters:

Name Type Description Default
host str

Where to host the server.

'0.0.0.0'
port int

Port to put the server on.

5000
fancy Optional[bool]

Should the server run in fancy mode. When None, uses the current fancy property.

None
Example
server = hoist.Server(...)
server.start()

Raises:

Type Description
AlreadyInUseError

Server is already running.

RuntimeError

Attempted to start the server from a running event loop.

Source code in hoist/server.py
def start(  # type: ignore
    self,
    *,
    host: str = "0.0.0.0",
    port: int = 5000,
    fancy: Optional[bool] = None,
) -> None:
    """Start the server.

    Args:
        host: Where to host the server.
        port: Port to put the server on.
        fancy: Should the server run in fancy mode. When `None`, uses the current `fancy` property.

    Example:
        ```py
        server = hoist.Server(...)
        server.start()
        ```

    Raises:
        AlreadyInUseError: Server is already running.
        RuntimeError: Attempted to start the server from a running event loop.
    """  # noqa
    if self.running:
        raise AlreadyInUseError(
            "server is already running (did you forget to call close?)",
        )

    self._is_fancy = fancy if fancy is not None else self._is_fancy
    fancy = self._is_fancy

    async def _app(
        scope: Scope,
        receive: Receive,
        send: Send,
    ) -> None:
        await self._app(scope, receive, send)

    self._ensure_none(
        URL.build(
            host=host,
            port=port,
            scheme="http",
        )
    )

    config = uvicorn.Config(_app, host=host, port=port, lifespan="on")
    server = UvicornServer(config)
    self._server = server

    if fancy:
        # Keep regular log output from interfering with the dashboard.
        logging.getLogger("hoist").setLevel(logging.ERROR)

    args = (
        self._hide_token,
        self.token,
    )

    try:
        if not fancy:
            server.run_in_thread(*args)
        else:
            with _CONSOLE.status("Starting...", spinner="bouncingBar"):
                server.run_in_thread(*args)

            Thread(target=self._fancy).start()
    except RuntimeError as e:
        # Drop the server that never started so `running` stays False and
        # a later `start()` is not rejected as already running.
        self._server = None
        raise RuntimeError(
            "server cannot start from a running event loop",
        ) from e

    self._start_called = True

stop()

Alias to close.

Source code in hoist/server.py
def stop(self) -> None:
    """Close the server; identical to calling `close`."""
    shutdown = self.close
    shutdown()

supported_operations() property

Operations supported by the server.

Source code in hoist/server.py
@property
def supported_operations(self) -> Sequence[str]:
    """Names of the operations this server allows."""
    allowed = self._supported_operations
    return allowed

token() property

Authentication token used to connect.

Source code in hoist/server.py
@property
def token(self) -> str:
    """Token that clients must present to authenticate."""
    return self._token

unsupported_operations() property

Operations blacklisted by the server.

Source code in hoist/server.py
@property
def unsupported_operations(self) -> Sequence[str]:
    """Names of the operations this server has blacklisted."""
    return self._unsupported_operations

Connection

Bases: BaseMessagable, MessageListener

Class handling a connection to a server.

Source code in hoist/client.py
class Connection(BaseMessagable, MessageListener):
    """Class handling a connection to a server.

    Connecting first acknowledges the server over HTTP (`/hoist/ack`), then
    opens a WebSocket on `/hoist` and logs in with the authentication token.
    """

    __slots__ = (
        "_url",
        "_token",
        "_connected",
        "_loop",
        "_session",
        "_ws",
        "_minimum_version",
        "_message_id",
        "_finalizer",
        "_closed",
        "_opened",
    )

    def __init__(
        self,
        url: UrlLike,
        token: Optional[str] = None,
        *,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        session: Optional[aiohttp.ClientSession] = None,
        extra_listeners: Optional[MessageListeners] = None,
        minimum_version: Optional[VersionLike] = None,
    ) -> None:
        """Constructor for `Connection`.

        Args:
            url: URL to connect to.
            token: Token to connect with.
            loop: Event loop to use.
            session: `aiohttp` client session to use.
            extra_listeners: Extra message listeners.
            minimum_version: Minimum version required to connect to the server.
        """
        self._url = url
        self._token: Optional[str] = token
        self._connected: bool = False
        self._loop = loop or asyncio.get_event_loop()
        self._session = session or aiohttp.ClientSession(loop=self._loop)
        self._ws: Optional[ServerSocket] = None
        self._minimum_version = minimum_version
        # Incremented before every action that is sent over the socket.
        self._message_id: int = 0
        # NOTE(review): the `weakref.finalize` docs advise against passing a
        # bound method of the tracked object, since that reference keeps the
        # object alive until interpreter exit.
        self._finalizer = weakref.finalize(self, self.close_sync)
        self._closed: bool = False
        self._opened: bool = False
        super().__init__(extra_listeners)

    @property
    def opened(self) -> bool:
        """Whether the connection was ever opened."""
        return self._opened

    @property
    def closed(self) -> bool:
        """Whether the client is closed."""
        return self._closed

    @property
    def url(self) -> UrlLike:
        """URL of the server."""
        return self._url

    @property
    def token(self) -> Optional[str]:
        """Authentication token of the server."""
        return self._token

    @property
    def connected(self) -> bool:
        """Whether the server is currently connected."""
        return self._connected

    def close_sync(self) -> None:
        """Close the client synchronously.

        Runs `close` on the connection's event loop. If that loop refuses
        to run (it raises `RuntimeError`), a temporary event loop is used
        instead and closed afterwards.

        Example:
            ```py
            c = hoist.Connection(...)
            await c.connect()
            c.close_sync()
            ```

        Raises:
            RuntimeError: No event loop could run the close coroutine.
        """
        coro = self.close()

        try:
            try:
                self._loop.run_until_complete(coro)
            except RuntimeError:
                fallback = asyncio.new_event_loop()
                try:
                    fallback.run_until_complete(coro)
                finally:
                    # The temporary loop is ours, so it must not be leaked.
                    fallback.close()
        finally:
            # Dispose of the coroutine if it never ran (avoids a "never
            # awaited" warning); this is a no-op once it has finished, so
            # the original exception is never masked.
            coro.close()

    async def close(self) -> None:
        """Close the connection.

        Closes the WebSocket (if one was opened) and the HTTP session, then
        marks the connection as closed. Calling it again is a no-op.

        Example:
            ```py
            c = hoist.Connection(...)
            await c.connect()
            await c.close()
            ```
        """
        if self.closed:
            return

        try:
            if self._ws:
                await self._ws.close()
        finally:
            # Release the session even when closing the socket failed.
            await self._session.close()
            self._closed = True
            self._connected = False

    async def _ack(self, url: URL) -> None:
        """Acknowledge that the server supports hoist.

        Requests `/hoist/ack` and, when a minimum version was configured,
        checks the version reported by the server against it.

        Raises:
            ServerConnectError: The response could not be read as JSON.
            InvalidVersionError: The server version is below the minimum.
        """
        async with self._session.get(url.with_path("/hoist/ack")) as response:
            try:
                json = await response.json()
            except aiohttp.ContentTypeError as e:
                raise ServerConnectError(
                    "failed to acknowledge the server (does it support hoist?)"
                ) from e

            hlog(
                "ack",
                json,
                level=logging.DEBUG,
            )

            version: str = json["version"]
            minver = self._minimum_version

            if minver:
                # The minimum may be given as a `Version` or in a raw form
                # that still has to be parsed.
                minver_actual = (
                    minver
                    if isinstance(minver, Version)
                    else parse_version(minver)  # fmt: off
                )

                if not (parse_version(version) >= minver_actual):
                    raise InvalidVersionError(
                        f"server has version {version}, but required is {minver_actual.to_string()}",  # noqa
                    )

    async def connect(self, token: Optional[str] = None) -> None:
        """Open the connection.

        Args:
            token: Token to connect with. When `None`, uses the current `token` property.

        Raises:
            AlreadyConnectedError: Already connected to the server.
            ServerConnectError: Something went wrong when connecting.
            ValueError: Both the `token` argument and `token` property are `None`
        """  # noqa
        if self.connected:
            raise AlreadyConnectedError(
                "already connected to socket",
            )

        raw_url = self.url
        url_obj = raw_url if isinstance(raw_url, URL) else URL(raw_url)

        try:
            await self._ack(url_obj)
        except aiohttp.ClientConnectionError as e:
            raise ServerConnectError(
                f"could not connect to {url_obj} (is the server turned on?)"
            ) from e

        # The socket uses the secure scheme only when the HTTP URL does.
        url = url_obj.with_scheme(
            "wss" if url_obj.scheme == "https" else "ws",
        ).with_path("/hoist")

        auth: Optional[str] = token or self.token

        if not auth:
            raise ValueError(
                "no authentication token (did you forget to pass it?)",
            )

        self._connected = True
        try:
            conn = await self._session.ws_connect(url)
        except aiohttp.WSServerHandshakeError as e:
            # No socket exists, so a later `connect` must be allowed.
            self._connected = False
            raise ServerConnectError(
                f"failed to connect to {url}, does it support hoist?"
            ) from e
        except BaseException:
            self._connected = False
            raise

        self._ws = ServerSocket(
            self,
            conn,
            auth,
        )
        hlog(
            "connect",
            f"connected to {url}",
            level=logging.DEBUG,
        )
        await self._ws.login(self._call_listeners)
        self._opened = True

    async def _execute_action(
        self,
        action: str,
        payload: Optional[Payload] = None,
        *,
        process_messages: bool = True,
    ):
        """Run an action.

        Args:
            action: Name of the action to send.
            payload: Data sent with the action (an empty dict when omitted).
            process_messages: Whether to process pending socket messages
                after the reply has been received.

        Returns:
            The reply returned by the socket.

        Raises:
            NotConnectedError: Not connected to the server.
        """
        if not self._ws:
            raise NotConnectedError(
                "not connected to websocket (did you forget to call connect?)"
            )

        self._message_id += 1

        res = await self._ws.send(
            {
                "action": action,
                "data": payload or {},
            },
            self._message_id,
            reply=True,
        )

        if process_messages:
            await self._ws.process_messages()

        return res

    async def message(
        self,
        msg: str,
        data: Optional[Payload] = None,
        replying: Optional[Message] = None,
        listeners: Optional[MessageListeners] = None,
    ) -> Message:
        """Send a message to the server.

        Args:
            msg: Content of the message.
            data: Payload to include with the message.
            replying: Message object to reply to.
            listeners: Message listeners to add before dispatching.

        Returns:
            Created message.

        Example:
            ```py
            async with hoist.connect(...) as c:
                await c.message("hello world", {"a": "b"})
            ```

        Raises:
            NotConnectedError: Not connected to the server.
        """
        if not self._ws:
            raise NotConnectedError(
                "not connected to websocket (did you forget to call connect?)"
            )

        d = data or {}

        # Pending messages are processed only after the message object has
        # been created, hence `process_messages=False` here.
        res = await self._execute_action(
            "message",
            {
                "message": msg,
                "data": d,
                "replying": replying.to_dict() if replying else None,
            },
            process_messages=False,
        )

        assert res.data

        obj = await self.new_message(
            self,
            msg,
            d,
            replying,
            listeners=listeners,
            id=res.data["id"],
        )

        await self._ws.process_messages()
        return obj

    async def operation(
        self,
        name: str,
        payload: Optional[Payload] = None,
        **payload_json: Any,
    ) -> Any:
        """Execute an operation on the server.

        Args:
            name: Name of the operation to execute.
            payload: Payload to send.
            **payload_json: Extra payload entries; these override keys that
                are also present in `payload`.

        Returns:
            The `result` entry of the server's reply.

        Example:
            ```py
            async with hoist.connect(...) as c:
                await c.operation("print", {"text": "hi"})
            ```

        Raises:
            NotConnectedError: Not connected to the server.
            ValueError: Specified operation is not valid.
            ServerResponseError: Arbitrary server response error.
        """
        if not self._ws:
            raise NotConnectedError(
                "not connected to websocket (did you forget to call connect?)"
            )

        data = payload or {}
        try:
            res = await self._execute_action(
                "operation",
                {
                    "operation": name,
                    "data": {**data, **payload_json},
                },
            )
        except ServerResponseError as e:
            if e.code == UNKNOWN_OPERATION:
                raise ValueError(
                    f'"{name}" is not a valid operation',
                ) from e
            raise

        assert res.data
        return res.data["result"]

    async def print(self, text: str):
        """Alias to `operation("print", {"text": text})`"""
        await self.operation("print", {"text": text})

    async def read(self, path: str) -> str:
        """Alias to `operation("path", {"path": path})`"""
        # NOTE(review): the operation name sent is "path", not "read";
        # confirm this matches the name registered on the server.
        return await self.operation("path", {"path": path})

    async def get(self, url: str) -> str:
        """Alias to `operation("get", {"url": url})`"""
        return await self.operation("get", {"url": url})

__init__(url, token=None, *, loop=None, session=None, extra_listeners=None, minimum_version=None)

Constructor for Connection.

Parameters:

Name Type Description Default
url UrlLike

URL to connect to.

required
token Optional[str]

Token to connect with.

None
loop Optional[asyncio.AbstractEventLoop]

Event loop to use.

None
session Optional[aiohttp.ClientSession]

aiohttp client session to use.

None
extra_listeners Optional[MessageListeners]

Extra message listeners.

None
minimum_version Optional[VersionLike]

Minimum version required to connect to the server.

None
Source code in hoist/client.py
def __init__(
    self,
    url: UrlLike,
    token: Optional[str] = None,
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    session: Optional[aiohttp.ClientSession] = None,
    extra_listeners: Optional[MessageListeners] = None,
    minimum_version: Optional[VersionLike] = None,
) -> None:
    """Set up a `Connection` without opening it.

    Args:
        url: URL to connect to.
        token: Token to connect with.
        loop: Event loop to use; the current event loop when omitted.
        session: `aiohttp` client session to use; a new one bound to the
            chosen loop when omitted.
        extra_listeners: Extra message listeners.
        minimum_version: Minimum version required to connect to the server.
    """
    # Where to connect and with what credentials.
    self._url = url
    self._token: Optional[str] = token
    self._connected: bool = False

    # Transport objects; the socket only exists after `connect`.
    event_loop = loop or asyncio.get_event_loop()
    self._loop = event_loop
    self._session = session or aiohttp.ClientSession(loop=event_loop)
    self._ws: Optional[ServerSocket] = None

    self._minimum_version = minimum_version
    self._message_id: int = 0

    # Close the connection synchronously when the finalizer fires.
    self._finalizer = weakref.finalize(self, self.close_sync)

    # Lifecycle flags.
    self._closed: bool = False
    self._opened: bool = False

    super().__init__(extra_listeners)

close() async

Close the connection.

Example
c = hoist.Connection(...)
await c.connect()
await c.close()
Source code in hoist/client.py
async def close(self) -> None:
    """Close the connection.

    Closes the WebSocket (if one was opened) and the HTTP session, then
    marks the connection as closed. Calling it again is a no-op.

    Example:
        ```py
        c = hoist.Connection(...)
        await c.connect()
        await c.close()
        ```
    """
    if self.closed:
        return

    try:
        if self._ws:
            await self._ws.close()
    finally:
        # Release the session even when closing the socket failed, so the
        # HTTP session is never leaked; the socket error still propagates.
        await self._session.close()
        self._closed = True
        self._connected = False

close_sync()

Close the client synchronously.

Example
c = hoist.Connection(...)
await c.connect()
c.close_sync()
Source code in hoist/client.py
def close_sync(self) -> None:
    """Close the client synchronously.

    Runs `close` on the connection's event loop. If that loop refuses to
    run (it raises `RuntimeError`), a temporary event loop is used instead
    and closed afterwards.

    Example:
        ```py
        c = hoist.Connection(...)
        await c.connect()
        c.close_sync()
        ```

    Raises:
        RuntimeError: No event loop could run the close coroutine.
    """
    coro = self.close()

    try:
        try:
            self._loop.run_until_complete(coro)
        except RuntimeError:
            fallback = asyncio.new_event_loop()
            try:
                fallback.run_until_complete(coro)
            finally:
                # The temporary loop is ours, so it must not be leaked.
                fallback.close()
    finally:
        # Dispose of the coroutine if it never ran (avoids a "never awaited"
        # warning); this is a no-op once it has finished, so the original
        # exception is never masked.
        coro.close()

closed() property

Whether the client is closed.

Source code in hoist/client.py
@property
def closed(self) -> bool:
    """`True` once the client has been closed."""
    return self._closed

connect(token=None) async

Open the connection.

Parameters:

Name Type Description Default
token Optional[str]

Token to connect with. When None, uses the current token property.

None

Raises:

Type Description
AlreadyConnectedError

Already connected to the server.

ServerConnectError

Something went wrong when connecting.

ValueError

Both the token argument and token property are None

Source code in hoist/client.py
async def connect(self, token: Optional[str] = None) -> None:
    """Open the connection.

    Acknowledges the server over HTTP, opens the WebSocket on `/hoist`
    and logs in with the token.

    Args:
        token: Token to connect with. When `None`, uses the current `token` property.

    Raises:
        AlreadyConnectedError: Already connected to the server.
        ServerConnectError: Something went wrong when connecting.
        ValueError: Both the `token` argument and `token` property are `None`
    """  # noqa
    if self.connected:
        raise AlreadyConnectedError(
            "already connected to socket",
        )

    raw_url = self.url
    url_obj = raw_url if isinstance(raw_url, URL) else URL(raw_url)

    try:
        await self._ack(url_obj)
    except aiohttp.ClientConnectionError as e:
        raise ServerConnectError(
            f"could not connect to {url_obj} (is the server turned on?)"
        ) from e

    # The socket uses the secure scheme only when the HTTP URL does.
    url = url_obj.with_scheme(
        "wss" if url_obj.scheme == "https" else "ws",
    ).with_path("/hoist")

    auth: Optional[str] = token or self.token

    if not auth:
        raise ValueError(
            "no authentication token (did you forget to pass it?)",
        )

    self._connected = True
    try:
        conn = await self._session.ws_connect(url)
    except aiohttp.WSServerHandshakeError as e:
        # No socket exists, so a later `connect` must be allowed instead of
        # failing with `AlreadyConnectedError`.
        self._connected = False
        raise ServerConnectError(
            f"failed to connect to {url}, does it support hoist?"
        ) from e
    except BaseException:
        self._connected = False
        raise

    self._ws = ServerSocket(
        self,
        conn,
        auth,
    )
    hlog(
        "connect",
        f"connected to {url}",
        level=logging.DEBUG,
    )
    await self._ws.login(self._call_listeners)
    self._opened = True

connected() property

Whether the server is currently connected.

Source code in hoist/client.py
@property
def connected(self) -> bool:
    """`True` while the connection to the server is marked as open."""
    return self._connected

get(url) async

Alias to operation("get", {"url": url})

Source code in hoist/client.py
async def get(self, url: str) -> str:
    """Shorthand for `operation("get", {"url": url})`."""
    payload = {"url": url}
    return await self.operation("get", payload)

message(msg, data=None, replying=None, listeners=None) async

Send a message to the server.

Parameters:

Name Type Description Default
msg str

Content of the message.

required
data Optional[Payload]

Payload to include with the message.

None
replying Optional[Message]

Message object to reply to.

None
listeners Optional[MessageListeners]

Message listeners to add before dispatching.

None

Returns:

Type Description
Message

Created message.

Example
async with hoist.connect(...) as c:
    await c.message("hello world", {"a": "b"})

Raises:

Type Description
NotConnectedError

Not connected to the server.

Source code in hoist/client.py
async def message(
    self,
    msg: str,
    data: Optional[Payload] = None,
    replying: Optional[Message] = None,
    listeners: Optional[MessageListeners] = None,
) -> Message:
    """Send a message to the server and build its local `Message` object.

    Args:
        msg: Content of the message.
        data: Payload to include with the message.
        replying: Message object to reply to.
        listeners: Message listeners to add before dispatching.

    Returns:
        Created message.

    Example:
        ```py
        async with hoist.connect(...) as c:
            await c.message("hello world", {"a": "b"})
        ```

    Raises:
        NotConnectedError: Not connected to the server.
    """
    if not self._ws:
        raise NotConnectedError(
            "not connected to websocket (did you forget to call connect?)"
        )

    body = data or {}
    request = {
        "message": msg,
        "data": body,
        "replying": replying.to_dict() if replying else None,
    }

    # Pending socket messages are processed only after the message object
    # exists, so the action itself must not process them.
    reply = await self._execute_action(
        "message",
        request,
        process_messages=False,
    )

    assert reply.data

    created = await self.new_message(
        self,
        msg,
        body,
        replying,
        listeners=listeners,
        id=reply.data["id"],
    )

    await self._ws.process_messages()
    return created

opened() property

Whether the connection was ever opened.

Source code in hoist/client.py
@property
def opened(self) -> bool:
    """`True` if the connection has been opened at least once."""
    return self._opened

operation(name, payload=None, **payload_json) async

Execute an operation on the server.

Parameters:

Name Type Description Default
name str

Name of the operation to execute.

required
payload Optional[Payload]

Payload to send.

None
Example
async with hoist.connect(...) as c:
    await c.operation("print", {"text": "hi"})

Raises:

Type Description
NotConnectedError

Not connected to the server.

ValueError

Specified operation is not valid.

ServerResponseError

Arbitrary server response error.

Source code in hoist/client.py
async def operation(
    self,
    name: str,
    payload: Optional[Payload] = None,
    **payload_json: Any,
) -> Any:
    """Run a named operation on the server and return its result.

    Args:
        name: Name of the operation to execute.
        payload: Payload to send.
        **payload_json: Extra payload entries; these override keys that are
            also present in `payload`.

    Returns:
        The `result` entry of the server's reply.

    Example:
        ```py
        async with hoist.connect(...) as c:
            await c.operation("print", {"text": "hi"})
        ```

    Raises:
        NotConnectedError: Not connected to the server.
        ValueError: Specified operation is not valid.
        ServerResponseError: Arbitrary server response error.
    """
    if not self._ws:
        raise NotConnectedError(
            "not connected to websocket (did you forget to call connect?)"
        )

    base = payload or {}
    request = {
        "operation": name,
        "data": {**base, **payload_json},
    }

    try:
        reply = await self._execute_action("operation", request)
    except ServerResponseError as err:
        # Only an unknown operation is translated; anything else is passed
        # on unchanged.
        if err.code != UNKNOWN_OPERATION:
            raise err
        raise ValueError(
            f'"{name}" is not a valid operation',
        ) from err

    assert reply.data
    return reply.data["result"]

print(text) async

Alias to operation("print", {"text": text})

Source code in hoist/client.py
async def print(self, text: str):
    """Shorthand for `operation("print", {"text": text})`; returns nothing."""
    payload = {"text": text}
    await self.operation("print", payload)

read(path) async

Alias to operation("path", {"path": path})

Source code in hoist/client.py
async def read(self, path: str) -> str:
    """Shorthand for `operation("path", {"path": path})`."""
    payload = {"path": path}
    return await self.operation("path", payload)

token() property

Authentication token of the server.

Source code in hoist/client.py
@property
def token(self) -> Optional[str]:
    """Token used to authenticate with the server, if one was given."""
    return self._token

url() property

URL of the server.

Source code in hoist/client.py
@property
def url(self) -> UrlLike:
    """Server URL exactly as it was passed to the constructor."""
    return self._url

connect(token, url='http://localhost:5000', **kwargs) async

Connect to a Hoist server.

Parameters:

Name Type Description Default
token str

Token to use when connecting.

required
url UrlLike

Target server URL.

'http://localhost:5000'
Example
async with hoist.connect("target token") as c:
    ...
Source code in hoist/utils.py
@asynccontextmanager
async def connect(
    token: str,
    url: UrlLike = "http://localhost:5000",
    **kwargs: Any,
):
    """Connect to a Hoist server.

    The connection is opened on entry and always closed on exit, including
    when opening it or the body of the `async with` block fails.

    Args:
        token: Token to use when connecting.
        url: Target server URL.
        **kwargs: Extra keyword arguments passed to `Connection`.

    Example:
        ```py
        async with hoist.connect("target token") as c:
            ...
        ```
    """
    # Build the connection outside the `try`: if the constructor raised
    # inside it, the `finally` would hit an unbound `conn` and replace the
    # real error with an `UnboundLocalError`.
    conn = Connection(url, token, **kwargs)
    try:
        await conn.connect()
        yield conn
    finally:
        await conn.close()

connect_directly(token, url='http://localhost:5000', **kwargs) async

Connect to a Hoist server without a context manager.

Parameters:

Name Type Description Default
token str

Token to use when connecting.

required
url UrlLike

Target server URL.

'http://localhost:5000'
Example
server = await hoist.connect_directly("...")
Source code in hoist/utils.py
async def connect_directly(
    token: str,
    url: UrlLike = "http://localhost:5000",
    **kwargs: Any,
):
    """Connect to a Hoist server without a context manager.

    The caller owns the returned connection and is responsible for closing
    it. If connecting fails, the connection is closed before the error is
    re-raised, because the caller never receives a handle to close.

    Args:
        token: Token to use when connecting.
        url: Target server URL.
        **kwargs: Extra keyword arguments passed to `Connection`.

    Returns:
        The connected `Connection`.

    Example:
        ```py
        server = await hoist.connect_directly("...")
        ```
    """
    conn = Connection(url, token, **kwargs)
    try:
        await conn.connect()
    except BaseException:
        # Nobody else can reach `conn` now, so release its resources here.
        await conn.close()
        raise
    return conn

connect_with(token, url='http://localhost:5000', **kwargs)

Call a function with the connection.

Parameters:

Name Type Description Default
token str

Token to use when connecting.

required
url UrlLike

Target server URL.

'http://localhost:5000'
Example
@hoist.connect_with("...")
async def conn(server: hoist.Connection):
    ...
Source code in hoist/utils.py
def connect_with(
    token: str,
    url: UrlLike = "http://localhost:5000",
    **kwargs: Any,
):
    """Build a decorator that immediately runs a function with a connection.

    The decorated coroutine function is called once with a connected
    `Connection`; the connection is closed afterwards. Errors are printed
    rather than raised, and a `KeyboardInterrupt` ends the run silently.

    Args:
        token: Token to use when connecting.
        url: Target server URL.
        **kwargs: Extra keyword arguments passed to `Connection`.

    Example:
        ```py
        @hoist.connect_with("...")
        async def conn(server: hoist.Connection):
            ...
        ```
    """

    def inner(func: Callable[[Connection], Awaitable[Any]]):
        async def _wrapper():
            connection = Connection(url, token, **kwargs)

            try:
                await connection.connect()
                await func(connection)
            except KeyboardInterrupt:
                return
            except BaseException:
                print_exc(show_locals=True)
            finally:
                if not connection.closed:
                    await connection.close()

        pending = _wrapper()

        # With a loop already running `asyncio.run` raises `RuntimeError`,
        # so the coroutine is scheduled on the current event loop instead.
        try:
            asyncio.run(pending)
        except RuntimeError:
            asyncio.get_event_loop().create_task(pending)

    return inner

debug(*, trace=False, enable_uvicorn=False)

Enable debug logging.

Parameters:

Name Type Description Default
trace Union[bool, str]

Whether debug tracing should be enabled.

False
enable_uvicorn bool

Should uvicorn logs be enabled.

False
Source code in hoist/utils.py
def debug(
    *,
    trace: Union[bool, str] = False,
    enable_uvicorn: bool = False,
) -> None:
    """Enable debug logging.

    Sets the `hoist` logger to `DEBUG` and stores the trace setting in the
    `HOIST_TRACE` environment variable: `"all"` for `True`, an empty string
    for `False`, or the given string unchanged.

    Args:
        trace: Whether debug tracing should be enabled, or the value to
            store in `HOIST_TRACE`.
        enable_uvicorn: Whether uvicorn logs should be enabled.
    """
    logging.getLogger("hoist").setLevel(logging.DEBUG)

    if isinstance(trace, bool):
        trace_value = "all" if trace else ""
    else:
        trace_value = trace
    os.environ["HOIST_TRACE"] = trace_value

    if not enable_uvicorn:
        return

    for logger_name in ("uvicorn.error", "uvicorn.access"):
        logging.getLogger(logger_name).disabled = False

main(func)

Run a main async function.

Parameters:

Name Type Description Default
func Callable[[], Coroutine[Any, Any, Any]]

Function to call.

required
Example
@hoist.main
async def main() -> None:
    ...
Source code in hoist/utils.py
def main(func: Callable[[], Coroutine[Any, Any, Any]]) -> None:
    """Run an async entry point when the calling module is `__main__`.

    Nothing happens when the caller's module has any other name. Errors
    raised by the function are printed instead of propagating, and a
    `KeyboardInterrupt` ends the run silently.

    Args:
        func: Function to call.

    Example:
        ```py
        @hoist.main
        async def main() -> None:
            ...
        ```
    """
    own_frame = inspect.currentframe()
    assert own_frame
    caller = own_frame.f_back
    assert caller

    # Decide from the globals of whoever called us, not from this module.
    if caller.f_globals["__name__"] != "__main__":
        return

    try:
        asyncio.run(func())
    except KeyboardInterrupt:
        return
    except BaseException:
        print_exc(show_locals=True)

serve(token=None, server=None, *, host='0.0.0.0', port=5000, **kwargs)

Serve a Hoist server.

Parameters:

Name Type Description Default
token Optional[str]

Token to use on the server.

None
server Optional[Server]

Existing server to use.

None
host str

Where to host the server.

'0.0.0.0'
port int

What port to put the server on.

5000
Example
with hoist.serve("my authentication token") as server:
    ...
Source code in hoist/utils.py
@contextmanager
def serve(
    token: Optional[str] = None,
    server: Optional[Server] = None,
    *,
    host: str = "0.0.0.0",
    port: int = 5000,
    **kwargs,
):
    """Serve a Hoist server.

    The server is started on entry and closed on exit, including when
    starting it or the body of the `with` block fails.

    Args:
        token: Token to use on the server.
        server: Existing server to use.
        host: Where to host the server.
        port: What port to put the server on.
        **kwargs: Extra keyword arguments passed to `Server` when no
            existing server is given.

    Example:
        ```py
        with hoist.serve("my authentication token") as server:
            ...
        ```
    """
    # Resolve the server outside the `try`: if the constructor raised
    # inside it, the `finally` would hit an unbound `srvr` and replace the
    # real error with an `UnboundLocalError`.
    srvr = server or Server(token, **kwargs)
    try:
        srvr.start(host=host, port=port)
        yield srvr
    finally:
        srvr.close()

start(token=None, server=None, *, host='0.0.0.0', port=5000, fancy=False, **kwargs)

Start a Hoist server.

Parameters:

Name Type Description Default
token Optional[str]

Token to use on the server.

None
server Optional[Server]

Existing server to use.

None
host str

Where to host the server.

'0.0.0.0'
port int

What port to put the server on.

5000
fancy bool

Whether fancy output should be enabled.

False

Returns:

Type Description
Server

Started server object.

Example
server = hoist.start("my token", host="localhost", port=5001)
Source code in hoist/utils.py
def start(
    token: Optional[str] = None,
    server: Optional[Server] = None,
    *,
    host: str = "0.0.0.0",
    port: int = 5000,
    fancy: bool = False,
    **kwargs,
) -> Server:
    """Start a Hoist server and hand it back to the caller.

    Args:
        token: Token to use on the server.
        server: Existing server to use.
        host: Where to host the server.
        port: What port to put the server on.
        fancy: Whether fancy output should be enabled.
        **kwargs: Extra keyword arguments passed to `Server` when no
            existing server is given.

    Returns:
        Started server object.

    Example:
        ```py
        server = hoist.start("my token", host="localhost", port=5001)
        ```
    """
    target = server if server else Server(token, **kwargs)
    target.start(host=host, port=port, fancy=fancy)
    return target

AlreadyConnectedError

Bases: Exception

Attempted to connect to the WebSocket twice.

Source code in hoist/exceptions.py
class AlreadyConnectedError(Exception):
    """A connection that is already connected was asked to connect again."""

AlreadyInUseError

Bases: Exception

Port is already in use.

Source code in hoist/exceptions.py
class AlreadyInUseError(Exception):
    """The port is already in use, or the server is already running."""

BadContentError

Bases: Exception

Invalid JSON content was sent to the server.

Source code in hoist/exceptions.py
class BadContentError(Exception):
    """The server was sent content that is not valid JSON."""

ClientError

Bases: _ResponseError

The client caused an error on the server.

Source code in hoist/exceptions.py
class ClientError(_ResponseError):
    """An error on the server that was caused by the client."""

CloseSocket

Bases: Exception

Close the socket.

Source code in hoist/exceptions.py
class CloseSocket(Exception):
    """Signal that the socket should be closed."""

InternalServerError

Bases: Exception

Exception occurred on the server.

Source code in hoist/exceptions.py
class InternalServerError(Exception):
    """An exception occurred on the server."""

InvalidOperationError

Bases: Exception

Operation was not found.

Source code in hoist/exceptions.py
class InvalidOperationError(Exception):
    """The requested operation could not be found."""

InvalidVersionError

Bases: Exception

Version is not high enough.

Source code in hoist/exceptions.py
class InvalidVersionError(Exception):
    """The version is lower than the required minimum."""

NotConnectedError

Bases: Exception

Socket is not connected to the server.

Source code in hoist/exceptions.py
class NotConnectedError(Exception):
    """The socket has no connection to the server."""

SchemaValidationError

Bases: Exception

Schema validation failed.

Source code in hoist/exceptions.py
class SchemaValidationError(Exception):
    """Schema validation failed.

    Carries the type that was found (`current`) and the type or types that
    would have been valid (`needed`).
    """

    __slots__ = (
        "_current",
        "_needed",
    )

    def __init__(
        self,
        *args,
        current: Optional[Type[Any]],
        needed: SchemaNeededType,
        **kwargs,
    ) -> None:
        """Constructor for `SchemaValidationError`.

        Args:
            *args: Positional arguments passed to `Exception`.
            current: Type that was found, or `None`.
            needed: Type(s) needed to be valid.
            **kwargs: Keyword arguments passed to `Exception`.
        """
        super().__init__(*args, **kwargs)
        self._current = current
        self._needed = needed

    @property
    def needed(self) -> SchemaNeededType:
        """Type(s) needed to be valid."""
        return self._needed

    @property
    def current(self) -> Optional[Type[Any]]:
        """Current type."""
        return self._current

    def format_current(self) -> str:
        """Format the current type.

        Returns:
            The type's `__name__`, or `"None"` when there is no type.
        """
        c = self.current
        return c.__name__ if c else str(c)

    def format_needed(self) -> str:
        """Format the needed type.

        Returns:
            The type's `__name__`, or the comma-separated names when
            `needed` is a tuple. A missing type is rendered as `"None"`.
        """
        needed = self.needed
        if not isinstance(needed, tuple):
            # A lone `None` has no `__name__`; render it the same way the
            # tuple branch and `format_current` do instead of raising.
            return needed.__name__ if needed else str(needed)  # type: ignore

        return ",".join([i.__name__ if i else str(i) for i in needed])

current() property

Current type.

Source code in hoist/exceptions.py
@property
def current(self) -> Optional[Type[Any]]:
    """Type that was actually found, or `None`."""
    return self._current

format_current()

Format the current type.

Source code in hoist/exceptions.py
def format_current(self) -> str:
    """Return the current type's name, or `"None"` when there is no type."""
    found = self.current
    if found:
        return found.__name__
    return str(found)

format_needed()

Format the needed type.

Source code in hoist/exceptions.py
def format_needed(self) -> str:
    """Format the needed type.

    Returns:
        The type's `__name__`, or the comma-separated names when `needed`
        is a tuple. A missing type is rendered as `"None"`.
    """
    needed = self.needed
    if not isinstance(needed, tuple):
        # A lone `None` has no `__name__`; render it the same way the tuple
        # branch does instead of raising `AttributeError`.
        return needed.__name__ if needed else str(needed)  # type: ignore

    return ",".join([i.__name__ if i else str(i) for i in needed])

needed() property

Type(s) needed to be valid.

Source code in hoist/exceptions.py
@property
def needed(self) -> SchemaNeededType:
    """Type or types that would have been valid."""
    return self._needed

ServerConnectError

Bases: Exception

Failed to connect to a server.

Source code in hoist/exceptions.py
class ServerConnectError(Exception):
    """Connecting to a server failed."""

ServerLoginError

Bases: Exception

Failed to log in to the target server.

Source code in hoist/exceptions.py
class ServerLoginError(Exception):
    """Logging in to the target server failed."""

ServerNotStartedError

Bases: Exception

Server is not started.

Source code in hoist/exceptions.py
class ServerNotStartedError(Exception):
    """The server has not been started."""

ServerResponseError

Bases: _ResponseError

Generic bad server response.

Source code in hoist/exceptions.py
class ServerResponseError(_ResponseError):
    """Generic error for a bad response from the server."""

BaseMessage

Bases: MessageListener, ABC

Base class for handling a message.

Source code in hoist/message.py
class BaseMessage(MessageListener, ABC):
    """Base class for handling a message.

    Holds the connection target, the message content, its payload and the
    message being replied to; subclasses supply `to_dict()`.
    """

    __slots__ = ("_conn", "_msg", "_data", "_replying")

    def __init__(
        self,
        conn: BaseMessagable,
        msg: str,
        *,
        data: Optional[Payload] = None,
        replying: Optional["Message"] = None,
    ) -> None:
        """Constructor for `BaseMessage`.

        Args:
            conn: Messagable connection target.
            msg: Content of the message.
            data: Payload included with the message.
            replying: Message object that the current one is replying to.

        """
        self._conn = conn
        self._msg = msg
        # A missing (or empty) payload is stored as a fresh empty dict.
        self._data = data if data else {}
        self._replying = replying
        super().__init__()

    @property
    def content(self) -> str:
        """Message content."""
        return self._msg

    @property
    def data(self) -> Payload:
        """Raw message payload."""
        return self._data

    @property
    def replying(self) -> Optional["Message"]:
        """Message that the current message is replying to."""
        return self._replying

    @abstractmethod
    def to_dict(
        self,
        convert_replies: bool = True,
    ) -> dict:
        """Convert the message to a dictionary.

        Args:
            convert_replies: Whether message objects under `replying`
                should be converted to a `dict`.

        Returns:
            The created `dict` object.
        """
        ...

    def __repr__(self) -> str:
        """Render as `ClassName(key=value, ...)` using the `to_dict()` items."""
        # Replies are kept as objects so their own repr is used.
        fields = self.to_dict(convert_replies=False)
        rendered = ", ".join(
            f"{key}={value!r}" for key, value in fields.items()
        )
        return f"{self.__class__.__name__}({rendered})"

__init__(conn, msg, *, data=None, replying=None)

Constructor for BaseMessage.

Parameters:

Name Type Description Default
conn BaseMessagable

Messagable connection target.

required
msg str

Content of the message.

required
data Optional[Payload]

Payload included with the message.

None
replying Optional[Message]

Message object that the current one is replying to.

None
Source code in hoist/message.py
def __init__(
    self,
    conn: BaseMessagable,
    msg: str,
    *,
    data: Optional[Payload] = None,
    replying: Optional["Message"] = None,
) -> None:
    """Constructor for `BaseMessage`.

    Args:
        conn: Messagable connection target.
        msg: Content of the message.
        data: Payload included with the message.
        replying: Message object that the current one is replying to.

    """
    self._conn = conn
    self._msg = msg
    # A missing (or empty) payload is stored as a fresh empty dict.
    self._data = data if data else {}
    self._replying = replying
    super().__init__()

content() property

Message content.

Source code in hoist/message.py
@property
def content(self) -> str:
    """Content of the message, as stored on the instance."""
    text = self._msg
    return text

data() property

Raw message payload.

Source code in hoist/message.py
@property
def data(self) -> Payload:
    """Raw payload of the message, as stored on the instance."""
    payload = self._data
    return payload

replying() property

Message that the current message is replying to.

Source code in hoist/message.py
@property
def replying(self) -> Optional["Message"]:
    """Message being replied to by the current message, or `None`."""
    target = self._replying
    return target

to_dict(convert_replies=True) abstractmethod

Convert the message to a dictionary.

Parameters:

Name Type Description Default
convert_replies bool

Whether message objects under replying should be converted to a dict.

True

Returns:

Type Description
dict

The created dict object.

Source code in hoist/message.py
@abstractmethod
def to_dict(
    self,
    convert_replies: bool = True,
) -> dict:
    """Convert the message to a dictionary.

    Abstract: this declaration has no implementation of its own.

    Args:
        convert_replies: Whether message objects under `replying` should
            be converted to a `dict`.

    Returns:
        The created `dict` object.
    """

Message

Bases: BaseMessage

Object handling a message.

Source code in hoist/message.py
class Message(BaseMessage):
    """Object handling a message.

    Extends `BaseMessage` with a message ID and the ability to reply.
    """

    __slots__ = ("_id",)

    def __init__(
        self,
        conn: BaseMessagable,
        msg: str,
        id: int,
        *,
        data: Optional[Payload] = None,
        replying: Optional["Message"] = None,
    ) -> None:
        """Store the message ID, then initialise the base message.

        Args:
            conn: Messagable connection target.
            msg: Content of the message.
            id: Message ID.
            data: Payload included with the message.
            replying: Message object that the current one is replying to.
        """
        self._id = id
        super().__init__(conn, msg, data=data, replying=replying)

    @property
    def id(self) -> int:
        """Message ID."""
        return self._id

    async def reply(
        self,
        msg: str,
        data: Optional[Payload] = None,
    ) -> "Message":
        """Reply to the current message.

        Args:
            msg: Content to reply with.
            data: Payload to include in the reply.

        Returns:
            Created message.
        """
        # A missing payload is sent as an empty dict.
        payload = data or {}
        return await self._conn.message(msg, payload, replying=self)

    def to_dict(  # noqa
        self,
        convert_replies: bool = True,
    ) -> dict:
        parent = self.replying

        if convert_replies:
            replying = parent.to_dict() if parent else None
        else:
            # Leave the reply target as the original object.
            replying = parent

        return {
            "replying": replying,
            "id": self.id,
            "data": self.data,
            "message": self.content,
        }

    def receive(self, *args, **kwargs):  # noqa
        # Emit the warning first, then defer to the inherited receive().
        warn(
            "receive() should not be called on a message object\n"
            "if you would like to handle replies, please use message_later()",
        )
        return super().receive(*args, **kwargs)

id() property

Message ID.

Source code in hoist/message.py
@property
def id(self) -> int:
    """ID of the message, as stored on the instance."""
    message_id = self._id
    return message_id

reply(msg, data=None) async

Reply to the current message.

Parameters:

Name Type Description Default
msg str

Content to reply with.

required
data Optional[Payload]

Payload to include in the reply.

None

Returns:

Type Description
Message

Created message.

Source code in hoist/message.py
async def reply(
    self,
    msg: str,
    data: Optional[Payload] = None,
) -> "Message":
    """Send a reply to the current message through its connection.

    Args:
        msg: Content to reply with.
        data: Payload to include in the reply.

    Returns:
        Created message.
    """
    # A missing payload is sent as an empty dict.
    payload = data or {}
    return await self._conn.message(msg, payload, replying=self)

PendingMessage

Bases: BaseMessage

Object handling a message that has not yet been sent to the server.

Source code in hoist/message.py
class PendingMessage(BaseMessage):
    """Object handling a message that has not yet been sent to the server.

    The content can be changed before sending. The object can also be used
    with `async with`, in which case the message is sent on exit.
    """

    def __init__(
        self,
        conn: BaseMessagable,
        msg: Optional[str] = None,
        *,
        data: Optional[Payload] = None,
        replying: Optional["Message"] = None,
    ) -> None:
        """Initialise the pending message.

        Args:
            conn: Messagable connection target.
            msg: Content of the message; a missing value becomes `""`.
            data: Payload included with the message.
            replying: Message object that the current one is replying to.
        """
        initial = msg or ""
        super().__init__(conn, initial, data=data, replying=replying)

    @property
    def content(self) -> str:  # noqa
        return self._msg

    @content.setter
    def content(self, value: str) -> None:
        self._msg = value

    def to_dict(  # noqa
        self,
        convert_replies: bool = True,
    ) -> dict:
        parent = self.replying

        if convert_replies:
            replying = parent.to_dict() if parent else None
        else:
            # Leave the reply target as the original object.
            replying = parent

        return {
            "replying": replying,
            "data": self.data,
            "message": self.content,
        }

    async def send(self) -> Message:
        """Send the message.

        Returns:
            Created message.
        """
        created = await self._conn.message(
            self.content,
            self.data,
            self.replying,
            listeners=self.message_listeners,
        )
        return created

    async def __aenter__(self):
        """Enter the `async with` block, yielding this pending message."""
        return self

    async def __aexit__(self, *args, **kwargs):
        """Send the message when the `async with` block exits."""
        # The exit arguments are not inspected: the message is sent whether
        # or not the block raised.
        await self.send()

send() async

Send the message.

Returns:

Type Description
Message

Created message.

Source code in hoist/message.py
async def send(self) -> Message:
    """Send the pending message through its connection.

    The content, payload and reply target are passed along together with
    the listeners registered on this object.

    Returns:
        Created message.
    """
    created = await self._conn.message(
        self.content,
        self.data,
        self.replying,
        listeners=self.message_listeners,
    )
    return created