2022-03-01 08:28:36 +00:00
package databag
import (
"os"
"io"
"hash/crc32"
"errors"
"bytes"
"sync"
"regexp"
"databag/internal/store"
"os/exec"
"gorm.io/gorm"
)
2022-03-01 19:20:51 +00:00
var audioSync sync . Mutex
var videoSync sync . Mutex
var photoSync sync . Mutex
var defaultSync sync . Mutex
2022-03-01 08:28:36 +00:00
func transcode ( ) {
2022-03-01 19:20:51 +00:00
go transcodeAudio ( )
go transcodeVideo ( )
go transcodePhoto ( )
go transcodeDefault ( )
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeVideo ( ) {
videoSync . Lock ( )
defer videoSync . Unlock ( )
2022-03-01 18:35:14 +00:00
for ; ; {
var asset store . Asset
2022-07-22 17:52:13 +00:00
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APPQueueVideo , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
2022-03-01 18:35:14 +00:00
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
2022-03-01 08:28:36 +00:00
}
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeAudio ( ) {
audioSync . Lock ( )
defer audioSync . Unlock ( )
for ; ; {
var asset store . Asset
2022-07-22 17:52:13 +00:00
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APPQueueAudio , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
2022-03-01 19:20:51 +00:00
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
}
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodePhoto ( ) {
photoSync . Lock ( )
defer photoSync . Unlock ( )
2022-03-01 18:35:14 +00:00
for ; ; {
var asset store . Asset
2022-07-22 17:52:13 +00:00
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APPQueuePhoto , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
2022-03-01 18:35:14 +00:00
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
2022-03-01 19:20:51 +00:00
transcodeAsset ( & asset )
}
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodeDefault ( ) {
defaultSync . Lock ( )
defer defaultSync . Unlock ( )
for ; ; {
var asset store . Asset
2022-07-22 17:52:13 +00:00
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue != ? AND transform_queue != ? AND transform_queue != ? AND status = ?" , APPQueueVideo , APPQueueAudio , APPQueuePhoto , APPAssetWaiting ) . First ( & asset ) . Error ; err != nil {
2022-03-01 19:20:51 +00:00
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
2022-03-01 18:35:14 +00:00
transcodeAsset ( & asset )
}
}
func transcodeAsset ( asset * store . Asset ) {
2022-03-01 08:28:36 +00:00
// prepare script path
2022-07-22 18:01:29 +00:00
data := getStrConfigValue ( CNFAssetPath , APPDefaultPath )
script := getStrConfigValue ( CNFScriptPath , "." )
2022-03-01 08:28:36 +00:00
re := regexp . MustCompile ( "^[a-zA-Z0-9_]*$" )
2022-03-01 18:35:14 +00:00
if ! re . MatchString ( asset . Transform ) {
ErrMsg ( errors . New ( "invalid transform" ) )
2022-07-22 17:52:13 +00:00
if err := UpdateAsset ( asset , APPAssetError , 0 , 0 ) ; err != nil {
2022-03-01 18:35:14 +00:00
ErrMsg ( err )
}
} else {
2022-07-22 17:15:44 +00:00
input := data + "/" + asset . Account . GUID + "/" + asset . TransformID
output := data + "/" + asset . Account . GUID + "/" + asset . AssetID
2022-03-01 19:20:51 +00:00
cmd := exec . Command ( script + "/transform_" + asset . Transform + ".sh" , input , output , asset . TransformParams )
var stdout bytes . Buffer
cmd . Stdout = & stdout
var stderr bytes . Buffer
cmd . Stderr = & stderr
2022-03-08 21:31:04 +00:00
2022-03-01 18:35:14 +00:00
if err := cmd . Run ( ) ; err != nil {
2022-03-01 19:20:51 +00:00
LogMsg ( stdout . String ( ) )
LogMsg ( stderr . String ( ) )
2022-03-01 18:35:14 +00:00
ErrMsg ( err )
2022-07-22 17:52:13 +00:00
if err := UpdateAsset ( asset , APPAssetError , 0 , 0 ) ; err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg ( err )
}
} else {
2022-03-01 19:20:51 +00:00
if stdout . Len ( ) > 0 {
LogMsg ( stdout . String ( ) )
}
if stderr . Len ( ) > 0 {
LogMsg ( stderr . String ( ) )
}
2022-03-01 18:35:14 +00:00
crc , size , err := ScanAsset ( output )
2022-03-02 21:19:16 +00:00
2022-03-01 18:35:14 +00:00
if err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg ( err )
2022-07-22 17:52:13 +00:00
if err := UpdateAsset ( asset , APPAssetError , 0 , 0 ) ; err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg ( err )
}
2022-07-22 17:52:13 +00:00
} else if err := UpdateAsset ( asset , APPAssetReady , crc , size ) ; err != nil {
2022-03-01 18:35:14 +00:00
ErrMsg ( err )
2022-03-01 08:28:36 +00:00
}
}
}
}
func UpdateAsset ( asset * store . Asset , status string , crc uint32 , size int64 ) ( err error ) {
2022-05-02 07:49:11 +00:00
topic := store . Topic { } ;
2022-03-01 08:28:36 +00:00
err = store . DB . Transaction ( func ( tx * gorm . DB ) error {
asset . Crc = crc
asset . Size = size
asset . Status = status
2022-03-02 21:19:16 +00:00
if res := tx . Model ( store . Asset { } ) . Where ( "id = ?" , asset . ID ) . Updates ( asset ) . Error ; res != nil {
2022-03-01 08:28:36 +00:00
return res
}
2022-07-04 07:24:15 +00:00
if asset . Topic == nil {
return errors . New ( "asset not found" ) ;
}
if res := tx . Preload ( "Account" ) . Preload ( "TopicSlot" ) . Preload ( "Channel.Groups" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.ChannelSlot" ) . First ( & topic , asset . Topic . ID ) . Error ; res != nil {
return res ;
}
if res := tx . Model ( & topic ) . Update ( "detail_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . TopicSlot ) . Update ( "revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Channel ) . Update ( "topic_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Channel . ChannelSlot ) . Update ( "revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Account ) . Update ( "channel_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
2022-03-01 08:28:36 +00:00
}
return nil
} )
if err != nil {
return
}
// determine affected contact list
cards := make ( map [ string ] store . Card )
2022-05-02 07:49:11 +00:00
for _ , card := range topic . Channel . Cards {
2022-07-22 17:15:44 +00:00
cards [ card . GUID ] = card
2022-03-01 08:28:36 +00:00
}
2022-05-02 07:49:11 +00:00
for _ , group := range topic . Channel . Groups {
2022-03-01 08:28:36 +00:00
for _ , card := range group . Cards {
2022-07-22 17:15:44 +00:00
cards [ card . GUID ] = card
2022-03-01 08:28:36 +00:00
}
}
// notify
2022-05-02 07:49:11 +00:00
SetStatus ( & topic . Account )
2022-03-01 08:28:36 +00:00
for _ , card := range cards {
2022-05-02 07:49:11 +00:00
SetContactChannelNotification ( & topic . Account , & card )
2022-03-01 08:28:36 +00:00
}
return
}
func ScanAsset ( path string ) ( crc uint32 , size int64 , err error ) {
file , res := os . Open ( path )
if res != nil {
err = res
return
}
defer file . Close ( )
// prepare hash
table := crc32 . MakeTable ( crc32 . IEEE )
// compute has as data is saved
data := make ( [ ] byte , 4096 )
for {
n , res := file . Read ( data )
if res != nil {
if res == io . EOF {
break
}
err = res
return
}
crc = crc32 . Update ( crc , table , data [ : n ] )
}
// read size
info , ret := file . Stat ( )
if ret != nil {
err = ret
return
}
size = info . Size ( )
return
}