added second transform queue

This commit is contained in:
Roland Osborne 2022-03-01 10:35:14 -08:00
parent dabfa09eb9
commit ef88a857ed
7 changed files with 115 additions and 54 deletions

View File

@ -2225,6 +2225,12 @@ paths:
required: true required: true
schema: schema:
type: string type: string
- name: confirm
in: query
description: set if intial state is confirmed
required: false
schema:
type: boolean
responses: responses:
'201': '201':
description: entry created description: entry created

View File

@ -9,6 +9,8 @@ import (
func AddChannelTopic(w http.ResponseWriter, r *http.Request) { func AddChannelTopic(w http.ResponseWriter, r *http.Request) {
confirm := r.FormValue("confirm")
var subject Subject var subject Subject
if err := ParseRequest(r, w, &subject); err != nil { if err := ParseRequest(r, w, &subject); err != nil {
ErrResponse(w, http.StatusBadRequest, err) ErrResponse(w, http.StatusBadRequest, err)
@ -33,7 +35,11 @@ func AddChannelTopic(w http.ResponseWriter, r *http.Request) {
topic.Guid = guid topic.Guid = guid
topic.DetailRevision = act.ChannelRevision + 1 topic.DetailRevision = act.ChannelRevision + 1
topic.TagRevision = act.ChannelRevision + 1 topic.TagRevision = act.ChannelRevision + 1
if confirm == "true" {
topic.Status = APP_TOPICCONFIRMED
} else {
topic.Status = APP_TOPICUNCONFIRMED topic.Status = APP_TOPICUNCONFIRMED
}
topic.Transform = APP_TRANSFORMCOMPLETE topic.Transform = APP_TRANSFORMCOMPLETE
if res := tx.Save(topic).Error; res != nil { if res := tx.Save(topic).Error; res != nil {
return res return res

View File

@ -3,6 +3,7 @@ package databag
import ( import (
"os" "os"
"io" "io"
"strings"
"errors" "errors"
"github.com/google/uuid" "github.com/google/uuid"
"net/http" "net/http"
@ -103,8 +104,17 @@ func AddChannelTopicAsset(w http.ResponseWriter, r *http.Request) {
asset.ChannelID = channelSlot.Channel.ID asset.ChannelID = channelSlot.Channel.ID
asset.TopicID = topicSlot.Topic.ID asset.TopicID = topicSlot.Topic.ID
asset.Status = APP_ASSETWAITING asset.Status = APP_ASSETWAITING
asset.Transform = transform
asset.TransformId = id asset.TransformId = id
t := strings.Split(transform, ";")
if len(t) > 0 {
asset.Transform = t[0]
}
if len(t) > 1 {
asset.TransformQueue = t[1]
}
if len(t) > 2 {
asset.TransformParams = t[2]
}
if res := tx.Save(asset).Error; res != nil { if res := tx.Save(asset).Error; res != nil {
return res return res
} }
@ -135,7 +145,7 @@ func AddChannelTopicAsset(w http.ResponseWriter, r *http.Request) {
} }
// invoke transcoder // invoke transcoder
go transcode() transcode()
// determine affected contact list // determine affected contact list
cards := make(map[string]store.Card) cards := make(map[string]store.Card)

View File

@ -35,6 +35,9 @@ const APP_ASSETPROCESSING = "processing"
const APP_ASSETERROR = "error" const APP_ASSETERROR = "error"
const APP_TRANSFORMCOMPLETE = "complete" const APP_TRANSFORMCOMPLETE = "complete"
const APP_TRANSFORMINCOMPLETE = "incomplete" const APP_TRANSFORMINCOMPLETE = "incomplete"
const APP_TRANSFORMERROR = "error"
const APP_TRANSFORMQUEUEA = "A"
const APP_TRANSFORMQUEUEB = "B"
func AppCardStatus(status string) bool { func AppCardStatus(status string) bool {
if status == APP_CARDPENDING { if status == APP_CARDPENDING {

View File

@ -245,7 +245,8 @@ type Asset struct {
Crc uint32 Crc uint32
Transform string Transform string
TransformId string TransformId string
TransformData string TransformParams string
TransformQueue string
Created int64 `gorm:"autoCreateTime"` Created int64 `gorm:"autoCreateTime"`
Updated int64 `gorm:"autoUpdateTime"` Updated int64 `gorm:"autoUpdateTime"`
Account Account Account Account

View File

@ -13,41 +13,74 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
) )
var transcodeSync sync.Mutex var aSync sync.Mutex
var bSync sync.Mutex
func transcode() { func transcode() {
transcodeSync.Lock() // quick transforms should use A (eg image resize)
defer transcodeSync.Unlock() go transcodeA()
var assets []store.Asset // slow transofrms should use B (eg video transcode)
if err := store.DB.Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("status = ?", APP_ASSETWAITING).Find(&assets).Error; err != nil { go transcodeB()
}
func transcodeA() {
aSync.Lock()
defer aSync.Unlock()
for ;; {
var asset store.Asset
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue = ? AND status = ?", APP_TRANSFORMQUEUEA, APP_ASSETWAITING).First(&asset).Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
ErrMsg(err) ErrMsg(err)
}
return return
} }
transcodeAsset(&asset)
}
}
func transcodeB() {
bSync.Lock()
defer bSync.Unlock()
for ;; {
var asset store.Asset
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue != ? AND status = ?", APP_TRANSFORMQUEUEB, APP_ASSETWAITING).First(&asset).Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
ErrMsg(err)
}
return
}
transcodeAsset(&asset)
}
}
func transcodeAsset(asset *store.Asset) {
// prepare script path // prepare script path
data := getStrConfigValue(CONFIG_ASSETPATH, ".") data := getStrConfigValue(CONFIG_ASSETPATH, ".")
script := getStrConfigValue(CONFIG_SCRIPTPATH, ".") script := getStrConfigValue(CONFIG_SCRIPTPATH, ".")
re := regexp.MustCompile("^[a-zA-Z0-9_]*$") re := regexp.MustCompile("^[a-zA-Z0-9_]*$")
for _, asset := range assets {
if !re.MatchString(asset.Transform) { if !re.MatchString(asset.Transform) {
ErrMsg(errors.New("invalid transformi")) ErrMsg(errors.New("invalid transform"))
if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil { if err := UpdateAsset(asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err) ErrMsg(err)
} }
} else { } else {
input := data + "/" + asset.TransformId input := data + "/" + asset.TransformId
output := data + "/" + asset.AssetId output := data + "/" + asset.AssetId
cmd := exec.Command(script + "/" + asset.Transform + ".sh", input, output) cmd := exec.Command(script + "/" + asset.Transform + ".sh", input, output, asset.TransformParams)
var out bytes.Buffer var out bytes.Buffer
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
LogMsg(out.String()) LogMsg(out.String())
ErrMsg(err) ErrMsg(err)
if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil { if err := UpdateAsset(asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err) ErrMsg(err)
} }
} else { } else {
@ -55,32 +88,28 @@ func transcode() {
crc, size, err := ScanAsset(output) crc, size, err := ScanAsset(output)
if err != nil { if err != nil {
ErrMsg(err) ErrMsg(err)
if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil { if err := UpdateAsset(asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err) ErrMsg(err)
} }
} else if err := UpdateAsset(&asset, APP_ASSETREADY, crc, size); err != nil { } else if err := UpdateAsset(asset, APP_ASSETREADY, crc, size); err != nil {
ErrMsg(err) ErrMsg(err)
} }
} }
} }
}
} }
func isComplete(status string, asset *store.Asset) (complete bool, err error) { func isComplete(id uint) (complete bool, err error) {
if status == APP_ASSETREADY {
var assets []store.Asset var assets []store.Asset
if err = store.DB.Where("topic_id = ?", asset.Topic.ID).Find(&assets).Error; err != nil { if err = store.DB.Where("topic_id = ?", id).Find(&assets).Error; err != nil {
return return
} }
for _, a := range asset.Topic.Assets { for _, asset := range assets {
if a.ID != asset.ID && asset.Status != APP_ASSETREADY { if id != asset.ID && asset.Status != APP_ASSETREADY {
return return
} }
} }
complete = true complete = true
return return
}
return
} }
func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err error) { func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err error) {
@ -93,7 +122,12 @@ func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err
if res := tx.Save(asset).Error; res != nil { if res := tx.Save(asset).Error; res != nil {
return res return res
} }
complete, ret := isComplete(status, asset) if status == APP_ASSETERROR {
if res := tx.Model(&asset.Topic).Update("transform", APP_TRANSFORMERROR).Error; res != nil {
return res
}
} else if status == APP_ASSETREADY {
complete, ret := isComplete(asset.ID)
if ret != nil { if ret != nil {
return ret return ret
} }
@ -102,6 +136,7 @@ func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err
return res return res
} }
} }
}
if res := tx.Model(&asset.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil { if res := tx.Model(&asset.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil {
return res return res
} }

View File

@ -92,7 +92,7 @@ func TestTopicShare(t *testing.T) {
// add asset to topic // add asset to topic
assets := &[]Asset{} assets := &[]Asset{}
params["topicId"] = topic.Id params["topicId"] = topic.Id
transforms, err := json.Marshal([]string{ "P01", "P02", "P03" }) transforms, err := json.Marshal([]string{ "P01;A;1234", "P02", "P03" })
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, ApiTestUpload(AddChannelTopicAsset, "POST", "/content/channels/{channelId}/topics/{topicId}/assets?transforms=" + url.QueryEscape(string(transforms)), assert.NoError(t, ApiTestUpload(AddChannelTopicAsset, "POST", "/content/channels/{channelId}/topics/{topicId}/assets?transforms=" + url.QueryEscape(string(transforms)),
&params, img, APP_TOKENCONTACT, set.C.A.Token, assets, nil)) &params, img, APP_TOKENCONTACT, set.C.A.Token, assets, nil))