하아찡

C# 바이낸스 (Spot, 현물) WebSocket 본문

C#/바이낸스

C# 바이낸스 (Spot, 현물) WebSocket

하아찡 2025. 12. 21. 18:47

안녕하세요.

작업하면서 훗날 볼 자료 정리 내용입니다.

 

웹소켓 주소

wss://stream.binance.com:9443

또는

wss://stream.binance.com:443

 

필수 확인 사항

웹소켓 연결은 초당 5개로 수신이 제한됩니다.

IP주소당 5분마다 시도 할 수있는 연결제한 횟수는 300회.

하나의 웹소켓 연결로 1024개의 스트림을 수신할 수 있습니다.

 

스트림 구독과 구독 취소 방법으로 진행이 됩니다.

내가 원하는 데이터를 구독한 경우에 해당 데이터를 서버에서 받아올 수 있게되고, 더이상 받기 싫어지면 해당 데이터를 구독 취소를 하는 방법으로 진행하면 됩니다.

 

서버에게 데이터를 전달하는 방법은 크게 파라미터를 포함한 데이터와 포함하지 않은 데이터 두가지로 구분이 됩니다.

{
  "method": "SUBSCRIBE",
  "params": [
    "btcusdt@trade",
    "ethusdt@ticker"
  ],
  "id": 1
}

위와 같은 구문으로 서버에게 데이터를 전달하면 "btcusdt 트레이드 정보와 ethusdt Ticker 정보를 구독하게 됩니다."

단, 이전에 구독한 내용은 사라지지 않습니다.

 

method : 현재 내가 서버에게 보내는 데이터 타입의 종류 "SUBSCRIBE"(구독 신청), "UNSUBSCRIBE(구독 취소)", "LIST_SUBSCRIPTIONS(현재 내가 구독된 데이터를 보내줘 요청)" 

params : 보내는 데이터 타입에 맞는 파라미터를 전달.

id : 식별 고유번호.(사용자 정의)

 

요청을 보냈을때 응답 값

{
  "result": null,
  "id": 2
}

예시 이미지

위와같이 데이터가 정상적으로 서버에서 콜백이 오게 됩니다.

 

 

여러가지 스트림을 구독 했을 경우에 여러 데이터가 한번에 들어오게 됩니다.

 

 

해당 데이터를 가공해서 처리하면 되겠습니다.

 

음.. Websocket 베이스 코드를 공유드립니다.

using System.Buffers;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

public abstract class BinanceWebSocketBase : IAsyncDisposable
{
    private readonly Uri _wsUri;
    private ClientWebSocket? _ws;
    private readonly SemaphoreSlim _sendLock = new(1, 1);

    private readonly HashSet<string> _streams = new(StringComparer.OrdinalIgnoreCase);
    private int _nextRequestId = 1;

    private CancellationTokenSource? _cts;
    private Task? _runner;

    public event Action<string>? OnMessage;     // JSON 문자열 형태
    public event Action<Exception>? OnError;
    public event Action<bool>? OnConnectionStateChanged; // True 상태일경우 Connection 상태

    protected BinanceWebSocketBase(string wsUrl)
    {
        _wsUri = new Uri(wsUrl);
    }

    public bool IsRunning => _runner != null && !_runner.IsCompleted;

    

    public Task StartAsync(CancellationToken externalCt = default)
    {
        if (IsRunning) return Task.CompletedTask;

        _cts = CancellationTokenSource.CreateLinkedTokenSource(externalCt);
        _runner = Task.Run(() => RunLoopAsync(_cts.Token));
        return Task.CompletedTask;
    }

    public async Task StopAsync()
    {
        if (_cts == null) return;
        _cts.Cancel();

        if (_runner != null)
        {
            try { await _runner.ConfigureAwait(false); } catch { /* ignore */ }
        }

        await CloseSocketSilentlyAsync().ConfigureAwait(false);
        _runner = null;

        _cts.Dispose();
        _cts = null;
    }



    public async ValueTask DisposeAsync()
    {
        await StopAsync().ConfigureAwait(false);
        _sendLock.Dispose();
    }


    /// <summary>
    /// 현재 구독된 스트림 목록을 반환
    /// </summary>
    /// <returns></returns>
    public List<string> GetSubscribeList()
    {
        List<string> subscribes = new();
        foreach(var stream in _streams)
        {
            subscribes.Add(stream);
        }

        return subscribes;
    }

    /// <summary>
    /// Trade 스트림 구독 요청
    /// </summary>
    /// <param name="stream">심볼 데이터 전달.</param>
    /// <returns></returns>
    public async Task TradeSubscribeAsync(string stream)
    {
        string formattedStream = $"{stream}@trade";
        await SubscribeAsync(formattedStream);
    }

    /// <summary>
    /// Ticker 스트림 구독 요청
    /// </summary>
    /// <param name="stream">심볼 데이터 전달.</param>
    /// <returns></returns>
    public async Task TickerSubscribeAsync(string stream)
    {
        string formattedStream = $"{stream}@ticker";
        await SubscribeAsync(formattedStream);
    }

    /// <summary>
    /// Ticker 스트림 구독 요청
    /// </summary>
    /// <param name="stream">심볼 데이터 전달.</param>
    /// <returns></returns>
    public async Task TickerSubscribeAsync(string stream, string time)
    {
        string formattedStream = $"{stream}@kline_{time}";
        await SubscribeAsync(formattedStream);
    }


    /// <summary>
    /// 스트림 구독 요청
    /// </summary>
    /// <param name="stream">예시 : btcusdt@trade</param>
    /// <returns></returns>
    public async Task SubscribeAsync(string stream)
    {
        
        if (string.IsNullOrWhiteSpace(stream)) return;

        // 소문자만 처리가능 (바이낸스 규칙)
        stream = stream.ToLower();

        // 연결 전이라도 일단 기억해둠(재연결 시 자동 재구독)
        lock (_streams) _streams.Add(stream);

        if (_ws?.State == WebSocketState.Open)
            await SendSubscribeAsync(new[] { stream }).ConfigureAwait(false);
    }

    public async Task UnsubscribeAsync(string stream)
    {
        if (string.IsNullOrWhiteSpace(stream)) return;

        // 소문자만 처리가능 (바이낸스 규칙)
        stream = stream.ToLower();

        bool existed;
        lock (_streams) existed = _streams.Remove(stream);

        if (!existed) return;

        if (_ws?.State == WebSocketState.Open)
            await SendUnsubscribeAsync(new[] { stream }).ConfigureAwait(false);
    }

    public async Task ClearSubscriptionsAsync()
    {
        string[] current;
        lock (_streams)
        {
            current = new string[_streams.Count];
            _streams.CopyTo(current);
            _streams.Clear();
        }

        if (current.Length > 0 && _ws?.State == WebSocketState.Open)
            await SendUnsubscribeAsync(current).ConfigureAwait(false);
    }

    private async Task RunLoopAsync(CancellationToken ct)
    {
        var attempt = 0;

        while (!ct.IsCancellationRequested)
        {
            try
            {
                attempt++;
                await ConnectAsync(ct).ConfigureAwait(false);
                attempt = 0;

                // 연결되면 기존 구독 모두 재구독
                await ResubscribeAllAsync(ct).ConfigureAwait(false);

                // 수신 루프 (끊기면 예외/close로 빠짐)
                await ReceiveLoopAsync(ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                OnError?.Invoke(ex);
                OnConnectionStateChanged?.Invoke(false);

                await CloseSocketSilentlyAsync().ConfigureAwait(false);

                // 재연결 backoff (최대 30초)
                var delayMs = Math.Min(30_000, 500 * (int)Math.Pow(2, Math.Min(attempt, 6))); // 0.5s,1s,2s,4s,8s,16s,30s...
                try { await Task.Delay(delayMs, ct).ConfigureAwait(false); } catch { }
            }
        }
    }



    private async Task ConnectAsync(CancellationToken ct)
    {
        await CloseSocketSilentlyAsync().ConfigureAwait(false);

        _ws = new ClientWebSocket();
        _ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(15);

        await _ws.ConnectAsync(_wsUri, ct).ConfigureAwait(false);
        OnConnectionStateChanged?.Invoke(true);
    }



    private async Task ReceiveLoopAsync(CancellationToken ct)
    {
        if (_ws == null) return;

        var buffer = ArrayPool<byte>.Shared.Rent(64 * 1024);
        try
        {
            while (!ct.IsCancellationRequested && _ws.State == WebSocketState.Open)
            {
                var sb = new StringBuilder();
                WebSocketReceiveResult result;

                do
                {
                    result = await _ws.ReceiveAsync(buffer, ct).ConfigureAwait(false);

                    if (result.MessageType == WebSocketMessageType.Close)
                        throw new WebSocketException("WebSocket closed by server.");

                    sb.Append(Encoding.UTF8.GetString(buffer, 0, result.Count));
                }
                while (!result.EndOfMessage);

                var json = sb.ToString();
                OnMessage?.Invoke(json);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }

    private async Task ResubscribeAllAsync(CancellationToken ct)
    {
        if (_ws?.State != WebSocketState.Open) return;

        string[] streams;
        lock (_streams)
        {
            streams = new string[_streams.Count];
            _streams.CopyTo(streams);
        }

        if (streams.Length == 0) return;

        // 바이낸스는 한 번에 너무 많이 보내면 위험 → 배치 전송
        const int batchSize = 50;
        for (int i = 0; i < streams.Length; i += batchSize)
        {
            var slice = new ArraySegment<string>(streams, i, Math.Min(batchSize, streams.Length - i));
            await SendSubscribeAsync(slice).ConfigureAwait(false);

            // 일정 시간 딜레이 서버 부담 완화
            await Task.Delay(50, ct).ConfigureAwait(false);
        }
    }

    private async Task SendSubscribeAsync(IReadOnlyCollection<string> streams)
        => await SendCommandAsync("SUBSCRIBE", streams).ConfigureAwait(false);

    private async Task SendUnsubscribeAsync(IReadOnlyCollection<string> streams)
        => await SendCommandAsync("UNSUBSCRIBE", streams).ConfigureAwait(false);

    private async Task SendCommandAsync(string method, IReadOnlyCollection<string> streams)
    {
        if (_ws?.State != WebSocketState.Open) return;

        var payload = new
        {
            method,
            @params = streams,
            id = Interlocked.Increment(ref _nextRequestId)
        };

        var json = JsonSerializer.Serialize(payload);
        var bytes = Encoding.UTF8.GetBytes(json);

        await _sendLock.WaitAsync().ConfigureAwait(false);
        try
        {
            await _ws.SendAsync(bytes, WebSocketMessageType.Text, endOfMessage: true, CancellationToken.None)
                     .ConfigureAwait(false);
        }
        finally
        {
            _sendLock.Release();
        }
    }

    private async Task CloseSocketSilentlyAsync()
    {
        if (_ws == null) return;

        try
        {
            if (_ws.State == WebSocketState.Open || _ws.State == WebSocketState.CloseReceived)
                await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "closing", CancellationToken.None)
                         .ConfigureAwait(false);
        }
        catch { /* ignore */ }
        finally
        {
            _ws.Dispose();
            _ws = null;
        }
    }
}

 

 

바이낸스 WebSocket 원글 주소

https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams

 

WebSocket Streams | Binance Open Platform

General WSS information

developers.binance.com

 

반응형