linking transcode scripts

This commit is contained in:
Roland Osborne 2022-03-01 00:28:36 -08:00
parent 6dd261ca12
commit dabfa09eb9
11 changed files with 257 additions and 5 deletions

View File

@ -3305,7 +3305,10 @@ components:
format: int64 format: int64
status: status:
type: string type: string
enum: [ unconfirmed, confirmed, complete, error ] enum: [ unconfirmed, confirmed ]
transform:
type: string
enum: [ complete, incomplete ]
TagCount: TagCount:
type: object type: object

View File

@ -34,6 +34,7 @@ func AddChannelTopic(w http.ResponseWriter, r *http.Request) {
topic.DetailRevision = act.ChannelRevision + 1 topic.DetailRevision = act.ChannelRevision + 1
topic.TagRevision = act.ChannelRevision + 1 topic.TagRevision = act.ChannelRevision + 1
topic.Status = APP_TOPICUNCONFIRMED topic.Status = APP_TOPICUNCONFIRMED
topic.Transform = APP_TRANSFORMCOMPLETE
if res := tx.Save(topic).Error; res != nil { if res := tx.Save(topic).Error; res != nil {
return res return res
} }

View File

@ -26,8 +26,6 @@ func AddChannelTopicAsset(w http.ResponseWriter, r *http.Request) {
} }
} }
PrintMsg(transforms)
channelSlot, guid, err, code := getChannelSlot(r, true) channelSlot, guid, err, code := getChannelSlot(r, true)
if err != nil { if err != nil {
ErrResponse(w, code, err) ErrResponse(w, code, err)
@ -35,6 +33,15 @@ PrintMsg(transforms)
} }
act := &channelSlot.Account act := &channelSlot.Account
// check storage
if full, err := isStorageFull(act); err != nil {
ErrResponse(w, http.StatusInternalServerError, err)
return
} else if full {
ErrResponse(w, http.StatusNotAcceptable, errors.New("storage limit reached"))
return
}
// load topic // load topic
var topicSlot store.TopicSlot var topicSlot store.TopicSlot
if err = store.DB.Preload("Topic").Where("channel_id = ? AND topic_slot_id = ?", channelSlot.Channel.ID, topicId).First(&topicSlot).Error; err != nil { if err = store.DB.Preload("Topic").Where("channel_id = ? AND topic_slot_id = ?", channelSlot.Channel.ID, topicId).First(&topicSlot).Error; err != nil {
@ -79,6 +86,7 @@ PrintMsg(transforms)
asset := &store.Asset{} asset := &store.Asset{}
asset.AssetId = id asset.AssetId = id
asset.AccountID = channelSlot.Account.ID asset.AccountID = channelSlot.Account.ID
asset.ChannelID = channelSlot.Channel.ID
asset.TopicID = topicSlot.Topic.ID asset.TopicID = topicSlot.Topic.ID
asset.Status = APP_ASSETREADY asset.Status = APP_ASSETREADY
asset.Size = size asset.Size = size
@ -92,6 +100,7 @@ PrintMsg(transforms)
asset := &store.Asset{} asset := &store.Asset{}
asset.AssetId = uuid.New().String() asset.AssetId = uuid.New().String()
asset.AccountID = channelSlot.Account.ID asset.AccountID = channelSlot.Account.ID
asset.ChannelID = channelSlot.Channel.ID
asset.TopicID = topicSlot.Topic.ID asset.TopicID = topicSlot.Topic.ID
asset.Status = APP_ASSETWAITING asset.Status = APP_ASSETWAITING
asset.Transform = transform asset.Transform = transform
@ -101,6 +110,14 @@ PrintMsg(transforms)
} }
assets = append(assets, Asset{ AssetId: asset.AssetId, Transform: transform, Status: APP_ASSETWAITING}) assets = append(assets, Asset{ AssetId: asset.AssetId, Transform: transform, Status: APP_ASSETWAITING})
} }
if len(transforms) > 0 {
if res := tx.Model(&topicSlot.Topic).Update("transform", APP_TRANSFORMINCOMPLETE).Error; res != nil {
return res
}
if res := tx.Model(&topicSlot.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
}
if res := tx.Model(&topicSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil { if res := tx.Model(&topicSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil {
return res return res
} }
@ -117,6 +134,9 @@ PrintMsg(transforms)
return return
} }
// invoke transcoder
go transcode()
// determine affected contact list // determine affected contact list
cards := make(map[string]store.Card) cards := make(map[string]store.Card)
for _, card := range channelSlot.Channel.Cards { for _, card := range channelSlot.Channel.Cards {
@ -128,13 +148,37 @@ PrintMsg(transforms)
} }
} }
// notify
SetStatus(act) SetStatus(act)
for _, card := range cards { for _, card := range cards {
SetContactChannelNotification(act, &card) SetContactChannelNotification(act, &card)
} }
WriteResponse(w, &assets) WriteResponse(w, &assets)
} }
func isStorageFull(act *store.Account) (full bool, err error) {
storage := getNumConfigValue(CONFIG_STORAGE, 0);
if storage == 0 {
return
}
var assets []store.Asset;
if err = store.DB.Where("account_id = ?", act.ID).Find(&assets).Error; err != nil {
return
}
var size int64
for _, asset := range assets {
size += asset.Size
}
if size >= storage {
full = true
}
return
}
func SaveAsset(src io.Reader, path string) (crc uint32, size int64, err error) { func SaveAsset(src io.Reader, path string) (crc uint32, size int64, err error) {
output, res := os.OpenFile(path, os.O_WRONLY | os.O_CREATE, 0666) output, res := os.OpenFile(path, os.O_WRONLY | os.O_CREATE, 0666)

View File

@ -33,6 +33,8 @@ const APP_ASSETREADY = "ready"
const APP_ASSETWAITING = "waiting" const APP_ASSETWAITING = "waiting"
const APP_ASSETPROCESSING = "processing" const APP_ASSETPROCESSING = "processing"
const APP_ASSETERROR = "error" const APP_ASSETERROR = "error"
const APP_TRANSFORMCOMPLETE = "complete"
const APP_TRANSFORMINCOMPLETE = "incomplete"
func AppCardStatus(status string) bool { func AppCardStatus(status string) bool {
if status == APP_CARDPENDING { if status == APP_CARDPENDING {

View File

@ -13,6 +13,7 @@ const CONFIG_DOMAIN = "domain"
const CONFIG_PUBLICLIMIT = "public_limit" const CONFIG_PUBLICLIMIT = "public_limit"
const CONFIG_STORAGE = "storage" const CONFIG_STORAGE = "storage"
const CONFIG_ASSETPATH = "asset_path" const CONFIG_ASSETPATH = "asset_path"
const CONFIG_SCRIPTPATH = "script_path"
func getStrConfigValue(configId string, empty string) string { func getStrConfigValue(configId string, empty string) string {
var config store.Config var config store.Config

View File

@ -12,11 +12,19 @@ func TestMain(m *testing.M) {
SetKeySize(2048) SetKeySize(2048)
os.Remove("databag.db") os.Remove("databag.db")
os.RemoveAll("testdata") os.RemoveAll("testdata")
os.RemoveAll("testscripts")
store.SetPath("databag.db") store.SetPath("databag.db")
if err := os.Mkdir("testdata", os.ModePerm); err != nil { if err := os.Mkdir("testdata", os.ModePerm); err != nil {
panic("failed to create testdata path") panic("failed to create testdata path")
} }
if err := os.Mkdir("testscripts", os.ModePerm); err != nil {
panic("failed to create testscripts path")
}
P01 := []byte("#!/bin/bash\n echo \"P01 $1 $2 $3\"\n")
if err := os.WriteFile("testscripts/P01.sh", P01, 0555); err != nil {
panic("failed to create P01 script")
}
r, w, _ := NewRequest("GET", "/admin/status", nil) r, w, _ := NewRequest("GET", "/admin/status", nil)
GetNodeStatus(w, r) GetNodeStatus(w, r)
@ -33,10 +41,16 @@ func TestMain(m *testing.M) {
panic("failed to claim server") panic("failed to claim server")
} }
// config data path
scripts := &store.Config{ ConfigId: CONFIG_SCRIPTPATH, StrValue: "./testscripts" }
if err := store.DB.Save(scripts).Error; err != nil {
panic("failed to configure scripts path")
}
// config data path // config data path
path := &store.Config{ ConfigId: CONFIG_ASSETPATH, StrValue: "./testdata" } path := &store.Config{ ConfigId: CONFIG_ASSETPATH, StrValue: "./testdata" }
if err := store.DB.Save(path).Error; err != nil { if err := store.DB.Save(path).Error; err != nil {
panic("failed to configure datapath") panic("failed to configure data path")
} }
// config server // config server

View File

@ -245,6 +245,7 @@ func getTopicDetailModel(slot *store.TopicSlot) *TopicDetail {
Created: slot.Topic.Created, Created: slot.Topic.Created,
Updated: slot.Topic.Updated, Updated: slot.Topic.Updated,
Status: slot.Topic.Status, Status: slot.Topic.Status,
Transform: slot.Topic.Transform,
} }
} }

View File

@ -421,6 +421,8 @@ type TopicDetail struct {
Updated int64 `json:"updated"` Updated int64 `json:"updated"`
Status string `json:"status"` Status string `json:"status"`
Transform string `json:"transform"`
} }
type TopicTags struct { type TopicTags struct {

View File

@ -223,6 +223,7 @@ type Topic struct {
DataType string `gorm:"index"` DataType string `gorm:"index"`
Data string Data string
Status string `gorm:"not null;index"` Status string `gorm:"not null;index"`
Transform string
Created int64 `gorm:"autoCreateTime"` Created int64 `gorm:"autoCreateTime"`
Updated int64 `gorm:"autoUpdateTime"` Updated int64 `gorm:"autoUpdateTime"`
TagCount int32 `gorm:"not null"` TagCount int32 `gorm:"not null"`
@ -230,12 +231,14 @@ type Topic struct {
TagRevision int64 `gorm:"not null"` TagRevision int64 `gorm:"not null"`
Assets []Asset Assets []Asset
Tags []Tag Tags []Tag
TopicSlot TopicSlot
} }
type Asset struct { type Asset struct {
ID uint `gorm:"primaryKey;not null;unique;autoIncrement"` ID uint `gorm:"primaryKey;not null;unique;autoIncrement"`
AssetId string `gorm:"not null;index:asset,unique"` AssetId string `gorm:"not null;index:asset,unique"`
AccountID uint `gorm:"not null;index:asset,unique"` AccountID uint `gorm:"not null;index:asset,unique"`
ChannelID uint
TopicID uint TopicID uint
Status string `gorm:"not null;index"` Status string `gorm:"not null;index"`
Size int64 Size int64
@ -246,6 +249,7 @@ type Asset struct {
Created int64 `gorm:"autoCreateTime"` Created int64 `gorm:"autoCreateTime"`
Updated int64 `gorm:"autoUpdateTime"` Updated int64 `gorm:"autoUpdateTime"`
Account Account Account Account
Channel *Channel
Topic *Topic Topic *Topic
} }

View File

@ -0,0 +1,179 @@
package databag
import (
"os"
"io"
"hash/crc32"
"errors"
"bytes"
"sync"
"regexp"
"databag/internal/store"
"os/exec"
"gorm.io/gorm"
)
var transcodeSync sync.Mutex
func transcode() {
transcodeSync.Lock()
defer transcodeSync.Unlock()
var assets []store.Asset
if err := store.DB.Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("status = ?", APP_ASSETWAITING).Find(&assets).Error; err != nil {
ErrMsg(err)
return
}
// prepare script path
data := getStrConfigValue(CONFIG_ASSETPATH, ".")
script := getStrConfigValue(CONFIG_SCRIPTPATH, ".")
re := regexp.MustCompile("^[a-zA-Z0-9_]*$")
for _, asset := range assets {
if !re.MatchString(asset.Transform) {
ErrMsg(errors.New("invalid transformi"))
if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err)
}
} else {
input := data + "/" + asset.TransformId
output := data + "/" + asset.AssetId
cmd := exec.Command(script + "/" + asset.Transform + ".sh", input, output)
var out bytes.Buffer
cmd.Stdout = &out
if err := cmd.Run(); err != nil {
LogMsg(out.String())
ErrMsg(err)
if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err)
}
} else {
LogMsg(out.String())
crc, size, err := ScanAsset(output)
if err != nil {
ErrMsg(err)
if err := UpdateAsset(&asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err)
}
} else if err := UpdateAsset(&asset, APP_ASSETREADY, crc, size); err != nil {
ErrMsg(err)
}
}
}
}
}
func isComplete(status string, asset *store.Asset) (complete bool, err error) {
if status == APP_ASSETREADY {
var assets []store.Asset
if err = store.DB.Where("topic_id = ?", asset.Topic.ID).Find(&assets).Error; err != nil {
return
}
for _, a := range asset.Topic.Assets {
if a.ID != asset.ID && asset.Status != APP_ASSETREADY {
return
}
}
complete = true
return
}
return
}
func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err error) {
act := asset.Account
err = store.DB.Transaction(func(tx *gorm.DB) error {
asset.Crc = crc
asset.Size = size
asset.Status = status
if res := tx.Save(asset).Error; res != nil {
return res
}
complete, ret := isComplete(status, asset)
if ret != nil {
return ret
}
if complete {
if res := tx.Model(&asset.Topic).Update("transform", APP_TRANSFORMCOMPLETE).Error; res != nil {
return res
}
}
if res := tx.Model(&asset.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
if res := tx.Model(&asset.Topic.TopicSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
if res := tx.Model(&asset.Channel.ChannelSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
if res := tx.Model(&act).Update("channel_revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
return nil
})
if err != nil {
return
}
// determine affected contact list
cards := make(map[string]store.Card)
for _, card := range asset.Channel.Cards {
cards[card.Guid] = card
}
for _, group := range asset.Channel.Groups {
for _, card := range group.Cards {
cards[card.Guid] = card
}
}
// notify
SetStatus(&act)
for _, card := range cards {
SetContactChannelNotification(&act, &card)
}
return
}
func ScanAsset(path string) (crc uint32, size int64, err error) {
file, res := os.Open(path)
if res != nil {
err = res
return
}
defer file.Close()
// prepare hash
table := crc32.MakeTable(crc32.IEEE)
// compute has as data is saved
data := make([]byte, 4096)
for {
n, res := file.Read(data)
if res != nil {
if res == io.EOF {
break
}
err = res
return
}
crc = crc32.Update(crc, table, data[:n])
}
// read size
info, ret := file.Stat()
if ret != nil {
err = ret
return
}
size = info.Size()
return
}

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"encoding/json" "encoding/json"
"net/url" "net/url"
"time"
) )
func TestTopicShare(t *testing.T) { func TestTopicShare(t *testing.T) {
@ -96,12 +97,12 @@ func TestTopicShare(t *testing.T) {
assert.NoError(t, ApiTestUpload(AddChannelTopicAsset, "POST", "/content/channels/{channelId}/topics/{topicId}/assets?transforms=" + url.QueryEscape(string(transforms)), assert.NoError(t, ApiTestUpload(AddChannelTopicAsset, "POST", "/content/channels/{channelId}/topics/{topicId}/assets?transforms=" + url.QueryEscape(string(transforms)),
&params, img, APP_TOKENCONTACT, set.C.A.Token, assets, nil)) &params, img, APP_TOKENCONTACT, set.C.A.Token, assets, nil))
PrintMsg(assets) PrintMsg(assets)
PrintMsg(len(img))
// view topics // view topics
topics := &[]Topic{} topics := &[]Topic{}
assert.NoError(t, ApiTestMsg(GetChannelTopics, "GET", "/content/channels/{channelId}/topics", assert.NoError(t, ApiTestMsg(GetChannelTopics, "GET", "/content/channels/{channelId}/topics",
&params, nil, APP_TOKENAPP, set.A.Token, topics, nil)) &params, nil, APP_TOKENAPP, set.A.Token, topics, nil))
time.Sleep(time.Second)
} }