Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4635] Implement the function of file source connector #4640

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,26 @@

package org.apache.eventmesh.connector.file.source.connector;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Locale;
import org.apache.eventmesh.connector.file.source.config.FileSourceConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import java.util.List;
Expand All @@ -36,6 +50,9 @@ public class FileSourceConnector implements Source {

private OffsetStorageReader offsetStorageReader;

private String filePath;
private String fileName;
private InputStream inputStream;
@Override
public Class<? extends Config> configClass() {
return FileSourceConfig.class;
Expand All @@ -45,7 +62,8 @@ public Class<? extends Config> configClass() {
public void init(Config config) throws Exception {
// init config for hdfs source connector
this.sourceConfig = (FileSourceConfig) config;

this.filePath = buildFilePath();
this.fileName = buildFileName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the file name and file path be directly configured in the configuration file? Additionally, could unnecessary configurations in the configuration file be removed?

}

@Override
Expand All @@ -58,12 +76,16 @@ public void init(ConnectorContext connectorContext) throws Exception {

@Override
public void start() throws Exception {

if (fileName == null || fileName.length() == 0 || filePath == null || filePath.length() == 0){
this.inputStream = System.in;
}
else{
this.inputStream = Files.newInputStream(Paths.get(filePath+fileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.READ);
}
}

@Override
public void commit(ConnectRecord record) {

}

@Override
Expand All @@ -73,12 +95,46 @@ public String name() {

@Override
public void stop() {

try {
inputStream.close();
} catch (Exception e){
log.error("Error closing resources: {}", e.getMessage());
}
}

@Override
public List<ConnectRecord> poll() {
return null;
List<ConnectRecord> connectRecords = new ArrayList<>();
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream,StandardCharsets.UTF_8));
String line;
while ((line = bufferedReader.readLine())!=null){
ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(), new RecordOffset(),System.currentTimeMillis(),line);
connectRecords.add(connectRecord);
}
} catch (IOException e){
log.error("Error reading data from the file: {}",e.getMessage());
}
return connectRecords;
}

private String buildFileName(){
Calendar calendar = Calendar.getInstance(Locale.CHINA);
long currentTime = calendar.getTime().getTime();
return sourceConfig.getConnectorConfig().getTopic()+"-"+calendar.get(Calendar.HOUR_OF_DAY)+"-"+currentTime;
}
private String buildFilePath(){
Calendar calendar = Calendar.getInstance(Locale.CHINA);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH) + 1;
int day = calendar.get(Calendar.DATE);
String filePath = sourceConfig.getConnectorConfig().getTopic()+ File.separator+year+File.separator+month+File.separator+day+File.separator;
File path = new File(filePath);
if (!path.exists()){
if(!path.mkdirs()){
log.error("make file dir {} error", filePath);
}
}
return filePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ public List<EventMeshDataInfo> findEventMeshInfoByCluster(String clusterName) th
eventMeshDataInfoList.add(eventMeshDataInfo);
}
}
} catch (InterruptedException e) {
log.error("[EtcdRegistryService][findEventMeshInfoByCluster] InterruptedException", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("[EtcdRegistryService][findEventMeshInfoByCluster] error, clusterName: {}", clusterName, e);
throw new MetaException(e.getMessage());
Expand Down Expand Up @@ -294,7 +297,10 @@ public void run() {
List<KeyValue> keyValues = null;
try {
keyValues = etcdClient.getKVClient().get(etcdKey).get().getKvs();
} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException e) {
log.error("get etcdKey[{}] failed[InterruptedException]", etcdKey, e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("get etcdKey[{}] failed", etcdKey, e);
}
if (CollectionUtils.isEmpty(keyValues)) {
Expand Down
Loading