(ns observer
  (:require [clojure.spec.alpha :as s]))

(s/def ::topic keyword?)
(s/def ::message any?)
(s/def ::callback fn?)

(defprotocol Observable
  "Protocol defining Observable behaviors"
  (subscribe [this topic callback] "Subscribes a callback function to a specific topic")
  (unsubscribe [this topic callback] "Removes a callback subscription from a topic")
  (notify [this topic message] "Notifies all subscribers of a topic with a message"))

(defrecord EventBus [subscribers]
  Observable
  (subscribe [this topic callback]
    {:pre [(s/valid? ::topic topic)
           (s/valid? ::callback callback)]}
    (update-in this [:subscribers topic] (fnil conj #{}) callback))

  (unsubscribe [this topic callback]
    {:pre [(s/valid? ::topic topic)
           (s/valid? ::callback callback)]}
    (update-in this [:subscribers topic] disj callback))

  (notify [this topic message]
    {:pre [(s/valid? ::topic topic)
           (s/valid? ::message message)]}
    (doseq [callback (get-in this [:subscribers topic])]
      (callback message))
    this))

(defn create-event-bus
  "Creates a new event bus instance"
  []
  (->EventBus {}))

(defn create-stateful-subscriber
  "Creates a subscriber that maintains state between notifications"
  [initial-state update-fn]
  (let [state (atom initial-state)]
    (fn [message]
      (swap! state update-fn message))))

(defn create-logging-subscriber
  "Creates a subscriber that logs messages with timestamps"
  [topic-name]
  (fn [message]
    (println (format "[%s][%s] Received: %s"
                     (java.time.LocalDateTime/now)
                     topic-name
                     message))))

(comment
  (def event-bus (create-event-bus))

  (def order-logger (create-logging-subscriber "Orders"))

  (def bus-with-logger 
    (subscribe event-bus :orders order-logger))

  (def order-counter
    (create-stateful-subscriber 0 (fn [state _] (inc state))))

  (def bus-with-counter
    (subscribe bus-with-logger :orders order-counter))

  (notify bus-with-counter :orders {:id 1 :total 100.0})
  (notify bus-with-counter :orders {:id 2 :total 200.0})

  (def final-bus
    (unsubscribe bus-with-counter :orders order-logger))
)

(comment
  (require '[clojure.test :refer [deftest testing is]])

  (deftest observer-pattern-test
    (testing "Basic subscription and notification"
      (let [received (atom nil)
            callback #(reset! received %)
            bus (-> (create-event-bus)
                   (subscribe :test callback))]
        (notify bus :test "hello")
        (is (= @received "hello"))))

    (testing "Multiple subscribers"
      (let [results (atom [])
            callback-1 #(swap! results conj [:cb1 %])
            callback-2 #(swap! results conj [:cb2 %])
            bus (-> (create-event-bus)
                   (subscribe :test callback-1)
                   (subscribe :test callback-2))]
        (notify bus :test "hello")
        (is (= @results [[:cb1 "hello"] [:cb2 "hello"]]))))

    (testing "Unsubscribe"
      (let [received (atom nil)
            callback #(reset! received %)
            bus (-> (create-event-bus)
                   (subscribe :test callback)
                   (unsubscribe :test callback))]
        (notify bus :test "hello")
        (is (nil? @received))))

    (testing "Stateful subscriber"
      (let [counter (create-stateful-subscriber 0 (fn [state _] (inc state)))
            bus (subscribe (create-event-bus) :test counter)]
        (notify bus :test "event1")
        (notify bus :test "event2")
        (is (= @(#'observer/state counter) 2))))))

Source: View source