2022-03-01 08:28:36 +00:00
package databag
import (
2022-07-22 19:28:14 +00:00
"bytes"
"databag/internal/store"
"errors"
"gorm.io/gorm"
"hash/crc32"
"io"
"os"
"os/exec"
"regexp"
"sync"
2022-03-01 08:28:36 +00:00
)
2022-03-01 19:20:51 +00:00
var audioSync sync . Mutex
var videoSync sync . Mutex
var photoSync sync . Mutex
var defaultSync sync . Mutex
2022-03-01 08:28:36 +00:00
func transcode ( ) {
2022-07-22 19:28:14 +00:00
go transcodeAudio ( )
go transcodeVideo ( )
go transcodePhoto ( )
go transcodeDefault ( )
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeVideo ( ) {
2022-07-22 19:28:14 +00:00
videoSync . Lock ( )
defer videoSync . Unlock ( )
for {
var asset store . Asset
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APPQueueVideo , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
}
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeAudio ( ) {
2022-07-22 19:28:14 +00:00
audioSync . Lock ( )
defer audioSync . Unlock ( )
for {
var asset store . Asset
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APPQueueAudio , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
}
2022-03-01 19:20:51 +00:00
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodePhoto ( ) {
2022-07-22 19:28:14 +00:00
photoSync . Lock ( )
defer photoSync . Unlock ( )
for {
var asset store . Asset
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APPQueuePhoto , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
}
2022-03-01 19:20:51 +00:00
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodeDefault ( ) {
2022-07-22 19:28:14 +00:00
defaultSync . Lock ( )
defer defaultSync . Unlock ( )
for {
var asset store . Asset
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue != ? AND transform_queue != ? AND transform_queue != ? AND status = ?" , APPQueueVideo , APPQueueAudio , APPQueuePhoto , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
}
2022-03-01 18:35:14 +00:00
}
func transcodeAsset ( asset * store . Asset ) {
2022-03-01 08:28:36 +00:00
2022-07-22 19:28:14 +00:00
// prepare script path
data := getStrConfigValue ( CNFAssetPath , APPDefaultPath )
script := getStrConfigValue ( CNFScriptPath , "." )
re := regexp . MustCompile ( "^[a-zA-Z0-9_]*$" )
if ! re . MatchString ( asset . Transform ) {
ErrMsg ( errors . New ( "invalid transform" ) )
if err := updateAsset ( asset , APPAssetError , 0 , 0 ) ; err != nil {
ErrMsg ( err )
}
} else {
input := data + "/" + asset . Account . GUID + "/" + asset . TransformID
output := data + "/" + asset . Account . GUID + "/" + asset . AssetID
cmd := exec . Command ( script + "/transform_" + asset . Transform + ".sh" , input , output , asset . TransformParams )
var stdout bytes . Buffer
cmd . Stdout = & stdout
var stderr bytes . Buffer
cmd . Stderr = & stderr
if err := cmd . Run ( ) ; err != nil {
LogMsg ( stdout . String ( ) )
LogMsg ( stderr . String ( ) )
ErrMsg ( err )
if err := updateAsset ( asset , APPAssetError , 0 , 0 ) ; err != nil {
ErrMsg ( err )
}
} else {
if stdout . Len ( ) > 0 {
LogMsg ( stdout . String ( ) )
}
if stderr . Len ( ) > 0 {
LogMsg ( stderr . String ( ) )
}
crc , size , err := scanAsset ( output )
if err != nil {
ErrMsg ( err )
if err := updateAsset ( asset , APPAssetError , 0 , 0 ) ; err != nil {
ErrMsg ( err )
}
} else if err := updateAsset ( asset , APPAssetReady , crc , size ) ; err != nil {
ErrMsg ( err )
}
}
}
2022-03-01 08:28:36 +00:00
}
2022-07-22 18:37:28 +00:00
func updateAsset ( asset * store . Asset , status string , crc uint32 , size int64 ) ( err error ) {
2022-03-01 08:28:36 +00:00
2022-07-22 19:28:14 +00:00
topic := store . Topic { }
err = store . DB . Transaction ( func ( tx * gorm . DB ) error {
asset . Crc = crc
asset . Size = size
asset . Status = status
if res := tx . Model ( store . Asset { } ) . Where ( "id = ?" , asset . ID ) . Updates ( asset ) . Error ; res != nil {
return res
}
if asset . Topic == nil {
return errors . New ( "asset not found" )
}
if res := tx . Preload ( "Account" ) . Preload ( "TopicSlot" ) . Preload ( "Channel.Groups" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.ChannelSlot" ) . First ( & topic , asset . Topic . ID ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic ) . Update ( "detail_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . TopicSlot ) . Update ( "revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Channel ) . Update ( "topic_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Channel . ChannelSlot ) . Update ( "revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Account ) . Update ( "channel_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
return nil
} )
if err != nil {
return
}
// determine affected contact list
cards := make ( map [ string ] store . Card )
2022-11-11 23:26:43 +00:00
for _ , member := range topic . Channel . Members {
cards [ member . Card . GUID ] = member . Card
2022-07-22 19:28:14 +00:00
}
for _ , group := range topic . Channel . Groups {
for _ , card := range group . Cards {
cards [ card . GUID ] = card
}
}
// notify
SetStatus ( & topic . Account )
for _ , card := range cards {
SetContactChannelNotification ( & topic . Account , & card )
}
return
2022-03-01 08:28:36 +00:00
}
2022-07-22 18:37:28 +00:00
func scanAsset ( path string ) ( crc uint32 , size int64 , err error ) {
2022-03-01 08:28:36 +00:00
2022-07-22 19:28:14 +00:00
file , res := os . Open ( path )
if res != nil {
err = res
return
}
defer file . Close ( )
// prepare hash
table := crc32 . MakeTable ( crc32 . IEEE )
// compute has as data is saved
data := make ( [ ] byte , 4096 )
for {
n , res := file . Read ( data )
if res != nil {
if res == io . EOF {
break
}
err = res
return
}
crc = crc32 . Update ( crc , table , data [ : n ] )
}
// read size
info , ret := file . Stat ( )
if ret != nil {
err = ret
return
}
size = info . Size ( )
return
2022-03-01 08:28:36 +00:00
}