From dabfa09eb9eab6b0a5acd03ffd8522904c2c5c28 Mon Sep 17 00:00:00 2001 From: Roland Osborne Date: Tue, 1 Mar 2022 00:28:36 -0800 Subject: [PATCH] linking transcode scripts --- doc/api.oa3 | 5 +- net/server/internal/api_addChannelTopic.go | 1 + .../internal/api_addChannelTopicAsset.go | 48 ++++- net/server/internal/appValues.go | 2 + net/server/internal/configUtil.go | 1 + net/server/internal/main_test.go | 16 +- net/server/internal/modelUtil.go | 1 + net/server/internal/models.go | 2 + net/server/internal/store/schema.go | 4 + net/server/internal/transcodeUtil.go | 179 ++++++++++++++++++ net/server/internal/ucTopicShare_test.go | 3 +- 11 files changed, 257 insertions(+), 5 deletions(-) create mode 100644 net/server/internal/transcodeUtil.go diff --git a/doc/api.oa3 b/doc/api.oa3 index e0a9e5fe..ca632d8e 100644 --- a/doc/api.oa3 +++ b/doc/api.oa3 @@ -3305,7 +3305,10 @@ components: format: int64 status: type: string - enum: [ unconfirmed, confirmed, complete, error ] + enum: [ unconfirmed, confirmed ] + transform: + type: string + enum: [ complete, incomplete ] TagCount: type: object diff --git a/net/server/internal/api_addChannelTopic.go b/net/server/internal/api_addChannelTopic.go index 72f45913..231b7ff2 100644 --- a/net/server/internal/api_addChannelTopic.go +++ b/net/server/internal/api_addChannelTopic.go @@ -34,6 +34,7 @@ func AddChannelTopic(w http.ResponseWriter, r *http.Request) { topic.DetailRevision = act.ChannelRevision + 1 topic.TagRevision = act.ChannelRevision + 1 topic.Status = APP_TOPICUNCONFIRMED + topic.Transform = APP_TRANSFORMCOMPLETE if res := tx.Save(topic).Error; res != nil { return res } diff --git a/net/server/internal/api_addChannelTopicAsset.go b/net/server/internal/api_addChannelTopicAsset.go index 0d264eb0..185aa89a 100644 --- a/net/server/internal/api_addChannelTopicAsset.go +++ b/net/server/internal/api_addChannelTopicAsset.go @@ -26,8 +26,6 @@ func AddChannelTopicAsset(w http.ResponseWriter, r *http.Request) { } } -PrintMsg(transforms) - channelSlot, guid, err, code := getChannelSlot(r, true) if err != nil { ErrResponse(w, code, err) @@ -35,6 +33,15 @@ PrintMsg(transforms) } act := &channelSlot.Account + // check storage + if full, err := isStorageFull(act); err != nil { + ErrResponse(w, http.StatusInternalServerError, err) + return + } else if full { + ErrResponse(w, http.StatusNotAcceptable, errors.New("storage limit reached")) + return + } + // load topic var topicSlot store.TopicSlot if err = store.DB.Preload("Topic").Where("channel_id = ? AND topic_slot_id = ?", channelSlot.Channel.ID, topicId).First(&topicSlot).Error; err != nil { @@ -79,6 +86,7 @@ PrintMsg(transforms) asset := &store.Asset{} asset.AssetId = id asset.AccountID = channelSlot.Account.ID + asset.ChannelID = channelSlot.Channel.ID asset.TopicID = topicSlot.Topic.ID asset.Status = APP_ASSETREADY asset.Size = size @@ -92,6 +100,7 @@ PrintMsg(transforms) asset := &store.Asset{} asset.AssetId = uuid.New().String() asset.AccountID = channelSlot.Account.ID + asset.ChannelID = channelSlot.Channel.ID asset.TopicID = topicSlot.Topic.ID asset.Status = APP_ASSETWAITING asset.Transform = transform @@ -101,6 +110,14 @@ PrintMsg(transforms) } assets = append(assets, Asset{ AssetId: asset.AssetId, Transform: transform, Status: APP_ASSETWAITING}) } + if len(transforms) > 0 { + if res := tx.Model(&topicSlot.Topic).Update("transform", APP_TRANSFORMINCOMPLETE).Error; res != nil { + return res + } + if res := tx.Model(&topicSlot.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil { + return res + } + } if res := tx.Model(&topicSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil { return res } @@ -117,6 +134,9 @@ PrintMsg(transforms) return } + // invoke transcoder + go transcode() + // determine affected contact list cards := make(map[string]store.Card) for _, card := range channelSlot.Channel.Cards { @@ -128,13 +148,37 @@ PrintMsg(transforms) } } + // notify SetStatus(act) for _, card := range cards { SetContactChannelNotification(act, &card) } + WriteResponse(w, &assets) } +func isStorageFull(act *store.Account) (full bool, err error) { + + storage := getNumConfigValue(CONFIG_STORAGE, 0); + if storage == 0 { + return + } + + var assets []store.Asset; + if err = store.DB.Where("account_id = ?", act.ID).Find(&assets).Error; err != nil { + return + } + + var size int64 + for _, asset := range assets { + size += asset.Size + } + if size >= storage { + full = true + } + return +} + func SaveAsset(src io.Reader, path string) (crc uint32, size int64, err error) { output, res := os.OpenFile(path, os.O_WRONLY | os.O_CREATE, 0666) diff --git a/net/server/internal/appValues.go b/net/server/internal/appValues.go index 7e210a5b..11e8d577 100644 --- a/net/server/internal/appValues.go +++ b/net/server/internal/appValues.go @@ -33,6 +33,8 @@ const APP_ASSETREADY = "ready" const APP_ASSETWAITING = "waiting" const APP_ASSETPROCESSING = "processing" const APP_ASSETERROR = "error" +const APP_TRANSFORMCOMPLETE = "complete" +const APP_TRANSFORMINCOMPLETE = "incomplete" func AppCardStatus(status string) bool { if status == APP_CARDPENDING { diff --git a/net/server/internal/configUtil.go b/net/server/internal/configUtil.go index ad1e1784..75f1a519 100644 --- a/net/server/internal/configUtil.go +++ b/net/server/internal/configUtil.go @@ -13,6 +13,7 @@ const CONFIG_DOMAIN = "domain" const CONFIG_PUBLICLIMIT = "public_limit" const CONFIG_STORAGE = "storage" const CONFIG_ASSETPATH = "asset_path" +const CONFIG_SCRIPTPATH = "script_path" func getStrConfigValue(configId string, empty string) string { var config store.Config diff --git a/net/server/internal/main_test.go b/net/server/internal/main_test.go index d4bc21ca..475e7002 100644 --- a/net/server/internal/main_test.go +++ b/net/server/internal/main_test.go @@ -12,11 +12,19 @@ func TestMain(m *testing.M) { SetKeySize(2048) os.Remove("databag.db") os.RemoveAll("testdata") + os.RemoveAll("testscripts") store.SetPath("databag.db") if err := os.Mkdir("testdata", os.ModePerm); err != nil { panic("failed to create testdata path") } + if err := os.Mkdir("testscripts", os.ModePerm); err != nil { + panic("failed to create testscripts path") + } + P01 := []byte("#!/bin/bash\n echo \"P01 $1 $2 $3\"\n") + if err := os.WriteFile("testscripts/P01.sh", P01, 0555); err != nil { + panic("failed to create P01 script") + } r, w, _ := NewRequest("GET", "/admin/status", nil) GetNodeStatus(w, r) @@ -33,10 +41,16 @@ func TestMain(m *testing.M) { panic("failed to claim server") } + // config data path + scripts := &store.Config{ ConfigId: CONFIG_SCRIPTPATH, StrValue: "./testscripts" } + if err := store.DB.Save(scripts).Error; err != nil { + panic("failed to configure scripts path") + } + // config data path path := &store.Config{ ConfigId: CONFIG_ASSETPATH, StrValue: "./testdata" } if err := store.DB.Save(path).Error; err != nil { - panic("failed to configure datapath") + panic("failed to configure data path") } // config server diff --git a/net/server/internal/modelUtil.go b/net/server/internal/modelUtil.go index 37f5f8d6..09759429 100644 --- a/net/server/internal/modelUtil.go +++ b/net/server/internal/modelUtil.go @@ -245,6 +245,7 @@ func getTopicDetailModel(slot *store.TopicSlot) *TopicDetail { Created: slot.Topic.Created, Updated: slot.Topic.Updated, Status: slot.Topic.Status, + Transform: slot.Topic.Transform, } } diff --git a/net/server/internal/models.go b/net/server/internal/models.go index df2760c4..6d208f72 100644 --- a/net/server/internal/models.go +++ b/net/server/internal/models.go @@ -421,6 +421,8 @@ type TopicDetail struct { Updated int64 `json:"updated"` Status string `json:"status"` + + Transform string `json:"transform"` } type TopicTags struct { diff --git a/net/server/internal/store/schema.go b/net/server/internal/store/schema.go index c78d21d6..8bae575b 100644 --- a/net/server/internal/store/schema.go +++ b/net/server/internal/store/schema.go @@ -223,6 +223,7 @@ type Topic struct { DataType string `gorm:"index"` Data string Status string `gorm:"not null;index"` + Transform string Created int64 `gorm:"autoCreateTime"` Updated int64 `gorm:"autoUpdateTime"` TagCount int32 `gorm:"not null"` @@ -230,12 +231,14 @@ type Topic struct { TagRevision int64 `gorm:"not null"` Assets []Asset Tags []Tag + TopicSlot TopicSlot } type Asset struct { ID uint `gorm:"primaryKey;not null;unique;autoIncrement"` AssetId string `gorm:"not null;index:asset,unique"` AccountID uint `gorm:"not null;index:asset,unique"` + ChannelID uint TopicID uint Status string `gorm:"not null;index"` Size int64 @@ -246,6 +249,7 @@ type Asset struct { Created int64 `gorm:"autoCreateTime"` Updated int64 `gorm:"autoUpdateTime"` Account Account + Channel *Channel Topic *Topic } diff --git a/net/server/internal/transcodeUtil.go b/net/server/internal/transcodeUtil.go new file mode 100644 index 00000000..bc698b42 --- /dev/null +++ b/net/server/internal/transcodeUtil.go @@ -0,0 +1,179 @@ +package databag + +import ( + "os" + "io" + "hash/crc32" + "errors" + "bytes" + "sync" + "regexp" + "databag/internal/store" + "os/exec" + "gorm.io/gorm" +) + +var transcodeSync sync.Mutex + +func transcode() { + + transcodeSync.Lock() + defer transcodeSync.Unlock() + + var assets []store.Asset + if err := store.DB.Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("status = ?", APP_ASSETWAITING).Find(&assets).Error; err != nil { + ErrMsg(err) + return + } + + // prepare script path + data := getStrConfigValue(CONFIG_ASSETPATH, ".") + script := getStrConfigValue(CONFIG_SCRIPTPATH, ".") + re := regexp.MustCompile("^[a-zA-Z0-9_]*$") + + for _, asset := range assets { + + if !re.MatchString(asset.Transform) { + ErrMsg(errors.New("invalid transformi")) + if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil { + ErrMsg(err) + } + } else { + input := data + "/" + asset.TransformId + output := data + "/" + asset.AssetId + cmd := exec.Command(script + "/" + asset.Transform + ".sh", input, output) + var out bytes.Buffer + cmd.Stdout = &out + if err := cmd.Run(); err != nil { + LogMsg(out.String()) + ErrMsg(err) + if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil { + ErrMsg(err) + } + } else { + LogMsg(out.String()) + crc, size, err := ScanAsset(output) + if err != nil { + ErrMsg(err) + if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil { + ErrMsg(err) + } + } else if err := UpdateAsset(&asset, APP_ASSETREADY, crc, size); err != nil { + ErrMsg(err) + } + } + } + } +} + +func isComplete(status string, asset *store.Asset) (complete bool, err error) { + if status == APP_ASSETREADY { + var assets []store.Asset + if err = store.DB.Where("topic_id = ?", asset.Topic.ID).Find(&assets).Error; err != nil { + return + } + for _, a := range asset.Topic.Assets { + if a.ID != asset.ID && asset.Status != APP_ASSETREADY { + return + } + } + complete = true + return + } + return +} + +func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err error) { + + act := asset.Account + err = store.DB.Transaction(func(tx *gorm.DB) error { + asset.Crc = crc + asset.Size = size + asset.Status = status + if res := tx.Save(asset).Error; res != nil { + return res + } + complete, ret := isComplete(status, asset) + if ret != nil { + return ret + } + if complete { + if res := tx.Model(&asset.Topic).Update("transform", APP_TRANSFORMCOMPLETE).Error; res != nil { + return res + } + } + if res := tx.Model(&asset.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil { + return res + } + if res := tx.Model(&asset.Topic.TopicSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil { + return res + } + if res := tx.Model(&asset.Channel.ChannelSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil { + return res + } + if res := tx.Model(&act).Update("channel_revision", act.ChannelRevision + 1).Error; res != nil { + return res + } + return nil + }) + if err != nil { + return + } + + // determine affected contact list + cards := make(map[string]store.Card) + for _, card := range asset.Channel.Cards { + cards[card.Guid] = card + } + for _, group := range asset.Channel.Groups { + for _, card := range group.Cards { + cards[card.Guid] = card + } + } + + // notify + SetStatus(&act) + for _, card := range cards { + SetContactChannelNotification(&act, &card) + } + + return +} + +func ScanAsset(path string) (crc uint32, size int64, err error) { + + file, res := os.Open(path) + if res != nil { + err = res + return + } + defer file.Close() + + // prepare hash + table := crc32.MakeTable(crc32.IEEE) + + // compute has as data is saved + data := make([]byte, 4096) + for { + n, res := file.Read(data) + if res != nil { + if res == io.EOF { + break + } + err = res + return + } + + crc = crc32.Update(crc, table, data[:n]) + } + + // read size + info, ret := file.Stat() + if ret != nil { + err = ret + return + } + size = info.Size() + return +} + diff --git a/net/server/internal/ucTopicShare_test.go b/net/server/internal/ucTopicShare_test.go index 231f2106..6b5d9915 100644 --- a/net/server/internal/ucTopicShare_test.go +++ b/net/server/internal/ucTopicShare_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "encoding/json" "net/url" + "time" ) func TestTopicShare(t *testing.T) { @@ -96,12 +97,12 @@ func TestTopicShare(t *testing.T) { assert.NoError(t, ApiTestUpload(AddChannelTopicAsset, "POST", "/content/channels/{channelId}/topics/{topicId}/assets?transforms=" + url.QueryEscape(string(transforms)), ¶ms, img, APP_TOKENCONTACT, set.C.A.Token, assets, nil)) PrintMsg(assets) -PrintMsg(len(img)) // view topics topics := &[]Topic{} assert.NoError(t, ApiTestMsg(GetChannelTopics, "GET", "/content/channels/{channelId}/topics", ¶ms, nil, APP_TOKENAPP, set.A.Token, topics, nil)) + time.Sleep(time.Second) }