From 4f9c95754aac099f4a19d5c4b4a15be7e57ea173 Mon Sep 17 00:00:00 2001 From: Bingchang Chen Date: Fri, 7 Jun 2024 02:24:26 -0700 Subject: [PATCH 1/4] fix: infinite retry container discovery (#1454) * fix: infinite retry container discovery * fix * Merge remote-tracking branch 'alibaba/main' * fix * fix * fix --- pkg/helper/container_discover_controller.go | 65 ++++++++++++++------- pkg/helper/docker_center.go | 43 +++++--------- pkg/helper/docker_cri_adapter.go | 29 ++------- 3 files changed, 65 insertions(+), 72 deletions(-) diff --git a/pkg/helper/container_discover_controller.go b/pkg/helper/container_discover_controller.go index aabdf4131f..c06eb72947 100644 --- a/pkg/helper/container_discover_controller.go +++ b/pkg/helper/container_discover_controller.go @@ -45,11 +45,11 @@ type ContainerDiscoverManager struct { fetchOneLock sync.Mutex } -func NewContainerDiscoverManager(enableDockerDiscover, enableCRIDiscover, enableStaticDiscover bool) *ContainerDiscoverManager { +func NewContainerDiscoverManager() *ContainerDiscoverManager { return &ContainerDiscoverManager{ - enableDockerDiscover: enableDockerDiscover, - enableCRIDiscover: enableCRIDiscover, - enableStaticDiscover: enableStaticDiscover, + enableDockerDiscover: false, + enableCRIDiscover: false, + enableStaticDiscover: false, } } @@ -125,7 +125,7 @@ func (c *ContainerDiscoverManager) fetchCRI() error { return criRuntimeWrapper.fetchAll() } -func (c *ContainerDiscoverManager) SyncContainers() { +func (c *ContainerDiscoverManager) StartSyncContainers() { if c.enableCRIDiscover { logger.Debug(context.Background(), "discover manager start sync containers goroutine", "cri") go criRuntimeWrapper.loopSyncContainers() @@ -159,8 +159,42 @@ func (c *ContainerDiscoverManager) LogAlarm(err error, msg string) { } } -func (c *ContainerDiscoverManager) Init(initTryTimes int) { +func (c *ContainerDiscoverManager) Init() bool { defer dockerCenterRecover() + + // discover which runtime is valid + if IsCRIRuntimeValid(containerdUnixSocket) { + var err error + criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance) + if err != nil { + logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err) + criRuntimeWrapper = nil + } else { + logger.Infof(context.Background(), "[CRIRuntime] create cri-runtime client successfully") + } + } + if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil { + if !ok { + logger.Info(context.Background(), "no docker mount path", "set empty") + DefaultLogtailMountPath = "" + } + } else { + logger.Warning(context.Background(), "check docker mount path error", err.Error()) + } + c.enableCRIDiscover = criRuntimeWrapper != nil + c.enableDockerDiscover = dockerCenterInstance.initClient() == nil + c.enableStaticDiscover = isStaticContainerInfoEnabled() + discoverdRuntime := false + if len(os.Getenv("USE_CONTAINERD")) > 0 { + discoverdRuntime = c.enableCRIDiscover + } else { + discoverdRuntime = c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover + } + if !discoverdRuntime { + return false + } + + // try to connect to runtime logger.Info(context.Background(), "input", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover) listenLoopIntervalSec := 0 // Get env in the same order as in C Logtail @@ -222,31 +256,22 @@ func (c *ContainerDiscoverManager) Init(initTryTimes int) { var err error if c.enableDockerDiscover { - for i := 0; i < initTryTimes; i++ { - if err = c.fetchDocker(); err == nil { - break - } - } - if err != nil { + if err = c.fetchDocker(); err != nil { c.enableDockerDiscover = false - logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch docker containers error in %d times, close docker discover", initTryTimes) + logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch docker containers error, close docker discover, will retry") } } if c.enableCRIDiscover { - for i := 0; i < initTryTimes; i++ { - if err = c.fetchCRI(); err == nil { - break - } - } - if err != nil { + if err = c.fetchCRI(); err != nil { c.enableCRIDiscover = false - logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch cri containers error in %d times, close cri discover", initTryTimes) + logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "fetch cri containers error, close cri discover, will retry") } } if c.enableStaticDiscover { c.fetchStatic() } logger.Info(context.Background(), "final", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover) + return c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover } func (c *ContainerDiscoverManager) TimerFetch() { diff --git a/pkg/helper/docker_center.go b/pkg/helper/docker_center.go index 8fd4db31e9..8e117dd1b4 100644 --- a/pkg/helper/docker_center.go +++ b/pkg/helper/docker_center.go @@ -632,39 +632,24 @@ func getDockerCenterInstance() *DockerCenter { dockerCenterInstance = &DockerCenter{} dockerCenterInstance.imageCache = make(map[string]string) dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail) - if IsCRIRuntimeValid(containerdUnixSocket) { - retryTimes := 10 - for i := 0; i < retryTimes; i++ { - var err error - criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance) - if err != nil { - logger.Errorf(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err) - criRuntimeWrapper = nil - } else { - logger.Infof(context.Background(), "[CRIRuntime] create cri-runtime client successfully") + // containerFindingManager works in a producer-consumer model + // so even manager is not initialized, it will not affect consumers like service_stdout + go func() { + retryCount := 0 + containerFindingManager = NewContainerDiscoverManager() + for { + if containerFindingManager.Init() { break } - time.Sleep(time.Second * 1) - if i == retryTimes-1 { - logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "[CRIRuntime] create cri-runtime client failed") + if retryCount%10 == 0 { + logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "docker center init failed", "retry count", retryCount) } + retryCount++ + time.Sleep(time.Second * 1) } - } - if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil { - if !ok { - logger.Info(context.Background(), "no docker mount path", "set empty") - DefaultLogtailMountPath = "" - } - } else { - logger.Warning(context.Background(), "check docker mount path error", err.Error()) - } - var enableCriFinding = criRuntimeWrapper != nil - var enableDocker = dockerCenterInstance.initClient() == nil - var enableStatic = isStaticContainerInfoEnabled() - containerFindingManager = NewContainerDiscoverManager(enableDocker, enableCriFinding, enableStatic) - containerFindingManager.Init(3) - containerFindingManager.TimerFetch() - containerFindingManager.SyncContainers() + containerFindingManager.TimerFetch() + containerFindingManager.StartSyncContainers() + }() }) return dockerCenterInstance } diff --git a/pkg/helper/docker_cri_adapter.go b/pkg/helper/docker_cri_adapter.go index d6ede0462c..00c96f476c 100644 --- a/pkg/helper/docker_cri_adapter.go +++ b/pkg/helper/docker_cri_adapter.go @@ -76,10 +76,6 @@ type CRIRuntimeWrapper struct { } func IsCRIRuntimeValid(criRuntimeEndpoint string) bool { - if len(os.Getenv("USE_CONTAINERD")) > 0 { - return true - } - // Verify containerd.sock cri valid. if fi, err := os.Stat(criRuntimeEndpoint); err == nil && !fi.IsDir() { if IsCRIStatusValid(criRuntimeEndpoint) { @@ -105,30 +101,17 @@ func IsCRIStatusValid(criRuntimeEndpoint string) bool { client := cri.NewRuntimeServiceClient(conn) // check cri status - for tryCount := 0; tryCount < 5; tryCount++ { - _, err = client.Status(ctx, &cri.StatusRequest{}) - if err == nil { - break - } - if strings.Contains(err.Error(), "code = Unimplemented") { - logger.Debug(context.Background(), "Status failed", err) - return false - } - time.Sleep(time.Millisecond * 100) - } + _, err = client.Status(ctx, &cri.StatusRequest{}) if err != nil { logger.Debug(context.Background(), "Status failed", err) return false } // check running containers - for tryCount := 0; tryCount < 5; tryCount++ { - var containersResp *cri.ListContainersResponse - containersResp, err = client.ListContainers(ctx, &cri.ListContainersRequest{Filter: nil}) - if err == nil { - logger.Debug(context.Background(), "ListContainers result", containersResp.Containers) - return containersResp.Containers != nil - } - time.Sleep(time.Millisecond * 100) + var containersResp *cri.ListContainersResponse + containersResp, err = client.ListContainers(ctx, &cri.ListContainersRequest{Filter: nil}) + if err == nil { + logger.Debug(context.Background(), "ListContainers result", containersResp.Containers) + return containersResp.Containers != nil } logger.Debug(context.Background(), "ListContainers failed", err) return false From 120a96ee8b60a6a79c8b790f6c2103ba1e7e99fb Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Tue, 11 Jun 2024 13:46:37 +0800 Subject: [PATCH 2/4] fix core caused by empty container log path --- pkg/helper/mount_others.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/helper/mount_others.go b/pkg/helper/mount_others.go index b3596d815f..ace958683d 100644 --- a/pkg/helper/mount_others.go +++ b/pkg/helper/mount_others.go @@ -35,6 +35,10 @@ func GetMountedFilePathWithBasePath(basePath, logPath string) string { func TryGetRealPath(path string) (string, fs.FileInfo) { sepLen := len(string(os.PathSeparator)) + if len(path) < sepLen { + return "", nil + } + index := 0 // assume path is absolute for i := 0; i < 10; i++ { if f, err := os.Stat(path); err == nil { From aebe53ad9299e0d0dea04767e4aae3da0a50efa2 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Mon, 17 Jun 2024 15:24:53 +0800 Subject: [PATCH 3/4] Add UseDecimal parameter to the service_canal plugin. When UseDecimal is set to true, the binlog parsing outputs the original DECIMAL format instead of scientific notation (#1509) * add UseDecimal --- docs/cn/plugins/input/service-canal.md | 1 + plugins/input/canal/input_canal.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/docs/cn/plugins/input/service-canal.md b/docs/cn/plugins/input/service-canal.md index ae407b6b60..38aca4446b 100644 --- a/docs/cn/plugins/input/service-canal.md +++ b/docs/cn/plugins/input/service-canal.md @@ -31,6 +31,7 @@ | TextToString | Boolean,`false` | 是否将text类型的数据转换成字符串。不设置时,默认为false,表示不转换。| | PackValues | Boolean,`false` | 是否将事件数据打包成JSON格式。默认为false,表示不打包。如果设置为true,Logtail会将事件数据以JSON格式集中打包到data和old_data两个字段中,其中old_data仅在row_update事件中有意义。 示例:假设数据表有三列数据c1,c2,c3,设置为false,row_insert事件数据中会有c1,c2,c3三个字段,而设置为true时,c1,c2,c3会被统一打包为data字段,值为`{"c1":"...", "c2": "...", "c3": "..."}`。| | EnableEventMeta | Boolean,`false` | 是否采集事件的元数据,默认为false,表示不采集。 Binlog事件的元数据包括event_time、event_log_position、event_size和event_server_id。| +| UseDecimal | Boolean,`false` | Binlog解析DECIMAL类型时,是否保持原格式输出,而不是使用科学计数法。如果未设置,系统默认为false,即默认使用科学计数法。| ## 样例 diff --git a/plugins/input/canal/input_canal.go b/plugins/input/canal/input_canal.go index 55969667e2..710baa345d 100644 --- a/plugins/input/canal/input_canal.go +++ b/plugins/input/canal/input_canal.go @@ -141,6 +141,8 @@ type ServiceCanal struct { Charset string // Pack values into two fields: new_data and old_data. False by default. PackValues bool + // True时 binlog解析DECIMAL输出原格式而非科学计数法, like:https://github.com/go-mysql-org/go-mysql/blob/6c99b4bff931a5aced0978b78aadb5867afcdcd3/canal/dump.go#L85 + UseDecimal bool shutdown chan struct{} waitGroup sync.WaitGroup @@ -189,6 +191,7 @@ func (sc *ServiceCanal) Init(context pipeline.Context) (int, error) { sc.config.DiscardNoMetaRowEvent = true sc.config.IncludeTableRegex = sc.IncludeTables sc.config.ExcludeTableRegex = sc.ExcludeTables + sc.config.UseDecimal = sc.UseDecimal sc.lastErrorChan = make(chan error, 1) From 015b8ad70d1ec5f693361a3baabf9d363241c040 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Mon, 17 Jun 2024 17:50:22 +0800 Subject: [PATCH 4/4] fix: revert missing splitline of shennong log profile in processorsplitmultilinelog (#1544) * fix split line shennong log profile --- core/processor/ProcessorSplitLogStringNative.cpp | 2 -- core/processor/ProcessorSplitMultilineLogStringNative.cpp | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/processor/ProcessorSplitLogStringNative.cpp b/core/processor/ProcessorSplitLogStringNative.cpp index e1c5083f5e..edf7c72119 100644 --- a/core/processor/ProcessorSplitLogStringNative.cpp +++ b/core/processor/ProcessorSplitLogStringNative.cpp @@ -83,8 +83,6 @@ void ProcessorSplitLogStringNative::Process(PipelineEventGroup& logGroup) { } *mSplitLines = newEvents.size(); logGroup.SwapEvents(newEvents); - - return; } bool ProcessorSplitLogStringNative::IsSupportedEvent(const PipelineEventPtr& e) const { diff --git a/core/processor/ProcessorSplitMultilineLogStringNative.cpp b/core/processor/ProcessorSplitMultilineLogStringNative.cpp index 12df049b34..2157c1e39c 100644 --- a/core/processor/ProcessorSplitMultilineLogStringNative.cpp +++ b/core/processor/ProcessorSplitMultilineLogStringNative.cpp @@ -92,6 +92,7 @@ void ProcessorSplitMultilineLogStringNative::Process(PipelineEventGroup& logGrou } mProcMatchedLinesCnt->Add(inputLines - unmatchLines); mProcUnmatchedLinesCnt->Add(unmatchLines); + *mSplitLines = newEvents.size(); logGroup.SwapEvents(newEvents); }