using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace Mjpeg
{
public class MJpegStreamingServer
{
private static string _contentLengthString = "__PayloadHeaderContentLength__";
private AsyncTcpServer _server;
private ConcurrentDictionary<string, TcpClient> _clients;
public MJpegStreamingServer(int listenPort)
: this(listenPort, "--dennisgao")
{
}
public MJpegStreamingServer(int listenPort, string boundary)
{
Port = listenPort;
Boundary = boundary;
_server = new AsyncTcpServer(Port);
_server.Encoding = Encoding.ASCII;
_clients = new ConcurrentDictionary<string, TcpClient>();
}
/// <summary>
/// 监听的端口
/// </summary>
public int Port { get; private set; }
/// <summary>
/// 分隔符
/// </summary>
public string Boundary { get; private set; }
/// <summary>
/// 流头部
/// </summary>
public string StreamHeader
{
get
{
return "HTTP/1.1 200 OK" +
"\r\n" +
"Content-Type: multipart/x-mixed-replace; boundary=" + this.Boundary +
"\r\n";
}
}
/// <summary>
/// 图片头部
/// </summary>
public string PayloadHeader
{
get
{
return "\r\n" +
this.Boundary +
"\r\n" +
"Content-Type: image/jpeg" +
"\r\n" +
"Content-Length: " + _contentLengthString +
"\r\n\r\n";
}
}
public void Start()
{
_server.Start(10);
_server.ClientConnected += new EventHandler<TcpClientConnectedEventArgs>(OnClientConnected);
_server.ClientDisconnected += new EventHandler<TcpClientDisconnectedEventArgs>(OnClientDisconnected);
}
public void Stop()
{
_server.Stop();
_server.ClientConnected -= new EventHandler<TcpClientConnectedEventArgs>(OnClientConnected);
_server.ClientDisconnected -= new EventHandler<TcpClientDisconnectedEventArgs>(OnClientDisconnected);
}
private void OnClientConnected(object sender, TcpClientConnectedEventArgs e)
{
_clients.AddOrUpdate(e.TcpClient.Client.RemoteEndPoint.ToString(), e.TcpClient, (n, o) => { return e.TcpClient; });
}
private void OnClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
{
TcpClient clientToBeThrowAway;
_clients.TryRemove(e.TcpClient.Client.RemoteEndPoint.ToString(), out clientToBeThrowAway);
}
public void Write(Bitmap image)
{
if (_server.IsRunning)
{
byte[] payload = BytesOf(image);
WriteStreamHeader();
WritePayload(payload);
}
}
private void WriteStreamHeader()
{
if (_clients.Count > 0)
{
foreach (var item in _clients)
{
//Logger.Debug(string.Format(CultureInfo.InvariantCulture, "Writing stream header, {0}, {1}{2}", item.Key, Environment.NewLine, StreamHeader));
_server.Send(item.Value, StreamHeader);
TcpClient clientToBeThrowAway;
_clients.TryRemove(item.Key, out clientToBeThrowAway);
}
}
}
private void WritePayload(byte[] payload)
{
string payloadHeader = this.PayloadHeader.Replace(_contentLengthString, payload.Length.ToString());
string payloadTail = "\r\n";
//Logger.Debug(string.Format(CultureInfo.InvariantCulture, "Writing payload header, {0}{1}", Environment.NewLine, payloadHeader));
byte[] payloadHeaderBytes = _server.Encoding.GetBytes(payloadHeader);
byte[] payloadTailBytes = _server.Encoding.GetBytes(payloadTail);
byte[] packet = new byte[payloadHeaderBytes.Length + payload.Length + payloadTailBytes.Length];
Buffer.BlockCopy(payloadHeaderBytes, 0, packet, 0, payloadHeaderBytes.Length);
Buffer.BlockCopy(payload, 0, packet, payloadHeaderBytes.Length, payload.Length);
Buffer.BlockCopy(payloadTailBytes, 0, packet, payloadHeaderBytes.Length + payload.Length, payloadTailBytes.Length);
_server.SendAll(packet);
}
private byte[] BytesOf(Bitmap image)
{
MemoryStream ms = new MemoryStream();
image.Save(ms, System.Drawing.Imaging.ImageFormat.Jpeg);
byte[] payload = ms.ToArray();
return payload;
}
}
}
用到的AsyncTcpServer类:
using System.Collections.Generic;
using System.Net.Sockets;
using System.Net;
using System.Reflection.Emit;
using System.Text;
using System;
namespace Mjpeg
{
/// <summary>
/// 异步TCP服务器
/// </summary>
public class AsyncTcpServer : IDisposable
{
#region Fields
private TcpListener listener;
private List<TcpClientState> clients;
private bool disposed = false;
#endregion
#region Ctors
/// <summary>
/// 异步TCP服务器
/// </summary>
/// <param name="listenPort">监听的端口</param>
public AsyncTcpServer(int listenPort)
: this(IPAddress.Any, listenPort)
{
}
/// <summary>
/// 异步TCP服务器
/// </summary>
/// <param name="localEP">监听的终结点</param>
public AsyncTcpServer(IPEndPoint localEP)
: this(localEP.Address, localEP.Port)
{
}
/// <summary>
/// 异步TCP服务器
/// </summary>
/// <param name="localIPAddress">监听的IP地址</param>
/// <param name="listenPort">监听的端口</param>
public AsyncTcpServer(IPAddress localIPAddress, int listenPort)
{
Address = localIPAddress;
Port = listenPort;
this.Encoding = Encoding.Default;
clients = new List<TcpClientState>();
listener = new TcpListener(Address, Port);
listener.AllowNatTraversal(true);
}
#endregion
#region Properties
/// <summary>
/// 服务器是否正在运行
/// </summary>
public bool IsRunning { get; private set; }
/// <summary>
/// 监听的IP地址
/// </summary>
public IPAddress Address { get; private set; }
/// <summary>
/// 监听的端口
/// </summary>
public int Port { get; private set; }
/// <summary>
/// 通信使用的编码
/// </summary>
public Encoding Encoding { get; set; }
#endregion
#region Server
/// <summary>
/// 启动服务器
/// </summary>
/// <returns>异步TCP服务器</returns>
public AsyncTcpServer Start()
{
if (!IsRunning)
{
IsRunning = true;
listener.Start();
listener.BeginAcceptTcpClient(
new AsyncCallback(HandleTcpClientAccepted), listener);
}
return this;
}
/// <summary>
/// 启动服务器
/// </summary>
/// <param name="backlog">
/// 服务器所允许的挂起连接序列的最大长度
/// </param>
/// <returns>异步TCP服务器</returns>
public AsyncTcpServer Start(int backlog)
{
if (!IsRunning)
{
IsRunning = true;
listener.Start(backlog);
listener.BeginAcceptTcpClient(
new AsyncCallback(HandleTcpClientAccepted), listener);
}
return this;
}
/// <summary>
/// 停止服务器
/// </summary>
/// <returns>异步TCP服务器</returns>
public AsyncTcpServer Stop()
{
if (IsRunning)
{
IsRunning = false;
listener.Stop();
lock (this.clients)
{
for (int i = 0; i < this.clients.Count; i++)
{
this.clients[i].TcpClient.Client.Disconnect(false);
}
this.clients.Clear();
}
}
return this;
}
#endregion
#region Receive
private void HandleTcpClientAccepted(IAsyncResult ar)
{
if (IsRunning)
{
TcpListener tcpListener = (TcpListener)ar.AsyncState;
TcpClient tcpClient = tcpListener.EndAcceptTcpClient(ar);
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
TcpClientState internalClient
= new TcpClientState(tcpClient, buffer);
lock (this.clients)
{
this.clients.Add(internalClient);
RaiseClientConnected(tcpClient);
}
NetworkStream networkStream = internalClient.NetworkStream;
networkStream.BeginRead(
internalClient.Buffer,
0,
internalClient.Buffer.Length,
HandleDatagramReceived,
internalClient);
tcpListener.BeginAcceptTcpClient(
new AsyncCallback(HandleTcpClientAccepted), ar.AsyncState);
}
}
private void HandleDatagramReceived(IAsyncResult ar)
{
if (IsRunning)
{
TcpClientState internalClient = (TcpClientState)ar.AsyncState;
NetworkStream networkStream = internalClient.NetworkStream;
int numberOfReadBytes = 0;
try
{
numberOfReadBytes = networkStream.EndRead(ar);
}
catch
{
numberOfReadBytes = 0;
}
if (numberOfReadBytes == 0)
{
// connection has been closed
lock (this.clients)
{
this.clients.Remove(internalClient);
RaiseClientDisconnected(internalClient.TcpClient);
return;
}
}
// received byte and trigger event notification
byte[] receivedBytes = new byte[numberOfReadBytes];
Buffer.BlockCopy(
internalClient.Buffer, 0,
receivedBytes, 0, numberOfReadBytes);
RaiseDatagramReceived(internalClient.TcpClient, receivedBytes);
RaisePlaintextReceived(internalClient.TcpClient, receivedBytes);
// continue listening for tcp datagram packets
networkStream.BeginRead(
internalClient.Buffer,
0,
internalClient.Buffer.Length,
HandleDatagramReceived,
internalClient);
}
}
#endregion
#region Events
/// <summary>
/// 接收到数据报文事件
/// </summary>
public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;
/// <summary>
/// 接收到数据报文明文事件
/// </summary>
public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;
private void RaiseDatagramReceived(TcpClient sender, byte[] datagram)
{
if (DatagramReceived != null)
{
DatagramReceived(this, new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
}
}
private void RaisePlaintextReceived(TcpClient sender, byte[] datagram)
{
if (PlaintextReceived != null)
{
PlaintextReceived(this, new TcpDatagramReceivedEventArgs<string>(
sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
}
}
/// <summary>
/// 与客户端的连接已建立事件
/// </summary>
public event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
/// <summary>
/// 与客户端的连接已断开事件
/// </summary>
public event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;
private void RaiseClientConnected(TcpClient tcpClient)
{
if (ClientConnected != null)
{
ClientConnected(this, new TcpClientConnectedEventArgs(tcpClient));
}
}
private void RaiseClientDisconnected(TcpClient tcpClient)
{
if (ClientDisconnected != null)
{
ClientDisconnected(this, new TcpClientDisconnectedEventArgs(tcpClient));
}
}
#endregion
#region Send
/// <summary>
/// 发送报文至指定的客户端
/// </summary>
/// <param name="tcpClient">客户端</param>
/// <param name="datagram">报文</param>
public void Send(TcpClient tcpClient, byte[] datagram)
{
if (!IsRunning)
throw new InvalidProgramException("This TCP server has not been started.");
if (tcpClient == null)
throw new ArgumentNullException("tcpClient");
if (datagram == null)
throw new ArgumentNullException("datagram");
tcpClient.GetStream().BeginWrite(
datagram, 0, datagram.Length, HandleDatagramWritten, tcpClient);
}
private void HandleDatagramWritten(IAsyncResult ar)
{
((TcpClient)ar.AsyncState).GetStream().EndWrite(ar);
}
/// <summary>
/// 发送报文至指定的客户端
/// </summary>
/// <param name="tcpClient">客户端</param>
/// <param name="datagram">报文</param>
public void Send(TcpClient tcpClient, string datagram)
{
Send(tcpClient, this.Encoding.GetBytes(datagram));
}
/// <summary>
/// 发送报文至所有客户端
/// </summary>
/// <param name="datagram">报文</param>
public void SendAll(byte[] datagram)
{
if (!IsRunning)
throw new InvalidProgramException("This TCP server has not been started.");
for (int i = 0; i < this.clients.Count; i++)
{
Send(this.clients[i].TcpClient, datagram);
}
}
/// <summary>
/// 发送报文至所有客户端
/// </summary>
/// <param name="datagram">报文</param>
public void SendAll(string datagram)
{
if (!IsRunning)
throw new InvalidProgramException("This TCP server has not been started.");
SendAll(this.Encoding.GetBytes(datagram));
}
#endregion
#region IDisposable Members
/// <summary>
/// Performs application-defined tasks associated with freeing,
/// releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases unmanaged and - optionally - managed resources
/// </summary>
/// <param name="disposing"><c>true</c> to release
/// both managed and unmanaged resources; <c>false</c>
/// to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
try
{
Stop();
if (listener != null)
{
listener = null;
}
}
catch (SocketException ex)
{
//ExceptionHandler.Handle(ex);
}
}
disposed = true;
}
}
#endregion
}
}
调用:实现截屏发送
MJpegStreamingServer server = new MJpegStreamingServer(1022);
server.Start();
Task.Run(() =>
{
while (true)
{
try
{
var bit = CaptureActiveScreen();
server.Write(bit);
Thread.Sleep(50);
}
catch { }
};
});
截屏代码:
/// <summary>
/// 截屏
/// </summary>
/// <returns></returns>
public Bitmap CaptureActiveScreen()
{
// 创建一个和当前屏幕一样大小的Bitmap
Rectangle bounds = Screen.GetBounds(Point.Empty);
Bitmap bmp = new Bitmap(bounds.Width, bounds.Height, PixelFormat.Format32bppArgb);
// 创建一个画布
Graphics g = Graphics.FromImage(bmp);
// 截取整个屏幕
g.CopyFromScreen(Point.Empty, Point.Empty, bounds.Size);
// 释放画布资源
g.Dispose();
return bmp;
}
2、显示Mjpeg流:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Mjpeg
{
public class MjpegUtils : IDisposable
{
public bool IsExit = false;
/// <summary>
/// 播放MJPEG视频流
/// </summary>
/// <param name="mjpegStreamUrl">MJPEG视频流地址</param>
/// <param name="accessImageHandler">读到图片后处理</param>
/// <param name="sleep">每次读取文件睡眠时间,太长会导致卡顿,太短可能频繁矫正流</param>
public MjpegUtils(string mjpegStreamUrl, Action<byte[]> accessImageHandler, int sleep = 50)
{
HttpWebRequest hwRequest = (System.Net.HttpWebRequest)WebRequest.Create(mjpegStreamUrl);
hwRequest.Method = "GET";
HttpWebResponse hwResponse = (HttpWebResponse)hwRequest.GetResponse();
string boundary = hwRequest.Headers["Content-Type:"];
//响应流中没有响应头信息,全是响应体内容
Stream stream = hwResponse.GetResponseStream();
string headerName = "Content-Length:";
StringBuilder sb = new StringBuilder();
int len = 1024;
while (true)
{
while (true)
{
char c = (char)stream.ReadByte();
//Console.Write(c);
if (c == '\n')
{
break;
}
sb.Append(c);
}
string line = sb.ToString();
sb.Remove(0, sb.Length);
int i = line.IndexOf(headerName);
if (i != -1)
{
int imageFileLength = Convert.ToInt32(line.Substring(i + headerName.Length).Trim());
stream.Read(new byte[2], 0, 2);
byte[] imageFileBytes = new byte[imageFileLength];
stream.Read(imageFileBytes, 0, imageFileBytes.Length);
//Console.WriteLine("文件头:" + imageFileBytes[0].ToString("X") + " " + imageFileBytes[1].ToString("X") + " " + imageFileBytes[2].ToString("X") + " " + imageFileBytes[3].ToString("X") + " " + imageFileBytes[4].ToString("X"));
//Console.WriteLine("文件尾:" + imageFileBytes[imageFileLength - 2].ToString("X") + " " + imageFileBytes[imageFileLength - 1].ToString("X"));
if (imageFileBytes[imageFileLength - 2].ToString("X") != "FF" && imageFileBytes[imageFileLength - 1].ToString("X") != "D9")
{
//Console.WriteLine("开始矫正...");
//修正
char l = '0';
while (true)
{
char c = (char)stream.ReadByte();
if (l == '-' && c == '-')
{
break;
}
l = c;
}
}
else
{
//读取图片成功!
accessImageHandler(imageFileBytes);
}
Thread.Sleep(sleep);
}
}
stream.Close();
hwResponse.Close();
Console.Read();
}
public int StreamFindContentLength(Stream stream)
{
StringBuilder sb = new StringBuilder();
sb.Append((char)stream.ReadByte());
sb.Append((char)stream.ReadByte());
sb.Append((char)stream.ReadByte());
sb.Append((char)stream.ReadByte());
Console.WriteLine("num:" + sb);
int num = Convert.ToInt32(sb.ToString().Trim());
//跳过\r\n
char c = '0';
do
{
c = (char)stream.ReadByte();
} while (c == '\r' || c == '\n');
return num;
}
public void Dispose()
{
IsExit = true;
}
}
}
调用:
var thread = new Thread(new ThreadStart(delegate ()
{
var mjpegUtils = new MjpegUtils("http://127.0.0.1:1022", delegate (byte[] bytes)
{
//Console.WriteLine(bytes.Length);
MemoryStream ms = new MemoryStream(bytes);
pictureBox1.Image = Image.FromStream(ms);
ms.Close();
}, 50);
}));
thread.IsBackground = true;
thread.Start();
最终效果:
源码下载
参考文章:
https://blog.csdn.net/weixin_33709364/article/details/86419401
https://www.jb51.net/article/206675.htm
https://www.cnblogs.com/gaochundong/archive/2013/04/14/csharp_async_tcp_server.html
https://www.cnblogs.com/gaochundong/archive/2013/04/14/csharp_async_tcp_client.html