forked from streamingfast/substreams-sink-sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared.go
More file actions
44 lines (34 loc) · 1.27 KB
/
shared.go
File metadata and controls
44 lines (34 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package sinksql
import (
"fmt"
"strings"
pbsql "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/services/v1"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"google.golang.org/protobuf/proto"
)
// supportedDeployableUnits holds the sink config type names that are reported
// in the "supported configs" part of the invalid-config error. It is nil until
// init populates it.
var supportedDeployableUnits []string

// Sink config type names recognised by ExtractSinkService, written without
// the type URL prefix.
var (
	// deprecated_supportedDeployableService is the older type name that is
	// still accepted.
	deprecated_supportedDeployableService = "sf.substreams.sink.sql.v1.Service"

	// supportedDeployableService is the non-deprecated type name.
	supportedDeployableService = "sf.substreams.sink.sql.service.v1.Service"
)
// init populates supportedDeployableUnits with every sink config type name
// that ExtractSinkService accepts, so that the "supported configs" list in its
// error message matches what is actually accepted.
func init() {
	supportedDeployableUnits = []string{
		deprecated_supportedDeployableService,
		// The non-deprecated name is accepted too and must be advertised,
		// otherwise the error message only points users at the deprecated one.
		supportedDeployableService,
	}
}
// typeUrlPrefix is stripped from the sink config type URL before the remainder
// is compared against the supported service type names.
const typeUrlPrefix = "type.googleapis.com/"
// ExtractSinkService decodes the SQL sink service configuration stored in the
// sink config of pkg.
//
// The sink config type URL, with the "type.googleapis.com/" prefix removed,
// must equal either the deprecated or the current service type name; both are
// unmarshalled into the same pbsql.Service message.
//
// An error is returned when pkg is nil or has no sink config, when the type
// URL does not name a supported service, or when the payload cannot be
// unmarshalled.
func ExtractSinkService(pkg *pbsubstreams.Package) (*pbsql.Service, error) {
	// A nil package has no sink config either; report it the same way instead
	// of panicking on the field access below.
	if pkg == nil || pkg.SinkConfig == nil {
		return nil, fmt.Errorf("no sink config found in spkg")
	}

	configPackageID := strings.TrimPrefix(pkg.SinkConfig.TypeUrl, typeUrlPrefix)

	switch configPackageID {
	case deprecated_supportedDeployableService, supportedDeployableService:
		service := &pbsql.Service{}
		if err := proto.Unmarshal(pkg.SinkConfig.Value, service); err != nil {
			return nil, fmt.Errorf("failed to proto unmarshal: %w", err)
		}
		return service, nil
	}

	// Report the original, unstripped type URL so the user sees exactly what
	// the package declared.
	return nil, fmt.Errorf("invalid config type %q, supported configs are %q", pkg.SinkConfig.TypeUrl, strings.Join(supportedDeployableUnits, ", "))
}