mirror of
https://github.com/balzack/databag.git
synced 2025-02-12 19:49:16 +00:00
230 lines
6.0 KiB
Go
230 lines
6.0 KiB
Go
package databag
|
|
|
|
import (
|
|
"bytes"
|
|
"databag/internal/store"
|
|
"errors"
|
|
"gorm.io/gorm"
|
|
"hash/crc32"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"regexp"
|
|
"sync"
|
|
)
|
|
|
|
var audioSync sync.Mutex
|
|
var videoSync sync.Mutex
|
|
var photoSync sync.Mutex
|
|
var defaultSync sync.Mutex
|
|
|
|
func transcode() {
|
|
go transcodeAudio()
|
|
go transcodeVideo()
|
|
go transcodePhoto()
|
|
go transcodeDefault()
|
|
}
|
|
|
|
func transcodeVideo() {
|
|
videoSync.Lock()
|
|
defer videoSync.Unlock()
|
|
for {
|
|
var asset store.Asset
|
|
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Members.Card").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue = ? AND status = ?", APPQueueVideo, APPAssetWaiting).First(&asset).Error; err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
ErrMsg(err)
|
|
}
|
|
return
|
|
}
|
|
transcodeAsset(&asset)
|
|
}
|
|
}
|
|
|
|
func transcodeAudio() {
|
|
audioSync.Lock()
|
|
defer audioSync.Unlock()
|
|
for {
|
|
var asset store.Asset
|
|
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Members.Card").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue = ? AND status = ?", APPQueueAudio, APPAssetWaiting).First(&asset).Error; err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
ErrMsg(err)
|
|
}
|
|
return
|
|
}
|
|
transcodeAsset(&asset)
|
|
}
|
|
}
|
|
|
|
func transcodePhoto() {
|
|
photoSync.Lock()
|
|
defer photoSync.Unlock()
|
|
for {
|
|
var asset store.Asset
|
|
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Members.Card").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue = ? AND status = ?", APPQueuePhoto, APPAssetWaiting).First(&asset).Error; err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
ErrMsg(err)
|
|
}
|
|
return
|
|
}
|
|
transcodeAsset(&asset)
|
|
}
|
|
}
|
|
|
|
func transcodeDefault() {
|
|
defaultSync.Lock()
|
|
defer defaultSync.Unlock()
|
|
for {
|
|
var asset store.Asset
|
|
if err := store.DB.Order("created asc").Preload("Account").Preload("Channel.Members.Card").Preload("Channel.Groups.Cards").Preload("Channel.ChannelSlot").Preload("Topic.TopicSlot").Where("transform_queue != ? AND transform_queue != ? AND transform_queue != ? AND status = ?", APPQueueVideo, APPQueueAudio, APPQueuePhoto, APPAssetWaiting).First(&asset).Error; err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
ErrMsg(err)
|
|
}
|
|
return
|
|
}
|
|
transcodeAsset(&asset)
|
|
}
|
|
}
|
|
|
|
func transcodeAsset(asset *store.Asset) {
|
|
|
|
// prepare script path
|
|
data := getStrConfigValue(CNFAssetPath, APPDefaultPath)
|
|
script := getStrConfigValue(CNFScriptPath, ".")
|
|
re := regexp.MustCompile("^[a-zA-Z0-9_]*$")
|
|
|
|
if !re.MatchString(asset.Transform) {
|
|
ErrMsg(errors.New("invalid transform"))
|
|
if err := updateAsset(asset, APPAssetError, 0, 0); err != nil {
|
|
ErrMsg(err)
|
|
}
|
|
} else {
|
|
input := data + "/" + asset.Account.GUID + "/" + asset.TransformID
|
|
output := data + "/" + asset.Account.GUID + "/" + asset.AssetID
|
|
cmd := exec.Command(script+"/transform_"+asset.Transform+".sh", input, output, asset.TransformParams)
|
|
var stdout bytes.Buffer
|
|
cmd.Stdout = &stdout
|
|
var stderr bytes.Buffer
|
|
cmd.Stderr = &stderr
|
|
|
|
if err := cmd.Run(); err != nil {
|
|
LogMsg(stdout.String())
|
|
LogMsg(stderr.String())
|
|
ErrMsg(err)
|
|
if err := updateAsset(asset, APPAssetError, 0, 0); err != nil {
|
|
ErrMsg(err)
|
|
}
|
|
} else {
|
|
if stdout.Len() > 0 {
|
|
LogMsg(stdout.String())
|
|
}
|
|
if stderr.Len() > 0 {
|
|
LogMsg(stderr.String())
|
|
}
|
|
crc, size, err := scanAsset(output)
|
|
|
|
if err != nil {
|
|
ErrMsg(err)
|
|
if err := updateAsset(asset, APPAssetError, 0, 0); err != nil {
|
|
ErrMsg(err)
|
|
}
|
|
} else if err := updateAsset(asset, APPAssetReady, crc, size); err != nil {
|
|
ErrMsg(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateAsset(asset *store.Asset, status string, crc uint32, size int64) (err error) {
|
|
|
|
topic := store.Topic{}
|
|
err = store.DB.Transaction(func(tx *gorm.DB) error {
|
|
asset.Crc = crc
|
|
asset.Size = size
|
|
asset.Status = status
|
|
if res := tx.Model(store.Asset{}).Where("id = ?", asset.ID).Updates(asset).Error; res != nil {
|
|
return res
|
|
}
|
|
if asset.Topic == nil {
|
|
return errors.New("asset not found")
|
|
}
|
|
if res := tx.Preload("Account").Preload("TopicSlot").Preload("Channel.Groups").Preload("Channel.Members.Card").Preload("Channel.ChannelSlot").First(&topic, asset.Topic.ID).Error; res != nil {
|
|
return res
|
|
}
|
|
if res := tx.Model(&topic).Update("detail_revision", topic.Account.ChannelRevision+1).Error; res != nil {
|
|
return res
|
|
}
|
|
if res := tx.Model(&topic.TopicSlot).Update("revision", topic.Account.ChannelRevision+1).Error; res != nil {
|
|
return res
|
|
}
|
|
if res := tx.Model(&topic.Channel).Update("topic_revision", topic.Account.ChannelRevision+1).Error; res != nil {
|
|
return res
|
|
}
|
|
if res := tx.Model(&topic.Channel.ChannelSlot).Update("revision", topic.Account.ChannelRevision+1).Error; res != nil {
|
|
return res
|
|
}
|
|
if res := tx.Model(&topic.Account).Update("channel_revision", topic.Account.ChannelRevision+1).Error; res != nil {
|
|
return res
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// determine affected contact list
|
|
cards := make(map[string]store.Card)
|
|
for _, member := range topic.Channel.Members {
|
|
cards[member.Card.GUID] = member.Card
|
|
}
|
|
for _, group := range topic.Channel.Groups {
|
|
for _, card := range group.Cards {
|
|
cards[card.GUID] = card
|
|
}
|
|
}
|
|
|
|
// notify
|
|
SetStatus(&topic.Account)
|
|
for _, card := range cards {
|
|
SetContactChannelNotification(&topic.Account, &card)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func scanAsset(path string) (crc uint32, size int64, err error) {
|
|
|
|
file, res := os.Open(path)
|
|
if res != nil {
|
|
err = res
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
// prepare hash
|
|
table := crc32.MakeTable(crc32.IEEE)
|
|
|
|
// compute has as data is saved
|
|
data := make([]byte, 4096)
|
|
for {
|
|
n, res := file.Read(data)
|
|
if res != nil {
|
|
if res == io.EOF {
|
|
break
|
|
}
|
|
err = res
|
|
return
|
|
}
|
|
|
|
crc = crc32.Update(crc, table, data[:n])
|
|
}
|
|
|
|
// read size
|
|
info, ret := file.Stat()
|
|
if ret != nil {
|
|
err = ret
|
|
return
|
|
}
|
|
size = info.Size()
|
|
return
|
|
}
|