How do I enable file logging when testing PSI through the Python interface? #151

Open
notinghere opened this issue Jul 11, 2024 · 11 comments

Comments

@notinghere

Feature Request Type

Build/Install

Have you searched existing issues?

Yes

Is your feature request related to a problem?

  • How can file logging be added in the test case legacy_psi_test.py?

  • I added the following code:
    `
    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):

      log_options = logging.LogOptions()
      log_options.log_level = logging.LogLevel.DEBUG
      log_options.system_log_path = "./alice.log"
      logging.setup_logging(log_options)
    

`
The logs from libpsi are not written to the log file; they are only printed to the console. How can I configure things so that all logs go to the file?

Describe features you want to add to SPU

  • A way to configure unified file logging through the Python interface.

@aokaokd

aokaokd commented Jul 11, 2024

Could you share a screenshot showing exactly which logs you mean?

@notinghere
Author

  • Console log
----------test_ecdh_2pc-------------
rank = 0
rank = 1
2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=1, max_retry=3, interval_ms=1000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '111', http status code '0', response header '', response body '', error msg '[E111]Fail to connect Socket{id=0 addr=127.0.0.1:20223} (0x0x17cb600): Connection refused'
2024-07-11 15:12:55.169 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=2, max_retry=3, interval_ms=3000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
2024-07-11 15:12:58.170 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=3, max_retry=3, interval_ms=5000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
[2024-07-11 15:13:03.177] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_PSI_2PC","broadcast_result":true,"input_params":{"path":"./data/alice.csv","select_fields":["id","idx"]},"output_params":{"path":"./alice-kkrt.csv","need_sort":true},"curve_type":"CURVE_25519"}
[2024-07-11 15:13:03.177] [info] [bucket_psi.cc:400] bucket size set to 1048576
[2024-07-11 15:13:03.178] [info] [bucket_psi.cc:252] Begin sanity check for input file: ./data/alice.csv, precheck_switch:false
[2024-07-11 15:13:03.198] [info] [bucket_psi.cc:265] End sanity check for input file: ./data/alice.csv, size=1546
[2024-07-11 15:13:03.198] [info] [bucket_psi.cc:425] Run psi protocol=1, self_items_count=1546
[2024-07-11 15:13:03.199] [info] [cryptor_selector.cc:38] Using IPPCP
[2024-07-11 15:13:03.246] [info] [thread_pool.cc:30] Create a fixed thread pool with size 1
[2024-07-11 15:13:03.290] [info] [ecdh_psi.cc:106] MaskSelf:9999 --finished, batch_count=1, self_item_count=1546
[2024-07-11 15:13:03.304] [info] [ecdh_psi.cc:365] ID 9999: MaskSelf finished.
[2024-07-11 15:13:03.360] [info] [ecdh_psi.cc:169] MaskPeer:9999 --finished, batch_count=1, peer_item_count=1560
[2024-07-11 15:13:03.360] [info] [ecdh_psi.cc:369] ID 9999: MaskPeer finished.
[2024-07-11 15:13:03.374] [info] [ecdh_psi.cc:212] RecvDualMaskedSelf:9999 recv last batch finished, batch_count=1
[2024-07-11 15:13:03.374] [info] [ecdh_psi.cc:373] ID 9999: RecvDualMaskedSelf finished.
[2024-07-11 15:13:03.420] [info] [bucket_psi.cc:382] Begin post filtering, indices.size=165, should_sort=true
[2024-07-11 15:13:03.435] [info] [key.cc:91] Executing sort scripts: tail -n +2 ./tmp-sort-in-9bc353dc-d742-45ea-a530-7122ea3c61a8 | LC_ALL=C sort  --parallel=2 --buffer-size=1G --stable --field-separator=, --key=2,2 --key=1,1  >>./tmp-sort-out-9bc353dc-d742-45ea-a530-7122ea3c61a8
[2024-07-11 15:13:03.467] [info] [key.cc:93] Finished sort scripts: tail -n +2 ./tmp-sort-in-9bc353dc-d742-45ea-a530-7122ea3c61a8 | LC_ALL=C sort  --parallel=2 --buffer-size=1G --stable --field-separator=, --key=2,2 --key=1,1  >>./tmp-sort-out-9bc353dc-d742-45ea-a530-7122ea3c61a8, ret=0
[2024-07-11 15:13:03.467] [info] [bucket_psi.cc:390] End post filtering, in=./data/alice.csv, out=./alice-kkrt.csv
id:abc, psi_type: 1, original_count: 1546, intersection_count: 165, source_count: 1547, output_count: 166

  • File log
2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=1, max_retry=3, interval_ms=1000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '111', http status code '0', response header '', response body '', error msg '[E111]Fail to connect Socket{id=0 addr=127.0.0.1:20223} (0x0x17cb600): Connection refused'
2024-07-11 15:12:55.169 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=2, max_retry=3, interval_ms=3000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
2024-07-11 15:12:58.170 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry, retry_count=3, max_retry=3, interval_ms=5000, message=[external/yacl/yacl/link/transport/interconnection_link.cc:56] cntl ErrorCode '112', http status code '0', response header '', response body '', error msg '[E112]Not connected to 127.0.0.1:20223 yet, server_id=0'
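
To make the difference between the two outputs easier to see, the following is a minimal sketch that tallies which C++ source files produced records in each log. The regular expression is an assumption derived only from the two line formats shown above, and the helper name `sources_by_level` is hypothetical.

```python
import re
from collections import Counter

# Matches both timestamp styles seen in the logs above, for example:
#   2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] ...
#   [2024-07-11 15:13:03.177] [info] [launch.cc:164] ...
LOG_LINE = re.compile(r"\[(?P<level>debug|info|warn|error)\] \[(?P<src>[\w.]+?\.cc):")


def sources_by_level(lines):
    """Count (level, source file) pairs for every line that looks like a log record."""
    counts = Counter()
    for line in lines:
        match = LOG_LINE.search(line)
        if match:
            counts[(match["level"], match["src"])] += 1
    return counts


if __name__ == "__main__":
    # Shortened copies of records from the console log and the file log above.
    console = [
        "2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry",
        "[2024-07-11 15:13:03.177] [info] [launch.cc:164] LEGACY PSI config",
        "[2024-07-11 15:13:03.177] [info] [bucket_psi.cc:400] bucket size set to 1048576",
    ]
    log_file = [
        "2024-07-11 15:12:54.167 [debug] [channel.cc:SendRequestWithRetry:359] >>> send request failed and retry",
    ]
    # Records that reached the console but never reached the file.
    only_console = set(sources_by_level(console)) - set(sources_by_level(log_file))
    for level, src in sorted(only_console):
        print(f"console only: [{level}] {src}")
```

Running it against the full console output and `./alice.log` would list every source file whose records appear only on the console.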

@notinghere
Author

The code for the two PSI parties is as follows:

  • alice.py
# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import unittest

import multiprocess

from absl import app, flags

import libspu.link as link
import libspu.logging as logging
import psi as psi
from utils import get_free_port, wc_count
# from spu.utils.simulation import PropagatingThread


class Test:

    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs, inputs, outputs, selected_fields, protocol):
        time_stamp = time.time()
        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30*1000

        log_options = logging.LogOptions()
        log_options.log_level = logging.LogLevel.DEBUG
        # log_options.enable_console_logger = False
        log_options.system_log_path = "./alice.log"
        # log_options.trace_log_path = "./alice_trace.log"
        
        logging.setup_logging(log_options)


        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, type):
            lctx = link.create_brpc(lctx_desc, rank)

            config = psi.BucketPsiConfig(
                psi_type=type,
                broadcast_result=True,
                input_params=psi.InputParams(
                    path=input_path, select_fields=selected_fields
                ),
                output_params=psi.OutputParams(path=output_path, need_sort=True),
                curve_type=psi.CurveType.CURVE_25519,
            )

            if type == psi.PsiType.DP_PSI_2PC:
                config.dppsi_params.bob_sub_sampling = 0.9
                config.dppsi_params.epsilon = 3

            report = psi.bucket_psi(lctx, config)

            source_count = wc_count(input_path)
            output_count = wc_count(output_path)
            print(
                f"id:{lctx.id()}, psi_type: {type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
            )

            lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
                target=wrap,
                args=(
                    self_rank,
                    selected_fields,
                    inputs[self_rank],
                    outputs[self_rank],
                    protocol,
                ),
            )
        job.start()
        job.join()

    def test_kkrt_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 0
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.KKRT_PSI_2PC
        )

    def test_ecdh_2pc(self):
        print("----------test_ecdh_2pc-------------")

        wsize = 2
        self_rank = 0
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_2PC
        )

    def test_ecdh_3pc(self):
        print("----------test_ecdh_3pc-------------")

        wsize = 3
        self_rank = 0
        link_id = "abc"
        inputs = [
            "./data/alice.csv",
            "./data/bob.csv",
            "./data/carol.csv",
        ]
        outputs = ["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000","9998"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}",f"127.0.0.1:{20224}"]
        # addrs = [f"127.0.0.1:{30222}",f"127.0.0.1:{30223}",f"127.0.0.1:{30224}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_3PC
        )

def run_psi(_):
    t= Test()
    # t.test_dppsi_2pc()
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()
    

if __name__ == '__main__':
    app.run(run_psi)

  • bob.py

# Copyright 2021 Ant Group Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import unittest

import multiprocess

from absl import app, flags

import libspu.link as link
import psi as psi
from utils import get_free_port, wc_count
# from spu.utils.simulation import PropagatingThread


class Test:

    def run_streaming_psi(self, wsize, self_rank, link_id,party_ids, addrs, inputs, outputs, selected_fields, protocol):
        time_stamp = time.time()
        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30*1000

        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, type):
            lctx = link.create_brpc(lctx_desc, rank)

            config = psi.BucketPsiConfig(
                psi_type=type,
                broadcast_result=True,
                input_params=psi.InputParams(
                    path=input_path, select_fields=selected_fields
                ),
                output_params=psi.OutputParams(path=output_path, need_sort=True),
                curve_type=psi.CurveType.CURVE_25519,
            )

            if type == psi.PsiType.DP_PSI_2PC:
                config.dppsi_params.bob_sub_sampling = 0.9
                config.dppsi_params.epsilon = 3

            report = psi.bucket_psi(lctx, config)

            source_count = wc_count(input_path)
            output_count = wc_count(output_path)
            print(
                f"id:{lctx.id()}, psi_type: {type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
            )

            lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
                target=wrap,
                args=(
                    self_rank,
                    selected_fields,
                    inputs[self_rank],
                    outputs[self_rank],
                    protocol,
                ),
            )
        job.start()
        job.join()

    def test_kkrt_2pc(self):
        print("----------test_kkrt_2pc-------------")

        wsize = 2
        self_rank = 1
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize,self_rank, link_id, party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.KKRT_PSI_2PC
        )

    def test_ecdh_2pc(self):
        print("----------test_ecdh_2pc-------------")

        wsize = 2
        self_rank = 1
        link_id = "abc"
        inputs = ["./data/alice.csv", "./data/bob.csv"]
        outputs = ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}"]

        self.run_streaming_psi(
            wsize,self_rank, link_id, party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_2PC
        )

    def test_ecdh_3pc(self):
        print("----------test_ecdh_3pc-------------")

        wsize = 3
        self_rank = 1
        link_id = "abc"
        inputs = [
            "./data/alice.csv",
            "./data/bob.csv",
            "./data/carol.csv",
        ]
        outputs = ["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"]
        selected_fields = ["id", "idx"]

        party_ids = ["9999","10000","9998"]
        addrs = [f"127.0.0.1:{20222}",f"127.0.0.1:{20223}",f"127.0.0.1:{20224}"]
        # addrs = [f"127.0.0.1:{30222}",f"127.0.0.1:{30223}",f"127.0.0.1:{30224}"]

        self.run_streaming_psi(
            wsize, self_rank, link_id,party_ids,addrs, inputs, outputs, selected_fields, psi.PsiType.ECDH_PSI_3PC
        )


def run_psi(_):
    t= Test()
    # t.test_dppsi_2pc()
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()
    

if __name__ == '__main__':
    app.run(run_psi)

@aokaokd

aokaokd commented Jul 11, 2024

OK, I can see that all the debug logs were printed. Try changing DEBUG to INFO in log_options.log_level = logging.LogLevel.DEBUG.

@notinghere
Author

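  • I tried that, and those logs are still not written to the file. Most of the info messages printed to the console come from the libpsi.cc-related code, which lives in the psi library.

  • The content written to the file comes from the yacl code.

  • The log levels defined in spu: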

enum class LogLevel {
  Debug = 0,
  Info = 1,
  Warn = 2,
  Error = 3,
};
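
A small sketch of why lowering the verbosity from DEBUG to INFO would not be expected to add anything to the file. It mirrors the enum values quoted above and assumes the usual threshold rule (a record is emitted when its level is at or above the configured level); the function `is_emitted` is hypothetical and not part of spu.

```python
from enum import IntEnum


class LogLevel(IntEnum):
    # Values mirror the C++ enum quoted above.
    DEBUG = 0
    INFO = 1
    WARN = 2
    ERROR = 3


def is_emitted(message_level: LogLevel, threshold: LogLevel) -> bool:
    """Assumed rule: a record passes when its level is at or above the threshold."""
    return message_level >= threshold


if __name__ == "__main__":
    for threshold in (LogLevel.DEBUG, LogLevel.INFO):
        passed = [level.name for level in LogLevel if is_emitted(level, threshold)]
        print(f"threshold={threshold.name}: {passed}")
```

Under that assumption, a DEBUG threshold already lets info records through, so the missing libpsi info lines are not explained by the level setting.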

@aokaokd

aokaokd commented Jul 11, 2024

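I just double-checked. Sorry, the logs generated by the C++ code are not affected by the Python settings. If you really need this, you can run it with nohup ... &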
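
For readers who prefer to stay in Python, here is a minimal sketch of the redirection part of that suggestion: a launcher that runs the script as a child process and sends everything it prints, including output from native libraries, to one file. It does not reproduce the backgrounding or hangup immunity of nohup, and the helper name `run_with_log` and the log file name are hypothetical.

```python
import subprocess
import sys


def run_with_log(cmd, log_path):
    """Run cmd and append its stdout and stderr to log_path; return the exit code."""
    with open(log_path, "ab") as log_file:
        # The child inherits the file as descriptors 1 and 2, so output written
        # by C++ code inside the child lands in the same file as Python prints.
        completed = subprocess.run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
    return completed.returncode


if __name__ == "__main__":
    # alice.py is the script posted earlier in this thread; the log name is arbitrary.
    sys.exit(run_with_log([sys.executable, "alice.py"], "alice_console.log"))
```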

@notinghere
Author

  • How can the C++ logs be configured so that they are written to a file?

@aokaokd

aokaokd commented Jul 11, 2024

Sorry, that is not supported at the moment.

@notinghere
Author

  • The logging uses spdlog. It can print to the console, so is there no way to write to a file?

@anakinxc
Contributor

Hi @notinghere

You should be able to redirect stdout to a file.
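
One way to do that from inside the Python process is to redirect the OS-level file descriptor instead of `sys.stdout`, since native code does not go through the Python object. The following is a sketch under the assumption that the library writes to descriptor 1 (stdout) or 2 (stderr); the helper name `redirect_fd` and the file name are hypothetical.

```python
import contextlib
import os
import sys


@contextlib.contextmanager
def redirect_fd(fd, log_path):
    """Temporarily point OS-level descriptor fd (1 = stdout, 2 = stderr) at log_path."""
    sys.stdout.flush()
    sys.stderr.flush()
    saved = os.dup(fd)  # keep a copy of the original destination
    target = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    try:
        os.dup2(target, fd)  # from here on, writes to fd go to the file
        yield
    finally:
        sys.stdout.flush()
        sys.stderr.flush()
        os.dup2(saved, fd)  # restore the original destination
        os.close(saved)
        os.close(target)


if __name__ == "__main__":
    with redirect_fd(1, "demo_console.log"):
        # os.write bypasses Python's sys.stdout, much like native code would.
        os.write(1, b"written through the raw descriptor\n")
    print("back on the original stdout")
```

In the scripts posted above, the `with` block would most naturally wrap the call to `psi.bucket_psi` inside `wrap`, because that is the code that runs in the child process.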

@anakinxc anakinxc transferred this issue from secretflow/spu Jul 19, 2024

Stale issue message. Please comment to remove stale tag. Otherwise this issue will be closed soon.
