sending revision message of websocket

This commit is contained in:
Roland Osborne 2022-01-14 13:36:02 -08:00
parent bac890a660
commit a9ce9c938e
3 changed files with 77 additions and 40 deletions

View File

@ -3893,7 +3893,7 @@ components:
- profile - profile
- content - content
- view # revision increment on sharing changes - view # revision increment on sharing changes
- share - group
- label - label
- card - card
- dialogue - dialogue
@ -3908,7 +3908,7 @@ components:
label: label:
type: integer type: integer
format: int64 format: int64
share: group:
type: integer type: integer
format: int64 format: int64
card: card:

View File

@ -13,6 +13,7 @@ import (
"log" "log"
"sync" "sync"
"net/http" "net/http"
"encoding/json"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"databag/internal/store" "databag/internal/store"
) )
@ -28,8 +29,8 @@ type accountRevision struct {
InsightRevision int64 InsightRevision int64
} }
var wsSync sync.Mutex; var wsSync sync.Mutex
var statusListener = make(map[uint][]chan<-accountRevision) var statusListener = make(map[uint][]chan<-[]byte)
var upgrader = websocket.Upgrader{} var upgrader = websocket.Upgrader{}
func Status(w http.ResponseWriter, r *http.Request) { func Status(w http.ResponseWriter, r *http.Request) {
@ -37,83 +38,119 @@ func Status(w http.ResponseWriter, r *http.Request) {
// accept websocket connection // accept websocket connection
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
log.Print("Status: failed upgrade connection", err) log.Println("Status: failed upgrade connection")
return return
} }
defer conn.Close() defer conn.Close()
log.Println("CONNECTED") log.Println("CONNECTED")
// receive announce message // receive announce message
var act uint = 0
// get revisions for the account
var ar accountRevision
err = store.DB.Model(&Revision{}).Where("ID = ?", act).First(&ar).Error
if err != nil {
log.Println("Status - failed to get account revision")
return
}
rev := getRevision(ar)
// send current version
var msg []byte
msg, err = json.Marshal(rev)
if err != nil {
log.Println("Status - failed to marshal revision")
return
}
err = conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Println("Status - failed to send initial revision")
return
}
// open channel for revisions // open channel for revisions
c := make(chan accountRevision) c := make(chan []byte)
AddStatusListener(0, c); defer close(c)
for { // register channel for revisions
messageType, message, err := conn.ReadMessage() AddStatusListener(0, c)
defer RemoveStatusListener(act, c)
// send revision until channel is closed
for msg := range c {
err = conn.WriteMessage(websocket.TextMessage, msg)
if err != nil { if err != nil {
log.Println("Error during message reading:", err) log.Println("Status - failed to send revision, closing")
break return
} }
log.Printf("Received: %s", message)
err = conn.WriteMessage(messageType, message)
if err != nil {
log.Println("Error during message writing:", err)
break
} }
} }
// close channel func getRevision(rev accountRevision) Revision {
RemoveStatusListener(0, c); var r Revision
close(c); r.Profile = rev.ProfileRevision
r.Content = rev.ContentRevision
r.Label = rev.LabelRevision
r.Group = rev.GroupRevision
r.Card = rev.CardRevision
r.Dialogue = rev.DialogueRevision
r.Insight = rev.InsightRevision
return r
} }
func SetStatus(act uint) { func SetStatus(act uint) {
// get revisions for the account // get revisions for the account
var rev accountRevision; var ar accountRevision
err := store.DB.Model(&Revision{}).Where("ID = ?", act).First(&rev).Error err := store.DB.Model(&Revision{}).Where("ID = ?", act).First(&ar).Error
if err != nil { if err != nil {
log.Println("SetStatus - failed to retrieve account revisions"); log.Println("SetStatus - failed to retrieve account revisions")
return
}
rev := getRevision(ar)
var msg []byte
msg, err = json.Marshal(rev)
if err != nil {
log.Println("SetStatus - failed to marshal revision")
return
} }
// lock access to statusListener // lock access to statusListener
wsSync.Lock() wsSync.Lock()
defer wsSync.Unlock(); defer wsSync.Unlock()
// check if we have any listeners
chs, ok := statusListener[act]
if ok {
// notify all listeners // notify all listeners
for _, ch := range chs {
ch <- rev
}
}
}
func AddStatusListener(act uint, ch chan<-accountRevision) {
// lock access to statusListener
wsSync.Lock()
defer wsSync.Unlock();
// check if account has listeners
chs, ok := statusListener[act] chs, ok := statusListener[act]
if ok { if ok {
chs = append(chs, ch); for _, ch := range chs {
} else { ch <- msg
statusListener[act] = []chan<-accountRevision{ch} }
} }
} }
func RemoveStatusListener(act uint, ch chan<-accountRevision) { func AddStatusListener(act uint, ch chan<-[]byte) {
// lock access to statusListener // lock access to statusListener
wsSync.Lock() wsSync.Lock()
defer wsSync.Unlock(); defer wsSync.Unlock()
// remove channel // add new listener to map
chs, ok := statusListener[act]
if ok {
chs = append(chs, ch)
} else {
statusListener[act] = []chan<-[]byte{ch}
}
}
func RemoveStatusListener(act uint, ch chan<-[]byte) {
// lock access to statusListener
wsSync.Lock()
defer wsSync.Unlock()
// remove channel from map
chs, ok := statusListener[act] chs, ok := statusListener[act]
if ok { if ok {
for i, c := range chs { for i, c := range chs {

View File

@ -17,7 +17,7 @@ type Revision struct {
Label int64 `json:"label"` Label int64 `json:"label"`
Share int64 `json:"share"` Group int64 `json:"share"`
Card int64 `json:"card"` Card int64 `json:"card"`