2022-03-01 08:28:36 +00:00
package databag
import (
"os"
"io"
"hash/crc32"
"errors"
"bytes"
"sync"
"regexp"
"databag/internal/store"
"os/exec"
"gorm.io/gorm"
)
2022-03-01 19:20:51 +00:00
var audioSync sync . Mutex
var videoSync sync . Mutex
var photoSync sync . Mutex
var defaultSync sync . Mutex
2022-03-01 08:28:36 +00:00
func transcode ( ) {
2022-03-01 19:20:51 +00:00
go transcodeAudio ( )
go transcodeVideo ( )
go transcodePhoto ( )
go transcodeDefault ( )
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeVideo ( ) {
videoSync . Lock ( )
defer videoSync . Unlock ( )
2022-03-01 18:35:14 +00:00
for ; ; {
var asset store . Asset
2022-03-01 19:20:51 +00:00
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APP_QUEUEVIDEO , APP_ASSETWAITING ) . First ( & asset ) . Error ; err != nil {
2022-03-01 18:35:14 +00:00
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
2022-03-01 08:28:36 +00:00
}
2022-03-01 18:35:14 +00:00
}
2022-03-01 19:20:51 +00:00
func transcodeAudio ( ) {
audioSync . Lock ( )
defer audioSync . Unlock ( )
for ; ; {
var asset store . Asset
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APP_QUEUEAUDIO , APP_ASSETWAITING ) . First ( & asset ) . Error ; err != nil {
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
transcodeAsset ( & asset )
}
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodePhoto ( ) {
photoSync . Lock ( )
defer photoSync . Unlock ( )
2022-03-01 18:35:14 +00:00
for ; ; {
var asset store . Asset
2022-03-01 19:20:51 +00:00
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue = ? AND status = ?" , APP_QUEUEPHOTO , APP_ASSETWAITING ) . First ( & asset ) . Error ; err != nil {
2022-03-01 18:35:14 +00:00
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
2022-03-01 19:20:51 +00:00
transcodeAsset ( & asset )
}
}
2022-03-01 18:35:14 +00:00
2022-03-01 19:20:51 +00:00
func transcodeDefault ( ) {
defaultSync . Lock ( )
defer defaultSync . Unlock ( )
for ; ; {
var asset store . Asset
if err := store . DB . Order ( "created asc" ) . Preload ( "Account" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.Groups.Cards" ) . Preload ( "Channel.ChannelSlot" ) . Preload ( "Topic.TopicSlot" ) . Where ( "transform_queue != ? AND transform_queue != ? AND transform_queue != ? AND status = ?" , APP_QUEUEVIDEO , APP_QUEUEAUDIO , APP_QUEUEPHOTO , APP_ASSETWAITING ) . First ( & asset ) . Error ; err != nil {
if ! errors . Is ( err , gorm . ErrRecordNotFound ) {
ErrMsg ( err )
}
return
}
2022-03-01 18:35:14 +00:00
transcodeAsset ( & asset )
}
}
func transcodeAsset ( asset * store . Asset ) {
2022-03-01 08:28:36 +00:00
// prepare script path
2022-03-11 07:57:27 +00:00
data := getStrConfigValue ( CONFIG_ASSETPATH , APP_DEFAULTPATH )
2022-03-01 08:28:36 +00:00
script := getStrConfigValue ( CONFIG_SCRIPTPATH , "." )
re := regexp . MustCompile ( "^[a-zA-Z0-9_]*$" )
2022-03-01 18:35:14 +00:00
if ! re . MatchString ( asset . Transform ) {
ErrMsg ( errors . New ( "invalid transform" ) )
if err := UpdateAsset ( asset , APP_ASSETERROR , 0 , 0 ) ; err != nil {
ErrMsg ( err )
}
} else {
2022-03-01 19:20:51 +00:00
input := data + "/" + asset . Account . Guid + "/" + asset . TransformId
output := data + "/" + asset . Account . Guid + "/" + asset . AssetId
cmd := exec . Command ( script + "/transform_" + asset . Transform + ".sh" , input , output , asset . TransformParams )
var stdout bytes . Buffer
cmd . Stdout = & stdout
var stderr bytes . Buffer
cmd . Stderr = & stderr
2022-03-08 21:31:04 +00:00
2022-03-01 18:35:14 +00:00
if err := cmd . Run ( ) ; err != nil {
2022-03-01 19:20:51 +00:00
LogMsg ( stdout . String ( ) )
LogMsg ( stderr . String ( ) )
2022-03-01 18:35:14 +00:00
ErrMsg ( err )
if err := UpdateAsset ( asset , APP_ASSETERROR , 0 , 0 ) ; err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg ( err )
}
} else {
2022-03-01 19:20:51 +00:00
if stdout . Len ( ) > 0 {
LogMsg ( stdout . String ( ) )
}
if stderr . Len ( ) > 0 {
LogMsg ( stderr . String ( ) )
}
2022-03-01 18:35:14 +00:00
crc , size , err := ScanAsset ( output )
2022-03-02 21:19:16 +00:00
2022-03-01 18:35:14 +00:00
if err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg ( err )
2022-03-01 18:35:14 +00:00
if err := UpdateAsset ( asset , APP_ASSETERROR , 0 , 0 ) ; err != nil {
2022-03-01 08:28:36 +00:00
ErrMsg ( err )
}
2022-03-01 18:35:14 +00:00
} else if err := UpdateAsset ( asset , APP_ASSETREADY , crc , size ) ; err != nil {
ErrMsg ( err )
2022-03-01 08:28:36 +00:00
}
}
}
}
func UpdateAsset ( asset * store . Asset , status string , crc uint32 , size int64 ) ( err error ) {
2022-05-02 07:49:11 +00:00
topic := store . Topic { } ;
2022-03-01 08:28:36 +00:00
err = store . DB . Transaction ( func ( tx * gorm . DB ) error {
asset . Crc = crc
asset . Size = size
asset . Status = status
2022-03-02 21:19:16 +00:00
if res := tx . Model ( store . Asset { } ) . Where ( "id = ?" , asset . ID ) . Updates ( asset ) . Error ; res != nil {
2022-03-01 08:28:36 +00:00
return res
}
2022-07-04 07:11:48 +00:00
if asset . Topic != nil {
if res := tx . Preload ( "Account" ) . Preload ( "TopicSlot" ) . Preload ( "Channel.Groups" ) . Preload ( "Channel.Cards" ) . Preload ( "Channel.ChannelSlot" ) . First ( & topic , asset . Topic . ID ) . Error ; res != nil {
return res ;
}
if res := tx . Model ( & topic ) . Update ( "detail_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . TopicSlot ) . Update ( "revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Channel ) . Update ( "topic_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Channel . ChannelSlot ) . Update ( "revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
if res := tx . Model ( & topic . Account ) . Update ( "channel_revision" , topic . Account . ChannelRevision + 1 ) . Error ; res != nil {
return res
}
2022-03-01 08:28:36 +00:00
}
return nil
} )
if err != nil {
return
}
// determine affected contact list
cards := make ( map [ string ] store . Card )
2022-05-02 07:49:11 +00:00
for _ , card := range topic . Channel . Cards {
2022-03-01 08:28:36 +00:00
cards [ card . Guid ] = card
}
2022-05-02 07:49:11 +00:00
for _ , group := range topic . Channel . Groups {
2022-03-01 08:28:36 +00:00
for _ , card := range group . Cards {
cards [ card . Guid ] = card
}
}
// notify
2022-05-02 07:49:11 +00:00
SetStatus ( & topic . Account )
2022-03-01 08:28:36 +00:00
for _ , card := range cards {
2022-05-02 07:49:11 +00:00
SetContactChannelNotification ( & topic . Account , & card )
2022-03-01 08:28:36 +00:00
}
return
}
func ScanAsset ( path string ) ( crc uint32 , size int64 , err error ) {
file , res := os . Open ( path )
if res != nil {
err = res
return
}
defer file . Close ( )
// prepare hash
table := crc32 . MakeTable ( crc32 . IEEE )
// compute has as data is saved
data := make ( [ ] byte , 4096 )
for {
n , res := file . Read ( data )
if res != nil {
if res == io . EOF {
break
}
err = res
return
}
crc = crc32 . Update ( crc , table , data [ : n ] )
}
// read size
info , ret := file . Stat ( )
if ret != nil {
err = ret
return
}
size = info . Size ( )
return
}