Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport some feature from main to 2.0 #1553

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/processor/ProcessorSplitLogStringNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ void ProcessorSplitLogStringNative::Process(PipelineEventGroup& logGroup) {
}
*mSplitLines = newEvents.size();
logGroup.SwapEvents(newEvents);

return;
}

bool ProcessorSplitLogStringNative::IsSupportedEvent(const PipelineEventPtr& e) const {
Expand Down
1 change: 1 addition & 0 deletions core/processor/ProcessorSplitMultilineLogStringNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ void ProcessorSplitMultilineLogStringNative::Process(PipelineEventGroup& logGrou
}
mProcMatchedLinesCnt->Add(inputLines - unmatchLines);
mProcUnmatchedLinesCnt->Add(unmatchLines);
*mSplitLines = newEvents.size();
logGroup.SwapEvents(newEvents);
}

Expand Down
1 change: 1 addition & 0 deletions docs/cn/plugins/input/service-canal.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
| TextToString | Boolean,`false` | 是否将text类型的数据转换成字符串。不设置时,默认为false,表示不转换。|
| PackValues | Boolean,`false` | 是否将事件数据打包成JSON格式。默认为false,表示不打包。如果设置为true,Logtail会将事件数据以JSON格式集中打包到data和old_data两个字段中,其中old_data仅在row_update事件中有意义。 示例:假设数据表有三列数据c1,c2,c3,设置为false,row_insert事件数据中会有c1,c2,c3三个字段,而设置为true时,c1,c2,c3会被统一打包为data字段,值为`{"c1":"...", "c2": "...", "c3": "..."}`。|
| EnableEventMeta | Boolean,`false` | 是否采集事件的元数据,默认为false,表示不采集。 Binlog事件的元数据包括event_time、event_log_position、event_size和event_server_id。|
| UseDecimal | Boolean,`false` | Binlog解析DECIMAL类型时,是否保持原格式输出,而不是使用科学计数法。如果未设置,系统默认为false,即默认使用科学计数法。|

## 样例

Expand Down
65 changes: 45 additions & 20 deletions pkg/helper/container_discover_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ type ContainerDiscoverManager struct {
fetchOneLock sync.Mutex
}

func NewContainerDiscoverManager(enableDockerDiscover, enableCRIDiscover, enableStaticDiscover bool) *ContainerDiscoverManager {
func NewContainerDiscoverManager() *ContainerDiscoverManager {
return &ContainerDiscoverManager{
enableDockerDiscover: enableDockerDiscover,
enableCRIDiscover: enableCRIDiscover,
enableStaticDiscover: enableStaticDiscover,
enableDockerDiscover: false,
enableCRIDiscover: false,
enableStaticDiscover: false,
}
}

Expand Down Expand Up @@ -125,7 +125,7 @@ func (c *ContainerDiscoverManager) fetchCRI() error {
return criRuntimeWrapper.fetchAll()
}

func (c *ContainerDiscoverManager) SyncContainers() {
func (c *ContainerDiscoverManager) StartSyncContainers() {
if c.enableCRIDiscover {
logger.Debug(context.Background(), "discover manager start sync containers goroutine", "cri")
go criRuntimeWrapper.loopSyncContainers()
Expand Down Expand Up @@ -159,8 +159,42 @@ func (c *ContainerDiscoverManager) LogAlarm(err error, msg string) {
}
}

func (c *ContainerDiscoverManager) Init(initTryTimes int) {
func (c *ContainerDiscoverManager) Init() bool {
defer dockerCenterRecover()

// discover which runtime is valid
if IsCRIRuntimeValid(containerdUnixSocket) {
var err error
criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance)
if err != nil {
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err)
criRuntimeWrapper = nil
} else {
logger.Infof(context.Background(), "[CRIRuntime] create cri-runtime client successfully")
}
}
if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil {
if !ok {
logger.Info(context.Background(), "no docker mount path", "set empty")
DefaultLogtailMountPath = ""
}
} else {
logger.Warning(context.Background(), "check docker mount path error", err.Error())
}
c.enableCRIDiscover = criRuntimeWrapper != nil
c.enableDockerDiscover = dockerCenterInstance.initClient() == nil
c.enableStaticDiscover = isStaticContainerInfoEnabled()
discoverdRuntime := false
if len(os.Getenv("USE_CONTAINERD")) > 0 {
discoverdRuntime = c.enableCRIDiscover
} else {
discoverdRuntime = c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover
}
if !discoverdRuntime {
return false
}

// try to connect to runtime
logger.Info(context.Background(), "input", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover)
listenLoopIntervalSec := 0
// Get env in the same order as in C Logtail
Expand Down Expand Up @@ -222,31 +256,22 @@ func (c *ContainerDiscoverManager) Init(initTryTimes int) {

var err error
if c.enableDockerDiscover {
for i := 0; i < initTryTimes; i++ {
if err = c.fetchDocker(); err == nil {
break
}
}
if err != nil {
if err = c.fetchDocker(); err != nil {
c.enableDockerDiscover = false
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch docker containers error in %d times, close docker discover", initTryTimes)
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch docker containers error, close docker discover, will retry")
}
}
if c.enableCRIDiscover {
for i := 0; i < initTryTimes; i++ {
if err = c.fetchCRI(); err == nil {
break
}
}
if err != nil {
if err = c.fetchCRI(); err != nil {
c.enableCRIDiscover = false
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch cri containers error in %d times, close cri discover", initTryTimes)
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch cri containers error, close cri discover, will retry")
}
}
if c.enableStaticDiscover {
c.fetchStatic()
}
logger.Info(context.Background(), "final", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover)
return c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover
}

func (c *ContainerDiscoverManager) TimerFetch() {
Expand Down
43 changes: 14 additions & 29 deletions pkg/helper/docker_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,39 +632,24 @@ func getDockerCenterInstance() *DockerCenter {
dockerCenterInstance = &DockerCenter{}
dockerCenterInstance.imageCache = make(map[string]string)
dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)
if IsCRIRuntimeValid(containerdUnixSocket) {
retryTimes := 10
for i := 0; i < retryTimes; i++ {
var err error
criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance)
if err != nil {
logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err)
criRuntimeWrapper = nil
} else {
logger.Infof(context.Background(), "[CRIRuntime] create cri-runtime client successfully")
// containerFindingManager works in a producer-consumer model
// so even manager is not initialized, it will not affect consumers like service_stdout
go func() {
retryCount := 0
containerFindingManager = NewContainerDiscoverManager()
for {
if containerFindingManager.Init() {
break
}
time.Sleep(time.Second * 1)
if i == retryTimes-1 {
logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] create cri-runtime client failed")
if retryCount%10 == 0 {
logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "docker center init failed", "retry count", retryCount)
}
retryCount++
time.Sleep(time.Second * 1)
}
}
if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil {
if !ok {
logger.Info(context.Background(), "no docker mount path", "set empty")
DefaultLogtailMountPath = ""
}
} else {
logger.Warning(context.Background(), "check docker mount path error", err.Error())
}
var enableCriFinding = criRuntimeWrapper != nil
var enableDocker = dockerCenterInstance.initClient() == nil
var enableStatic = isStaticContainerInfoEnabled()
containerFindingManager = NewContainerDiscoverManager(enableDocker, enableCriFinding, enableStatic)
containerFindingManager.Init(3)
containerFindingManager.TimerFetch()
containerFindingManager.SyncContainers()
containerFindingManager.TimerFetch()
containerFindingManager.StartSyncContainers()
}()
})
return dockerCenterInstance
}
Expand Down
29 changes: 6 additions & 23 deletions pkg/helper/docker_cri_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ type CRIRuntimeWrapper struct {
}

func IsCRIRuntimeValid(criRuntimeEndpoint string) bool {
if len(os.Getenv("USE_CONTAINERD")) > 0 {
return true
}

// Verify containerd.sock cri valid.
if fi, err := os.Stat(criRuntimeEndpoint); err == nil && !fi.IsDir() {
if IsCRIStatusValid(criRuntimeEndpoint) {
Expand All @@ -105,30 +101,17 @@ func IsCRIStatusValid(criRuntimeEndpoint string) bool {

client := cri.NewRuntimeServiceClient(conn)
// check cri status
for tryCount := 0; tryCount < 5; tryCount++ {
_, err = client.Status(ctx, &cri.StatusRequest{})
if err == nil {
break
}
if strings.Contains(err.Error(), "code = Unimplemented") {
logger.Debug(context.Background(), "Status failed", err)
return false
}
time.Sleep(time.Millisecond * 100)
}
_, err = client.Status(ctx, &cri.StatusRequest{})
if err != nil {
logger.Debug(context.Background(), "Status failed", err)
return false
}
// check running containers
for tryCount := 0; tryCount < 5; tryCount++ {
var containersResp *cri.ListContainersResponse
containersResp, err = client.ListContainers(ctx, &cri.ListContainersRequest{Filter: nil})
if err == nil {
logger.Debug(context.Background(), "ListContainers result", containersResp.Containers)
return containersResp.Containers != nil
}
time.Sleep(time.Millisecond * 100)
var containersResp *cri.ListContainersResponse
containersResp, err = client.ListContainers(ctx, &cri.ListContainersRequest{Filter: nil})
if err == nil {
logger.Debug(context.Background(), "ListContainers result", containersResp.Containers)
return containersResp.Containers != nil
}
logger.Debug(context.Background(), "ListContainers failed", err)
return false
Expand Down
4 changes: 4 additions & 0 deletions pkg/helper/mount_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func GetMountedFilePathWithBasePath(basePath, logPath string) string {

func TryGetRealPath(path string) (string, fs.FileInfo) {
sepLen := len(string(os.PathSeparator))
if len(path) < sepLen {
return "", nil
}

index := 0 // assume path is absolute
for i := 0; i < 10; i++ {
if f, err := os.Stat(path); err == nil {
Expand Down
3 changes: 3 additions & 0 deletions plugins/input/canal/input_canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type ServiceCanal struct {
Charset string
// Pack values into two fields: new_data and old_data. False by default.
PackValues bool
// True时 binlog解析DECIMAL输出原格式而非科学计数法, like:https:/go-mysql-org/go-mysql/blob/6c99b4bff931a5aced0978b78aadb5867afcdcd3/canal/dump.go#L85
UseDecimal bool

shutdown chan struct{}
waitGroup sync.WaitGroup
Expand Down Expand Up @@ -189,6 +191,7 @@ func (sc *ServiceCanal) Init(context pipeline.Context) (int, error) {
sc.config.DiscardNoMetaRowEvent = true
sc.config.IncludeTableRegex = sc.IncludeTables
sc.config.ExcludeTableRegex = sc.ExcludeTables
sc.config.UseDecimal = sc.UseDecimal

sc.lastErrorChan = make(chan error, 1)

Expand Down
Loading