Support mirrored ObjectStore #155

Open
Naymi opened this issue Nov 22, 2024 · 0 comments
Observed behavior

The mirrored object store (`dest`) cannot be read: the client reports `no stream subject`.

Expected behavior

`destS3.get('foo')` returns the value that was put into the source object store (`bar`).

Server and client version

NATS server
Version: 2.10.22
Git: [240e9a4]

nats CLI: 0.1.5

Node package manager: yarn 4.3.1
nats package: 2.28.2

Host environment

macOS 15.1 (24B83)
Docker 4.21.1
image nats:alpine - sha256:75531e0cd58e417f674a8e110cb1b6e032052abc49a34eedf099ba501b1d32b4

Steps to reproduce

nats-io/nats-server#5106

import 'source-map-support/register'
import { connect } from "nats";
function stringToReadableStream(str: string) {
  const encoder = new TextEncoder();
  const encodedStr = encoder.encode(str);

  return new ReadableStream({
    start(controller) {
      controller.enqueue(encodedStr);
      controller.close();
    }
  });
}

async function readStreamToString(stream: any) {
  if (!stream) return stream
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let result = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    result += decoder.decode(value, { stream: true });
  }

  // Decode any remaining buffered bytes
  result += decoder.decode();

  return result;
}

const main = async () => {
  const nc = await connect()
  const js = nc.jetstream()
  const jsm = await js.jetstreamManager()
  const sourceS3 = await js.views.os('source')
//  const c = await js.consumers.get('ss')
  console.log('source initialized');
  // List the streams that exist before the mirror is created.
  const streams = await jsm.streams.list();
  for await (const stream of streams) {
    console.log(stream.config.name)
  }
  const osDestStreamName = 'OBJ_dest';
  // Drop any mirror left over from a previous run; ignore the error if it does not exist.
  await jsm.streams.delete(osDestStreamName).catch(() => {})
  const objSourceStreamName = 'OBJ_source';
  const destStream = await jsm.streams.add({
    name: osDestStreamName,
    allow_rollup_hdrs: true,
    mirror: {
      name: objSourceStreamName,
      subject_transforms: [
        {
          src: '$O.source.C.>',
          dest: '$O.dest.C.>',
        },
        {
          src: '$O.source.M.>',
          dest: '$O.dest.M.>',
        },
      ]
    }
  })
  console.log('dest initialized');
  const destS3 = await js.views.os('dest')
  await sourceS3.put({
    name: 'foo',
  }, stringToReadableStream('bar'))
  setTimeout(async ()=> {
    const fooFromSourceValue = await sourceS3.get('foo')
    const fooFromDestValue = await destS3.get('foo')
    console.log('fooFromDestValue', await readStreamToString(fooFromDestValue?.data))
    console.log('fooFromSourceValue', await readStreamToString(fooFromSourceValue?.data))

    // List the streams again, now that the mirror exists.
    const streams = await jsm.streams.list();
    for await (const stream of streams) {
      console.log(stream.config.name)
    }
    {
      // Dump the messages stored in the mirror stream.
      const c = await js.consumers.get(osDestStreamName)
      for await (const msg of await c.fetch()) {
        console.log('itm dst', msg.string());
        console.log('itm dst subj', msg.subject);
      }
    }
    {
      // Dump the messages stored in the source stream for comparison.
      const c = await js.consumers.get(objSourceStreamName)
      for await (const msg of await c.fetch()) {
        console.log('itm src', msg.string());
        console.log('itm src subj', msg.subject);
      }
    }
    process.exit()
  }, 5e3)
}

main().catch((err) => {
  console.error(err)
  process.exit(1)
})
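
For reference, this is the subject mapping the two `subject_transforms` entries in the mirror config are meant to produce. It is a minimal sketch in plain TypeScript with no NATS calls: `mapSubject` and `SubjectTransform` are hypothetical names that are not part of the `nats` package, only the trailing `>` wildcard form used in the script is handled, and the example subjects are assumptions about what the object store publishes.

```typescript
// Hypothetical type and helper for illustration; not part of the `nats` package.
type SubjectTransform = { src: string; dest: string };

// The two transforms from the mirror config in the script above.
const transforms: SubjectTransform[] = [
  { src: "$O.source.C.>", dest: "$O.dest.C.>" },
  { src: "$O.source.M.>", dest: "$O.dest.M.>" },
];

// Rewrite `subject` with the first rule whose `src` prefix matches.
// Returns undefined when no rule applies.
function mapSubject(
  subject: string,
  rules: SubjectTransform[],
): string | undefined {
  for (const { src, dest } of rules) {
    // Only patterns ending in the `>` wildcard are supported in this sketch.
    if (!src.endsWith(".>") || !dest.endsWith(".>")) continue;
    const srcPrefix = src.slice(0, -1); // e.g. "$O.source.C."
    const destPrefix = dest.slice(0, -1); // e.g. "$O.dest.C."
    // `>` must match at least one token, so the subject has to be longer than the prefix.
    if (subject.startsWith(srcPrefix) && subject.length > srcPrefix.length) {
      return destPrefix + subject.slice(srcPrefix.length);
    }
  }
  return undefined;
}

// Assumed example subjects: "abc" stands in for a chunk id, "Zm9v" is base64 for "foo".
console.log(mapSubject("$O.source.C.abc", transforms));
console.log(mapSubject("$O.source.M.Zm9v", transforms));
```

If the mirror applied the transforms this way, the `itm dst subj` lines printed by the script would be expected to start with `$O.dest.` rather than `$O.source.`.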