databag/net/server/internal/transcodeUtil.go

249 lines
6.5 KiB
Go
Raw Normal View History

2022-03-01 08:28:36 +00:00
package databag
import (
"os"
"io"
"hash/crc32"
"errors"
"bytes"
"sync"
"regexp"
"databag/internal/store"
"os/exec"
"gorm.io/gorm"
)
2022-03-01 19:20:51 +00:00
// One mutex per transform queue. Each queue worker holds its mutex for the
// whole time it is draining the queue, so at most one worker per queue runs
// at any moment.
var (
	audioSync   sync.Mutex // held by transcodeAudio
	videoSync   sync.Mutex // held by transcodeVideo
	photoSync   sync.Mutex // held by transcodePhoto
	defaultSync sync.Mutex // held by transcodeDefault
)
2022-03-01 08:28:36 +00:00
func transcode() {
2022-03-01 19:20:51 +00:00
go transcodeAudio()
go transcodeVideo()
go transcodePhoto()
go transcodeDefault()
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeVideo() {
videoSync.Lock()
defer videoSync.Unlock()
2022-03-01 18:35:14 +00:00
for ;; {
var asset store.Asset
2022-03-01 19:20:51 +00:00
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue = ? AND status = ?", APP_QUEUEVIDEO, APP_ASSETWAITING).First(&asset).Error; err != nil {
2022-03-01 18:35:14 +00:00
if !errors.Is(err, gorm.ErrRecordNotFound) {
ErrMsg(err)
}
return
}
transcodeAsset(&asset)
2022-03-01 08:28:36 +00:00
}
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
// transcodeAudio drains the audio transform queue. While holding audioSync it
// repeatedly loads the waiting asset that sorts first by "created asc",
// together with the account, channel, and topic relations that UpdateAsset
// needs, and hands it to transcodeAsset. It returns once no waiting audio
// asset is found, or after logging any other query error.
//
// NOTE(review): the loop re-queries after every transcodeAsset call; if that
// call cannot move the asset out of the waiting status, the same row will be
// selected again.
func transcodeAudio() {
	audioSync.Lock()
	defer audioSync.Unlock()

	for {
		var pending store.Asset
		query := store.DB.Order("created asc").
			Preload("Account").
			Preload("Channel.Cards").
			Preload("Channel.Groups.Cards").
			Preload("Channel.ChannelSlot").
			Preload("Topic.TopicSlot").
			Where("transform_queue = ? AND status = ?", APP_QUEUEAUDIO, APP_ASSETWAITING)

		err := query.First(&pending).Error
		if err == nil {
			transcodeAsset(&pending)
			continue
		}

		// An empty queue is the normal way out; anything else is worth logging.
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			ErrMsg(err)
		}
		return
	}
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodePhoto() {
photoSync.Lock()
defer photoSync.Unlock()
2022-03-01 18:35:14 +00:00
for ;; {
var asset store.Asset
2022-03-01 19:20:51 +00:00
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue = ? AND status = ?", APP_QUEUEPHOTO, APP_ASSETWAITING).First(&asset).Error; err != nil {
2022-03-01 18:35:14 +00:00
if !errors.Is(err, gorm.ErrRecordNotFound) {
ErrMsg(err)
}
return
}
2022-03-01 19:20:51 +00:00
transcodeAsset(&asset)
}
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodeDefault() {
defaultSync.Lock()
defer defaultSync.Unlock()
for ;; {
var asset store.Asset
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Cards").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue != ? AND transform_queue != ? AND transform_queue != ? AND status = ?", APP_QUEUEVIDEO, APP_QUEUEAUDIO, APP_QUEUEPHOTO, APP_ASSETWAITING).First(&asset).Error; err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
ErrMsg(err)
}
return
}
2022-03-01 18:35:14 +00:00
transcodeAsset(&asset)
}
}
func transcodeAsset(asset *store.Asset) {
2022-03-01 08:28:36 +00:00
// prepare script path
data := getStrConfigValue(CONFIG_ASSETPATH, ".")
script := getStrConfigValue(CONFIG_SCRIPTPATH, ".")
re := regexp.MustCompile("^[a-zA-Z0-9_]*$")
2022-03-01 18:35:14 +00:00
if !re.MatchString(asset.Transform) {
ErrMsg(errors.New("invalid transform"))
if err := UpdateAsset(asset, APP_ASSETERROR, 0, 0); err != nil {
ErrMsg(err)
}
} else {
2022-03-01 19:20:51 +00:00
input := data + "/" + asset.Account.Guid + "/" + asset.TransformId
output := data + "/" + asset.Account.Guid + "/" + asset.AssetId
cmd := exec.Command(script + "/transform_" + asset.Transform + ".sh", input, output, asset.TransformParams)
var stdout bytes.Buffer
cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr
2022-03-01 18:35:14 +00:00
if err := cmd.Run(); err != nil {
2022-03-01 19:20:51 +00:00
LogMsg(stdout.String())
LogMsg(stderr.String())
2022-03-01 18:35:14 +00:00
ErrMsg(err)
if err := UpdateAsset(asset, APP_ASSETERROR, 0, 0); err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg(err)
}
} else {
2022-03-01 19:20:51 +00:00
if stdout.Len() > 0 {
LogMsg(stdout.String())
}
if stderr.Len() > 0 {
LogMsg(stderr.String())
}
2022-03-01 18:35:14 +00:00
crc, size, err := ScanAsset(output)
if err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg(err)
2022-03-01 18:35:14 +00:00
if err := UpdateAsset(asset, APP_ASSETERROR, 0, 0); err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg(err)
}
2022-03-01 18:35:14 +00:00
} else if err := UpdateAsset(asset, APP_ASSETREADY, crc, size); err != nil {
ErrMsg(err)
2022-03-01 08:28:36 +00:00
}
}
}
}
2022-03-01 18:35:14 +00:00
func isComplete(id uint) (complete bool, err error) {
var assets []store.Asset
if err = store.DB.Where("topic_id = ?", id).Find(&assets).Error; err != nil {
return
}
for _, asset := range assets {
if id != asset.ID && asset.Status != APP_ASSETREADY {
2022-03-01 08:28:36 +00:00
return
}
}
2022-03-01 18:35:14 +00:00
complete = true
2022-03-01 08:28:36 +00:00
return
}
func UpdateAsset(asset *store.Asset, status string, crc uint32, size int64) (err error) {
act := asset.Account
err = store.DB.Transaction(func(tx *gorm.DB) error {
asset.Crc = crc
asset.Size = size
asset.Status = status
if res := tx.Save(asset).Error; res != nil {
return res
}
2022-03-01 18:35:14 +00:00
if status == APP_ASSETERROR {
if res := tx.Model(&asset.Topic).Update("transform", APP_TRANSFORMERROR).Error; res != nil {
2022-03-01 08:28:36 +00:00
return res
}
2022-03-01 18:35:14 +00:00
} else if status == APP_ASSETREADY {
complete, ret := isComplete(asset.ID)
if ret != nil {
return ret
}
if complete {
if res := tx.Model(&asset.Topic).Update("transform", APP_TRANSFORMCOMPLETE).Error; res != nil {
return res
}
}
2022-03-01 08:28:36 +00:00
}
if res := tx.Model(&asset.Topic).Update("detail_revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
if res := tx.Model(&asset.Topic.TopicSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
if res := tx.Model(&asset.Channel.ChannelSlot).Update("revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
if res := tx.Model(&act).Update("channel_revision", act.ChannelRevision + 1).Error; res != nil {
return res
}
return nil
})
if err != nil {
return
}
// determine affected contact list
cards := make(map[string]store.Card)
for _, card := range asset.Channel.Cards {
cards[card.Guid] = card
}
for _, group := range asset.Channel.Groups {
for _, card := range group.Cards {
cards[card.Guid] = card
}
}
// notify
SetStatus(&act)
for _, card := range cards {
SetContactChannelNotification(&act, &card)
}
return
}
// ScanAsset reads the file at path once and returns the CRC-32 (IEEE
// polynomial) of its contents together with the number of bytes read.
//
// The checksum and the size come from the same pass over the data, so they
// always describe the same bytes. If the file cannot be opened or a read
// fails, the error is returned and crc and size are both zero.
func ScanAsset(path string) (crc uint32, size int64, err error) {
	file, err := os.Open(path)
	if err != nil {
		return 0, 0, err
	}
	defer file.Close()

	// io.Copy feeds every byte it reads into the hash, including any that
	// arrive together with the final read error or EOF.
	hasher := crc32.NewIEEE()
	size, err = io.Copy(hasher, file)
	if err != nil {
		return 0, 0, err
	}
	return hasher.Sum32(), size, nil
}