File Sink Output - JSON Lines type - Not completing the last log line at the end of the file #2343

Closed
canob opened this issue Oct 20, 2023 · 14 comments · Fixed by #2370

@canob

canob commented Oct 20, 2023

I configured two different sources in eKuiper:

  • A File source reading JSON Lines
    [image]
  • An HTTPPush source, receiving JSON Lines from the FluentBit HTTP output
    [image]

In both cases, I'm sending the output to a file with eKuiper:
[image]
[image]

In the first case (JSON Lines file source -> JSON Lines file), I see two problems:
The first is that every JSON line of the source file is processed again each time eKuiper rereads the file (I configured it to do that every 30 seconds). I expected eKuiper to process only the new JSON lines in the file, not all of them again.
The second problem is the one in the title of this issue: the last line of the destination JSON Lines file that eKuiper writes is incomplete. For example:
{"CertIssuerName":null,"CertSerialNumber":null,"CertThumbprint":null,"Computername":"ARG01.company.cl","ID":4768,"IpAddress":"::ffff:10.199.0.84","IpPort":"58745","Keywords":["Audit Success"],"LogName":"Security","Message":"A Kerberos authentication ticket (TGT) was requested.\r\n\r\nAccount Information:\r\n\tAccount Name:\t\tUSER\r\n\tSupplied Realm Name:\tcompany\r\n\tUser ID:\t\t\tS-1-5-21-1844237615-1563985344-1957994488-20730\r\n\r\nService Information:\r\n\tService Name:\t\tkrbtgt\r\n\tService ID:\t\tS-1-5-21-1844237615-1563985344-1957994488-502\r\n\r\nNetwork Information:\r\n\tClient Address:\t\t::ffff:10.199.0.84\r\n\tClient Port:\t\t58745\r\n\r\nAdditional Information:\r\n\tTicket Options:\t\t0x40810010\r\n\tResult Code:\t\t0x0\r\n\tTicket Encryption Type:\t0x12\r\n\tPre-Authentication Type:\t2\r\n\r\nCertificate Information:\r\n\tCertificate Issuer Name:\t\t\r\n\tCertificate Serial Number:\t\r\n\tCertificate Thumbprint:\t\t\r\n\r\nCertificate information is only provided if a certificate was used for pre-authentication.\r\n\r\nPre-authentication types, ticket options, encryption types and result codes are defined in RFC 4120.","PreAuthType":"2","RecordType":"Information\u0000","ServiceName":"krbtgt","ServiceSid":"S-1-5-21-1844237615-1563985344-1957994488-502","Source":"Microsoft-Windows-Security-Auditing","Status":"0x0","TargetDomainName":"company","TargetSid":"S-1-5-21-1844237615-1563985344-1957994488-20730","TargetUserName":"USER","TicketEncryptionType":"0x12","TicketOptions":"0x40810010","TimeCreated":"2023-10-19T14:05:40.5864226-03:00"}
{"Computername":"ARG01.company.cl","ID":4769,"IpAddress":"::ffff:10.199.0.84","IpPort":"58746","Keywords":["Audit Success"],"LogName":"Security","LogonGuid":"{37498229-0ff7-2cf2-7a12-d293665eec62}","Message":"A Kerberos service ticket was requested.\r\n\r\nAccount Information:\r\n\tAccount Name:\t\tUSER@company.CL\r\n\tAccount Domain:\t\tcompany.CL\r\n\tLogon GUID:\t\t{37498229-0ff7-2cf2-7a12-d293665eec62}\r\n\r\nService Information:\r\n\tService Name:\t\tUSER-NTB$\r\n\tService ID:\t\tS-1-5-21-1844237615-1563985344-1957994488-30606\r\n\r\nNetwork Information:\r\n\tClient Address:\t\t::ffff:10.199.0.84\r\n\tClient Port:\t\t58746\r\n\r\nAdditional Information:\r\n\tTicket Options:\t\t0x40810000\r\n\tTicket Encryption Type:\t0x12\r\n\tFailure Code:\t\t0x0\r\n\tTransited Services:\t-\r\n\r\nThis event is generated every time access is requested to a resource such as a computer or a Windows service.  The service name indicates the resource to which access was requested.\r\n\r\nThis event can be correlated with Windows logon events by comparing the Logon GUID fields in each event.  The logon event occurs on the machine that was accessed, which is often a different machine than the domain controller which issued the service ticket.\r\n\r\nTicket options, encryption types, and failure codes are defined in RFC 4120.","RecordType":"Information\u0000","ServiceName":"USER-NTB$","ServiceSid":"S-1-5-21-1844237615-1563985344-1957994488-30606","Source":"Microsoft-Windows-Security-Auditing","Status":"0x0","TargetDomainName":"company.CL","TargetUserName":"USER@company.CL","TicketEncryptionType":"0x12","TicketOptions":"0x40810000","TimeCreated":"2023-10-19T14:05:40.6128959-03:00","Transmitte

In the second case (HTTPPush input -> JSON Lines file), the problem is again the one in the title of this issue: the last line of the destination JSON Lines file that eKuiper writes is incomplete. For example:

{"Mem.free":1350104,"Mem.total":32819404,"Mem.used":31469300,"Swap.free":7880444,"Swap.total":7995388,"Swap.used":114944,"date":"2023-10-20T13:57:35.462334Z","tag":"memory"}
{"Mem.free":1350104,"Mem.total":32819404,"Mem.used":31469300,"Swap.free":7880444,"Swap.total":7995388,"Swap.used":114944,"date":"2023-10-20T13:57:36.462228Z","tag":"memory"}
{"cpu0.p_cpu":3,"cpu0.p_system":1,"cpu0.p_user":2,"cpu1.p_cpu":3,"cpu1.p_system":1,"cpu1.p_user":2,"cpu2.p_cpu":5,"cpu2.p_system":2,"cpu2.p_user":3,"cpu3.p_cpu":4,"cpu3.p_system":1,"cpu3.p_user":3,"cpu_p":4.25,"date":"2023-10-20T13:57:36.462027Z","system_p":1.5,"tag":"cpu","user_p":2.75}
{"Mem.free":1347332,"Mem.total":32819404,"Mem.used":31472072,"Swap.free":7880444,"Swap.total":7995388,"Swap.used":114944,"date":"2023-10-20T13:57:37.462247Z","tag":"memory"}
{"cpu0.p_cpu":6,"cpu0.p_system":2,"cpu0.p_user":4,"cpu1.p_cpu":10,"cpu1.p_system":4,"cpu1.p_user":6,"cpu2.p_cpu":6,"cpu2.p_system":2,"cpu2.p_user":4,"cpu3.p_cpu":12,"cpu3.p_system":4,"cpu3.p_u

This is a big issue for me, because any other program that consumes the output file in real time will have trouble with the incomplete last line.

The really strange thing is that after some time (sometimes minutes), eKuiper completes the last JSON line of the output file with the part that was cut off.
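
Whatever the cause on the eKuiper side, a consumer of a JSON Lines file can protect itself by parsing only newline-terminated lines and holding back any trailing fragment until the rest arrives. The following is a minimal sketch of that idea, not part of eKuiper; the function names are made up for illustration.

```python
import json


def split_complete_lines(buffer: str):
    """Split text into newline-terminated lines plus a trailing remainder.

    The remainder is whatever follows the last newline; it may be an
    incomplete JSON line that is still being written.
    """
    *complete, remainder = buffer.split("\n")
    # Drop empty lines so blank separators do not reach the JSON parser.
    return [line for line in complete if line.strip()], remainder


def parse_available(buffer: str):
    """Parse every complete JSON line and return (records, remainder)."""
    lines, remainder = split_complete_lines(buffer)
    return [json.loads(line) for line in lines], remainder
```

The caller keeps the remainder and prepends it to the next chunk it reads, so a line that was cut off in one read is parsed once it has been completed.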

Environment:

  • eKuiper version (e.g. 1.3.0): lfedge/ekuiper:1.11.4, Docker image
  • Hardware configuration (e.g. lscpu): x86_64, Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz, 16 GB RAM
  • OS (e.g. cat /etc/os-release): Manjaro Linux, but using a Docker Image for eKuiper
  • Others:

What happened and what you expected to happen:
Two things happened:

  • One, the File source reads all the JSON lines in the file every time, instead of reading only the new JSON lines.
  • Two, eKuiper cuts off the last JSON line before it ends, so the line is not valid JSON and I cannot process the file with other software.

What I expected to happen:

  • Every time eKuiper rereads the source JSON Lines file, it processes only the new lines, not the old ones it has already processed.
  • eKuiper writes each processed JSON line to the output file completely, without cutting it off.
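
The first expectation ("process only the new lines") is commonly implemented by remembering a byte offset between reads. The sketch below illustrates that general technique only; it is not how the eKuiper File source is implemented, and the class and method names are hypothetical. It does not handle file truncation or rotation.

```python
import os
import tempfile


class IncrementalLineReader:
    """Return only the lines appended to a file since the previous read."""

    def __init__(self, path):
        self.path = path
        self.offset = 0  # byte offset of the first unread byte

    def read_new_lines(self):
        with open(self.path, "rb") as f:
            f.seek(self.offset)
            chunk = f.read()
        end = chunk.rfind(b"\n")
        if end == -1:
            # No complete line yet; leave the offset unchanged.
            return []
        # Advance past the last newline so a partial last line is re-read later.
        self.offset += end + 1
        return chunk[:end].decode("utf-8").split("\n")


def demo():
    """Append to a temporary file twice and read it incrementally."""
    fd, path = tempfile.mkstemp(suffix=".jsonl")
    os.close(fd)
    try:
        reader = IncrementalLineReader(path)
        with open(path, "ab") as f:
            f.write(b'{"n":1}\n{"n":2}\n{"n":')  # last line still being written
        first = reader.read_new_lines()
        with open(path, "ab") as f:
            f.write(b'3}\n')  # the writer finishes the last line
        second = reader.read_new_lines()
        return first, second
    finally:
        os.remove(path)
```

In `demo()`, the first read returns the two complete lines and the second read returns only the line that was finished afterwards; nothing is processed twice.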

How to reproduce it (as minimally and precisely as possible):

  • Create a JSON lines file, with two or three lines, for example:
{"Mem.free":1347332,"Mem.total":32819404,"Mem.used":31472072,"Swap.free":7880444,"Swap.total":7995388,"Swap.used":114944,"date":"2023-10-20T13:57:38.462427Z","tag":"memory"}
{"cpu0.p_cpu":8,"cpu0.p_system":3,"cpu0.p_user":5,"cpu1.p_cpu":9,"cpu1.p_system":2,"cpu1.p_user":7,"cpu2.p_cpu":8,"cpu2.p_system":4,"cpu2.p_user":4,"cpu3.p_cpu":10,"cpu3.p_system":4,"cpu3.p_user":6,"cpu_p":8.5,"date":"2023-10-20T13:57:38.462195Z","system_p":3,"tag":"cpu","user_p":5.5}
{"cpu0.p_cpu":3,"cpu0.p_system":1,"cpu0.p_user":2,"cpu1.p_cpu":5,"cpu1.p_system":2,"cpu1.p_user":3,"cpu2.p_cpu":5,"cpu2.p_system":1,"cpu2.p_user":4,"cpu3.p_cpu":5,"cpu3.p_system":2,"cpu3.p_user":3,"cpu_p":4.5,"date":"2023-10-20T13:57:39.462017Z","system_p":1.5,"tag":"cpu","user_p":3}
{"Mem.free":1347332,"Mem.total":32819404,"Mem.used":31472072,"Swap.free":7880444,"Swap.total":7995388,"Swap.used":114944,"date":"2023-10-20T13:57:39.462232Z","tag":"memory"}
{"cpu0.p_cpu":3,"cpu0.p_system":1,"cpu0.p_user":2,"cpu1.p_cpu":5,"cpu1.p_system":2,"cpu1.p_user":3,"cpu2.p_cpu":3,"cpu2.p_system":1,"cpu2.p_user":2,"cpu3.p_cpu":3,"cpu3.p_system":1,"cpu3.p_user":2,"cpu_p":3.5,"date":"2023-10-20T13:57:40.462038Z","system_p":1.25,"tag":"cpu","user_p":2.25}
  • Create a rule that processes that JSON Lines file and outputs it to a JSON Lines file.
  • Watch the output file in real time, and you will see that the last line is cut off.
  • If you wait a few minutes and check the output file again, you will see that the last JSON line has been completed.

Anything else we need to know?:
Nothing in particular.

@canob
Author

canob commented Oct 22, 2023

To continue testing, I added a REST sink that sends the result of a rule to the FluentBit HTTP input, and eKuiper reports this error:
unexpected end of JSON input: http error. | method=POST path="http://192.168.1.50:8888/" status=201 request_body="{"Mem.free":2312340,"Mem.total":32819404}" response_body=""
It's not clear to me whether this is related to the problem already described (the last line being cut off when I send the output to a file), but it may be.
[image]

@ngjaying
Collaborator

@canob Thanks for the detailed description. Are you trying to use files to transfer data between rules? That's not the purpose of the file sink. It is not designed to write each JSON output to a single file (which is what "completing the last line" would mean). Instead, the file sink is used to save batch data.

To transfer data between rules, try the memory sink/source pair. See https://ekuiper.org/docs/en/latest/guide/rules/rule_pipeline.html.

The REST error means the JSON output format has some problems. Please try debugging the rule: https://ekuiper.org/docs/en/latest/getting_started/debug_rules.html

@canob
Author

canob commented Oct 23, 2023

Hi @ngjaying. Thanks for your answer.
I'm not trying to transfer files between rules. Each rule consumes one source (a file in one rule, an HTTP Push source in the other, both with JSON content) and sends that data to its own output file, so nothing is shared between rules.
Since the output file in both rules is not completed in real time (the "completing the last line" problem), I thought the REST errors occurred because eKuiper was trying to process the file and found the last line incomplete, so it was not valid JSON.
I'm going to try the rule debugging option to get more information about the error and add it to this issue, in case it helps you debug the problem.

@ngjaying
Collaborator

Thanks for clarifying. You said "and I cannot process that file with other piece of software in that case."

I don't think the file sink is designed for "realtime" data transfer. Would you like to use the MQTT or REST sink to publish "realtime" data to an external system? The file sink has a rolling strategy (https://ekuiper.org/docs/en/latest/guide/sinks/builtin/file.html#rolling-strategy), and it guarantees that writing is complete for each roll.
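
For readers who want to try the rolling strategy mentioned above, here is a rough sketch of a rule with a file sink action, built as a Python dictionary and printed as JSON. The rule id, stream name, path, and values are made up. The property names (`fileType`, `rollingInterval`, `rollingNamePattern`, `checkInterval`) are my reading of the linked file sink documentation and should be treated as assumptions to verify against the eKuiper version in use.

```python
import json

# Hypothetical rule: the id, stream name, and output path are made up.
# The file sink property names are assumptions taken from the linked
# documentation page; check them against your eKuiper version.
rule = {
    "id": "demo_file_rule",
    "sql": "SELECT * FROM demo_stream",
    "actions": [
        {
            "file": {
                "path": "/tmp/output.jsonl",
                "fileType": "lines",             # one JSON object per line
                "rollingInterval": 10000,        # roll to a new file every 10 s (ms)
                "rollingNamePattern": "suffix",  # add the timestamp after the file name
                "checkInterval": 1000,           # how often to check for rolling (ms)
            }
        }
    ],
}

# The printed JSON is the kind of body one would send when creating a rule.
print(json.dumps(rule, indent=2))
```

With a setup like this, a downstream program would read only the rolled (finished) files rather than the file currently being written.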

@canob
Author

canob commented Oct 23, 2023

Following your idea of using another type of sink (not the file sink) to publish "realtime" data to an external system, I configured a REST sink to send data to the FluentBit HTTP input (my second post in this issue). That is where the unexpected end of JSON input error appeared, which I still need to debug with the procedure you linked in your other answer. I even tried this, for example:
FluentBit HTTP Output -> eKuiper HTTP Push Source -> A filter with a rule -> HTTP Pull Sink -> FluentBit HTTP Input, and it fails with the same JSON error message, even though no file is involved in that data path.
Please don't misunderstand my questions about eKuiper's functionality: it's been a long time since I reviewed a project as promising and with as many interesting features as this one. I'm really amazed by eKuiper's capabilities and want to understand better how to use it.

@ngjaying
Collaborator

Each rule can have multiple actions; when composing the rule, you can add an additional log action to watch the output. I guess you'll just need to set "sendSingle" to true in the sink property.
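
As I understand the "sendSingle" property, a sink receives the rule results as an array and, when the property is true, sends the elements one by one instead of as a single array payload (the debug log later in this thread shows the array form, `sending data: [map[...]]`). The sketch below only illustrates that difference in payload shape; the function is hypothetical and is not eKuiper code.

```python
import json


def build_payloads(results, send_single):
    """Return the request bodies a sink would send for one batch of results.

    With send_single=True each result becomes its own JSON object payload;
    otherwise the whole batch is sent as one JSON array.
    """
    if send_single:
        return [json.dumps(result) for result in results]
    return [json.dumps(results)]
```

A receiver that expects one JSON object per request would reject the array form, which is why the setting matters for HTTP endpoints.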

@canob
Author

canob commented Oct 23, 2023

I had already set "sendSingle" on the sink when I first saw the unexpected end of JSON input: http error error.
Today I'm going to enable the log action and debugging on the rules to try to find the problem.
Thanks for all your help.

@canob
Author

canob commented Oct 23, 2023

Hi again @ngjaying,

So, I ran many tests to try to understand what is happening, but it's still not clear to me why I'm getting the unexpected end of JSON input: http error error.

My conclusion is that if I create this path:

Fluentbit Instance 1 HTTP output (memory statistics) to eKuiper
-> eKuiper HTTP Push Source
    -> eKuiper rule to select only two fields of the received JSON, Mem.free and Mem.total
         -> eKuiper HTTP Pull Sink
              -> Fluentbit Instance 2 HTTP input from eKuiper
                  -> Fluentbit Instance 2 File output to ekuiper file

I receive the error, but I'm not losing any events:
[image]
[image]

As you can see, I sent 6 events from Fluentbit to eKuiper and ended up with 6 events in the ekuiper file (you will see an additional field, "tag", because I add that field with a filter in Fluentbit before sending the events to the ekuiper file):
[image]

The debug log of eKuiper is not showing any particular error:

time="2023-10-23 13:07:50" level=info msg="Serving kuiper (version - 1.11.4) on port 20498, and restful api on http://0.0.0.0:9081." file="server/server.go:208"
Serving kuiper (version - 1.11.4) on port 20498, and restful api on http://0.0.0.0:9081.
time="2023-10-23 13:08:00" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:10" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:20" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:30" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:40" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:50" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:50" level=debug msg="httppush received message map[Mem.free:%!s(float64=711716) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2107688e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:49.461940Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:50" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:50" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:711716 Mem.total:3.2819404e+07 Mem.used:3.2107688e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:49.461940Z tag:memory] 1698066530479 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="project plan receive &{HTTPInput map[Mem.free:711716 Mem.total:3.2819404e+07 Mem.used:3.2107688e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:49.461940Z tag:memory] 1698066530479 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="sending data: [map[Mem.free:711716 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="rest sink receive map[Mem.free:%!s(float64=711716) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0000d5200), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0003e6db0)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f0dc0 0 [] false false map[] 0xc0006e5600 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:50" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:08:51" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:51" level=debug msg="httppush received message map[Mem.free:%!s(float64=711100) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2108304e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:50.461888Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:51" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:51" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:711100 Mem.total:3.2819404e+07 Mem.used:3.2108304e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:50.461888Z tag:memory] 1698066531477 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="project plan receive &{HTTPInput map[Mem.free:711100 Mem.total:3.2819404e+07 Mem.used:3.2108304e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:50.461888Z tag:memory] 1698066531477 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="sending data: [map[Mem.free:711100 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="rest sink receive map[Mem.free:%!s(float64=711100) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc000699320), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0007da960)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:51" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f4a40 0 [] false false map[] 0xc000689c00 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:52" level=debug msg="httppush received message map[Mem.free:%!s(float64=710848) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2108556e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:51.461825Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:52" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:52" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:710848 Mem.total:3.2819404e+07 Mem.used:3.2108556e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:51.461825Z tag:memory] 1698066532466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="project plan receive &{HTTPInput map[Mem.free:710848 Mem.total:3.2819404e+07 Mem.used:3.2108556e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:51.461825Z tag:memory] 1698066532466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="sending data: [map[Mem.free:710848 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="rest sink receive map[Mem.free:%!s(float64=710848) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0006993b0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0007db050)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:52" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f4dc0 0 [] false false map[] 0xc00091c000 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:53" level=debug msg="httppush received message map[Mem.free:%!s(float64=728528) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2090876e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:52.461838Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:53" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:53" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:728528 Mem.total:3.2819404e+07 Mem.used:3.2090876e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:52.461838Z tag:memory] 1698066533466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="project plan receive &{HTTPInput map[Mem.free:728528 Mem.total:3.2819404e+07 Mem.used:3.2090876e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:52.461838Z tag:memory] 1698066533466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="sending data: [map[Mem.free:728528 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="rest sink receive map[Mem.free:%!s(float64=728528) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0005bc240), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0005beab0)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:53" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f4f20 0 [] false false map[] 0xc0005dc600 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="httppush received message map[Mem.free:%!s(float64=726552) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2092852e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:53.461865Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:726552 Mem.total:3.2819404e+07 Mem.used:3.2092852e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:53.461865Z tag:memory] 1698066534466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="project plan receive &{HTTPInput map[Mem.free:726552 Mem.total:3.2819404e+07 Mem.used:3.2092852e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:53.461865Z tag:memory] 1698066534466 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="sending data: [map[Mem.free:726552 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink receive map[Mem.free:%!s(float64=726552) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0000d5cb0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0009a2240)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc00007e860 0 [] false false map[] 0xc000766700 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="receive http request: /api/data" file="httpserver/data_server.go:83" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="httppush received message map[Mem.free:%!s(float64=725800) Mem.total:%!s(float64=3.2819404e+07) Mem.used:%!s(float64=3.2093604e+07) Swap.free:%!s(float64=7.761148e+06) Swap.total:%!s(float64=7.995388e+06) Swap.used:%!s(float64=234240) date:2023-10-23T13:08:54.461890Z tag:memory]" file="httpserver/data_server.go:92" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="memory source broadcast from topic $$httppush//api/data to fluentbit-output_HTTPInput_0 done" file="pubsub/manager.go:137" httppush_connection=0
time="2023-10-23 13:08:54" level=debug msg="filter plan receive &{HTTPInput map[Mem.free:725800 Mem.total:3.2819404e+07 Mem.used:3.2093604e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:54.461890Z tag:memory] 1698066534789 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/filter_operator.go:37" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="project plan receive &{HTTPInput map[Mem.free:725800 Mem.total:3.2819404e+07 Mem.used:3.2093604e+07 Swap.free:7.761148e+06 Swap.total:7.995388e+06 Swap.used:234240 date:2023-10-23T13:08:54.461890Z tag:memory] 1698066534789 map[topic:$$httppush//api/data] {{{0 0} 0 0 {{} 0} {{} 0}} map[] map[]} {0 0} map[]}" file="operator/project_operator.go:54" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="sending data: [map[Mem.free:725800 Mem.total:3.2819404e+07]]" file="node/sink_node.go:242" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink receive map[Mem.free:%!s(float64=725800) Mem.total:%!s(float64=3.2819404e+07)]" file="http/rest_sink.go:87" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="do request: &http.Request{Method:\"POST\", URL:(*url.URL)(0xc0006994d0), Proto:\"HTTP/1.1\", ProtoMajor:1, ProtoMinor:1, Header:http.Header{\"Content-Type\":[]string{\"application/json\"}}, Body:io.nopCloserWriterTo{Reader:(*bytes.Buffer)(0xc0007db9b0)}, GetBody:(func() (io.ReadCloser, error))(0xfcaea0), ContentLength:40, TransferEncoding:[]string(nil), Close:false, Host:\"192.168.1.50:8888\", Form:url.Values(nil), PostForm:url.Values(nil), MultipartForm:(*multipart.Form)(nil), Trailer:http.Header(nil), RemoteAddr:\"\", RequestURI:\"\", TLS:(*tls.ConnectionState)(nil), Cancel:(<-chan struct {})(nil), Response:(*http.Response)(nil), ctx:(*context.emptyCtx)(0xc000046080)}" file="httpx/http.go:99" rule=fluentbit-output
time="2023-10-23 13:08:54" level=debug msg="rest sink got response &{201 Created  201 HTTP/1.1 1 1 map[Content-Length:[0] Ok:[ok] Server:[Fluent Bit v2.1.10]] 0xc0001f56c0 0 [] false false map[] 0xc00091c700 <nil>}" file="http/rest_sink.go:67" rule=fluentbit-output
time="2023-10-23 13:09:00" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"
time="2023-10-23 13:09:10" level=debug msg="rule fluentbit-output origin state: Running, sginal: 0" file="server/server.go:334"

So, my questions to try to understand are basically:

  • Is the "unexpected end of JSON input: http error" error that I'm seeing in the eKuiper Dashboard generated by eKuiper while it processes the JSON lines received by the HTTP Push source, or by Fluentbit when it receives the output of the REST sink? I ask because I don't see that error in any log file from eKuiper or from Fluentbit.
  • Is there a way to disable those alarms in eKuiper / the eKuiper Dashboard? In the end the stream processing is working as expected, but if I want to have metrics about it, that is going to be impossible.

Thanks in advance for your help.

@ngjaying
Collaborator

@canob The error happens when eKuiper parses the response from Fluentbit. It looks like the response from Fluentbit is not a JSON string. The error message is misleading; we'll improve that.

I guess you have set the debugResp option to true for the REST sink. You can set it to false to skip parsing the response and avoid this problem.
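
A minimal Go sketch of why an empty response body can surface as this error. It assumes the sink decodes the body with Go's encoding/json, which is an assumption and not something confirmed in this thread; parseBody is a hypothetical helper, not eKuiper's code. The Fluent Bit response in the debug log above has Content-Length 0, so there is nothing to decode:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseBody mimics a sink that tries to decode an HTTP response body as JSON.
// Hypothetical helper for illustration only; it is not eKuiper's actual code.
func parseBody(body []byte) (map[string]interface{}, error) {
	var out map[string]interface{}
	if err := json.Unmarshal(body, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	// An empty body, like a 201 response with Content-Length: 0.
	_, err := parseBody([]byte(""))
	fmt.Println("empty body:", err) // empty body: unexpected end of JSON input

	// A body that is a JSON object decodes without error.
	v, err := parseBody([]byte(`{"status":"ok"}`))
	fmt.Println("json body:", v, err)
}
```

If the response is never parsed, as with debugResp set to false, this decoding step is skipped and the error cannot occur.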

@canob
Author

canob commented Oct 24, 2023

Thanks @ngjaying !
With debugResp set to false I don't see the "unexpected end of JSON input: http error" error anymore.
Amazing.

@canob
Author

canob commented Oct 25, 2023

Hi @ngjaying,
Back to the title of this issue, "File Sink Output not completing the last log line": thanks to your help I already managed to use the REST Sink output without any errors, but I really want to understand the behavior of the File Sink, because I still think something is not working as expected. Maybe I was not clear enough when I explained the problem before, so I'm going to try again with a simpler explanation.
So, for example, I have this workflow:

Fluentbit HTTP Output
-> Stream eKuiper HTTP Push Source, named dns
    -> Rule with "select * FROM dns"
        -> File Sink, rolling by time (as you explained to me before in one of your answers in another post)

What does not look right to me, or is not the expected behavior (in other words, I have not seen this behavior in other stream processing solutions), is that for many seconds the last line of the file being written is a truncated JSON line when I do a "cat filename". After 10-20 seconds (one or two more "cat filename"), that line is completed, additional JSON lines appear, the file is closed, and eKuiper creates a new file to continue appending JSON lines.
Here is a simple example of what I'm trying to explain (my comments are in parentheses):

root@ar09-ubuntu:/disk/ekuiper/output# ls -lah
total 17.6K
drwxr-xr-x 2 root root 4.0K Oct 25 18:55 .
drwxr-xr-x 5 root root 4.0K Oct 24 14:35 ..
-rw-r--r-- 1 root root 6.7K Oct 25 18:54 dns-query-1698260023842.json
-rw-r--r-- 1 root root 6.9K Oct 25 18:55 dns-query-1698260065851.json
-rw-r--r-- 1 root root 4.0K Oct 25 18:55 dns-query-1698260107832.json (the file, incomplete for now, at 4.0K; it can also be 8.0K, 12.0K, and so on, depending on how many logs are received before the rotation)

root@ar09-ubuntu:/disk/ekuiper/output# cat dns-query-1698260107832.json
... (more lines here that I deleted for the example)
{"date":"2023-10-25T18:56:00.000000Z","qname":"encrypted-tbn0.gstatic.com","query_type":"A","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:00"}
{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query" (this last JSON line is not complete)

root@ar09-ubuntu:/disk/ekuiper/output# ls -lah
total 17.6K
drwxr-xr-x 2 root root 4.0K Oct 25 18:55 .
drwxr-xr-x 5 root root 4.0K Oct 24 14:35 ..
-rw-r--r-- 1 root root 6.7K Oct 25 18:54 dns-query-1698260023842.json
-rw-r--r-- 1 root root 6.9K Oct 25 18:55 dns-query-1698260065851.json
-rw-r--r-- 1 root root 4.0K Oct 25 18:55 dns-query-1698260107832.json (the file, incomplete for now, at 4.0K; it can also be 8.0K, 12.0K, and so on, depending on how many logs are received before the rotation)

root@ar09-ubuntu:/disk/ekuiper/output# cat dns-query-1698260107832.json (executing this 10 seconds later)
... (more lines here that I deleted for the example)
{"date":"2023-10-25T18:56:00.000000Z","qname":"encrypted-tbn0.gstatic.com","query_type":"A","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:00"}
{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query" (again, the same last JSON line is not complete)

root@ar09-ubuntu:/disk/ekuiper/output# cat dns-query-1698260107832.json (executing this 10 seconds later)
... (more lines here that I deleted for the example)
{"date":"2023-10-25T18:56:00.000000Z","qname":"encrypted-tbn0.gstatic.com","query_type":"A","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:00"}
{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query","timestamp":"Oct 25 15:56:01"} (the JSON line is now complete, and you have others after)
{"date":"2023-10-25T18:56:02.000000Z","qname":"www.googleapis.com","query_type":"HTTPS","source_ip":"10.11.21.43","tag":"dns_query","timestamp":"Oct 25 15:56:02"}
{"date":"2023-10-25T18:56:03.000000Z","qname":"ssl01.sjc2.goskope.com","query_type":"AAAA","source_ip":"10.199.0.190","tag":"dns_query","timestamp":"Oct 25 15:56:03"}
{"date":"2023-10-25T18:56:05.000000Z","qname":"nam.loki.delve.office.com","query_type":"A","source_ip":"10.199.0.190","tag":"dns_query","timestamp":"Oct 25 15:56:05"}
{"date":"2023-10-25T18:56:10.000000Z","qname":"dit.whatsapp.net","query_type":"A","source_ip":"10.199.0.39","tag":"dns_query","timestamp":"Oct 25 15:56:10"}
{"date":"2023-10-25T18:56:17.000000Z","qname":"www.msftconnecttest.com","query_type":"A","source_ip":"10.11.21.37","tag":"dns_query","timestamp":"Oct 25 15:56:17"}
{"date":"2023-10-25T18:56:18.000000Z","qname":"ssl01.sjc2.goskope.com","query_type":"AAAA","source_ip":"10.199.0.190","tag":"dns_query","timestamp":"Oct 25 15:56:18"}

root@ar09-ubuntu:/disk/ekuiper/output# ls -lah
total 19.4K
drwxr-xr-x 2 root root 4.0K Oct 25 18:55 .
drwxr-xr-x 5 root root 4.0K Oct 24 14:35 ..
-rw-r--r-- 1 root root 6.7K Oct 25 18:54 dns-query-1698260023842.json
-rw-r--r-- 1 root root 6.9K Oct 25 18:55 dns-query-1698260065851.json
-rw-r--r-- 1 root root 6.8K Oct 25 18:55 dns-query-1698260107832.json (the file, now complete)
-rw-r--r-- 1 root root    0 Oct 25 18:55 dns-query-1698260149841.json

My assumption is that eKuiper has some kind of 4K buffer/cache in front of the file, because the file size after receiving the first bunch of events is always 4.0K, and it then grows in 4K steps (8.0K, 12.0K, etc.).
What I want to know is whether I can avoid that, that is, receive the JSON lines in the file in real time or, if that is not possible, receive only complete JSON lines in the file, not truncated ones.

I already tried to "play" with the Cache and Async configs in eKuiper, but nothing changed this behavior, so maybe it is by design. I want to understand why, because for performance reasons I prefer to use the File Sink rather than the REST Sink.

Thanks in advance for your help, and sorry for the long explanation, but I feel that when you try to explain a problem/issue, it is better to give all the details you can (even more so if you are not a native English speaker, like me).

@ngjaying
Collaborator

ngjaying commented Oct 26, 2023

As the file sink is not designed for real-time data transfer, we use Go's bufio to write files for better I/O performance. The buffering you see should be its default behavior. MQTT may be suitable for real-time transfer. We are also working on a WebSocket sink, which is also suitable for data transfer.
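
A small Go sketch of that default behavior, assuming the sink wraps its output in a bufio.Writer created with bufio.NewWriter (the thread only confirms that bufio is used, not how). A bytes.Buffer stands in for the output file, and sampleLine is one of the records shown earlier in this thread. The default buffer is 4096 bytes, so data reaches the destination in 4096-byte chunks that usually end in the middle of a line:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// sampleLine is one JSON Lines record taken from the output shown earlier in this thread.
const sampleLine = `{"date":"2023-10-25T18:56:01.000000Z","qname":"login.microsoftonline.com","query_type":"A","source_ip":"10.199.0.65","tag":"dns_query","timestamp":"Oct 25 15:56:01"}` + "\n"

// writeBuffered writes n copies of sampleLine through a bufio.Writer and
// returns what has reached the destination before and after the final Flush.
func writeBuffered(n int) (before, after string) {
	var dst bytes.Buffer       // stands in for the output file
	w := bufio.NewWriter(&dst) // default buffer size: 4096 bytes
	for i := 0; i < n; i++ {
		w.WriteString(sampleLine)
	}
	before = dst.String() // only full buffers have been written out so far
	w.Flush()
	after = dst.String()
	return before, after
}

func main() {
	before, after := writeBuffered(30)
	fmt.Println("bytes visible before Flush:", len(before))
	fmt.Println("ends with newline before Flush:", before[len(before)-1] == '\n')
	fmt.Println("bytes visible after Flush:", len(after))
	fmt.Println("ends with newline after Flush:", after[len(after)-1] == '\n')
}
```

This matches the 4.0K, 8.0K, 12.0K file sizes reported above: a reader that opens the file between flushes sees a whole number of buffers, not a whole number of records.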

Regarding JSON "not completed": that is because you cannot complete a JSON array in the middle. Suppose you expect to write 10 messages to a JSON array. When you get the first message, you cannot "complete" it as [{"id":1}]; otherwise, how would you append the second message? [{"id":1}],[{"id":2}] is not valid JSON. It should be [{"id":1},{"id":2} with the closing ] appended once all expected messages have arrived. There is another format, lines, which does not require a hook to "complete" the file at the end.
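
The difference between the two formats can be checked with a short Go sketch. It uses json.Valid from the standard library; isJSON and linesAreJSON are hypothetical helper names introduced only for this illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// isJSON reports whether s is exactly one complete JSON value.
func isJSON(s string) bool {
	return json.Valid([]byte(s))
}

// linesAreJSON reports whether every non-empty line of s is a complete JSON value.
func linesAreJSON(s string) bool {
	for _, line := range strings.Split(s, "\n") {
		if line != "" && !isJSON(line) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isJSON(`[{"id":1},{"id":2}`))    // false: the array is still open
	fmt.Println(isJSON(`[{"id":1}],[{"id":2}]`)) // false: two arrays, not one value
	fmt.Println(isJSON(`[{"id":1},{"id":2}]`))   // true: closed after the last message

	fmt.Println(linesAreJSON("{\"id\":1}\n{\"id\":2}\n")) // true: each line stands alone
	fmt.Println(linesAreJSON("{\"id\":1}\n{\"id\":2"))    // false: last line is truncated
}
```

In the lines format a file is valid after every complete record, so a truncated last line comes from how the bytes are flushed, not from the format itself.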

Finally, as we are an open source project, we encourage you to read the source code directly if needed.

@canob
Author

canob commented Oct 26, 2023

Hi @ngjaying . Thanks for your answer.
Perfect, so the File Sink output is not an option for me. I can use MQTT, but only as an input in FluentBit (MQTT sink on the eKuiper side), because output to MQTT is not supported: fluent/fluent-bit#674 (you can see in that feature request that I already talked about eKuiper as a really good companion for FluentBit).
About "JSON not completed": I'm already using the JSON lines format for my file output, so the idea is that every line (that is, every log) is valid JSON. That was the reason for my question, nothing more.
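
On the reading side, a consumer of a JSON lines file that is still being written can protect itself by accepting only newline-terminated records. The following Go sketch shows one possible approach; splitComplete is a hypothetical helper, not part of eKuiper or FluentBit:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// splitComplete returns the newline-terminated lines of data that are valid
// JSON, plus the unterminated tail, which should be retried on the next read.
func splitComplete(data []byte) (records []string, tail string) {
	end := bytes.LastIndexByte(data, '\n')
	if end < 0 {
		// No complete line yet: everything is tail.
		return nil, string(data)
	}
	for _, line := range bytes.Split(data[:end], []byte("\n")) {
		if len(line) > 0 && json.Valid(line) {
			records = append(records, string(line))
		}
	}
	return records, string(data[end+1:])
}

func main() {
	// Two complete records followed by a record cut off by a buffer boundary.
	data := []byte("{\"id\":1}\n{\"id\":2}\n{\"id\":3,\"tag\"")
	records, tail := splitComplete(data)
	fmt.Println("complete records:", len(records))
	fmt.Println("pending tail:", tail)
}
```

The tail is not an error; it is simply data whose remainder has not been flushed to the file yet.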
I try to read all the docs I can before asking, but sometimes I don't find the answer, and that is why I'm bothering you with maybe stupid questions. I'm sorry about that.
Regarding the source code, I have notions of programming, but I'm not a developer, so understanding the source code is sometimes complicated, at least for me.
Again, thanks for all your help and patience. I really think that eKuiper is an amazing project, and I think that in the end, with my questions and your answers, we are helping to clarify things for everyone. That's all.

@ngjaying
Collaborator

@canob You're welcome!
About the incomplete JSON, sorry that I did not read carefully. We should expect to have complete JSON outputs each time. Let me look into that.
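
One way a buffered writer could keep every flush on a record boundary is sketched below in Go. This is only an illustration under the assumption that records are written through a bufio.Writer; it is not necessarily the approach taken in the eventual fix, and writeWholeLine and visibleAfter are hypothetical names:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// writeWholeLine appends one newline-terminated record to w. If the record
// does not fit in the remaining buffer space, the buffer is flushed first, so
// a flush never happens in the middle of a record.
func writeWholeLine(w *bufio.Writer, record string) error {
	line := record + "\n"
	if w.Buffered() > 0 && len(line) > w.Available() {
		if err := w.Flush(); err != nil {
			return err
		}
	}
	if _, err := w.WriteString(line); err != nil {
		return err
	}
	// A record larger than the whole buffer is pushed out completely.
	if len(line) > w.Size() {
		return w.Flush()
	}
	return nil
}

// visibleAfter writes n small records without a final Flush and returns what
// has reached the destination so far.
func visibleAfter(n int) string {
	var dst bytes.Buffer // stands in for the output file
	w := bufio.NewWriter(&dst)
	for i := 0; i < n; i++ {
		if err := writeWholeLine(w, fmt.Sprintf(`{"id":%d,"tag":"dns_query"}`, i)); err != nil {
			panic(err)
		}
	}
	return dst.String()
}

func main() {
	out := visibleAfter(1000)
	fmt.Println("bytes visible:", len(out))
	fmt.Println("last visible byte is a newline:", out[len(out)-1] == '\n')
}
```

Buffering is kept, so the I/O benefit mentioned earlier is mostly preserved, while anything visible in the destination always ends with a complete line.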
