Skip to content

Commit

Permalink
correctly clean up and add destroy_topic
Browse files Browse the repository at this point in the history
  • Loading branch information
okkdev committed Jun 25, 2024
1 parent 9f4b837 commit 7d3b1eb
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 52 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# glubsub

tiny pubsub inspired abstraction.

[![Package Version](https://img.shields.io/hexpm/v/glubsub)](https://hex.pm/packages/glubsub)
[![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/glubsub/)

tiny pubsub inspired abstraction.

```sh
gleam add glubsub
```
Expand Down
135 changes: 107 additions & 28 deletions src/glubsub.gleam
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import gleam/erlang/process.{type Subject}
import gleam/erlang/process.{type ProcessMonitor, type Selector, type Subject}
import gleam/list
import gleam/otp/actor
import gleam/result
import gleam/set.{type Set}

pub type Topic(m) {
Topic(Subject(Message(m)))
Expand All @@ -15,20 +14,44 @@ pub opaque type Message(m) {
client: Subject(m),
)
Broadcast(message: m)
GetSubscribers(reply_with: Subject(Set(Subject(m))))
GetSubscribers(reply_with: Subject(List(Subscriber(m))))
SubscriberDown(process.ProcessDown)
Shutdown
}

pub opaque type GlubsubError {
AlreadySubscribed
NotSubscribed
StartError(actor.StartError)
}

type State(m) {
State(subscribers: List(Subscriber(m)), selector: Selector(Message(m)))
}

pub type Subscriber(m) {
Subscriber(client: Subject(m), monitor: ProcessMonitor)
}

const timeout = 1000

/// Creates a new topic. Which is a pubsub channel that clients can subscribe to.
pub fn new_topic() -> Result(Topic(m), actor.StartError) {
actor.start(set.new(), handle_message)
pub fn new_topic() -> Result(Topic(m), GlubsubError) {
actor.start_spec(actor.Spec(
init: fn() {
let selector = process.new_selector()
actor.Ready(State(subscribers: [], selector: selector), selector)
},
init_timeout: timeout,
loop: handle_message,
))
|> result.map(fn(subject) { Topic(subject) })
|> result.map_error(fn(err) { StartError(err) })
}

/// Destroys the given topic, unsubscribing all clients.
pub fn destroy_topic(topic: Topic(m)) -> Nil {
actor.send(topic_to_subject(topic), Shutdown)
}

/// Subscribes the given client to the given topic.
Expand Down Expand Up @@ -62,55 +85,111 @@ pub fn broadcast(topic: Topic(m), message: m) -> Result(Nil, Nil) {
}

/// Returns a set of all subscribers to the given topic.
pub fn get_subscribers(topic: Topic(m)) -> Set(Subject(m)) {
pub fn get_subscribers(topic: Topic(m)) -> List(Subscriber(m)) {
actor.call(topic_to_subject(topic), GetSubscribers, timeout)
}

fn handle_message(
message: Message(m),
subs: Set(Subject(m)),
) -> actor.Next(a, Set(Subject(m))) {
state: State(m),
) -> actor.Next(Message(m), State(m)) {
case message {
Subscribe(reply, client) -> {
case set.contains(subs, client) {
True -> {
case list.find(state.subscribers, fn(sub) { sub.client == client }) {
Ok(_) -> {
actor.send(reply, Error(AlreadySubscribed))
actor.continue(subs)
actor.continue(state)
}
False -> {
let new_subs = set.insert(subs, client)
Error(Nil) -> {
let monitor = process.monitor_process(process.subject_owner(client))
let new_selector =
state.selector
|> process.selecting_process_down(monitor, SubscriberDown)

let new_subs = [
Subscriber(client: client, monitor: monitor),
..state.subscribers
]

actor.send(reply, Ok(Nil))
actor.continue(new_subs)
actor.continue(State(subscribers: new_subs, selector: new_selector))
|> actor.with_selector(new_selector)
}
}
}
Unsubscribe(reply, client) -> {
case set.contains(subs, client) {
False -> {
case list.find(state.subscribers, fn(sub) { sub.client == client }) {
Error(Nil) -> {
actor.send(reply, Error(NotSubscribed))
actor.continue(subs)
actor.continue(state)
}
True -> {
let new_subs = set.delete(subs, client)
Ok(unsub) -> {
let new_subs = remove_subscriber(state.subscribers, unsub)

let new_selector =
process.new_selector()
|> selector_down_from_subscribers(new_subs)

actor.send(reply, Ok(Nil))
actor.continue(new_subs)
actor.continue(State(subscribers: new_subs, selector: new_selector))
}
}
}
Broadcast(message) -> {
set.to_list(subs)
|> list.each(fn(subscriber) { actor.send(subscriber, message) })
actor.continue(subs)
state.subscribers
|> list.each(fn(sub) { actor.send(sub.client, message) })
actor.continue(state)
}
GetSubscribers(reply) -> {
actor.send(reply, subs)
actor.continue(subs)
actor.send(reply, state.subscribers)
actor.continue(state)
}
SubscriberDown(client) -> {
case
state.subscribers
|> list.find(fn(sub) { process.subject_owner(sub.client) == client.pid })
{
Error(Nil) -> {
actor.continue(state)
}
Ok(unsub) -> {
process.demonitor_process(unsub.monitor)
let new_subs = remove_subscriber(state.subscribers, unsub)

let new_selector =
process.new_selector()
|> selector_down_from_subscribers(new_subs)

actor.continue(State(subscribers: new_subs, selector: new_selector))
}
}
}
Shutdown -> {
actor.Stop(process.Normal)
}
}
}

fn topic_to_subject(topic: Topic(m)) -> Subject(Message(m)) {
case topic {
Topic(subject) -> subject
}
let Topic(subject) = topic
subject
}

fn remove_subscriber(
subscribers: List(Subscriber(m)),
unsubscriber: Subscriber(m),
) -> List(Subscriber(m)) {
subscribers
|> list.pop(fn(sub) { sub == unsubscriber })
|> result.map(fn(res) { res.1 })
|> result.unwrap(subscribers)
}

fn selector_down_from_subscribers(
selector: Selector(Message(m)),
subscribers: List(Subscriber(m)),
) -> Selector(Message(m)) {
list.fold(subscribers, selector, fn(selector, sub) {
process.selecting_process_down(selector, sub.monitor, SubscriberDown)
})
}
125 changes: 103 additions & 22 deletions test/glubsub_test.gleam
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import gleam/erlang/process
import gleam/list
import gleam/otp/actor
import gleam/set
import gleeunit
import gleeunit/should

Expand All @@ -13,9 +13,10 @@ pub fn main() {
type Message {
Hello(String)
Hi(String)
Shutdown
}

pub fn glubsub_test() {
pub fn glubsub_subscribe_test() {
let assert Ok(topic) = glubsub.new_topic()

let assert Ok(actor1) = actor.start(Nil, handle_message)
Expand All @@ -29,25 +30,19 @@ pub fn glubsub_test() {
glubsub.subscribe(topic, actor2)
|> should.be_ok

glubsub.broadcast(topic, Hello("Louis"))
|> should.be_ok

glubsub.broadcast(topic, Hi("You"))
|> should.be_ok

let subs1 = glubsub.get_subscribers(topic)

subs1
|> set.size
|> list.length
|> should.equal(2)

subs1
|> set.contains(actor1)
|> should.be_true
|> list.find(fn(sub) { sub.client == actor1 })
|> should.be_ok

subs1
|> set.contains(actor2)
|> should.be_true
|> list.find(fn(sub) { sub.client == actor2 })
|> should.be_ok

glubsub.unsubscribe(topic, actor2)
|> should.be_ok
Expand All @@ -58,28 +53,114 @@ pub fn glubsub_test() {
let subs2 = glubsub.get_subscribers(topic)

subs2
|> set.size
|> list.length
|> should.equal(1)

subs2
|> set.contains(actor1)
|> should.be_true
|> list.find(fn(sub) { sub.client == actor1 })
|> should.be_ok

subs2
|> set.contains(actor2)
|> should.be_false
|> list.find(fn(sub) { sub.client == actor2 })
|> should.be_error
}

pub fn glubsub_cleanup_test() {
let assert Ok(topic) = glubsub.new_topic()

let assert Ok(actor1) = actor.start(Nil, handle_message)
let assert Ok(actor2) = actor.start(Nil, handle_message)

glubsub.subscribe(topic, actor1)
|> should.be_ok
glubsub.subscribe(topic, actor1)
|> should.be_error

glubsub.subscribe(topic, actor2)
|> should.be_ok

let subs1 = glubsub.get_subscribers(topic)

subs1
|> list.length
|> should.equal(2)

subs1
|> list.find(fn(sub) { sub.client == actor1 })
|> should.be_ok

subs1
|> list.find(fn(sub) { sub.client == actor2 })
|> should.be_ok

actor.send(actor2, Shutdown)
process.sleep(100)

process.sleep(500)
let subs2 = glubsub.get_subscribers(topic)

subs2
|> list.length
|> should.equal(1)

subs2
|> list.find(fn(sub) { sub.client == actor1 })
|> should.be_ok

subs2
|> list.find(fn(sub) { sub.client == actor2 })
|> should.be_error
}

pub fn glubsub_broadcast_test() {
let assert Ok(topic) = glubsub.new_topic()

let assert Ok(actor1) = actor.start(Nil, handle_message)
let assert Ok(actor2) = actor.start(Nil, handle_message)

glubsub.subscribe(topic, actor1)
|> should.be_ok
glubsub.subscribe(topic, actor1)
|> should.be_error

glubsub.subscribe(topic, actor2)
|> should.be_ok

glubsub.broadcast(topic, Hello("Louis"))
|> should.be_ok

glubsub.broadcast(topic, Hi("You"))
|> should.be_ok

process.sleep(200)
}

pub fn glubsub_destroy_test() {
let assert Ok(topic) = glubsub.new_topic()
let glubsub.Topic(s) = topic

process.is_alive(process.subject_owner(s))
|> should.be_true

glubsub.destroy_topic(topic)
|> should.equal(Nil)

process.is_alive(process.subject_owner(s))
|> should.be_false
}

fn handle_message(message: Message, state) -> actor.Next(a, Nil) {
case message {
Hello(x) ->
Hello(x) -> {
x
|> should.equal("Louis")
Hi(x) ->
actor.continue(state)
}

Hi(x) -> {
x
|> should.equal("You")
actor.continue(state)
}
Shutdown -> actor.Stop(process.Normal)
}
actor.continue(state)
}

0 comments on commit 7d3b1eb

Please sign in to comment.