회사에서 소켓 통신을 사용한다고 하여 공부했다.
여러 소스를 살펴보면서 개념이 정리된 것 같아, 해당 소스들을 참고하여 개발했다.
데이터 흐름 : 클라이언트1 <-> 소켓서버 <-> 클라이언트2
SERVER
public class Application { static List<ConnectionToClient> clients1 = new ArrayList<>(); static HashMap<String, Object> clients2 = new HashMap<String, Object>(); static ServerSocket server = null; static Logger logger = Logger.getLogger(Application.class); public Application(){ URL confPropURL = this.getClass().getClassLoader().getResource("config.properties"); URL log4jPropURL = this.getClass().getClassLoader().getResource("log4j.properties"); Properties confProp = new Properties(); Properties log4Jprop = new Properties(); String sNowServerType = ""; try { confProp.load(confPropURL.openStream()); sNowServerType = confProp.getProperty("now.server.type"); log4Jprop.load(log4jPropURL.openStream()); PropertyConfigurator.configure(log4Jprop); logger.info("loading log4j configure"); } catch (IOException e) { e.printStackTrace(); } logger.debug("------- "+ sNowServerType +" 소켓 서버 시작-------"); try { server = new ServerSocket(5000); Socket socket = null; logger.debug("-------서버 연결 대기중-------"); while ((socket = server.accept()) != null) { new ServerThread(socket).start(); } server.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new Application(); } static class ServerThread extends Thread { Socket socket; ConnectionToClient conToClient; ServerThread(Socket socket) { this.socket = socket; conToClient = new ConnectionToClient(socket); clients1.add(conToClient); } public void run() { try { String input = ""; while ((input = conToClient.read()) != null) { String data[] = input.split(";"); if("shutter".equals(data[0])){ logger.info("차단기 DATA : " + input); sendToReceiveData(input, data[1]); } else if("display".equals(data[0])){ logger.info("전광판 DATA : " + input); sendToReceiveData(input, data[1]); } else if("connect".equals(data[0])){ // 시설IDX 기반으로 연결된 클라이언트 HashMap clients2.put(data[1], conToClient); logger.info("접속한 L_IDX : " + data[1]); logger.info("서버가 가지고 있는 소켓리스트 : " + clients2); conToClient.write("connect;success"); } 
else if("ack".equals(data[0])){ logger.info("ACK L_IDX : " + data[1]); clients2.put(data[1], conToClient); conToClient.write("ack;success"); } else { // 장비 신호 전달 sendToReceiveData(data[0], "API"); } } } catch (Exception e) { e.printStackTrace(); } } } static class ConnectionToClient { Socket socket; BufferedReader br; ObjectOutputStream oos; ConnectionToClient(Socket socket) { this.socket = socket; try { br = new BufferedReader(new InputStreamReader(socket.getInputStream())); oos = new ObjectOutputStream(socket.getOutputStream()); } catch (Exception e) { e.printStackTrace(); } } public String read() { try { return br.readLine(); } catch (Exception e) { e.printStackTrace(); return null; } } public void write(Object obj) { try { oos.writeObject(obj); oos.flush(); } catch (Exception e) { e.printStackTrace(); } } } public static void sendToReceiveData(String data, String idx) { ConnectionToClient conToClient = (ConnectionToClient) clients2.get(idx); // 소켓이 있으면 if(conToClient != null){ logger.info("전송할 SOCKET : " + conToClient.socket); // 데이터를 보낼 준비 try { conToClient.write(data); } catch (Exception e) { e.printStackTrace(); } } } } | cs |
CLIENT1
최초 CONNECT로 소켓 연결 후 ACK로 소켓 연결상태를 체크한다.
클라이언트1은 receiveData 메서드로 소켓서버에 데이터를 전송하고 그 응답을 받는다.
ACK 메시지를 보낸 뒤 2초 이내에 응답이 없으면 소켓을 닫는다.
public class SocketClient { Logger log = Logger.getLogger(this.getClass()); @Value("${prop.center.socket.ip}") private String sCenterSocketIP; @Value("${prop.center.socket.port}") private int nCenterSocketPORT; @Value("${prop.local.idx}") private String sLocalIdx; static Socket socket = null; static BufferedReader br = null; static PrintWriter pw = null; static ObjectInputStream ois = null; static String sLIdx; static String sIP; static int nPORT; /** * 스프링부트 초기화 시점에서 socket 정보를 저장 * * @date 2018. 1. 16. * @author chanjung * @returnType void * */ @PostConstruct public void initSocketClient(){ sLIdx = sLocalIdx; sIP = sCenterSocketIP; nPORT = nCenterSocketPORT; } public void clientRun(){ String requestData = "connect;"+sLIdx; try{ socket = new Socket(sIP, nPORT); br = new BufferedReader(new InputStreamReader(socket.getInputStream())); pw = new PrintWriter(socket.getOutputStream()); ois = new ObjectInputStream(socket.getInputStream()); pw.println(requestData); pw.flush(); String responseData = (String) ois.readObject(); System.out.println(responseData); }catch(Exception e){ log.error("------중앙 소켓서버가 죽었습니다. 스케줄러에 따라 소켓 재요청합니다.------"); e.printStackTrace(); }finally { } } public static String receiveData(String requestData, String idx) throws ClassNotFoundException{ String receiveData = ""; try { pw.println(requestData); pw.flush(); receiveData = (String) ois.readObject(); } catch (IOException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); } return receiveData; } // @Scheduled(cron = "0/30 * * * * *") public void socketConnect(){ try { if(socket != null){ if(socket.isConnected() == true && socket.getKeepAlive() == false) { String ack_req_msg = "ack;"+sLIdx; pw.println(ack_req_msg); pw.flush(); String ack_res_msg; try { // 2초이내 ACK 메시지가 않오면 소켓을 닫는다. 
socket.setSoTimeout(2000); ack_res_msg = (String) ois.readObject(); System.out.println(ack_res_msg); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); try { System.out.println("중앙서버와 소켓이 끊겼습니다. 재요청을 위해 소켓을 닫습니다."); socket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } else { SocketClient client = new SocketClient(); client.clientRun(); } } else { SocketClient client = new SocketClient(); client.clientRun(); } } catch (SocketException e1) { } } } | cs |
CLIENT2
최초 CONNECT로 소켓 연결 후 ACK로 소켓 연결상태를 체크한다.
소켓서버로부터 받은 데이터를 후처리작업을 한다.
ACK 메시지를 보낸 뒤 2초 이내에 응답이 없으면 소켓을 닫는다.
package kr.avis.local.utils; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.ObjectInputStream; import java.io.PrintWriter; import java.net.Socket; import java.net.SocketException; import java.net.URL; import javax.annotation.PostConstruct; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class SocketClient { Logger log = Logger.getLogger(this.getClass()); @Value("${prop.center.socket.ip}") private String sCenterSocketIP; @Value("${prop.center.socket.port}") private int nCenterSocketPORT; @Value("${prop.local.server.ip}") private String sLocalServerIP; @Value("${prop.local.server.port}") private int sLocalServerPORT; @Value("${prop.local.idx}") private String sLocalIdx; static Socket socket = null; static BufferedReader br = null; static PrintWriter pw = null; static ObjectInputStream ois = null; static String sLIdx; static String sIP; static int nPORT; static String sApiIP; static int nApiPORT; // 응답 ack 상태값 static boolean resonese_ack_status = false; /** * 스프링부트 초기화 시점에서 socket 정보를 저장 * * @date 2018. 1. 16. 
* @author chanjung * @returnType void * */ @PostConstruct public void initSocketClient(){ sLIdx = sLocalIdx; sIP = sCenterSocketIP; nPORT = nCenterSocketPORT; sApiIP = sLocalServerIP; nApiPORT = sLocalServerPORT; } public void clientRun(){ String requestData = "connect;"+sLIdx; try{ socket = new Socket(sIP, nPORT); br = new BufferedReader(new InputStreamReader(socket.getInputStream())); pw = new PrintWriter(socket.getOutputStream()); ois = new ObjectInputStream(socket.getInputStream()); pw.println(requestData); pw.flush(); ReceiveData receiveDataThread = new ReceiveData(); receiveDataThread.start(); }catch(Exception e){ e.printStackTrace(); }finally { } } class ReceiveData extends Thread { String output = new String(); public ReceiveData() { output = new String(); } public String getResult(){ return output.toString(); } public void run() { String responseData = ""; try { while((responseData = (String) ois.readObject()) != null){ String result = ""; System.out.println("SocketClient 받은 메시지:" + responseData); String data[] = responseData.split(";"); if("shutter".equals(data[0])){ System.out.println("----shutter receive-----"); result = shutterSendPost(responseData); pw.println(result); pw.flush(); } else if("display".equals(data[0])){ System.out.println("----display receive-----"); result = displaySendPost(responseData); pw.println(result); pw.flush(); } else if("connect".equals(data[0])){ System.out.println("SocketClient 상태 : CONNECT SUCCESS"); } else if("ack".equals(data[0])){ resonese_ack_status = true; System.out.println("SocketClient 상태 : ACK SUCCESS"); } } } catch (Exception e) { e.printStackTrace(); } } } public String shutterSendPost(String data){ String sJsonStr = ""; String dataArray[] = data.split(";"); try { URL callURL = new URL("http://"+ sApiIP +":"+ nApiPORT +"/local/agent/shutterHandle"); HttpRequestor requestor = new HttpRequestor(callURL); requestor.addParameter("action_type", dataArray[2]); // 응답 내용(BODY) 구하기 InputStream is; is = 
requestor.sendPost(); BufferedReader br = new BufferedReader(new InputStreamReader(is)); ByteArrayOutputStream out = new ByteArrayOutputStream(); byte[] buf = new byte[1024 * 8]; int length = 0; while ((length = is.read(buf)) != -1) { out.write(buf, 0, length); } sJsonStr = new String(out.toByteArray(), "UTF-8"); br.close(); out.close(); System.out.println(sJsonStr); } catch (IOException e) { e.printStackTrace(); } return sJsonStr; } public String displaySendPost(String data){ String sJsonStr = ""; String dataArray[] = data.split(";"); try { URL callURL = new URL("http://"+ sApiIP +":"+ nApiPORT +"/local/agent/displayHandle"); HttpRequestor requestor = new HttpRequestor(callURL); requestor.addParameter("action_type", dataArray[2]); requestor.addParameter("action_msg1", dataArray[3]); requestor.addParameter("action_msg2", dataArray[4]); requestor.addParameter("action_msg1_color", dataArray[5]); requestor.addParameter("action_msg2_color", dataArray[6]); // 응답 내용(BODY) 구하기 InputStream is; is = requestor.sendPost(); BufferedReader br = new BufferedReader(new InputStreamReader(is)); ByteArrayOutputStream out = new ByteArrayOutputStream(); byte[] buf = new byte[1024 * 8]; int length = 0; while ((length = is.read(buf)) != -1) { out.write(buf, 0, length); } sJsonStr = new String(out.toByteArray(), "UTF-8"); br.close(); out.close(); System.out.println(sJsonStr); System.out.println("-----LOCAL - 미들웨어 통신----"); } catch (IOException e) { e.printStackTrace(); } return sJsonStr; } // @Scheduled(cron = "0/30 * * * * *") public void socketConnect(){ try { if(socket != null){ if( socket.isConnected() == true && socket.getKeepAlive() == false) { String ack_req_msg = "ack;"+sLIdx; pw.println(ack_req_msg); pw.flush(); // read할때까지 2초간 대기. // 소켓서버에서 응답이 없으면 소켓을 죽은걸로 간주하고 클라이언트 소켓도 닫는다. 
try { Thread.sleep(2000); if(resonese_ack_status == false){ socket.close(); } else { resonese_ack_status = false; } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } else { log.error("------소켓 재요청합니다.------"); SocketClient client = new SocketClient(); client.clientRun(); } } catch (SocketException e1) { e1.printStackTrace(); } } } | cs |