-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebhook_event_channel.py
39 lines (33 loc) · 1.13 KB
/
webhook_event_channel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from dataclasses import dataclass, field
from typing import Optional, Dict
import requests
from marshy import ExternalType
from marshy.marshaller.marshaller_abc import MarshallerABC
from schemey import Schema
from servey.event_channel.event_channel_abc import EventChannelABC, T
from servey.trigger.web_trigger import WebTriggerMethod
@dataclass(frozen=True)
class WebhookEventChannel(EventChannelABC[T]):
"""
Publisher which passes an event_channel to a web hook
"""
url: str
event_marshaller: MarshallerABC[T]
event_schema: Optional[Schema]
method: WebTriggerMethod = WebTriggerMethod.POST
headers: Optional[Dict[str, str]] = field(
default_factory=lambda: {"ContentType": "application/json"}
)
description: Optional[str] = None
timeout: int = 5
def publish(self, event: ExternalType):
dumped = self.event_marshaller.dump(event)
if self.event_schema:
self.event_schema.validate(dumped)
requests.request(
self.url,
self.method.value,
headers=self.headers,
json=dumped,
timeout=self.timeout,
)