package realtime
import (
"fmt"
"net/http"
"time"
)
type Hub struct {
subscribe chan chan string
unsubscribe chan chan string
broadcast chan string
}
func NewHub() *Hub {
h := &Hub{subscribe: make(chan chan string), unsubscribe: make(chan chan string), broadcast: make(chan string, 128)}
go h.run()
return h
}
func (h *Hub) run() {
clients := map[chan string]struct{}{}
for {
select {
case c := <-h.subscribe:
clients[c] = struct{}{}
case c := <-h.unsubscribe:
delete(clients, c)
close(c)
case msg := <-h.broadcast:
for c := range clients {
select {
case c <- msg:
default:
}
}
}
}
}
func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ch := make(chan string, 16)
h.subscribe <- ch
defer func() { h.unsubscribe <- ch }()
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-r.Context().Done():
return
case <-heartbeat.C:
fmt.Fprint(w, ": ping
")
flusher.Flush()
case msg := <-ch:
fmt.Fprintf(w, "data: %s
", msg)
flusher.Flush()
}
}
}
SSE is my go-to for “live updates” when I don’t need full bidirectional WebSockets. The key is to set the right headers (Content-Type: text/event-stream, Cache-Control: no-cache) and to flush periodically so intermediaries don’t buffer. I send heartbeats as : comments every ~15 seconds to keep load balancers happy and to detect dead clients. I also remove clients when r.Context().Done() fires, which prevents leaking channels and goroutines over time. The nice part is that SSE stays friendly to existing HTTP infrastructure and works well through proxies. In production I also cap the per-client buffer and drop slow consumers rather than letting them back up the whole broadcaster.